Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
sabir-akhadov committed Feb 28, 2024
1 parent 5d25578 commit 0e8c931
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ case class AlterTableSetPropertiesDeltaCommand(
val columnMappingRemovalAllowed = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL)
if (disableColumnMapping && columnMappingRemovalAllowed) {
new RemoveColumnMappingCommand(deltaLog, table.catalogTable)
RemoveColumnMappingCommand(deltaLog, table.catalogTable)
.run(sparkSession, removeColumnMappingTableProperty = false)
}
recordDeltaOperation(deltaLog, "delta.ddl.alter.setProperties") {
Expand Down Expand Up @@ -176,7 +176,7 @@ case class AlterTableUnsetPropertiesDeltaCommand(
val columnMappingRemovalAllowed = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL)
if (disableColumnMapping && columnMappingRemovalAllowed) {
new RemoveColumnMappingCommand(deltaLog, table.catalogTable)
RemoveColumnMappingCommand(deltaLog, table.catalogTable)
.run(sparkSession, removeColumnMappingTableProperty = true)
}
recordDeltaOperation(deltaLog, "delta.ddl.alter.unsetProperties") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ package org.apache.spark.sql.delta.commands.columnmapping
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog}
import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.types.StructType

class RemoveColumnMappingCommand(deltaLog: DeltaLog, catalogOpt: Option[CatalogTable])
/**
* A command to remove the column mapping from a table.
*/
class RemoveColumnMappingCommand(
val deltaLog: DeltaLog,
val catalogOpt: Option[CatalogTable])
extends ImplicitMetadataOperation {
override protected val canMergeSchema: Boolean = false
override protected val canOverwriteSchema: Boolean = true
Expand All @@ -41,7 +46,7 @@ class RemoveColumnMappingCommand(deltaLog: DeltaLog, catalogOpt: Option[CatalogT
/**
* Verify none of the schema fields contain invalid column names.
*/
private def verifySchemaFieldNames(schema: StructType) = {
protected def verifySchemaFieldNames(schema: StructType) = {
val invalidColumnNames =
SchemaUtils.findInvalidColumnNamesInSchema(schema)
if (invalidColumnNames.nonEmpty) {
Expand All @@ -50,3 +55,11 @@ class RemoveColumnMappingCommand(deltaLog: DeltaLog, catalogOpt: Option[CatalogT
}
}
}

object RemoveColumnMappingCommand {
def apply(
deltaLog: DeltaLog,
catalogOpt: Option[CatalogTable]): RemoveColumnMappingCommand = {
new RemoveColumnMappingCommand(deltaLog, catalogOpt)
}
}

0 comments on commit 0e8c931

Please sign in to comment.