diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index 7d23ec134d5..cb4dfa3e1b8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -425,7 +425,7 @@ class DeltaLog private(
   def assertTableFeaturesMatchMetadata(
       targetProtocol: Protocol,
       targetMetadata: Metadata): Unit = {
-    if (!targetProtocol.supportsTableFeatures) return
+    if (!targetProtocol.supportsReaderFeatures && !targetProtocol.supportsWriterFeatures) return
 
     val protocolEnabledFeatures = targetProtocol.writerFeatureNames
       .flatMap(TableFeature.featureNameToFeature)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
index 7ed68404544..1591a99f831 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -542,7 +542,7 @@ class Snapshot(
     }
     base.put(Protocol.MIN_READER_VERSION_PROP, protocol.minReaderVersion.toString)
     base.put(Protocol.MIN_WRITER_VERSION_PROP, protocol.minWriterVersion.toString)
-    if (protocol.supportsTableFeatures) {
+    if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
       val features = protocol.readerAndWriterFeatureNames.map(name =>
         s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}$name" ->
           TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
index 2d41fae0f79..b9065499084 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
@@ -39,32 +39,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore
  */
 trait TableFeatureSupport { this: Protocol =>
 
-  /**
-   * Check if this protocol can support arbitrary reader features. If this returns false,
-   * then the table may still be able to support the "columnMapping" feature.
-   * See [[canSupportColumnMappingFeature]] below.
-   */
+  /** Check if this protocol is capable of adding features into its `readerFeatures` field. */
   def supportsReaderFeatures: Boolean =
     TableFeatureProtocolUtils.supportsReaderFeatures(minReaderVersion)
 
-  /**
-   * Check if this protocol is in table feature representation and can support column mapping.
-   * Column mapping is the only legacy reader feature and requires special handling in some
-   * cases.
-   */
-  def canSupportColumnMappingFeature: Boolean =
-    TableFeatureProtocolUtils.canSupportColumnMappingFeature(minReaderVersion, minWriterVersion)
-
   /** Check if this protocol is capable of adding features into its `writerFeatures` field. */
   def supportsWriterFeatures: Boolean =
     TableFeatureProtocolUtils.supportsWriterFeatures(minWriterVersion)
 
-  /**
-   * As soon as a protocol supports writer features it is considered a table features protocol.
-   * It is not possible to support reader features without supporting writer features.
-   */
-  def supportsTableFeatures: Boolean = supportsWriterFeatures
-
   /**
    * Get a new Protocol object that has `feature` supported. Writer-only features will be added to
    * `writerFeatures` field, and reader-writer features will be added to `readerFeatures` and
@@ -78,7 +60,7 @@ trait TableFeatureSupport { this: Protocol =>
    */
   def withFeature(feature: TableFeature): Protocol = {
     def shouldAddRead: Boolean = {
-      if (feature == ColumnMappingTableFeature && canSupportColumnMappingFeature) return true
+      if (supportsReaderFeatures) return true
       if (feature.minReaderVersion <= minReaderVersion) return false
 
       throw DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion(
@@ -129,13 +111,25 @@ trait TableFeatureSupport { this: Protocol =>
    * `writerFeatures` field.
    *
    * The method does not require the feature to be recognized by the client, therefore will not
-   * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
-   * Should never be used directly. Always use withFeature(feature: TableFeature): Protocol.
+   * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
    */
   private[actions] def withFeature(
       name: String,
       addToReaderFeatures: Boolean,
       addToWriterFeatures: Boolean): Protocol = {
+    if (addToReaderFeatures && !supportsReaderFeatures) {
+      throw DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion(
+        name,
+        currentVersion = minReaderVersion,
+        requiredVersion = TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION)
+    }
+    if (addToWriterFeatures && !supportsWriterFeatures) {
+      throw DeltaErrors.tableFeatureRequiresHigherWriterProtocolVersion(
+        name,
+        currentVersion = minWriterVersion,
+        requiredVersion = TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
+    }
+
     val addedReaderFeatureOpt = if (addToReaderFeatures) Some(name) else None
     val addedWriterFeatureOpt = if (addToWriterFeatures) Some(name) else None
@@ -149,11 +143,11 @@ trait TableFeatureSupport { this: Protocol =>
    * `readerFeatures` field.
    *
    * The method does not require the features to be recognized by the client, therefore will not
-   * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
-   * Intended only for testing. Use with caution.
+   * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
    */
   private[delta] def withReaderFeatures(names: Iterable[String]): Protocol = {
-    names.foldLeft(this)(_.withFeature(_, addToReaderFeatures = true, addToWriterFeatures = false))
+    names.foldLeft(this)(
+      _.withFeature(_, addToReaderFeatures = true, addToWriterFeatures = false))
   }
 
   /**
@@ -161,11 +155,11 @@ trait TableFeatureSupport { this: Protocol =>
    * `writerFeatures` field.
    *
    * The method does not require the features to be recognized by the client, therefore will not
-   * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync.
-   * Intended only for testing. Use with caution.
+   * try keeping the protocol's `readerFeatures` and `writerFeatures` in sync. Use with caution.
    */
   private[delta] def withWriterFeatures(names: Iterable[String]): Protocol = {
-    names.foldLeft(this)(_.withFeature(_, addToReaderFeatures = false, addToWriterFeatures = true))
+    names.foldLeft(this)(
+      _.withFeature(_, addToReaderFeatures = false, addToWriterFeatures = true))
   }
 
   /**
@@ -209,16 +203,14 @@ trait TableFeatureSupport { this: Protocol =>
    */
   @JsonIgnore
   lazy val implicitlySupportedFeatures: Set[TableFeature] = {
-    if (supportsTableFeatures) {
-      // As soon as a protocol supports writer features, all features need to be explicitly defined.
-      // This includes legacy reader features (the only one is Column Mapping), even if the
-      // reader protocol is legacy and explicitly supports Column Mapping.
+    if (supportsReaderFeatures && supportsWriterFeatures) {
+      // this protocol uses both reader and writer features, no feature can be implicitly supported
       Set()
     } else {
       TableFeature.allSupportedFeaturesMap.values
         .filter(_.isLegacyFeature)
-        .filter(_.minReaderVersion <= this.minReaderVersion)
-        .filter(_.minWriterVersion <= this.minWriterVersion)
+        .filterNot(supportsReaderFeatures || this.minReaderVersion < _.minReaderVersion)
+        .filterNot(supportsWriterFeatures || this.minWriterVersion < _.minWriterVersion)
         .toSet
     }
   }
@@ -279,11 +271,14 @@ trait TableFeatureSupport { this: Protocol =>
     val protocols = this +: others
     val mergedReaderVersion = protocols.map(_.minReaderVersion).max
     val mergedWriterVersion = protocols.map(_.minWriterVersion).max
-    val mergedFeatures = protocols.flatMap(_.readerAndWriterFeatures)
+    val mergedReaderFeatures = protocols.flatMap(_.readerFeatureNames)
+    val mergedWriterFeatures = protocols.flatMap(_.writerFeatureNames)
     val mergedImplicitFeatures = protocols.flatMap(_.implicitlySupportedFeatures)
 
     val mergedProtocol = Protocol(mergedReaderVersion, mergedWriterVersion)
-      .withFeatures(mergedFeatures ++ mergedImplicitFeatures)
+      .withReaderFeatures(mergedReaderFeatures)
+      .withWriterFeatures(mergedWriterFeatures)
+      .withFeatures(mergedImplicitFeatures)
 
     // The merged protocol is always normalized in order to represent the protocol
     // with the weakest possible form. This enables backward compatibility.
@@ -353,7 +348,7 @@ trait TableFeatureSupport { this: Protocol =>
    */
   def normalized: Protocol = {
     // Normalization can only be applied to table feature protocols.
-    if (!supportsTableFeatures) return this
+    if (!supportsWriterFeatures) return this
 
     val (minReaderVersion, minWriterVersion) =
       TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
@@ -376,7 +371,7 @@ trait TableFeatureSupport { this: Protocol =>
    */
   def denormalized: Protocol = {
     // Denormalization can only be applied to legacy protocols.
-    if (supportsTableFeatures) return this
+    if (supportsWriterFeatures) return this
 
     val (minReaderVersion, _) =
       TableFeatureProtocolUtils.minimumRequiredVersions(implicitlySupportedFeatures.toSeq)
@@ -424,7 +419,7 @@ object TableFeatureProtocolUtils {
   /** The string constant "supported" for uses in table properties. */
   val FEATURE_PROP_SUPPORTED = "supported"
 
-  /** Min reader version that supports native reader features. */
+  /** Min reader version that supports reader features. */
   val TABLE_FEATURES_MIN_READER_VERSION = 3
 
   /** Min reader version that supports writer features. */
@@ -445,20 +440,8 @@ object TableFeatureProtocolUtils {
     s"$DEFAULT_FEATURE_PROP_PREFIX$featureName"
 
   /**
-   * Determine whether a [[Protocol]] with the given reader protocol version can support column
-   * mapping. All table feature protocols that can support column mapping are capable of adding
-   * the feature to the `readerFeatures` field. This includes legacy reader protocol version
-   * (2, 7).
-   */
-  def canSupportColumnMappingFeature(readerVersion: Int, writerVersion: Int): Boolean = {
-    readerVersion >= ColumnMappingTableFeature.minReaderVersion &&
-      supportsWriterFeatures(writerVersion)
-  }
-
-  /**
-   * Determine whether a [[Protocol]] with the given reader protocol version supports
-   * native features. All protocols that can support native reader features are capable
-   * of adding the feature to the `readerFeatures` field.
+   * Determine whether a [[Protocol]] with the given reader protocol version is capable of adding
+   * features into its `readerFeatures` field.
    */
   def supportsReaderFeatures(readerVersion: Int): Boolean = {
     readerVersion >= TABLE_FEATURES_MIN_READER_VERSION
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index 390c8a021c2..1ffa32bd194 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -142,13 +142,13 @@ case class Protocol private (
     // Correctness check
     // Reader and writer versions must match the status of reader and writer features
     require(
-      (supportsReaderFeatures || canSupportColumnMappingFeature) == readerFeatures.isDefined,
+      supportsReaderFeatures == readerFeatures.isDefined,
       "Mismatched minReaderVersion and readerFeatures.")
     require(
       supportsWriterFeatures == writerFeatures.isDefined,
       "Mismatched minWriterVersion and writerFeatures.")
 
-    // When reader is on table features, writer must be on table features too.
+    // When reader is on table features, writer must be on table features too
     if (supportsReaderFeatures && !supportsWriterFeatures) {
       throw DeltaErrors.tableFeatureReadRequiresWriteException(
         TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
@@ -165,7 +165,7 @@ case class Protocol private (
    */
   @JsonIgnore
   lazy val simpleString: String = {
-    if (!supportsTableFeatures) {
+    if (!supportsReaderFeatures && !supportsWriterFeatures) {
       s"$minReaderVersion,$minWriterVersion"
     } else {
       val readerFeaturesStr = readerFeatures
@@ -202,12 +202,10 @@ object Protocol {
   def apply(
       minReaderVersion: Int = Action.readerVersion,
       minWriterVersion: Int = Action.writerVersion): Protocol = {
-    val shouldAddReaderFeatures = supportsReaderFeatures(minReaderVersion) ||
-      canSupportColumnMappingFeature(minReaderVersion, minWriterVersion)
     new Protocol(
       minReaderVersion = minReaderVersion,
       minWriterVersion = minWriterVersion,
-      readerFeatures = if (shouldAddReaderFeatures) Some(Set()) else None,
+      readerFeatures = if (supportsReaderFeatures(minReaderVersion)) Some(Set()) else None,
       writerFeatures = if (supportsWriterFeatures(minWriterVersion)) Some(Set()) else None)
   }
 
@@ -215,7 +213,7 @@ object Protocol {
   def forTableFeature(tf: TableFeature): Protocol = {
     // Every table feature is a writer feature.
     val writerFeatures = tf.requiredFeatures + tf
-    val readerFeatures = writerFeatures.filter(_.isReaderWriterFeature)
+    val readerFeatures = writerFeatures.filter(f => f.isReaderWriterFeature && !f.isLegacyFeature)
     val writerFeaturesNames = writerFeatures.map(_.name)
     val readerFeaturesNames = readerFeatures.map(_.name)
 
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala
index 48aa427a539..e9121cd5ba0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala
@@ -235,7 +235,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
     expectedJson =
       s"""{"protocol":{"minReaderVersion":$TABLE_FEATURES_MIN_READER_VERSION,""" +
         s""""minWriterVersion":$TABLE_FEATURES_MIN_WRITER_VERSION,""" +
-        """"readerFeatures":[],""" +
+        """"readerFeatures":["testLegacyReaderWriter"],""" +
         """"writerFeatures":["testLegacyReaderWriter"]}}""")
 
   testActionSerDe(
@@ -248,7 +248,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
     expectedJson =
       s"""{"protocol":{"minReaderVersion":$TABLE_FEATURES_MIN_READER_VERSION,""" +
         s""""minWriterVersion":$TABLE_FEATURES_MIN_WRITER_VERSION,""" +
-        """"readerFeatures":["testReaderWriter"],""" +
+        """"readerFeatures":["testLegacyReaderWriter","testReaderWriter"],""" +
        """"writerFeatures":["testLegacyReaderWriter","testReaderWriter"]}}""")
 
   testActionSerDe(
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala
index 465a748ba65..76eaba21217 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala
@@ -619,7 +619,7 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase
       }
       // upgrade to name mode
       val protocol = deltaLog.snapshot.protocol
-      val (r, w) = if (protocol.supportsTableFeatures) {
+      val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
         (TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION,
           TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
       } else {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
index aced5b0ae1f..d6ad49e2c03 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
@@ -264,7 +264,7 @@ trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession {
         Protocol.forNewTable(spark, Some(metadata)).minReaderVersion.toString),
       (Protocol.MIN_WRITER_VERSION_PROP,
         Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString))
-    if (snapshot.protocol.supportsTableFeatures) {
+    if (snapshot.protocol.supportsReaderFeatures || snapshot.protocol.supportsWriterFeatures) {
       props ++= Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures(
         spark, metadata, snapshot.protocol)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
index 84aebb36023..4d0dfbe33a7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
@@ -214,38 +214,39 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
   }
 
   test("upgrade to support table features - many features") {
-    withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) {
-      withTempDir { path =>
-        val log = createTableWithProtocol(Protocol(2, 5), path)
-        assert(log.update().protocol === Protocol(2, 5))
-        val table = io.delta.tables.DeltaTable.forPath(spark, path.getCanonicalPath)
-        table.upgradeTableProtocol(2, TABLE_FEATURES_MIN_WRITER_VERSION)
-        // Setting table feature versions to a protocol without table features is a noop.
-        assert(log.update().protocol === Protocol(2, 5))
-        spark.sql(
-          s"ALTER TABLE delta.`${path.getPath}` SET TBLPROPERTIES (" +
-            s" delta.feature.${RowTrackingFeature.name}='enabled'" +
-            s")")
-        table.upgradeTableProtocol(
-          TABLE_FEATURES_MIN_READER_VERSION,
-          TABLE_FEATURES_MIN_WRITER_VERSION)
-        assert(
-          log.snapshot.protocol === Protocol(
-            minReaderVersion = 2,
-            minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION,
-            readerFeatures = Some(Set(ColumnMappingTableFeature.name)),
-            writerFeatures = Some(
-              Set(
-                AppendOnlyTableFeature,
-                InvariantsTableFeature,
-                ChangeDataFeedTableFeature,
-                CheckConstraintsTableFeature,
-                ColumnMappingTableFeature,
-                GeneratedColumnsTableFeature,
-                DomainMetadataTableFeature,
-                RowTrackingFeature)
-                .map(_.name))))
-      }
+    withTempDir { path =>
+      val log = createTableWithProtocol(Protocol(2, 5), path)
+      assert(log.update().protocol === Protocol(2, 5))
+      val table = io.delta.tables.DeltaTable.forPath(spark, path.getCanonicalPath)
+      table.upgradeTableProtocol(2, TABLE_FEATURES_MIN_WRITER_VERSION)
+      // Setting table feature versions to a protocol without table features is a noop.
+      assert(log.update().protocol === Protocol(2, 5))
+      spark.sql(
+        s"ALTER TABLE delta.`${path.getPath}` SET TBLPROPERTIES (" +
+          s" delta.feature.${TestWriterFeature.name}='enabled'" +
+          s")")
+      table.upgradeTableProtocol(
+        TABLE_FEATURES_MIN_READER_VERSION,
+        TABLE_FEATURES_MIN_WRITER_VERSION)
+      assert(
+        log.snapshot.protocol === Protocol(
+          minReaderVersion = 2,
+          minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION,
+          readerFeatures = None,
+          writerFeatures = Some(
+            Set(
+              AppendOnlyTableFeature,
+              ChangeDataFeedTableFeature,
+              CheckConstraintsTableFeature,
+              ColumnMappingTableFeature,
+              GeneratedColumnsTableFeature,
+              InvariantsTableFeature,
+              TestLegacyWriterFeature,
+              TestRemovableLegacyWriterFeature,
+              TestLegacyReaderWriterFeature,
+              TestRemovableLegacyReaderWriterFeature,
+              TestWriterFeature)
+              .map(_.name))))
     }
   }
 
@@ -1312,7 +1313,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
         deltaLog.snapshot.protocol === Protocol(
           minReaderVersion = 2,
           minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION,
-          readerFeatures = Some(Set.empty),
+          readerFeatures = None,
          writerFeatures = Some(Set(TestLegacyReaderWriterFeature.name))))
       assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true)
     }
@@ -2135,7 +2136,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
       assert(log.snapshot.protocol === Protocol(
         2,
         TABLE_FEATURES_MIN_WRITER_VERSION,
-        readerFeatures = Some(Set.empty),
+        readerFeatures = None,
         writerFeatures = Some(Set(TestLegacyReaderWriterFeature.name))))
     }
   }
@@ -2376,27 +2377,6 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
     }
   }
 
-  test("Column mapping appears in reader features") {
-    withTempDir { dir =>
-      val deltaLog = DeltaLog.forTable(spark, dir)
-      sql(
-        s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta
-           |TBLPROPERTIES (
-           |delta.feature.${ColumnMappingTableFeature.name} = 'supported',
-           |delta.feature.${TestWriterFeature.name} = 'supported'
-           |)""".stripMargin)
-      assert(deltaLog.update().protocol === Protocol(
-        minReaderVersion = 2,
-        minWriterVersion = 7,
-        readerFeatures = Some(Set(ColumnMappingTableFeature.name)),
-        writerFeatures = Some(Set(
-          InvariantsTableFeature.name,
-          AppendOnlyTableFeature.name,
-          ColumnMappingTableFeature.name,
-          TestWriterFeature.name))))
-    }
-  }
-
   def protocolWithFeatures(
       readerFeatures: Seq[TableFeature] = Seq.empty,
       writerFeatures: Seq[TableFeature] = Seq.empty): Protocol = {
@@ -2454,17 +2434,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
       val readerVersion = Math.max(feature.minReaderVersion, 1)
       val expectedWriterFeatures =
         Some(Set(feature.name, InvariantsTableFeature.name, AppendOnlyTableFeature.name))
-      val supportsColumnMapping =
-        canSupportColumnMappingFeature(readerVersion, TABLE_FEATURES_MIN_WRITER_VERSION)
       val expectedReaderFeatures: Option[Set[String]] =
-        if ((feature == ColumnMappingTableFeature && supportsColumnMapping) ||
-            supportsReaderFeatures(readerVersion)) {
-          Some(Set(feature.name))
-        } else if (supportsColumnMapping) {
-          Some(Set.empty)
-        } else {
-          None
-        }
+        if (supportsReaderFeatures(readerVersion)) Some(Set(feature.name)) else None
 
       assert(
         deltaLog.update().protocol === Protocol(
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala
index 36c142ace29..cae6b29f4c2 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala
@@ -303,7 +303,7 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
 
         // upgrade to name mode
         val protocol = deltaLog.snapshot.protocol
-        val (r, w) = if (protocol.supportsTableFeatures) {
+        val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
           (TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION,
             TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
         } else {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
index d87b2eebd3c..2b4f84c6916 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -3000,7 +3000,7 @@ class DeltaNameColumnMappingSuite extends DeltaSuite
         .save(tempDir.getCanonicalPath)
 
       val protocol = DeltaLog.forTable(spark, tempDir).snapshot.protocol
-      val (r, w) = if (protocol.supportsTableFeatures) {
+      val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
         (TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION,
           TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
       } else {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
index 38f1e275ad3..5dcb76e0d17 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
@@ -197,8 +197,9 @@ class DeltaTableFeatureSuite
     val protocol =
       Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(TestLegacyReaderWriterFeature)
-    assert(protocol.readerFeatures.get === Set.empty)
-    assert(protocol.writerFeatures.get === Set(TestLegacyReaderWriterFeature.name))
+    assert(!protocol.readerFeatures.isDefined)
+    assert(
+      protocol.writerFeatures.get === Set(TestLegacyReaderWriterFeature.name))
   }
 
   test("merge protocols") {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
index 20fdd92d18a..917a04ef5f7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
@@ -424,7 +424,7 @@ trait DescribeDeltaHistorySuiteBase
       Seq("UPGRADE PROTOCOL",
         s"""{"minReaderVersion":$readerVersion,""" +
           s""""minWriterVersion":$writerVersion,""" +
-          s""""readerFeatures":[],""" +
+          s""""readerFeatures":["${TestLegacyReaderWriterFeature.name}"],""" +
           s""""writerFeatures":["${TestLegacyReaderWriterFeature.name}"]}"""),
       Seq($"operation", $"operationParameters.newProtocol"))
     // scalastyle:on line.size.limit
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingBackfillSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingBackfillSuite.scala
index 1a2a76e9cdb..8eaaeef98a5 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingBackfillSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingBackfillSuite.scala
@@ -460,8 +460,7 @@ class RowTrackingBackfillSuite
     assert(
       afterProtocol.minWriterVersion ===
         TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
-    assert(afterProtocol.readerFeatures === Some(Set(
-      ColumnMappingTableFeature.name)))
+    assert(afterProtocol.readerFeatures === None)
     assert(
       afterProtocol.writerFeatures === Some((
         prevProtocol.implicitlyAndExplicitlySupportedFeatures ++
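
Appended for context, not part of the patch: after this change a protocol carries a readerFeatures set only when minReaderVersion >= TABLE_FEATURES_MIN_READER_VERSION (3) and a writerFeatures set only when minWriterVersion >= TABLE_FEATURES_MIN_WRITER_VERSION (7); the supportsTableFeatures / canSupportColumnMappingFeature helpers and the (2, 7) column-mapping carve-out are removed. The standalone Scala sketch below mirrors those version checks for illustration only (ProtocolVersionSketch and its main method are hypothetical names, not code from this patch):

object ProtocolVersionSketch {
  // Thresholds as defined in TableFeatureProtocolUtils in this patch.
  val TABLE_FEATURES_MIN_READER_VERSION = 3
  val TABLE_FEATURES_MIN_WRITER_VERSION = 7

  // Mirrors TableFeatureProtocolUtils.supportsReaderFeatures after the patch:
  // reader features require reader version 3; there is no column-mapping special case.
  def supportsReaderFeatures(readerVersion: Int): Boolean =
    readerVersion >= TABLE_FEATURES_MIN_READER_VERSION

  // Mirrors TableFeatureProtocolUtils.supportsWriterFeatures.
  def supportsWriterFeatures(writerVersion: Int): Boolean =
    writerVersion >= TABLE_FEATURES_MIN_WRITER_VERSION

  def main(args: Array[String]): Unit = {
    // A legacy-reader protocol such as (2, 7) keeps readerFeatures = None,
    // matching the updated Protocol.apply and DeltaTableFeatureSuite expectations.
    assert(!supportsReaderFeatures(2) && supportsWriterFeatures(7))
    // A full table-features protocol (3, 7) carries both feature sets.
    assert(supportsReaderFeatures(3) && supportsWriterFeatures(7))
  }
}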