
DAGScheduler

Note

The introduction that follows was highly influenced by the scaladoc of org.apache.spark.scheduler.DAGScheduler. As DAGScheduler is a private class, it does not appear in the official API documentation. You are strongly encouraged to read the sources first and only then read this and the related pages.

"Reading the sources", I say?! Yes, I am kidding!

Introduction

DAGScheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling, i.e. after an RDD action has been called it becomes a job that is then transformed into a set of stages that are submitted as TaskSets for execution (see Execution Model).

Figure 1. Executing action leads to new ResultStage and ActiveJob in DAGScheduler

The fundamental concepts of DAGScheduler are jobs and stages (refer to Jobs and Stages respectively).

DAGScheduler runs on the driver. It is created as part of SparkContext’s initialization, right after TaskScheduler and SchedulerBackend are ready.

Figure 2. DAGScheduler as created by SparkContext with other services

DAGScheduler does three things in Spark (thorough explanations follow):

  • Computes an execution DAG, i.e. DAG of stages, for a job.

  • Determines the preferred locations to run each task on.

  • Handles failures due to shuffle output files being lost.

It computes a directed acyclic graph (DAG) of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run jobs. It then submits stages to TaskScheduler.

In addition to coming up with the execution DAG, DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes the information to TaskScheduler.

Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler itself, which will retry each task a small number of times before cancelling the whole stage.

DAGScheduler uses an event queue architecture in which a thread can post DAGSchedulerEvent events, e.g. a new job or stage being submitted, that DAGScheduler reads and executes sequentially. See the section Internal Event Loop - dag-scheduler-event-loop.

DAGScheduler runs stages in topological order.
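To make "topological order" concrete, here is a minimal Scala sketch (not Spark's code) that orders a DAG of stages so that every parent stage comes before the stages that depend on it. SimpleStage and TopoOrder are hypothetical names; real stages also carry RDDs, job ids and attempts.

```scala
import scala.collection.mutable

// Hypothetical stage: only an id and its parent stages.
final case class SimpleStage(id: Int, parents: List[SimpleStage])

object TopoOrder {
  // Depth-first post-order over parent links: a stage is emitted only after
  // all of its parents, which yields a topological order of the stage DAG.
  def topologicalOrder(finalStage: SimpleStage): List[Int] = {
    val visited = mutable.LinkedHashSet.empty[Int]
    def visit(stage: SimpleStage): Unit =
      if (!visited.contains(stage.id)) {
        stage.parents.foreach(visit) // parents first...
        visited += stage.id          // ...then the stage itself
      }
    visit(finalStage)
    visited.toList
  }
}
```

A shared parent (stage 0 below) is visited once, however many children depend on it.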

Tip

Enable DEBUG or TRACE logging level for the org.apache.spark.scheduler.DAGScheduler logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.DAGScheduler=TRACE

Refer to Logging.

DAGScheduler needs SparkContext, Task Scheduler, LiveListenerBus, MapOutputTracker and Block Manager to work. However, at the very minimum, DAGScheduler needs SparkContext only (and asks SparkContext for the other services).

DAGScheduler reports metrics about its execution (refer to the section Metrics).

When DAGScheduler schedules a job as a result of executing an action on an RDD or calling the SparkContext.runJob() method directly, it spawns parallel tasks to compute (partial) results per partition.

Creating DAGScheduler Instance

Caution
FIXME

Internal Registries

DAGScheduler maintains the following information in internal registries:

  • nextJobId for the next job id

  • numTotalJobs (alias of nextJobId) for the total number of submitted jobs

  • nextStageId for the next stage id

  • jobIdToStageIds for a mapping between jobs and their stages

  • stageIdToStage for a mapping from stage ids to stages

  • shuffleToMapStage for a mapping from shuffle ids to ShuffleMapStages

  • jobIdToActiveJob for a mapping from job ids to ActiveJobs

  • waitingStages for stages with parents to be computed

  • runningStages for stages currently being run

  • failedStages for stages that failed due to fetch failures (as reported by CompletionEvents for FetchFailed end reasons) and are going to be resubmitted.

  • activeJobs for a collection of ActiveJob instances

  • cacheLocs is a mapping between RDD ids and their cache preferences per partition (as arrays indexed by partition numbers). Each array value is the set of locations where that RDD partition is cached. See Cache Tracking.

  • failedEpoch is a mapping between failed executors and the epoch number when the failure was caught per executor.

Caution

FIXME Review

  • cleanupStateForJobAndIndependentStages

DAGScheduler.resubmitFailedStages

resubmitFailedStages() is called to go over failedStages collection (of failed stages) and submit them (using submitStage).

If the failed stages collection contains any stage, the following INFO message appears in the logs:

INFO Resubmitting failed stages

cacheLocs and failedStages are cleared, and the failed stages are submitted one by one (using submitStage), ordered by job ids (in increasing order).

Ultimately, all waiting stages are submitted (using submitWaitingStages).
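The resubmission steps above can be sketched in plain Scala as follows. This is an illustration, not DAGScheduler's code: FailedStage, ResubmitSketch and the firstJobId field used for ordering are assumptions, and submitStage and submitWaitingStages are passed in as callbacks standing in for the real methods.

```scala
import scala.collection.mutable

// Hypothetical stage: an id and the id of the first job that needs it.
final case class FailedStage(id: Int, firstJobId: Int)

// A sketch of resubmitFailedStages with the DAGScheduler methods as callbacks.
final class ResubmitSketch(
    submitStage: FailedStage => Unit,
    submitWaitingStages: () => Unit) {
  val failedStages = mutable.HashSet.empty[FailedStage]
  val cacheLocs = mutable.HashMap.empty[Int, IndexedSeq[Seq[String]]]

  def resubmitFailedStages(): Unit = {
    if (failedStages.nonEmpty) {
      println("INFO Resubmitting failed stages")
      cacheLocs.clear()
      // Copy before clearing so the iteration below works on a stable snapshot.
      val failedStagesCopy = failedStages.toList
      failedStages.clear()
      // Submit one by one, ordered by job id (increasing).
      failedStagesCopy.sortBy(_.firstJobId).foreach(submitStage)
    }
    // Ultimately, all waiting stages get a chance to be submitted.
    submitWaitingStages()
  }
}
```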

DAGScheduler.runJob

When executed, DAGScheduler.runJob is given the following arguments:

  • An RDD to run the job on.

  • A function to run on each partition of the RDD.

  • A set of partitions to run on (not all partitions are always required to compute a job for actions like first() or take()).

  • A callback function resultHandler to pass results of executing the function to.

  • Properties to attach to a job.

It calls DAGScheduler.submitJob and then waits until a result comes using a JobWaiter object. A job can succeed or fail.

When a job succeeds, the following INFO shows up in the logs:

INFO Job [jobId] finished: [callSite], took [time] s

When a job fails, the following INFO shows up in the logs:

INFO Job [jobId] failed: [callSite], took [time] s

The method finishes by throwing an exception.

DAGScheduler.submitJob

DAGScheduler.submitJob is called by SparkContext.submitJob and DAGScheduler.runJob.

When called, it does the following:

  • Checks whether the set of partitions to run a function on is within the range of available partitions of the RDD.

  • Increments the internal nextJobId job counter.

  • Returns a 0-task JobWaiter when no partitions are passed in.

  • Otherwise, posts a JobSubmitted event to dag-scheduler-event-loop and returns a JobWaiter.

Figure 3. DAGScheduler.submitJob

You may see an exception thrown when the partitions in the set are outside the range:

Attempting to access a non-existent partition: [p]. Total number of partitions: [maxPartitions]
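A minimal sketch of the submitJob steps listed above, assuming a hypothetical JobSubmittedEvent and a post callback in place of the real event loop. It returns the new job id together with the number of tasks a JobWaiter would wait for. Throwing IllegalArgumentException for an out-of-range partition is this sketch's choice.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical event standing in for JobSubmitted.
final case class JobSubmittedEvent(jobId: Int, partitions: Seq[Int])

final class SubmitJobSketch(post: JobSubmittedEvent => Unit) {
  val nextJobId = new AtomicInteger(0)

  // Returns (jobId, number of tasks to wait for).
  def submitJob(numPartitions: Int, partitions: Seq[Int]): (Int, Int) = {
    // 1. Every requested partition must exist in the RDD.
    partitions.find(p => p < 0 || p >= numPartitions).foreach { p =>
      throw new IllegalArgumentException(
        s"Attempting to access a non-existent partition: $p. " +
          s"Total number of partitions: $numPartitions")
    }
    // 2. Allocate a new job id.
    val jobId = nextJobId.getAndIncrement()
    // 3. No partitions: a 0-task waiter, nothing is posted.
    // 4. Otherwise, hand the job over to the event loop.
    if (partitions.nonEmpty) post(JobSubmittedEvent(jobId, partitions))
    (jobId, partitions.size)
  }
}
```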

JobListener and Completion Events

You can listen for job completion or failure events after submitting a job to the DAGScheduler using JobListener. It is a private[spark] contract (a Scala trait) with the following two methods:

private[spark] trait JobListener {
  def taskSucceeded(index: Int, result: Any): Unit
  def jobFailed(exception: Exception): Unit
}

A job listener is notified each time a task succeeds (by def taskSucceeded(index: Int, result: Any)), as well as if the whole job fails (by def jobFailed(exception: Exception)).

An instance of JobListener is used in the following places:

  • In ActiveJob as a listener to notify if tasks in this job finish or the job fails.

  • In DAGScheduler.handleJobSubmitted

  • In DAGScheduler.handleMapStageSubmitted

  • In JobSubmitted

  • In MapStageSubmitted

The following are the job listeners used:

  • JobWaiter waits until DAGScheduler completes the job and passes the results of tasks to a resultHandler function.

  • ApproximateActionListener FIXME

JobWaiter

A JobWaiter is an extension of JobListener. It is used as the return value of DAGScheduler.submitJob and DAGScheduler.submitMapStage. You can use a JobWaiter to block until the job finishes executing or to cancel it.

While the methods execute, JobSubmitted and MapStageSubmitted events are posted that reference the JobWaiter.

Since a JobWaiter object is a JobListener it gets notifications about taskSucceeded and jobFailed. When the total number of tasks (that equals the number of partitions to compute) equals the number of taskSucceeded, the JobWaiter instance is marked succeeded. A jobFailed event marks the JobWaiter instance failed.
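The counting logic can be sketched in Scala as below. SketchJobListener mirrors the two JobListener methods, and SketchJobWaiter is a hypothetical, simplified stand-in for JobWaiter built on a Promise; Spark's actual class differs in its details (e.g. cancellation).

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Mirrors the JobListener contract (Spark's own trait is private[spark]).
trait SketchJobListener {
  def taskSucceeded(index: Int, result: Any): Unit
  def jobFailed(exception: Exception): Unit
}

// Succeeds once `totalTasks` tasks have succeeded; fails on the first jobFailed.
final class SketchJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit)
    extends SketchJobListener {
  private var finishedTasks = 0
  private val promise = Promise[Unit]()
  if (totalTasks == 0) promise.success(()) // a 0-task waiter is done right away

  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    resultHandler(index, result.asInstanceOf[T]) // pass the task result on
    finishedTasks += 1
    if (finishedTasks == totalTasks) promise.trySuccess(())
  }

  override def jobFailed(exception: Exception): Unit = {
    promise.tryFailure(exception)
    ()
  }

  // Blocks until the job finishes; rethrows the failure, if any.
  def awaitResult(timeout: Duration): Unit = Await.result(promise.future, timeout)
  def isCompleted: Boolean = promise.isCompleted
}
```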

  • FIXME Who’s using submitMapStage?

DAGScheduler.executorAdded

executorAdded(execId: String, host: String) method simply posts an ExecutorAdded event to eventProcessLoop.

DAGScheduler.taskEnded

taskEnded(
  task: Task[_],
  reason: TaskEndReason,
  result: Any,
  accumUpdates: Map[Long, Any],
  taskInfo: TaskInfo,
  taskMetrics: TaskMetrics): Unit

taskEnded method simply posts a CompletionEvent event to the DAGScheduler’s internal event loop.

Note
DAGScheduler.taskEnded method is called by a TaskSetManager to report task completions, including failures.
Tip
Read about TaskMetrics in TaskMetrics.

failJobAndIndependentStages

The internal failJobAndIndependentStages method…FIXME

Note
It is called by…FIXME

dag-scheduler-event-loop - Internal Event Loop

DAGScheduler.eventProcessLoop (of type DAGSchedulerEventProcessLoop) is the event process loop to which Spark (by DAGScheduler.submitJob) posts jobs to schedule their execution. Later on, TaskSetManager talks back to DAGScheduler to report the status of tasks using the same "communication channel".

It allows Spark to release the current thread when posting happens and let the event loop handle events on a separate thread - asynchronously.

…IMAGE…FIXME

Internally, DAGSchedulerEventProcessLoop uses java.util.concurrent.LinkedBlockingDeque blocking deque that grows indefinitely (i.e. up to Integer.MAX_VALUE events).

The name of the single "logic" thread that reads events and takes decisions is dag-scheduler-event-loop.

"dag-scheduler-event-loop" #89 daemon prio=5 os_prio=31 tid=0x00007f809bc0a000 nid=0xc903 waiting on condition [0x0000000125826000]
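The pattern can be illustrated with a minimal Scala sketch of a single-threaded event loop: callers post events and return immediately, while one daemon thread takes events off an unbounded LinkedBlockingDeque and handles them one at a time. SketchEventLoop and its onReceive callback are hypothetical names; error handling, which a real loop needs, is left out.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

final class SketchEventLoop[E](name: String, onReceive: E => Unit) {
  // Unbounded deque: capacity defaults to Integer.MAX_VALUE.
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  // The single "logic" thread that reads and handles events.
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until an event arrives
          onReceive(event)              // events are handled sequentially
        }
      } catch {
        case _: InterruptedException => () // stop() interrupts a blocked take()
      }
  }

  def start(): Unit = eventThread.start()

  // Releases the posting thread at once; handling happens asynchronously.
  def post(event: E): Unit = eventQueue.put(event)

  def stop(): Unit = {
    stopped.set(true)
    eventThread.interrupt()
    eventThread.join()
  }
}
```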

The following sections describe the types of DAGSchedulerEvent events that are handled by DAGScheduler.

Caution

FIXME

  • What is an approximate job (as in DAGScheduler.runApproximateJob)?

  • statistics? MapOutputStatistics?

JobCancelled and handleJobCancellation

JobCancelled(jobId: Int) event is posted to cancel a job if it is scheduled or still running. It triggers execution of DAGScheduler.handleJobCancellation(jobId).

Note
It seems that although SparkContext.cancelJob(jobId: Int) calls DAGScheduler.cancelJob, no feature/code in Spark calls SparkContext.cancelJob(jobId: Int). Dead code?

When JobWaiter.cancel is called, it calls DAGScheduler.cancelJob. You should see the following INFO message in the logs:

INFO Asked to cancel job [jobId]

It is a signal to the DAGScheduler to cancel the job.

Caution
FIXME

ExecutorAdded and handleExecutorAdded

ExecutorAdded(execId, host) event triggers execution of DAGScheduler.handleExecutorAdded(execId: String, host: String).

It checks failedEpoch for the executor id (using execId) and if it is found the following INFO message appears in the logs:

INFO Host added was in lost list earlier: [host]

The executor is removed from the list of failed nodes.

At the end, DAGScheduler.submitWaitingStages() is called.

ExecutorLost and handleExecutorLost (with fetchFailed being false)

ExecutorLost(execId) event triggers execution of DAGScheduler.handleExecutorLost(execId: String, fetchFailed: Boolean, maybeEpoch: Option[Long] = None) with fetchFailed being false.

Note

handleExecutorLost recognizes two cases (by means of fetchFailed):

  • fetch failures (fetchFailed is true) from executors that are indirectly assumed lost. See the FetchFailed case in handleTaskCompletion.

  • lost executors (fetchFailed is false) for executors that did not report being alive in a given timeframe

The current epoch number could be provided (as maybeEpoch) or it is calculated by requesting it from MapOutputTrackerMaster (using MapOutputTrackerMaster.getEpoch).

Figure 4. DAGScheduler.handleExecutorLost

Recurring ExecutorLost events merely lead to the following DEBUG message in the logs:

DEBUG Additional executor lost message for [execId] (epoch [currentEpoch])

If, however, the executor is not in failedEpoch yet or its recorded epoch is smaller than the current one, the executor is recorded in failedEpoch with the current epoch.

The following INFO message appears in the logs:

INFO Executor lost: [execId] (epoch [currentEpoch])

If the external shuffle service is not used or the ExecutorLost event was caused by a map output fetch failure (fetchFailed is true), all ShuffleMapStages (in shuffleToMapStage) are processed in order.

If there are no ShuffleMapStages (shuffleToMapStage is empty), MapOutputTrackerMaster.incrementEpoch is called.

cacheLocs is cleared.

At the end, DAGScheduler.submitWaitingStages() is called.
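The failedEpoch bookkeeping shared by handleExecutorAdded and handleExecutorLost can be sketched as follows. FailedEpochTracker and its methods are hypothetical; only the epoch comparison and the log messages follow the description above, and the shuffle and cache cleanup is left out.

```scala
import scala.collection.mutable

final class FailedEpochTracker {
  // Executor id -> epoch at which its loss was recorded.
  val failedEpoch = mutable.HashMap.empty[String, Long]

  // True for a new loss (which would trigger cleanup), false for a repeat.
  def executorLost(execId: String, currentEpoch: Long): Boolean =
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      println(s"INFO Executor lost: $execId (epoch $currentEpoch)")
      true
    } else {
      println(s"DEBUG Additional executor lost message for $execId (epoch $currentEpoch)")
      false
    }

  // An executor that comes back is removed from the failed list.
  def executorAdded(execId: String, host: String): Unit =
    if (failedEpoch.contains(execId)) {
      println(s"INFO Host added was in lost list earlier: $host")
      failedEpoch -= execId
    }
}
```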

StageCancelled and handleStageCancellation

StageCancelled(stageId: Int) event is posted to cancel a stage and all jobs associated with it. It triggers execution of DAGScheduler.handleStageCancellation(stageId).

It is the result of executing SparkContext.cancelStage(stageId: Int) that is called from the web UI (controlled by spark.ui.killEnabled).

Caution
FIXME Image of the tab with kill

DAGScheduler.handleStageCancellation(stageId) checks whether the stageId stage exists and for each job associated with the stage, it calls handleJobCancellation(jobId, s"because Stage [stageId] was cancelled").

Note
A stage knows what jobs it is part of using the internal set jobIds.

def handleJobCancellation(jobId: Int, reason: String = "") checks whether the job exists in jobIdToStageIds and if not, prints the following DEBUG to the logs:

DEBUG Trying to cancel unregistered job [jobId]

However, if the job exists, the job and all the stages that are only used by it are failed (using the internal failJobAndIndependentStages method).

For each running stage associated with the job (jobIdToStageIds), if the job is the only one the stage belongs to (stageIdToStage), TaskScheduler.cancelTasks is called, outputCommitCoordinator.stageEnd(stage.id) is called, and SparkListenerStageCompleted is posted. The stage is then no longer a running one (it is removed from runningStages).

Caution
FIXME Image please with the call to TaskScheduler.
  • spark.job.interruptOnCancel (default: false) - controls whether or not to interrupt a job on cancel.

In case TaskScheduler.cancelTasks completed successfully, JobListener is informed about job failure, cleanupStateForJobAndIndependentStages is called, and SparkListenerJobEnd posted.

Caution
FIXME cleanupStateForJobAndIndependentStages code review.
Caution
FIXME Where are job.properties assigned to a job?
The job fails with the reason "Job %d cancelled %s".format(jobId, reason).

If no stage exists for stageId, the following INFO message shows in the logs:

INFO No active jobs to kill for Stage [stageId]

At the end, DAGScheduler.submitWaitingStages() is called.
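The rule that only running stages used exclusively by the cancelled job get their tasks cancelled can be sketched as a pure function. The map-based registries below are simplified, hypothetical stand-ins for jobIdToStageIds, the jobIds set of each stage, and runningStages.

```scala
object IndependentStages {
  // Stages whose tasks would be cancelled when `jobId` is cancelled:
  // they must be running and belong to no other job.
  def stagesToKill(
      jobId: Int,
      jobIdToStageIds: Map[Int, Set[Int]],
      stageIdToJobIds: Map[Int, Set[Int]],
      runningStages: Set[Int]): Set[Int] =
    jobIdToStageIds.getOrElse(jobId, Set.empty[Int]).filter { stageId =>
      runningStages.contains(stageId) &&
      stageIdToJobIds.getOrElse(stageId, Set.empty[Int]) == Set(jobId)
    }
}
```

Stage 11 below is shared by jobs 1 and 2, so cancelling job 1 leaves it running for job 2.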

MapStageSubmitted and handleMapStageSubmitted

When a MapStageSubmitted event is posted, it triggers execution of DAGScheduler.handleMapStageSubmitted method.

Figure 5. DAGScheduler.handleMapStageSubmitted handles MapStageSubmitted events

It is called with a job id (for a new job to be created), a ShuffleDependency, and a JobListener.

You should see the following INFOs in the logs:

Got map stage job %s (%s) with %d output partitions
Final stage: [finalStage] ([finalStage.name])
Parents of final stage: [finalStage.parents]
Missing parents: [list of stages]

SparkListenerJobStart event is posted to LiveListenerBus (so other event listeners know about the event - not only DAGScheduler).

The execution procedure of MapStageSubmitted events is then exactly (FIXME ?) as for JobSubmitted.

Tip

The difference between handleMapStageSubmitted and handleJobSubmitted:

  • handleMapStageSubmitted has ShuffleDependency among the input parameters while handleJobSubmitted has finalRDD, func, and partitions.

  • handleMapStageSubmitted initializes finalStage as getShuffleMapStage(dependency, jobId) while handleJobSubmitted as finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)

  • handleMapStageSubmitted INFO logs Got map stage job %s (%s) with %d output partitions with dependency.rdd.partitions.length while handleJobSubmitted does Got job %s (%s) with %d output partitions with partitions.length.

  • FIXME: Could the above be cut to ActiveJob.numPartitions?

  • handleMapStageSubmitted adds a new job using finalStage.addActiveJob(job) while handleJobSubmitted sets it using finalStage.setActiveJob(job).

  • handleMapStageSubmitted checks if the final stage has already finished, tells the listener and removes it using the code:

    if (finalStage.isAvailable) {
      markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
    }

JobSubmitted and handleJobSubmitted

When DAGScheduler receives a JobSubmitted event, it calls the DAGScheduler.handleJobSubmitted method.

Figure 6. DAGScheduler.handleJobSubmitted

handleJobSubmitted has access to the final RDD, the partitions to compute, and the JobListener for the job, i.e. JobWaiter.

It creates a new ResultStage (as finalStage on the picture) and instantiates ActiveJob.

Caution
FIXME review newResultStage

You should see the following INFO messages in the logs:

INFO DAGScheduler: Got job [jobId] ([callSite.shortForm]) with [partitions.length] output partitions
INFO DAGScheduler: Final stage: [finalStage] ([finalStage.name])
INFO DAGScheduler: Parents of final stage: [finalStage.parents]
INFO DAGScheduler: Missing parents: [getMissingParentStages(finalStage)]

Then, the finalStage stage is given the ActiveJob instance and some housekeeping is performed to track the job (using jobIdToActiveJob and activeJobs).

SparkListenerJobStart message is posted to LiveListenerBus.

Caution
FIXME jobIdToStageIds and stageIdToStage - they’re already computed. When? Where?

When DAGScheduler executes a job it first submits the final stage (using submitStage).

Right before handleJobSubmitted finishes, DAGScheduler.submitWaitingStages() is called.

CompletionEvent and handleTaskCompletion

CompletionEvent event informs DAGScheduler about task completions. It is handled by handleTaskCompletion(event: CompletionEvent).

Figure 7. DAGScheduler and CompletionEvent
Note
CompletionEvent holds contextual information about the completed task.

The task knows about the stage it belongs to (using Task.stageId), the partition it works on (using Task.partitionId), and the stage attempt (using Task.stageAttemptId).

OutputCommitCoordinator.taskCompleted is called.

If the reason for task completion is not Success, SparkListenerTaskEnd is posted to LiveListenerBus. The Success case posts SparkListenerTaskEnd as well; the only difference is how the stage attempt id is calculated. Here, it is Task.stageAttemptId (not Stage.latestInfo.attemptId).

Caution
FIXME What is the difference between stage attempt ids?

If the stage the task belongs to has been cancelled, stageIdToStage should not contain it, and the method quits.

The main processing begins now depending on TaskEndReason - the reason for task completion (using event.reason). The method skips processing TaskEndReasons: TaskCommitDenied, ExceptionFailure, TaskResultLost, ExecutorLostFailure, TaskKilled, and UnknownReason, i.e. it does nothing.

TaskEndReason: Success

The partition the task worked on is removed from pendingPartitions of the stage.

The processing splits per task type - ResultTask or ShuffleMapTask - and submitWaitingStages() is called.

ResultTask

For ResultTask, the stage is ResultStage. If there is no job active for the stage (using resultStage.activeJob), the following INFO message appears in the logs:

INFO Ignoring result from [task] because its job has finished

Otherwise, the method proceeds only if the partition of the task has not been marked as finished in the job yet (using job.finished). A task whose partition is already marked as completed is skipped.

Caution
FIXME When could a task that has just finished be ignored, i.e. the job has already marked finished? Could it be for stragglers?

The partition is marked as finished (using job.finished) and the number of partitions calculated increased (using job.numFinished).

If the whole job has finished (when job.numFinished == job.numPartitions), then:

The JobListener of the job (using job.listener) is informed about the task completion (using job.listener.taskSucceeded(rt.outputId, event.result)). If the step fails, i.e. throws an exception, the JobListener is informed about it (using job.listener.jobFailed(new SparkDriverExecutionException(e))).
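The per-partition bookkeeping for ResultTasks can be sketched with a trimmed-down job object. SketchActiveJob and ResultTaskSketch are hypothetical names; only the finished flags, the numFinished counter and the "whole job finished" check follow the text.

```scala
// Hypothetical, trimmed-down ActiveJob: one "finished" flag per partition.
final class SketchActiveJob(val numPartitions: Int) {
  val finished: Array[Boolean] = Array.fill(numPartitions)(false)
  var numFinished: Int = 0
}

object ResultTaskSketch {
  // Records a successful ResultTask for `outputId`. Returns false (ignored)
  // if that partition was already marked finished, true otherwise.
  def recordSuccess(job: SketchActiveJob, outputId: Int): Boolean =
    if (job.finished(outputId)) false
    else {
      job.finished(outputId) = true
      job.numFinished += 1
      true
    }

  // The whole job is done once every partition has a result.
  def isJobFinished(job: SketchActiveJob): Boolean =
    job.numFinished == job.numPartitions
}
```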

Caution
FIXME When would job.listener.taskSucceeded throw an exception? How?
ShuffleMapTask

For ShuffleMapTask, the stage is ShuffleMapStage.

event.result is a MapStatus that knows the executor id where the task has finished (using status.location.executorId).

You should see the following DEBUG message in the logs:

DEBUG ShuffleMapTask finished on [execId]

If failedEpoch contains the executor and the epoch of the ShuffleMapTask is not greater than that in failedEpoch, you should see the following INFO message in the logs:

INFO Ignoring possibly bogus [task] completion from executor [executorId]

Otherwise, shuffleStage.addOutputLoc(smt.partitionId, status) is called.

The method does more processing only if the internal runningStages contains the ShuffleMapStage with no more pending partitions to compute (using shuffleStage.pendingPartitions).

markStageAsFinished(shuffleStage) is called.

The following INFO logs appear in the logs:

INFO looking for newly runnable stages
INFO running: [runningStages]
INFO waiting: [waitingStages]
INFO failed: [failedStages]

mapOutputTracker.registerMapOutputs with changeEpoch is called.

cacheLocs is cleared.

If the map stage is ready, i.e. all partitions have shuffle outputs, the map-stage jobs waiting on this stage (using shuffleStage.mapStageJobs) are marked as finished: MapOutputTrackerMaster.getStatistics(shuffleStage.shuffleDep) is called and every map-stage job is marked as finished using markMapStageJobAsFinished(job, stats).

Otherwise, if the map stage is not ready, the following INFO message appears in the logs:

INFO Resubmitting [shuffleStage] ([shuffleStage.name]) because some of its tasks had failed: [missingPartitions]
Caution
FIXME All "…​is called" above should be rephrased to use links to appropriate sections.

TaskEndReason: Resubmitted

For Resubmitted case, you should see the following INFO message in the logs:

INFO Resubmitted [task], so marking it as still running

The partition of the task (task.partitionId) is added to the collection of pending partitions of the stage (using stage.pendingPartitions).

Tip
A stage knows how many partitions are yet to be calculated. A task knows about the partition id for which it was launched.

TaskEndReason: FetchFailed

FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) comes with BlockManagerId (as bmAddress) and the other self-explanatory values.

Note
A task knows about the id of the stage it belongs to.

When FetchFailed happens, stageIdToStage is used to access the failed stage (using task.stageId and the task is available in event in handleTaskCompletion(event: CompletionEvent)). shuffleToMapStage is used to access the map stage (using shuffleId).

If failedStage.latestInfo.attemptId != task.stageAttemptId, you should see the following INFO in the logs:

INFO Ignoring fetch failure from [task] as it's from [failedStage] attempt [task.stageAttemptId] and there is a more recent attempt for that stage (attempt ID [failedStage.latestInfo.attemptId]) running
Caution
FIXME What does failedStage.latestInfo.attemptId != task.stageAttemptId mean?

In that case, processing stops. Otherwise, it continues.

If the failed stage is in runningStages, the following INFO message shows in the logs:

INFO Marking [failedStage] ([failedStage.name]) as failed due to a fetch failure from [mapStage] ([mapStage.name])

markStageAsFinished(failedStage, Some(failureMessage)) is called.

Caution
FIXME What does markStageAsFinished do?

If the failed stage is not in runningStages, the following DEBUG message shows in the logs:

DEBUG Received fetch failure from [task], but its from [failedStage] which is no longer running

When disallowStageRetryForTest is set, abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None) is called.

Caution
FIXME Describe disallowStageRetryForTest and abortStage.

If the number of fetch failed attempts for the stage exceeds the allowed number (using Stage.failedOnFetchAndShouldAbort), the following method is called:

abortStage(failedStage, s"$failedStage (${failedStage.name}) has failed the maximum allowable number of times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. Most recent failure reason: ${failureMessage}", None)

If there are no failed stages reported (failedStages is empty), the following INFO shows in the logs:

INFO Resubmitting [mapStage] ([mapStage.name]) and [failedStage] ([failedStage.name]) due to fetch failure

And the following code is executed:

messageScheduler.schedule(
  new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
Caution
FIXME What does the above code do?

In all cases, the failed stage and the map stage are both added to the failedStages set.

If mapId (in the FetchFailed object for the case) is provided, the map stage output is cleaned up (as it is broken) using mapStage.removeOutputLoc(mapId, bmAddress) and MapOutputTrackerMaster.unregisterMapOutput(shuffleId, mapId, bmAddress) methods.

Caution
FIXME What does mapStage.removeOutputLoc do?

If bmAddress (in the FetchFailed object for the case) is provided, handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch)) is called. See ExecutorLost and handleExecutorLost (with fetchFailed being false).

Caution
FIXME What does handleExecutorLost do?

Submit Waiting Stages (using submitWaitingStages)

DAGScheduler.submitWaitingStages method checks for waiting or failed stages that could now be eligible for submission.

The following TRACE messages show in the logs when the method is called:

TRACE DAGScheduler: Checking for newly runnable parent stages
TRACE DAGScheduler: running: [runningStages]
TRACE DAGScheduler: waiting: [waitingStages]
TRACE DAGScheduler: failed: [failedStages]

The method copies the internal waitingStages set (of stages that wait for their parent stages to finish) and then clears it.

It goes over the copied waiting stages sorted by job ids in increasing order and calls the submitStage method for each of them.

submitStage - Stage Submission

Caution
FIXME

DAGScheduler.submitStage(stage: Stage) is called when a stage is ready for submission.

It recursively submits any missing parents of the stage.

There has to be an ActiveJob instance for the stage to proceed. Otherwise the stage and all the dependent jobs are aborted (using abortStage) with the message:

Job aborted due to stage failure: No active job for stage [stage.id]

For a stage with an ActiveJob available, the following DEBUG message shows up in the logs:

DEBUG DAGScheduler: submitStage([stage])

The stage can only be processed when it is not in the waiting (waitingStages), running (runningStages) or failed (failedStages) state.

A list of missing parent stages of the stage is calculated (see Calculating Missing Parent Stages) and the following DEBUG message shows up in the logs:

DEBUG DAGScheduler: missing: [missing]

When the stage has no parent stages missing, it is submitted and the INFO message shows up in the logs:

INFO DAGScheduler: Submitting [stage] ([stage.rdd]), which has no missing parents

And submitMissingTasks is called. That finishes the stage submission.

If, however, there are missing parent stages, they are submitted recursively (using submitStage), and the stage is added to the waitingStages set.
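The interplay of submitStage and submitWaitingStages can be sketched in Scala as below. This is an approximation under stated assumptions: SStage and StageSubmitter are hypothetical, a parent counts as "missing" when its available flag is false, the ActiveJob check is omitted, and adding to runningStages stands in for submitMissingTasks.

```scala
import scala.collection.mutable

// Hypothetical stage: `available` marks stages whose output already exists.
final class SStage(
    val id: Int,
    val firstJobId: Int,
    val parents: List[SStage],
    var available: Boolean = false)

final class StageSubmitter {
  val waitingStages = mutable.LinkedHashSet.empty[SStage]
  val runningStages = mutable.LinkedHashSet.empty[SStage]
  val failedStages = mutable.LinkedHashSet.empty[SStage]
  val launched = mutable.ArrayBuffer.empty[Int] // order of "submitMissingTasks"

  def submitStage(stage: SStage): Unit =
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = stage.parents.filterNot(_.available).sortBy(_.id)
      if (missing.isEmpty) {
        runningStages += stage // stands in for submitMissingTasks
        launched += stage.id
      } else {
        missing.foreach(submitStage) // parents first
        waitingStages += stage       // then wait for them
      }
    }

  // Copy, clear, then resubmit waiting stages ordered by job id.
  def submitWaitingStages(): Unit = {
    val waitingStagesCopy = waitingStages.toList
    waitingStages.clear()
    waitingStagesCopy.sortBy(_.firstJobId).foreach(submitStage)
  }
}
```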

Calculating Missing Parent Map Stages

DAGScheduler.getMissingParentStages(stage: Stage) calculates missing parent map stages for a given stage.

It starts with the stage’s target RDD (as stage.rdd). If there are uncached partitions, it traverses the dependencies of the RDD (as RDD.dependencies), which are instances of either ShuffleDependency or NarrowDependency.

For each ShuffleDependency, the method searches for the corresponding ShuffleMapStage (using getShuffleMapStage) and if unavailable, the method adds it to a set of missing (map) stages.

Caution
FIXME Review getShuffleMapStage
Caution
FIXME…IMAGE with ShuffleDependencies queried

It continues traversing the chain for each NarrowDependency (using Dependency.rdd).
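The traversal can be sketched over a hypothetical lineage model: narrow dependencies are followed, shuffle dependencies mark a stage boundary, and RDDs with no uncached partitions are not traversed further. LineageNode, Dep and MissingParents are assumptions, and a shuffle id whose map output is unavailable stands in for a missing ShuffleMapStage. An explicit work list replaces recursion so long lineages do not grow the call stack.

```scala
import scala.collection.mutable

// Hypothetical lineage model.
sealed trait Dep { def rdd: LineageNode }
final case class NarrowDep(rdd: LineageNode) extends Dep
final case class ShuffleDep(shuffleId: Int, rdd: LineageNode) extends Dep
final case class LineageNode(id: Int, deps: List[Dep], fullyCached: Boolean = false)

object MissingParents {
  // Shuffle ids whose map output is not available yet.
  def missingShuffles(root: LineageNode, availableShuffles: Set[Int]): Set[Int] = {
    val missing = mutable.LinkedHashSet.empty[Int]
    val visited = mutable.HashSet.empty[Int]
    var toVisit: List[LineageNode] = List(root)
    while (toVisit.nonEmpty) {
      val rdd = toVisit.head
      toVisit = toVisit.tail
      // Only RDDs with uncached partitions need their dependencies inspected.
      if (visited.add(rdd.id) && !rdd.fullyCached) {
        rdd.deps.foreach {
          case ShuffleDep(shuffleId, _) =>
            if (!availableShuffles.contains(shuffleId)) missing += shuffleId
          case NarrowDep(parent) =>
            toVisit = parent :: toVisit // same stage: keep walking
        }
      }
    }
    missing.toSet
  }
}
```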

Fault recovery - stage attempts

A single stage can be re-executed in multiple attempts due to fault recovery. The number of attempts is configured (FIXME).

If TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks.

Please note that tasks from the old attempts of a stage could still be running.

A stage object tracks multiple StageInfo objects to pass to Spark listeners or the web UI.

The latest StageInfo for the most recent attempt for a stage is accessible through latestInfo.

Cache Tracking

DAGScheduler tracks which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.

DAGScheduler is only interested in cache location coordinates, i.e. host and executor id, per partition of an RDD.

Caution
FIXME: A diagram, please

If the storage level of an RDD is NONE, there is no caching and hence no partition cache locations are available. In such cases, whenever asked, DAGScheduler returns a collection with empty-location elements for each partition. The empty-location elements are to mark uncached partitions.

Otherwise, a collection of RDDBlockId instances for each partition is created and BlockManagerMaster is asked for locations (using BlockManagerMaster.getLocations). The result is then mapped to a collection of TaskLocation for host and executor id.
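A minimal sketch of that lookup, assuming a hypothetical CacheLocation in place of TaskLocation and a lookup function in place of BlockManagerMaster.getLocations. The rdd_[rddId]_[partition] block names follow the naming of RDDBlockId.

```scala
// Hypothetical stand-in for TaskLocation.
final case class CacheLocation(host: String, executorId: String)

object CacheLocsSketch {
  // One entry per partition; an empty entry marks an uncached partition.
  def cacheLocs(
      rddId: Int,
      numPartitions: Int,
      persisted: Boolean,
      lookup: Seq[String] => Seq[Seq[CacheLocation]]): IndexedSeq[Seq[CacheLocation]] =
    if (!persisted) {
      // Storage level NONE: nothing is cached anywhere.
      IndexedSeq.fill(numPartitions)(Nil)
    } else {
      // One block id per partition, resolved in a single lookup call.
      val blockIds = (0 until numPartitions).map(p => s"rdd_${rddId}_$p")
      lookup(blockIds).toIndexedSeq
    }
}
```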

Preferred Locations

DAGScheduler computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.

Adaptive Query Planning

See SPARK-9850 Adaptive execution in Spark for the design document. The work is currently in progress.

DAGScheduler.submitMapStage method is used for adaptive query planning, to run map stages and look at statistics about their outputs before submitting downstream stages.

ScheduledExecutorService daemon services

DAGScheduler uses the following ScheduledThreadPoolExecutors (with the policy of removing cancelled tasks from a work queue at time of cancellation):

  • dag-scheduler-message - a daemon thread pool using j.u.c.ScheduledThreadPoolExecutor with core pool size 1. It is used to post ResubmitFailedStages when FetchFailed is reported.

They are created using the ThreadUtils.newDaemonSingleThreadScheduledExecutor method, which uses Guava's ThreadFactoryBuilder to instantiate a ThreadFactory.
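A plain-JDK approximation of such an executor is sketched below, together with a helper that mirrors the messageScheduler.schedule(...) call from the FetchFailed case: it runs a callback (e.g. posting ResubmitFailedStages) once after a delay. MessageSchedulerSketch and scheduleResubmit are hypothetical names, and this sketch builds its ThreadFactory by hand rather than with Guava.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}

object MessageSchedulerSketch {
  // Single daemon thread, cancelled tasks removed from the queue on cancel.
  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledThreadPoolExecutor = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, threadName)
        t.setDaemon(true)
        t
      }
    }
    val executor = new ScheduledThreadPoolExecutor(1, factory)
    executor.setRemoveOnCancelPolicy(true)
    executor
  }

  // Runs `post` once after `delayMs` milliseconds on the executor's thread.
  def scheduleResubmit(executor: ScheduledThreadPoolExecutor, delayMs: Long)(post: () => Unit): Unit = {
    executor.schedule(new Runnable { override def run(): Unit = post() }, delayMs, TimeUnit.MILLISECONDS)
    ()
  }
}
```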

submitMissingTasks for Stage and Job (submitMissingTasks method)

submitMissingTasks(stage: Stage, jobId: Int): Unit

submitMissingTasks is a private method that…FIXME

When executed, it prints the following DEBUG message out to the logs:

DEBUG DAGScheduler: submitMissingTasks([stage])

pendingPartitions internal field of the stage is cleared (it is later filled out with the partitions to run tasks for).

The stage is asked for partitions to compute (see findMissingPartitions in Stages).

The method adds the stage to runningStages.

OutputCommitCoordinator is told that the stage has started (using outputCommitCoordinator.stageStart).

Caution
FIXME Review outputCommitCoordinator.stageStart

The mapping between task ids and task preferred locations is computed (see getPreferredLocs - Computing Preferred Locations for Tasks and Partitions).

A new stage attempt is created (using Stage.makeNewStageAttempt).

The stage is serialized and broadcast to workers using the SparkContext.broadcast method, i.e. it is serialized (using Serializer.serialize) into taskBinaryBytes, an array of bytes of (rdd, func) for ResultStage and (rdd, shuffleDep) for ShuffleMapStage.

Caution
FIXME Review taskBinaryBytes.

When serializing the stage fails, the stage is removed from the internal runningStages set, abortStage is called and the method stops.

Caution
FIXME Review abortStage.

At this point in time, the stage is on workers.

For each partition to compute for the stage, a collection of ShuffleMapTask for ShuffleMapStage or ResultTask for ResultStage is created.

Caution
FIXME Image with creating tasks for partitions in the stage.

If there are tasks to launch (there are missing partitions in the stage), the following INFO and DEBUG messages are in the logs:

INFO DAGScheduler: Submitting [tasks.size] missing tasks from [stage] ([stage.rdd])
DEBUG DAGScheduler: New pending partitions: [stage.pendingPartitions]

All tasks in the collection become a TaskSet for TaskScheduler.submitTasks.
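The task creation step can be sketched with hypothetical case classes: one task per missing partition, ShuffleMapTask-like for a ShuffleMapStage and ResultTask-like otherwise, grouped into a TaskSet only when there is something to run. The names are assumptions, and real tasks also carry the broadcast binary, preferred locations and more.

```scala
sealed trait SketchTask { def stageId: Int; def partitionId: Int }
final case class SketchShuffleMapTask(stageId: Int, partitionId: Int) extends SketchTask
final case class SketchResultTask(stageId: Int, partitionId: Int) extends SketchTask
final case class SketchTaskSet(tasks: Seq[SketchTask], stageId: Int, stageAttemptId: Int)

object MissingTasksSketch {
  // None means "the stage is actually done": no partitions left to compute.
  def buildTaskSet(
      stageId: Int,
      stageAttemptId: Int,
      isShuffleMapStage: Boolean,
      missingPartitions: Seq[Int]): Option[SketchTaskSet] = {
    val tasks: Seq[SketchTask] = missingPartitions.map { p =>
      if (isShuffleMapStage) SketchShuffleMapTask(stageId, p)
      else SketchResultTask(stageId, p)
    }
    if (tasks.isEmpty) None
    else Some(SketchTaskSet(tasks, stageId, stageAttemptId))
  }
}
```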

In case of no tasks to be submitted for a stage, one of the following DEBUG messages (for a ShuffleMapStage and a ResultStage, respectively) shows up in the logs:

DEBUG DAGScheduler: Stage [stage] is actually done; (available: [stage.isAvailable],available outputs: [stage.numAvailableOutputs],partitions: [stage.numPartitions])
DEBUG DAGScheduler: Stage [stage] is actually done; (partitions: [numPartitions])
Note
submitMissingTasks is called when…

getPreferredLocs - Computing Preferred Locations for Tasks and Partitions

Caution
FIXME Review + why does the method return a sequence of TaskLocations?
Note
Task ids correspond to partition ids.

Stopping

When a DAGScheduler stops (via stop()), it stops the internal dag-scheduler-message thread pool, dag-scheduler-event-loop, and TaskScheduler.

Metrics

Spark’s DAGScheduler uses Spark Metrics System (via DAGSchedulerSource) to report metrics about internal status.

Caution
FIXME What is DAGSchedulerSource?

The name of the source is DAGScheduler.

It emits the following numbers:

  • stage.failedStages - the number of failed stages

  • stage.runningStages - the number of running stages

  • stage.waitingStages - the number of waiting stages

  • job.allJobs - the number of all jobs

  • job.activeJobs - the number of active jobs

Updating Accumulators with Partial Values from Completed Tasks (updateAccumulators method)

updateAccumulators(event: CompletionEvent): Unit

The private updateAccumulators method merges the partial values of accumulators from a completed task into their "source" accumulators on the driver.

Note
It is called by handleTaskCompletion.

For each AccumulableInfo in the CompletionEvent, a partial value from a task is obtained (from AccumulableInfo.update) and added to the driver’s accumulator (using Accumulable.++= method).

For named accumulators with the update value being a non-zero value, i.e. not Accumulable.zero:

  • stage.latestInfo.accumulables for the AccumulableInfo.id is set

  • CompletionEvent.taskInfo.accumulables has a new AccumulableInfo added.
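The merge can be sketched with a hypothetical Long-sum accumulator. SketchAccumulator and UpdateAccumulatorsSketch are assumptions, and the two mutable collections stand in for stage.latestInfo.accumulables and CompletionEvent.taskInfo.accumulables.

```scala
import scala.collection.mutable

// Hypothetical driver-side accumulator: a (possibly named) Long sum.
final class SketchAccumulator(val id: Long, val name: Option[String]) {
  var value: Long = 0L
  val zero: Long = 0L
  def ++=(partial: Long): Unit = value += partial
}

object UpdateAccumulatorsSketch {
  def updateAccumulators(
      updates: Map[Long, Long],                    // accumulator id -> partial value
      accumulators: Map[Long, SketchAccumulator],  // driver-side accumulators
      stageAccumulables: mutable.Map[Long, Long],  // id -> total value
      taskAccumulables: mutable.ArrayBuffer[(Long, Long)]): Unit = // (id, partial)
    updates.foreach { case (id, partial) =>
      accumulators.get(id).foreach { acc =>
        acc ++= partial // merge the task's partial value on the driver
        // Only named accumulators with a non-zero update are recorded.
        if (acc.name.isDefined && partial != acc.zero) {
          stageAccumulables(id) = acc.value
          taskAccumulables += (id -> partial)
        }
      }
    }
}
```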

Caution
FIXME Where are Stage.latestInfo.accumulables and CompletionEvent.taskInfo.accumulables used?

Settings

  • spark.test.noStageRetry (default: false) - if enabled, FetchFailed will not cause stage retries, in order to surface the problem. Used for testing.