Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ public class CompactionMetricsTracker {
private static final AtomicIntegerFieldUpdater<CompactionMetricsTracker> DATA_PAGES_WRITTEN_UPDATER =
newUpdater(CompactionMetricsTracker.class, "dataPagesWritten");

private static final AtomicIntegerFieldUpdater<CompactionMetricsTracker> DATA_PAGES_SKIPPED_UPDATER =
newUpdater(CompactionMetricsTracker.class, "dataPagesSkipped");

private volatile int dataPagesWritten;

private volatile int dataPagesSkipped;

private final long startNanos = System.nanoTime();

private long endNanos;
Expand All @@ -45,6 +50,15 @@ public void onDataPageWritten() {
DATA_PAGES_WRITTEN_UPDATER.incrementAndGet(this);
}

/**
* Increments counter if data page was skipped.
*
* <p>Thread safe.
*/
public void onPageSkipped() {
DATA_PAGES_SKIPPED_UPDATER.incrementAndGet(this);
}

/**
* Callback on compaction end.
*
Expand All @@ -63,6 +77,15 @@ public int dataPagesWritten() {
return dataPagesWritten;
}

/**
* Returns data pages skipped.
*
* <p>Thread safe.
*/
public int dataPagesSkipped() {
return dataPagesSkipped;
}

/**
* Returns total compaction duration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,10 @@ void doCompaction() {
long totalDurationInNanos = tracker.totalDuration(NANOSECONDS);

LOG.info(
"Compaction round finished [compactionId={}, pages={}, duration={}ms, avgWriteSpeed={}MB/s]",
"Compaction round finished [compactionId={}, pages={}, skipped={}, duration={}ms, avgWriteSpeed={}MB/s]",
compactionId,
tracker.dataPagesWritten(),
tracker.dataPagesSkipped(),
tracker.totalDuration(MILLISECONDS),
WriteSpeedFormatter.formatWriteSpeed(totalWriteBytes, totalDurationInNanos)
);
Expand Down Expand Up @@ -388,13 +389,37 @@ void mergeDeltaFileToMainFile(
// Copy pages deltaFilePageStore -> filePageStore.
ByteBuffer buffer = getThreadLocalBuffer(pageSize);

DeltaFilePageStoreIo[] newerDeltaFiles = filePageStore.getCompletedDeltaFiles()
.stream()
.filter(file -> file.fileIndex() > deltaFilePageStore.fileIndex())
.toArray(DeltaFilePageStoreIo[]::new);

int[] pointers = new int[newerDeltaFiles.length];

boolean shouldFsync = false;
for (long pageIndex : deltaFilePageStore.pageIndexes()) {
updateHeartbeat();

if (shouldStopCompaction(filePageStore)) {
return;
}

boolean shouldSkip = false;
for (int i = 0; i < pointers.length && !shouldSkip; i++) {
int[] newerPageIndexes = newerDeltaFiles[i].pageIndexes();

while (pointers[i] < newerPageIndexes.length - 1 && newerPageIndexes[pointers[i]] < pageIndex) {
pointers[i]++;
}

shouldSkip = pointers[i] < newerPageIndexes.length && newerPageIndexes[pointers[i]] == pageIndex;
}

if (shouldSkip) {
tracker.onPageSkipped();
continue;
}

long pageOffset = deltaFilePageStore.pageOffset(pageIndex);

// pageIndex instead of pageId, only for debugging in case of errors
Expand All @@ -417,16 +442,19 @@ void mergeDeltaFileToMainFile(
filePageStore.write(pageId, buffer.rewind());

tracker.onDataPageWritten();

shouldFsync = true;
}

// Fsync the file page store.
updateHeartbeat();
if (shouldFsync) {
updateHeartbeat();

if (shouldStopCompaction(filePageStore)) {
return;
}
if (shouldStopCompaction(filePageStore)) {
return;
}

filePageStore.sync();
filePageStore.sync();
}

// Removing the delta file page store from a file page store.
updateHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,23 @@ public int deltaFileCount() {
return deltaFilePageStoreIos.get(deltaFilePageStoreIos.size() - 1);
}

/**
* Returns completed delta files.
*
* <p>Thread safe.
*/
public List<DeltaFilePageStoreIo> getCompletedDeltaFiles() {
List<DeltaFilePageStoreIo> deltaFilePageStoreIos = this.deltaFilePageStoreIos;
CompletableFuture<DeltaFilePageStoreIo> newDeltaFilePageStoreIoFuture = this.newDeltaFilePageStoreIoFuture;

// Second check in case new future was created after getting snapshot of delta files.
if (newDeltaFilePageStoreIoFuture != null && deltaFilePageStoreIos.contains(newDeltaFilePageStoreIoFuture.join())) {
return deltaFilePageStoreIos.subList(1, deltaFilePageStoreIos.size());
}

return deltaFilePageStoreIos;
}

/**
* Deletes delta file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
Expand All @@ -43,6 +44,7 @@
import static org.mockito.Mockito.when;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -110,6 +112,106 @@ void testMergeDeltaFileToMainFile() throws Throwable {
verify(deltaFilePageStoreIo, times(1)).stop(eq(true));
}

@Test
void testMergeDeltaFileToMainFileWithNewerDeltaFile() throws Throwable {
Compactor compactor = newCompactor();

DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo(new int[]{0, 1});
when(deltaFilePageStoreIo.fileIndex()).thenReturn(2);
FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo);

DeltaFilePageStoreIo newerDeltaFile = createDeltaFilePageStoreIo(new int[]{1, 2});
when(newerDeltaFile.fileIndex()).thenReturn(3);
DeltaFilePageStoreIo olderDeltaFile = createDeltaFilePageStoreIo(new int[]{0, 1});
when(olderDeltaFile.fileIndex()).thenReturn(1);

when(filePageStore.getCompletedDeltaFiles()).thenReturn(List.of(deltaFilePageStoreIo, olderDeltaFile, newerDeltaFile));

compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker());

verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(0L), eq(0L), any(ByteBuffer.class), anyBoolean());

verify(filePageStore, times(1)).getCompletedDeltaFiles();
verify(filePageStore, times(1)).write(eq(1L), any(ByteBuffer.class));

verify(filePageStore, times(1)).sync();
verify(filePageStore, times(1)).removeDeltaFile(eq(deltaFilePageStoreIo));

verify(deltaFilePageStoreIo, times(1)).markMergedToFilePageStore();
verify(deltaFilePageStoreIo, times(1)).stop(eq(true));
}

@Test
void testMergeDeltaFileWithMultipleNewerDeltaFiles() throws Throwable {
Compactor compactor = newCompactor();

DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo(new int[]{0, 1, 2, 3, 4, 5});
when(deltaFilePageStoreIo.fileIndex()).thenReturn(1);

// Check processing the first page index.
DeltaFilePageStoreIo newerDeltaFile1 = createDeltaFilePageStoreIo(new int[]{0});
when(newerDeltaFile1.fileIndex()).thenReturn(2);

// Check processing the last page index.
DeltaFilePageStoreIo newerDeltaFile2 = createDeltaFilePageStoreIo(new int[]{5});
when(newerDeltaFile2.fileIndex()).thenReturn(3);

// No matches.
DeltaFilePageStoreIo newerDeltaFileNoMatches = createDeltaFilePageStoreIo(new int[]{30});
when(newerDeltaFileNoMatches.fileIndex()).thenReturn(4);

// Multiple page matches.
DeltaFilePageStoreIo newerDeltaFile3 = createDeltaFilePageStoreIo(new int[]{2, 3});
when(newerDeltaFile3.fileIndex()).thenReturn(5);

FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo);
when(filePageStore.getCompletedDeltaFiles()).thenReturn(List.of(
deltaFilePageStoreIo, newerDeltaFile1, newerDeltaFileNoMatches, newerDeltaFile2, newerDeltaFile3
));

CompactionMetricsTracker tracker = new CompactionMetricsTracker();
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, tracker);

// Pages 1, 4 were in no newer delta files, so they should be written.
verify(filePageStore, times(2)).write(eq(1L), any(ByteBuffer.class));

verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(1L), anyLong(), any(ByteBuffer.class), anyBoolean());
verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(4L), anyLong(), any(ByteBuffer.class), anyBoolean());

verify(filePageStore, times(1)).sync();
verify(filePageStore, times(1)).removeDeltaFile(eq(deltaFilePageStoreIo));

assertThat(tracker.dataPagesSkipped(), is(4));
assertThat(tracker.dataPagesWritten(), is(2));
}

@Test
void testMergeDeltaFileWhenAllPagesSkipped() throws Throwable {
Compactor compactor = newCompactor();

// Delta file to compact has pages [0, 1]
DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo(new int[]{0, 1});
when(deltaFilePageStoreIo.fileIndex()).thenReturn(1);

DeltaFilePageStoreIo newerDeltaFile = createDeltaFilePageStoreIo(new int[]{0, 1});
when(newerDeltaFile.fileIndex()).thenReturn(2);

FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo);
when(filePageStore.getCompletedDeltaFiles()).thenReturn(List.of(deltaFilePageStoreIo, newerDeltaFile));

compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker());

verify(filePageStore, never()).write(anyLong(), any(ByteBuffer.class));

verify(deltaFilePageStoreIo, never()).readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean());

verify(filePageStore, never()).sync();

verify(filePageStore, times(1)).removeDeltaFile(eq(deltaFilePageStoreIo));
verify(deltaFilePageStoreIo, times(1)).markMergedToFilePageStore();
verify(deltaFilePageStoreIo, times(1)).stop(eq(true));
}

@Test
void testDoCompaction() throws Throwable {
FilePageStore filePageStore = mock(FilePageStore.class);
Expand Down Expand Up @@ -282,9 +384,13 @@ private Compactor newCompactor(FilePageStoreManager filePageStoreManager) {
}

private static DeltaFilePageStoreIo createDeltaFilePageStoreIo() throws Exception {
return createDeltaFilePageStoreIo(new int[]{0});
}

private static DeltaFilePageStoreIo createDeltaFilePageStoreIo(int[] pageIndexes) throws Exception {
DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class);

when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
when(deltaFilePageStoreIo.pageIndexes()).thenReturn(pageIndexes);

when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean()))
.then(answer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ public Path apply(int index) {
}
}

@Test
void testGetCompletedDeltaFiles() throws Exception {
DeltaFilePageStoreIo completedDeltaIo = mock(DeltaFilePageStoreIo.class);

try (FilePageStore filePageStore = createFilePageStore(workDir.resolve("test"), completedDeltaIo)) {
filePageStore.getOrCreateNewDeltaFile(ignored -> workDir.resolve("testDelta"), () -> INT_EMPTY_ARRAY);
assertThat(filePageStore.getCompletedDeltaFiles(), contains(completedDeltaIo));
}
}

@Test
void testStop() throws Exception {
FilePageStoreIo filePageStoreIo = mock(FilePageStoreIo.class);
Expand Down