Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added stageId <--> jobId mapping in DAGScheduler #842

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Prev Previous commit
Next Next commit
Added empty data structure checks to DAGSchedulerSuite
Clean up after jobs run locally
  • Loading branch information
markhamstra committed Sep 5, 2013
commit 2eb94a0236135f8d26c366691d7b5795f7b01601
Original file line number Diff line number Diff line change
@@ -460,6 +460,11 @@ class DAGScheduler(
completion.task, completion.reason, completion.taskInfo, completion.taskMetrics))
handleTaskCompletion(completion)

case LocalJobCompleted(stage) =>
stageIdToJobIds -= stage.id
stageIdToStage -= stage.id
stageToInfos -= stage

case TaskSetFailed(taskSet, reason) =>
abortStage(stageIdToStage(taskSet.stageId), reason)

@@ -573,6 +578,8 @@ class DAGScheduler(
} catch {
case e: Exception =>
job.listener.jobFailed(e)
} finally {
eventQueue.put(LocalJobCompleted(job.finalStage))
}
}

Original file line number Diff line number Diff line change
@@ -55,6 +55,8 @@ private[spark] case class CompletionEvent(
taskMetrics: TaskMetrics)
extends DAGSchedulerEvent

private[spark] case class LocalJobCompleted(stage: Stage) extends DAGSchedulerEvent

private[spark] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent

private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
Original file line number Diff line number Diff line change
@@ -205,13 +205,15 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
}
submit(rdd, Array(), listener = fakeListener)
assert(numResults === 0)
// assertDataStructuresEmpty(scheduler)
}

test("run trivial job") {
val rdd = makeRdd(1, Nil)
submit(rdd, Array(0))
complete(taskSets(0), List((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty(scheduler)
}

test("local job") {
@@ -223,7 +225,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
override def toString = "DAGSchedulerSuite Local RDD"
}
runEvent(JobSubmitted(rdd, jobComputeFunc, Array(0), true, null, listener))
assert(scheduler.stageToInfos.size === 1)
runEvent(LocalJobCompleted(scheduler.stageToInfos.keys.head))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty(scheduler)
}

test("run trivial job w/ dependency") {
@@ -232,6 +237,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
submit(finalRdd, Array(0))
complete(taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty(scheduler)
}

test("cache location preferences w/ dependency") {
@@ -244,12 +250,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
complete(taskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty(scheduler)
}

test("trivial job failure") {
submit(makeRdd(1, Nil), Array(0))
failed(taskSets(0), "some failure")
assert(failure.getMessage === "Job failed: some failure")
assertDataStructuresEmpty(scheduler)
}

test("run trivial shuffle") {
@@ -265,6 +273,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
complete(taskSets(1), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty(scheduler)
}

test("run trivial shuffle with fetch failure") {
@@ -290,6 +299,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
complete(taskSets(3), Seq((Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
assertDataStructuresEmpty(scheduler)
}

test("ignore late map task completions") {
@@ -318,6 +328,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
assertDataStructuresEmpty(scheduler)
}

test("run trivial shuffle with out-of-band failure and retry") {
@@ -340,6 +351,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
complete(taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty(scheduler)
}

test("recursive shuffle failures") {
@@ -368,6 +380,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
complete(taskSets(5), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty(scheduler)
}

test("cached post-shuffle") {
@@ -399,6 +412,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
complete(taskSets(4), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty(scheduler)
}

/**
@@ -418,4 +432,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345, 0)

private def assertDataStructuresEmpty(d: DAGScheduler) = {
assert(d.pendingTasks.isEmpty)
assert(d.activeJobs.isEmpty)
assert(d.failed.isEmpty)
assert(d.idToActiveJob.isEmpty)
assert(d.jobIdToStageIds.isEmpty)
assert(d.stageIdToJobIds.isEmpty)
assert(d.stageIdToStage.isEmpty)
assert(d.stageToInfos.isEmpty)
assert(d.resultStageToJob.isEmpty)
assert(d.running.isEmpty)
// assert(d.shuffleToMapStage.isEmpty)
assert(d.waiting.isEmpty)
}
}