From 9576d0823b729348437833745b0d65451d95ff93 Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Fri, 29 Mar 2024 18:51:35 -0700 Subject: [PATCH] only overwrite schema and field id for create table Iceberg conversion txn (#2820) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description The existing Uniform convertion txn has a logic to launch a second txn to set schema, after the first CREATE TABLE or REPLACE TABLE txn, to set the correct field ids because Iceberg may reassign those. This behavior has following flaws: Firstly iceberg core does NOT reassign field id for REPLACE txn if the table already exists (in Uniform case it always does). So set schema for REPLACE TABLE is not necessary. Secondly, Uniform uses the replace transaction when number of snapshots to convert exceeds threshold. The replace txn will set last Delta converted version as -1, which can be confusing or erroneous. This PR fixes above flaws by NOT set schema for REPLACE txn. ## How was this patch tested? Manually tested. Unit test will come in separate PR. --- .../IcebergConversionTransaction.scala | 23 ++++++++++++------- .../icebergShaded/IcebergConverter.scala | 5 ++++ 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala index 29cbec8d7d3..7745f9e55f6 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala @@ -251,10 +251,18 @@ class IcebergConversionTransaction( throw new IllegalStateException("Delta does not support partition evolution") } - if (newMetadata.schema != prevMetadata.schema) { + + // As we do not have a second set schema txn for REPLACE_TABLE, we need to set + // the schema as part of this transaction + if (newMetadata.schema != prevMetadata.schema || tableOp == REPLACE_TABLE) { val differenceStr = SchemaUtils.reportDifferences(prevMetadata.schema, newMetadata.schema) - logInfo(s"Detected Delta schema update for table with name=${newMetadata.name}, " + - s"id=${newMetadata.id}:\n$differenceStr") + if (newMetadata.schema != prevMetadata.schema) { + logInfo(s"Detected Delta schema update for table with name=${newMetadata.name}, " + + s"id=${newMetadata.id}:\n$differenceStr ; Setting new Iceberg schema:\n $icebergSchema") + } else { + logInfo(s"Detected REPLACE_TABLE operation for table with name=${newMetadata.name}." + + s" Setting new Iceberg schema:\n $icebergSchema") + } txn.setSchema(icebergSchema).commit() @@ -301,11 +309,10 @@ class IcebergConversionTransaction( val nameMapping = NameMappingParser.toJson(MappingUtil.create(icebergSchema)) - // hard code dummy delta version as -1 for CREATE_TABLE and REPLACE_TABLE, which will be later + // hard code dummy delta version as -1 for CREATE_TABLE, which will be later // set to correct version in setSchemaTxn. -1 is chosen because it is less than the smallest // possible legitimate Delta version which is 0. - val deltaVersion = if (tableOp == CREATE_TABLE || tableOp == REPLACE_TABLE) -1 - else postCommitSnapshot.version + val deltaVersion = if (tableOp == CREATE_TABLE) -1 else postCommitSnapshot.version txn.updateProperties() .set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString) @@ -315,8 +322,8 @@ class IcebergConversionTransaction( try { txn.commitTransaction() - if (tableOp == CREATE_TABLE || tableOp == REPLACE_TABLE) { - // Iceberg CREATE_TABLE and REPLACE_TABLE reassigns the field id in schema, which + if (tableOp == CREATE_TABLE) { + // Iceberg CREATE_TABLE reassigns the field id in schema, which // is overwritten by setting Delta schema with Delta generated field id to ensure // consistency between field id in Iceberg schema after conversion and field id in // parquet files written by Delta. diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala index f5ab98a2f32..cadde5dd989 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala @@ -320,6 +320,11 @@ class IcebergConverter(spark: SparkSession) .foreach { actions => runIcebergConversionForActions(icebergTxn, actions, log.dataPath, None) } + + // Always attempt to update table metadata (schema/properties) for REPLACE_TABLE + if (tableOp == REPLACE_TABLE) { + icebergTxn.updateTableMetadata(snapshotToConvert.metadata, snapshotToConvert.metadata) + } } icebergTxn.commit() Some(snapshotToConvert.version, snapshotToConvert.timestamp)