Skip to content

Commit 03bc260

Browse files
authored
Fixed the concurrency issue of region migrate and load (#16796)
* rq * gra * fix * fix * coverage * fix * fix * fix
1 parent ddcc646 commit 03bc260

File tree

19 files changed

+218
-122
lines changed

19 files changed

+218
-122
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iotdb.commons.memory.IMemoryBlock;
2929
import org.apache.iotdb.commons.memory.MemoryBlockType;
3030
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
31+
import org.apache.iotdb.commons.utils.TestOnly;
3132
import org.apache.iotdb.consensus.ConsensusFactory;
3233
import org.apache.iotdb.consensus.IConsensus;
3334
import org.apache.iotdb.consensus.config.ConsensusConfig;
@@ -65,6 +66,11 @@ private DataRegionConsensusImpl() {
6566
// do nothing
6667
}
6768

69+
@TestOnly
70+
public static void setInstance(final IConsensus instance) {
71+
DataRegionConsensusImplHolder.INSTANCE = instance;
72+
}
73+
6874
public static IConsensus getInstance() {
6975
return DataRegionConsensusImplHolder.INSTANCE;
7076
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public boolean takeSnapshot(File snapshotDir) {
9393
logger.error(
9494
"Exception occurs when taking snapshot for {}-{} in {}",
9595
region.getDatabaseName(),
96-
region.getDataRegionId(),
96+
region.getDataRegionIdString(),
9797
snapshotDir,
9898
e);
9999
return false;
@@ -109,7 +109,7 @@ public boolean takeSnapshot(File snapshotDir, String snapshotTmpId, String snaps
109109
logger.error(
110110
"Exception occurs when taking snapshot for {}-{} in {}",
111111
region.getDatabaseName(),
112-
region.getDataRegionId(),
112+
region.getDataRegionIdString(),
113113
snapshotDir,
114114
e);
115115
return false;
@@ -127,7 +127,7 @@ public void loadSnapshot(File latestSnapshotRootDir) {
127127
new SnapshotLoader(
128128
latestSnapshotRootDir.getAbsolutePath(),
129129
region.getDatabaseName(),
130-
region.getDataRegionId())
130+
region.getDataRegionIdString())
131131
.loadSnapshotForStateMachine();
132132
if (newRegion == null) {
133133
logger.error("Fail to load snapshot from {}", latestSnapshotRootDir);
@@ -136,7 +136,8 @@ public void loadSnapshot(File latestSnapshotRootDir) {
136136
this.region = newRegion;
137137
try {
138138
StorageEngine.getInstance()
139-
.setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region);
139+
.setDataRegion(
140+
new DataRegionId(Integer.parseInt(region.getDataRegionIdString())), region);
140141
ChunkCache.getInstance().clear();
141142
TimeSeriesMetadataCache.getInstance().clear();
142143
BloomFilterCache.getInstance().clear();
@@ -185,13 +186,13 @@ public List<File> getSnapshotFiles(File latestSnapshotRootDir) {
185186
return new SnapshotLoader(
186187
latestSnapshotRootDir.getAbsolutePath(),
187188
region.getDatabaseName(),
188-
region.getDataRegionId())
189+
region.getDataRegionIdString())
189190
.getSnapshotFileInfo();
190191
} catch (IOException e) {
191192
logger.error(
192193
"Meets error when getting snapshot files for {}-{}",
193194
region.getDatabaseName(),
194-
region.getDataRegionId(),
195+
region.getDataRegionIdString(),
195196
e);
196197
return null;
197198
}
@@ -276,7 +277,7 @@ public File getSnapshotRoot() {
276277
+ File.separator
277278
+ region.getDatabaseName()
278279
+ "-"
279-
+ region.getDataRegionId();
280+
+ region.getDataRegionIdString();
280281
return new File(snapshotDir).getCanonicalFile();
281282
} catch (IOException | NullPointerException e) {
282283
logger.warn("{}: cannot get the canonical file of {} due to {}", this, snapshotDir, e);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,7 @@ private IQueryDataSource getUnfinishedQueryDataSource() {
779779
if (initQueryDataSourceRetryCount % 10 == 0) {
780780
LOGGER.warn(
781781
"Failed to acquire the read lock of DataRegion-{} for {} times",
782-
dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionId(),
782+
dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionIdString(),
783783
initQueryDataSourceRetryCount);
784784
}
785785
return UNFINISHED_QUERY_DATA_SOURCE;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ private boolean fillFragmentInstanceStatistics(
152152
// We don't need to output the region having ExplainAnalyzeOperator only.
153153
return false;
154154
}
155-
statistics.setDataRegion(context.getDataRegion().getDataRegionId());
155+
statistics.setDataRegion(context.getDataRegion().getDataRegionIdString());
156156
statistics.setIp(CONFIG.getInternalAddress() + ":" + CONFIG.getInternalPort());
157157
statistics.setStartTimeInMS(context.getStartTime());
158158
statistics.setEndTimeInMS(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
531531
MemTableFlushTask.recordFlushPointsMetricInternal(
532532
node.getWritePointCount(),
533533
databaseName,
534-
dataRegion.getDataRegionId());
534+
dataRegion.getDataRegionIdString());
535535

536536
MetricService.getInstance()
537537
.count(
@@ -543,7 +543,7 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
543543
Tag.DATABASE.toString(),
544544
databaseName,
545545
Tag.REGION.toString(),
546-
dataRegion.getDataRegionId(),
546+
dataRegion.getDataRegionIdString(),
547547
Tag.TYPE.toString(),
548548
Metric.LOAD_POINT_COUNT.toString());
549549
MetricService.getInstance()
@@ -556,7 +556,7 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
556556
Tag.DATABASE.toString(),
557557
databaseName,
558558
Tag.REGION.toString(),
559-
dataRegion.getDataRegionId(),
559+
dataRegion.getDataRegionIdString(),
560560
Tag.TYPE.toString(),
561561
Metric.LOAD_POINT_COUNT.toString());
562562
}));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,8 @@ public void unbindDataRegionMetrics() {
520520
}
521521

522522
public void createDataRegionMemoryCostMetrics(DataRegion dataRegion) {
523-
DataRegionId dataRegionId = new DataRegionId(Integer.parseInt(dataRegion.getDataRegionId()));
523+
DataRegionId dataRegionId =
524+
new DataRegionId(Integer.parseInt(dataRegion.getDataRegionIdString()));
524525
MetricService.getInstance()
525526
.createAutoGauge(
526527
Metric.DATA_REGION_MEM_COST.toString(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ public void syncCloseProcessorsInDatabase(String databaseName) {
529529
public void syncCloseProcessorsInRegion(List<String> dataRegionIds) {
530530
List<Future<Void>> tasks = new ArrayList<>();
531531
for (DataRegion dataRegion : dataRegionMap.values()) {
532-
if (dataRegion != null && dataRegionIds.contains(dataRegion.getDataRegionId())) {
532+
if (dataRegion != null && dataRegionIds.contains(dataRegion.getDataRegionIdString())) {
533533
tasks.add(
534534
cachedThreadPool.submit(
535535
() -> {
@@ -785,7 +785,9 @@ public void deleteDataRegion(DataRegionId regionId) {
785785
// delete wal
786786
WALManager.getInstance()
787787
.deleteWALNode(
788-
region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionId());
788+
region.getDatabaseName()
789+
+ FILE_NAME_SEPARATOR
790+
+ region.getDataRegionIdString());
789791
// delete snapshot
790792
for (String dataDir : CONFIG.getLocalDataDirs()) {
791793
File regionSnapshotDir =
@@ -805,7 +807,7 @@ public void deleteDataRegion(DataRegionId regionId) {
805807
// delete region information in wal and may delete wal
806808
WALManager.getInstance()
807809
.deleteRegionAndMayDeleteWALNode(
808-
region.getDatabaseName(), region.getDataRegionId());
810+
region.getDatabaseName(), region.getDataRegionIdString());
809811
break;
810812
case ConsensusFactory.RATIS_CONSENSUS:
811813
default:
@@ -814,14 +816,15 @@ public void deleteDataRegion(DataRegionId regionId) {
814816
WRITING_METRICS.removeDataRegionMemoryCostMetrics(regionId);
815817
WRITING_METRICS.removeFlushingMemTableStatusMetrics(regionId);
816818
WRITING_METRICS.removeActiveMemtableCounterMetrics(regionId);
817-
FileMetrics.getInstance().deleteRegion(region.getDatabaseName(), region.getDataRegionId());
819+
FileMetrics.getInstance()
820+
.deleteRegion(region.getDatabaseName(), region.getDataRegionIdString());
818821
CompressionRatio.getInstance().removeDataRegionRatio(String.valueOf(regionId.getId()));
819822
LOGGER.info("Removed data region {}", regionId);
820823
} catch (Exception e) {
821824
LOGGER.error(
822825
"Error occurs when deleting data region {}-{}",
823826
region.getDatabaseName(),
824-
region.getDataRegionId(),
827+
region.getDataRegionIdString(),
825828
e);
826829
} finally {
827830
deletingDataRegionMap.remove(regionId);

0 commit comments

Comments
 (0)