From c132361c790a01e9bd71cce9c6afc73fceacaa2a Mon Sep 17 00:00:00 2001
From: Andreas Chatzistergiou
Date: Wed, 10 Jul 2024 16:09:35 +0200
Subject: [PATCH] Address Carmen's comments

---
 .../spark/sql/delta/DeltaColumnMapping.scala  | 26 +++----------------
 .../spark/sql/delta/actions/actions.scala     |  6 +++--
 .../io/delta/tables/DeltaTableSuite.scala     | 16 +++++++-----
 .../sql/delta/DeltaColumnMappingSuite.scala   | 12 ++++++---
 4 files changed, 26 insertions(+), 34 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
index 782b11d795a..3729af5bf14 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
@@ -131,29 +131,9 @@ trait DeltaColumnMappingBase extends DeltaLogging {
     }

     val isChangingModeOnExistingTable = oldMappingMode != newMappingMode && !isCreatingNewTable
-    if (isChangingModeOnExistingTable) {
-      if (!allowMappingModeChange(oldMappingMode, newMappingMode)) {
-        throw DeltaErrors.changeColumnMappingModeNotSupported(
-          oldMappingMode.name, newMappingMode.name)
-      } else {
-        // legal mode change, now check if protocol is upgraded before or part of this txn
-        val caseInsensitiveMap = CaseInsensitiveMap(newMetadata.configuration)
-        val minReaderVersion = caseInsensitiveMap
-          .get(Protocol.MIN_READER_VERSION_PROP).map(_.toInt)
-          .getOrElse(oldProtocol.minReaderVersion)
-        val minWriterVersion = caseInsensitiveMap
-          .get(Protocol.MIN_WRITER_VERSION_PROP).map(_.toInt)
-          .getOrElse(oldProtocol.minWriterVersion)
-        var newProtocol = Protocol(minReaderVersion, minWriterVersion)
-        val satisfiesWriterVersion = minWriterVersion >= ColumnMappingTableFeature.minWriterVersion
-        val satisfiesReaderVersion = minReaderVersion >= ColumnMappingTableFeature.minReaderVersion
-        // This is an OR check because `readerFeatures` and `writerFeatures` can independently
-        // support table features.
-        if ((newProtocol.supportsReaderFeatures && satisfiesWriterVersion) ||
-            (newProtocol.supportsWriterFeatures && satisfiesReaderVersion)) {
-          newProtocol = newProtocol.withFeature(ColumnMappingTableFeature)
-        }
-      }
+    if (isChangingModeOnExistingTable && !allowMappingModeChange(oldMappingMode, newMappingMode)) {
+      throw DeltaErrors.changeColumnMappingModeNotSupported(
+        oldMappingMode.name, newMappingMode.name)
     }

     val updatedMetadata = updateColumnMappingMetadata(
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index e0d9d933008..9f9452d2d4b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -255,8 +255,10 @@ object Protocol {
     val (readerVersion, writerVersion, enabledFeatures) =
       minProtocolComponentsFromMetadata(spark, metadata)
     // New table protocols should always be denormalized and then normalized to convert the
-    // protocol to the weakest possible form. For example:
-    // 1) (3, 7, RowIDs) is normalized to (3, 7, RowIDs).
+    // protocol to the weakest possible form. This means either converting a table features
+    // protocol to a legacy protocol or reducing the versions of a table features protocol.
+    // For example:
+    // 1) (3, 7, RowTracking) is normalized to (1, 7, RowTracking).
     // 2) (3, 7, AppendOnly, Invariants) is normalized to (1, 2).
     // 3) (2, 3) is normalized to (1, 3).
     Protocol(readerVersion, writerVersion)
diff --git a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
index b5966771666..b3a14bc0579 100644
--- a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
+++ b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
@@ -22,7 +22,7 @@ import java.util.Locale
 import scala.language.postfixOps

 // scalastyle:off import.ordering.noEmptyLine
-import org.apache.spark.sql.delta.{DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, TestReaderWriterFeature, TestWriterFeature}
+import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{ Metadata, Protocol }
 import org.apache.spark.sql.delta.storage.LocalLogStore
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
@@ -574,13 +574,17 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
       // update the protocol to support a writer feature.
       val table = DeltaTable.forPath(spark, path, fsOptions)
       table.addFeatureSupport(TestWriterFeature.name)
-      assert(log.update().protocol === Protocol(1, 7)
-        .withFeature(TestWriterFeature).merge(Protocol(1, 2)))
+      assert(log.update().protocol === Protocol(1, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        TestWriterFeature)))
       table.addFeatureSupport(TestReaderWriterFeature.name)
       assert(
-        log.update().protocol === Protocol(3, 7)
-          .withFeatures(Seq(TestWriterFeature, TestReaderWriterFeature))
-          .merge(Protocol(1, 2)))
+        log.update().protocol === Protocol(3, 7).withFeatures(Seq(
+          AppendOnlyTableFeature,
+          InvariantsTableFeature,
+          TestWriterFeature,
+          TestReaderWriterFeature)))

       // update the protocol again with invalid feature name.
       assert(intercept[DeltaTableFeatureException] {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
index 6bacb4ec030..de02e1b66a5 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
@@ -1895,9 +1895,7 @@ class DeltaColumnMappingSuite extends QueryTest
         s"""CREATE TABLE $testTableName
            |USING DELTA
            |TBLPROPERTIES(
-           |'$minReaderKey' = '3',
-           |'$minWriterKey' = '7',
-           |'${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'true'
+           |'${DeltaConfigs.ROW_TRACKING_ENABLED.key}' = 'true'
            |)
            |AS SELECT * FROM RANGE(1)
            |""".stripMargin)
@@ -1909,6 +1907,14 @@ class DeltaColumnMappingSuite extends QueryTest
         s"""ALTER TABLE $testTableName SET TBLPROPERTIES(
            |'$columnMappingMode'='name'
            |)""".stripMargin)
+
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier(testTableName))
+      assert(deltaLog.update().protocol === Protocol(2, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        ColumnMappingTableFeature,
+        RowTrackingFeature
+      )))
     }
   }
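
Editor's sketch (not part of the patch): the revised comment in actions.scala says new table protocols are denormalized and then normalized into their weakest form, either collapsing a table features protocol to a legacy protocol or reducing its versions. The snippet below walks through the comment's three examples. It is a sketch under assumptions: it presumes the `denormalizedNormalized` helper from TableFeatureSupport.scala (denormalize, then normalize) and that `withFeature`/`withFeatures` add only the named features without pulling in dependent features; `ProtocolNormalizationSketch` is a made-up name, while the feature objects are the ones this patch already references.

import org.apache.spark.sql.delta.{AppendOnlyTableFeature, InvariantsTableFeature, RowTrackingFeature}
import org.apache.spark.sql.delta.actions.Protocol

// Hypothetical standalone check of the normalization examples; assumes the
// `denormalizedNormalized` helper behaves as described in TableFeatureSupport.scala.
object ProtocolNormalizationSketch extends App {
  // 1) RowTracking is a writer-only feature, so the reader version can drop:
  //    (3, 7, RowTracking) normalizes to (1, 7, RowTracking).
  assert(Protocol(3, 7).withFeature(RowTrackingFeature).denormalizedNormalized ==
    Protocol(1, 7).withFeature(RowTrackingFeature))

  // 2) AppendOnly and Invariants are exactly the features implied by the legacy
  //    (1, 2) protocol, so (3, 7, AppendOnly, Invariants) collapses to (1, 2).
  assert(Protocol(3, 7)
    .withFeatures(Seq(AppendOnlyTableFeature, InvariantsTableFeature))
    .denormalizedNormalized == Protocol(1, 2))

  // 3) Reader version 2 only adds column mapping, which also needs writer
  //    version 5; with writer version 3 it is not implied, so the legacy
  //    protocol (2, 3) weakens to (1, 3).
  assert(Protocol(2, 3).denormalizedNormalized == Protocol(1, 3))
}

Case 2 is also why the updated DeltaTableSuite assertions spell out AppendOnlyTableFeature and InvariantsTableFeature rather than calling .merge(Protocol(1, 2)): the merge produced the same feature set implicitly, and listing the features makes the expected protocol explicit.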