Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
sabir-akhadov committed Feb 8, 2024
1 parent 49f2625 commit 68c7458
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 4 deletions.
9 changes: 9 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,15 @@
],
"sqlState" : "42K05"
},
"DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES_UPON_SETTING_COLUMN_MAPPING_TO_NONE" : {
"message" : [
"Found invalid character(s) among ' ,;{}()\\n\\t=' in the column names of your schema.",
"Invalid column names: <invalidColumnNames>.",
"Column mapping cannot be disabled when there are invalid characters in the column names.",
"Please rename the columns to remove the invalid characters and execute this command again."
],
"sqlState" : "42K05"
},
"DELTA_INVALID_CLONE_PATH" : {
"message" : [
"The target location for CLONE needs to be an absolute path or table name. Use an",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,20 @@ trait DeltaColumnMappingBase extends DeltaLogging {
protocol.isFeatureSupported(ColumnMappingTableFeature)

/**
* The only allowed mode change is from NoMapping to NameMapping. Other changes
* would require re-writing Parquet files and are not supported right now.
* Allow NameMapping -> NoMapping transition behind a feature flag.
* Otherwise only NoMapping -> NameMapping is allowed.
*/
private def allowMappingModeChange(
oldMode: DeltaColumnMappingMode,
newMode: DeltaColumnMappingMode): Boolean = {
if (oldMode == newMode) true
else oldMode == NoMapping && newMode == NameMapping
val removalAllowed = SparkSession.getActiveSession
.exists(_.conf.get(DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL))
// No change.
(oldMode == newMode) ||
// Downgrade allowed with a flag.
(removalAllowed && (oldMode == NameMapping && newMode == NoMapping)) ||
// Upgrade always allowed.
(oldMode == NoMapping && newMode == NameMapping)
}

def isColumnMappingUpgrade(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2030,6 +2030,12 @@ trait DeltaErrorsBase
errorClass = "DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES",
messageParameters = invalidColumnNames.toArray)

def foundInvalidCharsInColumnNamesUponSettingColumnMappingToNone(columnNames: Seq[String])
: Throwable =
new DeltaAnalysisException(
errorClass = "DELTA_INVALID_CHARACTERS_IN_COLUMN_NAMES_UPON_SETTING_COLUMN_MAPPING_TO_NONE",
messageParameters = columnNames.toArray)

def foundViolatingConstraintsForColumnChange(
operation: String,
columnName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.schema.SchemaUtils.transformColumnsStructs
Expand Down Expand Up @@ -111,6 +112,13 @@ case class AlterTableSetPropertiesDeltaCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val deltaLog = table.deltaLog
val columnMappingPropertyKey = DeltaConfigs.COLUMN_MAPPING_MODE.key
val disableColumnMapping = configuration.get(columnMappingPropertyKey).contains("none")
val columnMappingRemovalAllowed = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL)
if (disableColumnMapping && columnMappingRemovalAllowed) {
return new RemoveColumnMappingCommand(deltaLog, table.catalogTable).run(sparkSession)
}
recordDeltaOperation(deltaLog, "delta.ddl.alter.setProperties") {
val txn = startTransaction()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,16 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val ALLOW_COLUMN_MAPPING_REMOVAL =
buildConf("columnMapping.allowRemoval")
.internal()
.doc(
"""
|If enabled, allow the column mapping to be removed from a table.
|""".stripMargin)
.booleanConf
.createWithDefault(false)

val DELTALOG_MINOR_COMPACTION_USE_FOR_READS =
buildConf("deltaLog.minorCompaction.useForReads")
.doc("If true, minor compacted delta log files will be used for creating Snapshots")
Expand Down

0 comments on commit 68c7458

Please sign in to comment.