Skip to content

Latest commit

 

History

History
530 lines (308 loc) · 30.7 KB

spark-taskschedulerimpl.adoc

File metadata and controls

530 lines (308 loc) · 30.7 KB

TaskSchedulerImpl - Default TaskScheduler

TaskSchedulerImpl is the default implementation of TaskScheduler Contract and extends it to track racks per host and port. It can schedule tasks for multiple types of cluster managers by means of Scheduler Backends.

Using spark.scheduler.mode setting you can select the scheduling policy.

When a Spark application starts (and an instance of SparkContext is created) TaskSchedulerImpl with a SchedulerBackend and DAGScheduler are created and soon started.

taskschedulerimpl sparkcontext schedulerbackend dagscheduler
Figure 1. TaskSchedulerImpl and Other Services
Note
TaskSchedulerImpl is a private[spark] class with the source code in org.apache.spark.scheduler.TaskSchedulerImpl.
Tip

Enable INFO or DEBUG logging level for org.apache.spark.scheduler.TaskSchedulerImpl logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.TaskSchedulerImpl=DEBUG

applicationAttemptId method

applicationAttemptId(): Option[String]
Caution
FIXME

schedulableBuilder Attribute

schedulableBuilder is a SchedulableBuilder for the TaskSchedulerImpl.

It is set up when a TaskSchedulerImpl is initialized and can be one of two available builders:

Note
Use spark.scheduler.mode setting to select the scheduling policy.

Tracking Racks per Hosts and Ports (getRackForHost method)

getRackForHost(value: String): Option[String]

getRackForHost is a method to know about the racks per hosts and ports. By default, it assumes that racks are unknown (i.e. the method returns None).

Note
It is overriden by the YARN-specific TaskScheduler YarnScheduler.

getRackForHost is currently used in two places:

Creating TaskSchedulerImpl

Creating a TaskSchedulerImpl object requires a SparkContext object, the acceptable number of task failures (maxTaskFailures) and optional isLocal flag (disabled by default, i.e. false).

Note
There is another TaskSchedulerImpl constructor that requires a SparkContext object only and sets maxTaskFailures to spark.task.maxFailures or, if spark.task.maxFailures is not set, defaults to 4.

While being created, it initializes internal registries to their default values.

It then sets schedulingMode to the value of spark.scheduler.mode setting or FIFO.

Note
schedulingMode is part of TaskScheduler Contract.

Failure to set schedulingMode results in a SparkException:

Unrecognized spark.scheduler.mode: [schedulingModeConf]

It sets taskResultGetter as a TaskResultGetter.

Caution
FIXME Where is taskResultGetter used?

Acceptable Number of Task Failures (maxTaskFailures attribute)

The acceptable number of task failures (maxTaskFailures) can be explicitly defined when creating TaskSchedulerImpl instance or based on spark.task.maxFailures setting that defaults to 4 failures.

Note
It is exclusively used when submitting tasks through TaskSetManager.

Internal Cleanup After Removing Executor (removeExecutor method)

removeExecutor(executorId: String, reason: ExecutorLossReason): Unit

removeExecutor removes the executorId executor from the internal registries: executorIdToTaskCount, executorIdToHost, executorsByHost, and hostsByRack. If the affected hosts and racks are the last entries in executorsByHost and hostsByRack, appropriately, they are removed from the registries.

Unless reason is LossReasonPending, the executor is removed from executorIdToHost registry and TaskSetManagers get notified.

Note
The internal removeExecutor is called as part of statusUpdate and executorLost.

Local vs Non-Local Mode (isLocal attribute)

Caution
FIXME

Initializing TaskSchedulerImpl (initialize method)

initialize(backend: SchedulerBackend): Unit

initialize initializes a TaskSchedulerImpl object.

TaskSchedulerImpl initialize
Figure 2. TaskSchedulerImpl initialization

initialize saves the reference to the current SchedulerBackend (as backend) and sets rootPool to be an empty-named Pool with already-initialized schedulingMode (while creating a TaskSchedulerImpl object), initMinShare and initWeight as 0.

Note
schedulingMode and rootPool are a part of TaskScheduler Contract.

It then creates the internal SchedulableBuilder object (as schedulableBuilder) based on schedulingMode:

