From 1022eb1f5b7c4fd7fbadb454b4df5c8458dd287e Mon Sep 17 00:00:00 2001
From: Sabir
Date: Fri, 16 Feb 2024 11:26:48 +0100
Subject: [PATCH] .

---
 .../resources/error/delta-error-classes.json |  9 +++++++++
 .../spark/sql/delta/DeltaColumnMapping.scala | 14 ++++++++++----
 .../apache/spark/sql/delta/DeltaErrors.scala |  6 ++++++
 .../commands/alterDeltaTableCommands.scala   | 19 ++++++++++++++++++-
 .../spark/sql/delta/schema/SchemaUtils.scala |  6 +++++-
 .../sql/delta/sources/DeltaSQLConf.scala     | 10 ++++++++++
 6 files changed, 58 insertions(+), 6 deletions(-)

diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index db61c2a074f..2b6025062fa 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -1137,6 +1137,15 @@
     ],
     "sqlState" : "22KD1"
   },
+  "DELTA_INVALID_COLUMN_NAMES_WHEN_REMOVING_COLUMN_MAPPING" : {
+    "message" : [
+      "Found invalid character(s) among ' ,;{}()\\n\\t=' in the column names of your schema.",
+      "Invalid column names: <invalidColumnNames>.",
+      "Column mapping cannot be removed when there are invalid characters in the column names.",
+      "Please rename the columns to remove the invalid characters and execute this command again."
+    ],
+    "sqlState" : "42K05"
+  },
   "DELTA_INVALID_COMMITTED_VERSION" : {
     "message" : [
       "The committed version is <committedVersion> but the current version is <currentVersion>."
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
index 30850d9f94e..6155ebd2f94 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
@@ -85,14 +85,20 @@ trait DeltaColumnMappingBase extends DeltaLogging {
     protocol.isFeatureSupported(ColumnMappingTableFeature)
 
   /**
-   * The only allowed mode change is from NoMapping to NameMapping. Other changes
-   * would require re-writing Parquet files and are not supported right now.
+   * Allow NameMapping -> NoMapping transition behind a feature flag.
+   * Otherwise only NoMapping -> NameMapping is allowed.
    */
   private def allowMappingModeChange(
       oldMode: DeltaColumnMappingMode,
       newMode: DeltaColumnMappingMode): Boolean = {
-    if (oldMode == newMode) true
-    else oldMode == NoMapping && newMode == NameMapping
+    val removalAllowed = SparkSession.getActiveSession
+      .exists(_.conf.get(DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL))
+    // No change.
+    (oldMode == newMode) ||
+      // Downgrade allowed with a flag.
+      (removalAllowed && (oldMode == NameMapping && newMode == NoMapping)) ||
+      // Upgrade always allowed.
+      (oldMode == NoMapping && newMode == NameMapping)
   }
 
   def isColumnMappingUpgrade(
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 3042d4ee443..76e7398e69c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2023,6 +2023,12 @@ trait DeltaErrorsBase
       errorClass = "DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES",
       messageParameters = invalidColumnNames.toArray)
 
+  def foundInvalidColumnNamesWhenRemovingColumnMapping(columnNames: Seq[String])
+    : Throwable =
+    new DeltaAnalysisException(
+      errorClass = "DELTA_INVALID_COLUMN_NAMES_WHEN_REMOVING_COLUMN_MAPPING",
+      messageParameters = columnNames.toArray)
+
   def foundViolatingConstraintsForColumnChange(
       operation: String,
       columnName: String,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index 4b9221eb0cd..885f5362673 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.Protocol
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
 import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
 import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.schema.SchemaUtils.transformColumnsStructs
@@ -111,6 +112,14 @@ case class AlterTableSetPropertiesDeltaCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val deltaLog = table.deltaLog
+    val columnMappingPropertyKey = DeltaConfigs.COLUMN_MAPPING_MODE.key
+    val disableColumnMapping = configuration.get(columnMappingPropertyKey).contains("none")
+    val columnMappingRemovalAllowed = sparkSession.sessionState.conf.getConf(
+      DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL)
+    if (disableColumnMapping && columnMappingRemovalAllowed) {
+      new RemoveColumnMappingCommand(deltaLog, table.catalogTable)
+        .run(sparkSession, removeColumnMappingTableProperty = false)
+    }
     recordDeltaOperation(deltaLog, "delta.ddl.alter.setProperties") {
       val txn = startTransaction()
 
@@ -129,7 +138,7 @@
             ClusteringTableFeature.name)
         case _ =>
           true
-      }
+      }.toMap
 
       val newMetadata = metadata.copy(
         description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description),
@@ -162,6 +171,14 @@ case class AlterTableUnsetPropertiesDeltaCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val deltaLog = table.deltaLog
+    val columnMappingPropertyKey = DeltaConfigs.COLUMN_MAPPING_MODE.key
+    val disableColumnMapping = propKeys.contains(columnMappingPropertyKey)
+    val columnMappingRemovalAllowed = sparkSession.sessionState.conf.getConf(
+      DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL)
+    if (disableColumnMapping && columnMappingRemovalAllowed) {
+      new RemoveColumnMappingCommand(deltaLog, table.catalogTable)
+        .run(sparkSession, removeColumnMappingTableProperty = true)
+    }
     recordDeltaOperation(deltaLog, "delta.ddl.alter.unsetProperties") {
       val txn = startTransaction()
       val metadata = txn.metadata
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
index b028f97a977..597f5b33bd0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
@@ -1113,8 +1113,12 @@ def normalizeColumnNamesInDataType(
   }
 
   /**
-   * Finds columns with illegal names, i.e. names containing any of the ' ,;{}()\n\t=' characters.
+   * Finds columns with invalid names, i.e. names containing any of the ' ,;{}()\n\t=' characters.
    */
+  def findInvalidColumnNamesInSchema(schema: StructType): Seq[String] = {
+    findInvalidColumnNames(SchemaMergingUtils.explodeNestedFieldNames(schema))
+  }
+
   private def findInvalidColumnNames(columnNames: Seq[String]): Seq[String] = {
     val badChars = Seq(' ', ',', ';', '{', '}', '(', ')', '\n', '\t', '=')
     columnNames.filter(colName => badChars.map(_.toString).exists(colName.contains))
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index bba5463b3bd..1bc2c06b2e4 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1496,6 +1496,16 @@ trait DeltaSQLConfBase
       .booleanConf
       .createWithDefault(true)
 
+  val ALLOW_COLUMN_MAPPING_REMOVAL =
+    buildConf("columnMapping.allowRemoval")
+      .internal()
+      .doc(
+        """
+          |If enabled, allow the column mapping to be removed from a table.
+          |""".stripMargin)
+      .booleanConf
+      .createWithDefault(false)
+
   val DELTALOG_MINOR_COMPACTION_USE_FOR_READS =
     buildConf("deltaLog.minorCompaction.useForReads")
       .doc("If true, minor compacted delta log files will be used for creating Snapshots")
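
Not part of the patch: a brief usage sketch of how the new flag and the ALTER TABLE paths above are meant to interact. It assumes the "spark.databricks.delta." prefix that DeltaSQLConf normally prepends to conf keys, and a hypothetical Delta table named `tbl` that currently uses name column mapping; RemoveColumnMappingCommand itself lives in a separate file not included in this diff. Treat this as an illustration, not tested code.

    // Hedged sketch (assumed: conf key prefix "spark.databricks.delta.",
    // existing Delta table `tbl` with delta.columnMapping.mode = 'name').
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // Without this session flag, the NameMapping -> NoMapping change is still
    // rejected by allowMappingModeChange.
    spark.conf.set("spark.databricks.delta.columnMapping.allowRemoval", "true")

    // Setting the mapping mode to 'none' first rewrites the table through
    // RemoveColumnMappingCommand (removeColumnMappingTableProperty = false) ...
    spark.sql("ALTER TABLE tbl SET TBLPROPERTIES ('delta.columnMapping.mode' = 'none')")

    // ... while unsetting the property removes it from the table metadata
    // (removeColumnMappingTableProperty = true).
    spark.sql("ALTER TABLE tbl UNSET TBLPROPERTIES ('delta.columnMapping.mode')")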