From 63bc4bca4e69788e3ab395aa2ca1ca2b884210d3 Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Fri, 3 Jan 2025 12:28:17 +0800 Subject: [PATCH 1/2] optimize SnapshotManager repetitive code logic --- .../apache/paimon/utils/SnapshotManager.java | 196 +++++++----------- 1 file changed, 73 insertions(+), 123 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index ae70d7aec5d1..f629efa97725 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -290,15 +290,11 @@ private Snapshot changelogOrSnapshot(long snapshotId) { return earliest - 1; } - while (earliest < latest) { - long mid = (earliest + latest + 1) / 2; - if (changelogOrSnapshot(mid).timeMillis() < timestampMills) { - earliest = mid; - } else { - latest = mid - 1; - } - } - return earliest; + return binarySearch( + earliest, + latest, + id -> changelogOrSnapshot(id).timeMillis() < timestampMills, + false); } /** @@ -316,22 +312,12 @@ private Snapshot changelogOrSnapshot(long snapshotId) { if (earliestSnapShot.timeMillis() > timestampMills) { return earliestSnapShot; } - Snapshot finalSnapshot = null; - while (earliest <= latest) { - long mid = earliest + (latest - earliest) / 2; // Avoid overflow - Snapshot snapshot = snapshot(mid); - long commitTime = snapshot.timeMillis(); - if (commitTime > timestampMills) { - latest = mid - 1; // Search in the left half - } else if (commitTime < timestampMills) { - earliest = mid + 1; // Search in the right half - finalSnapshot = snapshot; - } else { - finalSnapshot = snapshot; // Found the exact match - break; - } - } - return finalSnapshot; + + Long resultId = + binarySearch( + earliest, latest, id -> snapshot(id).timeMillis() <= timestampMills, false); + + return resultId == null ? 
null : snapshot(resultId); } /** @@ -349,22 +335,12 @@ private Snapshot changelogOrSnapshot(long snapshotId) { if (latestSnapShot.timeMillis() < timestampMills) { return null; } - Snapshot finalSnapshot = null; - while (earliest <= latest) { - long mid = earliest + (latest - earliest) / 2; // Avoid overflow - Snapshot snapshot = snapshot(mid); - long commitTime = snapshot.timeMillis(); - if (commitTime > timestampMills) { - latest = mid - 1; // Search in the left half - finalSnapshot = snapshot; - } else if (commitTime < timestampMills) { - earliest = mid + 1; // Search in the right half - } else { - finalSnapshot = snapshot; // Found the exact match - break; - } - } - return finalSnapshot; + + Long resultId = + binarySearch( + earliest, latest, id -> snapshot(id).timeMillis() >= timestampMills, true); + + return resultId == null ? null : snapshot(resultId); } public @Nullable Snapshot earlierOrEqualWatermark(long watermark) { @@ -375,55 +351,21 @@ private Snapshot changelogOrSnapshot(long snapshotId) { if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { return null; } - Long earliestWatermark = null; - // find the first snapshot with watermark - if ((earliestWatermark = snapshot(earliest).watermark()) == null) { - while (earliest < latest) { - earliest++; - earliestWatermark = snapshot(earliest).watermark(); - if (earliestWatermark != null) { - break; - } - } - } - if (earliestWatermark == null) { + Long firstValidId = findFirstSnapshotWithWatermark(earliest, latest); + if (firstValidId == null) { return null; } - if (earliestWatermark >= watermark) { - return snapshot(earliest); - } - Snapshot finalSnapshot = null; + Long resultId = + binarySearch( + firstValidId, + latest, + id -> + snapshot(id).watermark() != null + && snapshot(id).watermark() <= watermark, + false); - while (earliest <= latest) { - long mid = earliest + (latest - earliest) / 2; // Avoid overflow - Snapshot snapshot = snapshot(mid); - Long commitWatermark 
= snapshot.watermark(); - if (commitWatermark == null) { - // find the first snapshot with watermark - while (mid >= earliest) { - mid--; - commitWatermark = snapshot(mid).watermark(); - if (commitWatermark != null) { - break; - } - } - } - if (commitWatermark == null) { - earliest = mid + 1; - } else { - if (commitWatermark > watermark) { - latest = mid - 1; // Search in the left half - } else if (commitWatermark < watermark) { - earliest = mid + 1; // Search in the right half - finalSnapshot = snapshot; - } else { - finalSnapshot = snapshot; // Found the exact match - break; - } - } - } - return finalSnapshot; + return resultId == null ? null : snapshot(resultId); } public @Nullable Snapshot laterOrEqualWatermark(long watermark) { @@ -434,55 +376,63 @@ private Snapshot changelogOrSnapshot(long snapshotId) { if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { return null; } - Long earliestWatermark = null; - // find the first snapshot with watermark - if ((earliestWatermark = snapshot(earliest).watermark()) == null) { - while (earliest < latest) { - earliest++; - earliestWatermark = snapshot(earliest).watermark(); - if (earliestWatermark != null) { - break; - } - } - } - if (earliestWatermark == null) { + + Long firstValidId = findFirstSnapshotWithWatermark(earliest, latest); + if (firstValidId == null) { return null; } - if (earliestWatermark >= watermark) { - return snapshot(earliest); + if (snapshot(firstValidId).watermark() >= watermark) { + return snapshot(firstValidId); } - Snapshot finalSnapshot = null; + Long resultId = + binarySearch( + earliest, + latest, + id -> + snapshot(id).watermark() != null + && snapshot(id).watermark() >= watermark, + true); + + return resultId == null ? 
null : snapshot(resultId); + } + + private Long findFirstSnapshotWithWatermark(Long earliest, Long latest) { while (earliest <= latest) { - long mid = earliest + (latest - earliest) / 2; // Avoid overflow - Snapshot snapshot = snapshot(mid); - Long commitWatermark = snapshot.watermark(); - if (commitWatermark == null) { - // find the first snapshot with watermark - while (mid >= earliest) { - mid--; - commitWatermark = snapshot(mid).watermark(); - if (commitWatermark != null) { - break; - } - } + Long watermark = snapshot(earliest).watermark(); + if (watermark != null) { + return earliest; } - if (commitWatermark == null) { - earliest = mid + 1; + earliest++; + } + return null; + } + + private @Nullable Long binarySearch( + Long start, + Long end, + java.util.function.Predicate condition, + boolean findEarliest) { + Long result = null; + while (start <= end) { + long mid = start + (end - start) / 2; + if (condition.test(mid)) { + result = mid; + if (findEarliest) { + end = mid - 1; + } else { + start = mid + 1; + } } else { - if (commitWatermark > watermark) { - latest = mid - 1; // Search in the left half - finalSnapshot = snapshot; - } else if (commitWatermark < watermark) { - earliest = mid + 1; // Search in the right half + if (findEarliest) { + start = mid + 1; } else { - finalSnapshot = snapshot; // Found the exact match - break; + end = mid - 1; } } } - return finalSnapshot; + return result; } public long snapshotCount() throws IOException { From 6fea0910533847b62610885b716f8718473bbed6 Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Sat, 4 Jan 2025 15:19:03 +0800 Subject: [PATCH 2/2] add ut --- .../apache/paimon/utils/SnapshotManager.java | 4 +- .../paimon/utils/SnapshotManagerTest.java | 212 +++++++++++++++++- 2 files changed, 211 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 
f629efa97725..bccc674a5f8a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -20,6 +20,7 @@ import org.apache.paimon.Changelog; import org.apache.paimon.Snapshot; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; @@ -409,7 +410,8 @@ private Long findFirstSnapshotWithWatermark(Long earliest, Long latest) { return null; } - private @Nullable Long binarySearch( + @VisibleForTesting + public @Nullable Long binarySearch( Long start, Long end, java.util.function.Predicate condition, diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index e828a0c90a9d..ead53ec6e001 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -155,18 +156,164 @@ public void testLaterOrEqualTimeMills() throws IOException { } @Test - public void testlaterOrEqualWatermark() throws IOException { + public void testLaterOrEqualWatermark() throws IOException { long millis = Long.MIN_VALUE; FileIO localFileIO = LocalFileIO.create(); SnapshotManager snapshotManager = new SnapshotManager(localFileIO, new Path(tempDir.toString())); - // create 10 snapshots - for (long i = 0; i < 10; i++) { + // create 10 snapshots, watermarks are Long.MIN_VALUE. 
+ long i = 0; + for (; i < 10; i++) { Snapshot snapshot = createSnapshotWithMillis(i, millis, Long.MIN_VALUE); localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); } // smaller than the second snapshot assertThat(snapshotManager.laterOrEqualWatermark(millis + 999)).isNull(); + + // create 5 snapshots, watermarks are not Long.MIN_VALUE. + for (; i < 15; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000L); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + + assertThat(snapshotManager.laterOrEqualWatermark(1100L)) + .isEqualTo(createSnapshotWithMillis(10, millis, 1100L)); + assertThat(snapshotManager.laterOrEqualWatermark(1101L)) + .isEqualTo(createSnapshotWithMillis(11, millis, 1110L)); + assertThat(snapshotManager.laterOrEqualWatermark(99L)) + .isEqualTo(createSnapshotWithMillis(10, millis, 1100L)); + assertThat(snapshotManager.laterOrEqualWatermark(1110L)) + .isEqualTo(createSnapshotWithMillis(11, millis, 1110L)); + assertThat(snapshotManager.laterOrEqualWatermark(1140L)) + .isEqualTo(createSnapshotWithMillis(14, millis, 1140L)); + assertThat(snapshotManager.laterOrEqualWatermark(1141L)).isNull(); + + // delete snapshots + for (i = 0; i < 15; i++) { + localFileIO.deleteQuietly(snapshotManager.snapshotPath(i)); + } + + // create 15 snapshots, the first 10 watermark are null. 
+ i = 0; + for (; i < 10; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis, null); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + for (; i < 15; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + assertThat(snapshotManager.laterOrEqualWatermark(1100L)) + .isEqualTo(createSnapshotWithMillis(10, millis, 1100L)); + assertThat(snapshotManager.laterOrEqualWatermark(1101L)) + .isEqualTo(createSnapshotWithMillis(11, millis, 1110L)); + assertThat(snapshotManager.laterOrEqualWatermark(99L)) + .isEqualTo(createSnapshotWithMillis(10, millis, 1100L)); + assertThat(snapshotManager.laterOrEqualWatermark(1110L)) + .isEqualTo(createSnapshotWithMillis(11, millis, 1110L)); + assertThat(snapshotManager.laterOrEqualWatermark(1140L)) + .isEqualTo(createSnapshotWithMillis(14, millis, 1140L)); + assertThat(snapshotManager.laterOrEqualWatermark(1141L)).isNull(); + + // delete snapshots + for (i = 0; i < 15; i++) { + localFileIO.deleteQuietly(snapshotManager.snapshotPath(i)); + } + + // create 15 snapshots, all watermark are not null. 
+ i = 0; + for (; i < 15; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + assertThat(snapshotManager.laterOrEqualWatermark(999L)) + .isEqualTo(createSnapshotWithMillis(0, millis, 1000L)); + assertThat(snapshotManager.laterOrEqualWatermark(1000L)) + .isEqualTo(createSnapshotWithMillis(0, millis, 1000L)); + assertThat(snapshotManager.laterOrEqualWatermark(1001L)) + .isEqualTo(createSnapshotWithMillis(1, millis, 1010L)); + assertThat(snapshotManager.laterOrEqualWatermark(1140L)) + .isEqualTo(createSnapshotWithMillis(14, millis, 1140L)); + assertThat(snapshotManager.laterOrEqualWatermark(1141L)).isNull(); + } + + @Test + public void testEarlierOrEqualWatermark() throws IOException { + long millis = Long.MIN_VALUE; + FileIO localFileIO = LocalFileIO.create(); + SnapshotManager snapshotManager = + new SnapshotManager(localFileIO, new Path(tempDir.toString())); + // create 10 snapshots, watermarks are Long.MIN_VALUE. + long i = 0; + for (; i < 10; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis, Long.MIN_VALUE); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + // smaller than the second snapshot + assertThat(snapshotManager.earlierOrEqualWatermark(millis + 999)).isNull(); + + // create 5 snapshots, watermarks are not Long.MIN_VALUE. 
+ for (; i < 15; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000L); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + + assertThat(snapshotManager.earlierOrEqualWatermark(1140L)) + .isEqualTo(createSnapshotWithMillis(14, millis, 1140L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1141L)) + .isEqualTo(createSnapshotWithMillis(14, millis, 1140L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1139L)) + .isEqualTo(createSnapshotWithMillis(13, millis, 1130L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1100L)) + .isEqualTo(createSnapshotWithMillis(10, millis, 1100L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1099L)) + .isEqualTo(createSnapshotWithMillis(9, millis, Long.MIN_VALUE)); + + // delete snapshots + for (i = 0; i < 15; i++) { + localFileIO.deleteQuietly(snapshotManager.snapshotPath(i)); + } + + // create 15 snapshots, the first 10 watermark are null. + i = 0; + for (; i < 10; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis, null); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + for (; i < 15; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000L); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + assertThat(snapshotManager.earlierOrEqualWatermark(1140L)) + .isEqualTo(createSnapshotWithMillis(14, millis, 1140L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1141L)) + .isEqualTo(createSnapshotWithMillis(14, millis, 1140L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1139L)) + .isEqualTo(createSnapshotWithMillis(13, millis, 1130L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1100L)) + .isEqualTo(createSnapshotWithMillis(10, millis, 1100L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1099L)).isNull(); + + // delete snapshots + for (i = 0; i < 15; i++) { + 
localFileIO.deleteQuietly(snapshotManager.snapshotPath(i)); + } + + // create 15 snapshots, all watermarks not null. + i = 0; + for (; i < 15; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000L); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + assertThat(snapshotManager.earlierOrEqualWatermark(1140L)) + .isEqualTo(createSnapshotWithMillis(14, millis, 1140L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1141L)) + .isEqualTo(createSnapshotWithMillis(14, millis, 1140L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1139L)) + .isEqualTo(createSnapshotWithMillis(13, millis, 1130L)); + assertThat(snapshotManager.earlierOrEqualWatermark(1000L)) + .isEqualTo(createSnapshotWithMillis(0, millis, 1000L)); + assertThat(snapshotManager.earlierOrEqualWatermark(999L)).isNull(); } private Snapshot createSnapshotWithMillis(long id, long millis) { @@ -189,7 +336,7 @@ private Snapshot createSnapshotWithMillis(long id, long millis) { null); } - private Snapshot createSnapshotWithMillis(long id, long millis, long watermark) { + private Snapshot createSnapshotWithMillis(long id, long millis, Long watermark) { return new Snapshot( id, 0L, @@ -410,4 +557,61 @@ public void testCommitChangelogWhenSameChangelogCommitTwice() throws IOException snapshotManager.commitChangelog(changelog, id); assertDoesNotThrow(() -> snapshotManager.commitChangelog(changelog, id)); } + + @Test + public void testBinarySearch() { + FileIO localFileIO = LocalFileIO.create(); + SnapshotManager snapshotManager = + new SnapshotManager(localFileIO, new Path(tempDir.toString())); + // findEarliest = true + Predicate condition = id -> id >= 5L; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(5L); + condition = id -> id > 5L; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(6L); + condition = id -> id >= 10L; + 
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)) + .isEqualTo(10L); + condition = id -> id > 10L; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isNull(); + condition = id -> id >= 11L; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isNull(); + condition = id -> id > 11L; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isNull(); + condition = id -> id >= 1L; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(2L); + condition = id -> id > 1L; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(2L); + condition = id -> id >= 2L; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(2L); + condition = id -> id > 2L; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(3L); + // findEarliest = false + condition = id -> id <= 5; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)) + .isEqualTo(5L); + condition = id -> id < 5; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)) + .isEqualTo(4L); + condition = id -> id <= 10; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)) + .isEqualTo(10L); + condition = id -> id < 10; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)) + .isEqualTo(9L); + condition = id -> id <= 11; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)) + .isEqualTo(10L); + condition = id -> id < 11; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)) + .isEqualTo(10L); + condition = id -> id <= 1; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)).isNull(); + condition = id -> id < 1; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)).isNull(); + 
condition = id -> id <= 2; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)) + .isEqualTo(2L); + condition = id -> id < 2; + Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)).isNull(); + } }