Merge branch 'master' into master
makuche authored Jul 3, 2024
2 parents 3f44083 + 146d497 commit 4fb4b9e
Showing 153 changed files with 4,796 additions and 894 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/spark_test.yaml
@@ -54,6 +54,12 @@ jobs:
pyenv install 3.8.18
pyenv global system 3.8.18
pipenv --python 3.8 install
# Update the pip version to 24.0. By default `pyenv.run` installs the latest pip version
# available. Starting with version 24.1, `pip` no longer allows installing Python packages
# whose version string contains `-`. In the Delta-Spark case, the generated PyPI package has
# `-SNAPSHOT` in its version (e.g. `3.3.0-SNAPSHOT`), since the version is picked up from
# the `version.sbt` file.
pipenv run pip install pip==24.0 setuptools==69.5.1 wheel==0.43.0
pipenv run pip install pyspark==3.5.0
pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
pipenv run pip install black==23.9.1
6 changes: 4 additions & 2 deletions PROTOCOL.md
@@ -477,7 +477,9 @@ The following is an example `remove` action.
```

### Add CDC File
The `cdc` action is used to add a [file](#change-data-files) containing only the data that was changed as part of the transaction. When change data readers encounter a `cdc` action in a particular Delta table version, they must read the changes made in that version exclusively using the `cdc` files. If a version has no `cdc` action, then the data in `add` and `remove` actions are read as inserted and deleted rows, respectively.
The `cdc` action is used to add a [file](#change-data-files) containing only the data that was changed as part of the transaction. The `cdc` action is allowed to add a [Data File](#data-files) that is also added by an `add` action, when it does not contain any copied rows and the `_change_type` column is filled for all rows.

When change data readers encounter a `cdc` action in a particular Delta table version, they must read the changes made in that version exclusively using the `cdc` files. If a version has no `cdc` action, then the data in `add` and `remove` actions are read as inserted and deleted rows, respectively.
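
For illustration, a `cdc` action might look like the following sketch (the file path and size are made up for this example; the authoritative field set is given by the schema below):

```
{
  "cdc": {
    "path": "_change_data/cdc-00001-c3f8c3a2-1e2b-4f1d-9a7e-0c1d2e3f4a5b.c000.snappy.parquet",
    "partitionValues": {},
    "size": 1213,
    "dataChange": false
  }
}
```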

The schema of the `cdc` action is as follows:

@@ -523,7 +525,7 @@ Specifically, to read the row-level changes made in a version, the following str

##### Note for non-change data readers

In a table with Change Data Feed enabled, the data Parquet files referenced by `add` and `remove` actions are allowed to contain an extra column `_change_type`. This column is not present in the table's schema and will consistently have a `null` value. When accessing these files, readers should disregard this column and only process columns defined within the table's schema.
In a table with Change Data Feed enabled, the data Parquet files referenced by `add` and `remove` actions are allowed to contain an extra column `_change_type`. This column is not present in the table's schema. When accessing these files, readers should disregard this column and only process columns defined within the table's schema.

### Transaction Identifiers
Incremental processing systems (e.g., streaming systems) that track progress using their own application-specific versions need to record what progress has been made, in order to avoid duplicating data in the face of failures and retries during a write.
4 changes: 2 additions & 2 deletions build.sbt
@@ -429,7 +429,7 @@ lazy val sharing = (project in file("sharing"))
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "provided",

"io.delta" %% "delta-sharing-client" % "1.0.5",
"io.delta" %% "delta-sharing-client" % "1.1.0",

// Test deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
@@ -722,7 +722,7 @@ lazy val hudi = (project in file("hudi"))
scalaStyleSettings,
releaseSettings,
libraryDependencies ++= Seq(
"org.apache.hudi" % "hudi-java-client" % "0.14.0" % "compile" excludeAll(
"org.apache.hudi" % "hudi-java-client" % "0.15.0" % "compile" excludeAll(
ExclusionRule(organization = "org.apache.hadoop"),
ExclusionRule(organization = "org.apache.zookeeper"),
),
114 changes: 114 additions & 0 deletions docs/source/delta-row-tracking.md
@@ -0,0 +1,114 @@
---
description: Learn how <Delta> row tracking allows tracking how rows change across table versions.
orphan: 1
---

# Use row tracking for Delta tables

Row tracking allows <Delta> to track row-level lineage in a <Delta> table. When enabled on a <Delta> table, row tracking adds two new metadata fields to the table:

- **Row IDs** provide rows with an identifier that is unique within the table. A row keeps the same ID whenever it is modified using a `MERGE` or `UPDATE` statement.

- **Row commit versions** record the last version of the table in which the row was modified. A row is assigned a new version whenever it is modified using a `MERGE` or `UPDATE` statement.

.. note:: This feature is available in <Delta> 3.2.0 and above. This feature is in experimental support mode with [_](#limitations).
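
As a minimal illustration (a sketch, assuming a table named `events` with row tracking already enabled and using the metadata fields described in the sections below), an update keeps a row's id but advances its commit version:

```sql
-- Modify a single row; its _metadata.row_id stays the same, while
-- _metadata.row_commit_version advances to the version created by this UPDATE.
UPDATE events SET status = 'closed' WHERE id = 42;

SELECT _metadata.row_id, _metadata.row_commit_version, * FROM events WHERE id = 42;
```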

## Enable row tracking

.. warning:: Tables created with row tracking enabled have the row tracking <Delta> table feature enabled at creation and use <Delta> writer version 7. Table protocol versions cannot be downgraded, and tables with row tracking enabled are not writeable by <Delta> clients that do not support all enabled <Delta> writer protocol table features. See [_](/versioning.md).
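
To see which protocol versions and table features a table uses, you can inspect its properties and detail output (a sketch, assuming a table named `table_name`; the exact columns returned can vary by release):

```sql
-- Table properties include 'delta.enableRowTracking' once the feature is enabled
SHOW TBLPROPERTIES table_name;

-- The detail output reports the table's reader/writer protocol versions and enabled table features
DESCRIBE DETAIL table_name;
```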

You must explicitly enable row tracking using one of the following methods:

- **New table**: Set the table property `delta.enableRowTracking = true` in the `CREATE TABLE` command.

```sql
-- Create an empty table
CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES ('delta.enableRowTracking' = 'true');

-- Using a CTAS statement
CREATE TABLE course_new
TBLPROPERTIES ('delta.enableRowTracking' = 'true')
AS SELECT * FROM course_old;

-- Using a LIKE statement to copy configuration
CREATE TABLE graduate LIKE student;

-- Using a CLONE statement to copy configuration
CREATE TABLE graduate CLONE student;
```

- **Existing table**: Set the table property `'delta.enableRowTracking' = 'true'` in the `ALTER TABLE` command.

```sql
ALTER TABLE grade SET TBLPROPERTIES ('delta.enableRowTracking' = 'true');
```

- **All new tables**: Set the configuration `spark.databricks.delta.properties.defaults.enableRowTracking = true` for the current session in the `SET` command.

.. code-language-tabs::
```sql
SET spark.databricks.delta.properties.defaults.enableRowTracking = true;
```

```python
spark.conf.set("spark.databricks.delta.properties.defaults.enableRowTracking", True)
```

```scala
spark.conf.set("spark.databricks.delta.properties.defaults.enableRowTracking", true)
```

.. important:: Because cloning a <Delta> table creates a separate history, the row ids and row commit versions on cloned tables do not match those of the original table.

.. important:: Enabling row tracking on an existing table automatically assigns row ids and row commit versions to all existing rows in the table. This process may create multiple new versions of the table and may take a long time.

### Row tracking storage

Enabling row tracking may increase the size of the table. <Delta> stores the row tracking metadata fields in hidden metadata columns in the data files. Some operations, such as insert-only operations, do not use these hidden columns and instead track the row ids and row commit versions using metadata in the <Delta> log. Data reorganization operations such as `OPTIMIZE` and `REORG` cause the row ids and row commit versions to be tracked using the hidden metadata columns, even when they were previously tracked using log metadata.
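
For example (an illustrative sketch), rewriting files with a data reorganization command moves the tracking information into the hidden metadata columns of the rewritten files:

```sql
-- Compacting the table rewrites its data files; the rewritten files carry the
-- row ids and row commit versions in their hidden metadata columns.
OPTIMIZE table_name;
```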

## Read row tracking metadata fields

Row tracking adds the following metadata fields that can be accessed when reading a table:

| Column name | Type | Values |
|--------------------------------|------|------------------------------------------------------------------|
| `_metadata.row_id` | Long | The unique identifier of the row. |
| `_metadata.row_commit_version` | Long | The table version at which the row was last inserted or updated. |

The row ids and row commit versions metadata fields are not automatically included when reading the table.
Instead, these metadata fields must be manually selected from the hidden `_metadata` column, which is available for all tables in <AS>.

.. code-language-tabs::
```sql
SELECT _metadata.row_id, _metadata.row_commit_version, * FROM table_name;
```

```python
spark.read.table("table_name") \
.select("_metadata.row_id", "_metadata.row_commit_version", "*")
```

```scala
spark.read.table("table_name")
.select("_metadata.row_id", "_metadata.row_commit_version", "*")
```

## Disable row tracking

Row tracking can be disabled to reduce the storage overhead of the metadata fields. After disabling row tracking, the metadata fields remain available, but every row is assigned a new id and commit version whenever it is touched by an operation.

```sql
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableRowTracking' = 'false');
```

.. important:: Disabling row tracking does not remove the corresponding table feature and does not downgrade the table protocol version.

## Limitations

The following limitations exist:

- The row ids and row commit versions metadata fields cannot be accessed while reading the [Change data feed](/delta-change-data-feed.md).
- Row tracking can currently only be enabled when creating the table or while the table is still empty; enabling it on a non-empty table is not yet supported.
- Once the row tracking feature is added to a table, it cannot be removed without recreating the table.

.. include:: /shared/replacements.md
1 change: 1 addition & 0 deletions docs/source/index.md
@@ -26,6 +26,7 @@ This is the documentation site for <Delta>.
delta-clustering
delta-deletion-vectors
delta-drop-feature
delta-row-tracking
delta-apidoc
delta-spark-connect
delta-storage
2 changes: 2 additions & 0 deletions docs/source/versioning.md
@@ -26,6 +26,7 @@ The following <Delta> features break forward compatibility. Features are enabled
V2 Checkpoints, [Delta Lake 3.0.0](https://github.com/delta-io/delta/releases/tag/v3.0.0),[V2 Checkpoint Spec](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec)
Domain metadata, [Delta Lake 3.0.0](https://github.com/delta-io/delta/releases/tag/v3.0.0),[Domain Metadata Spec](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata)
Clustering, [Delta Lake 3.1.0](https://github.com/delta-io/delta/releases/tag/v3.1.0),[_](/delta-clustering.md)
Row Tracking, [Delta Lake 3.2.0](https://github.com/delta-io/delta/releases/tag/v3.2.0),[_](/delta-row-tracking.md)
Type widening (Preview),[Delta Lake 3.2.0](https://github.com/delta-io/delta/releases/tag/v3.2.0),[_](/delta-type-widening.md)
Coordinated Commits (Preview),[Delta Lake 4.0.0 Preview](https://github.com/delta-io/delta/releases/tag/v4.0.0rc1), [_](/delta-coordinated-commits.md)
Variant Type (Preview), [Delta Lake 4.0.0 Preview](https://github.com/delta-io/delta/releases/tag/v4.0.0rc1),[Variant Type](https://github.com/delta-io/delta/blob/master/protocol_rfcs/variant-type.md)
@@ -110,6 +111,7 @@ The following table shows minimum protocol versions required for <Delta> feature
Iceberg Compatibility V1,7,2,[IcebergCompatV1](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#iceberg-compatibility-v1)
V2 Checkpoints,7,3,[V2 Checkpoint Spec](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec)
Vacuum Protocol Check,7,3,[Vacuum Protocol Check Spec](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#vacuum-protocol-check)
Row Tracking,7,3,[_](/delta-row-tracking.md)
Type widening (Preview),7,3,[_](/delta-type-widening.md)
Coordinated Commits (Preview),7,3,[_](/delta-coordinated-commits.md)
Variant Type (Preview),7,3,[Variant Type](https://github.com/delta-io/delta/blob/master/protocol_rfcs/variant-type.md)
@@ -48,6 +48,7 @@ import org.apache.hudi.config.HoodieArchivalConfig
import org.apache.hudi.config.HoodieCleanConfig
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.{HoodieException, HoodieRollbackException}
import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
import org.apache.hudi.table.HoodieJavaTable
import org.apache.hudi.table.action.clean.CleanPlanner
@@ -86,9 +87,10 @@ class HudiConversionTransaction(
private var metaClient = providedMetaClient
private val instantTime = convertInstantToCommit(
Instant.ofEpochMilli(postCommitSnapshot.timestamp))
private var writeStatuses: util.List[WriteStatus] = Collections.emptyList[WriteStatus]
private var writeStatuses: util.List[WriteStatus] =
new util.ArrayList[WriteStatus]()
private var partitionToReplacedFileIds: util.Map[String, util.List[String]] =
Collections.emptyMap[String, util.List[String]]
new util.HashMap[String, util.List[String]]()

private val version = postCommitSnapshot.version
/** Tracks if this transaction has already committed. You can only commit once. */
@@ -101,7 +103,7 @@ class HudiConversionTransaction(
def setCommitFileUpdates(actions: scala.collection.Seq[Action]): Unit = {
// for all removed files, group by partition path and then map to
// the file group ID (name in this case)
partitionToReplacedFileIds = actions
val newPartitionToReplacedFileIds = actions
.map(_.wrap)
.filter(action => action.remove != null)
.map(_.remove)
@@ -111,21 +113,23 @@ class HudiConversionTransaction(
(partitionPath, path.getName)})
.groupBy(_._1).map(v => (v._1, v._2.map(_._2).asJava))
.asJava
partitionToReplacedFileIds.putAll(newPartitionToReplacedFileIds)
// Convert the AddFiles to write statuses for the commit
writeStatuses = actions
val newWriteStatuses = actions
.map(_.wrap)
.filter(action => action.add != null)
.map(_.add)
.map(add => {
convertAddFile(add, tablePath, instantTime)
})
.asJava
writeStatuses.addAll(newWriteStatuses)
}

def commit(): Unit = {
assert(!committed, "Cannot commit. Transaction already committed.")
val writeConfig = getWriteConfig(hudiSchema, getNumInstantsToRetain, 10, 7*24)
val engineContext: HoodieEngineContext = new HoodieJavaEngineContext(metaClient.getHadoopConf)
val engineContext: HoodieEngineContext = new HoodieJavaEngineContext(metaClient.getStorageConf)
val writeClient = new HoodieJavaWriteClient[AnyRef](engineContext, writeConfig)
try {
writeClient.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION)
@@ -151,6 +155,16 @@ class HudiConversionTransaction(
markInstantsAsCleaned(table, writeClient.getConfig, engineContext)
runArchiver(table, writeClient.getConfig, engineContext)
} catch {
case e: HoodieException if e.getMessage == "Failed to update metadata"
|| e.getMessage == "Error getting all file groups in pending clustering"
|| e.getMessage == "Error fetching partition paths from metadata table" =>
logInfo(s"[Thread=${Thread.currentThread().getName}] " +
s"Failed to fully update Hudi metadata table for Delta snapshot version $version. " +
s"This is likely due to a concurrent commit and should not lead to data corruption.")
case e: HoodieRollbackException =>
logInfo(s"[Thread=${Thread.currentThread().getName}] " +
s"Failed to rollback Hudi metadata table for Delta snapshot version $version. " +
s"This is likely due to a concurrent commit and should not lead to data corruption.")
case NonFatal(e) =>
recordHudiCommit(Some(e))
throw e
@@ -229,7 +243,7 @@ class HudiConversionTransaction(
val cleanerPlan = new HoodieCleanerPlan(earliestInstantToRetain, instantTime,
writeConfig.getCleanerPolicy.name, Collections.emptyMap[String, util.List[String]],
CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanInfoPerPartition,
Collections.emptyList[String])
Collections.emptyList[String], Collections.emptyMap[String, String])
// create a clean instant and mark it as requested with the clean plan
val requestedCleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLEAN_ACTION, cleanTime)
@@ -244,7 +258,8 @@ class HudiConversionTransaction(
deletePaths, Collections.emptyList[String], earliestInstant.get.getTimestamp, instantTime)
}).toSeq.asJava
val cleanMetadata =
CleanerUtils.convertCleanMetadata(cleanTime, HudiOption.empty[java.lang.Long], cleanStats)
CleanerUtils.convertCleanMetadata(cleanTime, HudiOption.empty[java.lang.Long], cleanStats,
java.util.Collections.emptyMap[String, String])
// update the metadata table with the clean metadata so the files' metadata are marked for
// deletion
hoodieTableMetadataWriter.performTableServices(HudiOption.empty[String])
@@ -20,6 +20,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.actions.Action
@@ -203,7 +204,7 @@ class HudiConverter(spark: SparkSession)
val log = snapshotToConvert.deltaLog
val metaClient = loadTableMetaClient(snapshotToConvert.deltaLog.dataPath.toString,
tableName, snapshotToConvert.metadata.partitionColumns,
log.newDeltaHadoopConf())
new HadoopStorageConfiguration(log.newDeltaHadoopConf()))
val lastDeltaVersionConverted: Option[Long] = loadLastDeltaVersionConverted(metaClient)
val maxCommitsToConvert =
spark.sessionState.conf.getConf(DeltaSQLConf.HUDI_MAX_COMMITS_TO_CONVERT)
@@ -297,7 +298,7 @@ class HudiConverter(spark: SparkSession)
def loadLastDeltaVersionConverted(snapshot: Snapshot, table: CatalogTable): Option[Long] = {
val metaClient = loadTableMetaClient(snapshot.deltaLog.dataPath.toString,
Option.apply(table.identifier.table), snapshot.metadata.partitionColumns,
snapshot.deltaLog.newDeltaHadoopConf())
new HadoopStorageConfiguration(snapshot.deltaLog.newDeltaHadoopConf()))
loadLastDeltaVersionConverted(metaClient)
}
