Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
sabir-akhadov committed May 21, 2024
1 parent 25a42df commit 166115e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
import org.apache.spark.sql.delta.managedcommit.ManagedCommitUtils
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.spark.sql.util.ScalaExtensions._

Expand Down Expand Up @@ -344,3 +346,33 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
true
}
}
case class ColumnMappingPreDowngradeCommand(table: DeltaTableV2)
  extends PreDowngradeTableFeatureCommand
  with DeltaLogging {

  /**
   * Strips column mapping traces from the table ahead of the protocol downgrade.
   *
   * Nothing is done when the table's initial snapshot already passes
   * [[ColumnMappingTableFeature.validateRemoval]]. Otherwise [[RemoveColumnMappingCommand]] is
   * run with `removeColumnMappingTableProperty = true`, so the table property is dropped as part
   * of the removal. The intent is that no transaction can keep writing data files with physical
   * column names and that concurrent transactions fail.
   *
   * Note that the protocol downgrade phase validates again whether all invariants hold, which
   * should detect a concurrent transaction that enabled the table property again.
   *
   * @return false when the snapshot was already clean and no action was taken, true when the
   *         removal command was executed.
   */
  override def removeFeatureTracesIfNeeded(): Boolean = {
    val session = table.spark
    val alreadyClean = ColumnMappingTableFeature.validateRemoval(table.initialSnapshot)

    if (alreadyClean) {
      // Latest snapshot looks clean, so we may proceed straight to the protocol downgrade phase.
      false
    } else {
      recordDeltaOperation(table.deltaLog, opType = "delta.columnMappingFeatureRemoval") {
        RemoveColumnMappingCommand(table.deltaLog, table.catalogTable)
          .run(session, removeColumnMappingTableProperty = true)
      }
      true
    }
  }
}
23 changes: 23 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.constraints.{Constraints, Invariants}
import org.apache.spark.sql.delta.managedcommit.ManagedCommitUtils
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
Expand Down Expand Up @@ -490,6 +491,7 @@ object ColumnMappingTableFeature
name = "columnMapping",
minReaderVersion = 2,
minWriterVersion = 5)
with RemovableFeature
with FeatureAutomaticallyEnabledByMetadata {
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
Expand All @@ -499,6 +501,27 @@ object ColumnMappingTableFeature
case _ => true
}
}

/**
 * Checks whether the given snapshot is free of column mapping traces.
 *
 * The snapshot qualifies when no field returned by `SchemaMergingUtils.explode` on its schema
 * has a physical name or a column id, and its metadata reports the `NoMapping` mode.
 *
 * @param snapshot the snapshot to inspect.
 * @return true when both conditions hold, false otherwise.
 */
override def validateRemoval(snapshot: Snapshot): Boolean = {
  val anyFieldCarriesMappingMetadata =
    SchemaMergingUtils.explode(snapshot.schema).exists { case (_, field) =>
      DeltaColumnMapping.hasPhysicalName(field) || DeltaColumnMapping.hasColumnId(field)
    }
  val mappingModeIsNone = snapshot.metadata.columnMappingMode == NoMapping

  !anyFieldCarriesMappingMetadata && mappingModeIsNone
}

/**
 * Tells whether an action relies on column mapping.
 *
 * Only [[Metadata]] actions can qualify: one does when its column mapping mode configuration
 * resolves to anything other than `NoMapping`. Every other action yields false.
 *
 * @param action the action to inspect.
 * @return true for a metadata action with a mapping mode other than `NoMapping`.
 */
override def actionUsesFeature(action: Action): Boolean = action match {
  case metadata: Metadata =>
    val configuredMode = DeltaConfigs.COLUMN_MAPPING_MODE.fromMetaData(metadata)
    configuredMode != NoMapping
  case _ =>
    false
}

/**
 * Supplies the pre-downgrade command for this feature.
 *
 * @param table the table handed to the command.
 * @return a [[ColumnMappingPreDowngradeCommand]] built for `table`.
 */
override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand = {
  ColumnMappingPreDowngradeCommand(table)
}
}

object IdentityColumnsTableFeature
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
val originalSnapshot = deltaLog.update()

assert(originalSnapshot.schema.head.getComment().get == comment,
"Renamed column should preserved comment.")
"Renamed column should preserve comment.")
val originalFiles = getFiles(originalSnapshot)
val startingVersion = deltaLog.update().version

Expand All @@ -96,22 +96,26 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
deltaLog: DeltaLog,
originalFiles: Array[AddFile],
startingVersion: Long,
originalData: Array[Row]): Unit = {
originalData: Array[Row],
droppedFeature: Boolean = false): Unit = {
checkAnswer(
spark.table(tableName = testTableName).select(logicalColumnName),
originalData)

val newSnapshot = deltaLog.update()
assert(newSnapshot.version - startingVersion == 1, "Should rewrite the table in one commit.")
// Drop feature adds 2 empty commits.
val versionsAddedByRewrite = if (droppedFeature) 3 else 1
assert(newSnapshot.version - startingVersion == versionsAddedByRewrite,
s"Should rewrite the table in $versionsAddedByRewrite commits.")

val history = deltaLog.history.getHistory(deltaLog.update().version)
val rewriteVersion = deltaLog.update().version - versionsAddedByRewrite + 1
val history = deltaLog.history.getHistory(rewriteVersion, Some(rewriteVersion))
verifyColumnMappingOperationIsRecordedInHistory(history)

assert(newSnapshot.schema.head.name == logicalColumnName, "Should rename the first column.")

verifyColumnMappingSchemaMetadataIsRemoved(newSnapshot)

verifyColumnMappingTablePropertiesAbsent(newSnapshot, unsetTableProperty)
verifyColumnMappingTablePropertiesAbsent(newSnapshot, unsetTableProperty || droppedFeature)
assert(originalFiles.map(_.numLogicalRecords.get).sum ==
newSnapshot.allFiles.map(_.numLogicalRecords.get).collect().sum,
"Should have the same number of records.")
Expand Down

0 comments on commit 166115e

Please sign in to comment.