Skip to content

Commit 806f6a7

Browse files
committed
Fixed the concurrency issue of region migrate and load (#16796)
* rq * gra * fix * fix * coverage * fix * fix * fix
1 parent 50563b6 commit 806f6a7

File tree

19 files changed

+203
-112
lines changed

19 files changed

+203
-112
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
2525
import org.apache.iotdb.commons.consensus.DataRegionId;
2626
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
27+
import org.apache.iotdb.commons.utils.TestOnly;
2728
import org.apache.iotdb.consensus.ConsensusFactory;
2829
import org.apache.iotdb.consensus.IConsensus;
2930
import org.apache.iotdb.consensus.config.ConsensusConfig;
@@ -58,6 +59,11 @@ private DataRegionConsensusImpl() {
5859
// do nothing
5960
}
6061

62+
@TestOnly
63+
public static void setInstance(final IConsensus instance) {
64+
DataRegionConsensusImplHolder.INSTANCE = instance;
65+
}
66+
6167
public static IConsensus getInstance() {
6268
return DataRegionConsensusImplHolder.INSTANCE;
6369
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public boolean takeSnapshot(File snapshotDir) {
9393
logger.error(
9494
"Exception occurs when taking snapshot for {}-{} in {}",
9595
region.getDatabaseName(),
96-
region.getDataRegionId(),
96+
region.getDataRegionIdString(),
9797
snapshotDir,
9898
e);
9999
return false;
@@ -109,7 +109,7 @@ public boolean takeSnapshot(File snapshotDir, String snapshotTmpId, String snaps
109109
logger.error(
110110
"Exception occurs when taking snapshot for {}-{} in {}",
111111
region.getDatabaseName(),
112-
region.getDataRegionId(),
112+
region.getDataRegionIdString(),
113113
snapshotDir,
114114
e);
115115
return false;
@@ -127,7 +127,7 @@ public void loadSnapshot(File latestSnapshotRootDir) {
127127
new SnapshotLoader(
128128
latestSnapshotRootDir.getAbsolutePath(),
129129
region.getDatabaseName(),
130-
region.getDataRegionId())
130+
region.getDataRegionIdString())
131131
.loadSnapshotForStateMachine();
132132
if (newRegion == null) {
133133
logger.error("Fail to load snapshot from {}", latestSnapshotRootDir);
@@ -136,7 +136,8 @@ public void loadSnapshot(File latestSnapshotRootDir) {
136136
this.region = newRegion;
137137
try {
138138
StorageEngine.getInstance()
139-
.setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region);
139+
.setDataRegion(
140+
new DataRegionId(Integer.parseInt(region.getDataRegionIdString())), region);
140141
ChunkCache.getInstance().clear();
141142
TimeSeriesMetadataCache.getInstance().clear();
142143
BloomFilterCache.getInstance().clear();
@@ -185,13 +186,13 @@ public List<File> getSnapshotFiles(File latestSnapshotRootDir) {
185186
return new SnapshotLoader(
186187
latestSnapshotRootDir.getAbsolutePath(),
187188
region.getDatabaseName(),
188-
region.getDataRegionId())
189+
region.getDataRegionIdString())
189190
.getSnapshotFileInfo();
190191
} catch (IOException e) {
191192
logger.error(
192193
"Meets error when getting snapshot files for {}-{}",
193194
region.getDatabaseName(),
194-
region.getDataRegionId(),
195+
region.getDataRegionIdString(),
195196
e);
196197
return null;
197198
}
@@ -272,7 +273,7 @@ public File getSnapshotRoot() {
272273
+ File.separator
273274
+ region.getDatabaseName()
274275
+ "-"
275-
+ region.getDataRegionId();
276+
+ region.getDataRegionIdString();
276277
return new File(snapshotDir).getCanonicalFile();
277278
} catch (IOException | NullPointerException e) {
278279
logger.warn("{}: cannot get the canonical file of {} due to {}", this, snapshotDir, e);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ private IQueryDataSource getUnfinishedQueryDataSource() {
751751
if (initQueryDataSourceRetryCount % 10 == 0) {
752752
LOGGER.warn(
753753
"Failed to acquire the read lock of DataRegion-{} for {} times",
754-
dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionId(),
754+
dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionIdString(),
755755
initQueryDataSourceRetryCount);
756756
}
757757
return UNFINISHED_QUERY_DATA_SOURCE;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ private boolean fillFragmentInstanceStatistics(
168168
// We don't need to output the region having ExplainAnalyzeOperator only.
169169
return false;
170170
}
171-
statistics.setDataRegion(context.getDataRegion().getDataRegionId());
171+
statistics.setDataRegion(context.getDataRegion().getDataRegionIdString());
172172
statistics.setIp(CONFIG.getInternalAddress() + ":" + CONFIG.getInternalPort());
173173
statistics.setStartTimeInMS(context.getStartTime());
174174
statistics.setEndTimeInMS(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
501501
MemTableFlushTask.recordFlushPointsMetricInternal(
502502
node.getWritePointCount(),
503503
databaseName,
504-
dataRegion.getDataRegionId());
504+
dataRegion.getDataRegionIdString());
505505

506506
MetricService.getInstance()
507507
.count(
@@ -513,7 +513,7 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
513513
Tag.DATABASE.toString(),
514514
databaseName,
515515
Tag.REGION.toString(),
516-
dataRegion.getDataRegionId(),
516+
dataRegion.getDataRegionIdString(),
517517
Tag.TYPE.toString(),
518518
Metric.LOAD_POINT_COUNT.toString());
519519
MetricService.getInstance()
@@ -526,7 +526,7 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
526526
Tag.DATABASE.toString(),
527527
databaseName,
528528
Tag.REGION.toString(),
529-
dataRegion.getDataRegionId(),
529+
dataRegion.getDataRegionIdString(),
530530
Tag.TYPE.toString(),
531531
Metric.LOAD_POINT_COUNT.toString());
532532
}));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,8 @@ public void unbindDataRegionMetrics() {
518518
}
519519

520520
public void createDataRegionMemoryCostMetrics(DataRegion dataRegion) {
521-
DataRegionId dataRegionId = new DataRegionId(Integer.parseInt(dataRegion.getDataRegionId()));
521+
DataRegionId dataRegionId =
522+
new DataRegionId(Integer.parseInt(dataRegion.getDataRegionIdString()));
522523
MetricService.getInstance()
523524
.createAutoGauge(
524525
Metric.DATA_REGION_MEM_COST.toString(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ public void syncCloseProcessorsInDatabase(String databaseName) {
527527
public void syncCloseProcessorsInRegion(List<String> dataRegionIds) {
528528
List<Future<Void>> tasks = new ArrayList<>();
529529
for (DataRegion dataRegion : dataRegionMap.values()) {
530-
if (dataRegion != null && dataRegionIds.contains(dataRegion.getDataRegionId())) {
530+
if (dataRegion != null && dataRegionIds.contains(dataRegion.getDataRegionIdString())) {
531531
tasks.add(
532532
cachedThreadPool.submit(
533533
() -> {
@@ -784,7 +784,7 @@ public void deleteDataRegion(DataRegionId regionId) {
784784
// delete wal
785785
WALManager.getInstance()
786786
.deleteWALNode(
787-
region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionId());
787+
region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionIdString());
788788
// delete snapshot
789789
for (String dataDir : CONFIG.getLocalDataDirs()) {
790790
File regionSnapshotDir =
@@ -803,12 +803,12 @@ public void deleteDataRegion(DataRegionId regionId) {
803803
WRITING_METRICS.removeDataRegionMemoryCostMetrics(regionId);
804804
WRITING_METRICS.removeFlushingMemTableStatusMetrics(regionId);
805805
WRITING_METRICS.removeActiveMemtableCounterMetrics(regionId);
806-
FileMetrics.getInstance().deleteRegion(region.getDatabaseName(), region.getDataRegionId());
806+
FileMetrics.getInstance().deleteRegion(region.getDatabaseName(), region.getDataRegionIdString());
807807
} catch (Exception e) {
808808
LOGGER.error(
809809
"Error occurs when deleting data region {}-{}",
810810
region.getDatabaseName(),
811-
region.getDataRegionId(),
811+
region.getDataRegionIdString(),
812812
e);
813813
} finally {
814814
deletingDataRegionMap.remove(regionId);

0 commit comments

Comments
 (0)