Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
sabir-akhadov committed Feb 16, 2024
1 parent 4ecfa45 commit 1022eb1
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 6 deletions.
9 changes: 9 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,15 @@
],
"sqlState" : "22KD1"
},
"DELTA_INVALID_COLUMN_NAMES_WHEN_REMOVING_COLUMN_MAPPING" : {
"message" : [
"Found invalid character(s) among ' ,;{}()\\n\\t=' in the column names of your schema.",
"Invalid column names: <invalidColumnNames>.",
"Column mapping cannot be removed when there are invalid characters in the column names.",
"Please rename the columns to remove the invalid characters and execute this command again."
],
"sqlState" : "42K05"
},
"DELTA_INVALID_COMMITTED_VERSION" : {
"message" : [
"The committed version is <committedVersion> but the current version is <currentVersion>."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,20 @@ trait DeltaColumnMappingBase extends DeltaLogging {
protocol.isFeatureSupported(ColumnMappingTableFeature)

/**
 * Decides whether a column mapping mode transition is permitted.
 *
 * Allow NameMapping -> NoMapping transition behind a feature flag.
 * Otherwise only NoMapping -> NameMapping is allowed.
 *
 * @param oldMode the table's current column mapping mode
 * @param newMode the requested column mapping mode
 * @return true iff the transition is a no-op, an always-allowed upgrade, or a
 *         flag-gated downgrade
 */
private def allowMappingModeChange(
    oldMode: DeltaColumnMappingMode,
    newMode: DeltaColumnMappingMode): Boolean = {
  // Downgrades are gated behind an internal SQL conf; read it from the active
  // session (absent session => removal not allowed).
  val removalAllowed = SparkSession.getActiveSession
    .exists(_.conf.get(DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL))
  // No change.
  (oldMode == newMode) ||
    // Downgrade allowed with a flag.
    (removalAllowed && (oldMode == NameMapping && newMode == NoMapping)) ||
    // Upgrade always allowed.
    (oldMode == NoMapping && newMode == NameMapping)
}

def isColumnMappingUpgrade(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,12 @@ trait DeltaErrorsBase
errorClass = "DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES",
messageParameters = invalidColumnNames.toArray)

/**
 * Error raised when column mapping removal is requested but the schema still
 * contains column names with characters that are invalid without mapping.
 *
 * @param columnNames the offending column names, reported in the error message
 */
def foundInvalidColumnNamesWhenRemovingColumnMapping(columnNames: Seq[String])
  : Throwable = {
  val invalidNames = columnNames.toArray
  new DeltaAnalysisException(
    errorClass = "DELTA_INVALID_COLUMN_NAMES_WHEN_REMOVING_COLUMN_MAPPING",
    messageParameters = invalidNames)
}

def foundViolatingConstraintsForColumnChange(
operation: String,
columnName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.schema.SchemaUtils.transformColumnsStructs
Expand Down Expand Up @@ -111,6 +112,14 @@ case class AlterTableSetPropertiesDeltaCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val deltaLog = table.deltaLog
val columnMappingPropertyKey = DeltaConfigs.COLUMN_MAPPING_MODE.key
val disableColumnMapping = configuration.get(columnMappingPropertyKey).contains("none")
val columnMappingRemovalAllowed = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL)
if (disableColumnMapping && columnMappingRemovalAllowed) {
new RemoveColumnMappingCommand(deltaLog, table.catalogTable)
.run(sparkSession, removeColumnMappingTableProperty = false)
}
recordDeltaOperation(deltaLog, "delta.ddl.alter.setProperties") {
val txn = startTransaction()

Expand All @@ -129,7 +138,7 @@ case class AlterTableSetPropertiesDeltaCommand(
ClusteringTableFeature.name)
case _ =>
true
}
}.toMap

val newMetadata = metadata.copy(
description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description),
Expand Down Expand Up @@ -162,6 +171,14 @@ case class AlterTableUnsetPropertiesDeltaCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val deltaLog = table.deltaLog
val columnMappingPropertyKey = DeltaConfigs.COLUMN_MAPPING_MODE.key
val disableColumnMapping = propKeys.contains(columnMappingPropertyKey)
val columnMappingRemovalAllowed = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL)
if (disableColumnMapping && columnMappingRemovalAllowed) {
new RemoveColumnMappingCommand(deltaLog, table.catalogTable)
.run(sparkSession, removeColumnMappingTableProperty = true)
}
recordDeltaOperation(deltaLog, "delta.ddl.alter.unsetProperties") {
val txn = startTransaction()
val metadata = txn.metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1113,8 +1113,12 @@ def normalizeColumnNamesInDataType(
}

/**
 * Finds columns with invalid names, i.e. names containing any of the ' ,;{}()\n\t=' characters.
 *
 * Nested fields are checked too: the schema is exploded into fully qualified
 * leaf-column names before filtering.
 *
 * @param schema the table schema to validate
 * @return the (possibly empty) list of invalid column names
 */
def findInvalidColumnNamesInSchema(schema: StructType): Seq[String] = {
  findInvalidColumnNames(SchemaMergingUtils.explodeNestedFieldNames(schema))
}

// Returns the subset of `columnNames` that contain at least one character
// from the disallowed set ' ,;{}()\n\t='.
private def findInvalidColumnNames(columnNames: Seq[String]): Seq[String] = {
  // Characters that make a column name invalid for these checks.
  val badChars = Seq(' ', ',', ';', '{', '}', '(', ')', '\n', '\t', '=')
  columnNames.filter(colName => badChars.map(_.toString).exists(colName.contains))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,16 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

// Internal escape hatch, disabled by default: when set to true it gates the
// NameMapping -> NoMapping downgrade path (read from the active session's conf
// by the column mapping mode-change check).
val ALLOW_COLUMN_MAPPING_REMOVAL =
  buildConf("columnMapping.allowRemoval")
    .internal()
    .doc(
      """
        |If enabled, allow the column mapping to be removed from a table.
        |""".stripMargin)
    .booleanConf
    .createWithDefault(false)

val DELTALOG_MINOR_COMPACTION_USE_FOR_READS =
buildConf("deltaLog.minorCompaction.useForReads")
.doc("If true, minor compacted delta log files will be used for creating Snapshots")
Expand Down

0 comments on commit 1022eb1

Please sign in to comment.