Skip to content

Commit c20203e

Browse files
authored
IGNITE-27147 Optimise compaction of multiple delta files (#7067)
1 parent 1f269c3 commit c20203e

File tree

5 files changed

+192
-8
lines changed

5 files changed

+192
-8
lines changed

modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactionMetricsTracker.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,13 @@ public class CompactionMetricsTracker {
3030
private static final AtomicIntegerFieldUpdater<CompactionMetricsTracker> DATA_PAGES_WRITTEN_UPDATER =
3131
newUpdater(CompactionMetricsTracker.class, "dataPagesWritten");
3232

33+
private static final AtomicIntegerFieldUpdater<CompactionMetricsTracker> DATA_PAGES_SKIPPED_UPDATER =
34+
newUpdater(CompactionMetricsTracker.class, "dataPagesSkipped");
35+
3336
private volatile int dataPagesWritten;
3437

38+
private volatile int dataPagesSkipped;
39+
3540
private final long startNanos = System.nanoTime();
3641

3742
private long endNanos;
@@ -45,6 +50,15 @@ public void onDataPageWritten() {
4550
DATA_PAGES_WRITTEN_UPDATER.incrementAndGet(this);
4651
}
4752

53+
/**
54+
* Increments counter if data page was skipped.
55+
*
56+
* <p>Thread safe.
57+
*/
58+
public void onPageSkipped() {
59+
DATA_PAGES_SKIPPED_UPDATER.incrementAndGet(this);
60+
}
61+
4862
/**
4963
* Callback on compaction end.
5064
*
@@ -63,6 +77,15 @@ public int dataPagesWritten() {
6377
return dataPagesWritten;
6478
}
6579

80+
/**
81+
* Returns data pages skipped.
82+
*
83+
* <p>Thread safe.
84+
*/
85+
public int dataPagesSkipped() {
86+
return dataPagesSkipped;
87+
}
88+
6689
/**
6790
* Returns total compaction duration.
6891
*

modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,10 @@ void doCompaction() {
299299
long totalDurationInNanos = tracker.totalDuration(NANOSECONDS);
300300

301301
LOG.info(
302-
"Compaction round finished [compactionId={}, pages={}, duration={}ms, avgWriteSpeed={}MB/s]",
302+
"Compaction round finished [compactionId={}, pages={}, skipped={}, duration={}ms, avgWriteSpeed={}MB/s]",
303303
compactionId,
304304
tracker.dataPagesWritten(),
305+
tracker.dataPagesSkipped(),
305306
tracker.totalDuration(MILLISECONDS),
306307
WriteSpeedFormatter.formatWriteSpeed(totalWriteBytes, totalDurationInNanos)
307308
);
@@ -388,13 +389,37 @@ void mergeDeltaFileToMainFile(
388389
// Copy pages deltaFilePageStore -> filePageStore.
389390
ByteBuffer buffer = getThreadLocalBuffer(pageSize);
390391

392+
DeltaFilePageStoreIo[] newerDeltaFiles = filePageStore.getCompletedDeltaFiles()
393+
.stream()
394+
.filter(file -> file.fileIndex() > deltaFilePageStore.fileIndex())
395+
.toArray(DeltaFilePageStoreIo[]::new);
396+
397+
int[] pointers = new int[newerDeltaFiles.length];
398+
399+
boolean shouldFsync = false;
391400
for (long pageIndex : deltaFilePageStore.pageIndexes()) {
392401
updateHeartbeat();
393402

394403
if (shouldStopCompaction(filePageStore)) {
395404
return;
396405
}
397406

407+
boolean shouldSkip = false;
408+
for (int i = 0; i < pointers.length && !shouldSkip; i++) {
409+
int[] newerPageIndexes = newerDeltaFiles[i].pageIndexes();
410+
411+
while (pointers[i] < newerPageIndexes.length - 1 && newerPageIndexes[pointers[i]] < pageIndex) {
412+
pointers[i]++;
413+
}
414+
415+
shouldSkip = pointers[i] < newerPageIndexes.length && newerPageIndexes[pointers[i]] == pageIndex;
416+
}
417+
418+
if (shouldSkip) {
419+
tracker.onPageSkipped();
420+
continue;
421+
}
422+
398423
long pageOffset = deltaFilePageStore.pageOffset(pageIndex);
399424

400425
// pageIndex instead of pageId, only for debugging in case of errors
@@ -417,16 +442,19 @@ void mergeDeltaFileToMainFile(
417442
filePageStore.write(pageId, buffer.rewind());
418443

419444
tracker.onDataPageWritten();
445+
446+
shouldFsync = true;
420447
}
421448

422-
// Fsync the file page store.
423-
updateHeartbeat();
449+
if (shouldFsync) {
450+
updateHeartbeat();
424451

425-
if (shouldStopCompaction(filePageStore)) {
426-
return;
427-
}
452+
if (shouldStopCompaction(filePageStore)) {
453+
return;
454+
}
428455

429-
filePageStore.sync();
456+
filePageStore.sync();
457+
}
430458

431459
// Removing the delta file page store from a file page store.
432460
updateHeartbeat();

modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,23 @@ public int deltaFileCount() {
409409
return deltaFilePageStoreIos.get(deltaFilePageStoreIos.size() - 1);
410410
}
411411

412+
/**
413+
* Returns completed delta files.
414+
*
415+
* <p>Thread safe.
416+
*/
417+
public List<DeltaFilePageStoreIo> getCompletedDeltaFiles() {
418+
List<DeltaFilePageStoreIo> deltaFilePageStoreIos = this.deltaFilePageStoreIos;
419+
CompletableFuture<DeltaFilePageStoreIo> newDeltaFilePageStoreIoFuture = this.newDeltaFilePageStoreIoFuture;
420+
421+
// Second check in case new future was created after getting snapshot of delta files.
422+
if (newDeltaFilePageStoreIoFuture != null && deltaFilePageStoreIos.contains(newDeltaFilePageStoreIoFuture.join())) {
423+
return deltaFilePageStoreIos.subList(1, deltaFilePageStoreIos.size());
424+
}
425+
426+
return deltaFilePageStoreIos;
427+
}
428+
412429
/**
413430
* Deletes delta file.
414431
*

modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
2525
import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
2626
import static org.hamcrest.MatcherAssert.assertThat;
27+
import static org.hamcrest.Matchers.is;
2728
import static org.junit.jupiter.api.Assertions.assertFalse;
2829
import static org.junit.jupiter.api.Assertions.assertNull;
2930
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -43,6 +44,7 @@
4344
import static org.mockito.Mockito.when;
4445

4546
import java.nio.ByteBuffer;
47+
import java.util.List;
4648
import java.util.concurrent.CompletableFuture;
4749
import java.util.concurrent.TimeoutException;
4850
import java.util.concurrent.atomic.AtomicReference;
@@ -110,6 +112,106 @@ void testMergeDeltaFileToMainFile() throws Throwable {
110112
verify(deltaFilePageStoreIo, times(1)).stop(eq(true));
111113
}
112114

115+
@Test
116+
void testMergeDeltaFileToMainFileWithNewerDeltaFile() throws Throwable {
117+
Compactor compactor = newCompactor();
118+
119+
DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo(new int[]{0, 1});
120+
when(deltaFilePageStoreIo.fileIndex()).thenReturn(2);
121+
FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo);
122+
123+
DeltaFilePageStoreIo newerDeltaFile = createDeltaFilePageStoreIo(new int[]{1, 2});
124+
when(newerDeltaFile.fileIndex()).thenReturn(3);
125+
DeltaFilePageStoreIo olderDeltaFile = createDeltaFilePageStoreIo(new int[]{0, 1});
126+
when(olderDeltaFile.fileIndex()).thenReturn(1);
127+
128+
when(filePageStore.getCompletedDeltaFiles()).thenReturn(List.of(deltaFilePageStoreIo, olderDeltaFile, newerDeltaFile));
129+
130+
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker());
131+
132+
verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(0L), eq(0L), any(ByteBuffer.class), anyBoolean());
133+
134+
verify(filePageStore, times(1)).getCompletedDeltaFiles();
135+
verify(filePageStore, times(1)).write(eq(1L), any(ByteBuffer.class));
136+
137+
verify(filePageStore, times(1)).sync();
138+
verify(filePageStore, times(1)).removeDeltaFile(eq(deltaFilePageStoreIo));
139+
140+
verify(deltaFilePageStoreIo, times(1)).markMergedToFilePageStore();
141+
verify(deltaFilePageStoreIo, times(1)).stop(eq(true));
142+
}
143+
144+
@Test
145+
void testMergeDeltaFileWithMultipleNewerDeltaFiles() throws Throwable {
146+
Compactor compactor = newCompactor();
147+
148+
DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo(new int[]{0, 1, 2, 3, 4, 5});
149+
when(deltaFilePageStoreIo.fileIndex()).thenReturn(1);
150+
151+
// Check processing the first page index.
152+
DeltaFilePageStoreIo newerDeltaFile1 = createDeltaFilePageStoreIo(new int[]{0});
153+
when(newerDeltaFile1.fileIndex()).thenReturn(2);
154+
155+
// Check processing the last page index.
156+
DeltaFilePageStoreIo newerDeltaFile2 = createDeltaFilePageStoreIo(new int[]{5});
157+
when(newerDeltaFile2.fileIndex()).thenReturn(3);
158+
159+
// No matches.
160+
DeltaFilePageStoreIo newerDeltaFileNoMatches = createDeltaFilePageStoreIo(new int[]{30});
161+
when(newerDeltaFileNoMatches.fileIndex()).thenReturn(4);
162+
163+
// Multiple page matches.
164+
DeltaFilePageStoreIo newerDeltaFile3 = createDeltaFilePageStoreIo(new int[]{2, 3});
165+
when(newerDeltaFile3.fileIndex()).thenReturn(5);
166+
167+
FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo);
168+
when(filePageStore.getCompletedDeltaFiles()).thenReturn(List.of(
169+
deltaFilePageStoreIo, newerDeltaFile1, newerDeltaFileNoMatches, newerDeltaFile2, newerDeltaFile3
170+
));
171+
172+
CompactionMetricsTracker tracker = new CompactionMetricsTracker();
173+
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, tracker);
174+
175+
// Pages 1, 4 were in no newer delta files, so they should be written.
176+
verify(filePageStore, times(2)).write(eq(1L), any(ByteBuffer.class));
177+
178+
verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(1L), anyLong(), any(ByteBuffer.class), anyBoolean());
179+
verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(4L), anyLong(), any(ByteBuffer.class), anyBoolean());
180+
181+
verify(filePageStore, times(1)).sync();
182+
verify(filePageStore, times(1)).removeDeltaFile(eq(deltaFilePageStoreIo));
183+
184+
assertThat(tracker.dataPagesSkipped(), is(4));
185+
assertThat(tracker.dataPagesWritten(), is(2));
186+
}
187+
188+
@Test
189+
void testMergeDeltaFileWhenAllPagesSkipped() throws Throwable {
190+
Compactor compactor = newCompactor();
191+
192+
// Delta file to compact has pages [0, 1]
193+
DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo(new int[]{0, 1});
194+
when(deltaFilePageStoreIo.fileIndex()).thenReturn(1);
195+
196+
DeltaFilePageStoreIo newerDeltaFile = createDeltaFilePageStoreIo(new int[]{0, 1});
197+
when(newerDeltaFile.fileIndex()).thenReturn(2);
198+
199+
FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo);
200+
when(filePageStore.getCompletedDeltaFiles()).thenReturn(List.of(deltaFilePageStoreIo, newerDeltaFile));
201+
202+
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker());
203+
204+
verify(filePageStore, never()).write(anyLong(), any(ByteBuffer.class));
205+
206+
verify(deltaFilePageStoreIo, never()).readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean());
207+
208+
verify(filePageStore, never()).sync();
209+
210+
verify(filePageStore, times(1)).removeDeltaFile(eq(deltaFilePageStoreIo));
211+
verify(deltaFilePageStoreIo, times(1)).markMergedToFilePageStore();
212+
verify(deltaFilePageStoreIo, times(1)).stop(eq(true));
213+
}
214+
113215
@Test
114216
void testDoCompaction() throws Throwable {
115217
FilePageStore filePageStore = mock(FilePageStore.class);
@@ -282,9 +384,13 @@ private Compactor newCompactor(FilePageStoreManager filePageStoreManager) {
282384
}
283385

284386
private static DeltaFilePageStoreIo createDeltaFilePageStoreIo() throws Exception {
387+
return createDeltaFilePageStoreIo(new int[]{0});
388+
}
389+
390+
private static DeltaFilePageStoreIo createDeltaFilePageStoreIo(int[] pageIndexes) throws Exception {
285391
DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class);
286392

287-
when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
393+
when(deltaFilePageStoreIo.pageIndexes()).thenReturn(pageIndexes);
288394

289395
when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean()))
290396
.then(answer -> {

modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,16 @@ public Path apply(int index) {
211211
}
212212
}
213213

214+
@Test
215+
void testGetCompletedDeltaFiles() throws Exception {
216+
DeltaFilePageStoreIo completedDeltaIo = mock(DeltaFilePageStoreIo.class);
217+
218+
try (FilePageStore filePageStore = createFilePageStore(workDir.resolve("test"), completedDeltaIo)) {
219+
filePageStore.getOrCreateNewDeltaFile(ignored -> workDir.resolve("testDelta"), () -> INT_EMPTY_ARRAY);
220+
assertThat(filePageStore.getCompletedDeltaFiles(), contains(completedDeltaIo));
221+
}
222+
}
223+
214224
@Test
215225
void testStop() throws Exception {
216226
FilePageStoreIo filePageStoreIo = mock(FilePageStoreIo.class);

0 commit comments

Comments
 (0)