Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed May 3, 2024
1 parent fe88cc3 commit 91b7476
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2109,7 +2109,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
override def getCommits(
logPath: Path,
managedCommitTableConf: Map[String, String],
startVersion: Long,
startVersion: Option[Long],
endVersion: Option[Long]): GetCommitsResponse =
GetCommitsResponse(Seq.empty, -1)

Expand All @@ -2118,8 +2118,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
hadoopConf: Configuration,
logPath: Path,
managedCommitTableConf: Map[String, String],
startVersion: Long,
endVersion: Option[Long]): Unit = {}
version: Long,
lastKnownBackfilledVersion: Option[Long] = None): Unit = {}

/**
* [[FileSystemBasedCommitOwnerClient]] is supposed to be treated as a singleton object for a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ class Snapshot(
if (minUnbackfilledVersion <= version) {
val hadoopConf = deltaLog.newDeltaHadoopConf()
tableCommitOwnerClient.backfillToVersion(
startVersion = minUnbackfilledVersion, endVersion = Some(version))
version,
lastKnownBackfilledVersion = Some(minUnbackfilledVersion - 1))
val fs = deltaLog.logPath.getFileSystem(hadoopConf)
val expectedBackfilledDeltaFile = FileNames.unsafeDeltaFile(deltaLog.logPath, version)
if (!fs.exists(expectedBackfilledDeltaFile)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ trait SnapshotManagement { self: DeltaLog =>
val threadPool = SnapshotManagement.commitOwnerGetCommitsThreadPool
def getCommitsTask(async: Boolean): GetCommitsResponse = {
recordFrameProfile("DeltaLog", s"CommitOwnerClient.getCommits.async=$async") {
tableCommitOwnerClient.getCommits(startVersion, endVersion = versionToLoad)
tableCommitOwnerClient.getCommits(Some(startVersion), endVersion = versionToLoad)
}
}
val unbackfilledCommitsResponseFuture =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,14 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L
val fs = logPath.getFileSystem(hadoopConf)
if (batchSize <= 1) {
// Backfill until `commitVersion - 1`
logInfo(s"Making sure commits are backfilled until $commitVersion version for" +
logInfo(s"Making sure commits are backfilled until ${commitVersion - 1} version for" +
s" table ${tablePath.toString}")
backfillToVersion(logStore, hadoopConf, logPath, managedCommitTableConf)
backfillToVersion(
logStore,
hadoopConf,
logPath,
managedCommitTableConf,
commitVersion - 1)
}

// Write new commit file in _commits directory
Expand Down Expand Up @@ -104,7 +109,12 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L
} else if (commitVersion % batchSize == 0 || mcToFsConversion) {
logInfo(s"Making sure commits are backfilled till $commitVersion version for" +
s"table ${tablePath.toString}")
backfillToVersion(logStore, hadoopConf, logPath, managedCommitTableConf)
backfillToVersion(
logStore,
hadoopConf,
logPath,
managedCommitTableConf,
commitVersion)
}
logInfo(s"Commit $commitVersion done successfully on table $tablePath")
commitResponse
Expand All @@ -127,9 +137,15 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L
hadoopConf: Configuration,
logPath: Path,
managedCommitTableConf: Map[String, String],
startVersion: Long = 0,
endVersionOpt: Option[Long] = None): Unit = {
getCommits(logPath, managedCommitTableConf, startVersion, endVersionOpt)
version: Long,
lastKnownBackfilledVersionOpt: Option[Long] = None): Unit = {
// Confirm the last backfilled version by checking the backfilled delta file's existence.
val validLastKnownBackfilledVersionOpt = lastKnownBackfilledVersionOpt.filter { version =>
val fs = logPath.getFileSystem(hadoopConf)
fs.exists(FileNames.unsafeDeltaFile(logPath, version))
}
val startVersionOpt = validLastKnownBackfilledVersionOpt.map(_ + 1)
getCommits(logPath, managedCommitTableConf, startVersionOpt, Some(version))
.commits
.foreach { commit =>
backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,28 @@ trait CommitOwnerClient {
def getCommits(
logPath: Path,
managedCommitTableConf: Map[String, String],
startVersion: Long,
startVersion: Option[Long] = None,
endVersion: Option[Long] = None): GetCommitsResponse

/**
* API to ask the Commit-Owner to backfill all commits >= 'startVersion' and <= `endVersion`.
* API to ask the Commit-Owner to backfill all commits > `lastKnownBackfilledVersion` and
* <= `version`.
*
* If this API returns successfully, that means the backfill must have been completed, although
* the Commit-Owner may not be aware of it yet.
*
* @param version The version till which the Commit-Owner should backfill.
* @param lastKnownBackfilledVersion The last known version that was backfilled by Commit-Owner
* before this API was called. If it's None or invalid, then the
* Commit-Owner should backfill from the beginning of the table.
*/
def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
managedCommitTableConf: Map[String, String],
startVersion: Long,
endVersion: Option[Long]): Unit
version: Long,
lastKnownBackfilledVersion: Option[Long]): Unit

