-
Notifications
You must be signed in to change notification settings - Fork 385
Spark Debugger
Summary: The Spark debugger provides replay debugging for deterministic (logic) errors in Spark programs. It's currently in development, but you can try it out in the arthur branch.
From a user's point of view, debugging a general distributed program can be tedious and confusing. Many distributed programs are nondeterministic; their outcome depends on the interleaving between computation and message passing across multiple machines. Also, the fact that a program is running on a cluster of hundreds or thousands of machines means that it's hard to understand the program state and pinpoint the location of problems.
In order to tame nondeterminism, a distributed debugger has to log a lot of information, imposing a serious performance penalty on the application being debugged.
But the Spark programming model lets us provide replay debugging for almost zero overhead. Spark programs are a series of RDDs and deterministic transformations, so when debugging a Spark program, we don't have to debug it all at once -- instead, we can debug each transformation individually. Broadly, the debugger lets us do the following two things:
- Recompute and inspect intermediate RDDs after the program has finished.
- Re-run a particular task in a single-threaded debugger to find exactly what went wrong.
For deterministic errors, debugging a Spark program is now as easy as debugging a single-threaded one.
As your Spark program runs, the slaves report key events back to the master -- for example, RDD creations, RDD contents, and uncaught exceptions. (A full list of event types is in EventLogging.scala.) The master logs those events, and you can load the event log into the debugger after your program is done running.
A note on nondeterminism: For fault recovery, Spark requires RDD transformations (for example, the function passed to RDD.map
) to be deterministic. The Spark debugger also relies on this property, and it can also warn you if your transformation is nondeterministic. This works by checksumming the contents of each RDD and comparing the checksums from the original execution to the checksums after recomputing the RDD in the debugger.
-
To turn on event logging for your program, set
$SPARK_JAVA_OPTS
inconf/spark-env.sh
as follows:export SPARK_JAVA_OPTS='-Dspark.logging.eventLog=path/to/event-log'
where
path/to/event-log
is where you want the event log to go relative to$SPARK_HOME
. -
To enable the collection of performance data, which includes average element processing time and total serialization time, set
$SPARK_JAVA_OPTS
as follows:export SPARK_JAVA_OPTS='-Dspark.logging.eventLog=path/to/event-log -Dspark.logging.measurePerformance=true'
This will probably slow your program down by 30% or so.
-
Run a Spark shell with
./spark-shell
. -
Use
EventLogReader
to load the event log as follows:spark> val r = new spark.EventLogReader(sc, Some("path/to/event-log")) r: spark.EventLogReader = spark.EventLogReader@726b37ad
Warning: If the event log doesn't exist or is unreadable, this will silently fail and
r.events
will be empty.
-
Use
r.rdds
to get a list of intermediate RDDs generated during your program's execution. An RDD with id x is located atr.rdds(x)
. For example:scala> r.rdds res8: scala.collection.mutable.ArrayBuffer[spark.RDD[_]] = ArrayBuffer(spark.HadoopRDD@fe85adf, spark.MappedRDD@5fa5eea1, spark.MappedRDD@6d5bd16, spark.ShuffledRDD@3a70f2db, spark.FlatMappedValuesRDD@4d5825d6, spark.MappedValuesRDD@561c2c45, spark.CoGroupedRDD@539e922d, spark.MappedValuesRDD@4f8ef33e, spark.FlatMappedRDD@32039440, spark.ShuffledRDD@8fa0f67, spark.MappedValuesRDD@590937cb, spark.CoGroupedRDD@6c2e1e17, spark.MappedValuesRDD@47b9af7d, spark.FlatMappedRDD@6fb05c54, spark.ShuffledRDD@237dc815, spark.MappedValuesRDD@16daece7, spark.CoGroupedRDD@7ef73d69, spark.MappedValuesRDD@19e0f99e, spark.FlatMappedRDD@1240158, spark.ShuffledRDD@62d438fd, spark.MappedValuesRDD@5ae99cbb, spark.FilteredRDD@1f30e79e, spark.MappedRDD@43b64611)
-
Use
r.printRDDs()
to get a formatted list of intermediate RDDs, along with the source location where they were created. For example:scala> r.printRDDs #00: HadoopRDD spark.bagel.examples.WikipediaPageRankStandalone$.main(WikipediaPageRankStandalone.scala:31) #01: MappedRDD spark.bagel.examples.WikipediaPageRankStandalone$.main(WikipediaPageRankStandalone.scala:31) #02: MappedRDD spark.bagel.examples.WikipediaPageRankStandalone$.main(WikipediaPageRankStandalone.scala:35) #03: ShuffledRDD spark.bagel.examples.WikipediaPageRankStandalone$.main(WikipediaPageRankStandalone.scala:35) #04: FlatMappedValuesRDD spark.bagel.examples.WikipediaPageRankStandalone$.main(WikipediaPageRankStandalone.scala:35) #05: MappedValuesRDD spark.bagel.examples.WikipediaPageRankStandalone$.pageRank(WikipediaPageRankStandalone.scala:91) #06: CoGroupedRDD spark.bagel.examples.WikipediaPageRankStandalone$.pageRank(WikipediaPageRankStandalone.scala:92) [...]
-
Use
r.visualizeRDDs()
to visualize the RDDs as a dependency graph. For example:scala> r.visualizeRDDs /tmp/spark-rdds-3758182885839775712.pdf
-
Iterate over the
RDDCreation
entries inr.events
(e.g.for (RDDCreation(rdd, location) <- events)
) to access the RDD creation locations as well as the RDDs themselves.
-
Find the task you want to debug. If the task threw an exception, the
ExceptionEvent
that was created will have a reference to the task. For example:spark> val task = r.events.collect { case e: ExceptionEvent => e }.head.task
Otherwise, look through the list of all tasks in
r.tasks
, or browse tasks by RDD usingr.tasksForRDD(rdd)
. -
Run the task by calling
r.debugTask(taskStageId, taskPartition)
. The task should contain these two values; you can extract them as follows:val (taskStageId, taskPartition) = task match { case rt: ResultTask[_, _] => (rt.stageId, rt.partition) case smt: ShuffleMapTask => (smt.stageId, smt.partition) case _ => throw new UnsupportedOperationException })
The Spark debugger will launch the task in a separate JVM, but you will see the task's stdout and stderr inline with the Spark shell. If you want to pass custom debugging arguments to the task's JVM (for example, to change the debugging port), set the optional
debugOpts
argument tor.debugTask
. WhendebugOpts
is left unset, it defaults to-Xdebug -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000
-
In another terminal, attach your favorite conventional debugger to the Spark shell. For example, if you want to use jdb, run
jdb -attach 8000
. -
Debug the task as you would debug a normal program. For example, to break when an exception is thrown:
> catch org.xml.sax.SAXParseException
-
When the task ends, its JVM will quit and control will return to the main Spark shell. To stop it prematurely, you can kill it from the debugger, or interrupt it from the terminal with Ctrl-C.
First, make sure you have run your program with performance data collection enabled as described above.
- To see the average processing time taken per element for each RDD, use
r.printProcessingTime()
. - To see the total time spent in serialization and deserialization, use
r.serializationTime
.
After recomputing the RDDs you're interested in (in order to force your transformations to run), look at r.checksumMismatches
. If it is non-empty, then one of your transformations is nondeterministic. You can find which one by looking at the rddId
property of the RDDChecksum
s.