-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Implement PreparedStmt on the Server side #16764
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: force_ci/prepstm
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements server-side prepared statement functionality for the IoTDB relational model, enabling SQL statement preparation, execution, and deallocation. The implementation includes AST caching to avoid re-parsing on execution and memory management through the CoordinatorMemoryManager.
- Adds grammar support for PREPARE, EXECUTE, EXECUTE IMMEDIATE, and DEALLOCATE statements
- Implements AST caching and memory tracking for prepared statements with session-scoped storage
- Integrates parameter binding for prepared statements with proper validation
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| RelationalSql.g4 | Adds ANTLR grammar rules for prepared statement syntax |
| AstBuilder.java | Implements visitor methods to build AST nodes for prepared statements |
| Prepare.java, Execute.java, ExecuteImmediate.java, Deallocate.java | New AST node classes representing prepared statement operations |
| AstVisitor.java | Adds visitor methods for new prepared statement AST nodes |
| ParameterExtractor.java | Utility for extracting and binding parameters in prepared statements |
| AstMemoryEstimator.java | Estimates memory usage of AST nodes for memory management |
| PreparedStatementMemoryManager.java | Manages memory allocation/deallocation for prepared statements |
| PrepareTask.java, DeallocateTask.java | Config tasks for executing PREPARE and DEALLOCATE operations |
| TableConfigTaskVisitor.java | Routes prepared statement AST nodes to appropriate config tasks |
| Coordinator.java | Integrates prepared statement execution logic with parameter binding |
| TableModelPlanner.java | Updated to accept parameters for prepared statement execution |
| StatementAnalyzer.java | Re-enables parameter support in LIMIT/OFFSET clauses |
| SessionManager.java | Releases prepared statement memory on session close |
| BaseServerContextHandler.java | Handles session cleanup on TCP connection loss |
| IClientSession.java, ClientSession.java, etc. | Adds prepared statement storage to session implementations |
| PreparedStatementInfo.java | Data class storing prepared statement metadata |
| IoTDBPreparedStatementIT.java | Integration tests for prepared statement functionality |
| ClusterTestStatement.java | Documentation for PreparedStatement routing considerations |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Statement writeStatement; | ||
| if (connection instanceof ClusterTestConnection) { | ||
| // Use write connection directly for PreparedStatement queries | ||
| writeStatement = | ||
| ((ClusterTestConnection) connection) | ||
| .writeConnection | ||
| .getUnderlyingConnection() | ||
| .createStatement(); | ||
| } else { | ||
| writeStatement = statement; | ||
| } | ||
|
|
||
| try (ResultSet resultSet = writeStatement.executeQuery(executeSql)) { | ||
| ResultSetMetaData metaData = resultSet.getMetaData(); | ||
|
|
||
| // Verify header | ||
| assertEquals(expectedHeader.length, metaData.getColumnCount()); | ||
| for (int i = 1; i <= metaData.getColumnCount(); i++) { | ||
| assertEquals(expectedHeader[i - 1], metaData.getColumnName(i)); | ||
| } | ||
|
|
||
| // Verify data | ||
| int cnt = 0; | ||
| while (resultSet.next()) { | ||
| StringBuilder builder = new StringBuilder(); | ||
| for (int i = 1; i <= expectedHeader.length; i++) { | ||
| builder.append(resultSet.getString(i)).append(","); | ||
| } | ||
| assertEquals(expectedRetArray[cnt], builder.toString()); | ||
| cnt++; | ||
| } | ||
| assertEquals(expectedRetArray.length, cnt); | ||
| } | ||
| } |
Copilot
AI
Nov 24, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Statement is not always closed on method exit.
| Statement writeStatement; | |
| if (connection instanceof ClusterTestConnection) { | |
| // Use write connection directly for PreparedStatement queries | |
| writeStatement = | |
| ((ClusterTestConnection) connection) | |
| .writeConnection | |
| .getUnderlyingConnection() | |
| .createStatement(); | |
| } else { | |
| writeStatement = statement; | |
| } | |
| try (ResultSet resultSet = writeStatement.executeQuery(executeSql)) { | |
| ResultSetMetaData metaData = resultSet.getMetaData(); | |
| // Verify header | |
| assertEquals(expectedHeader.length, metaData.getColumnCount()); | |
| for (int i = 1; i <= metaData.getColumnCount(); i++) { | |
| assertEquals(expectedHeader[i - 1], metaData.getColumnName(i)); | |
| } | |
| // Verify data | |
| int cnt = 0; | |
| while (resultSet.next()) { | |
| StringBuilder builder = new StringBuilder(); | |
| for (int i = 1; i <= expectedHeader.length; i++) { | |
| builder.append(resultSet.getString(i)).append(","); | |
| } | |
| assertEquals(expectedRetArray[cnt], builder.toString()); | |
| cnt++; | |
| } | |
| assertEquals(expectedRetArray.length, cnt); | |
| } | |
| } | |
| if (connection instanceof ClusterTestConnection) { | |
| // Use write connection directly for PreparedStatement queries | |
| try (Statement writeStatement = | |
| ((ClusterTestConnection) connection) | |
| .writeConnection | |
| .getUnderlyingConnection() | |
| .createStatement(); | |
| ResultSet resultSet = writeStatement.executeQuery(executeSql)) { | |
| ResultSetMetaData metaData = resultSet.getMetaData(); | |
| // Verify header | |
| assertEquals(expectedHeader.length, metaData.getColumnCount()); | |
| for (int i = 1; i <= metaData.getColumnCount(); i++) { | |
| assertEquals(expectedHeader[i - 1], metaData.getColumnName(i)); | |
| } | |
| // Verify data | |
| int cnt = 0; | |
| while (resultSet.next()) { | |
| StringBuilder builder = new StringBuilder(); | |
| for (int i = 1; i <= expectedHeader.length; i++) { | |
| builder.append(resultSet.getString(i)).append(","); | |
| } | |
| assertEquals(expectedRetArray[cnt], builder.toString()); | |
| cnt++; | |
| } | |
| assertEquals(expectedRetArray.length, cnt); | |
| } | |
| } else { | |
| try (ResultSet resultSet = statement.executeQuery(executeSql)) { | |
| ResultSetMetaData metaData = resultSet.getMetaData(); | |
| // Verify header | |
| assertEquals(expectedHeader.length, metaData.getColumnCount()); | |
| for (int i = 1; i <= metaData.getColumnCount(); i++) { | |
| assertEquals(expectedHeader[i - 1], metaData.getColumnName(i)); | |
| } | |
| // Verify data | |
| int cnt = 0; | |
| while (resultSet.next()) { | |
| StringBuilder builder = new StringBuilder(); | |
| for (int i = 1; i <= expectedHeader.length; i++) { | |
| builder.append(resultSet.getString(i)).append(","); | |
| } | |
| assertEquals(expectedRetArray[cnt], builder.toString()); | |
| cnt++; | |
| } | |
| assertEquals(expectedRetArray.length, cnt); | |
| } | |
| } |
JackieTien97
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if user only call PrepareStatement.close?(which will finally call TSStatus closeOperation(TSCloseOperationReq req) ) Did you release the Ast and memory for that case?
| @Override | ||
| public void addPreparedStatement(String statementName, PreparedStatementInfo info) { | ||
| preparedStatements.put(statementName, info); | ||
| } | ||
|
|
||
| @Override | ||
| public PreparedStatementInfo removePreparedStatement(String statementName) { | ||
| return preparedStatements.remove(statementName); | ||
| } | ||
|
|
||
| @Override | ||
| public PreparedStatementInfo getPreparedStatement(String statementName) { | ||
| return preparedStatements.get(statementName); | ||
| } | ||
|
|
||
| @Override | ||
| public Set<String> getPreparedStatementNames() { | ||
| return preparedStatements.keySet(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw execption here, mqtt is for write not for query.
| // Map from statement name to PreparedStatementInfo | ||
| private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>(); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // Map from statement name to PreparedStatementInfo | |
| private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>(); |
|
|
||
| @Override | ||
| public void addPreparedStatement(String statementName, PreparedStatementInfo info) { | ||
| preparedStatements.put(statementName, info); | ||
| } | ||
|
|
||
| @Override | ||
| public PreparedStatementInfo removePreparedStatement(String statementName) { | ||
| return preparedStatements.remove(statementName); | ||
| } | ||
|
|
||
| @Override | ||
| public PreparedStatementInfo getPreparedStatement(String statementName) { | ||
| return preparedStatements.get(statementName); | ||
| } | ||
|
|
||
| @Override | ||
| public Set<String> getPreparedStatementNames() { | ||
| return preparedStatements.keySet(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw exception here, InternalClientSession should never call these methods, they're used for internal query and write which will never use prepare and execute
| IClientSession session = getSessionManager().getCurrSession(); | ||
|
|
||
| // Release session resources (including PreparedStatement memory) | ||
| // This handles TCP connection loss scenarios | ||
| if (session != null) { | ||
| try { | ||
| getSessionManager().closeSession(session, Coordinator.getInstance()::cleanupQueryExecution); | ||
| } catch (Exception e) { | ||
| logger.warn( | ||
| "Failed to close session during TCP connection disconnect: {}", e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| // Remove the session from the current thread | ||
| getSessionManager().removeCurrSession(); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why did you do this change?
ClientRPCServiceImpl.handleClientExit has already called closeSession(req).
Implement PreparedStmt on the Server side, including AST cache and memory management.