diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 74ff67ddf23..60748ddb0d0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -511,8 +511,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
 
       val newProtocolForLatestMetadata =
         Protocol(readerVersionAsTableProp, writerVersionAsTableProp)
-      val proposedNewProtocol = protocolBeforeUpdate.merge(newProtocolForLatestMetadata)
-        .addImplicitLegacyFeatures(newProtocolForLatestMetadata)
+      val proposedNewProtocol = protocolBeforeUpdate
+        .merge(newProtocolForLatestMetadata)
+        .downgradeProtocolVersionsIfNeeded
 
       if (proposedNewProtocol != protocolBeforeUpdate) {
         // The merged protocol has higher versions and/or supports more features.
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
index f58666d946a..53de9c56650 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -337,7 +337,11 @@ object TableFeature {
       DeletionVectorsTableFeature,
       VacuumProtocolCheckTableFeature,
       V2CheckpointTableFeature,
-      RowTrackingFeature)
+      RowTrackingFeature,
+      TestRemovableWriterFeature,
+      TestRemovableLegacyWriterFeature,
+      TestRemovableReaderWriterFeature,
+      TestRemovableLegacyReaderWriterFeature)
     if (DeltaUtils.isTesting) {
       features ++= Set(
         TestLegacyWriterFeature,
@@ -347,10 +351,6 @@ object TableFeature {
         TestReaderWriterFeature,
         TestReaderWriterMetadataAutoUpdateFeature,
         TestReaderWriterMetadataNoAutoUpdateFeature,
-        TestRemovableWriterFeature,
-        TestRemovableLegacyWriterFeature,
-        TestRemovableReaderWriterFeature,
-        TestRemovableLegacyReaderWriterFeature,
         TestFeatureWithDependency,
         TestFeatureWithTransitiveDependency,
         TestWriterFeatureWithTransitiveDependency,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
index 3a56a961a88..2ab20c684cd 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
@@ -281,12 +281,26 @@ trait TableFeatureSupport { this: Protocol =>
     (this.readerAndWriterFeatureNames -- to.readerAndWriterFeatureNames).size == 1
   }
 
+  /**
+   * This is a special protocol version downgrade path that can be invoked by setting legacy
+   * protocol versions on table features protocol. This is the only downgrade path that may
+   * occur outside the DROP FEATURE command. The main requirement is the table features protocol
+   * only contains legacy features.
+   */
+  def canDowngradeToLegacy(to: Protocol): Boolean = {
+    if (!supportsWriterFeatures || to.supportsWriterFeatures) return false
+
+    readerAndWriterFeatures.forall(_.isLegacyFeature) &&
+      readerAndWriterFeatureNames.subsetOf(to.implicitlySupportedFeatures.map(_.name))
+  }
+
   /**
    * True if this protocol can be upgraded or downgraded to the 'to' protocol.
    */
   def canTransitionTo(to: Protocol, op: Operation): Boolean = {
     op match {
       case drop: DeltaOperations.DropTableFeature => canDowngradeTo(to, drop.featureName)
+      case _ if supportsWriterFeatures && !to.supportsWriterFeatures => canDowngradeToLegacy(to)
       case _ => canUpgradeTo(to)
     }
   }
@@ -314,15 +328,6 @@ trait TableFeatureSupport { this: Protocol =>
     }
   }
 
-  /**
-   * Adds to the protocol all implicit legacy features of the `legacy` protocol.
-   */
-  def addImplicitLegacyFeatures(legacy: Protocol): Protocol = {
-    if (!this.supportsWriterFeatures || legacy.supportsWriterFeatures) return this
-
-    this.withFeatures(legacy.implicitlySupportedFeatures)
-  }
-
   /**
    * Remove writer feature from protocol. To remove a writer feature we only need to
    * remove it from the writerFeatures set.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
index 0b822e792f0..5da16539a52 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
@@ -3423,10 +3423,12 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
   }
 
   for {
-    setLegacyVersions <- BOOLEAN_DOMAIN
     tableFeatureToAdd <- Seq(TestRemovableWriterFeature, TestRemovableReaderWriterFeature)
+    setLegacyVersions <- BOOLEAN_DOMAIN
+    downgradeAfterDrop <- if (setLegacyVersions) Seq(true, false) else Seq(false)
   } test("SOP for downgrading to legacy protocol versions for tables created with features. " +
-    s"setLegacyVersions: $setLegacyVersions, tableFeatureToAdd: ${tableFeatureToAdd.name}") {
+    s"tableFeatureToAdd: ${tableFeatureToAdd.name}, setLegacyVersions: $setLegacyVersions, " +
+    s"downgradeAfterDrop: ${downgradeAfterDrop}") {
     withTempDir { dir =>
       val deltaLog = DeltaLog.forTable(spark, dir)
 
@@ -3439,20 +3441,23 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
          |delta.feature.${ChangeDataFeedTableFeature.name} = 'supported'
          |)""".stripMargin)
 
-      if (setLegacyVersions) {
-        sql(
-          s"""
-             |ALTER TABLE delta.`${deltaLog.dataPath}` SET TBLPROPERTIES (
-             |  'delta.minReaderVersion' = ${ChangeDataFeedTableFeature.minReaderVersion},
-             |  'delta.minWriterVersion' = ${ChangeDataFeedTableFeature.minWriterVersion}
-             |)""".stripMargin)
-      }
+      val downgradeProtocolVersionsSQL =
+        s"""
+           |ALTER TABLE delta.`${deltaLog.dataPath}` SET TBLPROPERTIES (
+           |  'delta.minReaderVersion' = ${ChangeDataFeedTableFeature.minReaderVersion},
+           |  'delta.minWriterVersion' = ${ChangeDataFeedTableFeature.minWriterVersion}
+           |)""".stripMargin
+
+
+      if (setLegacyVersions && !downgradeAfterDrop) sql(downgradeProtocolVersionsSQL)
 
       AlterTableDropFeatureDeltaCommand(
         DeltaTableV2(spark, deltaLog.dataPath),
         tableFeatureToAdd.name,
         truncateHistory = tableFeatureToAdd.isReaderWriterFeature).run(spark)
 
+      if (setLegacyVersions && downgradeAfterDrop) sql(downgradeProtocolVersionsSQL)
+
       val expectedProtocol = if (setLegacyVersions) {
         Protocol(1, 4)
       } else {