cool hit counter [spark]scheduling model(FIFO&FAIR)_Intefrankly

[spark]scheduling model(FIFO&FAIR)


The scheduling of spark applications is reflected in two places, the first is the scheduling between spark applications by Yarn, and the second is the scheduling of multiple TaskSetManager within a spark application (the same SparkContext), here for the time being, only the intra-application scheduling is analyzed.

There are two scheduling modes for spark: FIFO (first-in-first-out) and FAIR (fair scheduling). The default is FIFO, i.e. whoever commits first executes first, while FAIR supports further grouping in the scheduling pool and can have different weights to decide who executes first based on weights, resources, etc. The scheduling mode of spark can be set via spark.scheduler.mode.

Scheduling pool initialization

After the DAGScheluer has divided the stage for the job and submitted it to the TaskScheduler as a TaskSet, the TaskScheduler implementation class creates a TaskSetMagager object for each TaskSet and adds the object to the scheduling pool.


The schedulableBuilder is instantiated by newTaskSchedulerImpl(sc) in SparkContext when creating the TaskSchedulerImpl via the initialize method of scheduler.initialize(backend).

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")

You can see that the program creates different scheduling pools based on the configuration. There are two implementations of schedulableBuilder, FIFOSchedulableBuilder and FairSchedulableBuilder, followed by a call to schedulableBuilder.buildPools(), so let's see how both are implemented.

override def buildPools() {
    // nothing

FIFOSchedulableBuilder did nothing.

override def buildPools() {
    var is: Option[InputStream] = None
    try {
      is = Option { { f =>
          new FileInputStream(f)
        }.getOrElse {
      // Create from configuration filebuildFairSchedulerPool
      is.foreach { i => buildFairSchedulerPool(i) }
    } finally {

    // finally create "default" pool

You can see that the buildPools method of FairSchedulableBuilder will first read the FAIR schema configuration file located at SPARK_HOME/conf/fairscheduler.xml by default, or you can set a user-defined configuration file via the parameter spark.scheduler.allocation.file. The template is as follows.

  <pool name="production">
  <pool name="test">

Of which.

  • name: the name of the scheduling pool, you can specify a scheduling pool according to spark.scheduler.pool in the program, if not, the scheduling pool with the name default will be used.
  • schedulingMode: scheduling mode
  • weigt: weight (a weight of 2 is allocated twice as many resources as a weight of 1), if set to 1000, this scheduling pool will run as soon as a task is available, default is 1
  • minShare: the minimum number of resources (cores) required by the scheduling pool, default is 0

FAIR can be configured with multiple scheduling pools, i.e. inside the rootPool is still a set of Pools, and the Pools contain the TaskSetMagager. FairSchedulableBuilderwill Create from configuration filebuildFairSchedulerPool。

private def buildFairSchedulerPool(is: InputStream) {
    val xml = XML.load(is)
    for (poolNode <- (xml \ POOLS_PROPERTY)) {

      val poolName = (poolNode  POOL_NAME_PROPERTY).text
      var schedulingMode = DEFAULT_SCHEDULING_MODE
      var minShare = DEFAULT_MINIMUM_SHARE
      var weight = DEFAULT_WEIGHT

      val xmlSchedulingMode = (poolNode  SCHEDULING_MODE_PROPERTY).text
      if (xmlSchedulingMode != "") {
        try {
          schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
        } catch {
          case e: NoSuchElementException =>
            logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
              s"using the default schedulingMode: $schedulingMode")

      val xmlMinShare = (poolNode  MINIMUM_SHARES_PROPERTY).text
      if (xmlMinShare != "") {
        minShare = xmlMinShare.toInt

      val xmlWeight = (poolNode  WEIGHT_PROPERTY).text
      if (xmlWeight != "") {
        weight = xmlWeight.toInt

      val pool = new Pool(poolName, schedulingMode, minShare, weight)
      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
        poolName, schedulingMode, minShare, weight))

Instantiate a Pool object based on each field value (default if not set) and add it to the rootPool.

A spark application contains a TaskScheduler, a TaskScheduler contains a unique RootPool, FIFO has only one layer of Pools containing the TaskSetMagager, while FARI contains two layers of Pools, the RootPool contains the subPool, the subPool contains the TaskSetMagager, and the RootPool is all created when the SchedulableBuilder is instantiated.

private def buildDefaultPool() {
    if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
      logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(

If the scheduling pool created according to the configuration file does not have a scheduling pool with the name default, a scheduling pool with the name default is created with all parameters as default values.

Scheduling pool adds TaskSetMagager

The final implementation is the same for both scheduling modes, although FAIR will fetch the scheduling pool to be used before adding it, defaulting to the one with the name default.

override def addSchedulable(schedulable: Schedulable) {
    require(schedulable != null)
    schedulableNameToSchedulable.put(, schedulable)
    schedulable.parent = this

When adding a TaskSetMagager it is added to the tail of the queue, fetching is done from the head. For FIFO, parentPool is all RootPool, and FAIR, TaskSetMagager's parentPool is all a subPool of RootPool.

Scheduling pool to TaskSetMagager sorting algorithm

TaskScheduler will schedule all TaskSetMagager after the executor resources it gets through SchedulerBackend. Get the sorted TaskSetMagager via rootPool.getSortedTaskSetQueue.

override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue

It can be seen that the algorithm at the core of sorting is in taskSetSchedulingAlgorithm.comparator, and the corresponding implementations of taskSetSchedulingAlgorithm are different for the two modes.

var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
      case _ =>
        val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
        throw new IllegalArgumentException(msg)

The algorithm class for the FIFO mode is FIFOSchedulingAlgorithm and the algorithm implementation class for the FAIR mode is FairSchedulingAlgorithm. See below the implementation of the comparison function in both modes, FIFO.

override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    res < 0
  1. First compare thepriority, in FIFO this priority is actually the Job ID, the earlier the job is submitted the smaller the jobId, the smaller thepriority and the higher the priority.
  2. If thepriority is the same, it means it is a TaskSetMagager in the same job, then compare the StageId, the smaller the StageId the higher the priority.

See FAIR's sorting algorithm below.

override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    if (compare < 0) {
    } else if (compare > 0) {
    } else { <
  1. Scheduling pools running with less than minShare have a higher priority than those that are not less.
  2. If both run smaller number of tasks than minShare, then compare minShare usage, the lower the usage the higher the priority.
  3. If both have the same minShare usage, the weighted usage is compared, and the lower the usage about the higher the priority.
  4. If the weights are also the same, the names are compared.

In FAIR mode, you need to sort the subPool first and then sort the TaskSetMagager inside the subPool, because both Pool and TaskSetMagager inherit Schedulable traits and both use the FairSchedulingAlgorithm.

1、The design and construction of Internet data centers is evolving
2、The little gray to engage in machine is not easy clip doll god online teaching
3、2018 Shanghai Artificial Intelligence Innovation and Development Project Declaration is now open
4、Those who dont have a drivers license yet can rest assured that it may not be required in the future
5、03An introduction to Gits transfer protocol

    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送