From d7b94cd27e217f63705dd9610a5665ae7c01eb06 Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Wed, 11 Sep 2024 13:03:00 -0700 Subject: [PATCH] [Kernel] Add some logging to the getChanges implementation (#3667) #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Adds some logging to the getChanges implementation. ## How was this patch tested? N/A --- .../kernel/internal/DeltaLogActionUtils.java | 21 +++++++++++++++++++ .../io/delta/kernel/internal/TableImpl.java | 8 +++++++ 2 files changed, 29 insertions(+) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java index c39ffa10c1..ce47da8327 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java @@ -38,6 +38,8 @@ import java.io.UncheckedIOException; import java.util.*; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Exposes APIs to read the raw actions within the *commit files* of the _delta_log. This is used @@ -47,6 +49,8 @@ public class DeltaLogActionUtils { private DeltaLogActionUtils() {} + private static final Logger logger = LoggerFactory.getLogger(DeltaLogActionUtils.class); + ///////////////// // Public APIs // ///////////////// @@ -262,14 +266,25 @@ private static CloseableIterator listLogDir( private static List listCommitFiles( Engine engine, Path tablePath, long startVersion, long endVersion) { + // TODO update to support coordinated commits; suggested to load the Snapshot at endVersion + // and get the backfilled/unbackfilled commits from the LogSegment to combine with commit files + // listed from [startVersion, LogSegment.checkpointVersion] + logger.info( + "{}: Listing the commit files for versions [{}, {}]", tablePath, startVersion, endVersion); + long startTimeMillis = System.currentTimeMillis(); final List output = new ArrayList<>(); try (CloseableIterator fsIter = listLogDir(engine, tablePath, startVersion)) { while (fsIter.hasNext()) { FileStatus fs = fsIter.next(); if (!FileNames.isCommitFile(getName(fs.getPath()))) { + logger.debug("Ignoring non-commit file {}", fs.getPath()); continue; } if (FileNames.getFileVersion(new Path(fs.getPath())) > endVersion) { + logger.debug( + "Stopping listing found file {} with version > {}=endVersion", + fs.getPath(), + endVersion); break; } output.add(fs); @@ -277,6 +292,12 @@ private static List listCommitFiles( } catch (IOException e) { throw new UncheckedIOException("Unable to close resource", e); } + logger.info( + "{}: Took {} ms to list the commit files for versions [{}, {}]", + tablePath, + System.currentTimeMillis() - startTimeMillis, + startVersion, + endVersion); return output; } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java index ab746e1a7d..1f32099daa 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java @@ -39,8 +39,13 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TableImpl implements Table { + + private static final Logger logger = LoggerFactory.getLogger(TableImpl.class); + public static Table forPath(Engine engine, String path) { return forPath(engine, path, System::currentTimeMillis); } @@ -296,6 +301,8 @@ private CloseableIterator getRawChanges( long endVersion, Set actionSet) { + logger.info( + "{}: Getting the commit files for versions [{}, {}]", tablePath, startVersion, endVersion); List commitFiles = DeltaLogActionUtils.getCommitFilesForVersionRange( engine, new Path(tablePath), startVersion, endVersion); @@ -306,6 +313,7 @@ private CloseableIterator getRawChanges( .map(action -> new StructField(action.colName, action.schema, true)) .collect(Collectors.toList())); + logger.info("{}: Reading the commit files with readSchema {}", tablePath, readSchema); return DeltaLogActionUtils.readCommitFiles(engine, commitFiles, readSchema); } }