Renamed 'priority' to 'jobId' and assorted minor changes #844

Merged 2 commits on Aug 20, 2013
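This PR renames the per-stage `priority` field (and the job-side `runId`) to `jobId`, renames `idToStage` to `stageIdToStage`, and folds in a few small cleanups such as named boolean arguments. Stages are still resubmitted in increasing id order, which is FIFO because job ids are allocated monotonically. Below is a self-contained sketch of that ordering property; it is illustrative only, not code from this PR, and `ToyStage` is an invented stand-in for `spark.scheduler.Stage`.

```scala
// Toy illustration of the FIFO-by-jobId ordering made explicit by the rename.
// ToyStage is hypothetical; the real Stage class carries much more state.
case class ToyStage(id: Int, jobId: Int)

object JobIdFifoSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors the `failed2.sortBy(_.jobId)` loop in the DAGScheduler diff below:
    // stages belonging to earlier jobs (lower jobId) are resubmitted first.
    val failed = Seq(ToyStage(id = 7, jobId = 2), ToyStage(id = 3, jobId = 0), ToyStage(id = 5, jobId = 1))
    failed.sortBy(_.jobId).foreach(s => println(s"resubmitting stage ${s.id} of job ${s.jobId}"))
    // Prints stage 3 (job 0), then stage 5 (job 1), then stage 7 (job 2).
  }
}
```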
2 changes: 1 addition & 1 deletion core/src/main/scala/spark/scheduler/ActiveJob.scala
@@ -25,7 +25,7 @@ import java.util.Properties
* Tracks information about an active job in the DAGScheduler.
*/
private[spark] class ActiveJob(
- val runId: Int,
+ val jobId: Int,
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
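For context (not part of the diff): the ids now called `jobId` come from the DAGScheduler's `nextJobId` counter, an `AtomicInteger`, so they increase monotonically across submitted jobs; that is the property the renamed comments below lean on. A minimal sketch of the allocation pattern, with invented class and object names.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for DAGScheduler.nextJobId: each submitted job takes the next id.
class JobIdAllocator {
  private val nextJobId = new AtomicInteger(0)
  def newJobId(): Int = nextJobId.getAndIncrement()
}

object JobIdAllocatorDemo {
  def main(args: Array[String]): Unit = {
    val alloc = new JobIdAllocator
    println(Seq.fill(3)(alloc.newJobId())) // List(0, 1, 2): later jobs always get larger ids
  }
}
```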
84 changes: 42 additions & 42 deletions core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -104,11 +104,11 @@ class DAGScheduler(

private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]

- val nextRunId = new AtomicInteger(0)
+ val nextJobId = new AtomicInteger(0)

val nextStageId = new AtomicInteger(0)

- val idToStage = new TimeStampedHashMap[Int, Stage]
+ val stageIdToStage = new TimeStampedHashMap[Int, Stage]

val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]

@@ -171,28 +171,28 @@

/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
- * The priority value passed in will be used if the stage doesn't already exist with
- * a lower priority (we assume that priorities always increase across jobs for now).
+ * The jobId value passed in will be used if the stage doesn't already exist with
+ * a lower jobId (jobId always increases across jobs.)
*/
- private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = {
+ private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
- val stage = newStage(shuffleDep.rdd, Some(shuffleDep), priority)
+ val stage = newStage(shuffleDep.rdd, Some(shuffleDep), jobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
}

/**
* Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or
- * as a result stage for the final RDD used directly in an action. The stage will also be given
- * the provided priority.
+ * as a result stage for the final RDD used directly in an action. The stage will also be
+ * associated with the provided jobId.
*/
private def newStage(
rdd: RDD[_],
shuffleDep: Option[ShuffleDependency[_,_]],
- priority: Int,
+ jobId: Int,
callSite: Option[String] = None)
: Stage =
{
@@ -203,17 +203,17 @@
mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
}
val id = nextStageId.getAndIncrement()
- val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority, callSite)
- idToStage(id) = stage
+ val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
+ stageIdToStage(id) = stage
stageToInfos(stage) = StageInfo(stage)
stage
}

/**
* Get or create the list of parent stages for a given RDD. The stages will be assigned the
- * provided priority if they haven't already been created with a lower priority.
+ * provided jobId if they haven't already been created with a lower jobId.
*/
- private def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = {
+ private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]) {
@@ -224,7 +224,7 @@
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
- parents += getShuffleMapStage(shufDep, priority)
+ parents += getShuffleMapStage(shufDep, jobId)
case _ =>
visit(dep.rdd)
}
@@ -245,7 +245,7 @@
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
- val mapStage = getShuffleMapStage(shufDep, stage.priority)
+ val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
@@ -282,7 +282,7 @@
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter,
properties)
- return (toSubmit, waiter)
+ (toSubmit, waiter)
}

def runJob[T, U: ClassManifest](
@@ -329,8 +329,8 @@
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray
- eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener, properties))
- return listener.awaitResult() // Will throw an exception if the job fails
+ eventQueue.put(JobSubmitted(rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
+ listener.awaitResult() // Will throw an exception if the job fails
}

/**
@@ -340,11 +340,11 @@
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
- val runId = nextRunId.getAndIncrement()
- val finalStage = newStage(finalRDD, None, runId, Some(callSite))
- val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
+ val jobId = nextJobId.getAndIncrement()
+ val finalStage = newStage(finalRDD, None, jobId, Some(callSite))
+ val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
- logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
+ logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
@@ -354,7 +354,7 @@
runLocally(job)
} else {
listenerBus.post(SparkListenerJobStart(job, properties))
- idToActiveJob(runId) = job
+ idToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
submitStage(finalStage)
@@ -375,7 +375,7 @@
handleTaskCompletion(completion)

case TaskSetFailed(taskSet, reason) =>
- abortStage(idToStage(taskSet.stageId), reason)
+ abortStage(stageIdToStage(taskSet.stageId), reason)

case StopDAGScheduler =>
// Cancel any active jobs
@@ -386,7 +386,7 @@
}
return true
}
- return false
+ false
}

/**
Expand All @@ -398,7 +398,7 @@ class DAGScheduler(
clearCacheLocs()
val failed2 = failed.toArray
failed.clear()
- for (stage <- failed2.sortBy(_.priority)) {
+ for (stage <- failed2.sortBy(_.jobId)) {
submitStage(stage)
}
}
@@ -416,7 +416,7 @@
logTrace("failed: " + failed)
val waiting2 = waiting.toArray
waiting.clear()
- for (stage <- waiting2.sortBy(_.priority)) {
+ for (stage <- waiting2.sortBy(_.jobId)) {
submitStage(stage)
}
}
@@ -463,7 +463,7 @@
*/
protected def runLocally(job: ActiveJob) {
logInfo("Computing the requested partition locally")
- new Thread("Local computation of job " + job.runId) {
+ new Thread("Local computation of job " + job.jobId) {
override def run() {
runLocallyWithinThread(job)
}
@@ -531,7 +531,7 @@
}
// must be run listener before possible NotSerializableException
// should be "StageSubmitted" first and then "JobEnded"
- val properties = idToActiveJob(stage.priority).properties
+ val properties = idToActiveJob(stage.jobId).properties
listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))

if (tasks.size > 0) {
@@ -552,7 +552,7 @@
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
taskSched.submitTasks(
- new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
+ new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
if (!stage.submissionTime.isDefined) {
stage.submissionTime = Some(System.currentTimeMillis())
}
@@ -569,7 +569,7 @@
*/
private def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
- val stage = idToStage(task.stageId)
+ val stage = stageIdToStage(task.stageId)

def markStageAsFinished(stage: Stage) = {
val serviceTime = stage.submissionTime match {
@@ -598,7 +598,7 @@
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
- idToActiveJob -= stage.priority
+ idToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
@@ -635,7 +635,7 @@
mapOutputTracker.registerMapOutputs(
stage.shuffleDep.get.shuffleId,
stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
- true)
+ changeEpoch = true)
}
clearCacheLocs()
if (stage.outputLocs.count(_ == Nil) != 0) {
@@ -669,7 +669,7 @@

case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
- val failedStage = idToStage(task.stageId)
+ val failedStage = stageIdToStage(task.stageId)
running -= failedStage
failed += failedStage
// TODO: Cancel running tasks in the stage
@@ -697,7 +697,7 @@

case other =>
// Unrecognized failure - abort all jobs depending on this stage
- abortStage(idToStage(task.stageId), task + " failed: " + other)
+ abortStage(stageIdToStage(task.stageId), task + " failed: " + other)
}
}

@@ -718,7 +718,7 @@
for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnExecutor(execId)
val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
- mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
+ mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
}
if (shuffleToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
@@ -750,7 +750,7 @@
val error = new SparkException("Job failed: " + reason)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
- idToActiveJob -= resultStage.priority
+ idToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
}
@@ -774,7 +774,7 @@
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
- val mapStage = getShuffleMapStage(shufDep, stage.priority)
+ val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) {
visitedStages += mapStage
visit(mapStage.rdd)
@@ -812,13 +812,13 @@
}
case _ =>
})
- return Nil
+ Nil
}

private def cleanup(cleanupTime: Long) {
- var sizeBefore = idToStage.size
- idToStage.clearOldValues(cleanupTime)
- logInfo("idToStage " + sizeBefore + " --> " + idToStage.size)
+ var sizeBefore = stageIdToStage.size
+ stageIdToStage.clearOldValues(cleanupTime)
+ logInfo("stageIdToStage " + sizeBefore + " --> " + stageIdToStage.size)

sizeBefore = shuffleToMapStage.size
shuffleToMapStage.clearOldValues(cleanupTime)
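Besides the rename, the diff above swaps a couple of bare positional booleans for named arguments (`changeEpoch = true` when re-registering map outputs, `allowLocal = false` when submitting the approximate-result job). A standalone toy example of the idiom, not taken from Spark, with invented names.

```scala
// Toy example of Scala named arguments for boolean flags; registerOutputs is invented.
object NamedArgsSketch {
  def registerOutputs(shuffleId: Int, locs: Array[String], changeEpoch: Boolean = false): Unit = {
    if (changeEpoch) println(s"epoch bumped for shuffle $shuffleId with ${locs.length} locations")
  }

  def main(args: Array[String]): Unit = {
    registerOutputs(0, Array("host1"), true)                // legal, but the bare `true` is opaque
    registerOutputs(0, Array("host1"), changeEpoch = true)  // reads as what it does
  }
}
```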
core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
@@ -21,7 +21,7 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends
})

metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.nextRunId.get()
+ override def getValue: Int = dagScheduler.nextJobId.get()
})

metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
24 changes: 12 additions & 12 deletions core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -102,7 +102,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))

protected def buildJobDep(jobID: Int, stage: Stage) {
- if (stage.priority == jobID) {
+ if (stage.jobId == jobID) {
jobIDToStages.get(jobID) match {
case Some(stageList) => stageList += stage
case None => val stageList = new ListBuffer[Stage]
@@ -178,12 +178,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}else{
stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
}
- if (stage.priority == jobID) {
+ if (stage.jobId == jobID) {
jobLogInfo(jobID, indentString(indent) + stageInfo, false)
recordRddInStageGraph(jobID, stage.rdd, indent)
stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
} else
- jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
+ jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
}

// Record task metrics into job log files
@@ -260,16 +260,16 @@

override def onJobEnd(jobEnd: SparkListenerJobEnd) {
val job = jobEnd.job
- var info = "JOB_ID=" + job.runId
+ var info = "JOB_ID=" + job.jobId
jobEnd.jobResult match {
case JobSucceeded => info += " STATUS=SUCCESS"
case JobFailed(exception, _) =>
info += " STATUS=FAILED REASON="
exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ =>
}
- jobLogInfo(job.runId, info.substring(0, info.length - 1).toUpperCase)
- closeLogWriter(job.runId)
+ jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
+ closeLogWriter(job.jobId)
}

protected def recordJobProperties(jobID: Int, properties: Properties) {
@@ -282,11 +282,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
override def onJobStart(jobStart: SparkListenerJobStart) {
val job = jobStart.job
val properties = jobStart.properties
- createLogWriter(job.runId)
- recordJobProperties(job.runId, properties)
- buildJobDep(job.runId, job.finalStage)
- recordStageDep(job.runId)
- recordStageDepGraph(job.runId, job.finalStage)
- jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED")
+ createLogWriter(job.jobId)
+ recordJobProperties(job.jobId, properties)
+ buildJobDep(job.jobId, job.finalStage)
+ recordStageDep(job.jobId)
+ recordStageDepGraph(job.jobId, job.finalStage)
+ jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
}
}
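An aside on the `onJobEnd` handler above (illustrative, outside the diff): in the failure branch the exception message is split on whitespace and re-joined with underscores, and the final `substring` call drops the trailing separator before the line is upper-cased. A small sketch with made-up values:

```scala
// Reproduces the failure-branch formatting from JobLogger.onJobEnd with invented inputs.
object JobEndLogSketch {
  def main(args: Array[String]): Unit = {
    val jobId = 42
    val exceptionMessage = "stage 3 aborted on host-1"
    var info = "JOB_ID=" + jobId + " STATUS=FAILED REASON="
    exceptionMessage.split("\\s+").foreach(word => info += word + "_")
    // Drop the trailing '_' left by the loop, as the JobLogger code does.
    println(info.substring(0, info.length - 1).toUpperCase)
    // JOB_ID=42 STATUS=FAILED REASON=STAGE_3_ABORTED_ON_HOST-1
  }
}
```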
7 changes: 4 additions & 3 deletions core/src/main/scala/spark/scheduler/Stage.scala
@@ -33,15 +33,16 @@ import spark.storage.BlockManagerId
* initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
* that each output partition is on.
*
- * Each Stage also has a priority, which is (by default) based on the job it was submitted in.
- * This allows Stages from earlier jobs to be computed first or recovered faster on failure.
+ * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
+ * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
+ * faster on failure.
*/
private[spark] class Stage(
val id: Int,
val rdd: RDD[_],
val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
val parents: List[Stage],
- val priority: Int,
+ val jobId: Int,
callSite: Option[String])
extends Logging {
