Commit 04a36d0: Parallel calls

sumeet-db committed Oct 14, 2024
1 parent 5d2a275 · commit 04a36d0
Showing 7 changed files with 74 additions and 90 deletions.

DynamoDBCommitCoordinatorClient.java
@@ -359,17 +359,19 @@ public CommitResponse commit(
             "Commit version 0 must go via filesystem.");
       }
       try {
-        FileSystem fs = logPath.getFileSystem(hadoopConf);
-        Path commitPath =
-            CoordinatedCommitsUtils.generateUnbackfilledDeltaFilePath(logPath, commitVersion);
-        logStore.write(commitPath, actions, true /* overwrite */, hadoopConf);
-        FileStatus commitFileStatus = fs.getFileStatus(commitPath);
+        FileStatus commitFileStatus = CoordinatedCommitsUtils.writeUnbackfilledCommitFile(
+            logStore,
+            hadoopConf,
+            logPath.toString(),
+            commitVersion,
+            actions,
+            UUID.randomUUID().toString());
         long inCommitTimestamp = updatedActions.getCommitInfo().getCommitTimestamp();
         boolean isCCtoFSConversion =
             CoordinatedCommitsUtils.isCoordinatedCommitsToFSConversion(commitVersion, updatedActions);

         LOG.info("Committing version {} with UUID delta file {} to DynamoDB.",
-            commitVersion, commitPath);
+            commitVersion, commitFileStatus.getPath());
         CommitResponse res = commitToCoordinator(
             logPath,
             tableDesc.getTableConf(),
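
The change above collapses the inline path construction, write, and getFileStatus sequence into one call to the shared helper. As a minimal sketch of the new call-site shape (the wrapper class and parameter plumbing here are illustrative; writeUnbackfilledCommitFile and its signature are exactly as introduced in this commit):

// Sketch only: UnbackfilledCommitWriter and its parameter sourcing are hypothetical;
// the helper call and signature come from the diff above.
import io.delta.storage.LogStore;
import io.delta.storage.commit.CoordinatedCommitsUtils;
import java.io.IOException;
import java.util.Iterator;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;

class UnbackfilledCommitWriter {
  // Writes the UUID-named delta file under _delta_log/_commits and returns its status.
  static FileStatus write(
      LogStore logStore,
      Configuration hadoopConf,
      String logPath,
      long commitVersion,
      Iterator<String> actions) throws IOException {
    return CoordinatedCommitsUtils.writeUnbackfilledCommitFile(
        logStore, hadoopConf, logPath, commitVersion, actions,
        UUID.randomUUID().toString());
  }
}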

AbstractBatchBackfillingCommitCoordinatorClient.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.util.FileNames
 import io.delta.storage.LogStore
-import io.delta.storage.commit.{CommitCoordinatorClient, CommitFailedException => JCommitFailedException, CommitResponse, TableDescriptor, TableIdentifier, UpdatedActions}
+import io.delta.storage.commit.{CommitCoordinatorClient, CommitFailedException => JCommitFailedException, CommitResponse, CoordinatedCommitsUtils => JCoordinatedCommitsUtils, TableDescriptor, TableIdentifier, UpdatedActions}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}

@@ -70,7 +70,7 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
      updatedActions: UpdatedActions): CommitResponse = {
    val logPath = tableDesc.getLogPath
    val executionObserver = TransactionExecutionObserver.getObserver
-   val tablePath = CoordinatedCommitsUtils.getTablePath(logPath)
+   val tablePath = JCoordinatedCommitsUtils.getTablePath(logPath)
    if (commitVersion == 0) {
      throw new JCommitFailedException(false, false, "Commit version 0 must go via filesystem.")
    }
@@ -92,8 +92,8 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
    }

    // Write new commit file in _commits directory
-   val fileStatus = CoordinatedCommitsUtils.writeCommitFile(
-     logStore, hadoopConf, logPath, commitVersion, actions.asScala, generateUUID())
+   val fileStatus = JCoordinatedCommitsUtils.writeUnbackfilledCommitFile(
+     logStore, hadoopConf, logPath.toString, commitVersion, actions, generateUUID())

    // Do the actual commit
    val commitTimestamp = updatedActions.getCommitInfo.getCommitTimestamp
@@ -132,9 +132,9 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
      commitVersion: Long,
      updatedActions: UpdatedActions): Boolean = {
    val oldMetadataHasCoordinatedCommits =
-     CoordinatedCommitsUtils.getCommitCoordinatorName(updatedActions.getOldMetadata).nonEmpty
+     JCoordinatedCommitsUtils.getCoordinatorName(updatedActions.getOldMetadata).isPresent
    val newMetadataHasCoordinatedCommits =
-     CoordinatedCommitsUtils.getCommitCoordinatorName(updatedActions.getNewMetadata).nonEmpty
+     JCoordinatedCommitsUtils.getCoordinatorName(updatedActions.getNewMetadata).isPresent
    oldMetadataHasCoordinatedCommits && !newMetadataHasCoordinatedCommits && commitVersion > 0
  }


CoordinatedCommitsUtils.scala
@@ -149,27 +149,6 @@ object CoordinatedCommitsUtils extends DeltaLogging {
    }
  }

- /**
-  * Write a UUID-based commit file for the specified version to the
-  * table at [[logPath]].
-  */
- def writeCommitFile(
-     logStore: LogStore,
-     hadoopConf: Configuration,
-     logPath: Path,
-     commitVersion: Long,
-     actions: Iterator[String],
-     uuid: String): FileStatus = {
-   val commitPath = FileNames.unbackfilledDeltaFile(logPath, commitVersion, Some(uuid))
-   logStore.write(commitPath, actions.asJava, true, hadoopConf)
-   commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath)
- }
-
- /**
-  * Get the table path from the provided log path.
-  */
- def getTablePath(logPath: Path): Path = logPath.getParent
-
  def getCommitCoordinatorClient(
      spark: SparkSession,
      deltaLog: DeltaLog, // Used for logging
@@ -236,44 +215,6 @@ object CoordinatedCommitsUtils extends DeltaLogging {
    }
  }

- /**
-  * Helper method to recover the saved value of `deltaConfig` from `abstractMetadata`.
-  * If undefined, fall back to alternate keys, returning defaultValue if none match.
-  */
- private[delta] def fromAbstractMetadataAndDeltaConfig[T](
-     abstractMetadata: AbstractMetadata,
-     deltaConfig: DeltaConfig[T]): T = {
-   val conf = abstractMetadata.getConfiguration
-   for (key <- deltaConfig.key +: deltaConfig.alternateKeys) {
-     Option(conf.get(key)).map { value => return deltaConfig.fromString(value) }
-   }
-   deltaConfig.fromString(deltaConfig.defaultValue)
- }
-
- /**
-  * Get the commit coordinator name from the provided abstract metadata.
-  */
- def getCommitCoordinatorName(abstractMetadata: AbstractMetadata): Option[String] = {
-   fromAbstractMetadataAndDeltaConfig(
-     abstractMetadata, DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME)
- }
-
- /**
-  * Get the commit coordinator configuration from the provided abstract metadata.
-  */
- def getCommitCoordinatorConf(abstractMetadata: AbstractMetadata): Map[String, String] = {
-   fromAbstractMetadataAndDeltaConfig(
-     abstractMetadata, DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF)
- }
-
- /**
-  * Get the coordinated commits table configuration from the provided abstract metadata.
-  */
- def getCoordinatedCommitsTableConf(abstractMetadata: AbstractMetadata): Map[String, String] = {
-   fromAbstractMetadataAndDeltaConfig(
-     abstractMetadata, DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF)
- }
-
  val TABLE_PROPERTY_CONFS = Seq(
    DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME,
    DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF,

DeltaLogSuite.scala
@@ -485,6 +485,10 @@ class DeltaLogSuite extends QueryTest

    // Store these for later usage
    val actions = deltaLog.snapshot.stateDS.collect()
+   val addAction = actions.filter(_.add != null).filter(_.add.path == "path").head
+   val removeAction = addAction.add.removeWithTimestamp()
+   val txn = deltaLog.startTransaction()
+   txn.commit(removeAction :: Nil, DeltaOperations.ManualUpdate)
    val commitTimestamp = deltaLog.snapshot.logSegment.lastCommitFileModificationTimestamp

    checkAnswer(

CoordinatedCommitsSuite.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
 import org.apache.spark.sql.delta.util.FileNames.{CompactedDeltaFile, DeltaFile, UnbackfilledDeltaFile}
 import io.delta.storage.LogStore
-import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, GetCommitsResponse => JGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
+import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, CoordinatedCommitsUtils => JCoordinatedCommitsUtils, GetCommitsResponse => JGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
 import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -79,27 +79,23 @@ class CoordinatedCommitsSuite
    val m1 = Metadata(
      configuration = Map(COORDINATED_COMMITS_COORDINATOR_NAME.key -> "string_value")
    )
-   assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
-     m1, COORDINATED_COMMITS_COORDINATOR_NAME) === Some("string_value"))
+   assert(JCoordinatedCommitsUtils.getCoordinatorName(m1) === Optional.of("string_value"))

    val m2 = Metadata(
      configuration = Map(COORDINATED_COMMITS_COORDINATOR_NAME.key -> "")
    )
-   assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
-     m2, COORDINATED_COMMITS_COORDINATOR_NAME) === Some(""))
+   assert(JCoordinatedCommitsUtils.getCoordinatorName(m2) === Optional.of(""))

    val m3 = Metadata(
      configuration = Map(
        COORDINATED_COMMITS_COORDINATOR_CONF.key ->
          """{"key1": "string_value", "key2Int": 2, "key3ComplexStr": "\"hello\""}""")
    )
-   assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
-     m3, COORDINATED_COMMITS_COORDINATOR_CONF) ===
-     Map("key1" -> "string_value", "key2Int" -> "2", "key3ComplexStr" -> "\"hello\""))
+   assert(JCoordinatedCommitsUtils.getCoordinatorConf(m3) ===
+     Map("key1" -> "string_value", "key2Int" -> "2", "key3ComplexStr" -> "\"hello\"").asJava)

    val m4 = Metadata()
-   assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
-     m4, COORDINATED_COMMITS_TABLE_CONF) === Map.empty)
+   assert(JCoordinatedCommitsUtils.getCoordinatorConf(m4) === Map.empty.asJava)
  }

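
The m3 case above pins down the on-disk conf format: a JSON object whose values are all coerced to strings. A self-contained sketch of that parsing behavior, assuming plain Jackson as used by the new Java helper below (the demo class itself is hypothetical):

// Hypothetical demo class; the Jackson parsing mirrors what the new
// parseConfFromMetadata helper does with the stored JSON conf string.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class ConfParseDemo {
  public static void main(String[] args) throws Exception {
    String conf = "{\"key1\": \"string_value\", \"key2Int\": 2, \"key3ComplexStr\": \"\\\"hello\\\"\"}";
    // Jackson coerces the JSON number 2 to the string "2", matching the test above.
    Map<String, String> parsed = new ObjectMapper().readValue(
        conf, new TypeReference<Map<String, String>>() {});
    System.out.println(parsed); // {key1=string_value, key2Int=2, key3ComplexStr="hello"}
  }
}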

CoordinatedCommitsUtils.java
@@ -16,15 +16,18 @@

 package io.delta.storage.commit;

+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.delta.storage.LogStore;
 import io.delta.storage.commit.actions.AbstractMetadata;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;

@@ -39,6 +42,15 @@ private CoordinatedCommitsUtils() {}
  private static final String COORDINATED_COMMITS_COORDINATOR_NAME_KEY =
      "delta.coordinatedCommits.commitCoordinator-preview";

+ /** The configuration key for the coordinated commits owner conf. */
+ private static final String COORDINATED_COMMITS_COORDINATOR_CONF_KEY =
+     "delta.coordinatedCommits.commitCoordinatorConf-preview";
+
+
+ /** The configuration key for the coordinated commits table conf. */
+ private static final String COORDINATED_COMMITS_TABLE_CONF_KEY =
+     "delta.coordinatedCommits.tableConf-preview";
+
  /**
   * Creates a new unbackfilled delta file path for the given commit version.
   * The path is of the form `tablePath/_delta_log/_commits/00000000000000000001.uuid.json`.
@@ -71,7 +83,8 @@ public static boolean isCoordinatedCommitsToFSConversion(
        getCoordinatorName(updatedActions.getOldMetadata()).isPresent();
    boolean newMetadataHasCoordinatedCommits =
        getCoordinatorName(updatedActions.getNewMetadata()).isPresent();
-   return oldMetadataHasCoordinatedCommits && !newMetadataHasCoordinatedCommits && commitVersion > 0;
+   return oldMetadataHasCoordinatedCommits && !newMetadataHasCoordinatedCommits &&
+       commitVersion > 0;
  }

  /**
@@ -98,7 +111,7 @@ public static Path getUnbackfilledDeltaFile(
  /**
   * Write a UUID-based commit file for the specified version to the table at logPath.
   */
- public static FileStatus writeCommitFile(
+ public static FileStatus writeUnbackfilledCommitFile(
      LogStore logStore,
      Configuration hadoopConf,
      String logPath,
@@ -107,11 +120,10 @@ public static FileStatus writeCommitFile(
      String uuid) throws IOException {
    Path commitPath = new Path(getUnbackfilledDeltaFile(
        new Path(logPath), commitVersion, Optional.of(uuid)).toString());
-   FileSystem fs = commitPath.getFileSystem(hadoopConf);
-   if (!fs.exists(commitPath.getParent())) {
-     fs.mkdirs(commitPath.getParent());
-   }
-   logStore.write(commitPath, actions, false, hadoopConf);
+   // Do not use Put-If-Absent for Unbackfilled Commits files since we assume that UUID-based
+   // commit files are globally unique, and so we will never have concurrent writers attempting
+   // to write the same commit file.
+   logStore.write(commitPath, actions, true /* overwrite */, hadoopConf);
    return commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath);
  }

@@ -133,4 +145,33 @@ public static Optional<String> getCoordinatorName(AbstractMetadata metadata) {
        .get(COORDINATED_COMMITS_COORDINATOR_NAME_KEY);
    return Optional.ofNullable(coordinator);
  }
+
+ private static Map<String, String> parseConfFromMetadata(
+     AbstractMetadata abstractMetadata,
+     String confKey) {
+   String conf = abstractMetadata
+       .getConfiguration()
+       .getOrDefault(confKey, "{}");
+   try {
+     return new ObjectMapper().readValue(
+         conf,
+         new TypeReference<Map<String, String>>() {});
+   } catch (JsonProcessingException e) {
+     throw new RuntimeException("Failed to parse conf: ", e);
+   }
+ }
+
+ /**
+  * Get the coordinated commits owner configuration from the provided abstract metadata.
+  */
+ public static Map<String, String> getCoordinatorConf(AbstractMetadata abstractMetadata) {
+   return parseConfFromMetadata(abstractMetadata, COORDINATED_COMMITS_COORDINATOR_CONF_KEY);
+ }
+
+ /**
+  * Get the coordinated commits table configuration from the provided abstract metadata.
+  */
+ public static Map<String, String> getTableConf(AbstractMetadata abstractMetadata) {
+   return parseConfFromMetadata(abstractMetadata, COORDINATED_COMMITS_TABLE_CONF_KEY);
+ }
 }
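
The Javadoc for generateUnbackfilledDeltaFilePath above documents the unbackfilled file layout: a 20-digit zero-padded version plus a UUID under _delta_log/_commits. A small illustrative sketch reconstructing that documented layout (not the library's own implementation):

// Illustrative reconstruction of the documented path format:
// tablePath/_delta_log/_commits/00000000000000000001.<uuid>.json
import java.util.UUID;
import org.apache.hadoop.fs.Path;

public class UnbackfilledPathDemo {
  public static void main(String[] args) {
    Path logPath = new Path("s3://bucket/table/_delta_log");
    long commitVersion = 1L;
    String uuid = UUID.randomUUID().toString();
    // 20-digit zero-padded version, matching Delta's delta file naming.
    String fileName = String.format("%020d.%s.json", commitVersion, uuid);
    Path commitPath = new Path(new Path(logPath, "_commits"), fileName);
    System.out.println(commitPath);
  }
}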

InMemoryCommitCoordinator.scala
@@ -122,7 +122,7 @@ class InMemoryCommitCoordinator(val batchSize: Long) extends CommitCoordinatorClient
      backfillToVersion(logStore, hadoopConf, tableDesc, commitVersion - 1, null)
    }
    // Write new commit file in _commits directory
-   val fileStatus = CoordinatedCommitsUtils.writeCommitFile(
+   val fileStatus = CoordinatedCommitsUtils.writeUnbackfilledCommitFile(
      logStore, hadoopConf, logPath.toString, commitVersion, actions, generateUUID())
    // Do the actual commit
    val commitTimestamp = updatedActions.getCommitInfo.getCommitTimestamp
