diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala index 29cbec8d7d3..7745f9e55f6 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala @@ -251,10 +251,18 @@ class IcebergConversionTransaction( throw new IllegalStateException("Delta does not support partition evolution") } - if (newMetadata.schema != prevMetadata.schema) { + + // As we do not have a second set schema txn for REPLACE_TABLE, we need to set + // the schema as part of this transaction + if (newMetadata.schema != prevMetadata.schema || tableOp == REPLACE_TABLE) { val differenceStr = SchemaUtils.reportDifferences(prevMetadata.schema, newMetadata.schema) - logInfo(s"Detected Delta schema update for table with name=${newMetadata.name}, " + - s"id=${newMetadata.id}:\n$differenceStr") + if (newMetadata.schema != prevMetadata.schema) { + logInfo(s"Detected Delta schema update for table with name=${newMetadata.name}, " + + s"id=${newMetadata.id}:\n$differenceStr ; Setting new Iceberg schema:\n $icebergSchema") + } else { + logInfo(s"Detected REPLACE_TABLE operation for table with name=${newMetadata.name}." + + s" Setting new Iceberg schema:\n $icebergSchema") + } txn.setSchema(icebergSchema).commit() @@ -301,11 +309,10 @@ class IcebergConversionTransaction( val nameMapping = NameMappingParser.toJson(MappingUtil.create(icebergSchema)) - // hard code dummy delta version as -1 for CREATE_TABLE and REPLACE_TABLE, which will be later + // hard code dummy delta version as -1 for CREATE_TABLE, which will be later // set to correct version in setSchemaTxn. -1 is chosen because it is less than the smallest // possible legitimate Delta version which is 0. - val deltaVersion = if (tableOp == CREATE_TABLE || tableOp == REPLACE_TABLE) -1 - else postCommitSnapshot.version + val deltaVersion = if (tableOp == CREATE_TABLE) -1 else postCommitSnapshot.version txn.updateProperties() .set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString) @@ -315,8 +322,8 @@ class IcebergConversionTransaction( try { txn.commitTransaction() - if (tableOp == CREATE_TABLE || tableOp == REPLACE_TABLE) { - // Iceberg CREATE_TABLE and REPLACE_TABLE reassigns the field id in schema, which + if (tableOp == CREATE_TABLE) { + // Iceberg CREATE_TABLE reassigns the field id in schema, which // is overwritten by setting Delta schema with Delta generated field id to ensure // consistency between field id in Iceberg schema after conversion and field id in // parquet files written by Delta. diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala index f5ab98a2f32..cadde5dd989 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala @@ -320,6 +320,11 @@ class IcebergConverter(spark: SparkSession) .foreach { actions => runIcebergConversionForActions(icebergTxn, actions, log.dataPath, None) } + + // Always attempt to update table metadata (schema/properties) for REPLACE_TABLE + if (tableOp == REPLACE_TABLE) { + icebergTxn.updateTableMetadata(snapshotToConvert.metadata, snapshotToConvert.metadata) + } } icebergTxn.commit() Some(snapshotToConvert.version, snapshotToConvert.timestamp)