From d68c54e2e02cd699fb52f6250c734cdd2227024c Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Fri, 3 May 2024 18:38:16 +0200 Subject: [PATCH] [Spark][3.2] Fix dropping type widening feature with multipart identifiers (#3036) ## Description Fix an issue found while testing the Delta 3.2 RC2: Dropping the type widening table feature may fail parsing the table identifier: ``` ALTER TABLE default.type_widening_int DROP FEATURE 'typeWidening-preview' TRUNCATE HISTORY; [PARSE_SYNTAX_ERROR] Syntax error at or near '.'.(line 1, pos 21) == SQL == spark_catalog.default.type_widening_int ``` Parsing the table identifier isn't needed as it's not used by the REORG operation that rewrites files when dropping the feature. This change skips parsing the table identifier and directly passes the table name to the REORG command. ## How was this patch tested? Added test covering the issue --- .../PreDowngradeTableFeatureCommand.scala | 17 +++------- .../sql/delta/DeltaTypeWideningSuite.scala | 33 +++++++++++++++++++ 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index 4e4774aef51..be468a60e53 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -230,22 +230,15 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot) if (numFilesToRewrite == 0L) return 0L - // Get the table Id and catalog from the delta table to build a ResolvedTable plan for the reorg - // command. + // Wrap `table` in a ResolvedTable that can be passed to DeltaReorgTableCommand. The catalog & + // table ID won't be used by DeltaReorgTableCommand. 
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - val tableId = table.spark - .sessionState - .sqlParser - .parseTableIdentifier(table.name).nameParts.asIdentifier val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog + val tableId = Seq(table.name()).asIdentifier val reorg = DeltaReorgTableCommand( - ResolvedTable.create( - catalog, - tableId, - table - ), - DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) + target = ResolvedTable.create(catalog, tableId, table), + reorgTableSpec = DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) )(Nil) reorg.run(table.spark) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index d9d9266f222..adebbd9b9db 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, Encoder, QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.errors.QueryErrorsBase @@ -1011,6 +1012,38 @@ trait DeltaTypeWideningTableFeatureTests { checkAnswer(readDeltaTable(tempPath), Seq.empty) } + test("drop feature using sql with multipart identifier") { + withTempDatabase { databaseName => + val tableName = "test_table" + withTable(tableName) { + sql(s"CREATE TABLE $databaseName.$tableName (a byte) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true')") + sql(s"INSERT INTO $databaseName.$tableName VALUES (1)") + sql(s"ALTER TABLE 
$databaseName.$tableName CHANGE COLUMN a TYPE INT") + sql(s"INSERT INTO $databaseName.$tableName VALUES (2)") + + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName, Some(databaseName))) + + checkError( + exception = intercept[DeltaTableFeatureException] { + sql(s"ALTER TABLE $databaseName.$tableName " + + s"DROP FEATURE '${TypeWideningTableFeature.name}'" + ).collect() + }, + errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD", + parameters = Map( + "feature" -> TypeWideningTableFeature.name, + "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key, + "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION + .fromMetaData(deltaLog.unsafeVolatileMetadata).toString, + "truncateHistoryLogRetentionPeriod" -> + DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION + .fromMetaData(deltaLog.unsafeVolatileMetadata).toString) + ) + } + } + } + // Rewriting the data when dropping the table feature relies on the default row commit version // being set even when row tracking isn't enabled. for(rowTrackingEnabled <- BOOLEAN_DOMAIN) {