Skip to content

Commit

Permalink
Added data file ordering based on (dataSequenceNumber, fileSequenceNu…
Browse files Browse the repository at this point in the history
…mber, pos)
  • Loading branch information
malhotrashivam committed Nov 7, 2024
1 parent 9a72894 commit e9f2e66
Showing 1 changed file with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,10 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
return;
}
final Table table = tableAdapter.icebergTable();
final List<DataFile> dataFiles = new ArrayList<>();
try {
// Retrieve the manifest files from the snapshot
final List<ManifestFile> manifestFiles = snapshot.allManifests(table.io());
// Sort manifest files by sequence number to read data files in the correct commit order
manifestFiles.sort(Comparator.comparingLong(ManifestFile::sequenceNumber));
// TODO(deephaven-core#5989): Add unit tests for the ordering of manifest files
for (final ManifestFile manifestFile : manifestFiles) {
// Currently only can process manifest files with DATA content type.
Expand All @@ -163,28 +162,28 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
table, snapshot.snapshotId(), manifestFile.content()));
}
try (final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
// Sort the data files by sequence number to read them in the correct order
final List<DataFile> dataFiles = new ArrayList<>();
for (final DataFile df : reader) {
dataFiles.add(df);
}
dataFiles.sort(Comparator.comparingLong(DataFile::dataSequenceNumber));

// Process the data files
for (final DataFile df : dataFiles) {
final URI fileUri = dataFileUri(df);
final IcebergTableLocationKey locationKey =
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
if (locationKey != null) {
locationKeyObserver.accept(locationKey);
}
}
reader.forEach(dataFiles::add);
}
}
} catch (final Exception e) {
throw new TableDataException(
String.format("%s:%d - error finding Iceberg locations", tableAdapter, snapshot.snapshotId()), e);
}

// Sort data files by (dataSequenceNumber, fileSequenceNumber, pos) to read them in the correct commit order
dataFiles.sort(Comparator
.comparingLong(DataFile::dataSequenceNumber)
.thenComparingLong(DataFile::fileSequenceNumber)
.thenComparingLong(DataFile::pos));

for (final DataFile df : dataFiles) {
final URI fileUri = dataFileUri(df);
final IcebergTableLocationKey locationKey =
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
if (locationKey != null) {
locationKeyObserver.accept(locationKey);
}
}
}

/**
Expand Down

0 comments on commit e9f2e66

Please sign in to comment.