From 8908db363819cf3b6bf6fa690e0874c99c33cb92 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 13 Feb 2013 20:29:08 -0800 Subject: [PATCH 01/12] add simple ExecutorStatus --- core/src/main/scala/spark/executor/Executor.scala | 11 +++++++++++ .../scheduler/cluster/StandaloneClusterMessage.scala | 6 ++++++ 2 files changed, 17 insertions(+) diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index da20b84544..89a77fed6b 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -4,12 +4,14 @@ import java.io.{File, FileOutputStream} import java.net.{URI, URL, URLClassLoader} import java.util.concurrent._ +import atomic.AtomicInteger import org.apache.hadoop.fs.FileUtil import scala.collection.mutable.{ArrayBuffer, Map, HashMap} import spark.broadcast._ import spark.scheduler._ +import cluster.ExecutorStatus import spark._ import java.nio.ByteBuffer @@ -32,6 +34,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert // must not have port specified. assert (0 == Utils.parseHostPort(slaveHostname)._2) + val activeTasks = new AtomicInteger(0) + // Make sure the local hostname we report matches the cluster scheduler's name for this host Utils.setCustomHostname(slaveHostname) @@ -81,10 +85,15 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert threadPool.execute(new TaskRunner(context, taskId, serializedTask)) } + def status = { + ExecutorStatus(activeTasks.get()) + } + class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) extends Runnable { override def run() { + activeTasks.incrementAndGet() val startTime = System.currentTimeMillis() SparkEnv.set(env) Thread.currentThread.setContextClassLoader(urlClassLoader) @@ -131,6 +140,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert logError("Exception in task ID " + taskId, t) //System.exit(1) } + } finally { + activeTasks.decrementAndGet() } } } diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala index 3335294844..b089410dea 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -18,6 +18,9 @@ case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) private[spark] case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage +private[spark] +case object RequestExecutorStatus extends StandaloneClusterMessage + // Executors to driver private[spark] case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) @@ -37,6 +40,9 @@ object StatusUpdate { } } +private[spark] +case class ExecutorStatus(activeThreads: Int) + // Internal messages in driver private[spark] case object ReviveOffers extends StandaloneClusterMessage private[spark] case object StopDriver extends StandaloneClusterMessage From ffeee819ba3e026e28d0b43502b01d9c06c5febc Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 13 May 2013 22:24:28 -0700 Subject: [PATCH 02/12] plumb ExecutorStatus through system. 
(still missing polling) --- core/src/main/scala/spark/SparkContext.scala | 12 ++++++++++-- .../main/scala/spark/executor/Executor.scala | 2 +- .../executor/StandaloneExecutorBackend.scala | 10 ++++++++++ .../scala/spark/scheduler/SparkListener.scala | 13 +++++++++++++ .../scheduler/cluster/ClusterScheduler.scala | 4 ++++ .../cluster/StandaloneClusterMessage.scala | 4 ++-- .../cluster/StandaloneSchedulerBackend.scala | 18 ++++++++++++++++++ 7 files changed, 58 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 2ae4ad8659..cdb7153db2 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -38,12 +38,15 @@ import org.apache.mesos.MesosNativeLibrary import spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import spark.partial.{ApproximateEvaluator, PartialResult} import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} -import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler} +import spark.scheduler._ import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler} import spark.scheduler.local.LocalScheduler import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} -import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo} +import spark.storage.{BlockManagerUI, StorageUtils} import spark.util.{MetadataCleaner, TimeStampedHashMap} +import spark.scheduler.StageInfo +import spark.storage.RDDInfo +import spark.storage.StorageStatus /** @@ -494,6 +497,11 @@ class SparkContext( dagScheduler.sparkListeners += listener } + private[spark] + def executorStatus(es: ExecutorStatus) { + dagScheduler.sparkListeners.foreach{l => l.onExecutorStatusUpdate(es)} + } + /** * Return a map from the slave to the max memory available for caching and the remaining * memory available for caching. diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 89a77fed6b..d6512234d0 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -86,7 +86,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert } def status = { - ExecutorStatus(activeTasks.get()) + ExecutorStatus(executorId, activeTasks.get()) } class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index ebe2ac68d8..944cc17458 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -56,6 +56,16 @@ private[spark] class StandaloneExecutorBackend( executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) } + case res:RequestExecutorStatus => + //TODO add to other backends + if (executor == null) { + logError("Received executor status request but executor was null") + //should I exit here? or is it possible this is OK? + System.exit(1) + } else { + driver ! executor.status + } + case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => logError("Driver terminated or disconnected! 
Shutting down.") System.exit(1) diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index a65140b145..097b91dfc6 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -10,12 +10,21 @@ trait SparkListener { * called when a stage is completed, with information on the completed stage */ def onStageCompleted(stageCompleted: StageCompleted) + + /** + * can be called anytime to give information on the status of one executor. There may not be any active stages + * when this is called. Furthermore, it may be called often, so don't do anything expensive here. + */ + def onExecutorStatusUpdate(executorStatus: ExecutorStatus) } sealed trait SparkListenerEvents case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents +case class ExecutorStatus(val executorId: String, val activeTasks: Int, val availableCores: Int) + extends SparkListenerEvents + /** * Simple SparkListener that logs a few summary statistics when each stage completes @@ -44,6 +53,10 @@ class StatsReportListener extends SparkListener with Logging { showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%") } + def onExecutorStatusUpdate(executorStatus: ExecutorStatus) { + //TODO + } + } object StatsReportListener extends Logging { diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index cf4483f144..dee4a3a187 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -411,6 +411,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } + def executorStatusUpdate(es: spark.scheduler.ExecutorStatus) { + sc.executorStatus(es) + } + def error(message: String) { synchronized { if (activeTaskSets.size > 0) { diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala index b089410dea..6023998161 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -19,7 +19,7 @@ private[spark] case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage private[spark] -case object RequestExecutorStatus extends StandaloneClusterMessage +case class RequestExecutorStatus(executorId: String) extends StandaloneClusterMessage // Executors to driver private[spark] @@ -41,7 +41,7 @@ object StatusUpdate { } private[spark] -case class ExecutorStatus(activeThreads: Int) +case class ExecutorStatus(executorId: String, activeThreads: Int) // Internal messages in driver private[spark] case object ReviveOffers extends StandaloneClusterMessage diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 004592a540..9909eb53c2 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -23,12 +23,14 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = 
new AtomicInteger(0) + var allExecutors = new HashSet[String] class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] private val executorAddress = new HashMap[String, Address] private val executorHostPort = new HashMap[String, String] private val freeCores = new HashMap[String, Int] + private val totalCores = new HashMap[String, Int] private val actorToExecutorId = new HashMap[ActorRef, String] private val addressToExecutorId = new HashMap[Address, String] @@ -46,9 +48,11 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor(sparkProperties) context.watch(sender) + allExecutors.synchronized(allExecutors += executorId) executorActor(executorId) = sender executorHostPort(executorId) = hostPort freeCores(executorId) = cores + totalCores(executorId) = cores executorAddress(executorId) = sender.path.address actorToExecutorId(sender) = executorId addressToExecutorId(sender.path.address) = executorId @@ -82,6 +86,14 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor case RemoteClientShutdown(transport, address) => addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown")) + + case res@RequestExecutorStatus(executorId) => + executorActor(executorId) ! res + + case es: ExecutorStatus => + //convert to public api executor status, which includes num cores on the executor + val executorStatus = spark.scheduler.ExecutorStatus(es.executorId, es.activeThreads, totalCores(es.executorId)) + scheduler.executorStatusUpdate(executorStatus) } // Make fake resource offers on all executors @@ -114,6 +126,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor executorActor -= executorId executorHostPort -= executorId freeCores -= executorId + totalCores -= executorId + allExecutors.synchronized(allExecutors -= executorId) executorHostPort -= executorId totalCoreCount.addAndGet(-numCores) scheduler.executorLost(executorId, SlaveLost(reason)) @@ -156,6 +170,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor driverActor ! ReviveOffers } + def requestExecutorStatus(executorId: String) { + driverActor ! 
RequestExecutorStatus(executorId) + } + override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism")) .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2)) From 6bdd9bee5b76715eb343fb0fdc59ae7241184937 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 14 May 2013 15:57:42 -0700 Subject: [PATCH 03/12] regularly poll the executors for their status (just standalone cluster for now) --- .../scheduler/ExecutorStatusPoller.scala | 52 +++++++++++++++++++ .../cluster/SparkDeploySchedulerBackend.scala | 2 + 2 files changed, 54 insertions(+) create mode 100644 core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala diff --git a/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala b/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala new file mode 100644 index 0000000000..55d00ec6d6 --- /dev/null +++ b/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala @@ -0,0 +1,52 @@ +package spark.scheduler + +import scala.collection.mutable +import spark.scheduler.cluster.StandaloneSchedulerBackend +import spark.scheduler.local.LocalScheduler +import spark.Logging + +/** + * + */ +abstract class ExecutorStatusPoller extends Logging { + val waitBetweenPolls = System.getProperty(ExecutorStatusPoller.OPEN_POLLS_WAIT_KEY, "100").toLong + val executorToLastPoll = mutable.Map[String, Long]() + + //simple round-robin poll of each executor. throttle the polling + val t = new Thread("executor-poller"){ + setDaemon(true) + override def run() { + while(true) { + val now = System.currentTimeMillis() + //if we also had the results come through this class, we could also throttle in terms of number of open polls + var minWait = waitBetweenPolls + executorList.foreach{executorId => + val lastPoll = executorToLastPoll.getOrElseUpdate(executorId, now) + val remainingWait = waitBetweenPolls - (now - lastPoll) + if ( remainingWait <= 0) { + pollExecutor(executorId) + executorToLastPoll(executorId) = System.currentTimeMillis() + } else if (remainingWait < minWait){ + minWait = remainingWait + } + } + Thread.sleep(minWait) + } + } + } + t.start + + def executorList: Seq[String] + def pollExecutor(executorId: String) +} + +class StandaloneExecutorStatusPoller(val sched: StandaloneSchedulerBackend) extends ExecutorStatusPoller { + override def executorList = sched.allExecutors.toSeq + override def pollExecutor(executorId: String) { + sched.requestExecutorStatus(executorId) + } +} + +object ExecutorStatusPoller { + val OPEN_POLLS_WAIT_KEY = "spark.executor_poll.wait_ms" +} diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 170ede0f44..998c29d865 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -4,6 +4,7 @@ import spark.{Utils, Logging, SparkContext} import spark.deploy.client.{Client, ClientListener} import spark.deploy.{Command, ApplicationDescription} import scala.collection.mutable.HashMap +import spark.scheduler.StandaloneExecutorStatusPoller private[spark] class SparkDeploySchedulerBackend( scheduler: ClusterScheduler, @@ -14,6 +15,7 @@ private[spark] class SparkDeploySchedulerBackend( with ClientListener with Logging { + val executorStatusPoller = new StandaloneExecutorStatusPoller(this) var client: Client = null var stopping = false var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ From 
89f4cb57064677e7fc2fc6036f79ba342bb24c26 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 May 2013 09:16:17 -0700 Subject: [PATCH 04/12] add StageStarted event --- .../scala/spark/scheduler/DAGScheduler.scala | 2 ++ .../scala/spark/scheduler/SparkListener.scala | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index b18248d2b5..bd195bf396 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -428,6 +428,8 @@ class DAGScheduler( logDebug("missing: " + missing) if (missing == Nil) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") + val stageStarted = new StageStarted(stage) + sparkListeners.foreach{_.onStageStarted(stageStarted)} submitMissingTasks(stage) running += stage } else { diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index 097b91dfc6..7db7defc21 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -6,20 +6,28 @@ import spark.{Utils, Logging} import spark.executor.TaskMetrics trait SparkListener { + + /** + * called when spark starts computing a new stage + */ + def onStageStarted(stageStarted: StageStarted) + /** * called when a stage is completed, with information on the completed stage */ def onStageCompleted(stageCompleted: StageCompleted) /** - * can be called anytime to give information on the status of one executor. There may not be any active stages - * when this is called. Furthermore, it may be called often, so don't do anything expensive here. + * called when there is information on the status of an executor. This may get called at any time. There may not be + * any active stages when this is called. Furthermore, it may be called often, so don't do anything expensive here. 
*/ def onExecutorStatusUpdate(executorStatus: ExecutorStatus) } sealed trait SparkListenerEvents +case class StageStarted(val stage: Stage) extends SparkListenerEvents + case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents case class ExecutorStatus(val executorId: String, val activeTasks: Int, val availableCores: Int) @@ -30,6 +38,11 @@ case class ExecutorStatus(val executorId: String, val activeTasks: Int, val avai * Simple SparkListener that logs a few summary statistics when each stage completes */ class StatsReportListener extends SparkListener with Logging { + + def onStageStarted(stageStarted: StageStarted) { + //TODO + } + def onStageCompleted(stageCompleted: StageCompleted) { import spark.scheduler.StatsReportListener._ implicit val sc = stageCompleted From 656422c2d0ea653e78ef75738928006b4b428ff2 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 May 2013 09:31:49 -0700 Subject: [PATCH 05/12] StatsReportListener summarizes executor utilization at end of every stage --- .../scala/spark/scheduler/SparkListener.scala | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index 7db7defc21..62a7df35ea 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -39,16 +39,23 @@ case class ExecutorStatus(val executorId: String, val activeTasks: Int, val avai */ class StatsReportListener extends SparkListener with Logging { + var activeStageToExecutorStatus = Map[Int, ExecutorActivitySummary]() + def onStageStarted(stageStarted: StageStarted) { - //TODO + activeStageToExecutorStatus += stageStarted.stage.id -> ExecutorActivitySummary(0,0) } def onStageCompleted(stageCompleted: StageCompleted) { import spark.scheduler.StatsReportListener._ implicit val sc = stageCompleted + val execStatus = activeStageToExecutorStatus(stageCompleted.stageInfo.stage.id) + activeStageToExecutorStatus -= stageCompleted.stageInfo.stage.id this.logInfo("Finished stage: " + stageCompleted.stageInfo) showMillisDistribution("task runtime:", (info, _) => Some(info.duration)) + //overall work distribution + this.logInfo("executor utilization: %2.0f %%".format(execStatus.activePercent)) + //shuffle write showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten}) @@ -67,7 +74,10 @@ class StatsReportListener extends SparkListener with Logging { } def onExecutorStatusUpdate(executorStatus: ExecutorStatus) { - //TODO + //update ALL active stages + activeStageToExecutorStatus.foreach{case(k,v) => + activeStageToExecutorStatus += k -> (v + executorStatus) + } } } @@ -157,6 +167,16 @@ object StatsReportListener extends Logging { } } +case class ExecutorActivitySummary(activeCores: Int, totalCores: Int) { + def +(execStatus: ExecutorStatus): ExecutorActivitySummary = { + ExecutorActivitySummary(activeCores + execStatus.activeTasks, totalCores + execStatus.availableCores) + } + + def activePercent: Double = (activeCores.toDouble / totalCores) * 100 +} + + + case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double) From 9008a55946840e6b1141295ef8509a2302d30eee Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 May 2013 10:08:46 -0700 Subject: [PATCH 06/12] setup executor status polling in local scheduler --- .../scala/spark/scheduler/ExecutorStatusPoller.scala | 7 +++++++ 
.../scala/spark/scheduler/local/LocalScheduler.scala | 9 +++++++++ 2 files changed, 16 insertions(+) diff --git a/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala b/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala index 55d00ec6d6..825610e35c 100644 --- a/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala +++ b/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala @@ -47,6 +47,13 @@ class StandaloneExecutorStatusPoller(val sched: StandaloneSchedulerBackend) exte } } +class LocalExecutorStatusPoller(val sched: LocalScheduler) extends ExecutorStatusPoller { + override def executorList = Seq("local") //just needs to have one element, value doesn't matter + override def pollExecutor(executorId: String) { + sched.reportExecutorStatus + } +} + object ExecutorStatusPoller { val OPEN_POLLS_WAIT_KEY = "spark.executor_poll.wait_ms" } diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index 37a67f9b1b..ba2c311a10 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -18,6 +18,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon extends TaskScheduler with Logging { + val statusPoller = new LocalExecutorStatusPoller(this) + val activeTasks = new AtomicInteger(0) var attemptId = new AtomicInteger(0) var threadPool = Utils.newDaemonFixedThreadPool(threads) val env = SparkEnv.get @@ -46,7 +48,9 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon val myAttemptId = attemptId.getAndIncrement() threadPool.submit(new Runnable { def run() { + activeTasks.getAndIncrement runTask(task, idInJob, myAttemptId) + activeTasks.getAndDecrement } }) } @@ -148,4 +152,9 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon } override def defaultParallelism() = threads + + def reportExecutorStatus { + val active = activeTasks.get() + sc.executorStatus(new ExecutorStatus("local", active, threads)) + } } From f71adb18127fa9b53a6abedfd8c083fcb9548d8e Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 May 2013 14:15:44 -0700 Subject: [PATCH 07/12] stage started events when waiting stages start --- core/src/main/scala/spark/scheduler/DAGScheduler.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index bd195bf396..83c7d4b844 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -570,6 +570,8 @@ class DAGScheduler( running ++= newlyRunnable for (stage <- newlyRunnable.sortBy(_.id)) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable") + val stageStarted = new StageStarted(stage) + sparkListeners.foreach{_.onStageStarted(stageStarted)} submitMissingTasks(stage) } } From fb0db764e355f011208c6d9c0ced4b4e03fec232 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 May 2013 14:16:28 -0700 Subject: [PATCH 08/12] b/c of initialization order, dag scheduler may be null --- core/src/main/scala/spark/SparkContext.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index cdb7153db2..53ba69da99 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ 
b/core/src/main/scala/spark/SparkContext.scala @@ -499,7 +499,8 @@ class SparkContext( private[spark] def executorStatus(es: ExecutorStatus) { - dagScheduler.sparkListeners.foreach{l => l.onExecutorStatusUpdate(es)} + if (dagScheduler != null && dagScheduler.sparkListeners != null) + dagScheduler.sparkListeners.foreach{l => l.onExecutorStatusUpdate(es)} } /** From b7db7b6e74467aa81b397af58c3915f9a781d0e6 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 19 Jun 2013 22:38:13 -0700 Subject: [PATCH 09/12] change ExecutorStatusPoller to use a concurrent executor with a fixed rate scheduler --- .../scheduler/ExecutorStatusPoller.scala | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala b/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala index 825610e35c..5cc11a851d 100644 --- a/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala +++ b/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala @@ -4,6 +4,7 @@ import scala.collection.mutable import spark.scheduler.cluster.StandaloneSchedulerBackend import spark.scheduler.local.LocalScheduler import spark.Logging +import java.util.concurrent.{TimeUnit, Executors} /** * @@ -12,32 +13,35 @@ abstract class ExecutorStatusPoller extends Logging { val waitBetweenPolls = System.getProperty(ExecutorStatusPoller.OPEN_POLLS_WAIT_KEY, "100").toLong val executorToLastPoll = mutable.Map[String, Long]() - //simple round-robin poll of each executor. throttle the polling - val t = new Thread("executor-poller"){ - setDaemon(true) + val pool = Executors.newSingleThreadScheduledExecutor() + val poller = new Runnable() { override def run() { - while(true) { - val now = System.currentTimeMillis() - //if we also had the results come through this class, we could also throttle in terms of number of open polls - var minWait = waitBetweenPolls - executorList.foreach{executorId => - val lastPoll = executorToLastPoll.getOrElseUpdate(executorId, now) - val remainingWait = waitBetweenPolls - (now - lastPoll) - if ( remainingWait <= 0) { - pollExecutor(executorId) - executorToLastPoll(executorId) = System.currentTimeMillis() - } else if (remainingWait < minWait){ - minWait = remainingWait - } + val now = System.currentTimeMillis() + //if we also had the results come through this class, we could also throttle in terms of number of open polls + var minWait = waitBetweenPolls + executorList.foreach{executorId => + val lastPoll = executorToLastPoll.getOrElseUpdate(executorId, now) + val remainingWait = waitBetweenPolls - (now - lastPoll) + if ( remainingWait <= 0) { + pollExecutor(executorId) + executorToLastPoll(executorId) = System.currentTimeMillis() + } else if (remainingWait < minWait){ + minWait = remainingWait } - Thread.sleep(minWait) } } } - t.start + + // schedule repeated task + pool.scheduleAtFixedRate(poller, 0, waitBetweenPolls, TimeUnit.MILLISECONDS) def executorList: Seq[String] def pollExecutor(executorId: String) + def shutdown() { + // gracefully shutdown the poller + pool.shutdown() + pool.awaitTermination(30, TimeUnit.SECONDS) + } } class StandaloneExecutorStatusPoller(val sched: StandaloneSchedulerBackend) extends ExecutorStatusPoller { From 8441f11a968911a7d903650cdf32f99c48cb472c Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 20 Jun 2013 10:09:43 -0700 Subject: [PATCH 10/12] combine allExecutors and totalCoreCount --- .../scala/spark/scheduler/ExecutorStatusPoller.scala | 2 +- 
.../scheduler/cluster/StandaloneSchedulerBackend.scala | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala b/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala index 5cc11a851d..1b5e838dba 100644 --- a/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala +++ b/core/src/main/scala/spark/scheduler/ExecutorStatusPoller.scala @@ -45,7 +45,7 @@ abstract class ExecutorStatusPoller extends Logging { } class StandaloneExecutorStatusPoller(val sched: StandaloneSchedulerBackend) extends ExecutorStatusPoller { - override def executorList = sched.allExecutors.toSeq + override def executorList = sched.allExecutors.keys.toSeq override def pollExecutor(executorId: String) { sched.requestExecutorStatus(executorId) } diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 9909eb53c2..c3f0f6e9b2 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -21,9 +21,7 @@ private[spark] class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem) extends SchedulerBackend with Logging { - // Use an atomic variable to track total number of cores in the cluster for simplicity and speed - var totalCoreCount = new AtomicInteger(0) - var allExecutors = new HashSet[String] + var allExecutors = HashMap[String, Int]() class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] @@ -48,7 +46,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! 
RegisteredExecutor(sparkProperties) context.watch(sender) - allExecutors.synchronized(allExecutors += executorId) + allExecutors.synchronized(allExecutors += executorId -> cores) executorActor(executorId) = sender executorHostPort(executorId) = hostPort freeCores(executorId) = cores @@ -56,7 +54,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor executorAddress(executorId) = sender.path.address actorToExecutorId(sender) = executorId addressToExecutorId(sender.path.address) = executorId - totalCoreCount.addAndGet(cores) makeOffers() } @@ -129,7 +126,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor totalCores -= executorId allExecutors.synchronized(allExecutors -= executorId) executorHostPort -= executorId - totalCoreCount.addAndGet(-numCores) scheduler.executorLost(executorId, SlaveLost(reason)) } } @@ -175,7 +171,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism")) - .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2)) + .map(_.toInt).getOrElse(math.max(allExecutors.values.sum, 2)) // Called by subclasses when notified of a lost worker def removeExecutor(executorId: String, reason: String) { From 718af5129377f88db7375efe8bd88e6d49051b5d Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 20 Jun 2013 10:12:49 -0700 Subject: [PATCH 11/12] shutdown the statusPoller --- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 1 + core/src/main/scala/spark/scheduler/local/LocalScheduler.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 998c29d865..da65f82c3f 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -44,6 +44,7 @@ private[spark] class SparkDeploySchedulerBackend( stopping = true super.stop() client.stop() + executorStatusPoller.shutdown() if (shutdownCallback != null) { shutdownCallback(this) } diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index ba2c311a10..921c97abac 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -148,6 +148,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon } override def stop() { + statusPoller.shutdown() threadPool.shutdownNow() } From f8bba7a7db08c919aa130473b2f25dd987c37ea2 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 20 Jun 2013 10:18:28 -0700 Subject: [PATCH 12/12] rename fields in ExecutorActivitySummary to make it more obvious that it aggregates many samples --- core/src/main/scala/spark/scheduler/SparkListener.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index 62a7df35ea..2196d3502f 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -167,12 +167,12 @@ object StatsReportListener extends Logging { } } -case class ExecutorActivitySummary(activeCores: Int, totalCores: Int) { +case class 
ExecutorActivitySummary(activeCoresSampled: Int, totalCoresSampled: Int) { def +(execStatus: ExecutorStatus): ExecutorActivitySummary = { - ExecutorActivitySummary(activeCores + execStatus.activeTasks, totalCores + execStatus.availableCores) + ExecutorActivitySummary(activeCoresSampled + execStatus.activeTasks, totalCoresSampled + execStatus.availableCores) } - def activePercent: Double = (activeCores.toDouble / totalCores) * 100 + def activePercent: Double = (activeCoresSampled.toDouble / totalCoresSampled) * 100 }
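
A minimal sketch of how a driver-side listener might consume the API added in this series. It assumes only what the patches above define (StageStarted, StageCompleted, and the public ExecutorStatus(executorId, activeTasks, availableCores) event) plus a hypothetical class name; it aggregates per-stage utilization the same way StatsReportListener does, and registration would go through the SparkContext method that appends to dagScheduler.sparkListeners, visible in the patch 02 hunk.

// Hypothetical listener; placed in spark.scheduler so it can see the
// private[spark] Stage class, mirroring StatsReportListener above.
package spark.scheduler

import spark.Logging

class UtilizationLoggingListener extends SparkListener with Logging {
  // stage id -> (sum of active-core samples, sum of total-core samples)
  var samples = Map[Int, (Int, Int)]()

  def onStageStarted(stageStarted: StageStarted) {
    samples += stageStarted.stage.id -> (0, 0)
  }

  def onExecutorStatusUpdate(es: ExecutorStatus) {
    // may be called often, so keep it cheap: just accumulate into every active stage
    samples = samples.map { case (id, (active, total)) =>
      id -> (active + es.activeTasks, total + es.availableCores)
    }
  }

  def onStageCompleted(stageCompleted: StageCompleted) {
    val id = stageCompleted.stageInfo.stage.id
    samples.get(id).foreach { case (active, total) =>
      if (total > 0) {
        logInfo("stage %d executor utilization: %2.0f %%".format(id, active.toDouble / total * 100))
      }
    }
    samples -= id
  }
}

The polling rate that drives these updates is controlled by the spark.executor_poll.wait_ms system property introduced in patch 03 (default 100 ms), so a driver that wants coarser samples could set, for example, System.setProperty("spark.executor_poll.wait_ms", "250") before creating its SparkContext.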