From 595ad93a4f59ce7af85b6e2ca45bf9a6a40d1deb Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Wed, 14 Aug 2013 14:16:11 -0700 Subject: [PATCH 1/9] Added stageId <--> jobId mapping --- .../apache/spark/scheduler/DAGScheduler.scala | 62 ++++++++++++++++++- .../spark/scheduler/SparkListener.scala | 2 +- 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 92add5b073..14068d824e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -109,6 +109,10 @@ class DAGScheduler( val nextStageId = new AtomicInteger(0) + val jobIdToStageIds = new TimeStampedHashMap[Int, HashSet[Int]] + + val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]] + val stageIdToStage = new TimeStampedHashMap[Int, Stage] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage] @@ -206,6 +210,7 @@ class DAGScheduler( val id = nextStageId.getAndIncrement() val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) stageIdToStage(id) = stage + registerJobIdWithStages(jobId, stage) stageToInfos(stage) = StageInfo(stage) stage } @@ -261,6 +266,58 @@ class DAGScheduler( missing.toList } + private def registerJobIdWithStages(jobId: Int, stage: Stage) { + def registerJobIdWithStageList(stages: List[Stage]) { + if (!stages.isEmpty) { + val s = stages.head + stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId + val parents = getParentStages(s.rdd, jobId) + val parentsWithoutThisJobId = parents.filter(p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId))) + registerJobIdWithStageList(parentsWithoutThisJobId ++ stages.tail) + } + } + registerJobIdWithStageList(List(stage)) + } + + private def jobIdToStageIdsAdd(jobId: Int) { + val stageSet = jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) + stageIdToJobIds.foreach{case (stageId, jobSet) => + if (jobSet.contains(jobId)) { + stageSet += stageId + } + } + } + + private def jobIdToStageIdsRemove(jobId: Int) { + if (!jobIdToStageIds.contains(jobId)) + logError("Trying to remove unregistered job " + jobId) + else { + val registeredStages = jobIdToStageIds(jobId) + if (registeredStages.isEmpty) + logError("No stages registered for job " + jobId) + else stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach { + case (stageId, jobSet) => { + if (!jobSet.contains(jobId)) + logError("Job %d not registered for stage %d even though that stage was registered for the job" + .format(jobId, stageId)) + else + jobSet -= jobId + if (jobSet.isEmpty) { + stageIdToStage.get(stageId).foreach{s => + pendingTasks -= s + waiting -= s + running -= s + failed -= s + } + stageIdToStage -= stageId + logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size)) + } + } + } + jobIdToStageIds -= jobId + } + } + /** * Returns (and does not submit) a JobSubmitted event suitable to run a given job, and a * JobWaiter whose getResult() method will return the result of the job when it is complete. @@ -354,10 +411,11 @@ class DAGScheduler( // Compute very short actions like first() or take() with no parent stages locally. 
runLocally(job) } else { - listenerBus.post(SparkListenerJobStart(job, properties)) idToActiveJob(jobId) = job activeJobs += job resultStageToJob(finalStage) = job + jobIdToStageIdsAdd(jobId) + listenerBus.post(SparkListenerJobStart(job, jobIdToStageIds(jobId).toArray, properties)) submitStage(finalStage) } @@ -605,6 +663,7 @@ class DAGScheduler( resultStageToJob -= stage markStageAsFinished(stage) listenerBus.post(SparkListenerJobEnd(job, JobSucceeded)) + jobIdToStageIdsRemove(job.jobId) } job.listener.taskSucceeded(rt.outputId, event.result) } @@ -752,6 +811,7 @@ class DAGScheduler( val error = new SparkException("Job failed: " + reason) job.listener.jobFailed(error) listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))) + jobIdToStageIdsRemove(job.jobId) idToActiveJob -= resultStage.jobId activeJobs -= job resultStageToJob -= resultStage diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index c3cf4b8907..a004c472b1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -35,7 +35,7 @@ case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends Spa case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo, taskMetrics: TaskMetrics) extends SparkListenerEvents -case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null) +case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], properties: Properties = null) extends SparkListenerEvents case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
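
Review note (PATCH 1/9): giving SparkListenerJobStart a stageIds field is a source-incompatible change for existing SparkListener implementations, but it lets them associate a job with its stages at submission time without reaching into DAGScheduler internals. A minimal sketch of a listener consuming the new field -- JobStageTrackingListener and its internal map are illustrative only, assuming the trait's no-op onJobStart/onJobEnd hooks:

    import scala.collection.mutable

    class JobStageTrackingListener extends SparkListener {
      // jobId -> the stage ids that were known for the job when it started
      private val jobToStageIds = new mutable.HashMap[Int, Array[Int]]

      override def onJobStart(jobStart: SparkListenerJobStart) {
        // The new stageIds field carries the job's stages directly in the event.
        jobToStageIds(jobStart.job.jobId) = jobStart.stageIds
      }

      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
        jobToStageIds -= jobEnd.job.jobId
      }
    }
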
From 977a206a552bd8d97ba84ce305b3bea554fae5bf Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Thu, 22 Aug 2013 13:16:41 -0700 Subject: [PATCH 2/9] DAGScheduler: - style fixes - clearOldValues for new TimeStampedHashMaps - log unexpected cleanup of data structures - cleanup stageToInfos and stageIdToJobIds --- .../apache/spark/scheduler/DAGScheduler.scala | 53 ++++++++++++++----- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 14068d824e..7b551fa8df 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -281,7 +281,7 @@ class DAGScheduler( private def jobIdToStageIdsAdd(jobId: Int) { val stageSet = jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) - stageIdToJobIds.foreach{case (stageId, jobSet) => + stageIdToJobIds.foreach { case (stageId, jobSet) => if (jobSet.contains(jobId)) { stageSet += stageId } @@ -289,31 +289,46 @@ class DAGScheduler( } private def jobIdToStageIdsRemove(jobId: Int) { - if (!jobIdToStageIds.contains(jobId)) + if (!jobIdToStageIds.contains(jobId)) { logError("Trying to remove unregistered job " + jobId) - else + } else { val registeredStages = jobIdToStageIds(jobId) - if (registeredStages.isEmpty) + if (registeredStages.isEmpty) { logError("No stages registered for job " + jobId) - else stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach { + } else { stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach { case (stageId, jobSet) => { - if (!jobSet.contains(jobId)) + if (!jobSet.contains(jobId)) { logError("Job %d not registered for stage %d even though that stage was registered for the job" .format(jobId, stageId)) - else + } else { jobSet -= jobId + } if (jobSet.isEmpty) { - stageIdToStage.get(stageId).foreach{s => - pendingTasks -= s - waiting -= s - running -= s - failed -= s + stageIdToStage.get(stageId).foreach { s => + stageToInfos -= s + if (pendingTasks.contains(s)) { + logError("Tasks still pending for stage " + stageId + " even though there are no more jobs registered for that stage.") + pendingTasks -= s + } + if (waiting.contains(s)) { + logError("Still waiting on stage " + stageId + " even though there are no more jobs registered for that stage.") + waiting -= s + } + if (running.contains(s)) { + logError("Stage " + stageId + " still running even though there are no more jobs registered for that stage.") + running -= s + } + if (failed.contains(s)) { + logError("Stage " + stageId + " still registered as failed even though there are no more jobs registered for that stage.") + failed -= s + } } stageIdToStage -= stageId + stageIdToJobIds -= stageId logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size)) } } - } +}} jobIdToStageIds -= jobId } } @@ -876,8 +891,9 @@ class DAGScheduler( case n: NarrowDependency[_] => for (inPart <- n.getParents(partition)) { val locs = getPreferredLocs(n.rdd, inPart) - if (locs != Nil) + if (locs != Nil) { return locs + } } case _ => }) @@ -900,6 +916,15 @@ class DAGScheduler( sizeBefore = stageToInfos.size stageToInfos.clearOldValues(cleanupTime) logInfo("stageToInfos " + sizeBefore + " --> " + stageToInfos.size) + + sizeBefore = jobIdToStageIds.size + jobIdToStageIds.clearOldValues(cleanupTime) + logInfo("jobIdToStageIds " + sizeBefore + " --> " + jobIdToStageIds.size) + + sizeBefore = stageIdToJobIds.size + stageIdToJobIds.clearOldValues(cleanupTime) + logInfo("stageIdToJobIds " + sizeBefore + " --> " + stageIdToJobIds.size) + } def stop() { From d202a5227d9546ed953d48dc421800e260f3d2de Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Thu, 22 Aug 2013 16:58:39 -0700 Subject: [PATCH 3/9] cleanup --- .../apache/spark/scheduler/DAGScheduler.scala | 112 +++++++++--------- 1 file changed, 56 insertions(+), 56 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7b551fa8df..638b8a5e30 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -289,46 +289,58 @@ class DAGScheduler( } private def jobIdToStageIdsRemove(jobId: Int) { + def removeStage(stageId: Int) { + // data structures based on Stage + stageIdToStage.get(stageId).foreach { s => { + stageToInfos -= s + if (pendingTasks.contains(s)) { + logError("Tasks still pending for stage %d even though there are no more jobs registered for that stage." + .format(stageId)) + pendingTasks -= s + } + if (waiting.contains(s)) { + logError("Still waiting on stage %d even though there are no more jobs registered for that stage." + .format(stageId)) + waiting -= s + } + if (running.contains(s)) { + logError("Stage %d still running even though there are no more jobs registered for that stage." + .format(stageId)) + running -= s + } + if (failed.contains(s)) { + logError("Stage %d still registered as failed even though there are no more jobs registered for that stage."
+ .format(stageId)) + failed -= s + } + }} + // data structures based on StageId + stageIdToStage -= stageId + stageIdToJobIds -= stageId + + logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size)) + } + if (!jobIdToStageIds.contains(jobId)) { logError("Trying to remove unregistered job " + jobId) } else { val registeredStages = jobIdToStageIds(jobId) if (registeredStages.isEmpty) { logError("No stages registered for job " + jobId) - } else { stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach { - case (stageId, jobSet) => { - if (!jobSet.contains(jobId)) { - logError("Job %d not registered for stage %d even though that stage was registered for the job" - .format(jobId, stageId)) - } else { - jobSet -= jobId - } - if (jobSet.isEmpty) { - stageIdToStage.get(stageId).foreach { s => - stageToInfos -= s - if (pendingTasks.contains(s)) { - logError("Tasks still pending for stage " + stageId + " even though there are no more jobs registered for that stage.") - pendingTasks -= s - } - if (waiting.contains(s)) { - logError("Still waiting on stage " + stageId + " even though there are no more jobs registered for that stage.") - waiting -= s - } - if (running.contains(s)) { - logError("Stage " + stageId + " still running even though there are no more jobs registered for that stage.") - running -= s - } - if (failed.contains(s)) { - logError("Stage " + stageId + " still registered as failed even though there are no more jobs registered for that stage.") - failed -= s - } + } else { + stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach { + case (stageId, jobSet) => { + if (!jobSet.contains(jobId)) { + logError("Job %d not registered for stage %d even though that stage was registered for the job" + .format(jobId, stageId)) + } else { + jobSet -= jobId + } + if (jobSet.isEmpty) { // nobody needs this stage any more + removeStage(stageId) } - stageIdToStage -= stageId - stageIdToJobIds -= stageId - logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size)) } - } - }} + }} jobIdToStageIds -= jobId } } @@ -901,30 +913,18 @@ class DAGScheduler( } private def cleanup(cleanupTime: Long) { - var sizeBefore = stageIdToStage.size - stageIdToStage.clearOldValues(cleanupTime) - logInfo("stageIdToStage " + sizeBefore + " --> " + stageIdToStage.size) - - sizeBefore = shuffleToMapStage.size - shuffleToMapStage.clearOldValues(cleanupTime) - logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size) - - sizeBefore = pendingTasks.size - pendingTasks.clearOldValues(cleanupTime) - logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size) - - sizeBefore = stageToInfos.size - stageToInfos.clearOldValues(cleanupTime) - logInfo("stageToInfos " + sizeBefore + " --> " + stageToInfos.size) - - sizeBefore = jobIdToStageIds.size - jobIdToStageIds.clearOldValues(cleanupTime) - logInfo("jobIdToStageIds " + sizeBefore + " --> " + jobIdToStageIds.size) - - sizeBefore = stageIdToJobIds.size - stageIdToJobIds.clearOldValues(cleanupTime) - logInfo("stageIdToJobIds " + sizeBefore + " --> " + stageIdToJobIds.size) - + Map( + "stageIdToStage" -> stageIdToStage, + "shuffleToMapStage" -> shuffleToMapStage, + "pendingTasks" -> pendingTasks, + "stageToInfos" -> stageToInfos, + "jobIdToStageIds" -> jobIdToStageIds, + "stageIdToJobIds" -> stageIdToJobIds). 
+ foreach { case(s, t) => { + val sizeBefore = t.size + t.clearOldValues(cleanupTime) + logInfo("%s %d --> %d".format(s, sizeBefore, t.size)) + }} } def stop() {
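
Review note (PATCH 3/9): the rewritten cleanup() is table-driven, so tracking another TimeStampedHashMap costs one new entry in the Map rather than three more copy-pasted lines. A standalone sketch of the same idiom, assuming clearOldValues(threshTime) evicts entries last written before threshTime (pruneAll and the println are illustrative, not part of the patch):

    import org.apache.spark.util.TimeStampedHashMap

    // Apply the same age-based eviction to every named map and report shrinkage.
    def pruneAll(maps: Map[String, TimeStampedHashMap[_, _]], cutoff: Long) {
      maps.foreach { case (name, m) =>
        val sizeBefore = m.size
        m.clearOldValues(cutoff)
        println("%s %d --> %d".format(name, sizeBefore, m.size))
      }
    }
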
From 2eb94a0236135f8d26c366691d7b5795f7b01601 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Fri, 23 Aug 2013 13:48:13 -0700 Subject: [PATCH 4/9] Added empty data structure checks to DAGSchedulerSuite Clean up after jobs run locally --- .../apache/spark/scheduler/DAGScheduler.scala | 7 +++++ .../spark/scheduler/DAGSchedulerEvent.scala | 2 ++ .../spark/scheduler/DAGSchedulerSuite.scala | 28 +++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 638b8a5e30..b97ab5536f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -460,6 +460,11 @@ class DAGScheduler( completion.task, completion.reason, completion.taskInfo, completion.taskMetrics)) handleTaskCompletion(completion) + case LocalJobCompleted(stage) => + stageIdToJobIds -= stage.id + stageIdToStage -= stage.id + stageToInfos -= stage + case TaskSetFailed(taskSet, reason) => abortStage(stageIdToStage(taskSet.stageId), reason) @@ -573,6 +578,8 @@ class DAGScheduler( } catch { case e: Exception => job.listener.jobFailed(e) + } finally { + eventQueue.put(LocalJobCompleted(job.finalStage)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 0d99670648..eef5856145 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -55,6 +55,8 @@ private[spark] case class CompletionEvent( taskMetrics: TaskMetrics) extends DAGSchedulerEvent +private[spark] case class LocalJobCompleted(stage: Stage) extends DAGSchedulerEvent + private[spark] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 94f66c94c6..178c5b9f3b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -205,6 +205,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont } submit(rdd, Array(), listener = fakeListener) assert(numResults === 0) +// assertDataStructuresEmpty(scheduler) } test("run trivial job") { @@ -212,6 +213,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont submit(rdd, Array(0)) complete(taskSets(0), List((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty(scheduler) } test("local job") { @@ -223,7 +225,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont override def toString = "DAGSchedulerSuite Local RDD" } runEvent(JobSubmitted(rdd, jobComputeFunc, Array(0), true, null, listener)) + assert(scheduler.stageToInfos.size === 1) + runEvent(LocalJobCompleted(scheduler.stageToInfos.keys.head)) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty(scheduler) } test("run trivial job w/ dependency") { @@ -232,6 +237,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont submit(finalRdd, Array(0)) complete(taskSets(0), Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty(scheduler) } test("cache location preferences w/ dependency") { @@ -244,12 +250,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assertLocations(taskSet, Seq(Seq("hostA", "hostB"))) complete(taskSet, Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty(scheduler) } test("trivial job failure") { submit(makeRdd(1, Nil), Array(0)) failed(taskSets(0), "some failure") assert(failure.getMessage === "Job failed: some failure") + assertDataStructuresEmpty(scheduler) } test("run trivial shuffle") { @@ -265,6 +273,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) complete(taskSets(1), Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty(scheduler) } test("run trivial shuffle with fetch failure") { @@ -290,6 +299,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB")) complete(taskSets(3), Seq((Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) + assertDataStructuresEmpty(scheduler) } test("ignore late map task completions") { @@ -318,6 +328,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) complete(taskSets(1), Seq((Success, 42), (Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) + assertDataStructuresEmpty(scheduler) } test("run trivial shuffle with out-of-band failure and retry") { @@ -340,6 +351,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) complete(taskSets(2), Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty(scheduler) } test("recursive shuffle failures") { @@ -368,6 +380,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1)))) complete(taskSets(5), Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty(scheduler) } test("cached post-shuffle") { @@ -399,6 +412,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1)))) complete(taskSets(4), Seq((Success, 42))) assert(results === Map(0 -> 42)) + assertDataStructuresEmpty(scheduler) } /** @@ -418,4 +432,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont private def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345, 0) + private def assertDataStructuresEmpty(d: DAGScheduler) = { + assert(d.pendingTasks.isEmpty) + assert(d.activeJobs.isEmpty) + assert(d.failed.isEmpty) + assert(d.idToActiveJob.isEmpty) + assert(d.jobIdToStageIds.isEmpty) + assert(d.stageIdToJobIds.isEmpty) + assert(d.stageIdToStage.isEmpty) + assert(d.stageToInfos.isEmpty) + assert(d.resultStageToJob.isEmpty) + assert(d.running.isEmpty) +// assert(d.shuffleToMapStage.isEmpty) + assert(d.waiting.isEmpty) + } }
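
Review note (PATCH 4/9): the two halves of the local-job fix sit far apart in the diff, so here they are side by side, condensed from the hunks above (not a compilable excerpt). Jobs run via runLocally never pass through submitStage or handleTaskCompletion, so without this event their per-stage bookkeeping would leak:

    // runLocally now always announces completion, even when the job fails:
    try {
      // ... compute the job's partitions on the driver thread ...
    } catch {
      case e: Exception => job.listener.jobFailed(e)
    } finally {
      eventQueue.put(LocalJobCompleted(job.finalStage))
    }

    // ...and the event loop drops the entries that job submission created,
    // since no CompletionEvent or abortStage will ever do it for a local job:
    case LocalJobCompleted(stage) =>
      stageIdToJobIds -= stage.id
      stageIdToStage -= stage.id
      stageToInfos -= stage
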
From d87092263207d9d1bd9a3c5cc00210329f8e8b89 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Mon, 26 Aug 2013 16:47:19 -0700 Subject: [PATCH 5/9] Clean up shuffleToMapStage Fixed SPARK-864 -- don't submit a stage that no active job needs --- .../org/apache/spark/MapOutputTracker.scala | 10 ++- .../apache/spark/scheduler/DAGScheduler.scala | 84 +++++++++++++------ .../spark/scheduler/DAGSchedulerSuite.scala | 2 +- 3 files changed, 67 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ae7cf2a893..55e5749260 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -235,6 +235,10 @@ private[spark] class MapOutputTracker extends Logging { } } + def has(shuffleId: Int): Boolean = { + cachedSerializedStatuses.get(shuffleId).isDefined || mapStatuses.contains(shuffleId) + } + def getSerializedLocations(shuffleId: Int): Array[Byte] = { var statuses: Array[MapStatus] = null var epochGotten: Long = -1 @@ -247,12 +251,12 @@ private[spark] class MapOutputTracker extends Logging { case Some(bytes) => return bytes case None => - statuses = mapStatuses(shuffleId) + statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]()) epochGotten = epoch } } // If we got here, we failed to find the serialized locations in the cache, so we pulled - // out a snapshot of the locations as "locs"; let's serialize and return that + // out a snapshot of the locations as "statuses"; let's serialize and return that val bytes = serializeStatuses(statuses) logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length)) // Add them into the table only if the epoch hasn't changed while we were working @@ -261,7 +265,7 @@ private[spark] class MapOutputTracker extends Logging { cachedSerializedStatuses(shuffleId) = bytes } } - return bytes + bytes } // Serialize an array of map output locations into an efficient byte format so that we can send diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b97ab5536f..15cc9a1d97 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -183,7 +183,7 @@ class DAGScheduler( shuffleToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => - val stage = newStage(shuffleDep.rdd, Some(shuffleDep), jobId) + val stage = newOrUsedStage(shuffleDep.rdd, shuffleDep, jobId) shuffleToMapStage(shuffleDep.shuffleId) = stage stage } @@ -201,12 +201,6 @@ class DAGScheduler( callSite: Option[String] = None) : Stage = { - if (shuffleDep != None) { - // Kind of ugly: need to register RDDs with the cache and map output tracker here - // since we can't do it in the RDD constructor because # of partitions is unknown - logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")") - mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size) - } val id = nextStageId.getAndIncrement() val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) stageIdToStage(id) = stage @@ -215,6 +209,28 @@ class DAGScheduler( stage } + private def newOrUsedStage( + rdd: RDD[_], + shuffleDep: ShuffleDependency[_,_], + jobId: Int, + callSite: Option[String] = None) + : Stage = + { + val stage = newStage(rdd, Some(shuffleDep), jobId, callSite) + val serLocs = mapOutputTracker.getSerializedLocations(shuffleDep.shuffleId) + val locs = 
mapOutputTracker.deserializeStatuses(serLocs) + if (mapOutputTracker.has(shuffleDep.shuffleId)) { + for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i)) + stage.numAvailableOutputs = locs.size + } else { + // Kind of ugly: need to register RDDs with the cache and map output tracker here + // since we can't do it in the RDD constructor because # of partitions is unknown + logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")") + mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size) + } + stage + } + /** * Get or create the list of parent stages for a given RDD. The stages will be assigned the * provided jobId if they haven't already been created with a lower jobId. @@ -293,6 +309,7 @@ class DAGScheduler( // data structures based on Stage stageIdToStage.get(stageId).foreach { s => { stageToInfos -= s + shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleToMapStage.remove(_)) if (pendingTasks.contains(s)) { logError("Tasks still pending for stage %d even though there are no more jobs registered for that stage." .format(stageId)) @@ -583,27 +600,41 @@ class DAGScheduler( } } + private def activeJobForStage(stage: Stage): Option[Int] = { + if (stageIdToJobIds.contains(stage.id)) { + val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted + jobsThatUseStage.find(idToActiveJob.contains(_)) + } else { + None + } + } + /** Submits stage, but first recursively submits any missing parents. */ private def submitStage(stage: Stage) { - logDebug("submitStage(" + stage + ")") - if (!waiting(stage) && !running(stage) && !failed(stage)) { - val missing = getMissingParentStages(stage).sortBy(_.id) - logDebug("missing: " + missing) - if (missing == Nil) { - logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") - submitMissingTasks(stage) - running += stage - } else { - for (parent <- missing) { - submitStage(parent) + val jobId = activeJobForStage(stage) + if (jobId.isDefined) { + logDebug("submitStage(" + stage + ")") + if (!waiting(stage) && !running(stage) && !failed(stage)) { + val missing = getMissingParentStages(stage).sortBy(_.id) + logDebug("missing: " + missing) + if (missing == Nil) { + logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") + submitMissingTasks(stage, jobId.get) + running += stage + } else { + for (parent <- missing) { + submitStage(parent) + } + waiting += stage } - waiting += stage } + } else { + abortStage(stage, "No active job for stage " + stage.id) } } /** Called when stage's parents are available and we can now do its task. 
*/ - private def submitMissingTasks(stage: Stage) { + private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") // Get our pending tasks and remember them in our pendingTasks entry val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet) @@ -625,7 +656,7 @@ class DAGScheduler( } // must be run listener before possible NotSerializableException // should be "StageSubmitted" first and then "JobEnded" - val properties = idToActiveJob(stage.jobId).properties + val properties = idToActiveJob(jobId).properties listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties)) if (tasks.size > 0) { @@ -668,7 +699,7 @@ class DAGScheduler( def markStageAsFinished(stage: Stage) = { val serviceTime = stage.submissionTime match { case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0) - case _ => "Unkown" + case _ => "Unknown" } logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) stage.completionTime = Some(System.currentTimeMillis) @@ -733,7 +764,7 @@ class DAGScheduler( changeEpoch = true) } clearCacheLocs() - if (stage.outputLocs.count(_ == Nil) != 0) { + if (stage.outputLocs.exists(_ == Nil)) { // Some tasks had failed; let's resubmit this stage // TODO: Lower-level scheduler should also deal with this logInfo("Resubmitting " + stage + " (" + stage.name + @@ -750,9 +781,12 @@ class DAGScheduler( } waiting --= newlyRunnable running ++= newlyRunnable - for (stage <- newlyRunnable.sortBy(_.id)) { + for { + stage <- newlyRunnable.sortBy(_.id) + jobId <- activeJobForStage(stage) + } { logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable") - submitMissingTasks(stage) + submitMissingTasks(stage, jobId) } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 178c5b9f3b..b751a9803e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -443,7 +443,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assert(d.stageToInfos.isEmpty) assert(d.resultStageToJob.isEmpty) assert(d.running.isEmpty) -// assert(d.shuffleToMapStage.isEmpty) + assert(d.shuffleToMapStage.isEmpty) assert(d.waiting.isEmpty) } } From d7479c97b94cbae4fe303f0031799043a22ac014 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 27 Aug 2013 16:49:23 -0700 Subject: [PATCH 6/9] Cleanup and comments --- .../apache/spark/scheduler/DAGScheduler.scala | 30 +++++++----- .../spark/scheduler/DAGSchedulerSuite.scala | 49 +++++++++---------- 2 files changed, 43 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 15cc9a1d97..293c6dda8d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -192,7 +192,8 @@ class DAGScheduler( /** * Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or * as a result stage for the final RDD used directly in an action. The stage will also be - * associated with the provided jobId. + * associated with the provided jobId. Shuffle map stages whose shuffleId may have previously + * been registered in the MapOutputTracker should be (re)-created using newOrUsedStage. 
*/ private def newStage( rdd: RDD[_], @@ -209,6 +210,12 @@ class DAGScheduler( stage } + /** + * Create a shuffle map Stage for the given RDD. The stage will also be associated with the + * provided jobId. If a stage for the shuffleId existed previously so that the shuffleId is + * present in the MapOutputTracker, then the number and location of available outputs are + * recovered from the MapOutputTracker + */ private def newOrUsedStage( rdd: RDD[_], shuffleDep: ShuffleDependency[_,_], @@ -217,9 +224,9 @@ class DAGScheduler( : Stage = { val stage = newStage(rdd, Some(shuffleDep), jobId, callSite) - val serLocs = mapOutputTracker.getSerializedLocations(shuffleDep.shuffleId) - val locs = mapOutputTracker.deserializeStatuses(serLocs) if (mapOutputTracker.has(shuffleDep.shuffleId)) { + val serLocs = mapOutputTracker.getSerializedLocations(shuffleDep.shuffleId) + val locs = mapOutputTracker.deserializeStatuses(serLocs) for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i)) stage.numAvailableOutputs = locs.size } else { @@ -307,7 +314,7 @@ class DAGScheduler( private def jobIdToStageIdsRemove(jobId: Int) { def removeStage(stageId: Int) { // data structures based on Stage - stageIdToStage.get(stageId).foreach { s => { + stageIdToStage.get(stageId).foreach { s => stageToInfos -= s shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleToMapStage.remove(_)) if (pendingTasks.contains(s)) { @@ -330,7 +337,7 @@ class DAGScheduler( .format(stageId)) failed -= s } - }} + } // data structures based on StageId stageIdToStage -= stageId stageIdToJobIds -= stageId @@ -346,7 +353,7 @@ class DAGScheduler( logError("No stages registered for job " + jobId) } else { stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach { - case (stageId, jobSet) => { + case (stageId, jobSet) => if (!jobSet.contains(jobId)) { logError("Job %d not registered for stage %d even though that stage was registered for the job" .format(jobId, stageId)) @@ -356,8 +363,8 @@ class DAGScheduler( if (jobSet.isEmpty) { // nobody needs this stage any more removeStage(stageId) } - } - }} + } + } jobIdToStageIds -= jobId } } @@ -478,9 +485,9 @@ class DAGScheduler( handleTaskCompletion(completion) case LocalJobCompleted(stage) => - stageIdToJobIds -= stage.id - stageIdToStage -= stage.id - stageToInfos -= stage + stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job, + stageIdToStage -= stage.id // but that won't get cleaned up via tha normal path through + stageToInfos -= stage // completion events or stage abort case TaskSetFailed(taskSet, reason) => abortStage(stageIdToStage(taskSet.stageId), reason) @@ -600,6 +607,7 @@ class DAGScheduler( } } + /** Finds the earliest-created active job that needs the stage */ private def activeJobForStage(stage: Stage): Option[Int] = { if (stageIdToJobIds.contains(stage.id)) { val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b751a9803e..4688658068 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -205,7 +205,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont } submit(rdd, Array(), listener = fakeListener) assert(numResults === 0) -// assertDataStructuresEmpty(scheduler) } test("run 
trivial job") { @@ -213,7 +212,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont submit(rdd, Array(0)) complete(taskSets(0), List((Success, 42))) assert(results === Map(0 -> 42)) - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } test("local job") { @@ -228,7 +227,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assert(scheduler.stageToInfos.size === 1) runEvent(LocalJobCompleted(scheduler.stageToInfos.keys.head)) assert(results === Map(0 -> 42)) - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } test("run trivial job w/ dependency") { @@ -237,7 +236,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont submit(finalRdd, Array(0)) complete(taskSets(0), Seq((Success, 42))) assert(results === Map(0 -> 42)) - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } test("cache location preferences w/ dependency") { @@ -250,14 +249,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assertLocations(taskSet, Seq(Seq("hostA", "hostB"))) complete(taskSet, Seq((Success, 42))) assert(results === Map(0 -> 42)) - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } test("trivial job failure") { submit(makeRdd(1, Nil), Array(0)) failed(taskSets(0), "some failure") assert(failure.getMessage === "Job failed: some failure") - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } test("run trivial shuffle") { @@ -273,7 +272,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) complete(taskSets(1), Seq((Success, 42))) assert(results === Map(0 -> 42)) - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } test("run trivial shuffle with fetch failure") { @@ -299,7 +298,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB")) complete(taskSets(3), Seq((Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } test("ignore late map task completions") { @@ -328,7 +327,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) complete(taskSets(1), Seq((Success, 42), (Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } test("run trivial shuffle with out-of-band failure and retry") { @@ -351,7 +350,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) complete(taskSets(2), Seq((Success, 42))) assert(results === Map(0 -> 42)) - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } test("recursive shuffle failures") { @@ -380,7 +379,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1)))) complete(taskSets(5), Seq((Success, 42))) assert(results === Map(0 -> 42)) - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } test("cached post-shuffle") { @@ -412,7 +411,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1)))) 
complete(taskSets(4), Seq((Success, 42))) assert(results === Map(0 -> 42)) - assertDataStructuresEmpty(scheduler) + assertDataStructuresEmpty } /** @@ -432,18 +431,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont private def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345, 0) - private def assertDataStructuresEmpty(d: DAGScheduler) = { - assert(d.pendingTasks.isEmpty) - assert(d.activeJobs.isEmpty) - assert(d.failed.isEmpty) - assert(d.idToActiveJob.isEmpty) - assert(d.jobIdToStageIds.isEmpty) - assert(d.stageIdToJobIds.isEmpty) - assert(d.stageIdToStage.isEmpty) - assert(d.stageToInfos.isEmpty) - assert(d.resultStageToJob.isEmpty) - assert(d.running.isEmpty) - assert(d.shuffleToMapStage.isEmpty) - assert(d.waiting.isEmpty) + private def assertDataStructuresEmpty = { + assert(scheduler.pendingTasks.isEmpty) + assert(scheduler.activeJobs.isEmpty) + assert(scheduler.failed.isEmpty) + assert(scheduler.idToActiveJob.isEmpty) + assert(scheduler.jobIdToStageIds.isEmpty) + assert(scheduler.stageIdToJobIds.isEmpty) + assert(scheduler.stageIdToStage.isEmpty) + assert(scheduler.stageToInfos.isEmpty) + assert(scheduler.resultStageToJob.isEmpty) + assert(scheduler.running.isEmpty) + assert(scheduler.shuffleToMapStage.isEmpty) + assert(scheduler.waiting.isEmpty) } } From 6d54cf7f3f9b50a0082d53b91c8d33710440062a Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Sat, 31 Aug 2013 17:08:32 -0700 Subject: [PATCH 7/9] Minor cleanup --- .../apache/spark/scheduler/DAGScheduler.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 293c6dda8d..517650395e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -192,8 +192,8 @@ class DAGScheduler( /** * Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or * as a result stage for the final RDD used directly in an action. The stage will also be - * associated with the provided jobId. Shuffle map stages whose shuffleId may have previously - * been registered in the MapOutputTracker should be (re)-created using newOrUsedStage. + * associated with the provided jobId. Shuffle map stages, whose shuffleId may have previously + * been registered in the MapOutputTracker, should be (re)-created using newOrUsedStage. */ private def newStage( rdd: RDD[_], @@ -289,6 +289,10 @@ class DAGScheduler( missing.toList } + /** + * Registers the given jobId among the jobs that need the given stage and + * all of that stage's ancestors. 
+ */ private def registerJobIdWithStages(jobId: Int, stage: Stage) { def registerJobIdWithStageList(stages: List[Stage]) { if (!stages.isEmpty) { @@ -359,9 +363,9 @@ class DAGScheduler( .format(jobId, stageId)) } else { jobSet -= jobId - } - if (jobSet.isEmpty) { // nobody needs this stage any more - removeStage(stageId) + if (jobSet.isEmpty) { // nobody needs this stage anymore + removeStage(stageId) + } } } } @@ -486,7 +490,7 @@ class DAGScheduler( case LocalJobCompleted(stage) => stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job, - stageIdToStage -= stage.id // but that won't get cleaned up via tha normal path through + stageIdToStage -= stage.id // but that won't get cleaned up via the normal paths through stageToInfos -= stage // completion events or stage abort case TaskSetFailed(taskSet, reason) => From 984f8c7030d60b6d05addb38742cd06e295ededa Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Sun, 1 Sep 2013 21:29:07 -0700 Subject: [PATCH 8/9] pendingTasks was never getting completely cleaned up. Fixed that. --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 517650395e..e8ecbeb08a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -321,11 +321,11 @@ class DAGScheduler( stageIdToStage.get(stageId).foreach { s => stageToInfos -= s shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleToMapStage.remove(_)) - if (pendingTasks.contains(s)) { + if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) { logError("Tasks still pending for stage %d even though there are no more jobs registered for that stage." .format(stageId)) - pendingTasks -= s } + pendingTasks -= s if (waiting.contains(s)) { logError("Still waiting on stage %d even though there are no more jobs registered for that stage." .format(stageId)) From fff651d0ba0a810b2e122d0fbaf32be14652d8d6 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Thu, 5 Sep 2013 10:01:03 -0700 Subject: [PATCH 9/9] Added comment about ActiveJob priorities --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e8ecbeb08a..dece3a93d7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -612,6 +612,10 @@ class DAGScheduler( } /** Finds the earliest-created active job that needs the stage */ + // TODO: Probably should actually find among the active jobs that need this + // stage the one with the highest priority (highest-priority pool, earliest created). + // That should take care of at least part of the priority inversion problem with + // cross-job dependencies. private def activeJobForStage(stage: Stage): Option[Int] = { if (stageIdToJobIds.contains(stage.id)) { val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
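
Review note (PATCH 9/9): a hedged sketch of what the TODO above could look like once pool priorities are consultable. poolPriorityOf is hypothetical -- nothing in this series provides such a lookup -- and the tie-break on jobId preserves the current earliest-created behavior:

    private def highestPriorityJobForStage(stage: Stage): Option[Int] = {
      val candidates = stageIdToJobIds.getOrElse(stage.id, Set.empty[Int])
        .filter(idToActiveJob.contains)
      if (candidates.isEmpty) {
        None
      } else {
        // Prefer the highest-priority pool; among equals, the earliest-created job.
        Some(candidates.minBy(jobId => (-poolPriorityOf(jobId), jobId)))
      }
    }
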