[Spark][3.2] Fix dropping type widening feature with multipart identifiers (delta-io#3036)

## Description
Fix an issue found while testing the Delta 3.2 RC2: dropping the type widening table feature may fail while parsing the table identifier:

```
ALTER TABLE default.type_widening_int DROP FEATURE 'typeWidening-preview' TRUNCATE HISTORY;

[PARSE_SYNTAX_ERROR] Syntax error at or near '.'.(line 1, pos 21)

== SQL ==
spark_catalog.default.type_widening_int
```

Parsing the table identifier isn't needed, as it's not used by the REORG operation that rewrites files when dropping the feature. This change skips parsing the table identifier and passes the table name directly to the REORG command.
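
For context, `parseTableIdentifier` produces a `TableIdentifier`, which only holds a table name plus an optional database, so it rejects the fully qualified, catalog-prefixed name that `DeltaTableV2.name()` returns. A minimal sketch of the failure, assuming an active Spark session `spark`:

```scala
// Sketch only: reproduces the parse failure outside of Delta.
// DeltaTableV2.name() returns the fully qualified, catalog-prefixed name:
val name = "spark_catalog.default.type_widening_int"

// TableIdentifier has room for a table name and an optional database only,
// so the second '.' (pos 21 in the error above) triggers [PARSE_SYNTAX_ERROR]:
spark.sessionState.sqlParser.parseTableIdentifier(name)
```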

## How was this patch tested?
Added a test covering the issue.
johanl-db authored May 3, 2024
1 parent 9ac5907 commit d68c54e
Showing 2 changed files with 38 additions and 12 deletions.
```diff
@@ -230,22 +230,15 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
     val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot)
     if (numFilesToRewrite == 0L) return 0L
 
-    // Get the table Id and catalog from the delta table to build a ResolvedTable plan for the reorg
-    // command.
+    // Wrap `table` in a ResolvedTable that can be passed to DeltaReorgTableCommand. The catalog &
+    // table ID won't be used by DeltaReorgTableCommand.
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-    val tableId = table.spark
-      .sessionState
-      .sqlParser
-      .parseTableIdentifier(table.name).nameParts.asIdentifier
     val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog
+    val tableId = Seq(table.name()).asIdentifier
 
     val reorg = DeltaReorgTableCommand(
-      ResolvedTable.create(
-        catalog,
-        tableId,
-        table
-      ),
-      DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None)
+      target = ResolvedTable.create(catalog, tableId, table),
+      reorgTableSpec = DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None)
     )(Nil)
 
     reorg.run(table.spark)
```
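
The `asIdentifier` extension on `Seq[String]` comes from `CatalogV2Implicits`. A short sketch (not part of the patch, assuming Spark on the classpath) of what the new construction yields: the whole table name becomes a single-part connector `Identifier`, with no parsing involved.

```scala
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier

// asIdentifier treats the last element as the name and the preceding elements
// as the namespace; with a single element the namespace is empty and the name
// is the entire, unparsed string:
val id: Identifier = Seq("spark_catalog.default.type_widening_int").asIdentifier
assert(id.namespace().isEmpty)
assert(id.name() == "spark_catalog.default.type_widening_int")
```

That placeholder identifier is acceptable here because, as the updated comment states, `DeltaReorgTableCommand` never uses the catalog or table ID; the target table is carried by the `DeltaTableV2` instance itself.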
```diff
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Encoder, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.errors.QueryErrorsBase
@@ -1011,6 +1012,38 @@ trait DeltaTypeWideningTableFeatureTests {
     checkAnswer(readDeltaTable(tempPath), Seq.empty)
   }
 
+  test("drop feature using sql with multipart identifier") {
+    withTempDatabase { databaseName =>
+      val tableName = "test_table"
+      withTable(tableName) {
+        sql(s"CREATE TABLE $databaseName.$tableName (a byte) USING DELTA " +
+          s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true')")
+        sql(s"INSERT INTO $databaseName.$tableName VALUES (1)")
+        sql(s"ALTER TABLE $databaseName.$tableName CHANGE COLUMN a TYPE INT")
+        sql(s"INSERT INTO $databaseName.$tableName VALUES (2)")
+
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName, Some(databaseName)))
+
+        checkError(
+          exception = intercept[DeltaTableFeatureException] {
+            sql(s"ALTER TABLE $databaseName.$tableName " +
+              s"DROP FEATURE '${TypeWideningTableFeature.name}'"
+            ).collect()
+          },
+          errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD",
+          parameters = Map(
+            "feature" -> TypeWideningTableFeature.name,
+            "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key,
+            "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION
+              .fromMetaData(deltaLog.unsafeVolatileMetadata).toString,
+            "truncateHistoryLogRetentionPeriod" ->
+              DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION
+                .fromMetaData(deltaLog.unsafeVolatileMetadata).toString)
+        )
+      }
+    }
+  }
+
   // Rewriting the data when dropping the table feature relies on the default row commit version
   // being set even when row tracking isn't enabled.
   for(rowTrackingEnabled <- BOOLEAN_DOMAIN) {
```
