Skip to content

Conversation

@Young-Leo
Copy link
Contributor

Implement PreparedStmt on the Server side, including AST cache and memory management.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements server-side prepared statement functionality for the IoTDB relational model, enabling SQL statement preparation, execution, and deallocation. The implementation includes AST caching to avoid re-parsing on execution and memory management through the CoordinatorMemoryManager.

  • Adds grammar support for PREPARE, EXECUTE, EXECUTE IMMEDIATE, and DEALLOCATE statements
  • Implements AST caching and memory tracking for prepared statements with session-scoped storage
  • Integrates parameter binding for prepared statements with proper validation

Reviewed changes

Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
RelationalSql.g4 Adds ANTLR grammar rules for prepared statement syntax
AstBuilder.java Implements visitor methods to build AST nodes for prepared statements
Prepare.java, Execute.java, ExecuteImmediate.java, Deallocate.java New AST node classes representing prepared statement operations
AstVisitor.java Adds visitor methods for new prepared statement AST nodes
ParameterExtractor.java Utility for extracting and binding parameters in prepared statements
AstMemoryEstimator.java Estimates memory usage of AST nodes for memory management
PreparedStatementMemoryManager.java Manages memory allocation/deallocation for prepared statements
PrepareTask.java, DeallocateTask.java Config tasks for executing PREPARE and DEALLOCATE operations
TableConfigTaskVisitor.java Routes prepared statement AST nodes to appropriate config tasks
Coordinator.java Integrates prepared statement execution logic with parameter binding
TableModelPlanner.java Updated to accept parameters for prepared statement execution
StatementAnalyzer.java Re-enables parameter support in LIMIT/OFFSET clauses
SessionManager.java Releases prepared statement memory on session close
BaseServerContextHandler.java Handles session cleanup on TCP connection loss
IClientSession.java, ClientSession.java, etc. Adds prepared statement storage to session implementations
PreparedStatementInfo.java Data class storing prepared statement metadata
IoTDBPreparedStatementIT.java Integration tests for prepared statement functionality
ClusterTestStatement.java Documentation for PreparedStatement routing considerations

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 99 to 132
Statement writeStatement;
if (connection instanceof ClusterTestConnection) {
// Use write connection directly for PreparedStatement queries
writeStatement =
((ClusterTestConnection) connection)
.writeConnection
.getUnderlyingConnection()
.createStatement();
} else {
writeStatement = statement;
}

try (ResultSet resultSet = writeStatement.executeQuery(executeSql)) {
ResultSetMetaData metaData = resultSet.getMetaData();

// Verify header
assertEquals(expectedHeader.length, metaData.getColumnCount());
for (int i = 1; i <= metaData.getColumnCount(); i++) {
assertEquals(expectedHeader[i - 1], metaData.getColumnName(i));
}

// Verify data
int cnt = 0;
while (resultSet.next()) {
StringBuilder builder = new StringBuilder();
for (int i = 1; i <= expectedHeader.length; i++) {
builder.append(resultSet.getString(i)).append(",");
}
assertEquals(expectedRetArray[cnt], builder.toString());
cnt++;
}
assertEquals(expectedRetArray.length, cnt);
}
}
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Statement is not always closed on method exit.