With the schedulableBuilder object created, initialize requests it to build pools.

Caution
FIXME Why are rootPool and schedulableBuilder created only now? What do they need that it is not available when TaskSchedulerImpl is created?

Starting TaskSchedulerImpl (start method)

As part of initialization of a SparkContext, TaskSchedulerImpl is started (using start from the TaskScheduler Contract).

start(): Unit

It starts the scheduler backend it manages.

Below is a figure of the method calls in Spark Standalone mode.

taskschedulerimpl start standalone
Figure 3. Starting TaskSchedulerImpl in Spark Standalone mode

It also starts the task-scheduler-speculation executor pool. See Speculative Execution of Tasks.

Post-Start Initialization (using postStartHook)

postStartHook is a custom implementation of postStartHook from the TaskScheduler Contract that waits until a scheduler backend is ready (using the internal blocking waitBackendReady).

Note
postStartHook is used when SparkContext is created (before it is fully created) and YarnClusterScheduler.postStartHook.

Waiting Until SchedulerBackend is Ready (waitBackendReady method)

The private waitBackendReady method waits until a SchedulerBackend is ready.

It keeps on checking the status every 100 milliseconds until the SchedulerBackend is ready or the SparkContext is stopped.

If the SparkContext happens to be stopped while doing the waiting, a IllegalStateException is thrown with the message:

Spark context stopped while waiting for backend

Stopping TaskSchedulerImpl (stop method)

When TaskSchedulerImpl is stopped (using stop() method), it does the following:

Speculative Execution of Tasks

Speculative tasks (also speculatable tasks or task strugglers) are tasks that run slower than most (FIXME the setting) of the all tasks in a job.

Speculative execution of tasks is a health-check procedure that checks for tasks to be speculated, i.e. running slower in a stage than the median of all successfully completed tasks in a taskset (FIXME the setting). Such slow tasks will be re-launched in another worker. It will not stop the slow tasks, but run a new copy in parallel.

The thread starts as TaskSchedulerImpl starts in clustered deployment modes with spark.speculation enabled. It executes periodically every spark.speculation.interval after spark.speculation.interval passes.

When enabled, you should see the following INFO message in the logs:

INFO Starting speculative execution thread

It works as task-scheduler-speculation daemon thread pool using j.u.c.ScheduledThreadPoolExecutor with core pool size 1.

The job with speculatable tasks should finish while speculative tasks are running, and it will leave these tasks running - no KILL command yet.

It uses checkSpeculatableTasks method that asks rootPool to check for speculatable tasks. If there are any, SchedulerBackend is called for reviveOffers.

Caution
FIXME How does Spark handle repeated results of speculative tasks since there are copies launched?

Calculating Default Level of Parallelism (defaultParallelism method)

Default level of parallelism is a hint for sizing jobs. It is a part of the TaskScheduler contract and used by SparkContext to create RDDs with the right number of partitions when not specified explicitly.

TaskSchedulerImpl uses SchedulerBackend.defaultParallelism() to calculate the value, i.e. it just passes it along to a scheduler backend.

Submitting Tasks (using submitTasks)

Note
submitTasks is a part of TaskScheduler Contract.
submitTasks(taskSet: TaskSet): Unit

submitTasks creates a TaskSetManager for the input TaskSet and adds it to the Schedulable root pool.

Note
The root pool can be a single flat linked queue (in FIFO scheduling mode) or a hierarchy of pools of Schedulables (in FAIR scheduling mode).

It makes sure that the requested resources, i.e. CPU and memory, are assigned to the Spark application for a non-local environment before requesting the current SchedulerBackend to revive offers.

taskschedulerImpl submitTasks
Figure 4. TaskSchedulerImpl.submitTasks
Note
If there are tasks to launch for missing partitions in a stage, DAGScheduler executes submitTasks (see submitMissingTasks for Stage and Job).

When submitTasks is called, you should see the following INFO message in the logs:

INFO TaskSchedulerImpl: Adding task set [taskSet.id] with [tasks.length] tasks

It creates a new TaskSetManager for the input taskSet and the acceptable number of task failures.

Note
The acceptable number of task failures is specified when a TaskSchedulerImpl is created.
Note
A TaskSet knows the tasks to execute (as tasks) and stage id (as stageId) the tasks belong to. Read TaskSets.

