
Added stageId <--> jobId mapping in DAGScheduler #842

Open
markhamstra wants to merge 9 commits into master

Conversation

markhamstra (Contributor)

I'm not completely certain that this is ready to be merged yet, but I think it is ready to gather some comments.

Outside of DAGScheduler.scala, the changes are almost entirely just renaming -- I resolved the confusion between the notions of priority and job identity by consistently relabeling the old, FIFO-only usages of "priority" as "jobId". The old usage of "priority" was misleading since we now have the fair scheduler infrastructure, which doesn't use the old "priority" value to order scheduling except to break what would otherwise be ties. Now we can speak consistently about jobs having jobIds that are unique and always increasing. Stages contain the jobId of the job that was responsible for creating them. Again, all of that is really just renaming or thinking about things slightly differently without changing anything functionally.
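As a rough illustration of the relabeled semantics (a sketch with stand-in names, not Spark's actual Stage class):

    import java.util.concurrent.atomic.AtomicInteger

    // Sketch: jobIds are unique and monotonically increasing, and each stage records
    // the jobId of the job that was responsible for creating it. Names are illustrative.
    object JobIdSketch {
      private val nextJobId = new AtomicInteger(0)
      def newJobId(): Int = nextJobId.getAndIncrement()
    }

    case class StageSketch(stageId: Int, jobId: Int) // jobId of the creating job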

There are also a few easy changes within DAGScheduler.scala that just suppress a bunch of code analyzer warnings in IntelliJ -- I got tired of looking at them.

The bigger changes in the DAGScheduler have to do with the new jobIdToStageIds and stageIdToJobIds maps and the associated methods to build them up and tear them down. (I also renamed idToStage to stageIdToStage to be more consistent and less ambiguous.) The stageIdToJobIds map gets built up as new stages are created, the jobIdToStageIds map gets synced with and built up from stageIdToJobIds when jobs are submitted, and both maps are modified in concert when jobs are completed or aborted.
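A minimal sketch of how the two maps might be built up and torn down (plain mutable maps here rather than Spark's TimeStampedHashMap, and hypothetical method names):

    import scala.collection.mutable.{HashMap, HashSet}

    // Simplified bookkeeping for the jobId <--> stageId relationship; the real
    // DAGScheduler uses TimeStampedHashMap and is driven from its event loop.
    class StageJobBookkeeping {
      val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
      val stageIdToJobIds = new HashMap[Int, HashSet[Int]]

      // Called as each new stage is created: record that this job depends on the stage.
      def registerStage(stageId: Int, jobId: Int): Unit =
        stageIdToJobIds.getOrElseUpdate(stageId, new HashSet[Int]) += jobId

      // Called on job submission: sync jobIdToStageIds up from stageIdToJobIds.
      def registerJob(jobId: Int): Unit = {
        val stages = stageIdToJobIds.collect { case (stageId, jobs) if jobs(jobId) => stageId }
        jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]) ++= stages
      }

      // Called when a job completes or aborts: modify both maps in concert,
      // dropping stages that no other job still needs.
      def removeJob(jobId: Int): Unit = {
        for (stageId <- jobIdToStageIds.getOrElse(jobId, HashSet.empty[Int])) {
          stageIdToJobIds.get(stageId).foreach { jobs =>
            jobs -= jobId
            if (jobs.isEmpty) stageIdToJobIds -= stageId
          }
        }
        jobIdToStageIds -= jobId
      }
    }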

There's a fair amount of walking through entire data structures to create and maintain these maps, so I was concerned about the performance hit. I haven't rigorously tested just the relative performance of the DAGScheduler in isolation with and without these changes, but I have closely watched the total time reported by a complete run of Spark's unit tests, and there appears to be no significant difference -- a complete run takes almost 13.5 minutes when my machine is fully warmed up, and the difference with the new stageId <--> jobId mapping is only a couple of seconds. Some of that is probably because, while a lot more is done when a new stage is created, that actually results in the short-circuiting of later calls to, e.g., getShuffleMapStage.

The last bits outside the DAGScheduler have to do with SparkListenerJobStart, which now sends along an array containing all of the stageIds that the job depends on.
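For example, a listener could aggregate per-job progress from that array roughly like this (a sketch with stand-in event classes, since the exact shape of the modified SparkListenerJobStart is whatever this PR settles on):

    import scala.collection.mutable

    // Stand-ins for SparkListenerJobStart (now carrying the stage IDs the job depends on)
    // and SparkListenerStageCompleted; field names here are assumptions.
    case class JobStartEvent(jobId: Int, stageIds: Array[Int])
    case class StageCompletedEvent(stageId: Int)

    // Sketch of the kind of aggregation the WebUI could do per job.
    class JobProgressSketch {
      private val remaining = new mutable.HashMap[Int, mutable.HashSet[Int]]

      def onJobStart(event: JobStartEvent): Unit =
        remaining(event.jobId) = mutable.HashSet(event.stageIds: _*)

      def onStageCompleted(event: StageCompletedEvent): Unit =
        remaining.values.foreach(_ -= event.stageId)

      // Number of stages the job is still waiting on.
      def remainingStages(jobId: Int): Int = remaining.get(jobId).map(_.size).getOrElse(0)
    }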

With all of this in place, it should now be simple for the WebUI to aggregate the appropriate stages to report progress and other information for an entire job, and it should also be easy for Ram's killJob stuff to kill all of the stages associated with a job while not killing any that are also associated with another job.

From looking at some instrumented output, I'm fairly certain that I've got things at least close to right. I don't think this is quite ready to merge yet, since the JobLogger still probably needs to be tweaked, and I need to come to an understanding of why stages seem to be left alive by the ThreadingSuite. Outside of that, this new code seems to do a nice job preventing the sizes of various DAGScheduler data structures from growing without bound, and does so while not breaking any of the unit tests.

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/617/

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/619/

@rxin (Member) commented Aug 16, 2013

Thanks for doing this, Mark.

I'd actually recommend separating this into two pull requests since they are fairly independent. The renaming pull request should be much easier to merge, while the other one would require a lot of testing. Note that unit test runtime by no means tests the scheduler's throughput, since the runtime is dominated by failure recovery, shuffle, and other things.

I believe the perf benchmark suite Patrick has been working on (soon to be open sourced, if it hasn't been already) includes a scheduler throughput test that just launches a large number of empty tasks. We should use that one to test the scheduler throughput here.
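For reference, a scheduler-throughput micro-benchmark in that spirit can be as simple as timing jobs made up of many no-op tasks (a sketch only, not the spark-perf implementation; the task and trial counts are arbitrary):

    import org.apache.spark.SparkContext

    // Time jobs consisting of a large number of essentially empty tasks, so that
    // almost all of the measured time is scheduling overhead.
    object SchedulerThroughputSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[4]", "scheduler-throughput")
        val numTasks = 10000
        val numTrials = 10
        for (trial <- 1 to numTrials) {
          val start = System.nanoTime()
          sc.parallelize(1 to numTasks, numTasks).count() // one trivial task per partition
          val secs = (System.nanoTime() - start) / 1e9
          println("trial " + trial + ": " + secs + " s")
        }
        sc.stop()
      }
    }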

@markhamstra (Contributor Author)

No disagreement that just timing the unit tests isn't an adequate measure of the change in scheduler performance. I was just using what I had as a rough check that things hadn't gone horribly wrong. If Patrick's suite can be used to do some better testing, that's great.

@markhamstra (Contributor Author)

UPDATE: I broke out renaming and other minor changes into a separate pull request; so after #844 is merged, this one reduces to just the stageId <--> jobId mapping stuff.

@markhamstra (Contributor Author)

So, a few things worth talking about:

  1. This PR essentially boils down to moving at least part of the logic from the JobLogger directly into the DAGScheduler.
  2. At the moment, that work is duplicated in both DAGScheduler and JobLogger, but JobLogger should be trimmed down once we decide exactly what gets done in DAGScheduler.
  3. JobLogger originally ran in its own thread; it now runs via the new async SparkListener handling, so it is again in a separate thread. This PR brings some of that work directly into the scheduler, with obvious performance concerns. However, if testing bears out that the performance hit is not too severe, then I think the change is worth it: it not only tracks the jobs <--> stages relationship, it also controls the growth of DAGScheduler data structures, and it does so while keeping the concurrency issues manageable.
  4. Enough of the jobs <--> stages information needs to be sent out from the DAGScheduler to SparkListeners so that they don't need to duplicate much of the work in order to get all the information they need. Right now, I'm just sending out an array of associated StageIds within a SparkListenerJobStart, but that may not be enough information exported or the best way of making the information available to SparkListeners.
  5. Right now, this PR is maximally aggressive about cleaning up when a job completes or aborts and a stage is not part of another job. That's probably not the right thing to do if another job will soon launch and want to make use of one of the earlier job's stages. Putting the no-longer-in-a-job stages onto a queue for a while (how long?) before cleaning them out of the DAGScheduler data structures may make sense as a way to give other jobs a chance to re-use old stages; see the sketch after this list.
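On point 5, one possible shape for deferring the cleanup is a simple grace period before eviction (a sketch; the grace period, class, and method names are made up, not part of this PR):

    import scala.collection.mutable

    // Park stages that no job currently needs, and only hand them back for cleanup
    // once they have gone unreferenced for longer than a grace period.
    class DeferredStageCleanup(graceMillis: Long) {
      private val retired = new mutable.HashMap[Int, Long] // stageId -> time it became unreferenced

      def retire(stageId: Int): Unit = retired(stageId) = System.currentTimeMillis()

      // A later job re-used the stage, so it should no longer be a cleanup candidate.
      def revive(stageId: Int): Unit = retired -= stageId

      // Stages whose grace period has expired; the caller removes them from the
      // DAGScheduler data structures.
      def expired(now: Long = System.currentTimeMillis()): Seq[Int] = {
        val dead = retired.collect { case (id, t) if now - t > graceMillis => id }.toSeq
        dead.foreach(retired -= _)
        dead
      }
    }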

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/630/

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/638/

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/674/

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/676/

@@ -108,6 +108,10 @@ class DAGScheduler(

val nextStageId = new AtomicInteger(0)

val jobIdToStageIds = new TimeStampedHashMap[Int, HashSet[Int]]

val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
Member

You should call clearOldValues() on these two maps in DAGScheduler.cleanup()
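For reference, that addition would look something like this (a sketch assuming the existing cleanup(cleanupTime: Long) shape in DAGScheduler):

    def cleanup(cleanupTime: Long) {
      // ... existing clearOldValues calls on the other TimeStampedHashMaps ...
      jobIdToStageIds.clearOldValues(cleanupTime)
      stageIdToJobIds.clearOldValues(cleanupTime)
    }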

Contributor Author

Done

@mateiz (Member) commented Aug 22, 2013

Hey Mark, I made a few comments on the PR. Is this still a work in progress on your side as you said in the original message, or is it mostly done?

@markhamstra (Contributor Author)

Still work in progress. I'm adding "data structures are now empty" assertions at the end of the tests in DAGSchedulerSuite and need to do some work still on shuffleToMapStage in general and the handling of clean up in the zero partition and local job cases.

After that, it should be down to addressing the five points in my "things worth talking about" list.

@markhamstra (Contributor Author)

Updated. Still WIP.

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/697/

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/699/

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/700/

@markhamstra (Contributor Author)

UPDATED: The DAGSchedulerSuite now includes checks in all tests except the "zero split job", asserting that all DAGScheduler data structures are empty at the end of each test job. These were mostly fairly easy, but cleaning up shuffleToMapStage is considerably more involved, since it is not just internal to the DAGScheduler, but also involves external state in the MapOutputTracker.

In order to reuse map outputs that are still available for a stage that has previously been cleaned out of the DAGScheduler (and avoid re-submitting such stages), I've split DAGScheduler.newStage into two pieces: the first to initialize new stages, the second to re-use or re-constitute stages using output locations available in the mapOutputTracker.

Along the way, I also had to fix SPARK-864. That bug resulted from trying to submit stages and missing tasks for a job that is no longer active (thus failing in the idToActiveJob(stage.jobId) lookup when submitMissingTasks tries to get the appropriate properties for the job). With this PR, the properties of the earliest remaining active job associated with the stage are used, or the stage is aborted if there are no longer any active jobs associated with it.
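A sketch of the selection logic just described (stand-in types; the actual helper in the PR may be shaped differently):

    import java.util.Properties
    import scala.collection.mutable

    // Stand-in for the relevant bit of ActiveJob state.
    case class ActiveJobInfo(jobId: Int, properties: Properties)

    class StagePropertiesResolver(
        stageIdToJobIds: mutable.HashMap[Int, mutable.HashSet[Int]],
        idToActiveJob: mutable.HashMap[Int, ActiveJobInfo]) {

      // Properties of the earliest (lowest-id) active job that still needs the stage;
      // None means no active job needs it, and the caller would abort the stage.
      def propertiesForStage(stageId: Int): Option[Properties] =
        stageIdToJobIds.getOrElse(stageId, mutable.HashSet.empty[Int])
          .toSeq.sorted // jobIds always increase, so the lowest id is the earliest job
          .collectFirst { case jobId if idToActiveJob.contains(jobId) => idToActiveJob(jobId).properties }
    }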

With the exception of handling zero-partition RDDs and passing sufficient information to SparkListeners, I now consider this PR to be feature complete, and I'll move on to performance regression testing.

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/732/

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/733/

@AmplabJenkins

Thank you for submitting this pull request.

Unfortunately, the automated tests for this request have failed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/747/

@markhamstra (Contributor Author)

I've got spark-perf numbers:

scheduling-tput
--num-tasks=10000 --num-trials=10 --inter-trial-wait=30,

baseline: 4.955, 0.156, 4.539, 4.955, 4.766
PR842: 5.016, 0.106, 4.786, 5.071, 4.892

scala-agg-by-key
--num-trials=10 --inter-trial-wait=30 --num-partitions=1 --reduce-tasks=1 --random-seed=5 --persistent-type=memory --num-records=200 --unique-keys=1 --key-length=10 --unique-values=1 --value-length=10,

baseline: 0.047, 0.002, 0.045, 0.049, 0.047
PR842: 0.049, 0.003, 0.045, 0.050, 0.049

scala-sort-by-key
--num-trials=10 --inter-trial-wait=30 --num-partitions=1 --reduce-tasks=1 --random-seed=5 --persistent-type=memory --num-records=20 --unique-keys=1 --key-length=10 --unique-values=1 --value-length=10,

baseline: 0.045, 0.004, 0.041, 0.046, 0.047
PR842: 0.046, 0.003, 0.041, 0.048, 0.052

scala-count
--num-trials=10 --inter-trial-wait=30 --num-partitions=1 --reduce-tasks=1 --random-seed=5 --persistent-type=memory --num-records=200 --unique-keys=1 --key-length=10 --unique-values=1 --value-length=10,

baseline: 0.018, 0.002, 0.017, 0.022, 0.017
PR842: 0.018, 0.003, 0.017, 0.025, 0.018

scala-count-w-fltr
--num-trials=10 --inter-trial-wait=30 --num-partitions=1 --reduce-tasks=1 --random-seed=5 --persistent-type=memory --num-records=200 --unique-keys=1 --key-length=10 --unique-values=1 --value-length=10,

baseline: 0.022, 0.002, 0.02, 0.025, 0.020
PR842: 0.024, 0.003, 0.018, 0.025, 0.020

@AmplabJenkins

Thank you for submitting this pull request.

Unfortunately, the automated tests for this request have failed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/788/

@mateiz (Member) commented Sep 1, 2013

Cool, that looks good! I'd like to wait until 0.8.0 is out to merge this but it's definitely appreciated.

@markhamstra (Contributor Author)

It's definitely getting there. I'm curious why DriverSuite is failing at least some of the time and why the Python tests are frequently complaining about tasks still pending for a stage even though there are no more jobs needing that stage. I'm not sure yet whether either of these is a real bug or just a test artifact. I'll look through some worker logs for more such logError messages, but I'll go out on a limb and claim that any such messages are as likely the result of weaknesses in existing code that the new checks are revealing as they are the result of mistakes in this PR.

Anyway, I still haven't resolved the handling of zero-partition RDDs by the DAGScheduler, so those weirdos will continue to cause data structure bloat -- but no worse than before, so there is still a large net reduction in bloat in all other circumstances. The other question is just what the JobLogger and WebUI need in order to avoid duplicating reference-tracking work. We should probably send both jobIdToStageIds and stageIdToJobIds out to SparkListeners on job start. Each of these remaining issues can easily and harmlessly be resolved in follow-on pull requests, I think.

@markhamstra (Contributor Author)

Rebased and fixed some typos and grammatical niggles.

  - style fixes
  - clearOldValues for new TimeStampedHashMaps
  - log unexpected cleanup of data structures
  - cleanup stageToInfos and stageIdToLogIds
Fixed SPARK-864 -- don't submit a stage that no active job needs
@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/837/

markhamstra added a commit to markhamstra/incubator-spark that referenced this pull request Nov 19, 2013
  ...and make sure that DAGScheduler data structures are cleaned up on job completion.
  Initial effort and discussion at mesos/spark#842

Conflicts:
	core/src/main/scala/org/apache/spark/MapOutputTracker.scala
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
markhamstra added a commit to markhamstra/incubator-spark that referenced this pull request Nov 22, 2013
  ...and make sure that DAGScheduler data structures are cleaned up on job completion.
  Initial effort and discussion at mesos/spark#842
markhamstra added a commit to markhamstra/incubator-spark that referenced this pull request Nov 23, 2013
  ...and make sure that DAGScheduler data structures are cleaned up on job completion.
  Initial effort and discussion at mesos/spark#842
markhamstra added a commit to markhamstra/incubator-spark that referenced this pull request Nov 25, 2013
  ...and make sure that DAGScheduler data structures are cleaned up on job completion.
  Initial effort and discussion at mesos/spark#842
markhamstra added a commit to markhamstra/incubator-spark that referenced this pull request Dec 3, 2013
  ...and make sure that DAGScheduler data structures are cleaned up on job completion.
  Initial effort and discussion at mesos/spark#842
markhamstra added a commit to markhamstra/incubator-spark that referenced this pull request Dec 6, 2013
  ...and make sure that DAGScheduler data structures are cleaned up on job completion.
  Initial effort and discussion at mesos/spark#842

Conflicts:
	core/src/main/scala/org/apache/spark/MapOutputTracker.scala
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
	core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
xiajunluan pushed a commit to xiajunluan/spark that referenced this pull request May 30, 2014
The example code on the configuration page currently does not compile.

Author: Andrew Or <andrewor14@gmail.com>

Closes mesos#842 from andrewor14/conf-docs and squashes the following commits:

aabff57 [Andrew Or] Correct example of creating a new SparkConf