Skip to content

Commit

Permalink
[Kernel] Adds protocol checks to the public getChanges API on TableIm…
Browse files Browse the repository at this point in the history
…pl (delta-io#3651)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

To avoid reading invalid tables, Kernel should check that any read
protocol actions are supported by Kernel. This PR makes the current API
private, and adds a public API around it that does this check when the
`Protocol` is included in the list of actions to be read from the file.

Also removes the "byVersion" part of the API name since we are adding
separate timestamp APIs in delta-io#3650.

## How was this patch tested?

Adds unit tests.
  • Loading branch information
allisonport-db authored Sep 11, 2024
1 parent b843ad6 commit 27cdcb9
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

/** Contains methods to create user-facing Delta exceptions. */
Expand Down Expand Up @@ -149,12 +150,13 @@ public static KernelException unsupportedReaderProtocol(
return new KernelException(message);
}

public static KernelException unsupportedReaderFeature(String tablePath, String readerFeature) {
public static KernelException unsupportedReaderFeature(
String tablePath, Set<String> unsupportedFeatures) {
String message =
String.format(
"Unsupported Delta reader feature: table `%s` requires reader table feature \"%s\" "
"Unsupported Delta reader features: table `%s` requires reader table features [%s] "
+ "which is unsupported by this version of Delta Kernel.",
tablePath, readerFeature);
tablePath, String.join(", ", unsupportedFeatures));
return new KernelException(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private DeltaLogActionUtils() {}

/**
* Represents a Delta action. This is used to request which actions to read from the commit files
* in {@link TableImpl#getChangesByVersion(Engine, long, long, Set)}.
* in {@link TableImpl#getChanges(Engine, long, long, Set)}.
*
* <p>See the Delta protocol for more details
* https://github.com/delta-io/delta/blob/master/PROTOCOL.md#actions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.StructType;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;

/** Contains utility methods related to the Delta table feature support in protocol. */
Expand All @@ -43,34 +40,40 @@ public class TableFeatures {
}
});

private static final Set<String> SUPPORTED_READER_FEATURES =
Collections.unmodifiableSet(
new HashSet<String>() {
{
add("columnMapping");
add("deletionVectors");
add("timestampNtz");
add("vacuumProtocolCheck");
add("variantType-preview");
add("v2Checkpoint");
}
});

////////////////////
// Helper Methods //
////////////////////

public static void validateReadSupportedTable(
Protocol protocol, Metadata metadata, String tablePath) {
Protocol protocol, String tablePath, Optional<Metadata> metadata) {
switch (protocol.getMinReaderVersion()) {
case 1:
break;
case 2:
ColumnMapping.throwOnUnsupportedColumnMappingMode(metadata);
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
break;
case 3:
List<String> readerFeatures = protocol.getReaderFeatures();
for (String readerFeature : readerFeatures) {
switch (readerFeature) {
case "columnMapping":
ColumnMapping.throwOnUnsupportedColumnMappingMode(metadata);
break;
case "deletionVectors": // fall through
case "timestampNtz": // fall through
case "vacuumProtocolCheck": // fall through
case "variantType-preview": // fall through
case "v2Checkpoint":
break;
default:
throw DeltaErrors.unsupportedReaderFeature(tablePath, readerFeature);
}
if (!SUPPORTED_READER_FEATURES.containsAll(readerFeatures)) {
Set<String> unsupportedFeatures = new HashSet<>(readerFeatures);
unsupportedFeatures.removeAll(SUPPORTED_READER_FEATURES);
throw DeltaErrors.unsupportedReaderFeature(tablePath, unsupportedFeatures);
}
if (readerFeatures.contains("columnMapping")) {
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;

import io.delta.kernel.*;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.internal.util.Clock;
Expand All @@ -32,7 +34,9 @@
import io.delta.kernel.utils.FileStatus;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -112,6 +116,66 @@ public Clock getClock() {
return clock;
}

/**
* Returns delta actions for each version between startVersion and endVersion. Only returns the
* actions requested in actionSet.
*
* <p>For the returned columnar batches:
*
* <ul>
* <li>Each row within the same batch is guaranteed to have the same commit version
* <li>The batch commit versions are monotonically increasing
* <li>The top-level columns include "version", "timestamp", and the actions requested in
* actionSet. "version" and "timestamp" are the first and second columns in the schema,
* respectively. The remaining columns are based on the actions requested and each have the
* schema found in {@code DeltaAction.schema}.
* </ul>
*
* @param engine {@link Engine} instance to use in Delta Kernel.
* @param startVersion start version (inclusive)
* @param endVersion end version (inclusive)
* @param actionSet the actions to read and return from the JSON log files
* @return an iterator of batches where each row in the batch has exactly one non-null action and
* its commit version and timestamp
* @throws TableNotFoundException if the table does not exist or if it is not a delta table
* @throws KernelException if a commit file does not exist for any of the versions in the provided
* range
* @throws KernelException if provided an invalid version range
* @throws KernelException if the version range contains a version with reader protocol that is
* unsupported by Kernel
*/
public CloseableIterator<ColumnarBatch> getChanges(
Engine engine,
long startVersion,
long endVersion,
Set<DeltaLogActionUtils.DeltaAction> actionSet) {
// Create a new action set that always contains protocol
Set<DeltaLogActionUtils.DeltaAction> copySet = new HashSet<>(actionSet);
copySet.add(DeltaLogActionUtils.DeltaAction.PROTOCOL);
// If protocol is not in the original requested actions we drop the column before returning
boolean shouldDropProtocolColumn =
!actionSet.contains(DeltaLogActionUtils.DeltaAction.PROTOCOL);

return getRawChanges(engine, startVersion, endVersion, copySet)
.map(
batch -> {
int protocolIdx = batch.getSchema().indexOf("protocol"); // must exist
ColumnVector protocolVector = batch.getColumnVector(protocolIdx);
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
if (!protocolVector.isNullAt(rowId)) {
Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
TableFeatures.validateReadSupportedTable(
protocol, getDataPath().toString(), Optional.empty());
}
}
if (shouldDropProtocolColumn) {
return batch.withDeletedColumnAt(protocolIdx);
} else {
return batch;
}
});
}

protected Path getDataPath() {
return new Path(tablePath);
}
Expand Down Expand Up @@ -226,7 +290,7 @@ public long getVersionAtOrAfterTimestamp(Engine engine, long millisSinceEpochUTC
* range
* @throws KernelException if provided an invalid version range
*/
public CloseableIterator<ColumnarBatch> getChangesByVersion(
private CloseableIterator<ColumnarBatch> getRawChanges(
Engine engine,
long startVersion,
long endVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(

if (protocol != null) {
// Stop since we have found the latest Protocol and Metadata.
TableFeatures.validateReadSupportedTable(protocol, metadata, dataPath.toString());
TableFeatures.validateReadSupportedTable(
protocol, dataPath.toString(), Optional.of(metadata));
return new Tuple2<>(protocol, metadata);
}

Expand Down
Loading

0 comments on commit 27cdcb9

Please sign in to comment.