Skip to content
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,68 @@ public void testConcurrentAutoCreateAndDropColumn() throws Exception {
}
}

@Test
public void testTableObjectCheck() throws Exception {
try (final Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("create database db2");
statement.execute("use db2");
statement.execute("create table \".\" ()");

try {
statement.execute("alter table \".\" add column a object");
fail();
} catch (final SQLException e) {
Assert.assertEquals(
"701: When there are object fields, the tableName . shall not be '.', '..' or contain './', '.\\'",
e.getMessage());
}

try {
statement.execute("create table test (\"./\" object)");
fail();
} catch (final SQLException e) {
Assert.assertEquals(
"701: When there are object fields, the objectName ./ shall not be '.', '..' or contain './', '.\\'",
e.getMessage());
}

statement.execute("create table test (a tag, b attribute, c int32, d object)");
try {
statement.execute("insert into test (a, b, c) values ('.\\', 1, 1)");
fail();
} catch (final SQLException e) {
Assert.assertEquals(
"507: When there are object fields, the deviceId [.\\] shall not be '.', '..' or contain './', '.\\'",
e.getMessage());
}

// Test cache
TestUtils.restartCluster(EnvFactory.getEnv());

try {
statement.execute("insert into test (a, b, c) values ('.\\', 1, 1)");
fail();
} catch (final SQLException e) {
Assert.assertEquals(
"507: When there are object fields, the deviceId [.\\] shall not be '.', '..' or contain './', '.\\'",
e.getMessage());
}

statement.execute("alter table test drop column d");
statement.execute("insert into test (a, b, c) values ('.\\', 1, 1)");
try {
statement.execute("alter table test add column d object");
fail();
} catch (final SQLException e) {
Assert.assertEquals(
"701: When there are object fields, the tag value .\\ shall not be '.', '..' or contain './', '.\\'",
e.getMessage());
}
}
}

