
Commit

Merge remote-tracking branch 'upstream/master' into cm-removal-check-invalid-col-names
sabir-akhadov committed Feb 22, 2024
2 parents b8ce436 + 98fac57 commit 0283140
Showing 52 changed files with 2,725 additions and 764 deletions.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1708108025792,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"200","numOutputBytes":"28420"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0-SNAPSHOT","txnId":"59b6eac4-ecda-4440-8268-a18cac973ba1"}}
{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"ByteType\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ShortType\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"IntegerType\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"LongType\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"FloatType\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"DoubleType\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal\",\"type\":\"decimal(10,2)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"BooleanType\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"StringType\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"BinaryType\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{}},{\"name\":\"DateType\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"TimestampType\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aa\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ac\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aca\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"array_of_prims\",\"type\":{\"type\":\"array\",\"elementType\":\"integer\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"array_of_arrays\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"array\",\"elementType\":\"integer\",\"containsNull\":true},\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"array_of_structs\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"struct\",\"fields\":[{\"name\":\"ab\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"map_of_prims\",\"type\":{\"type\":\"map\",\"keyType\":\"integer\",\"valueType\":\"long\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"map_of_rows\",\"type\":{\"type\":\"map\",\"keyType\":\"integer\",\"valueType\":{\"type\":\"struct\",\"fields\":[{\"name\":\"ab\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"map_of_arrays\",\"type\":{\"type\":\"map\",\"keyType\":\"long\",\"valueType\":{\"type\":\"array\",\"elementType\":\"integer\",\"containsNull\":true},\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1708108023726}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00000-4b3cf091-231f-4d77-a657-3ba905ccae36-c000.snappy.parquet","partitionValues":{},"size":28420,"modificationTime":1708108025717,"dataChange":true,"stats":"{\"numRecords\":200,\"minValues\":{\"ByteType\":-128,\"ShortType\":1,\"IntegerType\":1,\"LongType\":2,\"FloatType\":0.234,\"DoubleType\":234234.23,\"decimal\":123.52,\"StringType\":\"1\",\"DateType\":\"1970-01-01\",\"TimestampType\":\"1970-01-01T06:30:23.523Z\",\"nested_struct\":{\"aa\":\"1\",\"ac\":{\"aca\":1}}},\"maxValues\":{\"ByteType\":127,\"ShortType\":199,\"IntegerType\":199,\"LongType\":200,\"FloatType\":46.566,\"DoubleType\":4.661261177E7,\"decimal\":24580.48,\"StringType\":\"99\",\"DateType\":\"1970-02-16\",\"TimestampType\":\"1970-02-23T22:48:01.077Z\",\"nested_struct\":{\"aa\":\"99\",\"ac\":{\"aca\":199}}},\"nullCount\":{\"ByteType\":3,\"ShortType\":4,\"IntegerType\":9,\"LongType\":8,\"FloatType\":8,\"DoubleType\":4,\"decimal\":3,\"BooleanType\":3,\"StringType\":4,\"BinaryType\":4,\"DateType\":4,\"TimestampType\":4,\"nested_struct\":{\"aa\":14,\"ac\":{\"aca\":22}},\"array_of_prims\":8,\"array_of_arrays\":25,\"array_of_structs\":0,\"map_of_prims\":8,\"map_of_rows\":0,\"map_of_arrays\":7}}"}}
Binary file not shown.
@@ -1171,7 +1171,7 @@ class GoldenTables extends QueryTest with SharedSparkSession {
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
df.repartition(1)
.write
.format("parquet")
.format("delta")
.mode("append")
.save(tablePath)
}
87 changes: 87 additions & 0 deletions examples/scala/src/main/scala/example/IcebergCompatV2.scala
@@ -0,0 +1,87 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example

import java.io.{File, IOException}
import java.net.ServerSocket

import org.apache.commons.io.FileUtils

import org.apache.spark.sql.SparkSession
/**
* This example relies on an external Hive metastore (HMS) instance to run.
*
* A standalone HMS can be created using the following docker command.
* ************************************************************
* docker run -d -p 9083:9083 --env SERVICE_NAME=metastore \
* --name metastore-standalone apache/hive:4.0.0-beta-1
* ************************************************************
* The URL of this standalone HMS is thrift://localhost:9083
*
* By default this HMS will use `/opt/hive/data/warehouse` as the warehouse path.
* Please make sure this path exists or change it prior to running the example.
*/
object IcebergCompatV2 {

def main(args: Array[String]): Unit = {
// Update this according to the metastore config
val port = 9083
val warehousePath = "/opt/hive/data/warehouse/"

if (!UniForm.hmsReady(port)) {
print("HMS not available. Exit.")
return
}

val testTableName = "uniform_table3"
FileUtils.deleteDirectory(new File(s"${warehousePath}${testTableName}"))

val deltaSpark = SparkSession
.builder()
.appName("UniForm-Delta")
.master("local[*]")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("hive.metastore.uris", s"thrift://localhost:$port")
.config("spark.sql.catalogImplementation", "hive")
.getOrCreate()

deltaSpark.sql(s"DROP TABLE IF EXISTS ${testTableName}")
deltaSpark.sql(
s"""CREATE TABLE `${testTableName}`
| (id INT, ts TIMESTAMP, array_data array<int>, map_data map<int, int>)
| using DELTA""".stripMargin)
deltaSpark.sql(
s"""
|INSERT INTO `$testTableName` (id, ts, array_data, map_data)
| VALUES (123, '2024-01-01 00:00:00', array(2, 3, 4, 5), map(3, 6, 8, 7))""".stripMargin)
deltaSpark.sql(
s"""REORG TABLE `$testTableName` APPLY (UPGRADE UNIFORM
| (ICEBERG_COMPAT_VERSION = 2))""".stripMargin)

val icebergSpark = SparkSession.builder()
.master("local[*]")
.appName("UniForm-Iceberg")
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("hive.metastore.uris", s"thrift://localhost:$port")
.config("spark.sql.catalogImplementation", "hive")
.getOrCreate()

icebergSpark.sql(s"SELECT * FROM ${testTableName}").show()
}
}
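
The UniForm.hmsReady(port) helper used in the example above is defined in UniForm.scala and is not part of this diff. A minimal sketch of such a probe, assuming it only checks whether anything is listening on the metastore port (the ServerSocket and IOException imports above suggest this approach):

import java.io.IOException
import java.net.ServerSocket

object HmsCheck { // hypothetical helper name, for illustration only
  /** Returns true if something (presumably the HMS) is already listening on `port`. */
  def hmsReady(port: Int): Boolean = {
    var socket: ServerSocket = null
    try {
      // If we can bind the port ourselves, nothing is listening on it yet.
      socket = new ServerSocket(port)
      socket.setReuseAddress(true)
      false
    } catch {
      // A bind failure suggests the port is taken, i.e. the metastore is up.
      case _: IOException => true
    } finally {
      if (socket != null) {
        try socket.close() catch { case _: IOException => () }
      }
    }
  }
}
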
1 change: 1 addition & 0 deletions examples/scala/src/main/scala/example/UniForm.scala
@@ -66,6 +66,7 @@ object UniForm {
s"""CREATE TABLE `${testTableName}` (col1 INT) using DELTA
|TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.enableIcebergCompatV1' = 'true',
| 'delta.universalFormat.enabledFormats' = 'iceberg'
|)""".stripMargin)
deltaSpark.sql(s"INSERT INTO `$testTableName` VALUES (123)")
10 changes: 10 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Table.java
@@ -56,4 +56,14 @@ Snapshot getLatestSnapshot(TableClient tableClient)
* @return the table path
*/
String getPath();

/**
* Get the snapshot at the given {@code versionId}.
*
* @param tableClient {@link TableClient} instance to use in Delta Kernel.
* @param versionId snapshot version to retrieve
* @return an instance of {@link Snapshot}
*/
Snapshot getSnapshotAtVersion(TableClient tableClient, long versionId)
throws TableNotFoundException;
}
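
A hedged usage sketch of the new method in Scala; the table path and version below are placeholders, and this assumes the Table.forPath(tableClient, path) factory from the existing Kernel API:

import io.delta.kernel.{Snapshot, Table}
import io.delta.kernel.client.TableClient

object TimeTravelExample {
  // Illustrative only: print the schema of a table as of version 5.
  // `tableClient` is a connector-provided TableClient implementation.
  def printSchemaAtVersion(tableClient: TableClient): Unit = {
    val table: Table = Table.forPath(tableClient, "/tmp/delta/my-table")
    val snapshot: Snapshot = table.getSnapshotAtVersion(tableClient, 5L)
    println(snapshot.getSchema(tableClient))
  }
}
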
@@ -97,6 +97,12 @@
* <li>Since version: 3.1.0</li>
* </ul>
* </li>
* <li>Name: <code>IS_NULL</code>
* <ul>
* <li>SQL semantic: <code>expr IS NULL</code></li>
* <li>Since version: 3.2.0</li>
* </ul>
* </li>
* </ol>
*
* @since 3.0.0
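The new operator plugs into the existing Kernel expression API. A small sketch in Scala, assuming the single-child Predicate(String, Expression) constructor and a column name chosen purely for illustration:

import io.delta.kernel.expressions.{Column, Predicate}

// Equivalent to the SQL predicate `email IS NULL`; the column name is illustrative.
val isNullPredicate: Predicate = new Predicate("IS_NULL", new Column("email"))
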
@@ -0,0 +1,45 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal;

public final class DeltaErrors {
private DeltaErrors() {}

// TODO update to be user-facing exception with future exception framework
// (see delta-io/delta#2231) & document in method docs as needed (Table::getSnapshotAtVersion)
public static RuntimeException nonReconstructableStateException(
String tablePath, long version) {
String message = String.format(
"%s: Unable to reconstruct state at version %s as the transaction log has been " +
"truncated due to manual deletion or the log retention policy and checkpoint " +
"retention policy.",
tablePath,
version);
return new RuntimeException(message);
}

// TODO update to be user-facing exception with future exception framework
// (see delta-io/delta#2231) & document in method docs as needed (Table::getSnapshotAtVersion)
public static RuntimeException nonExistentVersionException(
String tablePath, long versionToLoad, long latestVersion) {
String message = String.format(
"%s: Trying to load a non-existent version %s. The latest version available is %s",
tablePath,
versionToLoad,
latestVersion);
return new RuntimeException(message);
}
}
@@ -60,4 +60,10 @@ public Snapshot getLatestSnapshot(TableClient tableClient) throws TableNotFoundException
public String getPath() {
return tablePath;
}

@Override
public Snapshot getSnapshotAtVersion(TableClient tableClient, long versionId)
throws TableNotFoundException {
return snapshotManager.getSnapshotAt(tableClient, versionId);
}
}
@@ -33,6 +33,7 @@
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;

import io.delta.kernel.internal.DeltaErrors;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.checkpoints.CheckpointInstance;
import io.delta.kernel.internal.checkpoints.CheckpointMetaData;
@@ -110,6 +111,28 @@ public Snapshot buildLatestSnapshot(TableClient tableClient)
return getSnapshotAtInit(tableClient);
}

/**
* Construct the snapshot for the given table at the version provided.
*
* @param tableClient Instance of {@link TableClient} to use.
* @param version The snapshot version to construct
* @return a {@link Snapshot} of the table at version {@code version}
* @throws TableNotFoundException
*/
public Snapshot getSnapshotAt(
TableClient tableClient,
Long version) throws TableNotFoundException {

Optional<LogSegment> logSegmentOpt = getLogSegmentForVersion(
tableClient,
Optional.empty(), /* startCheckpointOpt */
Optional.of(version) /* versionToLoadOpt */);

return logSegmentOpt
.map(logSegment -> createSnapshot(logSegment, tableClient))
.orElseThrow(() -> new TableNotFoundException(dataPath.toString()));
}

////////////////////
// Helper Methods //
////////////////////
@@ -182,17 +205,27 @@ private Optional<CloseableIterator<FileStatus>> listFromOrNone(
     * Returns the delta files and checkpoint files starting from the given `startVersion`.
     * `versionToLoad` is an optional parameter to set the max bound. It's usually used to load a
     * table snapshot for a specific version.
     * If no delta or checkpoint files exist below the versionToLoad and at least one delta file
     * exists, throws an exception that the state is not reconstructable.
     *
     * @param startVersion  the version to start. Inclusive.
     * @param versionToLoad the optional parameter to set the max version we should return.
-    *                      Inclusive.
+    *                      Inclusive. Must be >= startVersion if provided.
     * @return Some array of files found (possibly empty, if no usable commit files are present), or
     *         None if the listing returned no files at all.
*/
protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
TableClient tableClient,
long startVersion,
Optional<Long> versionToLoad) {
versionToLoad.ifPresent(v ->
checkArgument(
v >= startVersion,
String.format(
"versionToLoad=%s provided is less than startVersion=%s",
v,
startVersion)
));
logger.debug("startVersion: {}, versionToLoad: {}", startVersion, versionToLoad);

return listFromOrNone(
@@ -221,6 +254,13 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
.orElse(true);

if (!versionWithinRange) {
// If we haven't taken any files yet and the first file we see is greater
// than the versionToLoad then the versionToLoad is not reconstructable
// from the existing logs
if (output.isEmpty()) {
throw DeltaErrors.nonReconstructableStateException(
dataPath.toString(), versionToLoad.get());
}
break;
}

@@ -314,18 +354,17 @@ public Optional<LogSegment> getLogSegmentForVersion(
TableClient tableClient,
Optional<Long> startCheckpoint,
Optional<Long> versionToLoad) {
-        // List from the starting checkpoint. If a checkpoint doesn't exist, this will still return
-        // deltaVersion=0.
-        // TODO when implementing time-travel don't list from startCheckpoint if
-        //  startCheckpoint > versionToLoad
+        // Only use startCheckpoint if it is <= versionToLoad
+        Optional<Long> startCheckpointToUse = startCheckpoint
+            .filter(v -> !versionToLoad.isPresent() || v <= versionToLoad.get());
final Optional<List<FileStatus>> newFiles =
listDeltaAndCheckpointFiles(
tableClient,
-                startCheckpoint.orElse(0L),
+                startCheckpointToUse.orElse(0L), // List from 0 if no starting checkpoint
versionToLoad);
return getLogSegmentForVersion(
tableClient,
-            startCheckpoint,
+            startCheckpointToUse,
versionToLoad,
newFiles);
}
@@ -369,7 +408,7 @@ protected Optional<LogSegment> getLogSegmentForVersion(
// We can't construct a snapshot because the directory contained no usable commit
// files... but we can't return Optional.empty either, because it was not truly empty.
throw new RuntimeException(
String.format("Empty directory: %s", logPath)
String.format("No delta files found in the directory: %s", logPath)
);
} else if (newFiles.isEmpty()) {
// The directory may be deleted and recreated and we may have stale state in our
@@ -475,6 +514,21 @@ protected Optional<LogSegment> getLogSegmentForVersion(
Arrays.toString(deltaVersionsAfterCheckpoint.toArray())
));

+        final long newVersion = deltaVersionsAfterCheckpoint.isEmpty() ?
+            newCheckpointOpt.get().version : deltaVersionsAfterCheckpoint.getLast();
+
+        // In the case where `deltasAfterCheckpoint` is empty, `deltas` should still not be empty,
+        // they may just be before the checkpoint version unless we have a bug in log cleanup.
+        if (deltas.isEmpty()) {
+            throw new IllegalStateException(
+                String.format("Could not find any delta files for version %s", newVersion)
+            );
+        }
+
+        versionToLoadOpt.filter(v -> v != newVersion).ifPresent(v -> {
+            throw DeltaErrors.nonExistentVersionException(dataPath.toString(), v, newVersion);
+        });

// We may just be getting a checkpoint file after the filtering
if (!deltaVersionsAfterCheckpoint.isEmpty()) {
if (deltaVersionsAfterCheckpoint.getFirst() != newCheckpointVersion + 1) {
@@ -492,26 +546,6 @@
versionToLoadOpt);
}

-        // TODO: double check newCheckpointOpt.get() won't error out
-
-        final long newVersion = deltaVersionsAfterCheckpoint.isEmpty() ?
-            newCheckpointOpt.get().version : deltaVersionsAfterCheckpoint.getLast();
-
-        // In the case where `deltasAfterCheckpoint` is empty, `deltas` should still not be empty,
-        // they may just be before the checkpoint version unless we have a bug in log cleanup.
-        if (deltas.isEmpty()) {
-            throw new IllegalStateException(
-                String.format("Could not find any delta files for version %s", newVersion)
-            );
-        }
-
-        if (versionToLoadOpt.map(v -> v != newVersion).orElse(false)) {
-            throw new IllegalStateException(
-                String.format("Trying to load a non-existent version %s",
-                    versionToLoadOpt.get())
-            );
-        }

final long lastCommitTimestamp = deltas.get(deltas.size() - 1).getModificationTime();

final List<FileStatus> newCheckpointFiles = newCheckpointOpt.map(newCheckpoint -> {
