
[spark] spark speculative execution


A speculative task is a copy of a Task that is lagging behind within a Stage, launched again on an Executor on another node. If any of the attempts finishes successfully, the result of the first completed attempt is taken as the final result, and the copies still running on other Executors are killed. Spark speculative execution is off by default and can be turned on via the spark.speculation property.

Detecting whether any Task needs speculative execution

Right after SparkContext creates the schedulerBackend and taskScheduler, it calls the taskScheduler's start method.

override def start() {
    backend.start()
    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

As the code shows, after starting the SchedulerBackend, the TaskScheduler checks whether speculative execution is enabled in non-local mode (off by default, turned on via spark.speculation). If it is enabled, a check is scheduled every SPECULATION_INTERVAL_MS (default 100 ms, configurable via the spark.speculation.interval property) that calls the checkSpeculatableTasks method to detect whether any tasks need speculative execution.
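As a rough, self-contained illustration of this fixed-rate check loop, the following sketch uses a plain JDK ScheduledExecutorService instead of Spark's internals; checkSpeculatableTasks here is a hypothetical stub that only records its invocations.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical stand-in for Spark's detection method: it only records
// that it was invoked, so we can observe the fixed-rate schedule fire.
val checksSeen = new CountDownLatch(3)
def checkSpeculatableTasks(): Unit = checksSeen.countDown()

val speculationScheduler = Executors.newSingleThreadScheduledExecutor()
// Spark runs the real check every spark.speculation.interval (100 ms by
// default); a 10 ms interval just keeps this sketch fast.
speculationScheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = checkSpeculatableTasks()
}, 10, 10, TimeUnit.MILLISECONDS)

// Wait (with a generous timeout) until the check has run a few times
val ranThreeTimes = checksSeen.await(2, TimeUnit.SECONDS)
speculationScheduler.shutdown()
```

The same pattern applies in Spark: the scheduled runnable is cheap, and all the actual work happens inside the periodic check.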

// Check for speculatable tasks in all our active jobs.
  def checkSpeculatableTasks() {
    var shouldRevive = false
    synchronized {
      shouldRevive = rootPool.checkSpeculatableTasks()
    }
    if (shouldRevive) {
      backend.reviveOffers()
    }
  }

The rootPool's method then determines whether any tasks need speculative execution; if so, SchedulerBackend's reviveOffers is called to try to obtain resources for running the speculative tasks. Let's continue into the detection logic.

override def checkSpeculatableTasks(): Boolean = {
    var shouldRevive = false
    for (schedulable <- schedulableQueue.asScala) {
      shouldRevive |= schedulable.checkSpeculatableTasks()
    }
    shouldRevive
  }
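This aggregation pattern can be sketched with a minimal, hypothetical Schedulable trait (not Spark's real class hierarchy): the pool ORs the results of all children together with |=, visiting every child even after one has already reported a speculatable task.

```scala
trait Schedulable {
  def checkSpeculatableTasks(): Boolean
}

// A leaf playing the role of a TaskSetManager in this sketch
class Leaf(hasSpeculatable: Boolean) extends Schedulable {
  var checked = false // records that the pool visited this child
  override def checkSpeculatableTasks(): Boolean = {
    checked = true
    hasSpeculatable
  }
}

class Pool(children: Seq[Schedulable]) extends Schedulable {
  override def checkSpeculatableTasks(): Boolean = {
    var shouldRevive = false
    // |= keeps iterating instead of short-circuiting, so every child is asked
    for (schedulable <- children) {
      shouldRevive |= schedulable.checkSpeculatableTasks()
    }
    shouldRevive
  }
}

val a = new Leaf(true)
val b = new Leaf(false)
val shouldRevive = new Pool(Seq(a, b)).checkSpeculatableTasks()
```

Using |= rather than a short-circuiting exists means each TaskSetManager gets a chance to refresh its own speculatable set on every pass.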

rootPool simply delegates to each schedulable in its schedulableQueue, which is of type ConcurrentLinkedQueue[Schedulable] and holds the TaskSetManagers. Looking next at TaskSetManager's checkSpeculatableTasks method, we finally reach the heart of the detection:

 override def checkSpeculatableTasks(): Boolean = {
    // No need to detect anything if there is only one task, or if the TaskSet is a zombie
    if (isZombie || numTasks == 1) {
      return false
    }
    var foundTasks = false
    // number of all tasks * SPECULATION_QUANTILE (default 0.75, can be set via spark.speculation.quantile)
    val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
    logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
    // check whether the number of successful tasks reaches 75% of the total and is greater than 0
    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
      val time = clock.getTimeMillis()
      // collect the run times of the successfully finished tasks and sort them
      val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
      Arrays.sort(durations)
      // take the median of these times
      val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
      // median * SPECULATION_MULTIPLIER (default 1.5, can be set via spark.speculation.multiplier)
      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
      logDebug("Task length threshold for speculation: " + threshold)
      // Iterate over the tasks in this TaskSet, taking those that have not finished successfully,
      // have exactly one running copy, have been running longer than the threshold,
      // and are not yet in the speculatable list, and add them to speculatableTasks
      for ((tid, info) <- taskInfos) {
        val index = info.index
        if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
          !speculatableTasks.contains(index)) {
          logInfo(
            "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
              .format(index, taskSet.id, info.host, threshold))
          speculatableTasks += index
          foundTasks = true
        }
      }
    }
    foundTasks
  }

The check logic is well commented. Once the number of successful Tasks exceeds 75% of the total (configurable via spark.speculation.quantile), the run times of all successful Tasks are collected and their median is taken; multiplying this median by 1.5 (configurable via spark.speculation.multiplier) gives a run-time threshold. Any running Task whose run time exceeds this threshold becomes a candidate for speculation. Put simply, speculation is enabled for the Tasks that slow down overall progress, in order to speed up the whole Stage. The approximate flow of the algorithm is shown in the figure.
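The detection math can be reproduced in a few lines of plain Scala. Names like speculatableIndices are made up for this sketch, and the 100 ms floor mirrors the constant in the snippet above.

```scala
val SPECULATION_QUANTILE = 0.75   // spark.speculation.quantile default
val SPECULATION_MULTIPLIER = 1.5  // spark.speculation.multiplier default

// Given the durations (ms) of finished tasks and the elapsed time of running
// tasks (keyed by task index), return the indices that qualify for speculation.
def speculatableIndices(finishedDurations: Seq[Long],
                        runningElapsed: Map[Int, Long],
                        numTasks: Int): Set[Int] = {
  val minFinished = (SPECULATION_QUANTILE * numTasks).floor.toInt
  val done = finishedDurations.size
  if (done < minFinished || done == 0) return Set.empty
  val sorted = finishedDurations.sorted
  // median of the successful run times
  val median = sorted(math.min((0.5 * done).round.toInt, sorted.length - 1))
  // threshold = median * multiplier, never below 100 ms
  val threshold = math.max(SPECULATION_MULTIPLIER * median, 100)
  runningElapsed.collect { case (idx, t) if t > threshold => idx }.toSet
}

// 8 of 10 tasks finished (>= 75%), median duration 105 ms, threshold 157.5 ms:
// only the task that has been running for 300 ms is marked speculatable.
val result = speculatableIndices(
  finishedDurations = Seq(90L, 95L, 100L, 100L, 105L, 110L, 115L, 120L),
  runningElapsed = Map(8 -> 300L, 9 -> 150L),
  numTasks = 10)
```

This makes it easy to see why a single extreme straggler cannot inflate the threshold: the median is robust to outliers, unlike a mean.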

When speculative tasks are scheduled

Under the delay scheduling policy, the dequeueTask method is called when a TaskSetManager assigns a task to an executor.

private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
    for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }

    // ... (remaining locality levels omitted)

    // find a speculative task if all others tasks have been scheduled
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)
    }
  }

The last part of the method schedules a speculative task only after all other tasks have been dequeued; let's look at its implementation.

protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
  {
    // Remove tasks that have finished successfully from the speculatable list, since time passes
    // between detection and scheduling and some tasks may have succeeded in the meantime
    speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set

    // A task may run on the host of this executor only if:
    // no attempt of the task is already running on that host, and
    // the executor is not blacklisted for the task (i.e. the task failed on this
    // executor before, within the blacklist timeout)
    def canRunOnHost(index: Int): Boolean =
      !hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)

    if (!speculatableTasks.isEmpty) {
      // find a task index that can be started on this executor
      for (index <- speculatableTasks if canRunOnHost(index)) {
        // get the preferred locations of the task
        val prefs = tasks(index).preferredLocations
        val executors = prefs.flatMap(_ match {
          case e: ExecutorCacheTaskLocation => Some(e.executorId)
          case _ => None
        })
        // if a preferred location is an ExecutorCacheTaskLocation and the executor holding the
        // data is the current executor, return the task's index in the TaskSet and its locality level
        if (executors.contains(execId)) {
          speculatableTasks -= index
          return Some((index, TaskLocality.PROCESS_LOCAL))
        }
      }

      // This check implements delay scheduling: even speculative tasks are started
      // at the best locality level possible
      if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
        for (index <- speculatableTasks if canRunOnHost(index)) {
          val locations = tasks(index).preferredLocations.map(_.host)
          if (locations.contains(host)) {
            speculatableTasks -= index
            return Some((index, TaskLocality.NODE_LOCAL))
          }
        }
      }


The code is long, so only the first part is listed, but the remaining locality levels follow the same logic and the comments are clear. Tasks that have already succeeded are filtered out first; in addition, a speculative copy must not run on the same Host as an already-running attempt of the task, and must not run on an executor blacklisted for that task. Finally, the task's preferred locations determine at which locality level it is scheduled on that executor under the delay scheduling policy.
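Those two constraints can be distilled into a small stand-alone sketch. Attempt, pickSpeculative, and the blacklist representation are invented here for illustration; they are not Spark's types.

```scala
// A running attempt of some task, identified by task index and host
case class Attempt(taskIndex: Int, host: String)

// Pick the first speculatable task that may start on (execId, host):
// no attempt of it may already be running on that host, and this executor
// must not be blacklisted for it (e.g. the task recently failed there).
def pickSpeculative(speculatable: Seq[Int],
                    running: Seq[Attempt],
                    blacklist: Set[(String, Int)], // (execId, taskIndex) pairs
                    execId: String,
                    host: String): Option[Int] = {
  def canRunOnHost(index: Int): Boolean =
    !running.exists(a => a.taskIndex == index && a.host == host) &&
      !blacklist.contains((execId, index))
  speculatable.find(canRunOnHost)
}

// Task 3 already has an attempt on hostA, so the speculative copy must go
// to task 5 instead.
val chosen = pickSpeculative(
  speculatable = Seq(3, 5),
  running = Seq(Attempt(3, "hostA")),
  blacklist = Set.empty,
  execId = "exec-1",
  host = "hostA")
```

Keeping the original attempt and the speculative copy on different hosts is what gives speculation its value: a copy on the same slow or failing machine would likely be just as slow.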

