Skip to content

Commit

Permalink
only overwrite schema and field id for create table Iceberg conversio…
Browse files Browse the repository at this point in the history
…n txn (delta-io#2820)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

The existing Uniform convertion txn has a logic to launch a second txn
to set schema, after the first CREATE TABLE or REPLACE TABLE txn, to set
the correct field ids because Iceberg may reassign those. This behavior
has following flaws:

Firstly iceberg core does NOT reassign field id for REPLACE txn if the
table already exists (in Uniform case it always does). So set schema for
REPLACE TABLE is not necessary.

Secondly, Uniform uses the replace transaction when number of snapshots
to convert exceeds threshold. The replace txn will set last Delta
converted version as -1, which can be confusing or erroneous.

This PR fixes above flaws by NOT set schema for REPLACE txn.

## How was this patch tested?

Manually tested. Unit test will come in separate PR.
  • Loading branch information
lzlfred authored Mar 30, 2024
1 parent 5ae57cc commit 9576d08
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,18 @@ class IcebergConversionTransaction(
throw new IllegalStateException("Delta does not support partition evolution")
}

if (newMetadata.schema != prevMetadata.schema) {

// As we do not have a second set schema txn for REPLACE_TABLE, we need to set
// the schema as part of this transaction
if (newMetadata.schema != prevMetadata.schema || tableOp == REPLACE_TABLE) {
val differenceStr = SchemaUtils.reportDifferences(prevMetadata.schema, newMetadata.schema)
logInfo(s"Detected Delta schema update for table with name=${newMetadata.name}, " +
s"id=${newMetadata.id}:\n$differenceStr")
if (newMetadata.schema != prevMetadata.schema) {
logInfo(s"Detected Delta schema update for table with name=${newMetadata.name}, " +
s"id=${newMetadata.id}:\n$differenceStr ; Setting new Iceberg schema:\n $icebergSchema")
} else {
logInfo(s"Detected REPLACE_TABLE operation for table with name=${newMetadata.name}." +
s" Setting new Iceberg schema:\n $icebergSchema")
}

txn.setSchema(icebergSchema).commit()

Expand Down Expand Up @@ -301,11 +309,10 @@ class IcebergConversionTransaction(

val nameMapping = NameMappingParser.toJson(MappingUtil.create(icebergSchema))

// hard code dummy delta version as -1 for CREATE_TABLE and REPLACE_TABLE, which will be later
// hard code dummy delta version as -1 for CREATE_TABLE, which will be later
// set to correct version in setSchemaTxn. -1 is chosen because it is less than the smallest
// possible legitimate Delta version which is 0.
val deltaVersion = if (tableOp == CREATE_TABLE || tableOp == REPLACE_TABLE) -1
else postCommitSnapshot.version
val deltaVersion = if (tableOp == CREATE_TABLE) -1 else postCommitSnapshot.version

txn.updateProperties()
.set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString)
Expand All @@ -315,8 +322,8 @@ class IcebergConversionTransaction(

try {
txn.commitTransaction()
if (tableOp == CREATE_TABLE || tableOp == REPLACE_TABLE) {
// Iceberg CREATE_TABLE and REPLACE_TABLE reassigns the field id in schema, which
if (tableOp == CREATE_TABLE) {
// Iceberg CREATE_TABLE reassigns the field id in schema, which
// is overwritten by setting Delta schema with Delta generated field id to ensure
// consistency between field id in Iceberg schema after conversion and field id in
// parquet files written by Delta.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ class IcebergConverter(spark: SparkSession)
.foreach { actions =>
runIcebergConversionForActions(icebergTxn, actions, log.dataPath, None)
}

// Always attempt to update table metadata (schema/properties) for REPLACE_TABLE
if (tableOp == REPLACE_TABLE) {
icebergTxn.updateTableMetadata(snapshotToConvert.metadata, snapshotToConvert.metadata)
}
}
icebergTxn.commit()
Some(snapshotToConvert.version, snapshotToConvert.timestamp)
Expand Down

0 comments on commit 9576d08

Please sign in to comment.