The TaskSet is registered in the internal taskSetsByStageIdAndAttempt registry with the TaskSetManager.

If there is more than one active TaskSetManager for the stage, a IllegalStateException is thrown with the message:

more than one active taskSet for stage [stage]: [TaskSet ids]
Note
TaskSetManager is considered active when it is not a zombie.

When the method is called the very first time (hasReceivedTask is false) in cluster mode only (i.e. isLocal of the TaskSchedulerImpl is false), starvationTimer is scheduled to execute after spark.starvation.timeout to ensure that the requested resources, i.e. CPUs and memory, were assigned by a cluster manager.

Note
After the first spark.starvation.timeout passes, the internal hasReceivedTask flag becomes true.

Every time the starvation timer thread is executed and hasLaunchedTask flag is false, the following WARN message is printed out to the logs:

WARN Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Otherwise, when the hasLaunchedTask flag is true the timer thread cancels itself.

Ultimately, submitTasks requests the SchedulerBackend to revive offers.

Tip
Use dag-scheduler-event-loop thread to step through the code in a debugger.

taskSetsByStageIdAndAttempt Registry

Caution
FIXME

A mapping between stages and a collection of attempt ids and TaskSetManagers.

Processing Executor Resource Offers (using resourceOffers)

resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]

resourceOffers method is called by SchedulerBackend (for clustered environments) or LocalBackend (for local mode) with WorkerOffer resource offers that represent cores (CPUs) available on all the active executors with one WorkerOffer per active executor.

taskscheduler resourceOffers
Figure 5. Processing Executor Resource Offers
Note
resourceOffers is a mechanism to propagate information about active executors to TaskSchedulerImpl with the hosts and racks (if supported by the cluster manager).

A WorkerOffer is a 3-tuple with executor id, host, and the number of free cores available.

WorkerOffer(executorId: String, host: String, cores: Int)

For each WorkerOffer (that represents free cores on an executor) resourceOffers method records the host per executor id (using the internal executorIdToHost) and sets 0 as the number of tasks running on the executor if there are no tasks on the executor (using executorIdToTaskCount). It also records hosts (with executors in the internal executorsByHost registry).

Warning
FIXME BUG? Why is the executor id not added to executorsByHost?

For the offers with a host that has not been recorded yet (in the internal executorsByHost registry) the following occurs:

  1. The host is recorded in the internal executorsByHost registry.

  2. executorAdded callback is called (with the executor id and the host from the offer).

  3. newExecAvail flag is enabled (it is later used to inform TaskSetManagers about the new executor).

Caution
FIXME a picture with executorAdded call from TaskSchedulerImpl to DAGScheduler.

It shuffles the input offers that is supposed to help evenly distributing tasks across executors (that the input offers represent) and builds internal structures like tasks and availableCpus.

TaskSchedulerImpl resourceOffers internal structures
Figure 6. Internal Structures of resourceOffers with 5 WorkerOffers

The root pool is requested for TaskSetManagers sorted appropriately (according to the scheduling order).