/**
* Determines whether this [[CommitOwnerClient]] is semantically equal to another
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,21 @@ class InMemoryCommitOwner(val batchSize: Long)
override def getCommits(
logPath: Path,
managedCommitTableConf: Map[String, String],
startVersion: Long,
startVersion: Option[Long],
endVersion: Option[Long]): GetCommitsResponse = {
withReadLock[GetCommitsResponse](logPath) {
val tableData = perTableMap.get(logPath)
val effectiveStartVersion = startVersion.getOrElse(0L)
// Calculate the end version for the range, or use the last key if endVersion is not provided
val effectiveEndVersion =
endVersion.getOrElse(tableData.commitsMap.lastOption.map(_._1).getOrElse(startVersion))
val commitsInRange = tableData.commitsMap.range(startVersion, effectiveEndVersion + 1)
val effectiveEndVersion = endVersion.getOrElse(
tableData.commitsMap.lastOption.map(_._1).getOrElse(effectiveStartVersion))
val commitsInRange = tableData.commitsMap.range(
effectiveStartVersion, effectiveEndVersion + 1)
GetCommitsResponse(commitsInRange.values.toSeq, tableData.lastRatifiedCommitVersion)
}
}

override protected[delta] def registerBackfill(
override protected[sql] def registerBackfill(
logPath: Path,
backfilledVersion: Long): Unit = {
withWriteLock(logPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ case class TableCommitOwnerClient(
}

def getCommits(
startVersion: Long,
startVersion: Option[Long] = None,
endVersion: Option[Long] = None): GetCommitsResponse = {
commitOwnerClient.getCommits(logPath, tableConf, startVersion, endVersion)
}

def backfillToVersion(
startVersion: Long,
endVersion: Option[Long]): Unit = {
version: Long,
lastKnownBackfilledVersion: Option[Long] = None): Unit = {
commitOwnerClient.backfillToVersion(
logStore, hadoopConf, logPath, tableConf, startVersion, endVersion)
logStore, hadoopConf, logPath, tableConf, version, lastKnownBackfilledVersion)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class DeltaLogMinorCompactionSuite extends QueryTest
endVersion: Long): Unit = {
val deltaLog = DeltaLog.forTable(spark, tablePath)
deltaLog.update().tableCommitOwnerClientOpt.foreach { tableCommitOwnerClient =>
tableCommitOwnerClient.backfillToVersion(startVersion = 0, Some(endVersion))
tableCommitOwnerClient.backfillToVersion(endVersion)
}
val logReplay = new InMemoryLogReplay(
minFileRetentionTimestamp = 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ class InCommitTimestampWithManagedCommitSuite
val commitFileProvider = DeltaCommitFileProvider(deltaLog.update())
val unbackfilledCommits =
tableCommitOwnerClient
.getCommits(1)
.getCommits(Some(1))
.commits
.map { commit => DeltaHistoryManager.Commit(commit.version, commit.commitTimestamp)}
val commits = (Seq(commit0) ++ unbackfilledCommits).toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ case class ConcurrentBackfillCommitOwnerClient(
override def getCommits(
logPath: Path,
managedCommitTableConf: Map[String, String],
startVersion: Long,
startVersion: Option[Long],
endVersion: Option[Long]): GetCommitsResponse = {
if (ConcurrentBackfillCommitOwnerClient.beginConcurrentBackfills) {
CountDownLatchLogStore.listFromCalled.await()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.test.SharedSparkSession
class CommitOwnerClientSuite extends QueryTest with DeltaSQLTestUtils with SharedSparkSession
with DeltaSQLCommandTest {

trait TestCommitOwnerClientBase extends CommitOwnerClient {
private trait TestCommitOwnerClientBase extends CommitOwnerClient {
override def commit(
logStore: LogStore,
hadoopConf: Configuration,
Expand All @@ -47,22 +47,22 @@ class CommitOwnerClientSuite extends QueryTest with DeltaSQLTestUtils with Share
override def getCommits(
logPath: Path,
managedCommitTableConf: Map[String, String],
startVersion: Long,
startVersion: Option[Long],
endVersion: Option[Long] = None): GetCommitsResponse = GetCommitsResponse(Seq.empty, -1)

override def backfillToVersion(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
managedCommitTableConf: Map[String, String],
startVersion: Long,
endVersion: Option[Long]): Unit = {}
version: Long,
lastKnownBackfilledVersion: Option[Long]): Unit = {}

override def semanticEquals(other: CommitOwnerClient): Boolean = this == other
}

class TestCommitOwnerClient1 extends TestCommitOwnerClientBase
class TestCommitOwnerClient2 extends TestCommitOwnerClientBase
private class TestCommitOwnerClient1 extends TestCommitOwnerClientBase
private class TestCommitOwnerClient2 extends TestCommitOwnerClientBase

override def beforeEach(): Unit = {
super.beforeEach()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,36 +159,36 @@ class InMemoryCommitOwnerSuite extends QueryTest
val tcs = TableCommitOwnerClient(cs, log, Map.empty[String, String])

cs.registerTable(logPath, currentVersion = -1L, Metadata(), Protocol(1, 1))
assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, -1))
assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, -1))

// Commit 0 must be done by file-system
val e = intercept[CommitFailedException] { commit(version = 0, timestamp = 0, tcs) }
assert(e.getMessage === "Commit version 0 must go via filesystem.")
store.write(FileNames.unsafeDeltaFile(logPath, 0), Iterator("0", "0"), overwrite = false)
// Commit 0 doesn't go through the commit-owner, so the commit-owner does not report it in the
// getCommits response.
assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, -1))
assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, -1))
assertBackfilled(0, logPath, Some(0))

val c1 = commit(1, 1, tcs)
val c2 = commit(2, 2, tcs)
assert(tcs.getCommits(0).commits.takeRight(2) == Seq(c1, c2))
assert(tcs.getCommits().commits.takeRight(2) == Seq(c1, c2))

// All 3 commits are backfilled since batchSize == 3
val c3 = commit(3, 3, tcs)
assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, 3))
assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, 3))
(1 to 3).foreach(i => assertBackfilled(i, logPath, Some(i)))

// Test that startVersion and endVersion are respected in getCommits
val c4 = commit(4, 4, tcs)
val c5 = commit(5, 5, tcs)
assert(tcs.getCommits(4) == GetCommitsResponse(Seq(c4, c5), 5))
assert(tcs.getCommits(4, Some(4)) == GetCommitsResponse(Seq(c4), 5))
assert(tcs.getCommits(5) == GetCommitsResponse(Seq(c5), 5))
assert(tcs.getCommits(Some(4)) == GetCommitsResponse(Seq(c4, c5), 5))
assert(tcs.getCommits(Some(4), Some(4)) == GetCommitsResponse(Seq(c4), 5))
assert(tcs.getCommits(Some(5)) == GetCommitsResponse(Seq(c5), 5))

// Commits [4, 6] are backfilled since batchSize == 3
val c6 = commit(6, 6, tcs)
assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, 6))
assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, 6))
(4 to 6).foreach(i => assertBackfilled(i, logPath, Some(i)))
assertInvariants(logPath, tcs.commitOwnerClient.asInstanceOf[InMemoryCommitOwner])
}
Expand All @@ -205,13 +205,13 @@ class InMemoryCommitOwnerSuite extends QueryTest
val e = intercept[CommitFailedException] { commit(version = 0, timestamp = 0, tcs) }
assert(e.getMessage === "Commit version 0 must go via filesystem.")
store.write(FileNames.unsafeDeltaFile(logPath, 0), Iterator("0", "0"), overwrite = false)
assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, -1))
assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, -1))
assertBackfilled(version = 0, logPath, Some(0L))

// Test that all commits are immediately backfilled
(1 to 3).foreach { version =>
commit(version, version, tcs)
assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, version))
assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, version))
assertBackfilled(version, logPath, Some(version))
}

Expand Down Expand Up @@ -248,7 +248,7 @@ class InMemoryCommitOwnerSuite extends QueryTest

// Verify that the conflict-checker still works even when everything has been backfilled
commit(5, 5, tcs)
assert(tcs.getCommits(0) == GetCommitsResponse(Seq.empty, 5))
assert(tcs.getCommits() == GetCommitsResponse(Seq.empty, 5))
assertCommitFail(5, 6, retryable = true, commit(5, 5, tcs))
assertCommitFail(7, 6, retryable = false, commit(7, 7, tcs))

Expand Down Expand Up @@ -306,7 +306,7 @@ class InMemoryCommitOwnerSuite extends QueryTest
override def run(): Unit = {
var currentWriterCommits = 0
while (currentWriterCommits < numberOfCommitsPerWriter) {
val nextVersion = tcs.getCommits(0).latestTableVersion + 1
val nextVersion = tcs.getCommits().latestTableVersion + 1
try {
val currentTimestamp = runningTimestamp.getAndIncrement()
val commitResponse = commit(nextVersion, currentTimestamp, tcs)
Expand Down
Loading

0 comments on commit 91b7476

Please sign in to comment.