Suggested change
Statement writeStatement;
if (connection instanceof ClusterTestConnection) {
// Use write connection directly for PreparedStatement queries
writeStatement =
((ClusterTestConnection) connection)
.writeConnection
.getUnderlyingConnection()
.createStatement();
} else {
writeStatement = statement;
}
try (ResultSet resultSet = writeStatement.executeQuery(executeSql)) {
ResultSetMetaData metaData = resultSet.getMetaData();
// Verify header
assertEquals(expectedHeader.length, metaData.getColumnCount());
for (int i = 1; i <= metaData.getColumnCount(); i++) {
assertEquals(expectedHeader[i - 1], metaData.getColumnName(i));
}
// Verify data
int cnt = 0;
while (resultSet.next()) {
StringBuilder builder = new StringBuilder();
for (int i = 1; i <= expectedHeader.length; i++) {
builder.append(resultSet.getString(i)).append(",");
}
assertEquals(expectedRetArray[cnt], builder.toString());
cnt++;
}
assertEquals(expectedRetArray.length, cnt);
}
}
if (connection instanceof ClusterTestConnection) {
// Use write connection directly for PreparedStatement queries
try (Statement writeStatement =
((ClusterTestConnection) connection)
.writeConnection
.getUnderlyingConnection()
.createStatement();
ResultSet resultSet = writeStatement.executeQuery(executeSql)) {
ResultSetMetaData metaData = resultSet.getMetaData();
// Verify header
assertEquals(expectedHeader.length, metaData.getColumnCount());
for (int i = 1; i <= metaData.getColumnCount(); i++) {
assertEquals(expectedHeader[i - 1], metaData.getColumnName(i));
}
// Verify data
int cnt = 0;
while (resultSet.next()) {
StringBuilder builder = new StringBuilder();
for (int i = 1; i <= expectedHeader.length; i++) {
builder.append(resultSet.getString(i)).append(",");
}
assertEquals(expectedRetArray[cnt], builder.toString());
cnt++;
}
assertEquals(expectedRetArray.length, cnt);
}
} else {
try (ResultSet resultSet = statement.executeQuery(executeSql)) {
ResultSetMetaData metaData = resultSet.getMetaData();
// Verify header
assertEquals(expectedHeader.length, metaData.getColumnCount());
for (int i = 1; i <= metaData.getColumnCount(); i++) {
assertEquals(expectedHeader[i - 1], metaData.getColumnName(i));
}
// Verify data
int cnt = 0;
while (resultSet.next()) {
StringBuilder builder = new StringBuilder();
for (int i = 1; i <= expectedHeader.length; i++) {
builder.append(resultSet.getString(i)).append(",");
}
assertEquals(expectedRetArray[cnt], builder.toString());
cnt++;
}
assertEquals(expectedRetArray.length, cnt);
}
}

Copilot uses AI. Check for mistakes.
Copy link
Contributor

@JackieTien97 JackieTien97 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if user only call PrepareStatement.close?(which will finally call TSStatus closeOperation(TSCloseOperationReq req) ) Did you release the Ast and memory for that case?

Comment on lines 85 to 103
@Override
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
preparedStatements.put(statementName, info);
}

@Override
public PreparedStatementInfo removePreparedStatement(String statementName) {
return preparedStatements.remove(statementName);
}

@Override
public PreparedStatementInfo getPreparedStatement(String statementName) {
return preparedStatements.get(statementName);
}

@Override
public Set<String> getPreparedStatementNames() {
return preparedStatements.keySet();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw execption here, mqtt is for write not for query.

Comment on lines 33 to 35
// Map from statement name to PreparedStatementInfo
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Map from statement name to PreparedStatementInfo
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();

Comment on lines 94 to 113

@Override
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
preparedStatements.put(statementName, info);
}

@Override
public PreparedStatementInfo removePreparedStatement(String statementName) {
return preparedStatements.remove(statementName);
}

@Override
public PreparedStatementInfo getPreparedStatement(String statementName) {
return preparedStatements.get(statementName);
}

@Override
public Set<String> getPreparedStatementNames() {
return preparedStatements.keySet();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw exception here, InternalClientSession should never call these methods, they're used for internal query and write which will never use prepare and execute

Comment on lines 75 to 90
IClientSession session = getSessionManager().getCurrSession();

// Release session resources (including PreparedStatement memory)
// This handles TCP connection loss scenarios
if (session != null) {
try {
getSessionManager().closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
} catch (Exception e) {
logger.warn(
"Failed to close session during TCP connection disconnect: {}", e.getMessage(), e);
}
}

// Remove the session from the current thread
getSessionManager().removeCurrSession();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you do this change?

ClientRPCServiceImpl.handleClientExit has already called closeSession(req).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants