From c01665b42cf549f9b217cf04881df9e5e24a3292 Mon Sep 17 00:00:00 2001 From: Sumeet Varma Date: Thu, 21 Mar 2024 18:44:21 -0700 Subject: [PATCH] Parallel calls --- .../apache/spark/sql/delta/SnapshotManagement.scala | 10 ++++++---- .../spark/sql/delta/util/threads/DeltaThreadPool.scala | 3 +++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index dd26283c6fc..d15e468c476 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -231,13 +231,15 @@ trait SnapshotManagement { self: DeltaLog => maxDeltaVersionSeen < unbackfilledCommitsResponse.latestTableVersion} val needAdditionalFsListing = versionToLoad.forall(maxDeltaVersionSeen < _) && areDeltaFilesMissing - val additionalLogTuplesFromFsListingOpt = - Option.when(needAdditionalFsListing) { + val additionalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] = + if (needAdditionalFsListing) { recordDeltaEvent(this, "delta.listDeltaAndCheckpointFiles.needAdditionalFsListing") listDeltaCompactedDeltaAndCheckpointFilesOpt( startVersionOverride = maxDeltaVersionSeen + 1) - }.flatten - val finalLogTuplesFromFsListingOpt = + } else { + None + } + val finalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] = (initialLogTuplesFromFsListingOpt, additionalLogTuplesFromFsListingOpt) match { case (Some(initial), Some(additional)) => Some(initial ++ additional) case (Some(initial), None) => Some(initial) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/DeltaThreadPool.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/DeltaThreadPool.scala index f5f6e973940..68488732826 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/DeltaThreadPool.scala +++ 
b/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/DeltaThreadPool.scala @@ -31,6 +31,9 @@ import org.apache.spark.util.ThreadUtils.namedThreadFactory /** A wrapper for [[ThreadPoolExecutor]] whose tasks run with the caller's [[SparkSession]]. */ private[delta] class DeltaThreadPool(tpe: ThreadPoolExecutor) { + /** Active threads, per the wrapped executor. */ def getActiveCount: Int = tpe.getActiveCount + /** Wrapped executor's max pool size. */ def getMaximumPoolSize: Int = tpe.getMaximumPoolSize + /** Submits a task for execution and returns a [[Future]] representing that task. */ def submit[T](spark: SparkSession)(body: => T): Future[T] = { tpe.submit { () => spark.withActive(body) }