From 2f7492a12bd876d3fdc27594af6aacea430e0bd4 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Tue, 9 Jul 2024 16:55:56 +0200 Subject: [PATCH] Added a few more tests with column mapping --- .../apache/spark/sql/delta/TableFeature.scala | 8 ++- .../sql/delta/sources/DeltaSQLConf.scala | 8 +++ .../delta/DeltaProtocolTransitionsSuite.scala | 70 ++++++++++++++----- 3 files changed, 65 insertions(+), 21 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index fd59a860e5b..3136b83217d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -332,7 +332,11 @@ object TableFeature { * Warning: Do not call `get` on this Map to get a specific feature because keys in this map are * in lower cases. Use [[featureNameToFeature]] instead. */ - private[delta] val allSupportedFeaturesMap: Map[String, TableFeature] = { + private[delta] def allSupportedFeaturesMap: Map[String, TableFeature] = { + val testingFeaturesEnabled = SparkSession + .getActiveSession + .map(_.conf.get(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED)) + .getOrElse(true) var features: Set[TableFeature] = Set( AllowColumnDefaultsTableFeature, AppendOnlyTableFeature, @@ -355,7 +359,7 @@ object TableFeature { InCommitTimestampTableFeature, VariantTypeTableFeature, CoordinatedCommitsTableFeature) - if (DeltaUtils.isTesting) { + if (DeltaUtils.isTesting && testingFeaturesEnabled) { features ++= Set( TestLegacyWriterFeature, TestLegacyReaderWriterFeature, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 56d7d0c06ea..7c8dff380bf 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -423,6 +423,14 @@ trait DeltaSQLConfBase { .checkValues(Set(1, 2, 3)) .createWithDefault(1) + val TABLE_FEATURES_TEST_FEATURES_ENABLED = + buildConf("tableFeatures.testFeatures.enabled") + .internal() + .doc("Controls whether test features are enabled in testing mode. " + + "This config is only used for testing purposes. ") + .booleanConf + .createWithDefault(true) + val DELTA_MAX_SNAPSHOT_LINEAGE_LENGTH = buildConf("maxSnapshotLineageLength") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala index 6ebc898b659..dee75c23b40 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala @@ -148,37 +148,39 @@ class DeltaProtocolTransitionsSuite expectedProtocol = Protocol(3, 7).withFeature(TestRemovableReaderWriterFeature)) } - for ((readerVersion, writerVersion) <- Seq((2, 1), (2, 2), (2, 3), (2, 4))) + for ((readerVersion, writerVersion) <- Seq((2, 1), (2, 2), (2, 3), (2, 4), (1, 5))) test("Invalid legacy protocol normalization" + s" - invalidProtocol($readerVersion, $writerVersion)") { val expectedReaderVersion = 1 val expectedWriterVersion = Math.min(writerVersion, 4) - // Base case. - testProtocolTransition( - createTableProperties = Seq( - ("delta.minReaderVersion", readerVersion.toString), - ("delta.minWriterVersion", writerVersion.toString)), - expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion)) + withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) { + // Base case. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", readerVersion.toString), + ("delta.minWriterVersion", writerVersion.toString)), + expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion)) - // Invalid legacy versions are normalized in default confs. - withSQLConf( + // Invalid legacy versions are normalized in default confs. + withSQLConf( DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> readerVersion.toString, DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> writerVersion.toString) { + testProtocolTransition( + expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion)) + } + + // Invalid legacy versions are normalized in alter table. testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 1.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", readerVersion.toString), + ("delta.minWriterVersion", writerVersion.toString)), expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion)) } - - // Invalid legacy versions are normalized in alter table. - testProtocolTransition( - createTableProperties = Seq( - ("delta.minReaderVersion", 1.toString), - ("delta.minWriterVersion", 1.toString)), - alterTableProperties = Seq( - ("delta.minReaderVersion", readerVersion.toString), - ("delta.minWriterVersion", writerVersion.toString)), - expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion)) } test("ADD FEATURE normalization") { @@ -309,6 +311,27 @@ class DeltaProtocolTransitionsSuite alterTableProperties = Seq( (s"delta.feature.${CheckConstraintsTableFeature.name}", "supported")), expectedProtocol = Protocol(1, 3)) + + withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) { + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + alterTableProperties = Seq( + (s"delta.feature.${ColumnMappingTableFeature.name}", "supported")), + expectedProtocol = Protocol(2, 5)) + + + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString), + (DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")), + expectedProtocol = Protocol(2, 5)) + } } test("DROP FEATURE normalization") { @@ -371,6 +394,15 @@ class DeltaProtocolTransitionsSuite InvariantsTableFeature, AppendOnlyTableFeature, TestRemovableWriterFeature))) + + withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) { + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 2.toString), + ("delta.minWriterVersion", 5.toString)), + dropFeatures = Seq(ColumnMappingTableFeature), + expectedProtocol = Protocol(1, 4)) + } } test("Default Enabled native features") {