From 27cdcb901e6b7c2d929c5f576a90f47f38e89716 Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Wed, 11 Sep 2024 11:37:13 -0700 Subject: [PATCH] [Kernel] Adds protocol checks to the public getChanges API on TableImpl (#3651) #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description To avoid reading invalid tables, Kernel should check that any read protocol actions are supported by Kernel. This PR makes the current API private, and adds a public API around it that does this check when the `Protocol` is included in the list of actions to be read from the file. Also removes the "byVersion" part of the API name since we are adding separate timestamp APIs in https://github.com/delta-io/delta/pull/3650. ## How was this patch tested? Adds unit tests. --- .../io/delta/kernel/internal/DeltaErrors.java | 8 +- .../kernel/internal/DeltaLogActionUtils.java | 2 +- .../delta/kernel/internal/TableFeatures.java | 43 +++++----- .../io/delta/kernel/internal/TableImpl.java | 66 +++++++++++++++- .../kernel/internal/replay/LogReplay.java | 3 +- .../kernel/defaults/TableChangesSuite.scala | 79 ++++++++++++------- 6 files changed, 145 insertions(+), 56 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 2ff1a315fb7..6a3dc23fd4c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -25,6 +25,7 @@ import java.sql.Timestamp; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; /** Contains methods to create user-facing Delta exceptions. */ @@ -149,12 +150,13 @@ public static KernelException unsupportedReaderProtocol( return new KernelException(message); } - public static KernelException unsupportedReaderFeature(String tablePath, String readerFeature) { + public static KernelException unsupportedReaderFeature( + String tablePath, Set unsupportedFeatures) { String message = String.format( - "Unsupported Delta reader feature: table `%s` requires reader table feature \"%s\" " + "Unsupported Delta reader features: table `%s` requires reader table features [%s] " + "which is unsupported by this version of Delta Kernel.", - tablePath, readerFeature); + tablePath, String.join(", ", unsupportedFeatures)); return new KernelException(message); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java index 04ee11f6803..c39ffa10c1f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java @@ -53,7 +53,7 @@ private DeltaLogActionUtils() {} /** * Represents a Delta action. This is used to request which actions to read from the commit files - * in {@link TableImpl#getChangesByVersion(Engine, long, long, Set)}. + * in {@link TableImpl#getChanges(Engine, long, long, Set)}. * *

See the Delta protocol for more details * https://github.com/delta-io/delta/blob/master/PROTOCOL.md#actions diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java index 7b595d9fbc9..4aa66ec2fff 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java @@ -24,10 +24,7 @@ import io.delta.kernel.internal.util.ColumnMapping; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.types.StructType; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; /** Contains utility methods related to the Delta table feature support in protocol. */ @@ -43,34 +40,40 @@ public class TableFeatures { } }); + private static final Set SUPPORTED_READER_FEATURES = + Collections.unmodifiableSet( + new HashSet() { + { + add("columnMapping"); + add("deletionVectors"); + add("timestampNtz"); + add("vacuumProtocolCheck"); + add("variantType-preview"); + add("v2Checkpoint"); + } + }); + //////////////////// // Helper Methods // //////////////////// public static void validateReadSupportedTable( - Protocol protocol, Metadata metadata, String tablePath) { + Protocol protocol, String tablePath, Optional metadata) { switch (protocol.getMinReaderVersion()) { case 1: break; case 2: - ColumnMapping.throwOnUnsupportedColumnMappingMode(metadata); + metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode); break; case 3: List readerFeatures = protocol.getReaderFeatures(); - for (String readerFeature : readerFeatures) { - switch (readerFeature) { - case "columnMapping": - ColumnMapping.throwOnUnsupportedColumnMappingMode(metadata); - break; - case "deletionVectors": // fall through - case "timestampNtz": // fall through - case "vacuumProtocolCheck": // fall through - case "variantType-preview": // fall through - case "v2Checkpoint": - break; - default: - throw DeltaErrors.unsupportedReaderFeature(tablePath, readerFeature); - } + if (!SUPPORTED_READER_FEATURES.containsAll(readerFeatures)) { + Set unsupportedFeatures = new HashSet<>(readerFeatures); + unsupportedFeatures.removeAll(SUPPORTED_READER_FEATURES); + throw DeltaErrors.unsupportedReaderFeature(tablePath, unsupportedFeatures); + } + if (readerFeatures.contains("columnMapping")) { + metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode); } break; default: diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java index 2dd6c6b2877..ab746e1a7d1 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java @@ -18,11 +18,13 @@ import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; import io.delta.kernel.*; +import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.engine.Engine; import io.delta.kernel.exceptions.CheckpointAlreadyExistsException; import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.exceptions.TableNotFoundException; +import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.snapshot.SnapshotManager; import io.delta.kernel.internal.util.Clock; @@ -32,7 +34,9 @@ import io.delta.kernel.utils.FileStatus; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -112,6 +116,66 @@ public Clock getClock() { return clock; } + /** + * Returns delta actions for each version between startVersion and endVersion. Only returns the + * actions requested in actionSet. + * + *

For the returned columnar batches: + * + *

+ * + * @param engine {@link Engine} instance to use in Delta Kernel. + * @param startVersion start version (inclusive) + * @param endVersion end version (inclusive) + * @param actionSet the actions to read and return from the JSON log files + * @return an iterator of batches where each row in the batch has exactly one non-null action and + * its commit version and timestamp + * @throws TableNotFoundException if the table does not exist or if it is not a delta table + * @throws KernelException if a commit file does not exist for any of the versions in the provided + * range + * @throws KernelException if provided an invalid version range + * @throws KernelException if the version range contains a version with reader protocol that is + * unsupported by Kernel + */ + public CloseableIterator getChanges( + Engine engine, + long startVersion, + long endVersion, + Set actionSet) { + // Create a new action set that always contains protocol + Set copySet = new HashSet<>(actionSet); + copySet.add(DeltaLogActionUtils.DeltaAction.PROTOCOL); + // If protocol is not in the original requested actions we drop the column before returning + boolean shouldDropProtocolColumn = + !actionSet.contains(DeltaLogActionUtils.DeltaAction.PROTOCOL); + + return getRawChanges(engine, startVersion, endVersion, copySet) + .map( + batch -> { + int protocolIdx = batch.getSchema().indexOf("protocol"); // must exist + ColumnVector protocolVector = batch.getColumnVector(protocolIdx); + for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) { + if (!protocolVector.isNullAt(rowId)) { + Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId); + TableFeatures.validateReadSupportedTable( + protocol, getDataPath().toString(), Optional.empty()); + } + } + if (shouldDropProtocolColumn) { + return batch.withDeletedColumnAt(protocolIdx); + } else { + return batch; + } + }); + } + protected Path getDataPath() { return new Path(tablePath); } @@ -226,7 +290,7 @@ public long getVersionAtOrAfterTimestamp(Engine engine, long millisSinceEpochUTC * range * @throws KernelException if provided an invalid version range */ - public CloseableIterator getChangesByVersion( + private CloseableIterator getRawChanges( Engine engine, long startVersion, long endVersion, diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index 1d1a229ca31..170dad9ac1f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -233,7 +233,8 @@ protected Tuple2 loadTableProtocolAndMetadata( if (protocol != null) { // Stop since we have found the latest Protocol and Metadata. - TableFeatures.validateReadSupportedTable(protocol, metadata, dataPath.toString()); + TableFeatures.validateReadSupportedTable( + protocol, dataPath.toString(), Optional.of(metadata)); return new Tuple2<>(protocol, metadata); } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala index 34b57308258..5d6950d856e 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala @@ -19,6 +19,7 @@ import java.io.File import scala.collection.JavaConverters._ +import io.delta.golden.GoldenTableUtils.goldenTablePath import io.delta.kernel.data.Row import io.delta.kernel.data.ColumnarBatch import io.delta.kernel.defaults.utils.TestUtils @@ -49,7 +50,7 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { * For the given parameters, read the table changes from Kernel using * TableImpl.getChangesByVersion and compare results with Spark */ - def testGetChangesByVersionVsSpark( + def testGetChangesVsSpark( tablePath: String, startVersion: Long, endVersion: Long, @@ -61,7 +62,7 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { val kernelChanges = Table.forPath(defaultEngine, tablePath) .asInstanceOf[TableImpl] - .getChangesByVersion(defaultEngine, startVersion, endVersion, actionSet.asJava) + .getChanges(defaultEngine, startVersion, endVersion, actionSet.asJava) .toSeq // Check schema is as expected (version + timestamp column + the actions requested) @@ -74,41 +75,41 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { } // Golden table from Delta Standalone test - test("getChangesByVersion - golden table deltalog-getChanges valid queries") { + test("getChanges - golden table deltalog-getChanges valid queries") { withGoldenTable("deltalog-getChanges") { tablePath => // request subset of actions - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tablePath, 0, 2, Set(DeltaAction.REMOVE) ) - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tablePath, 0, 2, Set(DeltaAction.ADD) ) - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tablePath, 0, 2, Set(DeltaAction.ADD, DeltaAction.REMOVE, DeltaAction.METADATA, DeltaAction.PROTOCOL) ) // request full actions, various versions - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tablePath, 0, 2, FULL_ACTION_SET ) - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tablePath, 1, 2, FULL_ACTION_SET ) - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tablePath, 0, 0, @@ -117,7 +118,7 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { } } - test("getChangesByVersion - returns correct timestamps") { + test("getChanges - returns correct timestamps") { withTempDir { tempDir => def generateCommits(path: String, commits: Long*): Unit = { @@ -141,7 +142,7 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { // Check the timestamps are returned correctly Table.forPath(defaultEngine, tempDir.getCanonicalPath) .asInstanceOf[TableImpl] - .getChangesByVersion(defaultEngine, 0, 2, Set(DeltaAction.ADD).asJava) + .getChanges(defaultEngine, 0, 2, Set(DeltaAction.ADD).asJava) .toSeq .flatMap(_.getRows.toSeq) .foreach { row => @@ -153,7 +154,7 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { } // Check contents as well - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tempDir.getCanonicalPath, 0, 2, @@ -162,53 +163,53 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { } } - test("getChangesByVersion - empty _delta_log folder") { + test("getChanges - empty _delta_log folder") { withTempDir { tempDir => new File(tempDir, "delta_log").mkdirs() intercept[TableNotFoundException] { Table.forPath(defaultEngine, tempDir.getCanonicalPath) .asInstanceOf[TableImpl] - .getChangesByVersion(defaultEngine, 0, 2, FULL_ACTION_SET.asJava) + .getChanges(defaultEngine, 0, 2, FULL_ACTION_SET.asJava) } } } - test("getChangesByVersion - empty folder no _delta_log dir") { + test("getChanges - empty folder no _delta_log dir") { withTempDir { tempDir => intercept[TableNotFoundException] { Table.forPath(defaultEngine, tempDir.getCanonicalPath) .asInstanceOf[TableImpl] - .getChangesByVersion(defaultEngine, 0, 2, FULL_ACTION_SET.asJava) + .getChanges(defaultEngine, 0, 2, FULL_ACTION_SET.asJava) } } } - test("getChangesByVersion - non-empty folder not a delta table") { + test("getChanges - non-empty folder not a delta table") { withTempDir { tempDir => spark.range(20).write.format("parquet").mode("overwrite").save(tempDir.getCanonicalPath) intercept[TableNotFoundException] { Table.forPath(defaultEngine, tempDir.getCanonicalPath) .asInstanceOf[TableImpl] - .getChangesByVersion(defaultEngine, 0, 2, FULL_ACTION_SET.asJava) + .getChanges(defaultEngine, 0, 2, FULL_ACTION_SET.asJava) } } } - test("getChangesByVersion - directory does not exist") { + test("getChanges - directory does not exist") { intercept[TableNotFoundException] { Table.forPath(defaultEngine, "/fake/table/path") .asInstanceOf[TableImpl] - .getChangesByVersion(defaultEngine, 0, 2, FULL_ACTION_SET.asJava) + .getChanges(defaultEngine, 0, 2, FULL_ACTION_SET.asJava) } } - test("getChangesByVersion - golden table deltalog-getChanges invalid queries") { + test("getChanges - golden table deltalog-getChanges invalid queries") { withGoldenTable("deltalog-getChanges") { tablePath => def getChangesByVersion( startVersion: Long, endVersion: Long): CloseableIterator[ColumnarBatch] = { Table.forPath(defaultEngine, tablePath) .asInstanceOf[TableImpl] - .getChangesByVersion(defaultEngine, startVersion, endVersion, FULL_ACTION_SET.asJava) + .getChanges(defaultEngine, startVersion, endVersion, FULL_ACTION_SET.asJava) } // startVersion after latest available version @@ -233,7 +234,7 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { } } - test("getChangesByVersion - with truncated log") { + test("getChanges - with truncated log") { withTempDir { tempDir => // PREPARE TEST TABLE val tablePath = tempDir.getCanonicalPath @@ -269,24 +270,24 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { assert(intercept[KernelException] { Table.forPath(defaultEngine, tablePath) .asInstanceOf[TableImpl] - .getChangesByVersion(defaultEngine, 0, 9, FULL_ACTION_SET.asJava) + .getChanges(defaultEngine, 0, 9, FULL_ACTION_SET.asJava) }.getMessage.contains("no log files found in the requested version range")) // startVersion less than the earliest available version assert(intercept[KernelException] { Table.forPath(defaultEngine, tablePath) .asInstanceOf[TableImpl] - .getChangesByVersion(defaultEngine, 5, 11, FULL_ACTION_SET.asJava) + .getChanges(defaultEngine, 5, 11, FULL_ACTION_SET.asJava) }.getMessage.contains("no log file found for version 5")) // TEST VALID CASES - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tablePath, 10, 12, FULL_ACTION_SET ) - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tablePath, 11, 12, @@ -295,7 +296,7 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { } } - test("getChangesByVersion - table with a lot of changes") { + test("getChanges - table with a lot of changes") { withTempDir { tempDir => spark.sql( f""" @@ -333,14 +334,14 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { |""".stripMargin) // Check all actions are correctly retrieved - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tempDir.getCanonicalPath, 0, 6, FULL_ACTION_SET ) // Check some subset of actions - testGetChangesByVersionVsSpark( + testGetChangesVsSpark( tempDir.getCanonicalPath, 0, 6, @@ -349,6 +350,24 @@ class TableChangesSuite extends AnyFunSuite with TestUtils { } } + test("getChanges - fails when protocol is not readable by Kernel") { + // Existing tests suffice to check if the protocol column is present/dropped correctly + // We test our protocol checks for table features in TableFeaturesSuite + // Min reader version is too high + assert(intercept[KernelException] { + // Use toSeq because we need to consume the iterator to force the exception + Table.forPath(defaultEngine, goldenTablePath("deltalog-invalid-protocol-version")) + .asInstanceOf[TableImpl] + .getChanges(defaultEngine, 0, 0, FULL_ACTION_SET.asJava).toSeq + }.getMessage.contains("Unsupported Delta protocol reader version")) + // We still get an error if we don't request the protocol file action + assert(intercept[KernelException] { + Table.forPath(defaultEngine, goldenTablePath("deltalog-invalid-protocol-version")) + .asInstanceOf[TableImpl] + .getChanges(defaultEngine, 0, 0, Set(DeltaAction.ADD).asJava).toSeq + }.getMessage.contains("Unsupported Delta protocol reader version")) + } + ////////////////////////////////////////////////////////////////////////////////// // Helpers to compare actions returned between Kernel and Spark //////////////////////////////////////////////////////////////////////////////////