-
Notifications
You must be signed in to change notification settings - Fork 385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Regularly poll executors to track their utilization #613
base: master
Are you sure you want to change the base?
Conversation
Thank you for your pull request. An admin will review this request soon. |
//update ALL active stages | ||
activeStageToExecutorStatus.foreach{case(k,v) => | ||
activeStageToExecutorStatus += k -> (v + executorStatus) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that ExecutorStatus
messages even happen when there aren't any active stages. This means that you can also measure the cluster utilization across the lifetime of a spark context, not just within a stage. Eg., this would help diagnose if every stage is very well distributed, but between stages there is a lot of work happening on the master. I just decided to not include that in StatsReportListener
.
(You could achieve the same effect w/out actually sending all the messages when there are no active stages, but the cluster is idle anyway, so why not.)
Why not ganglia or other external monitoring tools? |
/** | ||
* | ||
*/ | ||
abstract class ExecutorStatusPoller extends Logging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest using a java.util.concurrent.ScheduledThreadPoolExecutor instead of an infinite while loop. This would also let you schedule the poller at a fixed interval without having to manage the sleep "catchup" time yourself, e.g.:
val pool = Executors.newSingleThreadScheduledExecutor()
val poller = new Runnable() {
override def run() { // poll each executorId here }
}
// schedule repeated task
pool.scheduleAtFixedRate(poller, 0, waitBetweenPolls, TimeUnit.MILLISECONDS)
This also lets you gracefully stop the poller via:
// gracefully shutdown the poller
pool.shutdown()
pool.awaitTermination(30, TimeUnit.SECONDS)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, I will make that change. This also got me thinking -- do I even want to create a new thread at all? Is there an appropriate thread pool for these repeated tasks already?
@rxin yeah ganglia could provide something pretty close to this. But I thought this was useful b/c (1) this is so simple, its useful to have even if you don't setup ganglia. And as this is integrated right into spark, its easier to connect these measures w/ whats going on in your code (you don't just have a ganglia graph w/ times, which then you've got connect back to what was going on in your code). Its not measuring the exact same thing as ganglia would w/ core utilization, but I can't make a really strong case why this is particularly better. If there is eventually really tight integration w/ ganglia, then maybe this could get ripped out. (2) I was hoping that this might get used for more thorough polling of the executors, eg. stack trace sampling, task progress, etc. So might be a useful stepping stone even if the "core utilization" part is dropped eventually. |
Imran, this approach looks good to me, but I'm going to send it to Patrick, who's been looking at monitoring stuff too. I think these are reasonable API calls to add to the listener though. |
Hey Imran, Just a high level question (haven't done a close look yet). If this is all of the information we are collecting - why do you need to poll the executors in the first place? The information of which tasks are running on which executor when is available directly at the driver. You could actually get much finer grained utilization statistics using that information without the need to add RPC's.
|
Ah well, part of the motivation for this is that we noticed huge delays between when the executor thinks its finished a task, and when the driver fully registered it -- over 50% of the time actually for some of our workloads. We were able to fix this when we discovered it (in our case it seemed to be mostly the cost of merging the accumulator results on the driver), but I'd really like to make this inefficiency more obvious. If we really wanted to, we could probably do this entirely within the driver, but I guess I did it this way b/c I was hoping to piggyback more info on top of those RPCs in the future -- eg., maybe jvm metrics, shuffle status, counters could also come via the same mechanism. |
FWIW I've wanted a quick & dirty "what's the cluster utilization like?" UI for a long time--agreed that other tools should be used for more extensive monitoring, but it would be nice to have some really basic info available for free/out of the box. (I haven't looked at the code, so can't comment on the approach/implementation, just chiming in to say I'd enjoy seeing this happen.) |
Thank you for your pull request. An admin will review this request soon. |
Author: wangfei <wangfei_hello@126.com> Closes mesos#613 from scwf/masterIndex and squashes the following commits: 1463056 [wangfei] delete no use var: masterIndex
This isn't ready to be merged yet (needs tests & docs), but wanted to get some feedback.
The point of this PR is to provide a really high-level metric on how well the cluster is being utilized, by simply polling what percentages of cores are active. I'm hopefully this will diagnose a lot of cases where jobs are slow b/c spark is being used incorrectly -- imbalanced jobs, driver spending all its time merging accumulator values, long breaks between stages, etc.
The ugly part of this is plumbing the events from StandaloneSchedulerBackend --> ClusterScheduler --> SparkContext --> DAGScheduler --> SparkListener. I'm not sure what the right architecture is in general to let any arbitrary component get an event to the SparkListeners.