TaskSetManager

A TaskSetManager is a Schedulable that manages execution of the tasks in a single TaskSet (after the TaskSet has been handed over to it by the TaskScheduler).

Figure 1. TaskSetManager and its Dependencies

The responsibilities of a TaskSetManager are described in the corresponding sections below.

Tip

Enable DEBUG logging level for org.apache.spark.scheduler.TaskSetManager logger to see what happens under the covers in TaskSetManager.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.TaskSetManager=DEBUG

TaskSetManager is Schedulable

TaskSetManager is a Schedulable with the following implementation:

  • name is TaskSet_[taskSet.stageId.toString]

  • no parent is ever assigned, i.e. it is always null.

    It means that it can only be a leaf in the tree of Schedulables (with Pools being the nodes).

  • schedulingMode always returns SchedulingMode.NONE (since there is nothing to schedule).

  • weight is always 1.

  • minShare is always 0.

  • runningTasks is the number of running tasks in the internal runningTasksSet.

  • priority is the priority of the owned TaskSet (using taskSet.priority).

  • stageId is the stage id of the owned TaskSet (using taskSet.stageId).

  • schedulableQueue returns no queue, i.e. null.

  • addSchedulable and removeSchedulable do nothing.

  • getSchedulableByName always returns null.

  • getSortedTaskSetQueue returns a one-element collection with the sole element being itself.

  • executorLost

  • checkSpeculatableTasks
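
The list above can be summarized in a minimal Scala sketch. The member names mirror the Schedulable contract as described here, but SchedulableSketch and TaskSetManagerSketch are simplified stand-ins rather than Spark's actual types:

[source, scala]
----
// Simplified, self-contained sketch of how TaskSetManager satisfies the
// Schedulable contract; bodies reflect the list above, not Spark's code.
import scala.collection.mutable

trait SchedulableSketch {
  def name: String
  def parent: SchedulableSketch
  def weight: Int
  def minShare: Int
  def runningTasks: Int
}

class TaskSetManagerSketch(stageId: Int) extends SchedulableSketch {
  private val runningTasksSet = mutable.HashSet.empty[Long]

  override def name: String = "TaskSet_" + stageId.toString
  override def parent: SchedulableSketch = null // always a leaf; Pools are the nodes
  override def weight: Int = 1
  override def minShare: Int = 0
  override def runningTasks: Int = runningTasksSet.size
}
----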

Handling Executor Lost Events (executorLost method)

Note
executorLost is part of the Schedulable Contract which is called by TaskSchedulerImpl to inform TaskSetManagers about executors being lost.

Since TaskSetManager manages execution of the tasks in a single TaskSet, when an executor gets lost, the affected tasks that have been running on the failed executor need to be re-enqueued. executorLost is the mechanism to "announce" the event to all TaskSetManagers.

executorLost first checks whether the TaskSet is for a ShuffleMapStage (in which case all TaskSet.tasks are instances of ShuffleMapTask) as well as whether an external shuffle server is used (that could serve the shuffle outputs in case of failure).

If it is indeed for a failed ShuffleMapStage and no external shuffle server is enabled, all successfully-completed tasks for the failed executor (using taskInfos internal registry) get added to the collection of pending tasks and the DAGScheduler is informed about resubmission (as Resubmitted end reason).

The internal registries - successful, copiesRunning, and tasksSuccessful - are updated.

Regardless of the above check, all currently-running tasks for the failed executor are reported as failed (with the task state being FAILED).

recomputeLocality is called.
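
The flow can be pictured with a minimal sketch, assuming simplified stand-in types (TaskInfoLite and the registries below are illustrative, not Spark's internals):

[source, scala]
----
// Sketch of the executorLost flow described above.
import scala.collection.mutable

case class TaskInfoLite(index: Int, executorId: String)

class ExecutorLostSketch(isShuffleMapStage: Boolean, externalShuffleServiceEnabled: Boolean) {
  val taskInfos    = mutable.Map.empty[Long, TaskInfoLite]
  val successful   = mutable.Map.empty[Int, Boolean].withDefaultValue(false)
  val pendingTasks = mutable.ArrayBuffer.empty[Int]

  def executorLost(execId: String): Unit = {
    // Shuffle-map output lives on the executor, so without an external shuffle
    // service the successfully-completed tasks have to be re-enqueued.
    if (isShuffleMapStage && !externalShuffleServiceEnabled) {
      for ((_, info) <- taskInfos if info.executorId == execId && successful(info.index)) {
        successful(info.index) = false
        pendingTasks += info.index
        // ...and the DAGScheduler is informed (Resubmitted end reason)
      }
    }
    // Regardless of the check above, tasks still running on execId are reported
    // as FAILED, and recomputeLocality is called afterwards.
  }
}
----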

Checking Speculatable Tasks (checkSpeculatableTasks method)

Note
checkSpeculatableTasks is part of the Schedulable Contract.

checkSpeculatableTasks checks whether there are speculatable tasks in the TaskSet.

Note
checkSpeculatableTasks is called by TaskSchedulerImpl.checkSpeculatableTasks.

If the TaskSetManager is a zombie or the TaskSet has only a single task, it assumes there are no speculatable tasks.

Otherwise, the method proceeds, assuming no speculatable tasks by default.

It computes the minimum number of finished tasks for speculation (as spark.speculation.quantile of all the tasks in the TaskSet).

You should see the DEBUG message in the logs:

DEBUG Checking for speculative tasks: minFinished = [minFinishedForSpeculation]

It then checks whether the number of tasks completed successfully (using tasksSuccessful) is equal to or greater than that minimum (and greater than zero).

Having done that, it computes the median duration of all the successfully completed tasks (using taskInfos) and the task length threshold as the median duration multiplied by spark.speculation.multiplier, with a lower bound of 100 ms.

You should see the DEBUG message in the logs:

DEBUG Task length threshold for speculation: [threshold]

For each task (using taskInfos) that has not yet been marked as successful (using successful), has exactly one copy running (using copiesRunning), has been running longer than the calculated threshold, and is not already in speculatableTasks, the task is considered speculatable.

You should see the following INFO message in the logs:

INFO Marking task [index] in stage [taskSet.id] (on [info.host]) as speculatable because it ran more than [threshold] ms

The task is added to the internal speculatableTasks collection and the method returns true.
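
The threshold computation described above boils down to the following sketch; quantile and multiplier stand in for spark.speculation.quantile and spark.speculation.multiplier, and the function itself is illustrative, not Spark's code:

[source, scala]
----
// Sketch of the speculation-threshold math: returns None when too few tasks
// have finished successfully for speculation to kick in.
def speculationThreshold(
    successfulDurationsMs: Seq[Long], // durations of successfully-completed tasks
    numTasks: Int,
    quantile: Double = 0.75,          // spark.speculation.quantile
    multiplier: Double = 1.5          // spark.speculation.multiplier
  ): Option[Double] = {
  val minFinishedForSpeculation = (quantile * numTasks).floor.toInt
  if (successfulDurationsMs.nonEmpty && successfulDurationsMs.size >= minFinishedForSpeculation) {
    val medianDuration = successfulDurationsMs.sorted.apply(successfulDurationsMs.size / 2)
    Some(math.max(multiplier * medianDuration, 100)) // never below 100 ms
  } else {
    None
  }
}
----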

addPendingTask

Caution
FIXME

dequeueSpeculativeTask

Caution
FIXME

dequeueTask

Caution
FIXME

TaskSetManager.executorAdded

executorAdded simply calls the recomputeLocality method.

TaskSetManager.recomputeLocality

recomputeLocality (re)computes locality levels as an indexed collection of task localities, i.e. Array[TaskLocality.TaskLocality].

Note
TaskLocality is an enumeration with PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY values.

The method starts with currentLocalityIndex being 0.

It checks whether pendingTasksForExecutor has at least one element, and if so, it looks up spark.locality.wait.* for PROCESS_LOCAL and checks whether there is an executor for which TaskSchedulerImpl.isExecutorAlive is true. If the checks pass, PROCESS_LOCAL becomes an element of the result collection of task localities.

The same checks are performed for pendingTasksForHost, NODE_LOCAL, and TaskSchedulerImpl.hasExecutorsAliveOnHost to add NODE_LOCAL to the result collection of task localities.

Then, the method checks pendingTasksWithNoPrefs and if it’s not empty, NO_PREF becomes an element of the levels collection.

If pendingTasksForRack is not empty, and the wait time for RACK_LOCAL is defined, and there is an executor for which TaskSchedulerImpl.hasHostAliveOnRack is true, RACK_LOCAL is added to the levels collection.

ANY is the last and always-added element in the levels collection.

Right before the method finishes, it prints out the following DEBUG message to the logs:

DEBUG Valid locality levels for [taskSet]: [levels]

myLocalityLevels, localityWaits, and currentLocalityIndex are recomputed.
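
A minimal sketch of how the levels collection is assembled; the boolean parameters collapse the registry, wait-time, and alive-executor/host/rack checks described above, so this is an illustration rather than Spark's implementation:

[source, scala]
----
// Sketch of how the valid locality levels are assembled.
object TaskLocalitySketch extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}

def computeValidLocalityLevels(
    hasExecutorLocalTasks: Boolean, // pendingTasksForExecutor non-empty + alive executor + wait set
    hasNodeLocalTasks: Boolean,     // pendingTasksForHost non-empty + alive host + wait set
    hasNoPrefTasks: Boolean,        // pendingTasksWithNoPrefs non-empty
    hasRackLocalTasks: Boolean      // pendingTasksForRack non-empty + alive rack + wait set
  ): Array[TaskLocalitySketch.Value] = {
  import TaskLocalitySketch._
  val levels = scala.collection.mutable.ArrayBuffer.empty[TaskLocalitySketch.Value]
  if (hasExecutorLocalTasks) levels += PROCESS_LOCAL
  if (hasNodeLocalTasks) levels += NODE_LOCAL
  if (hasNoPrefTasks) levels += NO_PREF
  if (hasRackLocalTasks) levels += RACK_LOCAL
  levels += ANY // ANY is always the last element
  levels.toArray
}
----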

TaskSetManager.resourceOffer

Caution
FIXME Review TaskSetManager.resourceOffer + Does this have anything related to the following section about scheduling tasks?
resourceOffer(
  execId: String,
  host: String,
  maxLocality: TaskLocality): Option[TaskDescription]

When a TaskSetManager is a zombie, resourceOffer returns no TaskDescription (i.e. None).

For a non-zombie TaskSetManager, resourceOffer… FIXME

Caution
FIXME

It dequeues a pending task from the taskset by checking the pending tasks per executor (using pendingTasksForExecutor), per host (using pendingTasksForHost), with no locality preferences (using pendingTasksWithNoPrefs), and per rack (using TaskSchedulerImpl.getRackForHost, which appears to return a rack only for YarnScheduler).

From TaskSetManager.resourceOffer:

INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.1.4, partition 0,PROCESS_LOCAL, 1997 bytes)

If a serialized task is bigger than 100 kB (it is not a configurable value), a WARN message is printed out to the logs (only once per taskset):

WARN TaskSetManager: Stage [task.stageId] contains a task of very large size ([serializedTask.limit / 1024] KB). The maximum recommended task size is 100 KB.

The task id is added to the runningTasksSet and the parent pool is notified (using increaseRunningTasks(1) up the chain of pools).

The following INFO message appears in the logs:

INFO TaskSetManager: Starting task [id] in stage [taskSet.id] (TID [taskId], [host], partition [task.partitionId],[taskLocality], [serializedTask.limit] bytes)

For example:

INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 2054 bytes)
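
The "very large size" warning above corresponds to a hard-coded 100 KB threshold; the following is a hedged sketch of the check (the constant mirrors the non-configurable limit, the function itself is illustrative):

[source, scala]
----
// Sketch of the "very large task" check: warn when the serialized task
// exceeds the (non-configurable) 100 KB limit.
val TASK_SIZE_TO_WARN_KB = 100

def warnIfTaskIsTooLarge(stageId: Int, serializedTaskBytes: Int): Unit = {
  if (serializedTaskBytes > TASK_SIZE_TO_WARN_KB * 1024) {
    println(s"WARN Stage $stageId contains a task of very large size " +
      s"(${serializedTaskBytes / 1024} KB). " +
      s"The maximum recommended task size is $TASK_SIZE_TO_WARN_KB KB.")
  }
}
----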

Scheduling Tasks in TaskSet

Caution
FIXME

For each submitted TaskSet, a new TaskSetManager is created. The TaskSetManager completely and exclusively owns a TaskSet submitted for execution.

Caution
FIXME A picture with TaskSetManager owning TaskSet
Caution
FIXME What component knows about TaskSet and TaskSetManager. Isn’t it that TaskSets are created by DAGScheduler while TaskSetManager is used by TaskSchedulerImpl only?

TaskSetManager requests the current epoch from MapOutputTracker and sets it on all tasks in the taskset.

You should see the following DEBUG in the logs:

DEBUG Epoch for [taskSet]: [epoch]
Caution
FIXME What’s epoch. Why is this important?

TaskSetManager keeps track of the tasks pending execution per executor, host, rack or with no locality preferences.
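
The bookkeeping can be pictured with the following sketch; the registry names mirror the ones listed under Internal Registries, but the class and the addPendingTask signature are simplified stand-ins:

[source, scala]
----
// Sketch of the pending-task registries: each maps a location (executor id,
// host or rack) to the indices of tasks that prefer that location.
import scala.collection.mutable.{ArrayBuffer, HashMap}

class PendingTasksSketch {
  val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForHost     = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForRack     = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
  val allPendingTasks         = new ArrayBuffer[Int]

  // Simplified: a task index lands in every registry its locality preferences point to.
  def addPendingTask(index: Int, execId: Option[String], host: Option[String], rack: Option[String]): Unit = {
    execId.foreach(e => pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer[Int]) += index)
    host.foreach(h => pendingTasksForHost.getOrElseUpdate(h, new ArrayBuffer[Int]) += index)
    rack.foreach(r => pendingTasksForRack.getOrElseUpdate(r, new ArrayBuffer[Int]) += index)
    if (execId.isEmpty && host.isEmpty && rack.isEmpty) pendingTasksWithNoPrefs += index
    allPendingTasks += index
  }
}
----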

Locality-Aware Scheduling aka Delay Scheduling

TaskSetManager computes locality levels for the TaskSet for delay scheduling. While computing them, you should see the following DEBUG message in the logs:

DEBUG Valid locality levels for [taskSet]:  [levels]
Caution
FIXME What’s delay scheduling?

Events

When a task has finished, the TaskSetManager calls DAGScheduler.taskEnded.

Caution
FIXME

TaskSetManager.handleSuccessfulTask

handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) method marks the task (by tid) as successful and notifies the DAGScheduler that the task has ended.

It is called by… when… FIXME

Caution
FIXME Describe TaskInfo

It marks TaskInfo (using taskInfos) as successful (using TaskInfo.markSuccessful()).

It removes the task from runningTasksSet. It also decreases the number of running tasks in the parent pool if it is defined (using parent and Pool.decreaseRunningTasks).

It notifies DAGScheduler that the task ended successfully (using DAGScheduler.taskEnded with Success as TaskEndReason).

If the task was not marked as successful already (using successful), tasksSuccessful is incremented and the following INFO message appears in the logs:

INFO Finished task [info.id] in stage [taskSet.id] (TID [info.taskId]) in [info.duration] ms on [info.host] ([tasksSuccessful]/[numTasks])
Note
A TaskSet knows about the stage id it is associated with.

It also marks the task as successful (using successful). Finally, if the number of tasks finished successfully is exactly the number of tasks the TaskSetManager manages, the TaskSetManager turns zombie.

Otherwise, when the task was already marked as successful, the following INFO message appears in the logs:

INFO Ignoring task-finished event for [info.id] in stage [taskSet.id] because task [index] has already completed successfully

failedExecutors.remove(index) is called.

Caution
FIXME What does failedExecutors.remove(index) mean?

At the end, the method checks whether the TaskSetManager is a zombie and no task is running (using runningTasksSet), and if so, it calls TaskSchedulerImpl.taskSetFinished.
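
A minimal sketch of the flow described above, with the TaskInfo handling and the DAGScheduler notification reduced to comments:

[source, scala]
----
// Sketch of the handleSuccessfulTask flow.
import scala.collection.mutable

class SuccessfulTaskSketch(numTasks: Int) {
  val runningTasksSet = mutable.HashSet.empty[Long]
  val successful      = Array.fill(numTasks)(false)
  var tasksSuccessful = 0
  var isZombie        = false

  def handleSuccessfulTask(tid: Long, index: Int): Unit = {
    // TaskInfo.markSuccessful(); decrease running tasks in the parent pool
    runningTasksSet -= tid
    // DAGScheduler.taskEnded(..., Success, ...)
    if (!successful(index)) {
      tasksSuccessful += 1
      successful(index) = true
      if (tasksSuccessful == numTasks) isZombie = true // all tasks done: turn zombie
    } else {
      // log: "Ignoring task-finished event ... already completed successfully"
    }
    // failedExecutors.remove(index)
    // if (isZombie && runningTasksSet.isEmpty) TaskSchedulerImpl.taskSetFinished(this)
  }
}
----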

TaskSetManager.handleFailedTask

handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) method is called by TaskSchedulerImpl or executorLost.

Caution
FIXME image with handleFailedTask (and perhaps the other parties involved)

The method first checks whether the task has already been marked as failed (using taskInfos) and if it has, it quits.

It removes the task from runningTasksSet and informs the parent pool to decrease its running tasks.

It marks the TaskInfo as failed and grabs its index so the number of copies running of the task is decremented (see copiesRunning).

Caution
FIXME Describe TaskInfo

The method calculates the failure exception to report per TaskEndReason. See below for the possible cases of TaskEndReason.

Caution
FIXME Describe TaskEndReason.

The executor for the failed task is added to failedExecutors.

It informs DAGScheduler that the task ended (using DAGScheduler.taskEnded).

The task is then added to the list of pending tasks.

If the TaskSetManager is not a zombie, the task was not KILLED, and the failure counts towards the maximum number of times the task is allowed to fail before the stage is aborted (TaskFailedReason.countTowardsTaskFailures is true), numFailures is incremented. If the number of failures of the task then equals or exceeds the maximum assigned to the TaskSetManager (maxTaskFailures), the following ERROR message appears in the logs:

ERROR Task [id] in stage [id] failed [maxTaskFailures] times; aborting job

Then abort is called and the method quits.

Otherwise, TaskSchedulerImpl.taskSetFinished is called when the TaskSetManager is zombie and there are no running tasks.
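
The failure-counting and abort decision can be sketched as follows; abort here is a stand-in for the real method described under Aborting TaskSet:

[source, scala]
----
// Sketch of the failure-counting / abort decision described above.
class FailedTaskSketch(maxTaskFailures: Int, numTasks: Int) {
  private val numFailures = Array.fill(numTasks)(0)
  var isZombie = false

  def onTaskFailed(index: Int, killed: Boolean, countTowardsTaskFailures: Boolean): Unit = {
    // ...the task has already been re-added to the list of pending tasks at this point
    if (!isZombie && !killed && countTowardsTaskFailures) {
      numFailures(index) += 1
      if (numFailures(index) >= maxTaskFailures) {
        // ERROR Task [index] in stage [id] failed [maxTaskFailures] times; aborting job
        abort(s"Task $index failed $maxTaskFailures times")
        return
      }
    }
    // otherwise TaskSchedulerImpl.taskSetFinished is called when zombie and no tasks run
  }

  private def abort(message: String): Unit = {
    isZombie = true // ...and DAGScheduler is informed in the real implementation
  }
}
----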

FetchFailed

For FetchFailed, it logs the following WARNING:

WARNING Lost task [id] in stage [id] (TID [id], [host]): [reason.toErrorString]

Unless the task has already been marked as successful (in successful), it is marked so now and tasksSuccessful is incremented.

The TaskSetManager becomes zombie.

No exception is returned.

ExceptionFailure

For ExceptionFailure, it grabs TaskMetrics if available.

If it is a NotSerializableException, it logs ERROR:

ERROR Task [id] in stage [id] (TID [tid]) had a not serializable result: [exception.description]; not retrying

It calls abort and returns no failure exception.

If it is not a NotSerializableException, it continues.

It grabs the description and the time of the ExceptionFailure.

If the description, i.e. the ExceptionFailure, has already been reported (and is therefore a duplicate), spark.logging.exceptionPrintInterval is checked before the duplicate exception is reprinted in full.

For a full printout of the ExceptionFailure, the following WARNING appears in the logs:

WARNING Lost task [id] in stage [id] (TID [id], [host]): [reason.toErrorString]

Otherwise, the following INFO appears in the logs:

INFO Lost task [id] in stage [id] (TID [id]) on executor [host]: [ef.className] ([ef.description]) [duplicate [count]]

The ExceptionFailure becomes the reported failure exception.
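
The duplicate-exception handling can be sketched with a map keyed by the exception description, assuming recentExceptions stores a duplicate count and the time of the last full printout (the class below is illustrative):

[source, scala]
----
// Sketch of the duplicate-exception throttling described above.
import scala.collection.mutable

class RecentExceptionsSketch(exceptionPrintIntervalMs: Long) {
  private val recentExceptions = mutable.Map.empty[String, (Int, Long)]

  /** Returns (printFullException, duplicateCount) for the given description. */
  def record(description: String, nowMs: Long): (Boolean, Int) =
    recentExceptions.get(description) match {
      case Some((_, lastPrinted)) if nowMs - lastPrinted > exceptionPrintIntervalMs =>
        recentExceptions(description) = (0, nowMs) // interval elapsed: print in full again
        (true, 0)
      case Some((count, lastPrinted)) =>
        recentExceptions(description) = (count + 1, lastPrinted) // still a duplicate
        (false, count + 1)
      case None =>
        recentExceptions(description) = (0, nowMs) // first occurrence: print in full
        (true, 0)
    }
}
----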

ExecutorLostFailure

For ExecutorLostFailure, if exitCausedByApp is false, the following INFO message appears in the logs:

INFO Task [tid] failed because while it was being computed, its executor exited for a reason unrelated to the task. Not counting this failure towards the maximum number of failures for the task.

No failure exception is returned.

Other TaskFailedReasons

For the other TaskFailedReasons, the following WARNING appears in the logs:

WARNING Lost task [id] in stage [id] (TID [id], [host]): [reason.toErrorString]

No failure exception is returned.

Other TaskEndReason

For the other TaskEndReasons, the following ERROR appears in the logs:

ERROR Unknown TaskEndReason: [e]

No failure exception is returned.

Retrying Tasks on Failure

Caution
FIXME

Up to spark.task.maxFailures attempts

Task retries and spark.task.maxFailures

When you start a Spark program, you set spark.task.maxFailures to the number of failures that are acceptable before the TaskSetManager gives up and marks the job failed.

In the Spark shell with the local master, spark.task.maxFailures is fixed to 1 and you need to use the local-with-retries master, e.g. local[*, 5], to change it to some other value.

In the following example, you are going to execute a job with two partitions and keep one failing at all times (by throwing an exception). The aim is to learn the behavior of retrying task execution in a stage in TaskSet. You will only look at a single task execution, namely 0.0.

$ ./bin/spark-shell --master "local[*, 5]"
...
scala> sc.textFile("README.md", 2).mapPartitionsWithIndex((idx, it) => if (idx == 0) throw new Exception("Partition 2 marked failed") else it).count
...
15/10/27 17:24:56 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:25)
15/10/27 17:24:56 DEBUG DAGScheduler: New pending partitions: Set(0, 1)
15/10/27 17:24:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
...
15/10/27 17:24:56 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2062 bytes)
...
15/10/27 17:24:56 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
...
15/10/27 17:24:56 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 2)
java.lang.Exception: Partition 2 marked failed
...
15/10/27 17:24:56 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 4, localhost, partition 0,PROCESS_LOCAL, 2062 bytes)
15/10/27 17:24:56 INFO Executor: Running task 0.1 in stage 1.0 (TID 4)
15/10/27 17:24:56 INFO HadoopRDD: Input split: file:/Users/jacek/dev/oss/spark/README.md:0+1784
15/10/27 17:24:56 ERROR Executor: Exception in task 0.1 in stage 1.0 (TID 4)
java.lang.Exception: Partition 2 marked failed
...
15/10/27 17:24:56 ERROR Executor: Exception in task 0.4 in stage 1.0 (TID 7)
java.lang.Exception: Partition 2 marked failed
...
15/10/27 17:24:56 INFO TaskSetManager: Lost task 0.4 in stage 1.0 (TID 7) on executor localhost: java.lang.Exception (Partition 2 marked failed) [duplicate 4]
15/10/27 17:24:56 ERROR TaskSetManager: Task 0 in stage 1.0 failed 5 times; aborting job
15/10/27 17:24:56 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/10/27 17:24:56 INFO TaskSchedulerImpl: Cancelling stage 1
15/10/27 17:24:56 INFO DAGScheduler: ResultStage 1 (count at <console>:25) failed in 0.058 s
15/10/27 17:24:56 DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
15/10/27 17:24:56 INFO DAGScheduler: Job 1 failed: count at <console>:25, took 0.085810 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 5 times, most recent failure: Lost task 0.4 in stage 1.0 (TID 7, localhost): java.lang.Exception: Partition 2 marked failed

Zombie state

TaskSetManager enters zombie state when all tasks in a taskset have completed successfully (regardless of the number of task attempts), or if the task set has been aborted (see Aborting TaskSet).

While in zombie state, TaskSetManager can launch no new tasks and responds with no TaskDescription to resourceOffers.

TaskSetManager remains in the zombie state until all tasks have finished running, so that it can continue to track and account for the running tasks.
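
A minimal sketch of the zombie behaviour, with the task dequeueing reduced to a stand-in:

[source, scala]
----
// Sketch of the zombie behaviour: a zombie TaskSetManager declines resource
// offers but keeps tracking the tasks that are still running.
import scala.collection.mutable

class ZombieSketch {
  var isZombie = false
  val runningTasksSet = mutable.HashSet.empty[Long]

  def resourceOffer(execId: String, host: String): Option[Long] =
    if (isZombie) None // no new TaskDescription while zombie
    else dequeuePendingTask(execId, host)

  def maybeFinishTaskSet(): Boolean =
    isZombie && runningTasksSet.isEmpty // only then is the TaskSet truly done

  private def dequeuePendingTask(execId: String, host: String): Option[Long] = None // stand-in
}
----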

Aborting TaskSet using abort Method

abort(message: String, exception: Option[Throwable] = None) method informs DAGScheduler that a TaskSet was aborted (using DAGScheduler.taskSetFailed method).

Caution
FIXME image with DAGScheduler call

The TaskSetManager enters zombie state.

Finally, maybeFinishTaskSet method is called.

Caution
FIXME Why is maybeFinishTaskSet method called? When is runningTasks 0?

Internal Registries

  • copiesRunning

  • successful

  • numFailures

  • failedExecutors contains a mapping from a failed task's index to the executor ids on which it failed and the time of each failure. It is used in handleFailedTask.

  • taskAttempts

  • tasksSuccessful

  • stageId (default: taskSet.stageId)

  • totalResultSize

  • calculatedTasks

  • runningTasksSet

  • isZombie (default: false)

  • pendingTasksForExecutor

  • pendingTasksForHost

  • pendingTasksForRack

  • pendingTasksWithNoPrefs

  • allPendingTasks

  • speculatableTasks

  • taskInfos is the mapping between task ids and their TaskInfo

  • recentExceptions

Settings

  • spark.scheduler.executorTaskBlacklistTime (default: 0L) - the time interval that has to pass before a task can be re-launched on the executor where it has once failed. It prevents repeated task failures due to executor failures.

  • spark.speculation (default: false)

  • spark.speculation.quantile (default: 0.75) - the percentage of tasks that must be complete before speculation is enabled for a stage.

  • spark.speculation.multiplier (default: 1.5)

  • spark.driver.maxResultSize (default: 1g) - the limit in bytes of the total size of task results. Values smaller than 1m, i.e. 1048576 (1024 * 1024) bytes, are rounded down to 0, which disables the limit.

  • spark.logging.exceptionPrintInterval (default: 10000) - how frequently to reprint duplicate exceptions in full, in milliseconds

  • spark.locality.wait (default: 3s) - the wait time for locality-aware delay scheduling for PROCESS_LOCAL, NODE_LOCAL, and RACK_LOCAL when a locality-specific setting is not set.

  • spark.locality.wait.process (default: the value of spark.locality.wait) - delay for PROCESS_LOCAL

  • spark.locality.wait.node (default: the value of spark.locality.wait) - delay for NODE_LOCAL

  • spark.locality.wait.rack (default: the value of spark.locality.wait) - delay for RACK_LOCAL
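
For reference, the settings above can be set on a SparkConf before creating a SparkContext; the values below are illustrative, not recommendations:

[source, scala]
----
// Example of configuring the speculation- and locality-related settings.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("tasksetmanager-settings-demo")
  .set("spark.task.maxFailures", "8")
  .set("spark.speculation", "true")
  .set("spark.speculation.quantile", "0.90")
  .set("spark.speculation.multiplier", "2")
  .set("spark.locality.wait", "5s")
  .set("spark.locality.wait.node", "10s")
----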