diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 4f41612570e..63395b3e5e8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -547,7 +547,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite // 2 or 3 will add "invariants" and "appendOnly", filling in the gaps for writer // protocol version 3, and then we can downgrade to version 3. val proposedNewProtocol = protocolBeforeUpdate - .normalizeLegacyFeatures .merge(newProtocolForLatestMetadata) if (proposedNewProtocol != protocolBeforeUpdate) { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 6cd786e93c4..29558d04df5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -344,6 +344,7 @@ object TableFeature { RowTrackingFeature, InCommitTimestampTableFeature, VariantTypeTableFeature, + TestRemovableWriterFeature, CoordinatedCommitsTableFeature) if (DeltaUtils.isTesting) { features ++= Set( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala index 238aa8f577c..e8a2d5134f1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala @@ -359,19 +359,6 @@ trait TableFeatureSupport { this: Protocol => } } - /** - * For table feature protocols, add the missing legacy features to match exactly the - * implicit features of the minimum required legacy protocol versions. - */ - def normalizeLegacyFeatures: Protocol = { - if (!supportsWriterFeatures) return this - - // Prepare the protocol but filling the "gaps" with all missing legacy features. - val (minReaderVersion, minWriterVersion) = - TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures) - merge(Protocol(minReaderVersion, minWriterVersion)) - } - /** * Check if a `feature` is supported by this protocol. This means either (a) the protocol does * not support table features and implicitly supports the feature, or (b) the protocol supports diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala index c0be59fe904..e2df8acba76 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala @@ -3464,10 +3464,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest truncateHistory = truncateHistory) } - def validVersions = Seq((1, 1), (1, 2), (1, 3), (1, 4), (2, 5), (2, 6), (1, 7), (3, 7)) - def invalidVersions = Seq((2, 2), (2, 3)) + private def validVersions = Seq((1, 1), (1, 2), (1, 3), (1, 4), (2, 5), (1, 7), (3, 7)) + private def invalidVersions = Seq((2, 2), (2, 3)) for ((readerVersion, writerVersion) <- validVersions ++ invalidVersions) - test(s"Legacy features are added when setting legacy versions: " + + test("Legacy features are added when setting legacy versions: " + s"readerVersionToSet = $readerVersion, writerVersionToSet = $writerVersion") { withTempDir { dir => val deltaLog = DeltaLog.forTable(spark, dir) @@ -3491,7 +3491,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } } - test(s"Setting protocol versions in a legacy table should not add any features") { + test("Setting protocol versions in a legacy table should not add any features") { withTempDir { dir => val deltaLog = DeltaLog.forTable(spark, dir) @@ -3528,7 +3528,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest sql( s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta |TBLPROPERTIES ( - |delta.minReaderVersion = $TABLE_FEATURES_MIN_READER_VERSION, + |delta.minReaderVersion = ${Math.max(tableFeatureToAdd.minReaderVersion, 1)}, |delta.minWriterVersion = $TABLE_FEATURES_MIN_WRITER_VERSION, |delta.feature.${tableFeatureToAdd.name} = 'supported', |delta.feature.${ChangeDataFeedTableFeature.name} = 'supported' @@ -3550,23 +3550,13 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest if (!preemptiveVersionDowngrade) sql(downgradeProtocolVersionsSQL) - // - When setting the protocol versions on a protocol with legacy features only, we ignore - // the actual version set when it is lower than required from the already enabled legacy - // features. Instead, we always attempt to set the minimum required versions. - // - When setting the protocol versions on a protocol with table features we add the legacy - // features of the requested versions. - if (preemptiveVersionDowngrade) { - val expectedProtocol = if (downgradeVersionToSet < 4) { - Protocol(tableFeatureToAdd.minReaderVersion, 7).withFeature(ChangeDataFeedTableFeature) - .merge(Protocol(1, downgradeVersionToSet)) - } else { - Protocol(1, downgradeVersionToSet) - } - assert(deltaLog.update().protocol === expectedProtocol) + val expectedProtocol = if (downgradeVersionToSet < 4) { + Protocol(tableFeatureToAdd.minReaderVersion, 7).withFeature(ChangeDataFeedTableFeature) + .merge(Protocol(1, downgradeVersionToSet)) } else { - val expectedWriterVersion = if (downgradeVersionToSet > 4) downgradeVersionToSet else 4 - assert(deltaLog.update().protocol === Protocol(1, expectedWriterVersion)) + Protocol(1, downgradeVersionToSet) } + assert(deltaLog.update().protocol === expectedProtocol) } } @@ -3604,6 +3594,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest tableFeatureToAdd.name, truncateHistory = tableFeatureToAdd.isReaderWriterFeature).run(spark) + if (setLegacyVersions && downgradeAfterDrop) sql(downgradeProtocolVersionsSQL) + val expectedProtocol = if (setLegacyVersions) { Protocol(1, 4) } else { @@ -3636,7 +3628,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest assert(deltaLog.update().protocol === Protocol(1, 2)) } } - + private def dropV2CheckpointsTableFeature(spark: SparkSession, log: DeltaLog): Unit = { spark.sql(s"ALTER TABLE delta.`${log.dataPath}` DROP FEATURE " + s"`${V2CheckpointTableFeature.name}`")