From 538e736ea5526e98a7b0e9124315757c1d5e54f3 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Mon, 23 Sep 2024 10:10:57 -0700
Subject: [PATCH] Set parallelism for the parallelize job in recursiveListDirs
 (#3708)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

`DeltaFileOperations.recursiveListDirs` calls `parallelize` without specifying the parallelism, so it always uses [the number of available cores on the cluster](https://github.com/apache/spark/blob/d2e8c1cb60e34a1c7e92374c07d682aa5ca79145/core/src/main/scala/org/apache/spark/SparkContext.scala#L1003). When a cluster has many cores but `subDirs` is small, this launches many empty tasks. This PR makes a small change to use `subDirs.length.min(spark.sparkContext.defaultParallelism)` as the parallelism, so that when `subDirs` is smaller than the number of available cores, no empty tasks are launched (a standalone sketch of this capping pattern follows the diff below).

## How was this patch tested?

Existing tests.

## Does this PR introduce _any_ user-facing changes?

No
---
 .../apache/spark/sql/delta/util/DeltaFileOperations.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala
index bf1d4727fd7..8df11b3e18b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala
@@ -243,7 +243,10 @@ object DeltaFileOperations extends DeltaLogging {
     import org.apache.spark.sql.delta.implicits._
     if (subDirs.isEmpty) return spark.emptyDataset[SerializableFileStatus]
     val listParallelism = fileListingParallelism.getOrElse(spark.sparkContext.defaultParallelism)
-    val dirsAndFiles = spark.sparkContext.parallelize(subDirs).mapPartitions { dirs =>
+    val subDirsParallelism = subDirs.length.min(spark.sparkContext.defaultParallelism)
+    val dirsAndFiles = spark.sparkContext.parallelize(
+      subDirs,
+      subDirsParallelism).mapPartitions { dirs =>
       val logStore = LogStore(SparkEnv.get.conf, hadoopConf.value.value)
       listUsingLogStore(
         logStore,
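
For reference, here is a minimal standalone sketch of the capping pattern used in the change above. It is not Delta code: the object name, the local `SparkSession` setup, and the fake directory paths are invented for illustration; only the `subDirs.length.min(sc.defaultParallelism)` expression mirrors the patch.

```scala
import org.apache.spark.sql.SparkSession

object ParallelismCapSketch {
  def main(args: Array[String]): Unit = {
    // Local session just for the sketch; a real cluster would report far more cores.
    val spark = SparkSession.builder()
      .appName("parallelism-cap-sketch")
      .master("local[8]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Pretend these are the sub-directories to list; only 3 of them (paths are made up).
    val subDirs = Seq("s3://bucket/table/a", "s3://bucket/table/b", "s3://bucket/table/c")

    // Without an explicit parallelism, parallelize() defaults to sc.defaultParallelism
    // partitions (8 here), so 5 of the 8 tasks would process no data at all.
    val uncapped = sc.parallelize(subDirs)

    // Capping the number of partitions at the number of items avoids those empty tasks
    // when subDirs is small, without changing behavior for large listings.
    val capped = sc.parallelize(subDirs, subDirs.length.min(sc.defaultParallelism))

    println(s"defaultParallelism  = ${sc.defaultParallelism}")     // 8
    println(s"uncapped partitions = ${uncapped.getNumPartitions}") // 8
    println(s"capped partitions   = ${capped.getNumPartitions}")   // 3

    spark.stop()
  }
}
```

Because `min` only takes effect when the collection is smaller than `defaultParallelism`, large directory listings still use the full parallelism of the cluster.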