Regularly poll executors to track their utilization #613

Open
wants to merge 12 commits into master
13 changes: 11 additions & 2 deletions core/src/main/scala/spark/SparkContext.scala
@@ -38,12 +38,15 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler._
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
import spark.storage.{BlockManagerUI, StorageUtils}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import spark.scheduler.StageInfo
import spark.storage.RDDInfo
import spark.storage.StorageStatus


/**
@@ -494,6 +497,12 @@ class SparkContext(
dagScheduler.sparkListeners += listener
}

private[spark]
def executorStatus(es: ExecutorStatus) {
if (dagScheduler != null && dagScheduler.sparkListeners != null)
dagScheduler.sparkListeners.foreach{l => l.onExecutorStatusUpdate(es)}
}

/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
11 changes: 11 additions & 0 deletions core/src/main/scala/spark/executor/Executor.scala
@@ -4,12 +4,14 @@ import java.io.{File, FileOutputStream}
import java.net.{URI, URL, URLClassLoader}
import java.util.concurrent._

import atomic.AtomicInteger
import org.apache.hadoop.fs.FileUtil

import scala.collection.mutable.{ArrayBuffer, Map, HashMap}

import spark.broadcast._
import spark.scheduler._
import cluster.ExecutorStatus
import spark._
import java.nio.ByteBuffer

@@ -32,6 +34,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
// must not have port specified.
assert (0 == Utils.parseHostPort(slaveHostname)._2)

val activeTasks = new AtomicInteger(0)

// Make sure the local hostname we report matches the cluster scheduler's name for this host
Utils.setCustomHostname(slaveHostname)

@@ -81,10 +85,15 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
}

def status = {
ExecutorStatus(executorId, activeTasks.get())
}

class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {

override def run() {
activeTasks.incrementAndGet()
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(urlClassLoader)
@@ -131,6 +140,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
logError("Exception in task ID " + taskId, t)
//System.exit(1)
}
} finally {
activeTasks.decrementAndGet()
}
}
}
10 changes: 10 additions & 0 deletions core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -56,6 +56,16 @@ private[spark] class StandaloneExecutorBackend(
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}

case res:RequestExecutorStatus =>
//TODO add to other backends
if (executor == null) {
logError("Received executor status request but executor was null")
//should I exit here? or is it possible this is OK?
System.exit(1)
} else {
driver ! executor.status
}

case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
logError("Driver terminated or disconnected! Shutting down.")
System.exit(1)
4 changes: 4 additions & 0 deletions core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -428,6 +428,8 @@ class DAGScheduler(
logDebug("missing: " + missing)
if (missing == Nil) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
val stageStarted = new StageStarted(stage)
sparkListeners.foreach{_.onStageStarted(stageStarted)}
submitMissingTasks(stage)
running += stage
} else {
@@ -568,6 +570,8 @@
running ++= newlyRunnable
for (stage <- newlyRunnable.sortBy(_.id)) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
val stageStarted = new StageStarted(stage)
sparkListeners.foreach{_.onStageStarted(stageStarted)}
submitMissingTasks(stage)
}
}
59 changes: 59 additions & 0 deletions core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala
@@ -0,0 +1,59 @@
package spark.scheduler

import scala.collection.mutable
import spark.scheduler.cluster.StandaloneSchedulerBackend
import spark.scheduler.local.LocalScheduler
import spark.Logging

/**
 * Periodically polls each executor for its current status (e.g. the number of active tasks),
 * so that SparkListeners can track executor utilization over time via onExecutorStatusUpdate.
 */
abstract class ExecutorStatusPoller extends Logging {
Contributor
I'd suggest using a java.util.concurrent.ScheduledThreadPoolExecutor instead of an infinite while loop. This would also let you schedule the poller at a fixed interval without having to manage the sleep "catchup" time yourself, e.g.:

  val pool = Executors.newSingleThreadScheduledExecutor()
  val poller = new Runnable() {
    override def run() { /* poll each executorId here */ }
  }

  // schedule the repeated task
  pool.scheduleAtFixedRate(poller, 0, waitBetweenPolls, TimeUnit.MILLISECONDS)

This also lets you gracefully stop the poller via:

  // gracefully shut down the poller
  pool.shutdown()
  pool.awaitTermination(30, TimeUnit.SECONDS)

Contributor Author
good point, I will make that change. This also got me thinking -- do I even want to create a new thread at all? Is there an appropriate thread pool for these repeated tasks already?
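
For reference, a minimal sketch of the poller rewritten on a single-thread scheduled executor, along the lines of the suggestion above. This is an illustration only, not part of the diff; the class name ScheduledExecutorStatusPoller is made up, while waitBetweenPolls, executorList, pollExecutor, and OPEN_POLLS_WAIT_KEY come from the new file below:

  import java.util.concurrent.{Executors, TimeUnit}

  abstract class ScheduledExecutorStatusPoller {
    val waitBetweenPolls = System.getProperty(ExecutorStatusPoller.OPEN_POLLS_WAIT_KEY, "100").toLong

    private val pool = Executors.newSingleThreadScheduledExecutor()
    private val poller = new Runnable() {
      override def run() {
        // poll every known executor; the pool takes care of the fixed-rate timing
        executorList.foreach(pollExecutor)
      }
    }
    pool.scheduleAtFixedRate(poller, 0, waitBetweenPolls, TimeUnit.MILLISECONDS)

    def stop() {
      // gracefully stop polling, e.g. when the scheduler backend shuts down
      pool.shutdown()
      pool.awaitTermination(30, TimeUnit.SECONDS)
    }

    def executorList: Seq[String]
    def pollExecutor(executorId: String)
  }

Compared to the hand-rolled loop, this drops the per-executor lastPoll bookkeeping, and stop() gives the scheduler a clean way to shut the poller down.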

  val waitBetweenPolls = System.getProperty(ExecutorStatusPoller.OPEN_POLLS_WAIT_KEY, "100").toLong
  val executorToLastPoll = mutable.Map[String, Long]()

  //simple round-robin poll of each executor. throttle the polling
  val t = new Thread("executor-poller") {
    setDaemon(true)
    override def run() {
      while (true) {
        val now = System.currentTimeMillis()
        //if we also had the results come through this class, we could also throttle in terms of number of open polls
        var minWait = waitBetweenPolls
        executorList.foreach { executorId =>
          val lastPoll = executorToLastPoll.getOrElseUpdate(executorId, now)
          val remainingWait = waitBetweenPolls - (now - lastPoll)
          if (remainingWait <= 0) {
            pollExecutor(executorId)
            executorToLastPoll(executorId) = System.currentTimeMillis()
          } else if (remainingWait < minWait) {
            minWait = remainingWait
          }
        }
        Thread.sleep(minWait)
      }
    }
  }
  t.start()

  def executorList: Seq[String]
  def pollExecutor(executorId: String)
}

class StandaloneExecutorStatusPoller(val sched: StandaloneSchedulerBackend) extends ExecutorStatusPoller {
  override def executorList = sched.allExecutors.toSeq
  override def pollExecutor(executorId: String) {
    sched.requestExecutorStatus(executorId)
  }
}

class LocalExecutorStatusPoller(val sched: LocalScheduler) extends ExecutorStatusPoller {
  override def executorList = Seq("local") //just needs to have one element, value doesn't matter
  override def pollExecutor(executorId: String) {
    sched.reportExecutorStatus
  }
}

object ExecutorStatusPoller {
  val OPEN_POLLS_WAIT_KEY = "spark.executor_poll.wait_ms"
}
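
The poll interval is read from the spark.executor_poll.wait_ms system property, defaulting to 100 ms. A hypothetical way to slow the polling down for a driver program (the master URL and app name below are placeholders):

  // set the property on the driver before the SparkContext (and hence the poller) is created
  System.setProperty("spark.executor_poll.wait_ms", "500")
  val sc = new SparkContext("spark://master:7077", "utilization-demo")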
46 changes: 46 additions & 0 deletions core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -6,27 +6,56 @@ import spark.{Utils, Logging}
import spark.executor.TaskMetrics

trait SparkListener {

/**
* called when spark starts computing a new stage
*/
def onStageStarted(stageStarted: StageStarted)

/**
* called when a stage is completed, with information on the completed stage
*/
def onStageCompleted(stageCompleted: StageCompleted)

/**
* called when there is information on the status of an executor. This may get called at any time. There may not be
* any active stages when this is called. Furthermore, it may be called often, so don't do anything expensive here.
*/
def onExecutorStatusUpdate(executorStatus: ExecutorStatus)
}

sealed trait SparkListenerEvents

case class StageStarted(val stage: Stage) extends SparkListenerEvents

case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents

case class ExecutorStatus(val executorId: String, val activeTasks: Int, val availableCores: Int)
extends SparkListenerEvents


/**
* Simple SparkListener that logs a few summary statistics when each stage completes
*/
class StatsReportListener extends SparkListener with Logging {

var activeStageToExecutorStatus = Map[Int, ExecutorActivitySummary]()

def onStageStarted(stageStarted: StageStarted) {
activeStageToExecutorStatus += stageStarted.stage.id -> ExecutorActivitySummary(0,0)
}

def onStageCompleted(stageCompleted: StageCompleted) {
import spark.scheduler.StatsReportListener._
implicit val sc = stageCompleted
val execStatus = activeStageToExecutorStatus(stageCompleted.stageInfo.stage.id)
activeStageToExecutorStatus -= stageCompleted.stageInfo.stage.id
this.logInfo("Finished stage: " + stageCompleted.stageInfo)
showMillisDistribution("task runtime:", (info, _) => Some(info.duration))

//overall work distribution
this.logInfo("executor utilization: %2.0f %%".format(execStatus.activePercent))

//shuffle write
showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})

@@ -44,6 +73,13 @@ class StatsReportListener extends SparkListener with Logging {
showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
}

def onExecutorStatusUpdate(executorStatus: ExecutorStatus) {
//update ALL active stages
activeStageToExecutorStatus.foreach{case(k,v) =>
activeStageToExecutorStatus += k -> (v + executorStatus)
}
Contributor Author
Note that ExecutorStatus messages arrive even when there aren't any active stages. This means you can also measure cluster utilization across the lifetime of a SparkContext, not just within a stage. E.g., this would help diagnose the case where every stage is well distributed, but a lot of work happens on the master between stages. I just decided not to include that in StatsReportListener.

(You could achieve the same effect without actually sending the messages when there are no active stages, but the cluster is idle then anyway, so why not.)
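
As a rough illustration of that point, a listener along these lines could fold every status sample into a single summary for the lifetime of the context (LifetimeUtilizationListener is a made-up name; ExecutorActivitySummary is the helper defined further down in this file):

  class LifetimeUtilizationListener extends SparkListener {
    var overall = ExecutorActivitySummary(0, 0)

    def onStageStarted(stageStarted: StageStarted) {}
    def onStageCompleted(stageCompleted: StageCompleted) {}

    def onExecutorStatusUpdate(executorStatus: ExecutorStatus) {
      // accumulate every sample, whether or not a stage is currently running
      overall += executorStatus
    }

    // percentage of cores that were busy, averaged over all polls seen so far
    def utilization: Double = overall.activePercent
  }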

}

}

object StatsReportListener extends Logging {
@@ -131,6 +167,16 @@ object StatsReportListener extends Logging {
}
}

case class ExecutorActivitySummary(activeCores: Int, totalCores: Int) {
def +(execStatus: ExecutorStatus): ExecutorActivitySummary = {
ExecutorActivitySummary(activeCores + execStatus.activeTasks, totalCores + execStatus.availableCores)
}

def activePercent: Double = (activeCores.toDouble / totalCores) * 100
}





case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
@@ -411,6 +411,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}

def executorStatusUpdate(es: spark.scheduler.ExecutorStatus) {
sc.executorStatus(es)
}

def error(message: String) {
synchronized {
if (activeTaskSets.size > 0) {
@@ -4,6 +4,7 @@ import spark.{Utils, Logging, SparkContext}
import spark.deploy.client.{Client, ClientListener}
import spark.deploy.{Command, ApplicationDescription}
import scala.collection.mutable.HashMap
import spark.scheduler.StandaloneExecutorStatusPoller

private[spark] class SparkDeploySchedulerBackend(
scheduler: ClusterScheduler,
@@ -14,6 +15,7 @@ private[spark] class SparkDeploySchedulerBackend(
with ClientListener
with Logging {

val executorStatusPoller = new StandaloneExecutorStatusPoller(this)
var client: Client = null
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
@@ -18,6 +18,9 @@ case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
private[spark]
case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage

private[spark]
case class RequestExecutorStatus(executorId: String) extends StandaloneClusterMessage

// Executors to driver
private[spark]
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
@@ -37,6 +40,9 @@ object StatusUpdate {
}
}

private[spark]
case class ExecutorStatus(executorId: String, activeThreads: Int)

// Internal messages in driver
private[spark] case object ReviveOffers extends StandaloneClusterMessage
private[spark] case object StopDriver extends StandaloneClusterMessage
@@ -23,12 +23,14 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor

// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
var allExecutors = new HashSet[String]
Contributor
Instead of having two state tracking variables (allExecutors and totalCores), could we not just have a single Map[String, Int] where the key is the executor id and the value would be # of cores? Something like:

  val allExecutors = Map.empty[String, Int].withDefaultValue(0)


class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
private val executorHostPort = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
private val totalCores = new HashMap[String, Int]
private val actorToExecutorId = new HashMap[ActorRef, String]
private val addressToExecutorId = new HashMap[Address, String]

@@ -46,9 +48,11 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor(sparkProperties)
context.watch(sender)
allExecutors.synchronized(allExecutors += executorId)
executorActor(executorId) = sender
executorHostPort(executorId) = hostPort
freeCores(executorId) = cores
totalCores(executorId) = cores
executorAddress(executorId) = sender.path.address
actorToExecutorId(sender) = executorId
addressToExecutorId(sender.path.address) = executorId
@@ -82,6 +86,14 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor

case RemoteClientShutdown(transport, address) =>
addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))

case res@RequestExecutorStatus(executorId) =>
executorActor(executorId) ! res

case es: ExecutorStatus =>
//convert to public api executor status, which includes num cores on the executor
val executorStatus = spark.scheduler.ExecutorStatus(es.executorId, es.activeThreads, totalCores(es.executorId))
scheduler.executorStatusUpdate(executorStatus)
}

// Make fake resource offers on all executors
@@ -114,6 +126,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
executorActor -= executorId
executorHostPort -= executorId
freeCores -= executorId
totalCores -= executorId
allExecutors.synchronized(allExecutors -= executorId)
executorHostPort -= executorId
totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason))
@@ -156,6 +170,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
driverActor ! ReviveOffers
}

def requestExecutorStatus(executorId: String) {
driverActor ! RequestExecutorStatus(executorId)
}

override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
.map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
