Regularly poll executors to track their utilization #613
base: master
Changes from all commits
8908db3
ffeee81
6bdd9be
89f4cb5
656422c
9008a55
f71adb1
fb0db76
b7db7b6
8441f11
718af51
f8bba7a
@@ -0,0 +1,63 @@
package spark.scheduler

import scala.collection.mutable
import spark.scheduler.cluster.StandaloneSchedulerBackend
import spark.scheduler.local.LocalScheduler
import spark.Logging
import java.util.concurrent.{TimeUnit, Executors}

/**
 * Periodically polls every known executor for its current status, so that listeners can track
 * executor utilization over time. Each executor is polled at most once per `waitBetweenPolls`
 * milliseconds.
 */
abstract class ExecutorStatusPoller extends Logging {
  val waitBetweenPolls = System.getProperty(ExecutorStatusPoller.OPEN_POLLS_WAIT_KEY, "100").toLong
  val executorToLastPoll = mutable.Map[String, Long]()

  val pool = Executors.newSingleThreadScheduledExecutor()
  val poller = new Runnable() {
    override def run() {
      val now = System.currentTimeMillis()
      // if we also had the results come through this class, we could also throttle in terms of
      // the number of open polls
      var minWait = waitBetweenPolls
      executorList.foreach { executorId =>
        val lastPoll = executorToLastPoll.getOrElseUpdate(executorId, now)
        val remainingWait = waitBetweenPolls - (now - lastPoll)
        if (remainingWait <= 0) {
          pollExecutor(executorId)
          executorToLastPoll(executorId) = System.currentTimeMillis()
        } else if (remainingWait < minWait) {
          minWait = remainingWait
        }
      }
    }
  }

  // schedule the repeated polling task
  pool.scheduleAtFixedRate(poller, 0, waitBetweenPolls, TimeUnit.MILLISECONDS)

  def executorList: Seq[String]
  def pollExecutor(executorId: String)

  def shutdown() {
    // gracefully shut down the poller
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
  }
}

class StandaloneExecutorStatusPoller(val sched: StandaloneSchedulerBackend) extends ExecutorStatusPoller {
  override def executorList = sched.allExecutors.keys.toSeq
  override def pollExecutor(executorId: String) {
    sched.requestExecutorStatus(executorId)
  }
}

class LocalExecutorStatusPoller(val sched: LocalScheduler) extends ExecutorStatusPoller {
  override def executorList = Seq("local")  // just needs one element; the value doesn't matter
  override def pollExecutor(executorId: String) {
    sched.reportExecutorStatus
  }
}

object ExecutorStatusPoller {
  val OPEN_POLLS_WAIT_KEY = "spark.executor_poll.wait_ms"
}
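
A sketch of how a driver-side component might configure and use one of these pollers. The actual wiring into the schedulers happens elsewhere in this pull request; the helper below is purely hypothetical and only illustrates the intended lifecycle:

import spark.scheduler.cluster.StandaloneSchedulerBackend

// Hypothetical helper, not part of the patch: create a poller around a backend, run some work,
// then shut the poller down.
def withStatusPolling(backend: StandaloneSchedulerBackend)(body: => Unit) {
  // The poll interval defaults to 100 ms and is read from this system property.
  System.setProperty(ExecutorStatusPoller.OPEN_POLLS_WAIT_KEY, "500")
  val poller = new StandaloneExecutorStatusPoller(backend)
  try {
    body  // run jobs; each executor is polled at most once per interval
  } finally {
    poller.shutdown()  // waits up to 30 seconds for an in-flight poll to finish
  }
}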
@@ -6,27 +6,56 @@ import spark.{Utils, Logging}
import spark.executor.TaskMetrics

trait SparkListener {

  /**
   * Called when Spark starts computing a new stage.
   */
  def onStageStarted(stageStarted: StageStarted)

  /**
   * Called when a stage is completed, with information on the completed stage.
   */
  def onStageCompleted(stageCompleted: StageCompleted)

  /**
   * Called when there is information on the status of an executor. This may get called at any
   * time: there may not be any active stages when it is called. Furthermore, it may be called
   * often, so don't do anything expensive here.
   */
  def onExecutorStatusUpdate(executorStatus: ExecutorStatus)
}

sealed trait SparkListenerEvents

case class StageStarted(val stage: Stage) extends SparkListenerEvents

case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents

case class ExecutorStatus(val executorId: String, val activeTasks: Int, val availableCores: Int)
  extends SparkListenerEvents
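
As an illustration (not part of the patch), a listener consuming these events might keep only the most recent status per executor, so the frequent onExecutorStatusUpdate callback stays cheap; the class and field names below are made up:

class UtilizationLogger extends SparkListener with Logging {
  @volatile private var lastStatus = Map[String, ExecutorStatus]()

  override def onStageStarted(stageStarted: StageStarted) {}

  override def onStageCompleted(stageCompleted: StageCompleted) {
    // summarize the latest sample from every executor when a stage finishes
    val (active, total) = lastStatus.values.foldLeft((0, 0)) { case ((a, t), s) =>
      (a + s.activeTasks, t + s.availableCores)
    }
    if (total > 0) {
      logInfo("cluster utilization at stage end: %2.0f %%".format(active.toDouble / total * 100))
    }
  }

  override def onExecutorStatusUpdate(executorStatus: ExecutorStatus) {
    // just a map update -- this may be called very often
    lastStatus += executorStatus.executorId -> executorStatus
  }
}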

/**
 * Simple SparkListener that logs a few summary statistics when each stage completes
 */
class StatsReportListener extends SparkListener with Logging {

  var activeStageToExecutorStatus = Map[Int, ExecutorActivitySummary]()

  def onStageStarted(stageStarted: StageStarted) {
    activeStageToExecutorStatus += stageStarted.stage.id -> ExecutorActivitySummary(0, 0)
  }

  def onStageCompleted(stageCompleted: StageCompleted) {
    import spark.scheduler.StatsReportListener._
    implicit val sc = stageCompleted
    val execStatus = activeStageToExecutorStatus(stageCompleted.stageInfo.stage.id)
    activeStageToExecutorStatus -= stageCompleted.stageInfo.stage.id
    this.logInfo("Finished stage: " + stageCompleted.stageInfo)
    showMillisDistribution("task runtime:", (info, _) => Some(info.duration))

    // overall work distribution
    this.logInfo("executor utilization: %2.0f %%".format(execStatus.activePercent))

    // shuffle write
    showBytesDistribution("shuffle bytes written:", (_, metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})

@@ -44,6 +73,13 @@ class StatsReportListener extends SparkListener with Logging {
    showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
  }

  def onExecutorStatusUpdate(executorStatus: ExecutorStatus) {
    // update ALL active stages
    activeStageToExecutorStatus.foreach{ case (k, v) =>
      activeStageToExecutorStatus += k -> (v + executorStatus)
    }
  }

}

Inline review comment on onExecutorStatusUpdate:
Note that … (You could achieve the same effect w/out actually sending all the messages when there are no active stages, but the cluster is idle anyway, so why not.)

object StatsReportListener extends Logging {

@@ -131,6 +167,16 @@ object StatsReportListener extends Logging {
  }
}

case class ExecutorActivitySummary(activeCoresSampled: Int, totalCoresSampled: Int) {
  // accumulate one more executor status sample into the running totals
  def +(execStatus: ExecutorStatus): ExecutorActivitySummary = {
    ExecutorActivitySummary(activeCoresSampled + execStatus.activeTasks, totalCoresSampled + execStatus.availableCores)
  }

  // percentage of all sampled cores that were running a task
  def activePercent: Double = (activeCoresSampled.toDouble / totalCoresSampled) * 100
}
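
A quick worked example of how the summary accumulates (illustrative values): two samples from a 4-core executor, one with 2 active tasks and one with 4, add up to 6 active out of 8 sampled cores, i.e. 75% utilization:

val summary = ExecutorActivitySummary(0, 0) +
  ExecutorStatus("exec-1", activeTasks = 2, availableCores = 4) +
  ExecutorStatus("exec-1", activeTasks = 4, availableCores = 4)
// summary == ExecutorActivitySummary(6, 8); summary.activePercent == 75.0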
case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)

Review comment:
I'd suggest using a java.util.concurrent.ScheduledThreadPoolExecutor instead of an infinite while loop. This would also let you schedule the poller at a fixed interval without having to manage the sleep "catchup" time yourself, e.g.:
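
For instance (a sketch, mirroring the scheduling call the patch now uses):

val pool = Executors.newSingleThreadScheduledExecutor()
// run the poller every waitBetweenPolls milliseconds, starting immediately
pool.scheduleAtFixedRate(poller, 0, waitBetweenPolls, TimeUnit.MILLISECONDS)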
This also lets you gracefully stop the poller via:
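
For instance (again a sketch, mirroring the shutdown() method in the patch):

pool.shutdown()                               // stop scheduling new runs
pool.awaitTermination(30, TimeUnit.SECONDS)   // wait for an in-flight poll to finish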

Reply:
Good point, I will make that change. This also got me thinking -- do I even want to create a new thread at all? Is there an appropriate thread pool for these repeated tasks already?