diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 53de9c56650..f58666d946a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -337,11 +337,7 @@ object TableFeature { DeletionVectorsTableFeature, VacuumProtocolCheckTableFeature, V2CheckpointTableFeature, - RowTrackingFeature, - TestRemovableWriterFeature, - TestRemovableLegacyWriterFeature, - TestRemovableReaderWriterFeature, - TestRemovableLegacyReaderWriterFeature) + RowTrackingFeature) if (DeltaUtils.isTesting) { features ++= Set( TestLegacyWriterFeature, @@ -351,6 +347,10 @@ object TableFeature { TestReaderWriterFeature, TestReaderWriterMetadataAutoUpdateFeature, TestReaderWriterMetadataNoAutoUpdateFeature, + TestRemovableWriterFeature, + TestRemovableLegacyWriterFeature, + TestRemovableReaderWriterFeature, + TestRemovableLegacyReaderWriterFeature, TestFeatureWithDependency, TestFeatureWithTransitiveDependency, TestWriterFeatureWithTransitiveDependency, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala index 5da16539a52..f21c65509ca 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala @@ -3372,103 +3372,6 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest expectedDowngradedProtocol = protocolWithReaderFeature(TestRemovableReaderWriterFeature)) } - private def validVersions = Seq((1, 1), (1, 2), (1, 3), (1, 4), (2, 5), (1, 7), (3, 7)) - private def invalidVersions = Seq((2, 2), (2, 3)) - for ((readerVersion, writerVersion) <- validVersions ++ invalidVersions) - test("Legacy features are added when setting legacy versions: " + - s"readerVersionToSet = $readerVersion, writerVersionToSet = $writerVersion") { - withTempDir { dir => - val deltaLog = DeltaLog.forTable(spark, dir) - - // Creates a table with (3, 7) versions with the given table feature. - createTableWithFeature( - deltaLog, - TestRemovableWriterFeature, - TestRemovableWriterFeature.TABLE_PROP_KEY) - - sql( - s""" - |ALTER TABLE delta.`${deltaLog.dataPath}` SET TBLPROPERTIES ( - | 'delta.minReaderVersion' = $readerVersion, - | 'delta.minWriterVersion' = $writerVersion - |)""".stripMargin) - - val expected = Protocol(readerVersion, writerVersion) - .implicitlySupportedFeatures + TestRemovableWriterFeature - assert(deltaLog.update().protocol.readerAndWriterFeatureNames === expected.map(_.name)) - } - } - - test("Setting protocol versions in a legacy table should not add any features") { - withTempDir { dir => - val deltaLog = DeltaLog.forTable(spark, dir) - - sql( - s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta - |TBLPROPERTIES ( - |delta.minReaderVersion = '1', - |delta.minWriterVersion = '2' - |)""".stripMargin) - - sql( - s""" - |ALTER TABLE delta.`${deltaLog.dataPath}` SET TBLPROPERTIES ( - | 'delta.minReaderVersion' = '1', - | 'delta.minWriterVersion' = '4' - |)""".stripMargin) - - // There should be no explicitly added legacy features. - assert(deltaLog.update().protocol == Protocol(1, 4)) - } - } - - for { - tableFeatureToAdd <- Seq(TestRemovableWriterFeature, TestRemovableReaderWriterFeature) - setLegacyVersions <- BOOLEAN_DOMAIN - downgradeAfterDrop <- if (setLegacyVersions) Seq(true, false) else Seq(false) - } test("SOP for downgrading to legacy protocol versions for tables created with features. " + - s"tableFeatureToAdd: ${tableFeatureToAdd.name}, setLegacyVersions: $setLegacyVersions, " + - s"downgradeAfterDrop: ${downgradeAfterDrop}") { - withTempDir { dir => - val deltaLog = DeltaLog.forTable(spark, dir) - - sql( - s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta - |TBLPROPERTIES ( - |delta.minReaderVersion = $TABLE_FEATURES_MIN_READER_VERSION, - |delta.minWriterVersion = $TABLE_FEATURES_MIN_WRITER_VERSION, - |delta.feature.${tableFeatureToAdd.name} = 'supported', - |delta.feature.${ChangeDataFeedTableFeature.name} = 'supported' - |)""".stripMargin) - - val downgradeProtocolVersionsSQL = - s""" - |ALTER TABLE delta.`${deltaLog.dataPath}` SET TBLPROPERTIES ( - | 'delta.minReaderVersion' = ${ChangeDataFeedTableFeature.minReaderVersion}, - | 'delta.minWriterVersion' = ${ChangeDataFeedTableFeature.minWriterVersion} - |)""".stripMargin - - - if (setLegacyVersions && !downgradeAfterDrop) sql(downgradeProtocolVersionsSQL) - - AlterTableDropFeatureDeltaCommand( - DeltaTableV2(spark, deltaLog.dataPath), - tableFeatureToAdd.name, - truncateHistory = tableFeatureToAdd.isReaderWriterFeature).run(spark) - - if (setLegacyVersions && downgradeAfterDrop) sql(downgradeProtocolVersionsSQL) - - val expectedProtocol = if (setLegacyVersions) { - Protocol(1, 4) - } else { - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(ChangeDataFeedTableFeature) - - } - assert(deltaLog.update().protocol === expectedProtocol) - } - } - test("Can drop reader+writer feature when there is nothing to clean") { withTempPath { dir => val clock = new ManualClock(System.currentTimeMillis()) @@ -3498,10 +3401,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } } - private def validVersions = Seq((1, 1), (1, 2), (1, 3), (1, 4), (2, 5), (2, 6), (1, 7), (3, 7)) + private def validVersions = Seq((1, 1), (1, 2), (1, 3), (1, 4), (2, 5), (1, 7), (3, 7)) private def invalidVersions = Seq((2, 2), (2, 3)) for ((readerVersion, writerVersion) <- validVersions ++ invalidVersions) - test(s"Legacy features are added when setting legacy versions: " + + test("Legacy features are added when setting legacy versions: " + s"readerVersionToSet = $readerVersion, writerVersionToSet = $writerVersion") { withTempDir { dir => val deltaLog = DeltaLog.forTable(spark, dir) @@ -3525,7 +3428,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } } - test(s"Setting protocol versions in a legacy table should not add any features") { + test("Setting protocol versions in a legacy table should not add any features") { withTempDir { dir => val deltaLog = DeltaLog.forTable(spark, dir) @@ -3549,11 +3452,12 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } for { - setLegacyVersions <- BOOLEAN_DOMAIN tableFeatureToAdd <- Seq(TestRemovableWriterFeature, TestRemovableReaderWriterFeature) - } test(s"SOP for downgrading to legacy protocol versions for tables created " + - s"with table features. " + - s"setLegacyVersions: $setLegacyVersions, tableFeatureToAdd: ${tableFeatureToAdd.name}") { + setLegacyVersions <- BOOLEAN_DOMAIN + downgradeAfterDrop <- if (setLegacyVersions) Seq(true, false) else Seq(false) + } test("SOP for downgrading to legacy protocol versions for tables created with features. " + + s"tableFeatureToAdd: ${tableFeatureToAdd.name}, setLegacyVersions: $setLegacyVersions, " + + s"downgradeAfterDrop: ${downgradeAfterDrop}") { withTempDir { dir => val deltaLog = DeltaLog.forTable(spark, dir) @@ -3566,20 +3470,23 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest |delta.feature.${ChangeDataFeedTableFeature.name} = 'supported' |)""".stripMargin) - if (setLegacyVersions) { - sql( - s""" - |ALTER TABLE delta.`${deltaLog.dataPath}` SET TBLPROPERTIES ( - | 'delta.minReaderVersion' = ${ChangeDataFeedTableFeature.minReaderVersion}, - | 'delta.minWriterVersion' = ${ChangeDataFeedTableFeature.minWriterVersion} - |)""".stripMargin) - } + val downgradeProtocolVersionsSQL = + s""" + |ALTER TABLE delta.`${deltaLog.dataPath}` SET TBLPROPERTIES ( + | 'delta.minReaderVersion' = ${ChangeDataFeedTableFeature.minReaderVersion}, + | 'delta.minWriterVersion' = ${ChangeDataFeedTableFeature.minWriterVersion} + |)""".stripMargin + + + if (setLegacyVersions && !downgradeAfterDrop) sql(downgradeProtocolVersionsSQL) AlterTableDropFeatureDeltaCommand( DeltaTableV2(spark, deltaLog.dataPath), tableFeatureToAdd.name, truncateHistory = tableFeatureToAdd.isReaderWriterFeature).run(spark) + if (setLegacyVersions && downgradeAfterDrop) sql(downgradeProtocolVersionsSQL) + val expectedProtocol = if (setLegacyVersions) { Protocol(1, 4) } else {