diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 2ae4ad8659..53ba69da99 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -38,12 +38,12 @@ import org.apache.mesos.MesosNativeLibrary
 import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import spark.partial.{ApproximateEvaluator, PartialResult}
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler._
 import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}


 /**
@@ -494,6 +494,12 @@ class SparkContext(
     dagScheduler.sparkListeners += listener
   }

+  private[spark]
+  def executorStatus(es: ExecutorStatus) {
+    if (dagScheduler != null && dagScheduler.sparkListeners != null)
+      dagScheduler.sparkListeners.foreach{l => l.onExecutorStatusUpdate(es)}
+  }
+
   /**
    * Return a map from the slave to the max memory available for caching and the remaining
    * memory available for caching.
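The new executorStatus hook simply fans each ExecutorStatus event out to every registered SparkListener. As a minimal sketch (not part of the patch) of what a consumer looks like, assuming it is registered through the same listener hook whose body appears above (dagScheduler.sparkListeners += listener):

  import spark.Logging
  import spark.scheduler.{SparkListener, StageStarted, StageCompleted, ExecutorStatus}

  // Logs every status sample. onExecutorStatusUpdate fires on each poll,
  // so the body is kept deliberately cheap.
  class UtilizationLogger extends SparkListener with Logging {
    def onStageStarted(stageStarted: StageStarted) {}
    def onStageCompleted(stageCompleted: StageCompleted) {}
    def onExecutorStatusUpdate(es: ExecutorStatus) {
      logInfo("executor %s: %d of %d cores busy".format(
        es.executorId, es.activeTasks, es.availableCores))
    }
  }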
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index da20b84544..d6512234d0 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -4,12 +4,14 @@ import java.io.{File, FileOutputStream}
 import java.net.{URI, URL, URLClassLoader}
 import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.fs.FileUtil

 import scala.collection.mutable.{ArrayBuffer, Map, HashMap}

 import spark.broadcast._
 import spark.scheduler._
+import spark.scheduler.cluster.ExecutorStatus
 import spark._
 import java.nio.ByteBuffer
@@ -32,6 +34,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
   // must not have port specified.
   assert (0 == Utils.parseHostPort(slaveHostname)._2)

+  val activeTasks = new AtomicInteger(0)
+
   // Make sure the local hostname we report matches the cluster scheduler's name for this host
   Utils.setCustomHostname(slaveHostname)
@@ -81,10 +85,15 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
   }

+  def status: ExecutorStatus = {
+    ExecutorStatus(executorId, activeTasks.get())
+  }
+
   class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {

     override def run() {
+      activeTasks.incrementAndGet()
       val startTime = System.currentTimeMillis()
       SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(urlClassLoader)
@@ -131,6 +140,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
           logError("Exception in task ID " + taskId, t)
           //System.exit(1)
         }
+      } finally {
+        activeTasks.decrementAndGet()
       }
     }
   }
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index ebe2ac68d8..944cc17458 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -56,6 +56,16 @@ private[spark] class StandaloneExecutorBackend(
       executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
     }

+    case res: RequestExecutorStatus =>
+      //TODO add to other backends
+      if (executor == null) {
+        logError("Received executor status request but executor was null")
+        //should I exit here? or is it possible this is OK?
+        System.exit(1)
+      } else {
+        driver ! executor.status
+      }
+
     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       logError("Driver terminated or disconnected! Shutting down.")
       System.exit(1)
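One subtlety in TaskRunner.run: activeTasks is incremented before the try block that carries the new finally, so an exception thrown from SparkEnv.set or the classloader call (unlikely, but possible) would leave the count permanently inflated. A tighter shape, sketched with the existing body elided:

  override def run() {
    activeTasks.incrementAndGet()
    try {
      // ...existing deserialization, execution, and status reporting...
    } finally {
      activeTasks.decrementAndGet()  // every exit path releases the count
    }
  }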
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index b18248d2b5..83c7d4b844 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -428,6 +428,8 @@ class DAGScheduler(
       logDebug("missing: " + missing)
       if (missing == Nil) {
         logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
+        val stageStarted = new StageStarted(stage)
+        sparkListeners.foreach{_.onStageStarted(stageStarted)}
         submitMissingTasks(stage)
         running += stage
       } else {
@@ -568,6 +570,8 @@ class DAGScheduler(
       running ++= newlyRunnable
       for (stage <- newlyRunnable.sortBy(_.id)) {
         logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
+        val stageStarted = new StageStarted(stage)
+        sparkListeners.foreach{_.onStageStarted(stageStarted)}
         submitMissingTasks(stage)
       }
     }
diff --git a/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala b/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala
new file mode 100644
index 0000000000..1b5e838dba
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala
@@ -0,0 +1,63 @@
+package spark.scheduler
+
+import scala.collection.mutable
+import spark.scheduler.cluster.StandaloneSchedulerBackend
+import spark.scheduler.local.LocalScheduler
+import spark.Logging
+import java.util.concurrent.{TimeUnit, Executors}
+
+/**
+ * Periodically polls every executor for its current status, throttled so that each executor is asked at most once per spark.executor_poll.wait_ms milliseconds (default 100).
+ */
+abstract class ExecutorStatusPoller extends Logging {
+  val waitBetweenPolls = System.getProperty(ExecutorStatusPoller.OPEN_POLLS_WAIT_KEY, "100").toLong
+  val executorToLastPoll = mutable.Map[String, Long]()
+
+  val pool = Executors.newSingleThreadScheduledExecutor()
+  val poller = new Runnable() {
+    override def run() {
+      val now = System.currentTimeMillis()
+      //if we also had the results come through this class, we could also throttle in terms of number of open polls
+      var minWait = waitBetweenPolls
+      executorList.foreach{executorId =>
+        val lastPoll = executorToLastPoll.getOrElseUpdate(executorId, now)
+        val remainingWait = waitBetweenPolls - (now - lastPoll)
+        if (remainingWait <= 0) {
+          pollExecutor(executorId)
+          executorToLastPoll(executorId) = System.currentTimeMillis()
+        } else if (remainingWait < minWait) {
+          minWait = remainingWait
+        }
+      }
+    }
+  }
+
+  // schedule repeated task
+  pool.scheduleAtFixedRate(poller, 0, waitBetweenPolls, TimeUnit.MILLISECONDS)
+
+  def executorList: Seq[String]
+  def pollExecutor(executorId: String)
+  def shutdown() {
+    // gracefully shutdown the poller
+    pool.shutdown()
+    pool.awaitTermination(30, TimeUnit.SECONDS)
+  }
+}
+
+class StandaloneExecutorStatusPoller(val sched: StandaloneSchedulerBackend) extends ExecutorStatusPoller {
+  override def executorList = sched.allExecutors.keys.toSeq
+  override def pollExecutor(executorId: String) {
+    sched.requestExecutorStatus(executorId)
+  }
+}
+
+class LocalExecutorStatusPoller(val sched: LocalScheduler) extends ExecutorStatusPoller {
+  override def executorList = Seq("local") //just needs to have one element, value doesn't matter
+  override def pollExecutor(executorId: String) {
+    sched.reportExecutorStatus
+  }
+}
+
+object ExecutorStatusPoller {
+  val OPEN_POLLS_WAIT_KEY = "spark.executor_poll.wait_ms"
+}
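The poll interval is read from a system property once, when the poller is constructed, so it must be set before the SparkContext (and with it the scheduler backend and poller) is created. Note also that minWait is computed but currently unused: scheduleAtFixedRate re-runs the task at the fixed waitBetweenPolls period regardless, so the adaptive delay is groundwork for a future schedule()-based loop rather than live behavior. A usage sketch, assuming only what the new file above defines:

  // Default is 100 ms; doubling the interval halves the polling rate.
  System.setProperty(ExecutorStatusPoller.OPEN_POLLS_WAIT_KEY, "200")
  val sc = new spark.SparkContext("local[4]", "poller-demo")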
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index a65140b145..2196d3502f 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -6,27 +6,56 @@
 import spark.{Utils, Logging}
 import spark.executor.TaskMetrics

 trait SparkListener {
+
+  /**
+   * Called when Spark starts computing a new stage.
+   */
+  def onStageStarted(stageStarted: StageStarted)
+
   /**
    * called when a stage is completed, with information on the completed stage
    */
   def onStageCompleted(stageCompleted: StageCompleted)
+
+  /**
+   * Called when there is new information on the status of an executor. This may be called at any time; there may be
+   * no active stages when it fires. It may also be called often, so don't do anything expensive here.
+   */
+  def onExecutorStatusUpdate(executorStatus: ExecutorStatus)
 }

 sealed trait SparkListenerEvents

+case class StageStarted(val stage: Stage) extends SparkListenerEvents
+
 case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents

+case class ExecutorStatus(val executorId: String, val activeTasks: Int, val availableCores: Int)
+  extends SparkListenerEvents
+
 /**
  * Simple SparkListener that logs a few summary statistics when each stage completes
  */
 class StatsReportListener extends SparkListener with Logging {
+
+  var activeStageToExecutorStatus = Map[Int, ExecutorActivitySummary]()
+
+  def onStageStarted(stageStarted: StageStarted) {
+    activeStageToExecutorStatus += stageStarted.stage.id -> ExecutorActivitySummary(0, 0)
+  }
+
   def onStageCompleted(stageCompleted: StageCompleted) {
     import spark.scheduler.StatsReportListener._
     implicit val sc = stageCompleted
+    val execStatus = activeStageToExecutorStatus.getOrElse(stageCompleted.stageInfo.stage.id, ExecutorActivitySummary(0, 0))
+    activeStageToExecutorStatus -= stageCompleted.stageInfo.stage.id
     this.logInfo("Finished stage: " + stageCompleted.stageInfo)
     showMillisDistribution("task runtime:", (info, _) => Some(info.duration))

+    //overall work distribution
+    this.logInfo("executor utilization: %2.0f %%".format(execStatus.activePercent))
+
     //shuffle write
     showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
@@ -44,6 +73,13 @@ class StatsReportListener extends SparkListener with Logging {
     showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
   }

+  def onExecutorStatusUpdate(executorStatus: ExecutorStatus) {
+    //update ALL active stages
+    activeStageToExecutorStatus.foreach{case (k, v) =>
+      activeStageToExecutorStatus += k -> (v + executorStatus)
+    }
+  }
+
 }

 object StatsReportListener extends Logging {
@@ -131,6 +167,16 @@ object StatsReportListener extends Logging {
   }
 }

+case class ExecutorActivitySummary(activeCoresSampled: Int, totalCoresSampled: Int) {
+  def +(execStatus: ExecutorStatus): ExecutorActivitySummary = {
+    ExecutorActivitySummary(activeCoresSampled + execStatus.activeTasks, totalCoresSampled + execStatus.availableCores)
+  }
+
+  def activePercent: Double = if (totalCoresSampled == 0) 0.0 else (activeCoresSampled.toDouble / totalCoresSampled) * 100
+}
+
+
 case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
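Adding two abstract methods to SparkListener is a source-incompatible change: every existing listener outside this patch stops compiling until it implements them. If that churn is unwanted, the new callbacks could be given no-op default bodies instead, which Scala traits allow; StatsReportListener is unaffected either way, since it overrides all three. A sketch:

  trait SparkListener {
    /** Called when Spark starts computing a new stage. */
    def onStageStarted(stageStarted: StageStarted) {}

    /** Called when a stage is completed, with information on the completed stage. */
    def onStageCompleted(stageCompleted: StageCompleted) {}

    /** Called on every executor status sample; may fire when no stage is active. */
    def onExecutorStatusUpdate(executorStatus: ExecutorStatus) {}
  }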
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index cf4483f144..dee4a3a187 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -411,6 +411,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }

+  def executorStatusUpdate(es: spark.scheduler.ExecutorStatus) {
+    sc.executorStatus(es)
+  }
+
   def error(message: String) {
     synchronized {
       if (activeTaskSets.size > 0) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 170ede0f44..da65f82c3f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -4,6 +4,7 @@ import spark.{Utils, Logging, SparkContext}
 import spark.deploy.client.{Client, ClientListener}
 import spark.deploy.{Command, ApplicationDescription}
 import scala.collection.mutable.HashMap
+import spark.scheduler.StandaloneExecutorStatusPoller

 private[spark] class SparkDeploySchedulerBackend(
     scheduler: ClusterScheduler,
@@ -14,6 +15,7 @@ private[spark] class SparkDeploySchedulerBackend(
   with ClientListener
   with Logging {

+  val executorStatusPoller = new StandaloneExecutorStatusPoller(this)
   var client: Client = null
   var stopping = false
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
@@ -42,6 +44,7 @@ private[spark] class SparkDeploySchedulerBackend(
     stopping = true
     super.stop()
     client.stop()
+    executorStatusPoller.shutdown()
     if (shutdownCallback != null) {
       shutdownCallback(this)
     }
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index 3335294844..6023998161 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -18,6 +18,9 @@ case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
 private[spark]
 case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage

+private[spark]
+case class RequestExecutorStatus(executorId: String) extends StandaloneClusterMessage
+
 // Executors to driver
 private[spark]
 case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
@@ -37,6 +40,9 @@ object StatusUpdate {
   }
 }

+private[spark]
+case class ExecutorStatus(executorId: String, activeThreads: Int) extends StandaloneClusterMessage
+
 // Internal messages in driver
 private[spark] case object ReviveOffers extends StandaloneClusterMessage
 private[spark] case object StopDriver extends StandaloneClusterMessage
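ExecutorStatus now names two different types: the private wire message above (spark.scheduler.cluster.ExecutorStatus, carrying activeThreads) and the public listener event (spark.scheduler.ExecutorStatus, which adds the executor's core count). The backend converts the former into the latter, and call sites such as ClusterScheduler.executorStatusUpdate have to spell out the fully qualified name. Import aliases are one way to keep such sites readable; a sketch, where toEvent is a hypothetical helper mirroring the inline conversion DriverActor performs:

  import spark.scheduler.{ExecutorStatus => ExecutorStatusEvent}
  import spark.scheduler.cluster.{ExecutorStatus => ExecutorStatusMsg}

  def toEvent(msg: ExecutorStatusMsg, totalCores: Int): ExecutorStatusEvent =
    ExecutorStatusEvent(msg.executorId, msg.activeThreads, totalCores)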
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 004592a540..c3f0f6e9b2 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -21,14 +21,14 @@ private[spark]
 class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
   extends SchedulerBackend with Logging {

-  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
-  var totalCoreCount = new AtomicInteger(0)
+  // Executor ID -> total cores on that executor; writes are guarded by allExecutors.synchronized
+  var allExecutors = HashMap[String, Int]()

   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
     private val executorAddress = new HashMap[String, Address]
     private val executorHostPort = new HashMap[String, String]
     private val freeCores = new HashMap[String, Int]
+    private val totalCores = new HashMap[String, Int]
     private val actorToExecutorId = new HashMap[ActorRef, String]
     private val addressToExecutorId = new HashMap[Address, String]
@@ -46,13 +46,14 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
         logInfo("Registered executor: " + sender + " with ID " + executorId)
         sender ! RegisteredExecutor(sparkProperties)
         context.watch(sender)
+        allExecutors.synchronized(allExecutors += executorId -> cores)
         executorActor(executorId) = sender
         executorHostPort(executorId) = hostPort
         freeCores(executorId) = cores
+        totalCores(executorId) = cores
         executorAddress(executorId) = sender.path.address
         actorToExecutorId(sender) = executorId
         addressToExecutorId(sender.path.address) = executorId
-        totalCoreCount.addAndGet(cores)
         makeOffers()
       }
@@ -82,6 +83,14 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       case RemoteClientShutdown(transport, address) =>
         addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
+
+      case res @ RequestExecutorStatus(executorId) =>
+        executorActor(executorId) ! res
+
+      case es: ExecutorStatus =>
+        //convert to public api executor status, which includes num cores on the executor
+        val executorStatus = spark.scheduler.ExecutorStatus(es.executorId, es.activeThreads, totalCores(es.executorId))
+        scheduler.executorStatusUpdate(executorStatus)
     }

     // Make fake resource offers on all executors
@@ -114,8 +123,9 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       executorActor -= executorId
       executorHostPort -= executorId
       freeCores -= executorId
+      totalCores -= executorId
+      allExecutors.synchronized(allExecutors -= executorId)
       executorHostPort -= executorId
-      totalCoreCount.addAndGet(-numCores)
       scheduler.executorLost(executorId, SlaveLost(reason))
     }
   }
@@ -156,8 +166,12 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     driverActor ! ReviveOffers
   }

+  def requestExecutorStatus(executorId: String) {
+    driverActor ! RequestExecutorStatus(executorId)
+  }
+
   override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
-    .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
+    .map(_.toInt).getOrElse(math.max(allExecutors.values.sum, 2))

   // Called by subclasses when notified of a lost worker
   def removeExecutor(executorId: String, reason: String) {
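Two hardening notes on DriverActor. First, executorActor(executorId) throws if the executor was removed between the poller scheduling a request and its delivery; a guarded lookup drops the stale request instead of crashing the actor. Second, allExecutors is written under allExecutors.synchronized but read without the lock by the poller thread (executorList) and by defaultParallelism; taking the same lock on the read side closes that race. Both sketched against the names in this diff (executorCoresSnapshot is a hypothetical helper):

  // Inside DriverActor.receive:
  case res @ RequestExecutorStatus(executorId) =>
    // The executor may already be gone; drop the poll rather than crash.
    executorActor.get(executorId).foreach(_ ! res)

  // Read-side counterpart to the synchronized writes:
  def executorCoresSnapshot: Map[String, Int] =
    allExecutors.synchronized { allExecutors.toMap }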
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 37a67f9b1b..921c97abac 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -18,6 +18,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
   extends TaskScheduler
   with Logging {

+  val statusPoller = new LocalExecutorStatusPoller(this)
+  val activeTasks = new AtomicInteger(0)
   var attemptId = new AtomicInteger(0)
   var threadPool = Utils.newDaemonFixedThreadPool(threads)
   val env = SparkEnv.get
@@ -46,7 +48,13 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
     val myAttemptId = attemptId.getAndIncrement()
     threadPool.submit(new Runnable {
       def run() {
-        runTask(task, idInJob, myAttemptId)
+        activeTasks.getAndIncrement
+        try {
+          runTask(task, idInJob, myAttemptId)
+        } finally {
+          // release the counter on every exit path, even if the task throws
+          activeTasks.getAndDecrement
+        }
       }
     })
   }
@@ -144,8 +152,14 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
   }

   override def stop() {
+    statusPoller.shutdown()
     threadPool.shutdownNow()
   }

   override def defaultParallelism() = threads
+
+  def reportExecutorStatus {
+    val active = activeTasks.get()
+    sc.executorStatus(new ExecutorStatus("local", active, threads))
+  }
 }
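With the local scheduler wired up, the whole path can be exercised end to end: the poller drives reportExecutorStatus, which flows through SparkContext.executorStatus into StatsReportListener's utilization line. A sketch, assuming the listener-registration method whose body appears in the SparkContext hunk at the top of this diff is named addSparkListener:

  import spark.SparkContext
  import spark.scheduler.StatsReportListener

  object StatusDemo {
    def main(args: Array[String]) {
      // Must be set before the SparkContext (and its poller) is created.
      System.setProperty("spark.executor_poll.wait_ms", "200")
      val sc = new SparkContext("local[2]", "status-demo")
      sc.addSparkListener(new StatsReportListener)
      // A stage long enough for several status polls to land:
      sc.parallelize(1 to 1000, 8).map { i => Thread.sleep(5); i }.count()
      sc.stop()  // also shuts down the status poller via LocalScheduler.stop
    }
  }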