Note
rootPool is a part of the TaskScheduler Contract and is exclusively managed by SchedulableBuilders (that add TaskSetManagers to the root pool.

For every TaskSetManager in the TaskSetManager sorted queue, the following DEBUG message is printed out to the logs:

DEBUG TaskSchedulerImpl: parentName: [taskSet.parent.name], name: [taskSet.name], runningTasks: [taskSet.runningTasks]
Note
The internal rootPool is configured while TaskSchedulerImpl is being initialized.

While traversing over the sorted collection of TaskSetManagers, if a new host (with an executor) was registered, i.e. the newExecAvail flag is enabled, TaskSetManagers are informed about the new executor added.

Note
A TaskSetManager will be informed about one or more new executors once per host regardless of the number of executors registered on the host.

For each TaskSetManager (in sortedTaskSets) and for each preferred locality level (ascending), resourceOfferSingleTaskSet is called until launchedTask flag is false.

Caution
FIXME resourceOfferSingleTaskSet + the sentence above less code-centric.

Check whether the number of cores in an offer is greater than the number of cores needed for a task.

When resourceOffers managed to launch a task (i.e. tasks collection is not empty), the internal hasLaunchedTask flag becomes true (that effectively means what the name says "There were executors and I managed to launch a task").

resourceOffers returns the tasks collection.

Note
resourceOffers is called when CoarseGrainedSchedulerBackend makes resource offers.

resourceOfferSingleTaskSet method

resourceOfferSingleTaskSet(
  taskSet: TaskSetManager,
  maxLocality: TaskLocality,
  shuffledOffers: Seq[WorkerOffer],
  availableCpus: Array[Int],
  tasks: Seq[ArrayBuffer[TaskDescription]]): Boolean

resourceOfferSingleTaskSet is a private helper method that is executed when…​

TaskResultGetter

TaskResultGetter is a helper class for TaskSchedulerImpl.statusUpdate. It asynchronously fetches the task results of tasks that have finished successfully (using enqueueSuccessfulTask) or fetches the reasons of failures for failed tasks (using enqueueFailedTask). It then sends the "results" back to TaskSchedulerImpl.

Caution
FIXME Image with the dependencies
Tip
Consult Task States in Tasks to learn about the different task states.
Note
The only instance of TaskResultGetter is created while TaskSchedulerImpl is being created (as taskResultGetter). It requires a SparkEnv and TaskSchedulerImpl. It is stopped when TaskSchedulerImpl stops.

TaskResultGetter offers the following methods:

The methods use the internal (daemon thread) thread pool task-result-getter (as getTaskResultExecutor) with spark.resultGetter.threads so they can be executed asynchronously.

TaskResultGetter.enqueueSuccessfulTask

enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) starts by deserializing TaskResult (from serializedData using the global closure Serializer).

If the result is DirectTaskResult, the method checks taskSetManager.canFetchMoreResults(serializedData.limit()) and possibly quits. If not, it deserializes the result (using SparkEnv.serializer).

Caution
FIXME Review taskSetManager.canFetchMoreResults(serializedData.limit()).

If the result is IndirectTaskResult, the method checks taskSetManager.canFetchMoreResults(size) and possibly removes the block id (using SparkEnv.blockManager.master.removeBlock(blockId)) and quits. If not, you should see the following DEBUG message in the logs:

DEBUG Fetching indirect task result for TID [tid]

scheduler.handleTaskGettingResult(taskSetManager, tid) gets called. And sparkEnv.blockManager.getRemoteBytes(blockId).

Failure in getting task result from BlockManager results in calling TaskSchedulerImpl.handleFailedTask(taskSetManager, tid, TaskState.FINISHED, TaskResultLost) and quit.

The task result is deserialized to DirectTaskResult (using the global closure Serializer) and sparkEnv.blockManager.master.removeBlock(blockId) is called afterwards.

TaskSchedulerImpl.handleSuccessfulTask(taskSetManager, tid, result) is called.

Caution
FIXME What is TaskSchedulerImpl.handleSuccessfulTask doing?

Any ClassNotFoundException or non fatal exceptions lead to TaskSetManager.abort.

TaskResultGetter.enqueueFailedTask

enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, serializedData: ByteBuffer) checks whether serializedData contains any data and if it does it deserializes it to a TaskEndReason (using the global closure Serializer).

Either UnknownReason or the deserialized instance is passed on to TaskSchedulerImpl.handleFailedTask as the reason of the failure.

Any ClassNotFoundException leads to printing out the ERROR message to the logs:

ERROR Could not deserialize TaskEndReason: ClassNotFound with classloader [loader]

TaskSchedulerImpl.statusUpdate

statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) is called by scheduler backends to inform about task state changes (see Task States in Tasks).

Caution
FIXME image with scheduler backends calling TaskSchedulerImpl.statusUpdate.

It is called by:

When statusUpdate starts, it checks the current state of the task and act accordingly.

If a task became TaskState.LOST and there is still an executor assigned for the task (it seems it may not given the check), the executor is marked as lost (or sometimes called failed). The executor is later announced as such using DAGScheduler.executorLost with SchedulerBackend.reviveOffers() being called afterwards.

Caution
FIXME Why is SchedulerBackend.reviveOffers() called only for lost executors?

The method looks up the TaskSetManager for the task (using taskIdToTaskSetManager).

