diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 001d64aa0dd..cab20b038e6 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -685,7 +685,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       val existingFeatureNames = newProtocolBeforeAddingFeatures.readerAndWriterFeatureNames
       if (!newFeaturesFromTableConf.map(_.name).subsetOf(existingFeatureNames)) {
         // When enabling legacy features, include all preceding legacy features.
-        val implicitFeatures = TableFeatureProtocolUtils.implicitFeatures(newFeaturesFromTableConf)
+        val implicitFeatures = TableFeatureProtocolUtils.implicitlySupportedFeatures(newFeaturesFromTableConf)
         newProtocol = Some(
           Protocol(
             readerVersionForNewProtocol,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
index b67385e0aaf..86e587ba063 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
@@ -500,9 +500,15 @@ object TableFeatureProtocolUtils {
     ((features.map(_.minReaderVersion) :+ 1).max, (features.map(_.minWriterVersion) :+ 1).max)

   /**
-   * Return a set with the implicit features of the provided feature set.
+   * Returns a set of legacy features that contains the input features as well as the
+   * features that will be supported by the protocol as a "byproduct" of supporting the
+   * given legacy `features`.
+   *
+   * As an example, the legacy protocol for supporting ColumnMapping also supports
+   * AppendOnly, Invariants, CheckConstraints, CDF, and GeneratedColumns as byproducts;
+   * there is no way to opt out of supporting them.
    */
-  def implicitFeatures(features: Set[TableFeature]): Set[TableFeature] =
+  def implicitlySupportedFeatures(features: Set[TableFeature]): Set[TableFeature] =
     features.flatMap { f =>
       Protocol(f.minReaderVersion, f.minWriterVersion).implicitlySupportedFeatures
     }
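
Note on the renamed helper above: a minimal sketch of the intended behavior, using only identifiers that appear in this patch. The package locations follow the imports visible in the test files below; ColumnMapping's legacy protocol being (2, 5) is an assumption based on the Delta legacy protocol table, not something this patch changes.

    // Illustrative snippet; requires the delta-spark classes on the classpath.
    import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
    import org.apache.spark.sql.delta.{AppendOnlyTableFeature, ColumnMappingTableFeature}

    // Every feature that ColumnMapping's legacy protocol (assumed (2, 5)) implicitly
    // supports rides along as a byproduct of enabling ColumnMapping.
    val byproducts =
      TableFeatureProtocolUtils.implicitlySupportedFeatures(Set(ColumnMappingTableFeature))
    assert(byproducts.contains(ColumnMappingTableFeature)) // input features are included
    assert(byproducts.contains(AppendOnlyTableFeature))    // preceding legacy feature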
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index 373b0784de6..c071971cfa7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -399,7 +399,7 @@ object Protocol {
     }

     // When enabling legacy features, include all preceding legacy features.
-    val implicitFeatures = TableFeatureProtocolUtils.implicitFeatures(allEnabledFeatures)
+    val implicitFeatures = TableFeatureProtocolUtils.implicitlySupportedFeatures(allEnabledFeatures)

     (finalReaderVersion, finalWriterVersion,
       allEnabledFeatures ++ implicitFeatures ++ implicitFeaturesFromTableConf)
@@ -430,7 +430,7 @@ object Protocol {
     }

     // When enabling legacy features, include all preceding legacy features.
-    val implicitFeatures = TableFeatureProtocolUtils.implicitFeatures(enabledFeatures)
+    val implicitFeatures = TableFeatureProtocolUtils.implicitlySupportedFeatures(enabledFeatures)

     (readerVersion, writerVersion, enabledFeatures ++ implicitFeatures)
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
index 01a2d958303..b246ddf7f20 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
@@ -2407,25 +2407,14 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
            |$featureProperty = "true"
            |)""".stripMargin)

-      /*
-      val expectedReaderVersion = Math.max(feature.minReaderVersion, 1)
+      val expectedReaderVersion = Math.max(1, feature.minReaderVersion)
       val expectProtocol = if (feature.isLegacyFeature) {
         Protocol(expectedReaderVersion, feature.minWriterVersion)
       } else {
-        Protocol(
-          expectedReaderVersion,
-          TABLE_FEATURES_MIN_WRITER_VERSION,
-          if (supportsReaderFeatures(expectedReaderVersion)) Some(Set(feature.name)) else None,
-          Some(Set(feature.name, InvariantsTableFeature.name, AppendOnlyTableFeature.name)))
-      }
-      */
-      val expectProtocol = if (feature.isLegacyFeature) {
-        Protocol(Math.max(1, feature.minReaderVersion), feature.minWriterVersion)
-      } else {
-        Protocol(3, 7).withFeatures(Seq(
+        Protocol(expectedReaderVersion, 7).withFeatures(Seq(
           InvariantsTableFeature,
           AppendOnlyTableFeature,
-          feature)).normalized
+          feature))
       }

       assert(deltaLog.update().protocol === expectProtocol)
@@ -4205,3 +4194,4 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
 }

 class DeltaProtocolVersionSuite extends DeltaProtocolVersionSuiteBase
+  with DeltaProtocolVersionSuiteEdge
\ No newline at end of file
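
Note on the rewritten expectation above: the suite now builds the expected protocol with `withFeatures` instead of spelling out raw feature-name sets and calling `.normalized`. A standalone sketch of what that expectation evaluates to for a writer-only test feature; assuming `TestWriterFeature.minReaderVersion == 1`, only `Protocol.withFeatures` and the feature objects are taken from the patch itself.

    import org.apache.spark.sql.delta.actions.Protocol
    import org.apache.spark.sql.delta.{AppendOnlyTableFeature, InvariantsTableFeature, TestWriterFeature}

    // For a non-legacy feature with minReaderVersion == 1, the suite now expects a
    // (1, 7) protocol whose writer-feature list names Invariants and AppendOnly
    // explicitly, instead of deriving them via `.normalized`.
    val expected = Protocol(1, 7).withFeatures(Seq(
      InvariantsTableFeature,
      AppendOnlyTableFeature,
      TestWriterFeature))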
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
index 8ea64a04ac0..61877ebb6dd 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
@@ -335,17 +335,17 @@ class DeltaTableFeatureSuite
       val log = DeltaLog.forTable(spark, TableIdentifier("tbl"))
       val protocol = log.update().protocol
       assert(protocol.readerAndWriterFeatureNames === Set(
-        AppendOnlyTableFeature.name,
-        InvariantsTableFeature.name,
-        CheckConstraintsTableFeature.name,
-        GeneratedColumnsTableFeature.name,
-        ChangeDataFeedTableFeature.name,
-        ColumnMappingTableFeature.name,
-        TestLegacyWriterFeature.name,
-        TestRemovableLegacyReaderWriterFeature.name,
-        TestLegacyReaderWriterFeature.name,
-        TestRemovableLegacyWriterFeature.name,
-        TestWriterFeature.name))
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        CheckConstraintsTableFeature,
+        GeneratedColumnsTableFeature,
+        ChangeDataFeedTableFeature,
+        ColumnMappingTableFeature,
+        TestLegacyWriterFeature,
+        TestRemovableLegacyReaderWriterFeature,
+        TestLegacyReaderWriterFeature,
+        TestRemovableLegacyWriterFeature,
+        TestWriterFeature).map(_.name))
     }
   }
 }
@@ -396,12 +396,12 @@ class DeltaTableFeatureSuite
         commandName, targetTableName = "tbl", sourceTableName = "tbl", tblProperties))
       val protocol = log.update().protocol
       assert(protocol.readerAndWriterFeatureNames === Set(
-        AppendOnlyTableFeature.name,
-        InvariantsTableFeature.name,
-        CheckConstraintsTableFeature.name,
-        GeneratedColumnsTableFeature.name,
-        ChangeDataFeedTableFeature.name,
-        TestWriterFeature.name))
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        CheckConstraintsTableFeature,
+        GeneratedColumnsTableFeature,
+        ChangeDataFeedTableFeature,
+        TestWriterFeature).map(_.name))
     }
   }
 }
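
Note on the assertion rewrites above: they are behavior-preserving; mapping the feature objects to names once is equivalent to naming each feature inline. A self-contained sketch of the equivalence (plain Scala, hypothetical `Feature` case class, no Delta dependency):

    case class Feature(name: String)
    val appendOnly = Feature("appendOnly")
    val invariants = Feature("invariants")

    // Inline `.name` per element and a single `.map(_.name)` over the set
    // produce the same Set[String].
    val inline = Set(appendOnly.name, invariants.name)
    val mapped = Set(appendOnly, invariants).map(_.name)
    assert(inline == mapped)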
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/DropColumnMappingFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/DropColumnMappingFeatureSuite.scala
index 365ca9ed1f7..ca65dbf4c61 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/DropColumnMappingFeatureSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/DropColumnMappingFeatureSuite.scala
@@ -20,6 +20,8 @@ import java.util.concurrent.TimeUnit

 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.DeltaConfigs._
+import org.apache.spark.sql.delta.actions.Protocol
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.sources.DeltaSQLConf._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -81,31 +83,35 @@ class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils
   }

   test("drop column mapping from a table without table feature") {
-    sql(
-      s"""CREATE TABLE $testTableName
-         |USING delta
-         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
-         | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
-         | 'delta.minReaderVersion' = '3',
-         | 'delta.minWriterVersion' = '7')
-         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
-         | FROM RANGE(0, $totalRows, 1, $numFiles)
-         |""".stripMargin)
-    testDroppingColumnMapping()
+    withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) {
+      sql(
+        s"""CREATE TABLE $testTableName
+           |USING delta
+           |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
+           | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
+           | 'delta.minReaderVersion' = '3',
+           | 'delta.minWriterVersion' = '7')
+           |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+           | FROM RANGE(0, $totalRows, 1, $numFiles)
+           |""".stripMargin)
+      testDroppingColumnMapping(protocolContainsDVs = false)
+    }
   }

   test("drop column mapping from a table with table feature") {
-    sql(
-      s"""CREATE TABLE $testTableName
-         |USING delta
-         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
-         | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
-         | 'delta.minReaderVersion' = '3',
-         | 'delta.minWriterVersion' = '7')
-         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
-         | FROM RANGE(0, $totalRows, 1, $numFiles)
-         |""".stripMargin)
-    testDroppingColumnMapping()
+    withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) {
+      sql(
+        s"""CREATE TABLE $testTableName
+           |USING delta
+           |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
+           | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'true',
+           | 'delta.minReaderVersion' = '3',
+           | 'delta.minWriterVersion' = '7')
+           |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+           | FROM RANGE(0, $totalRows, 1, $numFiles)
+           |""".stripMargin)
+      testDroppingColumnMapping(protocolContainsDVs = true)
+    }
   }

   test("drop column mapping from a table without column mapping table property") {
@@ -135,20 +141,22 @@ class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils
   }

   test("drop column mapping in id mode") {
-    sql(
-      s"""CREATE TABLE $testTableName
-         |USING delta
-         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'id',
-         | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
-         | 'delta.minReaderVersion' = '3',
-         | 'delta.minWriterVersion' = '7')
-         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
-         | FROM RANGE(0, $totalRows, 1, $numFiles)
-         |""".stripMargin)
-    testDroppingColumnMapping()
+    withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) {
+      sql(
+        s"""CREATE TABLE $testTableName
+           |USING delta
+           |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'id',
+           | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
+           | 'delta.minReaderVersion' = '3',
+           | 'delta.minWriterVersion' = '7')
+           |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+           | FROM RANGE(0, $totalRows, 1, $numFiles)
+           |""".stripMargin)
+      testDroppingColumnMapping(protocolContainsDVs = false)
+    }
   }

-  def testDroppingColumnMapping(): Unit = {
+  def testDroppingColumnMapping(protocolContainsDVs: Boolean): Unit = {
     // Verify the input data is as expected.
     val originalData = spark.table(tableName = testTableName).select(logicalColumnName).collect()
     // Add a schema comment and verify it is preserved after the rewrite.
@@ -186,10 +194,10 @@ class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils
     // Verify the schema comment is preserved after the rewrite.
     assert(deltaLog.update().schema.head.getComment().get == comment,
       "Should preserve the schema comment.")
-    verifyDropFeatureTruncateHistory()
+    verifyDropFeatureTruncateHistory(protocolContainsDVs)
   }

-  protected def verifyDropFeatureTruncateHistory() = {
+  protected def verifyDropFeatureTruncateHistory(protocolContainsDVs: Boolean) = {
     val deltaLog1 = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName), clock)
     // Populate the delta cache with the delta log with the right data path so it stores the clock.
     // This is currently the only way to make sure the drop feature command uses the clock.
@@ -209,8 +217,23 @@ class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils
          |ALTER TABLE $testTableName DROP FEATURE ${ColumnMappingTableFeature.name} TRUNCATE HISTORY
          |""".stripMargin)
     val newSnapshot = deltaLog.update()
-    assert(!newSnapshot.protocol.readerAndWriterFeatures.contains(ColumnMappingTableFeature),
-      "Should drop the feature.")
+
+    val expectedProtocol = if (protocolContainsDVs) {
+      Protocol(
+        minReaderVersion = 3,
+        minWriterVersion = 7,
+        Some(Set(DeletionVectorsTableFeature.name)),
+        Some(Set(
+          AppendOnlyTableFeature,
+          InvariantsTableFeature,
+          CheckConstraintsTableFeature,
+          ChangeDataFeedTableFeature,
+          GeneratedColumnsTableFeature,
+          DeletionVectorsTableFeature).map(_.name)))
+    } else {
+      Protocol(1, 4)
+    }
+    assert(newSnapshot.protocol === expectedProtocol)
  }

  protected def dropColumnMappingTableFeature(): Unit = {
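
Note on the `Protocol(1, 4)` expectation above: a sketch of why the DV-less tables can denormalize after the drop, assuming legacy protocol (1, 4) implies exactly the five features listed in the DV branch of the expected protocol (minus DeletionVectors); `implicitlySupportedFeatures` on `Protocol` appears earlier in this patch.

    import org.apache.spark.sql.delta.actions.Protocol
    import org.apache.spark.sql.delta.{
      AppendOnlyTableFeature, ChangeDataFeedTableFeature, CheckConstraintsTableFeature,
      GeneratedColumnsTableFeature, InvariantsTableFeature}

    // Once ColumnMapping is dropped, the features left behind are exactly what
    // legacy protocol (1, 4) implies, so the table can collapse back to (1, 4).
    val leftover = Set(
      AppendOnlyTableFeature, InvariantsTableFeature, CheckConstraintsTableFeature,
      ChangeDataFeedTableFeature, GeneratedColumnsTableFeature)
    assert(Protocol(1, 4).implicitlySupportedFeatures == leftover)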