diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
index 4e4774aef51..be468a60e53 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
@@ -230,22 +230,15 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
     val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot)
     if (numFilesToRewrite == 0L) return 0L
 
-    // Get the table Id and catalog from the delta table to build a ResolvedTable plan for the reorg
-    // command.
+    // Wrap `table` in a ResolvedTable that can be passed to DeltaReorgTableCommand. The catalog &
+    // table ID won't be used by DeltaReorgTableCommand.
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-    val tableId = table.spark
-      .sessionState
-      .sqlParser
-      .parseTableIdentifier(table.name).nameParts.asIdentifier
     val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog
+    val tableId = Seq(table.name()).asIdentifier
 
     val reorg = DeltaReorgTableCommand(
-      ResolvedTable.create(
-        catalog,
-        tableId,
-        table
-      ),
-      DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None)
+      target = ResolvedTable.create(catalog, tableId, table),
+      reorgTableSpec = DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None)
     )(Nil)
 
     reorg.run(table.spark)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala
index d9d9266f222..adebbd9b9db 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Encoder, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.errors.QueryErrorsBase
@@ -1011,6 +1012,38 @@ trait DeltaTypeWideningTableFeatureTests
     checkAnswer(readDeltaTable(tempPath), Seq.empty)
   }
 
+  test("drop feature using sql with multipart identifier") {
+    withTempDatabase { databaseName =>
+      val tableName = "test_table"
+      withTable(tableName) {
+        sql(s"CREATE TABLE $databaseName.$tableName (a byte) USING DELTA " +
+          s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true')")
+        sql(s"INSERT INTO $databaseName.$tableName VALUES (1)")
+        sql(s"ALTER TABLE $databaseName.$tableName CHANGE COLUMN a TYPE INT")
+        sql(s"INSERT INTO $databaseName.$tableName VALUES (2)")
+
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName, Some(databaseName)))
+
+        checkError(
+          exception = intercept[DeltaTableFeatureException] {
+            sql(s"ALTER TABLE $databaseName.$tableName " +
+              s"DROP FEATURE '${TypeWideningTableFeature.name}'"
+            ).collect()
+          },
+          errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD",
+          parameters = Map(
+            "feature" -> TypeWideningTableFeature.name,
+            "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key,
+            "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION
+              .fromMetaData(deltaLog.unsafeVolatileMetadata).toString,
+            "truncateHistoryLogRetentionPeriod" ->
+              DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION
+                .fromMetaData(deltaLog.unsafeVolatileMetadata).toString)
+        )
+      }
+    }
+  }
+
   // Rewriting the data when dropping the table feature relies on the default row commit version
   // being set even when row tracking isn't enabled.
   for(rowTrackingEnabled <- BOOLEAN_DOMAIN) {
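
Side note on the rewritten identifier construction: the sketch below is not part of the diff; it only assumes the public connector catalog API and an illustrative table name. It shows what `Seq(table.name()).asIdentifier` amounts to: the `asIdentifier` extension from CatalogV2Implicits keeps the whole name as a single name part instead of round-tripping it through the SQL parser, so a db-qualified name like the one exercised by the new test is not re-parsed on its way into DeltaReorgTableCommand.

// Hedged sketch only: Identifier.of is the public equivalent of Seq(name).asIdentifier
// from CatalogV2Implicits; "db.test_table" is an illustrative name, not taken from the diff.
import org.apache.spark.sql.connector.catalog.Identifier

object SinglePartIdentifierSketch extends App {
  val name = "db.test_table"                        // stand-in for whatever table.name() returns
  val id = Identifier.of(Array.empty[String], name) // no namespace parts
  println(id.namespace().length)                    // 0
  println(id.name())                                // db.test_table, kept as one part
}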