When the TaskSetManager is found and the task is in finished state, the task is removed from the internal data structures, i.e. taskIdToTaskSetManager and taskIdToExecutorId, and the number of currently running tasks for the executor(s) is decremented (using executorIdToTaskCount).

For a FINISHED task, TaskSet.removeRunningTask is called and then TaskResultGetter.enqueueSuccessfulTask.

For a task in FAILED, KILLED, or LOST state, TaskSet.removeRunningTask is called (as for the FINISHED state) and then TaskResultGetter.enqueueFailedTask.

If the TaskSetManager could not be found, the following ERROR shows in the logs:

ERROR Ignoring update with state [state] for TID [tid] because its task set is gone (this is likely the result of receiving duplicate task finished status updates)

TaskSchedulerImpl.handleFailedTask

TaskSchedulerImpl.handleFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, reason: TaskEndReason) is called when TaskResultGetter.enqueueSuccessfulTask failed to fetch bytes from BlockManager or as part of TaskResultGetter.enqueueFailedTask.

Either way there is an error related to task execution.

If the TaskSetManager is not a zombie and the task’s state is not KILLED, SchedulerBackend.reviveOffers is called.

TaskSchedulerImpl.taskSetFinished

taskSetFinished(manager: TaskSetManager) method is called to inform TaskSchedulerImpl that all tasks in a TaskSetManager have finished execution.

taskschedulerimpl tasksetmanager tasksetfinished
Figure 7. TaskSchedulerImpl.taskSetFinished is called when all tasks are finished
Note
taskSetFinished is called by TaskSetManager at the very end of TaskSetManager.handleSuccessfulTask.

taskSetsByStageIdAndAttempt internal mapping is queried by stage id (using manager.taskSet.stageId) for the corresponding TaskSets (TaskSetManagers in fact) to remove the currently-finished stage attempt (using manager.taskSet.stageAttemptId) and if it was the only attempt, the stage id is completely removed from taskSetsByStageIdAndAttempt.

Note
A TaskSetManager owns a TaskSet that corresponds to a stage.

Pool.removeSchedulable(manager) is called for the parent of the TaskSetManager.

You should see the following INFO message in the logs:

INFO Removed TaskSet [manager.taskSet.id], whose tasks have all completed, from pool [manager.parent.name]

TaskSchedulerImpl.executorAdded

executorAdded(execId: String, host: String)

executorAdded method simply passes the notification on to the DAGScheduler (using DAGScheduler.executorAdded)

Caution
FIXME Image with a call from TaskSchedulerImpl to DAGScheduler, please.

Internal Registries

Caution
FIXME How/where are these mappings used?

TaskSchedulerImpl tracks the following information in its internal data structures:

  • the number of tasks already scheduled for execution (nextTaskId).

  • TaskSets by stage and attempt ids (taskSetsByStageIdAndAttempt)

  • tasks to their TaskSetManagers (taskIdToTaskSetManager)

  • tasks to executors (taskIdToExecutorId)

  • the number of tasks running per executor (executorIdToTaskCount)

  • the set of executors on each host (executorsByHost)

  • the set of hosts per rack (hostsByRack)

  • executor ids to corresponding host (executorIdToHost).

Settings

spark.task.maxFailures

spark.task.maxFailures (default: 4 for cluster mode and 1 for local except local-with-retries) - The number of individual task failures before giving up on the entire TaskSet and the job afterwards.

It is used in TaskSchedulerImpl to initialize a TaskSetManager.

spark.task.cpus

spark.task.cpus (default: 1) sets how many CPUs to request per task.

spark.scheduler.mode

spark.scheduler.mode (default: FIFO) is a case-insensitive name of the scheduling mode and can be one of FAIR, FIFO, or NONE.

Note
Only FAIR and FIFO are supported by TaskSchedulerImpl. See schedulableBuilder.

spark.speculation.interval

spark.speculation.interval (default: 100ms) - how often to check for speculative tasks.

spark.starvation.timeout

spark.starvation.timeout (default: 15s) - Threshold above which Spark warns a user that an initial TaskSet may be starved.

spark.resultGetter.threads

spark.resultGetter.threads (default: 4) - the number of threads for TaskResultGetter.