@Test
public void testTreeViewTable() throws Exception {
try (final Connection connection = EnvFactory.getEnv().getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public enum CnToDnAsyncRequestType {
DELETE_DATA_FOR_TABLE_DEVICE,
DELETE_TABLE_DEVICE_IN_BLACK_LIST,
DETECT_TREE_DEVICE_VIEW_FIELD_TYPE,
CHECK_DEVICE_ID_FOR_OBJECT,

// audit log and event write-back
INSERT_RECORD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckDeviceIdForObjectReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
Expand Down Expand Up @@ -462,6 +463,11 @@ protected void initActionMapBuilder() {
client.deleteTableDeviceInBlackList(
(TTableDeviceDeletionWithPatternOrModReq) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.CHECK_DEVICE_ID_FOR_OBJECT,
(req, client, handler) ->
client.checkDeviceIdForObject(
(TCheckDeviceIdForObjectReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.DETECT_TREE_DEVICE_VIEW_FIELD_TYPE,
(req, client, handler) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
case INVALIDATE_MATCHED_TABLE_DEVICE_CACHE:
case DELETE_DATA_FOR_TABLE_DEVICE:
case DELETE_TABLE_DEVICE_IN_BLACK_LIST:
case CHECK_DEVICE_ID_FOR_OBJECT:
default:
return new DataNodeTSStatusRPCHandler(
requestType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.annotations.TableModel;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
Expand All @@ -123,6 +124,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -1331,9 +1333,14 @@ public synchronized Pair<TSStatus, TsTable> tableColumnCheckForColumnExtension(
columnSchemaList.stream()
.map(TsTableColumnSchema::getColumnName)
.collect(Collectors.joining(", ")));

final AtomicBoolean hasObject = new AtomicBoolean(false);
columnSchemaList.removeIf(
columnSchema -> {
if (Objects.isNull(originalTable.getColumnSchema(columnSchema.getColumnName()))) {
if (columnSchema.getDataType().equals(TSDataType.OBJECT)) {
hasObject.set(true);
}
expandedTable.addColumnSchema(columnSchema);
return false;
}
Expand All @@ -1343,6 +1350,10 @@ public synchronized Pair<TSStatus, TsTable> tableColumnCheckForColumnExtension(
if (columnSchemaList.isEmpty()) {
return new Pair<>(RpcUtils.getStatus(TSStatusCode.COLUMN_ALREADY_EXISTS, errorMsg), null);
}

if (hasObject.get()) {
expandedTable.checkTableNameAndObjectNames4Object();
}
return new Pair<>(RpcUtils.SUCCESS_STATUS, expandedTable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DataNodeTSStatusTaskExecutor;
import org.apache.iotdb.confignode.procedure.impl.schema.SchemaUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.external.commons.lang3.function.TriFunction;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +43,8 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -187,6 +191,10 @@ public void deserialize(final ByteBuffer byteBuffer) {
protected class TableRegionTaskExecutor<Q> extends DataNodeTSStatusTaskExecutor<Q> {

private final String taskName;
private final Map<TDataNodeLocation, TSStatus> failureMap = new HashMap<>();
private final TriFunction<
TConsensusGroupId, Set<TDataNodeLocation>, Map<TDataNodeLocation, TSStatus>, Exception>
exceptionGenerator;

protected TableRegionTaskExecutor(
final String taskName,
Expand All @@ -196,26 +204,84 @@ protected TableRegionTaskExecutor(
final BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q> dataNodeRequestGenerator) {
super(env, targetRegionGroup, false, dataNodeRequestType, dataNodeRequestGenerator);
this.taskName = taskName;
this.exceptionGenerator = this::getThrowable;
}

protected TableRegionTaskExecutor(
final String taskName,
final ConfigNodeProcedureEnv env,
final Map<TConsensusGroupId, TRegionReplicaSet> targetRegionGroup,
final CnToDnAsyncRequestType dataNodeRequestType,
final BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q> dataNodeRequestGenerator,
final TriFunction<
TConsensusGroupId,
Set<TDataNodeLocation>,
Map<TDataNodeLocation, TSStatus>,
Exception>
exceptionGenerator) {
super(env, targetRegionGroup, false, dataNodeRequestType, dataNodeRequestGenerator);
this.taskName = taskName;
this.exceptionGenerator = exceptionGenerator;
}

@Override
protected List<TConsensusGroupId> processResponseOfOneDataNode(
final TDataNodeLocation dataNodeLocation,
final List<TConsensusGroupId> consensusGroupIdList,
final TSStatus response) {
final List<TConsensusGroupId> failedRegionList = new ArrayList<>();
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return failedRegionList;
}

if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
final List<TSStatus> subStatus = response.getSubStatus();
for (int i = 0; i < subStatus.size(); i++) {
if (subStatus.get(i).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failedRegionList.add(consensusGroupIdList.get(i));
}
}
} else {
failedRegionList.addAll(consensusGroupIdList);
}
if (!failedRegionList.isEmpty()) {
failureMap.put(dataNodeLocation, response);
} else {
failureMap.remove(dataNodeLocation);
}
return failedRegionList;
}

@Override
protected void onAllReplicasetFailure(
final TConsensusGroupId consensusGroupId,
final Set<TDataNodeLocation> dataNodeLocationSet) {
final Exception e =
exceptionGenerator.apply(consensusGroupId, dataNodeLocationSet, failureMap);
setFailure(
new ProcedureException(
new MetadataException(
String.format(
"[%s] for %s.%s failed when [%s] because failed to execute in all replicaset of %s %s. Failure nodes: %s",
this.getClass().getSimpleName(),
database,
tableName,
taskName,
consensusGroupId.type,
consensusGroupId.id,
dataNodeLocationSet))));
Objects.nonNull(e)
? e
: getThrowable(consensusGroupId, dataNodeLocationSet, failureMap)));
interruptTask();
}

protected Exception getThrowable(
final TConsensusGroupId consensusGroupId,
final Set<TDataNodeLocation> dataNodeLocationSet,
final Map<TDataNodeLocation, TSStatus> failureMap) {
return new MetadataException(
String.format(
"[%s] for %s.%s failed when [%s] because failed to execute in all replicaset of %s %s. Failure nodes: %s, Failures: %s",
this.getClass().getSimpleName(),
database,
tableName,
taskName,
consensusGroupId.type,
consensusGroupId.id,
dataNodeLocationSet,
failureMap));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@

package org.apache.iotdb.confignode.procedure.impl.schema.table;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.view.AddTableViewColumnPlan;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.impl.schema.table.view.AddViewColumnProcedure;
import org.apache.iotdb.confignode.procedure.state.schema.AddTableColumnState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.mpp.rpc.thrift.TCheckDeviceIdForObjectReq;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.utils.Pair;
Expand All @@ -41,7 +46,9 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class AddTableColumnProcedure
Expand Down Expand Up @@ -77,6 +84,10 @@ protected Flow executeFromState(final ConfigNodeProcedureEnv env, final AddTable
case PRE_RELEASE:
LOGGER.info("Pre release info of table {}.{} when adding column", database, tableName);
preRelease(env);
if (table.setNeedCheck4Object()) {
checkObject(env, database, tableName);
}
setNextState(AddTableColumnState.ADD_COLUMN);
break;
case ADD_COLUMN:
LOGGER.info("Add column to table {}.{}", database, tableName);
Expand Down Expand Up @@ -123,7 +134,53 @@ protected void columnCheck(final ConfigNodeProcedureEnv env) {
@Override
protected void preRelease(final ConfigNodeProcedureEnv env) {
super.preRelease(env);
setNextState(AddTableColumnState.ADD_COLUMN);
}

private void checkObject(
final ConfigNodeProcedureEnv env, final String database, final String tableName) {
final Map<TConsensusGroupId, TRegionReplicaSet> relatedRegionGroup =
env.getConfigManager().getRelatedSchemaRegionGroup4TableModel(database);

if (!relatedRegionGroup.isEmpty()) {
new TableRegionTaskExecutor<>(
"check deviceId for object",
env,
relatedRegionGroup,
CnToDnAsyncRequestType.CHECK_DEVICE_ID_FOR_OBJECT,
((dataNodeLocation, consensusGroupIdList) ->
new TCheckDeviceIdForObjectReq(new ArrayList<>(consensusGroupIdList), tableName)),
((tConsensusGroupId, tDataNodeLocations, failureMap) -> {
final String message = parseStatus(failureMap.values());
// Shall not be SUCCESS here
return Objects.nonNull(message)
? new IoTDBRuntimeException(
message, TSStatusCode.SEMANTIC_ERROR.getStatusCode())
: null;
}))
.execute();
}
}

// Success: ""
// All semantic: return last one
// Non-semantic error: return null
private String parseStatus(final Iterable<TSStatus> statuses) {
String message = "";
for (final TSStatus status : statuses) {
if (status.getCode() == TSStatusCode.SEMANTIC_ERROR.getStatusCode()) {
message = status.getMessage();
Comment on lines +172 to +175
Copy link

Copilot AI Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initializing 'message' to empty string and then reassigning it in the loop is inefficient. Consider initializing to null and using Objects.isNull() checks, or restructure to return early when a semantic error is found, to avoid unnecessary string assignments.

Copilot uses AI. Check for mistakes.
} else if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
final String tempMsg = parseStatus(status.getSubStatus());
if (Objects.isNull(tempMsg)) {
return null;
} else if (!tempMsg.isEmpty()) {
message = tempMsg;
}
} else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return null;
}
}
return message;
}

private void addColumn(final ConfigNodeProcedureEnv env) {
Expand Down
Loading
Loading