From 3bd4a6e2657d7a5c3e9afab0aa443808dc40651a Mon Sep 17 00:00:00 2001 From: Sumeet Varma Date: Fri, 6 Sep 2024 13:44:06 -0700 Subject: [PATCH] Parallel calls --- .../CoordinatedCommitsUtils.java | 76 ------------------- .../commit/CoordinatedCommitsUtils.java | 20 ++++- .../commit/InMemoryCommitCoordinator.scala | 1 + 3 files changed, 17 insertions(+), 80 deletions(-) delete mode 100644 spark/src/main/java/io/delta/dynamodbcommitcoordinator/CoordinatedCommitsUtils.java rename storage/src/{test/scala => main/java}/io/delta/storage/commit/CoordinatedCommitsUtils.java (85%) diff --git a/spark/src/main/java/io/delta/dynamodbcommitcoordinator/CoordinatedCommitsUtils.java b/spark/src/main/java/io/delta/dynamodbcommitcoordinator/CoordinatedCommitsUtils.java deleted file mode 100644 index 492917aaca1..00000000000 --- a/spark/src/main/java/io/delta/dynamodbcommitcoordinator/CoordinatedCommitsUtils.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (2021) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.delta.dynamodbcommitcoordinator; - -import io.delta.storage.commit.actions.AbstractMetadata; -import io.delta.storage.commit.UpdatedActions; -import org.apache.hadoop.fs.Path; - -import java.util.UUID; - -public class CoordinatedCommitsUtils { - - private CoordinatedCommitsUtils() {} - - /** The subdirectory in which to store the unbackfilled commit files. */ - final static String COMMIT_SUBDIR = "_commits"; - - /** The configuration key for the coordinated commits owner. */ - private static final String COORDINATED_COMMITS_COORDINATOR_CONF_KEY = - "delta.coordinatedCommits.commitCoordinator-preview"; - - /** - * Creates a new unbackfilled delta file path for the given commit version. - * The path is of the form `tablePath/_delta_log/_commits/00000000000000000001.uuid.json`. - */ - public static Path generateUnbackfilledDeltaFilePath( - Path logPath, - long version) { - String uuid = UUID.randomUUID().toString(); - Path basePath = new Path(logPath, COMMIT_SUBDIR); - return new Path(basePath, String.format("%020d.%s.json", version, uuid)); - } - - /** - * Returns the path to the backfilled delta file for the given commit version. - * The path is of the form `tablePath/_delta_log/00000000000000000001.json`. - */ - public static Path getBackfilledDeltaFilePath( - Path logPath, - Long version) { - return new Path(logPath, String.format("%020d.json", version)); - } - - private static String getCoordinator(AbstractMetadata metadata) { - return metadata - .getConfiguration() - .getOrDefault(COORDINATED_COMMITS_COORDINATOR_CONF_KEY, ""); - } - - /** - * Returns true if the commit is a coordinated commits to filesystem conversion. - */ - public static boolean isCoordinatedCommitsToFSConversion( - Long commitVersion, - UpdatedActions updatedActions) { - boolean oldMetadataHasCoordinatedCommits = - !getCoordinator(updatedActions.getOldMetadata()).isEmpty(); - boolean newMetadataHasCoordinatedCommits = - !getCoordinator(updatedActions.getNewMetadata()).isEmpty(); - return oldMetadataHasCoordinatedCommits && !newMetadataHasCoordinatedCommits && commitVersion > 0; - } -} diff --git a/storage/src/test/scala/io/delta/storage/commit/CoordinatedCommitsUtils.java b/storage/src/main/java/io/delta/storage/commit/CoordinatedCommitsUtils.java similarity index 85% rename from storage/src/test/scala/io/delta/storage/commit/CoordinatedCommitsUtils.java rename to storage/src/main/java/io/delta/storage/commit/CoordinatedCommitsUtils.java index 144b6acec23..d082459816e 100644 --- a/storage/src/test/scala/io/delta/storage/commit/CoordinatedCommitsUtils.java +++ b/storage/src/main/java/io/delta/storage/commit/CoordinatedCommitsUtils.java @@ -33,12 +33,24 @@ public class CoordinatedCommitsUtils { private CoordinatedCommitsUtils() {} /** The subdirectory in which to store the unbackfilled commit files. */ - final static String COMMIT_SUBDIR = "_commits"; + final static public String COMMIT_SUBDIR = "_commits"; - /** The configuration key for the coordinated commits owner. */ - private static final String COORDINATED_COMMITS_COORDINATOR_CONF_KEY = + /** The configuration key for the coordinated commits owner name. */ + private static final String COORDINATED_COMMITS_COORDINATOR_NAME_KEY = "delta.coordinatedCommits.commitCoordinator-preview"; + /** + * Creates a new unbackfilled delta file path for the given commit version. + * The path is of the form `tablePath/_delta_log/_commits/00000000000000000001.uuid.json`. + */ + public static Path generateUnbackfilledDeltaFilePath( + Path logPath, + long version) { + String uuid = UUID.randomUUID().toString(); + Path basePath = new Path(logPath, COMMIT_SUBDIR); + return new Path(basePath, String.format("%020d.%s.json", version, uuid)); + } + /** * Returns the path to the backfilled delta file for the given commit version. * The path is of the form `tablePath/_delta_log/00000000000000000001.json`. @@ -111,7 +123,7 @@ public static Path commitDirPath(Path logPath) { private static String getCoordinator(AbstractMetadata metadata) { String coordinator = metadata .getConfiguration() - .get(COORDINATED_COMMITS_COORDINATOR_CONF_KEY); + .get(COORDINATED_COMMITS_COORDINATOR_NAME_KEY); return coordinator != null ? coordinator : ""; } } diff --git a/storage/src/test/scala/io/delta/storage/commit/InMemoryCommitCoordinator.scala b/storage/src/test/scala/io/delta/storage/commit/InMemoryCommitCoordinator.scala index 5032756bc92..551969a8d4e 100644 --- a/storage/src/test/scala/io/delta/storage/commit/InMemoryCommitCoordinator.scala +++ b/storage/src/test/scala/io/delta/storage/commit/InMemoryCommitCoordinator.scala @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantReadWriteLock import io.delta.storage.LogStore +import io.delta.storage.commit.CoordinatedCommitsUtils import io.delta.storage.commit.actions.AbstractMetadata import io.delta.storage.commit.actions.AbstractProtocol import org.apache.hadoop.conf.Configuration