title | date | tags | hidden |
---|---|---|---|
Spark SchedulerPool Deep Dive |
2019-01-13 03:35:42 -0800 |
true |
在 TaskSchedulerImpl
的initialize
方法里 schedulableBuilder.buildPools
初始化了 scheuler pool.其中有两种SchedulingMode[FIFO,FAIR]
/**
* An interface for schedulable entities.
* there are two type of Schedulable entities(Pools and TaskSetManagers)
*/
private[spark] trait Schedulable {
var parent: Pool
// child queues
def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
def schedulingMode: SchedulingMode
def weight: Int
def minShare: Int
def runningTasks: Int
def priority: Int
def stageId: Int
def name: String
def addSchedulable(schedulable: Schedulable): Unit
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
/**
* An interface for sort algorithm
* FIFO: FIFO algorithm between TaskSetManagers
* FS: FS algorithm between Pools, and FIFO or FS within Pools
*/
private[spark] trait SchedulingAlgorithm {
def comparator(s1: Schedulable, s2: Schedulable): Boolean
}
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
res < 0
}
}
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare = 0
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare < 0) {
true
} else if (compare > 0) {
false
} else {
s1.name < s2.name
}
}
}
每一个 TaskSet 会创建一个TaskSetManager, 然后放到
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks(taskSet: TaskSet) {
...
val manager = createTaskSetManager(taskSet, maxTaskFailures)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
...
}