Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Mar 22, 2024
1 parent 27a894e commit c01665b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,15 @@ trait SnapshotManagement { self: DeltaLog =>
maxDeltaVersionSeen < unbackfilledCommitsResponse.latestTableVersion}
val needAdditionalFsListing =
versionToLoad.forall(maxDeltaVersionSeen < _) && areDeltaFilesMissing
val additionalLogTuplesFromFsListingOpt =
Option.when(needAdditionalFsListing) {
val additionalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] =
if (needAdditionalFsListing) {
recordDeltaEvent(this, "delta.listDeltaAndCheckpointFiles.needAdditionalFsListing")
listDeltaCompactedDeltaAndCheckpointFilesOpt(
startVersionOverride = maxDeltaVersionSeen + 1)
}.flatten
val finalLogTuplesFromFsListingOpt =
} else {
None
}
val finalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] =
(initialLogTuplesFromFsListingOpt, additionalLogTuplesFromFsListingOpt) match {
case (Some(initial), Some(additional)) => Some(initial ++ additional)
case (Some(initial), None) => Some(initial)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import org.apache.spark.util.ThreadUtils.namedThreadFactory

/** A wrapper for [[ThreadPoolExecutor]] whose tasks run with the caller's [[SparkSession]]. */
private[delta] class DeltaThreadPool(tpe: ThreadPoolExecutor) {
def getActiveCount: Int = tpe.getActiveCount
def getMaximumPoolSize: Int = tpe.getMaximumPoolSize

/** Submits a task for execution and returns a [[Future]] representing that task. */
def submit[T](spark: SparkSession)(body: => T): Future[T] = {
tpe.submit { () => spark.withActive(body) }
Expand Down

0 comments on commit c01665b

Please sign in to comment.