Skip to content

Commit

Permalink
[Kernel] Add coordinated commits interfaces and table properties
Browse files Browse the repository at this point in the history
  • Loading branch information
EstherBear committed Jul 19, 2024
1 parent 4430dc1 commit 9ea06a1
Show file tree
Hide file tree
Showing 16 changed files with 838 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.engine;

import java.io.IOException;
import java.util.Map;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.coordinatedcommits.*;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.utils.CloseableIterator;

/**
* Provides coordinated commits related functionalities to Delta Kernel.
*
* @since 3.3.0
*/
@Evolving
public interface CommitCoordinatorClientHandler {

/**
* API to register the table represented by the given `logPath` at the provided
* currentTableVersion with the commit coordinator this commit coordinator client represents.
* <p>
* This API is called when the table is being converted from a file system table to a
* coordinated-commit table.
* <p>
* When a new coordinated-commit table is being created, the currentTableVersion will be -1 and
* the upgrade commit needs to be a file system commit which will write the backfilled file
* directly.
*
* @param logPath The path to the delta log of the table that should be converted
* @param currentVersion The currentTableVersion is the version of the table just before
* conversion. currentTableVersion + 1 represents the commit that
* will do the conversion. This must be backfilled atomically.
* currentTableVersion + 2 represents the first commit after conversion.
* This will go through the CommitCoordinatorClient and the client is
* free to choose when it wants to backfill this commit.
* @param currentMetadata The metadata of the table at currentTableVersion
* @param currentProtocol The protocol of the table at currentTableVersion
* @return A map of key-value pairs which is issued by the commit coordinator to identify the
* table. This should be stored in the table's metadata. This information needs to be
* passed to the {@link #commit}, {@link #getCommits}, and {@link #backfillToVersion}
* APIs to identify the table.
*/
Map<String, String> registerTable(
String logPath,
long currentVersion,
AbstractMetadata currentMetadata,
AbstractProtocol currentProtocol);

/**
* API to commit the given set of actions to the table represented by logPath at the
* given commitVersion.
*
* @param logPath The path to the delta log of the table that should be committed to.
* @param tableConf The table configuration that was returned by the commit coordinator
* client during registration.
* @param commitVersion The version of the commit that is being committed.
* @param actions The actions that need to be committed.
* @param updatedActions The commit info and any metadata or protocol changes that are made
* as part of this commit.
* @return CommitResponse which contains the file status of the committed commit file. If the
* commit is already backfilled, then the file status could be omitted from the response
* and the client could retrieve the information by itself.
* @throws CommitFailedException if the commit failed.
*/
CommitResponse commit(
String logPath,
Map<String, String> tableConf,
long commitVersion,
CloseableIterator<Row> actions,
UpdatedActions updatedActions) throws CommitFailedException;

/**
* API to get the unbackfilled commits for the table represented by the given logPath.
* Commits older than startVersion or newer than endVersion (if given) are ignored. The
* returned commits are contiguous and in ascending version order.
*
* Note that the first version returned by this API may not be equal to startVersion. This
* happens when some versions starting from startVersion have already been backfilled and so
* the commit coordinator may have stopped tracking them.
*
* The returned latestTableVersion is the maximum commit version ratified by the commit
* coordinator. Note that returning latestTableVersion as -1 is acceptable only if the commit
* coordinator never ratified any version, i.e. it never accepted any unbackfilled commit.
*
* @param tablePath The path to the delta log of the table for which the unbackfilled
* commits should be retrieved.
* @param tableConf The table configuration that was returned by the commit coordinator
* during registration.
* @param startVersion The minimum version of the commit that should be returned. Can be null.
* @param endVersion The maximum version of the commit that should be returned. Can be null.
* @return GetCommitsResponse which has a list of {@link Commit}s and the latestTableVersion
* which is tracked by {@link CommitCoordinatorClientHandler}.
*/
GetCommitsResponse getCommits(
String tablePath,
Map<String, String> tableConf,
Long startVersion,
Long endVersion);

/**
* API to ask the commit coordinator client to backfill all commits up to {@code version}
* and notify the commit coordinator.
*
* If this API returns successfully, that means the backfill must have been completed, although
* the commit coordinator may not be aware of it yet.
*
* @param logPath The path to the delta log of the table that should be backfilled.
* @param tableConf The table configuration that was returned by the commit coordinator
* during registration.
* @param version The version till which the commit coordinator client should backfill.
* @param lastKnownBackfilledVersion The last known version that was backfilled before this API
* was called. If it is None or invalid, then the commit
* coordinator client should backfill from the beginning of
* the table. Can be null.
* @throws IOException if the backfill failed.
*/
void backfillToVersion(
String logPath,
Map<String, String> tableConf,
long version,
Long lastKnownBackfilledVersion) throws IOException;

/**
* Determines whether this CommitCoordinatorClient is semantically equal to another
* CommitCoordinatorClient.
*
* Semantic equality is determined by each CommitCoordinatorClient implementation based on
* whether the two instances can be used interchangeably when invoking any of the
* CommitCoordinatorClient APIs, such as {@link #commit}, {@link #getCommits}, etc. For example,
* both instances might be pointing to the same underlying endpoint.
*/
Boolean semanticEquals(CommitCoordinatorClientHandler other);
}
15 changes: 15 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.delta.kernel.engine;

