From ac5051ffeaf6e3ebc895ea01fac793f940376cf0 Mon Sep 17 00:00:00 2001
From: Sabir
Date: Thu, 8 Feb 2024 16:30:51 +0100
Subject: [PATCH] .

---
 .../resources/error/delta-error-classes.json |  9 ++++
 .../spark/sql/delta/DeltaColumnMapping.scala | 14 ++++--
 .../apache/spark/sql/delta/DeltaErrors.scala |  6 +++
 .../commands/alterDeltaTableCommands.scala   |  8 ++++
 .../RemoveColumnMappingCommand.scala         | 47 +++++++++++++++++++
 .../sql/delta/sources/DeltaSQLConf.scala     | 10 ++++
 6 files changed, 90 insertions(+), 4 deletions(-)
 create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/commands/columnmapping/RemoveColumnMappingCommand.scala

diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index 854e4d5dc7c..76e0f465000 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -1130,6 +1130,15 @@
     ],
     "sqlState" : "42K05"
   },
+  "DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES_UPON_SETTING_COLUMN_MAPPING_TO_NONE" : {
+    "message" : [
+      "Found invalid character(s) among ' ,;{}()\\n\\t=' in the column names of your schema.",
+      "Invalid column names: <invalidColumnNames>.",
+      "Column mapping cannot be disabled when there are invalid characters in the column names.",
+      "Please rename the columns to remove the invalid characters and execute this command again."
+    ],
+    "sqlState" : "42K05"
+  },
   "DELTA_INVALID_CLONE_PATH" : {
     "message" : [
       "The target location for CLONE needs to be an absolute path or table name. Use an",
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
index 30850d9f94e..6155ebd2f94 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
@@ -85,14 +85,20 @@ trait DeltaColumnMappingBase extends DeltaLogging {
     protocol.isFeatureSupported(ColumnMappingTableFeature)
 
   /**
-   * The only allowed mode change is from NoMapping to NameMapping. Other changes
-   * would require re-writing Parquet files and are not supported right now.
+   * Allow NameMapping -> NoMapping transition behind a feature flag.
+   * Otherwise only NoMapping -> NameMapping is allowed.
    */
   private def allowMappingModeChange(
       oldMode: DeltaColumnMappingMode,
       newMode: DeltaColumnMappingMode): Boolean = {
-    if (oldMode == newMode) true
-    else oldMode == NoMapping && newMode == NameMapping
+    val removalAllowed = SparkSession.getActiveSession
+      .exists(_.conf.get(DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL))
+    // No change.
+    (oldMode == newMode) ||
+      // Downgrade allowed with a flag.
+      (removalAllowed && (oldMode == NameMapping && newMode == NoMapping)) ||
+      // Upgrade always allowed.
+      (oldMode == NoMapping && newMode == NameMapping)
   }
 
   def isColumnMappingUpgrade(
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 2549bea2981..d5341eed70a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2030,6 +2030,12 @@ trait DeltaErrorsBase
       errorClass = "DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES",
       messageParameters = invalidColumnNames.toArray)
 
+  def foundInvalidCharsInColumnNamesUponSettingColumnMappingToNone(columnNames: Seq[String])
+    : Throwable =
+    new DeltaAnalysisException(
+      errorClass = "DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES_UPON_SETTING_COLUMN_MAPPING_TO_NONE",
+      messageParameters = columnNames.toArray)
+
   def foundViolatingConstraintsForColumnChange(
       operation: String,
       columnName: String,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index 4b9221eb0cd..d8d8f2597c2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.Protocol
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
 import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
 import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.schema.SchemaUtils.transformColumnsStructs
@@ -111,6 +112,13 @@ case class AlterTableSetPropertiesDeltaCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val deltaLog = table.deltaLog
+    val columnMappingPropertyKey = DeltaConfigs.COLUMN_MAPPING_MODE.key
+    val disableColumnMapping = configuration.get(columnMappingPropertyKey).contains("none")
+    val columnMappingRemovalAllowed = sparkSession.sessionState.conf.getConf(
+      DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL)
+    if (disableColumnMapping && columnMappingRemovalAllowed) {
+      return new RemoveColumnMappingCommand(deltaLog, table.catalogTable).run(sparkSession)
+    }
     recordDeltaOperation(deltaLog, "delta.ddl.alter.setProperties") {
       val txn = startTransaction()
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/columnmapping/RemoveColumnMappingCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/columnmapping/RemoveColumnMappingCommand.scala
new file mode 100644
index 00000000000..496fdc67019
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/columnmapping/RemoveColumnMappingCommand.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.commands.columnmapping
+
+import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaErrors, DeltaLog, NoMapping}
+import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.types.StructType
+
+class RemoveColumnMappingCommand(deltaLog: DeltaLog, catalogOpt: Option[CatalogTable])
+  extends ImplicitMetadataOperation {
+  override protected val canMergeSchema: Boolean = false
+  override protected val canOverwriteSchema: Boolean = true
+
+  def run(spark: SparkSession): Seq[Row] = {
+    val schema = deltaLog.update().schema
+    checkSchemaFieldNames(schema)
+    Seq.empty
+  }
+
+  private def checkSchemaFieldNames(schema: StructType) = {
+    try {
+      SchemaUtils.checkSchemaFieldNames(schema, NoMapping)
+    } catch {
+      case e: DeltaAnalysisException
+        if e.errorClass.contains("DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES") =>
+        throw DeltaErrors
+          .foundInvalidCharsInColumnNamesUponSettingColumnMappingToNone(e.getMessageParametersArray)
+    }
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index bba5463b3bd..1bc2c06b2e4 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1496,6 +1496,16 @@ trait DeltaSQLConfBase
       .booleanConf
       .createWithDefault(true)
 
+  val ALLOW_COLUMN_MAPPING_REMOVAL =
+    buildConf("columnMapping.allowRemoval")
+      .internal()
+      .doc(
+        """
+          |If enabled, allow the column mapping to be removed from a table.
+          |""".stripMargin)
+      .booleanConf
+      .createWithDefault(false)
+
   val DELTALOG_MINOR_COMPACTION_USE_FOR_READS =
     buildConf("deltaLog.minorCompaction.useForReads")
       .doc("If true, minor compacted delta log files will be used for creating Snapshots")
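
Usage sketch: the Scala snippet below shows how the flag and the downgrade path introduced
by this patch could be exercised from a Spark session. It is illustrative only: the table
name `events` is hypothetical, and the full config key assumes the usual
"spark.databricks.delta." prefix that DeltaSQLConf.buildConf applies. Note that in this
patch RemoveColumnMappingCommand only validates the schema field names and returns an empty
result, so the snippet demonstrates the entry point rather than a complete removal.

// Illustrative only; `events` is a hypothetical Delta table that currently has
// 'delta.columnMapping.mode' = 'name'.
import org.apache.spark.sql.SparkSession

// A Spark session with the Delta extensions wired in.
val spark = SparkSession.builder()
  .appName("remove-column-mapping-sketch")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// Opt in to column mapping removal; ALLOW_COLUMN_MAPPING_REMOVAL defaults to false.
// The full key is assumed from DeltaSQLConf's conf-key prefix convention.
spark.conf.set("spark.databricks.delta.columnMapping.allowRemoval", "true")

// With the flag enabled, AlterTableSetPropertiesDeltaCommand routes this statement to
// RemoveColumnMappingCommand instead of the regular SET TBLPROPERTIES path. If any column
// name contains a character from ' ,;{}()\n\t=', the new
// DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES_UPON_SETTING_COLUMN_MAPPING_TO_NONE error is raised.
spark.sql("ALTER TABLE events SET TBLPROPERTIES ('delta.columnMapping.mode' = 'none')")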