From b88e270c0e5c464b7f3628725a7e1c75fdd91b4f Mon Sep 17 00:00:00 2001 From: Sumeet Varma Date: Wed, 20 Mar 2024 13:37:45 -0700 Subject: [PATCH] Parallel calls --- .../spark/sql/delta/SnapshotManagement.scala | 104 ++++++++++++++---- .../sql/delta/sources/DeltaSQLConf.scala | 8 ++ .../sql/delta/SnapshotManagementSuite.scala | 89 ++++++++++++++- 3 files changed, 180 insertions(+), 21 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index c9fe365fec2..873edb26c98 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.delta import java.io.FileNotFoundException import java.util.Objects -import java.util.concurrent.Future +import java.util.concurrent.{CompletableFuture, Future} import java.util.concurrent.locks.ReentrantLock import scala.collection.mutable @@ -29,7 +29,7 @@ import scala.util.control.NonFatal import com.databricks.spark.util.TagDefinitions.TAG_ASYNC import org.apache.spark.sql.delta.actions.Metadata -import org.apache.spark.sql.delta.managedcommit.{Commit, CommitStore} +import org.apache.spark.sql.delta.managedcommit.{Commit, CommitStore, GetCommitsResponse} import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.FileNames._ import org.apache.spark.sql.delta.util.JsonUtils @@ -157,6 +157,13 @@ trait SnapshotManagement { self: DeltaLog => } } + /** + * This method is designed to efficiently and reliably list delta, compacted delta, and + * checkpoint files associated with a Delta Lake table. It makes parallel calls to both the + * file-system and a commit-store (if available), reconciles the results to account for + * asynchronous backfill operations, and ensures a comprehensive list of file statuses without + * missing any concurrently backfilled files. + */ protected final def listDeltaCompactedDeltaAndCheckpointFilesWithCommitStore( startVersion: Long, commitStoreOpt: Option[CommitStore], @@ -165,21 +172,31 @@ trait SnapshotManagement { self: DeltaLog => self, "delta.deltaLog.listDeltaAndCheckpointFiles") { // TODO(managed-commits): Make sure all usage of `listDeltaCompactedDeltaAndCheckpointFiles` // are replaced with this method. - val resultFromCommitStore = recordFrameProfile("DeltaLog", "CommitStore.getCommits") { - commitStoreOpt match { - case Some(cs) => cs.getCommits(logPath, startVersion, endVersion = versionToLoad).commits - case None => Seq.empty + // Submit a potential async call to get commits from commit store if available + val unbackfilledCommitsResponseOptFuture = commitStoreOpt.map { commitStore => + val threadPool = SnapshotManagement.commitStoreGetCommitsThreadPool + val task = () => recordFrameProfile("DeltaLog", "CommitStore.getCommits") { + commitStore.getCommits(logPath, startVersion, endVersion = versionToLoad) + } + if (threadPool.getActiveCount < threadPool.getMaximumPoolSize) { + threadPool.submit[GetCommitsResponse](spark)(task()) + } else { + // If the thread pool is full, we should not submit more tasks to it. Instead, we should + // run the task in the current thread. 
+ CompletableFuture.completedFuture(task()) } } var maxDeltaVersionSeen = startVersion - 1 - val resultTuplesFromFsListingOpt = recordFrameProfile("DeltaLog", "listFromOrNone") { - listFromOrNone(startVersion).map { - _.flatMap { + def listDeltaCompactedDeltaAndCheckpointFilesOpt( + startVersionOverride: Long): Option[Array[(FileStatus, FileType.Value, Long)]] = { + recordFrameProfile("DeltaLog", "listFromOrNone") { + listFromOrNone(startVersionOverride).map { + _.flatMap { case DeltaFile(f, fileVersion) => - // Ideally listFromOrNone should return lexiographically sorted files amd so - // maxDeltaVersionSeen should be equal to fileVersion. But we are being defensive - // here and taking max of all the fileVersions seen. + // Ideally listFromOrNone should return lexicographically sorted files and so + // maxDeltaVersionSeen should be equal to fileVersion. But we are being + // defensive here and taking max of all the fileVersions seen. maxDeltaVersionSeen = math.max(maxDeltaVersionSeen, fileVersion) Some((f, FileType.DELTA, fileVersion)) case CompactedDeltaFile(f, startVersion, endVersion) @@ -196,21 +213,60 @@ trait SnapshotManagement { self: DeltaLog => // take files up to the version we want to load .takeWhile { case (_, _, fileVersion) => versionToLoad.forall(fileVersion <= _) } .toArray + } } } - val resultFromCommitStoreFiltered = resultFromCommitStore + + val initialLogTuplesFromFsListingOpt = + listDeltaCompactedDeltaAndCheckpointFilesOpt(startVersion) + val unbackfilledCommitsResponse = unbackfilledCommitsResponseOptFuture.getOrElse( + // Early exit if CommitStore is undefined. + return initialLogTuplesFromFsListingOpt.map(_.map(_._1))).get() + + // The UUID commit files (along with the delta commit files) will be deleted in the Metadata + // Cleanup job, but its corresponding tracking entry might be immediately removed from the + // CommitStore during backfill. We might notice missing delta files if they were concurrently + // backfilled from the commit store during the above list operations. It is guaranteed that the + // next optional file-system list op should detect those backfilled files. + // Note: We only care about missing delta files with version <= versionToLoad + val areDeltaFilesMissing = unbackfilledCommitsResponse.commits.headOption match { + case Some(commit) => + // Missing Delta files: [maxDeltaVersionSeen + 1, commit.head.version - 1] + maxDeltaVersionSeen + 1 < commit.version + case None => + // Missing Delta files: [maxDeltaVersionSeen + 1, latestTableVersion] + // When there are no commits, we should consider the latestTableVersion from the commit + // store to detect if ALL trailing commits were concurrently backfilled. 
+ unbackfilledCommitsResponse.latestTableVersion >= 0 && + maxDeltaVersionSeen < unbackfilledCommitsResponse.latestTableVersion} + val needAdditionalFsListing = + versionToLoad.forall(maxDeltaVersionSeen < _) && areDeltaFilesMissing + val additionalLogTuplesFromFsListingOpt = + Option.when(needAdditionalFsListing) { + listDeltaCompactedDeltaAndCheckpointFilesOpt( + startVersionOverride = maxDeltaVersionSeen + 1) + }.flatten + val finalLogTuplesFromFsListingOpt = + (initialLogTuplesFromFsListingOpt, additionalLogTuplesFromFsListingOpt) match { + case (Some(initial), Some(additional)) => Some(initial ++ additional) + case (Some(initial), None) => Some(initial) + case _ => None + } + + val unbackfilledCommitsFiltered = unbackfilledCommitsResponse.commits .dropWhile(_.version <= maxDeltaVersionSeen) .takeWhile(commit => versionToLoad.forall(commit.version <= _)) .map(_.fileStatus) - .toArray - if (resultTuplesFromFsListingOpt.isEmpty && resultFromCommitStoreFiltered.nonEmpty) { + if (finalLogTuplesFromFsListingOpt.isEmpty && unbackfilledCommitsFiltered.nonEmpty) { throw new IllegalStateException("No files found from the file system listing, but " + - "files found from the commit store. This is unexpected.") + s"files found from the commit store. This is unexpected. First Commit Store file in the " + + s"listing: ${unbackfilledCommitsFiltered.head.getPath}") } + // If result from fs listing is None and result from commit-store is empty, return none. // This is used by caller to distinguish whether table doesn't exist. - resultTuplesFromFsListingOpt.map { resultTuplesFromFsListing => - resultTuplesFromFsListing.map(_._1) ++ resultFromCommitStoreFiltered + finalLogTuplesFromFsListingOpt.map { logTuplesFromFsListing => + logTuplesFromFsListing.map(_._1) ++ unbackfilledCommitsFiltered } } @@ -1075,7 +1131,8 @@ trait SnapshotManagement { self: DeltaLog => .map(manuallyLoadCheckpoint) getLogSegmentForVersion( versionToLoad = Some(version), - lastCheckpointInfo = lastCheckpointInfoHint + lastCheckpointInfo = lastCheckpointInfoHint, + commitStoreOpt = current.commitStoreOpt ).map { segment => createSnapshot( initSegment = segment, @@ -1101,6 +1158,13 @@ object SnapshotManagement { new DeltaThreadPool(tpe) } + protected[delta] lazy val commitStoreGetCommitsThreadPool = { + val numThreads = SparkSession.active.sessionState.conf + .getConf(DeltaSQLConf.DELTA_LIST_FROM_COMMIT_STORE_THREAD_POOL_SIZE) + val tpe = ThreadUtils.newDaemonCachedThreadPool("commit-store-get-commits", numThreads) + new DeltaThreadPool(tpe) + } + /** * - Verify the versions are contiguous. * - Verify the versions start with `expectedStartVersion` if it's specified. 
@@ -1123,7 +1187,7 @@ object SnapshotManagement { s"file version: $v to compute Snapshot") } expectedEndVersion.foreach { v => - require(versions.nonEmpty && versions.last == v, "Did not get the first delta " + + require(versions.nonEmpty && versions.last == v, "Did not get the last delta " + s"file version: $v to compute Snapshot") } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index cd5fbfd8b7c..79832792d7c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -506,6 +506,14 @@ trait DeltaSQLConfBase { .checkValue(_ > 0, "threadPoolSize must be positive") .createWithDefault(20) + val DELTA_LIST_FROM_COMMIT_STORE_THREAD_POOL_SIZE = + buildStaticConf("listFrom.commitStore.threadPoolSize") + .internal() + .doc("The size of the thread pool for listing files from the CommitStore.") + .intConf + .checkValue(_ > 0, "threadPoolSize must be positive") + .createWithDefault(5) + val DELTA_ASSUMES_DROP_CONSTRAINT_IF_EXISTS = buildConf("constraints.assumesDropIfExists.enabled") .doc("""If true, DROP CONSTRAINT quietly drops nonexistent constraints even without diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala index 17ee49433e3..d54b4d0d8e5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala @@ -17,14 +17,21 @@ package org.apache.spark.sql.delta import java.io.{File, FileNotFoundException, RandomAccessFile} +import java.lang.Thread.sleep import java.util.concurrent.ExecutionException -import org.apache.spark.sql.delta.managedcommit.Commit +import scala.collection.mutable + +import org.apache.spark.sql.delta.DeltaConfigs.MANAGED_COMMIT_OWNER_NAME +import org.apache.spark.sql.delta.managedcommit.{Commit, CommitStore, CommitStoreBuilder, CommitStoreProvider, GetCommitsResponse, InMemoryCommitStore} import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.storage.LogStore import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils import org.apache.spark.sql.delta.test.DeltaTestImplicits._ import org.apache.spark.sql.delta.util.{FileNames, JsonUtils} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.fs.Path import org.apache.spark.SparkException @@ -473,4 +480,84 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar spark.read.format("delta").load(path).collect() } } + + Seq(true, false).foreach { gapAtTheEnd => + test(s"concurrent backfills are properly reconciled with gapAtTheEnd: $gapAtTheEnd") { + val commitStoreName = "awaiting-commit-store" + // We defer backfills to manually trigger concurrent backfills for versions [3, 5] during + // CommitStore.getCommits but after the LogStore listing. + // Depending on the value of gapAtTheEnd, we also write versions [6, 8] to the log. 
+      val deferredBackfillMap: mutable.Map[Long, () => Unit] = mutable.Map.empty
+      var finishPendingBackfillsDuringListCommits = false
+
+      case class AwaitingCommitStore() extends InMemoryCommitStore(5) {
+        override def getCommits(
+            logPath: Path,
+            startVersion: Long,
+            endVersion: Option[Long]): GetCommitsResponse = {
+          if (finishPendingBackfillsDuringListCommits) {
+            logInfo(
+              s"Finishing pending backfills concurrently with the list calls " +
+                s"${deferredBackfillMap.keySet}")
+            // Add a 5s delay to ensure the file-system listFrom call completes, and then
+            // backfill commits [3, 5] to create a perceived gap between file-system deltas
+            // and commit-store commits to test that the reconciliation logic works as expected.
+            sleep(5000)
+            (3 to 5).foreach(version => deferredBackfillMap(version)())
+          }
+          super.getCommits(logPath, startVersion, endVersion)
+        }
+        override def backfill(
+            logStore: LogStore,
+            hadoopConf: Configuration,
+            logPath: Path,
+            version: Long,
+            fileStatus: FileStatus): Unit = {
+          if (version <= 2) {
+            super.backfill(logStore, hadoopConf, logPath, version, fileStatus)
+          } else {
+            deferredBackfillMap(version) =
+              () => super.backfill(logStore, hadoopConf, logPath, version, fileStatus)
+          }
+        }
+      }
+
+      object AwaitingCommitStoreBuilder extends CommitStoreBuilder {
+        private lazy val awaitingCommitStore = AwaitingCommitStore()
+
+        override def name: String = commitStoreName
+        override def build(conf: Map[String, String]): CommitStore = awaitingCommitStore
+      }
+
+      CommitStoreProvider.clearNonDefaultBuilders()
+      CommitStoreProvider.registerBuilder(AwaitingCommitStoreBuilder)
+      withSQLConf(
+        MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> commitStoreName,
+        DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS.key -> "false") {
+        withTempDir { tempDir =>
+          val path = tempDir.getCanonicalPath
+          val dataPath = new Path(path)
+          spark.range(10).write.format("delta").save(path)
+          (1 to 5).foreach(_ => spark.range(10).write.format("delta").mode("append").save(path))
+          if (!gapAtTheEnd) {
+            (6 to 8).foreach(_ => spark.range(10).write.format("delta").mode("append").save(path))
+          }
+          finishPendingBackfillsDuringListCommits = true
+          // Invalidate cache to ensure delta doesn't return the cached snapshot.
+          DeltaLog.invalidateCache(spark, dataPath)
+          val snapshot = DeltaLog.forTable(spark, dataPath).update()
+          // Versions [0, 5] must be backfilled deltas; later versions must still be UUID files.
+          snapshot.logSegment.deltas.zipWithIndex.foreach { case (delta, index) =>
+            if (index <= 5) {
+              assert(!delta.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString()),
+                index)
+            } else {
+              assert(delta.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString()),
+                index)
+            }
+          }
+        }
+      }
+    }
+  }
 }
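
Note on the submit-or-run-inline pattern used for CommitStore.getCommits above: the call is dispatched to a bounded pool when capacity is available and executed on the caller's thread otherwise, wrapped in an already-completed future so the caller can block on get() either way. The following standalone Scala sketch illustrates that shape; SubmitOrRunInline, submitOrRun, and the fixed-size pool are hypothetical stand-ins (not APIs introduced by this patch), and the size 5 merely mirrors the default of the new listFrom.commitStore.threadPoolSize conf.

import java.util.concurrent.{Callable, CompletableFuture, Executors, Future, ThreadPoolExecutor}

object SubmitOrRunInline {
  // Hypothetical bounded pool standing in for SnapshotManagement.commitStoreGetCommitsThreadPool.
  private val pool: ThreadPoolExecutor =
    Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]

  /**
   * Runs `task` asynchronously when the pool has spare capacity; otherwise runs it inline on
   * the calling thread and returns an already-completed future, so callers never wait behind a
   * queued task and can always call Future.get safely.
   */
  def submitOrRun[T](task: () => T): Future[T] = {
    if (pool.getActiveCount < pool.getMaximumPoolSize) {
      pool.submit(new Callable[T] { override def call(): T = task() })
    } else {
      CompletableFuture.completedFuture(task())
    }
  }
}

A caller could, for example, obtain the future with SubmitOrRunInline.submitOrRun(() => commitStore.getCommits(logPath, startVersion, endVersion)), perform the file-system listing, and only then call get() on the future, which is how the patch overlaps the commit-store call with the listing.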
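
The reconciliation step hinges on one decision: after the first file-system listing, could there be delta versions that the commit store no longer reports (because backfill completed concurrently) and that the listing also missed? Below is a minimal standalone sketch of that decision; CommitStub, CommitsResponseStub, and needsAdditionalFsListing are illustrative names, simplified stand-ins for the Commit and GetCommitsResponse types used in the patch.

// Simplified stand-ins for Commit and GetCommitsResponse.
final case class CommitStub(version: Long)
final case class CommitsResponseStub(commits: Seq[CommitStub], latestTableVersion: Long)

/**
 * Returns true when a second file-system listing (starting at maxDeltaVersionSeen + 1) is
 * needed because commits may have been backfilled between the first listing and the
 * commit-store call, leaving versions visible to neither source.
 */
def needsAdditionalFsListing(
    maxDeltaVersionSeen: Long,
    response: CommitsResponseStub,
    versionToLoad: Option[Long]): Boolean = {
  val deltaFilesMissing = response.commits.headOption match {
    // Unbackfilled commits exist: the gap is [maxDeltaVersionSeen + 1, firstCommit.version - 1].
    case Some(firstCommit) => maxDeltaVersionSeen + 1 < firstCommit.version
    // No unbackfilled commits at all: every trailing commit may have been backfilled
    // concurrently, so compare against latestTableVersion (a negative value means the commit
    // store knows of no version).
    case None =>
      response.latestTableVersion >= 0 && maxDeltaVersionSeen < response.latestTableVersion
  }
  // Only relevant if versions beyond what the first listing returned are still required.
  versionToLoad.forall(maxDeltaVersionSeen < _) && deltaFilesMissing
}

When this predicate is true, the patch issues one more listFromOrNone starting at maxDeltaVersionSeen + 1, concatenates the two listings, and only then appends the commits that are still unbackfilled.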