Parallel calls
sumeet-db committed Mar 20, 2024
1 parent a584fe1 commit b88e270
Showing 3 changed files with 180 additions and 21 deletions.
SnapshotManagement.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta

import java.io.FileNotFoundException
import java.util.Objects
import java.util.concurrent.Future
import java.util.concurrent.{CompletableFuture, Future}
import java.util.concurrent.locks.ReentrantLock

import scala.collection.mutable
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal

import com.databricks.spark.util.TagDefinitions.TAG_ASYNC
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitStore}
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitStore, GetCommitsResponse}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.JsonUtils
@@ -157,6 +157,13 @@ trait SnapshotManagement { self: DeltaLog =>
}
}

/**
* This method is designed to efficiently and reliably list delta, compacted delta, and
* checkpoint files associated with a Delta Lake table. It makes parallel calls to both the
* file-system and a commit-store (if available), reconciles the results to account for
* asynchronous backfill operations, and ensures a comprehensive list of file statuses without
* missing any concurrently backfilled files.
*/
protected final def listDeltaCompactedDeltaAndCheckpointFilesWithCommitStore(
startVersion: Long,
commitStoreOpt: Option[CommitStore],
@@ -165,21 +172,31 @@ trait SnapshotManagement { self: DeltaLog =>
self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
// TODO(managed-commits): Make sure all usage of `listDeltaCompactedDeltaAndCheckpointFiles`
// are replaced with this method.
val resultFromCommitStore = recordFrameProfile("DeltaLog", "CommitStore.getCommits") {
commitStoreOpt match {
case Some(cs) => cs.getCommits(logPath, startVersion, endVersion = versionToLoad).commits
case None => Seq.empty
// Submit a potential async call to get commits from commit store if available
val unbackfilledCommitsResponseOptFuture = commitStoreOpt.map { commitStore =>
val threadPool = SnapshotManagement.commitStoreGetCommitsThreadPool
val task = () => recordFrameProfile("DeltaLog", "CommitStore.getCommits") {
commitStore.getCommits(logPath, startVersion, endVersion = versionToLoad)
}
if (threadPool.getActiveCount < threadPool.getMaximumPoolSize) {
threadPool.submit[GetCommitsResponse](spark)(task())
} else {
// If the thread pool is full, we should not submit more tasks to it. Instead, we should
// run the task in the current thread.
CompletableFuture.completedFuture(task())
}
}
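
A minimal self-contained sketch of the submit-or-run-inline pattern used above, written against a plain ThreadPoolExecutor (submitOrRunInline is a hypothetical helper; the real code goes through DeltaThreadPool):

import java.util.concurrent.{Callable, CompletableFuture, Future, ThreadPoolExecutor}

// Submit while the pool has spare capacity; otherwise run on the caller's
// thread. The getActiveCount check is advisory rather than atomic, so it
// bounds pool usage on a best-effort basis, which is an acceptable trade-off
// here because the fallback is merely synchronous execution.
def submitOrRunInline[T](pool: ThreadPoolExecutor)(task: () => T): Future[T] = {
  if (pool.getActiveCount < pool.getMaximumPoolSize) {
    pool.submit(new Callable[T] { override def call(): T = task() })
  } else {
    CompletableFuture.completedFuture(task())
  }
}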

var maxDeltaVersionSeen = startVersion - 1
val resultTuplesFromFsListingOpt = recordFrameProfile("DeltaLog", "listFromOrNone") {
listFromOrNone(startVersion).map {
_.flatMap {
def listDeltaCompactedDeltaAndCheckpointFilesOpt(
startVersionOverride: Long): Option[Array[(FileStatus, FileType.Value, Long)]] = {
recordFrameProfile("DeltaLog", "listFromOrNone") {
listFromOrNone(startVersionOverride).map {
_.flatMap {
case DeltaFile(f, fileVersion) =>
// Ideally listFromOrNone should return lexiographically sorted files amd so
// maxDeltaVersionSeen should be equal to fileVersion. But we are being defensive
// here and taking max of all the fileVersions seen.
// Ideally listFromOrNone should return lexicographically sorted files and so
// maxDeltaVersionSeen should be equal to fileVersion. But we are being
// defensive here and taking max of all the fileVersions seen.
maxDeltaVersionSeen = math.max(maxDeltaVersionSeen, fileVersion)
Some((f, FileType.DELTA, fileVersion))
case CompactedDeltaFile(f, startVersion, endVersion)
@@ -196,21 +213,60 @@ trait SnapshotManagement { self: DeltaLog =>
// take files up to the version we want to load
.takeWhile { case (_, _, fileVersion) => versionToLoad.forall(fileVersion <= _) }
.toArray
}
}
}
val resultFromCommitStoreFiltered = resultFromCommitStore

val initialLogTuplesFromFsListingOpt =
listDeltaCompactedDeltaAndCheckpointFilesOpt(startVersion)
val unbackfilledCommitsResponse = unbackfilledCommitsResponseOptFuture.getOrElse(
// Early exit if CommitStore is undefined.
return initialLogTuplesFromFsListingOpt.map(_.map(_._1))).get()

// The UUID commit files (along with the delta commit files) will be deleted in the Metadata
// Cleanup job, but their corresponding tracking entries might be removed from the CommitStore
// immediately during backfill. We might notice missing delta files if they were concurrently
// backfilled from the commit store during the above list operations. The next (optional)
// file-system list operation is guaranteed to detect those backfilled files.
// Note: We only care about missing delta files with version <= versionToLoad
val areDeltaFilesMissing = unbackfilledCommitsResponse.commits.headOption match {
case Some(commit) =>
// Missing Delta files: [maxDeltaVersionSeen + 1, commit.head.version - 1]
maxDeltaVersionSeen + 1 < commit.version
case None =>
// Missing Delta files: [maxDeltaVersionSeen + 1, latestTableVersion]
// When there are no commits, we should consider the latestTableVersion from the commit
// store to detect if ALL trailing commits were concurrently backfilled.
unbackfilledCommitsResponse.latestTableVersion >= 0 &&
maxDeltaVersionSeen < unbackfilledCommitsResponse.latestTableVersion
}
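
A worked example of the two branches above, with hypothetical version numbers:

// Suppose the file-system listing saw deltas up to version 4, so
// maxDeltaVersionSeen = 4.
// Case Some: the first unbackfilled commit has version 7, so versions 5-6
//   must have been backfilled concurrently: 4 + 1 < 7.
// Case None: no unbackfilled commits remain, but the commit store reports
//   latestTableVersion = 6, so ALL trailing versions (5-6) were backfilled
//   concurrently: 6 >= 0 && 4 < 6.
// Either way, a second listing starting at version 5 recovers the gap.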
val needAdditionalFsListing =
versionToLoad.forall(maxDeltaVersionSeen < _) && areDeltaFilesMissing
val additionalLogTuplesFromFsListingOpt =
Option.when(needAdditionalFsListing) {
listDeltaCompactedDeltaAndCheckpointFilesOpt(
startVersionOverride = maxDeltaVersionSeen + 1)
}.flatten
val finalLogTuplesFromFsListingOpt =
(initialLogTuplesFromFsListingOpt, additionalLogTuplesFromFsListingOpt) match {
case (Some(initial), Some(additional)) => Some(initial ++ additional)
case (Some(initial), None) => Some(initial)
case _ => None
}

val unbackfilledCommitsFiltered = unbackfilledCommitsResponse.commits
.dropWhile(_.version <= maxDeltaVersionSeen)
.takeWhile(commit => versionToLoad.forall(commit.version <= _))
.map(_.fileStatus)
.toArray
if (resultTuplesFromFsListingOpt.isEmpty && resultFromCommitStoreFiltered.nonEmpty) {
if (finalLogTuplesFromFsListingOpt.isEmpty && unbackfilledCommitsFiltered.nonEmpty) {
throw new IllegalStateException("No files found from the file system listing, but " +
"files found from the commit store. This is unexpected.")
s"files found from the commit store. This is unexpected. First Commit Store file in the " +
s"listing: ${unbackfilledCommitsFiltered.head.getPath}")
}

// If the result from the fs listing is None and the result from the commit-store is empty,
// return None. The caller uses this to distinguish whether the table exists at all.
resultTuplesFromFsListingOpt.map { resultTuplesFromFsListing =>
resultTuplesFromFsListing.map(_._1) ++ resultFromCommitStoreFiltered
finalLogTuplesFromFsListingOpt.map { logTuplesFromFsListing =>
logTuplesFromFsListing.map(_._1) ++ unbackfilledCommitsFiltered
}
}
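
End to end, the listing protocol above reduces to the following sketch. This is a simplified model under assumed types: plain version numbers stand in for the (FileStatus, FileType, version) tuples, listFs / getCommits stand in for listFromOrNone and CommitStore.getCommits, and versionToLoad filtering is omitted for brevity:

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

def listWithCommitStore(
    listFs: Long => Seq[Long],          // delta versions visible on disk
    getCommits: () => (Seq[Long], Long) // (unbackfilled versions, latestTableVersion)
): Seq[Long] = {
  implicit val ec: ExecutionContext = ExecutionContext.global
  val commitsFuture = Future(getCommits())  // commit-store call runs in parallel
  val initial = listFs(0L)                  // file-system listing on this thread
  var maxSeen = (initial :+ -1L).max
  val (commits, latestTableVersion) = Await.result(commitsFuture, Duration.Inf)
  // Gap check: the commit store implies deltas the listing missed because
  // they were backfilled concurrently.
  val gap = commits.headOption
    .map(maxSeen + 1 < _)
    .getOrElse(latestTableVersion >= 0 && maxSeen < latestTableVersion)
  val extra = if (gap) listFs(maxSeen + 1) else Seq.empty[Long]
  maxSeen = (extra :+ maxSeen).max
  // Keep only unbackfilled commits strictly newer than anything listed.
  initial ++ extra ++ commits.dropWhile(_ <= maxSeen)
}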

@@ -1075,7 +1131,8 @@ trait SnapshotManagement { self: DeltaLog =>
.map(manuallyLoadCheckpoint)
getLogSegmentForVersion(
versionToLoad = Some(version),
lastCheckpointInfo = lastCheckpointInfoHint
lastCheckpointInfo = lastCheckpointInfoHint,
commitStoreOpt = current.commitStoreOpt
).map { segment =>
createSnapshot(
initSegment = segment,
@@ -1101,6 +1158,13 @@ object SnapshotManagement {
new DeltaThreadPool(tpe)
}

protected[delta] lazy val commitStoreGetCommitsThreadPool = {
val numThreads = SparkSession.active.sessionState.conf
.getConf(DeltaSQLConf.DELTA_LIST_FROM_COMMIT_STORE_THREAD_POOL_SIZE)
val tpe = ThreadUtils.newDaemonCachedThreadPool("commit-store-get-commits", numThreads)
new DeltaThreadPool(tpe)
}

/**
* - Verify the versions are contiguous.
* - Verify the versions start with `expectedStartVersion` if it's specified.
@@ -1123,7 +1187,7 @@ object SnapshotManagement {
s"file version: $v to compute Snapshot")
}
expectedEndVersion.foreach { v =>
require(versions.nonEmpty && versions.last == v, "Did not get the first delta " +
require(versions.nonEmpty && versions.last == v, "Did not get the last delta " +
s"file version: $v to compute Snapshot")
}
}
DeltaSQLConf.scala
@@ -506,6 +506,14 @@ trait DeltaSQLConfBase {
.checkValue(_ > 0, "threadPoolSize must be positive")
.createWithDefault(20)

val DELTA_LIST_FROM_COMMIT_STORE_THREAD_POOL_SIZE =
buildStaticConf("listFrom.commitStore.threadPoolSize")
.internal()
.doc("The size of the thread pool for listing files from the CommitStore.")
.intConf
.checkValue(_ > 0, "threadPoolSize must be positive")
.createWithDefault(5)
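
Because this is a static conf, it must be set before the SparkSession is created. A hypothetical usage example, assuming the standard "spark.databricks.delta." prefix that DeltaSQLConf prepends to config names:

import org.apache.spark.sql.SparkSession

// Static confs cannot be changed at runtime; set them at session build time.
val spark = SparkSession.builder()
  .config("spark.databricks.delta.listFrom.commitStore.threadPoolSize", "10")
  .getOrCreate()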

val DELTA_ASSUMES_DROP_CONSTRAINT_IF_EXISTS =
buildConf("constraints.assumesDropIfExists.enabled")
.doc("""If true, DROP CONSTRAINT quietly drops nonexistent constraints even without
SnapshotManagementSuite.scala
@@ -17,14 +17,21 @@
package org.apache.spark.sql.delta

import java.io.{File, FileNotFoundException, RandomAccessFile}
import java.lang.Thread.sleep
import java.util.concurrent.ExecutionException

import org.apache.spark.sql.delta.managedcommit.Commit
import scala.collection.mutable

import org.apache.spark.sql.delta.DeltaConfigs.MANAGED_COMMIT_OWNER_NAME
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitStore, CommitStoreBuilder, CommitStoreProvider, GetCommitsResponse, InMemoryCommitStore}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
@@ -473,4 +480,84 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
spark.read.format("delta").load(path).collect()
}
}

Seq(true, false).foreach { gapAtTheEnd =>
test(s"concurrent backfills are properly reconciled with gapAtTheEnd: $gapAtTheEnd") {
val commitStoreName = "awaiting-commit-store"
// We defer backfills to manually trigger concurrent backfills for versions [3, 5] during
// CommitStore.getCommits but after the LogStore listing.
// Depending on the value of gapAtTheEnd, we also write versions [6, 8] to the log.
val deferredBackfillMap: mutable.Map[Long, () => Unit] = mutable.Map.empty
var finishPendingBackfillsDuringListCommits = false

case class AwaitingCommitStore() extends InMemoryCommitStore(5) {
override def getCommits(
logPath: Path,
startVersion: Long,
endVersion: Option[Long]): GetCommitsResponse = {
if (finishPendingBackfillsDuringListCommits) {
logInfo(
s"Finishing pending backfills concurrently with the list calls " +
s"${deferredBackfillMap.keySet}")
// Add a 5s delay to ensure the file-system listFrom call completes, then backfill
// commits [3, 5] to create a perceived gap between the file-system deltas and the
// commit-store commits, verifying that the reconciliation logic works as expected.
sleep(5000)
(3 to 5).foreach(version => deferredBackfillMap(version)())
}
super.getCommits(logPath, startVersion, endVersion)
}
override def backfill(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
version: Long,
fileStatus: FileStatus): Unit = {
if (version <= 2) {
super.backfill(logStore, hadoopConf, logPath, version, fileStatus)
} else {
deferredBackfillMap(version) =
() => super.backfill(logStore, hadoopConf, logPath, version, fileStatus)
}
}
}

object AwaitingCommitStoreBuilder extends CommitStoreBuilder {
private lazy val awaitingCommitStore = AwaitingCommitStore()

override def name: String = commitStoreName
override def build(conf: Map[String, String]): CommitStore = awaitingCommitStore
}

CommitStoreProvider.clearNonDefaultBuilders()
CommitStoreProvider.registerBuilder(AwaitingCommitStoreBuilder)
withSQLConf(
MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> commitStoreName,
DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS.key -> "false") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath
val dataPath = new Path(path)
spark.range(10).write.format("delta").save(path)
(1 to 5).foreach(_ => spark.range(10).write.format("delta").mode("append").save(path))
if (!gapAtTheEnd) {
(6 to 8).foreach(_ => spark.range(10).write.format("delta").mode("append").save(path))
}
finishPendingBackfillsDuringListCommits = true
// Invalidate cache to ensure delta doesn't return the cached snapshot.
DeltaLog.invalidateCache(spark, dataPath)
val snapshot = DeltaLog.forTable(spark, dataPath).update()
// Versions [0, 5] must have been backfilled by now, so their deltas should be
// regular (non-UUID) files; any later versions should still be unbackfilled
// UUID commit files.
snapshot.logSegment.deltas.zipWithIndex.foreach { case (delta, index) =>
if (index <= 5) {
assert(!delta.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString()),
index)
} else {
assert(delta.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString()),
index)
}
}
}
}
}
}
}
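
The deferred-backfill trick used by AwaitingCommitStore generalizes to a small test utility. A sketch under assumed simplifications (no Delta types; DeferredActions is a hypothetical name):

import scala.collection.mutable

// Capture side effects as thunks keyed by version instead of running them,
// then replay them at a chosen point in the read path to deterministically
// simulate a concurrent writer.
class DeferredActions {
  private val pending = mutable.Map.empty[Long, () => Unit]
  def defer(version: Long)(action: => Unit): Unit =
    pending(version) = () => action
  def replay(versions: Seq[Long]): Unit =
    versions.sorted.foreach(v => pending.remove(v).foreach(_.apply()))
}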
