Commit

Merge branch 'branch-3.2' into vaccum_doc_2

sumeet-db authored May 6, 2024
2 parents 986aa55 + 8cb2e78 commit 6230acc

Showing 111 changed files with 3,689 additions and 889 deletions.
39 changes: 33 additions & 6 deletions build.sbt
@@ -341,6 +341,33 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.slf4j" % "slf4j-log4j12" % "1.7.36" % "test"
),
// Generate a Meta class that provides the version information at runtime.
Compile / sourceGenerators += Def.task {
val file = (Compile / sourceManaged).value / "io" / "delta" / "kernel" / "Meta.java"
IO.write(file,
s"""/*
| * Copyright (2024) The Delta Lake Project Authors.
| *
| * Licensed under the Apache License, Version 2.0 (the "License");
| * you may not use this file except in compliance with the License.
| * You may obtain a copy of the License at
| *
| * http://www.apache.org/licenses/LICENSE-2.0
| *
| * Unless required by applicable law or agreed to in writing, software
| * distributed under the License is distributed on an "AS IS" BASIS,
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
| * See the License for the specific language governing permissions and
| * limitations under the License.
| */
|package io.delta.kernel;
|
|public final class Meta {
| public static final String KERNEL_VERSION = "${version.value}";
|}
|""".stripMargin)
Seq(file)
},
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/kernel/")),
@@ -1212,9 +1239,9 @@ lazy val flink = (project in file("connectors/flink"))
IO.write(file,
s"""package io.delta.flink.internal;
|
- |final public class Meta {
- |  public static final String FLINK_VERSION = "${flinkVersion}";
- |  public static final String CONNECTOR_VERSION = "${version.value}";
+ |public final class Meta {
+ |  public static final String FLINK_VERSION = "${flinkVersion}";
+ |  public static final String CONNECTOR_VERSION = "${version.value}";
|}
|""".stripMargin)
Seq(file)
@@ -1308,9 +1335,9 @@ def javaCheckstyleSettings(checkstyleFile: String): Def.SettingsDefinition = {
// and during tests (e.g. build/sbt test)
Seq(
checkstyleConfigLocation := CheckstyleConfigLocation.File(checkstyleFile),
- checkstyleSeverityLevel := Some(CheckstyleSeverityLevel.Error),
- (Compile / checkstyle) := (Compile / checkstyle).triggeredBy(Compile / compile).value,
- (Test / checkstyle) := (Test / checkstyle).triggeredBy(Test / compile).value
+ checkstyleSeverityLevel := CheckstyleSeverityLevel.Error,
+ (Compile / compile) := ((Compile / compile) dependsOn (Compile / checkstyle)).value,
+ (Test / test) := ((Test / test) dependsOn (Test / checkstyle)).value
)
}

@@ -20,13 +20,13 @@ package io.delta.standalone.internal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

- import io.delta.kernel.{Table, TableNotFoundException}
- import io.delta.kernel.defaults.client.DefaultTableClient
- import io.delta.kernel.internal.{SnapshotImpl => SnapshotImplKernel, TableImpl}
+ import io.delta.kernel.Table
+ import io.delta.kernel.defaults.engine.DefaultEngine
+ import io.delta.kernel.exceptions.TableNotFoundException
+ import io.delta.kernel.internal.{TableImpl, SnapshotImpl => SnapshotImplKernel}
import io.delta.standalone.VersionLog
import io.delta.standalone.actions.{CommitInfo => CommitInfoJ}
- import io.delta.standalone.internal.{SnapshotImpl => StandaloneSnapshotImpl, InitialSnapshotImpl => StandaloneInitialSnapshotImpl}
+ import io.delta.standalone.internal.{InitialSnapshotImpl => StandaloneInitialSnapshotImpl, SnapshotImpl => StandaloneSnapshotImpl}
import io.delta.standalone.internal.util.{Clock, SystemClock}

class KernelOptTxn(kernelDeltaLog: KernelDeltaLogDelegator, kernelSnapshot: KernelSnapshotDelegator)
@@ -44,7 +44,7 @@ class KernelOptTxn(kernelDeltaLog: KernelDeltaLogDelegator, kernelSnapshot: Kern
* provides features used by flink+startTransaction.
*/
class KernelDeltaLogDelegator(
- tableClient: DefaultTableClient,
+ engine: DefaultEngine,
table: TableImpl,
standaloneDeltaLog: DeltaLogImpl,
hadoopConf: Configuration,
@@ -68,7 +68,7 @@ class KernelDeltaLogDelegator(
override def update(): StandaloneSnapshotImpl = {
// get latest snapshot via kernel
val kernelSnapshot = try {
- table.getLatestSnapshot(tableClient).asInstanceOf[SnapshotImplKernel]
+ table.getLatestSnapshot(engine).asInstanceOf[SnapshotImplKernel]
} catch {
case e: TableNotFoundException =>
return new StandaloneInitialSnapshotImpl(hadoopConf, logPath, this)
@@ -82,7 +82,7 @@ class KernelDeltaLogDelegator(
kernelSnapshotWrapper,
hadoopConf,
logPath,
- kernelSnapshot.getVersion(tableClient), // note: tableClient isn't used
+ kernelSnapshot.getVersion(engine), // note: engine isn't used
this,
standaloneDeltaLog
))
@@ -128,9 +128,9 @@ object KernelDeltaLogDelegator {
// the kernel does not
val standaloneDeltaLog = new DeltaLogImpl(hadoopConf, logPath, dataPathFromLog, clock)
standaloneDeltaLog.ensureLogDirectoryExist()
- val tableClient = DefaultTableClient.create(hadoopConf)
- val table = Table.forPath(tableClient, dataPath).asInstanceOf[TableImpl]
+ val engine = DefaultEngine.create(hadoopConf)
+ val table = Table.forPath(engine, dataPath).asInstanceOf[TableImpl]
// Todo: Potentially we could get the resolved paths out of the table above
- new KernelDeltaLogDelegator(tableClient, table, standaloneDeltaLog, hadoopConf, logPath, dataPathFromLog, clock)
+ new KernelDeltaLogDelegator(engine, table, standaloneDeltaLog, hadoopConf, logPath, dataPathFromLog, clock)
}
}
@@ -23,11 +23,10 @@
import java.util.List;
import java.util.Optional;

- import io.delta.kernel.data.ColumnVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import io.delta.kernel.data.ColumnVector;
import io.delta.standalone.DeltaScan;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.Metadata;
@@ -73,7 +72,7 @@ public Metadata getMetadata() {
*/
@Override
public long getVersion() {
- // WARNING: getVersion in SnapshotImpl currently doesn't use the table client, so we can
+ // WARNING: getVersion in SnapshotImpl currently doesn't use the engine, so we can
// pass null, but if this changes this code could break
return kernelSnapshot.getVersion(null);
}
1 change: 1 addition & 0 deletions docs/source/delta-drop-feature.md
@@ -25,6 +25,7 @@ To remove a Delta table feature, you run an `ALTER TABLE <table-name> DROP FEATU
You can drop the following Delta table features:

- `deletionVectors`. See [_](delta-deletion-vectors.md).
- `typeWidening-preview`. See [_](delta-type-widening.md). Type widening is available in preview in <Delta> 3.2.0 and above.
- `v2Checkpoint`. See [V2 Checkpoint Spec](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec). Drop support for V2 Checkpoints is available in <Delta> 3.1.0 and above.

You cannot drop other [Delta table features](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features).
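
For example, a sketch of dropping one of the features listed above (the table name is illustrative):

```sql
ALTER TABLE my_table DROP FEATURE 'deletionVectors'
```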
79 changes: 79 additions & 0 deletions docs/source/delta-type-widening.md
@@ -0,0 +1,79 @@
---
description: Learn about type widening in Delta.
---

# Delta type widening

.. note:: This feature is available in preview in <Delta> 3.2.

The type widening feature allows changing the type of columns in a Delta table to a wider type. This enables manual type changes using the `ALTER TABLE ALTER COLUMN` command and automatic type migration with schema evolution in `INSERT` and `MERGE INTO` commands.

## Supported type changes

The feature preview in <Delta> 3.2 supports a limited set of type changes:

- `BYTE` to `SHORT` and `INT`.
- `SHORT` to `INT`.

Type changes are supported for top-level columns as well as fields nested inside structs, maps, and arrays.
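
For example, a sketch of widening a top-level column and a nested struct field (the table and column names are illustrative; the nested field uses Spark SQL dot notation, and the `delta.enableTypeWidening` property is described in the next section):

```sql
CREATE TABLE points (id BYTE, point STRUCT<x: SHORT, y: SHORT>) USING DELTA
TBLPROPERTIES ('delta.enableTypeWidening' = 'true');

ALTER TABLE points ALTER COLUMN id TYPE INT;      -- BYTE to INT
ALTER TABLE points ALTER COLUMN point.x TYPE INT; -- SHORT to INT inside a struct
```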

## How to enable <Delta> type widening

.. important::

Enabling type widening sets the Delta table feature `typeWidening-preview`, a reader/writer protocol feature. Only clients that support this table feature can read and write to the table once the table feature is set. You must use <Delta> 3.2 or above to read and write to such Delta tables.

You can enable type widening on an existing table by setting the `delta.enableTypeWidening` table property to `true`:

```sql
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
```

Alternatively, you can enable type widening during table creation:

```sql
CREATE TABLE T(c1 INT) USING DELTA TBLPROPERTIES('delta.enableTypeWidening' = 'true')
```

To disable type widening:

```sql
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
```

Disabling type widening prevents future type changes from being applied to the table. It doesn't affect type changes that were already applied. In particular, it doesn't remove the type widening table feature, so clients that don't support that feature still can't read or write to the table.

To remove the type widening table feature from the table and allow other clients that don't support this feature to read and write to the table, see [_](#removing-the-type-widening-table-feature).

## Manually applying a type change

When type widening is enabled on a Delta table, you can change the type of a column using the `ALTER COLUMN` command:

```sql
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
```

The table schema is updated without rewriting the underlying Parquet files.
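
For instance, `DESCRIBE DETAIL` can be used to confirm that no files were rewritten (a sketch reusing the illustrative `points` table from the earlier example):

```sql
DESCRIBE DETAIL points;   -- note the numFiles value
ALTER TABLE points ALTER COLUMN point.y TYPE INT;
DESCRIBE DETAIL points;   -- numFiles is unchanged; only the schema differs
```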

## Type changes with automatic schema evolution

Type changes are applied automatically during ingestion with `INSERT` and `MERGE INTO` commands when:

- Type widening is enabled on the target table.
- The command runs with [automatic schema evolution](delta-update.md#merge-schema-evolution) enabled.
- The source column type is wider than the target column type.
- Changing the target column type to the source type is a [supported type change](#supported-type-changes).

When all conditions are satisfied, the target table schema is updated automatically to change the target column type to the source column type, as shown in the sketch below.
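
For instance, a sketch of a `MERGE` that widens a `SHORT` target column to match an `INT` source column (the table names are illustrative, and the session setting is the one described in [_](delta-update.md#merge-schema-evolution)):

```sql
-- Enable automatic schema evolution for MERGE.
SET spark.databricks.delta.schema.autoMerge.enabled = true;

-- target.value is SHORT; source.value is INT.
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

-- target.value is now INT in the table schema.
```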

## Removing the type widening table feature

The type widening feature can be removed from a Delta table using the `DROP FEATURE` command:

```sql
ALTER TABLE <table-name> DROP FEATURE 'typeWidening-preview' [TRUNCATE HISTORY]
```

See [_](delta-drop-feature.md) for more information on dropping Delta table features.

When dropping the type widening feature, the underlying Parquet files are rewritten when necessary to ensure that the column types in the files match the column types in the Delta table schema. After the type widening feature is removed from the table, Delta clients that don't support the feature can read and write to the table.

.. include:: /shared/replacements.md
1 change: 1 addition & 0 deletions docs/source/index.md
@@ -28,6 +28,7 @@ This is the documentation site for <Delta>.
delta-drop-feature
delta-apidoc
delta-storage
delta-type-widening
delta-uniform
delta-sharing
concurrency-control
2 changes: 2 additions & 0 deletions docs/source/versioning.md
@@ -26,6 +26,7 @@ The following <Delta> features break forward compatibility. Features are enabled
V2 Checkpoints, [Delta Lake 3.0.0](https://github.com/delta-io/delta/releases/tag/v3.0.0),[V2 Checkpoint Spec](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec)
Domain metadata, [Delta Lake 3.0.0](https://github.com/delta-io/delta/releases/tag/v3.0.0),[Domain Metadata Spec](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata)
Clustering, [Delta Lake 3.1.0](https://github.com/delta-io/delta/releases/tag/v3.1.0),[_](/delta-clustering.md)
Type widening (Preview), [Delta Lake 3.2.0](https://github.com/delta-io/delta/releases/tag/v3.2.0),[_](/delta-type-widening.md)

<a id="table-protocol"></a>

@@ -107,6 +108,7 @@ The following table shows minimum protocol versions required for <Delta> feature
Iceberg Compatibility V1,7,2,[IcebergCompatV1](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#iceberg-compatibility-v1)
V2 Checkpoints,7,3,[V2 Checkpoint Spec](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec)
Vacuum Protocol Check,7,3,[Vacuum Protocol Check Spec](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#vacuum-protocol-check)
Type widening (Preview),7,3,[_](/delta-type-widening.md)

<a id="upgrade"></a>

25 changes: 25 additions & 0 deletions examples/scala/src/main/scala/example/Clustering.scala
@@ -15,6 +15,8 @@
*/
package example

import io.delta.tables.DeltaTable

import org.apache.spark.sql.SparkSession

object Clustering {
@@ -51,6 +53,29 @@ object Clustering {
// Optimize the table
println("Optimize the table")
deltaSpark.sql(s"OPTIMIZE $tableName")

// Change the clustering columns
println("Change the clustering columns")
deltaSpark.sql(
s"ALTER TABLE $tableName CLUSTER BY (col2, col1)")

// Check the clustering columns
println("Check the clustering columns")
deltaSpark.sql(s"DESCRIBE DETAIL $tableName").show(false)
} finally {
// Cleanup
deltaSpark.sql(s"DROP TABLE IF EXISTS $tableName")
}

// DeltaTable clusterBy Scala API
try {
val table = DeltaTable.create()
.tableName(tableName)
.addColumn("col1", "INT")
.addColumn("col2", "STRING")
.clusterBy("col1", "col2")
.execute()
} finally {
// Cleanup
deltaSpark.sql(s"DROP TABLE IF EXISTS $tableName")
@@ -36,15 +36,15 @@
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;

- import io.delta.kernel.TableNotFoundException;
- import io.delta.kernel.client.TableClient;
+ import io.delta.kernel.engine.Engine;
+ import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.CloseableIterator;

- import io.delta.kernel.defaults.client.DefaultTableClient;
+ import io.delta.kernel.defaults.engine.DefaultEngine;

/**
* Base class for reading Delta Lake tables using the Delta Kernel APIs.
@@ -53,11 +53,11 @@ public abstract class BaseTableReader {
public static final int DEFAULT_LIMIT = 20;

protected final String tablePath;
- protected final TableClient tableClient;
+ protected final Engine engine;

public BaseTableReader(String tablePath) {
this.tablePath = requireNonNull(tablePath);
- this.tableClient = DefaultTableClient.create(new Configuration());
+ this.engine = DefaultEngine.create(new Configuration());
}

/**