From eb68cc76661ba18892a0f54df9fb7de947fbb4c1 Mon Sep 17 00:00:00 2001
From: Sumeet Varma
Date: Fri, 11 Oct 2024 12:46:49 -0700
Subject: [PATCH] Parallel calls

---
 .../DynamoDBCommitCoordinatorClient.java      | 14 +++--
 ...chBackfillingCommitCoordinatorClient.scala | 12 ++--
 .../CoordinatedCommitsUtils.scala             | 59 -------------------
 .../CoordinatedCommitsSuite.scala             | 16 ++---
 .../commit/CoordinatedCommitsUtils.java       | 11 ++--
 .../commit/InMemoryCommitCoordinator.scala    |  2 +-
 6 files changed, 26 insertions(+), 88 deletions(-)

diff --git a/spark/src/main/java/io/delta/dynamodbcommitcoordinator/DynamoDBCommitCoordinatorClient.java b/spark/src/main/java/io/delta/dynamodbcommitcoordinator/DynamoDBCommitCoordinatorClient.java
index f61a980d10..468da583d2 100644
--- a/spark/src/main/java/io/delta/dynamodbcommitcoordinator/DynamoDBCommitCoordinatorClient.java
+++ b/spark/src/main/java/io/delta/dynamodbcommitcoordinator/DynamoDBCommitCoordinatorClient.java
@@ -359,17 +359,19 @@ public CommitResponse commit(
                 "Commit version 0 must go via filesystem.");
         }
         try {
-            FileSystem fs = logPath.getFileSystem(hadoopConf);
-            Path commitPath =
-                CoordinatedCommitsUtils.generateUnbackfilledDeltaFilePath(logPath, commitVersion);
-            logStore.write(commitPath, actions, true /* overwrite */, hadoopConf);
-            FileStatus commitFileStatus = fs.getFileStatus(commitPath);
+            FileStatus commitFileStatus = CoordinatedCommitsUtils.writeUnbackfilledCommitFile(
+                logStore,
+                hadoopConf,
+                logPath.toString(),
+                commitVersion,
+                actions,
+                UUID.randomUUID().toString());
             long inCommitTimestamp = updatedActions.getCommitInfo().getCommitTimestamp();
             boolean isCCtoFSConversion =
                 CoordinatedCommitsUtils.isCoordinatedCommitsToFSConversion(commitVersion, updatedActions);
             LOG.info("Committing version {} with UUID delta file {} to DynamoDB.",
-                commitVersion, commitPath);
+                commitVersion, commitFileStatus.getPath());
             CommitResponse res = commitToCoordinator(
                 logPath,
                 tableDesc.getTableConf(),
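
Note: the inline block deleted above is consolidated behind writeUnbackfilledCommitFile. A minimal, illustrative Java sketch of the behavior the helper provides; the class and method names here are invented for illustration only, and the real implementation is in the storage-module diff later in this patch:

import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;

import io.delta.storage.LogStore;
import io.delta.storage.commit.CoordinatedCommitsUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

final class WriteUnbackfilledSketch {
    // Mirrors the deleted inline code: build the UUID-based delta file path
    // under _delta_log/_commits, write it through the LogStore, and return
    // the resulting file status (whose path now feeds the LOG.info call).
    static FileStatus write(
            LogStore logStore,
            Configuration hadoopConf,
            Path logPath,
            long commitVersion,
            Iterator<String> actions,
            String uuid) throws IOException {
        Path commitPath = CoordinatedCommitsUtils.getUnbackfilledDeltaFile(
            logPath, commitVersion, Optional.of(uuid));
        logStore.write(commitPath, actions, true /* overwrite */, hadoopConf);
        return commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath);
    }
}
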
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.scala
index a0b8378155..3fa361c0a4 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.util.FileNames
 import io.delta.storage.LogStore
-import io.delta.storage.commit.{CommitCoordinatorClient, CommitFailedException => JCommitFailedException, CommitResponse, TableDescriptor, TableIdentifier, UpdatedActions}
+import io.delta.storage.commit.{CommitCoordinatorClient, CommitFailedException => JCommitFailedException, CommitResponse, CoordinatedCommitsUtils => JCoordinatedCommitsUtils, TableDescriptor, TableIdentifier, UpdatedActions}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 
@@ -70,7 +70,7 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
       updatedActions: UpdatedActions): CommitResponse = {
     val logPath = tableDesc.getLogPath
     val executionObserver = TransactionExecutionObserver.getObserver
-    val tablePath = CoordinatedCommitsUtils.getTablePath(logPath)
+    val tablePath = JCoordinatedCommitsUtils.getTablePath(logPath)
     if (commitVersion == 0) {
       throw new JCommitFailedException(false, false, "Commit version 0 must go via filesystem.")
     }
@@ -92,8 +92,8 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
     }
 
     // Write new commit file in _commits directory
-    val fileStatus = CoordinatedCommitsUtils.writeCommitFile(
-      logStore, hadoopConf, logPath, commitVersion, actions.asScala, generateUUID())
+    val fileStatus = JCoordinatedCommitsUtils.writeUnbackfilledCommitFile(
+      logStore, hadoopConf, logPath.toString, commitVersion, actions, generateUUID())
 
     // Do the actual commit
     val commitTimestamp = updatedActions.getCommitInfo.getCommitTimestamp
@@ -132,9 +132,9 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
       commitVersion: Long,
       updatedActions: UpdatedActions): Boolean = {
     val oldMetadataHasCoordinatedCommits =
-      CoordinatedCommitsUtils.getCommitCoordinatorName(updatedActions.getOldMetadata).nonEmpty
+      JCoordinatedCommitsUtils.getCoordinatorName(updatedActions.getOldMetadata).isPresent
     val newMetadataHasCoordinatedCommits =
-      CoordinatedCommitsUtils.getCommitCoordinatorName(updatedActions.getNewMetadata).nonEmpty
+      JCoordinatedCommitsUtils.getCoordinatorName(updatedActions.getNewMetadata).isPresent
     oldMetadataHasCoordinatedCommits && !newMetadataHasCoordinatedCommits && commitVersion > 0
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala
index b3c3da26d7..a19c6ef184 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala
@@ -149,27 +149,6 @@ object CoordinatedCommitsUtils extends DeltaLogging {
     }
   }
 
-  /**
-   * Write a UUID-based commit file for the specified version to the
-   * table at [[logPath]].
-   */
-  def writeCommitFile(
-      logStore: LogStore,
-      hadoopConf: Configuration,
-      logPath: Path,
-      commitVersion: Long,
-      actions: Iterator[String],
-      uuid: String): FileStatus = {
-    val commitPath = FileNames.unbackfilledDeltaFile(logPath, commitVersion, Some(uuid))
-    logStore.write(commitPath, actions.asJava, true, hadoopConf)
-    commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath)
-  }
-
-  /**
-   * Get the table path from the provided log path.
-   */
-  def getTablePath(logPath: Path): Path = logPath.getParent
-
   def getCommitCoordinatorClient(
       spark: SparkSession,
       deltaLog: DeltaLog, // Used for logging
@@ -236,44 +215,6 @@ object CoordinatedCommitsUtils extends DeltaLogging {
     }
   }
 
-  /**
-   * Helper method to recover the saved value of `deltaConfig` from `abstractMetadata`.
-   * If undefined, fall back to alternate keys, returning defaultValue if none match.
-   */
-  private[delta] def fromAbstractMetadataAndDeltaConfig[T](
-      abstractMetadata: AbstractMetadata,
-      deltaConfig: DeltaConfig[T]): T = {
-    val conf = abstractMetadata.getConfiguration
-    for (key <- deltaConfig.key +: deltaConfig.alternateKeys) {
-      Option(conf.get(key)).map { value => return deltaConfig.fromString(value) }
-    }
-    deltaConfig.fromString(deltaConfig.defaultValue)
-  }
-
-  /**
-   * Get the commit coordinator name from the provided abstract metadata.
-   */
-  def getCommitCoordinatorName(abstractMetadata: AbstractMetadata): Option[String] = {
-    fromAbstractMetadataAndDeltaConfig(
-      abstractMetadata, DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME)
-  }
-
-  /**
-   * Get the commit coordinator configuration from the provided abstract metadata.
-   */
-  def getCommitCoordinatorConf(abstractMetadata: AbstractMetadata): Map[String, String] = {
-    fromAbstractMetadataAndDeltaConfig(
-      abstractMetadata, DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF)
-  }
-
-  /**
-   * Get the coordinated commits table configuration from the provided abstract metadata.
-   */
-  def getCoordinatedCommitsTableConf(abstractMetadata: AbstractMetadata): Map[String, String] = {
-    fromAbstractMetadataAndDeltaConfig(
-      abstractMetadata, DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF)
-  }
-
   val TABLE_PROPERTY_CONFS = Seq(
     DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME,
     DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF,
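
Note: the deleted fromAbstractMetadataAndDeltaConfig helper encoded a lookup order of primary key, then alternate keys, then the config's default. A self-contained Java sketch of that resolution order; the class name and config keys below are hypothetical, and the real logic now lives in the Java CoordinatedCommitsUtils accessors used in the client and suite diffs of this patch:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class ConfResolutionSketch {
    // Mirrors the deleted helper: try the primary key, then each alternate
    // key in order; an empty result means the caller applies the default.
    static Optional<String> resolve(Map<String, String> conf, List<String> keysInPriorityOrder) {
        for (String key : keysInPriorityOrder) {
            String value = conf.get(key);
            if (value != null) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("delta.example.coordinator-alternate", "dynamodb"); // hypothetical keys
        // Prints Optional[dynamodb]: the alternate key is consulted only
        // after the primary key misses.
        System.out.println(resolve(conf,
            Arrays.asList("delta.example.coordinator", "delta.example.coordinator-alternate")));
    }
}
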
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala
index d513f4695f..b5b0331765 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
 import org.apache.spark.sql.delta.util.FileNames.{CompactedDeltaFile, DeltaFile, UnbackfilledDeltaFile}
 import io.delta.storage.LogStore
-import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, GetCommitsResponse => JGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
+import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, CoordinatedCommitsUtils => JCoordinatedCommitsUtils, GetCommitsResponse => JGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
 import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -79,27 +79,23 @@ class CoordinatedCommitsSuite
     val m1 = Metadata(
       configuration = Map(COORDINATED_COMMITS_COORDINATOR_NAME.key -> "string_value")
     )
-    assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
-      m1, COORDINATED_COMMITS_COORDINATOR_NAME) === Some("string_value"))
+    assert(JCoordinatedCommitsUtils.getCoordinatorName(m1) === Optional.of("string_value"))
 
     val m2 = Metadata(
       configuration = Map(COORDINATED_COMMITS_COORDINATOR_NAME.key -> "")
     )
-    assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
-      m2, COORDINATED_COMMITS_COORDINATOR_NAME) === Some(""))
+    assert(JCoordinatedCommitsUtils.getCoordinatorName(m2) === Optional.of(""))
 
     val m3 = Metadata(
       configuration = Map(
         COORDINATED_COMMITS_COORDINATOR_CONF.key ->
           """{"key1": "string_value", "key2Int": 2, "key3ComplexStr": "\"hello\""}""")
     )
-    assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
-      m3, COORDINATED_COMMITS_COORDINATOR_CONF) ===
-      Map("key1" -> "string_value", "key2Int" -> "2", "key3ComplexStr" -> "\"hello\""))
+    assert(JCoordinatedCommitsUtils.getCoordinatorConf(m3) ===
+      Map("key1" -> "string_value", "key2Int" -> "2", "key3ComplexStr" -> "\"hello\"").asJava)
 
     val m4 = Metadata()
-    assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
-      m4, COORDINATED_COMMITS_TABLE_CONF) ===
-      Map.empty)
+    assert(JCoordinatedCommitsUtils.getCoordinatorConf(m4) === Map.empty.asJava)
   }
 
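
Note: the m2 case above depends on java.util.Optional distinguishing an empty value from an absent key; a coordinator name explicitly set to "" is still present. A tiny standalone Java sketch (class name invented for illustration):

import java.util.Optional;

final class EmptyVsAbsentSketch {
    public static void main(String[] args) {
        Optional<String> fromEmptyValue = Optional.of("");   // key set to ""
        Optional<String> fromMissingKey = Optional.empty();  // key never set
        System.out.println(fromEmptyValue.isPresent()); // true
        System.out.println(fromMissingKey.isPresent()); // false
    }
}
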
diff --git a/storage/src/main/java/io/delta/storage/commit/CoordinatedCommitsUtils.java b/storage/src/main/java/io/delta/storage/commit/CoordinatedCommitsUtils.java
index cdd03d5ed7..b26897b0b3 100644
--- a/storage/src/main/java/io/delta/storage/commit/CoordinatedCommitsUtils.java
+++ b/storage/src/main/java/io/delta/storage/commit/CoordinatedCommitsUtils.java
@@ -98,7 +98,7 @@ public static Path getUnbackfilledDeltaFile(
     /**
      * Write a UUID-based commit file for the specified version to the table at logPath.
      */
-    public static FileStatus writeCommitFile(
+    public static FileStatus writeUnbackfilledCommitFile(
         LogStore logStore,
         Configuration hadoopConf,
         String logPath,
@@ -107,11 +107,10 @@ public static FileStatus writeCommitFile(
         String uuid) throws IOException {
         Path commitPath = new Path(getUnbackfilledDeltaFile(
             new Path(logPath), commitVersion, Optional.of(uuid)).toString());
-        FileSystem fs = commitPath.getFileSystem(hadoopConf);
-        if (!fs.exists(commitPath.getParent())) {
-            fs.mkdirs(commitPath.getParent());
-        }
-        logStore.write(commitPath, actions, false, hadoopConf);
+        // Do not use Put-If-Absent for unbackfilled commit files since we assume that UUID-based
+        // commit files are globally unique, so we will never have concurrent writers attempting
+        // to write the same commit file.
+        logStore.write(commitPath, actions, true /* overwrite */, hadoopConf);
         return commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath);
     }
 
diff --git a/storage/src/test/scala/io/delta/storage/commit/InMemoryCommitCoordinator.scala b/storage/src/test/scala/io/delta/storage/commit/InMemoryCommitCoordinator.scala
index 551969a8d4..5e712d7274 100644
--- a/storage/src/test/scala/io/delta/storage/commit/InMemoryCommitCoordinator.scala
+++ b/storage/src/test/scala/io/delta/storage/commit/InMemoryCommitCoordinator.scala
@@ -122,7 +122,7 @@ class InMemoryCommitCoordinator(val batchSize: Long) extends CommitCoordinatorCl
       backfillToVersion(logStore, hadoopConf, tableDesc, commitVersion - 1, null)
     }
     // Write new commit file in _commits directory
-    val fileStatus = CoordinatedCommitsUtils.writeCommitFile(
+    val fileStatus = CoordinatedCommitsUtils.writeUnbackfilledCommitFile(
      logStore, hadoopConf, logPath.toString, commitVersion, actions, generateUUID())
     // Do the actual commit
     val commitTimestamp = updatedActions.getCommitInfo.getCommitTimestamp
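
Note: a self-contained harness sketching why overwrite=true is safe for unbackfilled commit files, per the comment added in the storage-module hunk above. The HDFSLogStore choice and the local path are assumptions for illustration, not part of this patch:

import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;

import io.delta.storage.HDFSLogStore;
import io.delta.storage.LogStore;
import io.delta.storage.commit.CoordinatedCommitsUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;

public class UnbackfilledCommitWriteSketch {
    public static void main(String[] args) throws IOException {
        Configuration hadoopConf = new Configuration();
        LogStore logStore = new HDFSLogStore(hadoopConf);      // assumed implementation choice
        String logPath = "file:/tmp/demo-table/_delta_log";    // hypothetical local table
        // Every writer mints its own UUID, so no two writers can target the
        // same unbackfilled file name; overwrite=true therefore only ever
        // re-applies this writer's own retry and never clobbers another commit.
        FileStatus status = CoordinatedCommitsUtils.writeUnbackfilledCommitFile(
            logStore,
            hadoopConf,
            logPath,
            1L, // commit version; version 0 must still go through the filesystem
            Arrays.asList("{\"commitInfo\":{\"timestamp\":0}}").iterator(),
            UUID.randomUUID().toString());
        System.out.println("Wrote " + status.getPath());
    }
}
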