import java.util.Map;

import io.delta.kernel.annotation.Evolving;

/**
Expand Down Expand Up @@ -55,4 +57,17 @@ public interface Engine {
* @return An implementation of {@link ParquetHandler}.
*/
ParquetHandler getParquetHandler();

/**
* Get the {@link CommitCoordinatorClientHandler} for the underlying commit coordinator client
* given the name and configuration.
*
* @param name Name of the underlying commit coordinator client.
* @param conf Configuration for the underlying commit coordinator client.
* @return An implementation of {@link CommitCoordinatorClientHandler}.
*
* @since 3.3.0
*/
CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
String name, Map<String, String> conf);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.utils.FileStatus;

/**
* Representation of a commit file. It contains the version of the commit, the file status of the
* commit, and the timestamp of the commit. This is used when we want to get the commit information
* from the {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit} and
* {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits} APIs.
*
* @since 3.3.0
*/
@Evolving
public class Commit {

private final long version;

private final FileStatus fileStatus;

private final long commitTimestamp;

public Commit(long version, FileStatus fileStatus, long commitTimestamp) {
this.version = version;
this.fileStatus = fileStatus;
this.commitTimestamp = commitTimestamp;
}

/**
* Get the version of the commit.
*
* @return the version of the commit.
*/
public long getVersion() {
return version;
}

/**
* Get the file status of the commit.
*
* @return the file status of the commit.
*/
public FileStatus getFileStatus() {
return fileStatus;
}

/**
* Get the timestamp of the commit as millis.
*
* @return the timestamp of the commit.
*/
public long getCommitTimestamp() {
return commitTimestamp;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;

/**
* Exception raised by {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}
*
* <pre>
* | retryable | conflict | meaning |
* | no | no | something bad happened (e.g. auth failure) |
* | no | yes | permanent transaction conflict (e.g. multi-table commit failed) |
* | yes | no | transient error (e.g. network hiccup) |
* | yes | yes | physical conflict (allowed to rebase and retry) |
* </pre>
*
* @since 3.3.0
*/
@Evolving
public class CommitFailedException extends Exception {

private final boolean retryable;

private final boolean conflict;

private final String message;

public CommitFailedException(boolean retryable, boolean conflict, String message) {
this.retryable = retryable;
this.conflict = conflict;
this.message = message;
}

/**
* Returns whether the commit can be retried.
*
* @return whether the commit can be retried.
*/
public boolean getRetryable() {
return retryable;
}

/**
* Returns whether the commit failed due to a conflict.
*
* @return whether the commit failed due to a conflict.
*/
public boolean getConflict() {
return conflict;
}

/**
* Returns the message of the exception.
*
* @return the message of the exception.
*/
public String getMessage() {
return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;

/**
* Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}.
*
* @since 3.3.0
*/
@Evolving
public class CommitResponse {

private final Commit commit;

public CommitResponse(Commit commit) {
this.commit = commit;
}

/**
* Get the commit object.
*
* @return the commit object.
*/
public Commit getCommit() {
return commit;
}
}
Loading

0 comments on commit 9ea06a1

Please sign in to comment.