Skip to content

Commit

Permalink
[Kernel] Add some logging to the getChanges implementation (delta-io#3667)
Browse files Browse the repository at this point in the history

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Adds some logging to the getChanges implementation.

## How was this patch tested?

N/A
  • Loading branch information
allisonport-db authored Sep 11, 2024
1 parent 27cdcb9 commit d7b94cd
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.io.UncheckedIOException;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Exposes APIs to read the raw actions within the *commit files* of the _delta_log. This is used
Expand All @@ -47,6 +49,8 @@ public class DeltaLogActionUtils {

private DeltaLogActionUtils() {}

private static final Logger logger = LoggerFactory.getLogger(DeltaLogActionUtils.class);

/////////////////
// Public APIs //
/////////////////
Expand Down Expand Up @@ -262,21 +266,38 @@ private static CloseableIterator<FileStatus> listLogDir(
private static List<FileStatus> listCommitFiles(
    Engine engine, Path tablePath, long startVersion, long endVersion) {
  // Collects the commit files found by listing the log directory from startVersion, skipping
  // entries that are not commit files and stopping at the first commit file whose version is
  // greater than endVersion. Logs the requested range up front and the elapsed time at the end.
  // An IOException surfacing from the try-with-resources block is rethrown as an
  // UncheckedIOException.

  // TODO update to support coordinated commits; suggested to load the Snapshot at endVersion
  // and get the backfilled/unbackfilled commits from the LogSegment to combine with commit files
  // listed from [startVersion, LogSegment.checkpointVersion]
  logger.info(
      "{}: Listing the commit files for versions [{}, {}]", tablePath, startVersion, endVersion);
  // Use the monotonic clock for the duration: System.currentTimeMillis() follows wall-clock
  // time and can jump (e.g. on clock adjustment), which would make the reported duration wrong.
  final long startTimeNanos = System.nanoTime();
  final List<FileStatus> output = new ArrayList<>();
  try (CloseableIterator<FileStatus> fsIter = listLogDir(engine, tablePath, startVersion)) {
    while (fsIter.hasNext()) {
      FileStatus fs = fsIter.next();
      if (!FileNames.isCommitFile(getName(fs.getPath()))) {
        logger.debug("Ignoring non-commit file {}", fs.getPath());
        continue;
      }
      // NOTE(review): breaking here presumably relies on listLogDir returning files in
      // ascending version order - confirm against listLogDir.
      if (FileNames.getFileVersion(new Path(fs.getPath())) > endVersion) {
        logger.debug(
            "Stopping listing found file {} with version > {}=endVersion",
            fs.getPath(),
            endVersion);
        break;
      }
      output.add(fs);
    }
  } catch (IOException e) {
    throw new UncheckedIOException("Unable to close resource", e);
  }
  logger.info(
      "{}: Took {} ms to list the commit files for versions [{}, {}]",
      tablePath,
      (System.nanoTime() - startTimeNanos) / 1_000_000L,
      startVersion,
      endVersion);
  return output;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableImpl implements Table {

private static final Logger logger = LoggerFactory.getLogger(TableImpl.class);

/**
 * Returns a {@link Table} for the given path by delegating to the three-argument {@code forPath}
 * overload, supplying {@code System::currentTimeMillis} as the third argument.
 *
 * @param engine the engine instance, passed through unchanged to the overload
 * @param path the table path, passed through unchanged to the overload
 * @return whatever the three-argument {@code forPath} overload returns
 */
public static Table forPath(Engine engine, String path) {
return forPath(engine, path, System::currentTimeMillis);
}
Expand Down Expand Up @@ -296,6 +301,8 @@ private CloseableIterator<ColumnarBatch> getRawChanges(
long endVersion,
Set<DeltaLogActionUtils.DeltaAction> actionSet) {

logger.info(
"{}: Getting the commit files for versions [{}, {}]", tablePath, startVersion, endVersion);
List<FileStatus> commitFiles =
DeltaLogActionUtils.getCommitFilesForVersionRange(
engine, new Path(tablePath), startVersion, endVersion);
Expand All @@ -306,6 +313,7 @@ private CloseableIterator<ColumnarBatch> getRawChanges(
.map(action -> new StructField(action.colName, action.schema, true))
.collect(Collectors.toList()));

logger.info("{}: Reading the commit files with readSchema {}", tablePath, readSchema);
return DeltaLogActionUtils.readCommitFiles(engine, commitFiles, readSchema);
}
}

0 comments on commit d7b94cd

Please sign in to comment.