Note
|
The introduction that follows was highly influenced by the scaladoc of org.apache.spark.scheduler.DAGScheduler. Since DAGScheduler is a private class, it does not appear in the official API documentation. You are strongly encouraged to read the sources first and only then this and the related pages. "Reading the sources", I say?! Yes, I am kidding! |
DAGScheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling, i.e. after an RDD action has been called it becomes a job that is then transformed into a set of stages that are submitted as TaskSets for execution (see Execution Model).
The fundamental concepts of DAGScheduler are jobs and stages (refer to Jobs and Stages respectively).
DAGScheduler runs on the driver. It is created as part of SparkContext’s initialization, right after TaskScheduler and SchedulerBackend are ready.
DAGScheduler does three things in Spark (thorough explanations follow):
- Computes an execution DAG, i.e. a DAG of stages, for a job.
- Determines the preferred locations to run each task on.
- Handles failures due to shuffle output files being lost.
It computes a directed acyclic graph (DAG) of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run jobs. It then submits stages to TaskScheduler.
In addition to coming up with the execution DAG, DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes the information to TaskScheduler.
Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler itself, which will retry each task a small number of times before cancelling the whole stage.
DAGScheduler uses an event queue architecture: a thread posts DAGSchedulerEvent events, e.g. a new job or a stage being submitted, which DAGScheduler reads and processes sequentially. See the section Internal Event Loop - dag-scheduler-event-loop.
DAGScheduler runs stages in topological order.
Tip
|
Enable DEBUG or TRACE logging level for the org.apache.spark.scheduler.DAGScheduler logger to see what happens inside DAGScheduler.
Refer to Logging. |
DAGScheduler needs SparkContext, Task Scheduler, LiveListenerBus, MapOutputTracker and Block Manager to work. However, at the very minimum, DAGScheduler needs SparkContext only (and asks SparkContext for the other services).
DAGScheduler reports metrics about its execution (refer to the section Metrics).
When DAGScheduler schedules a job as a result of executing an action on a RDD or calling SparkContext.runJob() method directly, it spawns parallel tasks to compute (partial) results per partition.
DAGScheduler maintains the following information in internal registries:
- nextJobId for the next job id
- numTotalJobs (alias of nextJobId) for the total number of submitted jobs
- nextStageId for the next stage id
- jobIdToStageIds for a mapping between jobs and their stages
- stageIdToStage for a mapping between stage ids and stages
- shuffleToMapStage for a mapping between shuffle ids and ShuffleMapStages
- jobIdToActiveJob for a mapping between job ids and ActiveJobs
- waitingStages for stages with parents yet to be computed
- runningStages for stages currently being run
- failedStages for stages that failed due to fetch failures (as reported by CompletionEvents for FetchFailed end reasons) and are going to be resubmitted
- activeJobs for a collection of ActiveJob instances
- cacheLocs for a mapping between RDD ids and their cache preferences per partition (as arrays indexed by partition numbers); each array value is the set of locations where that RDD partition is cached. See Cache Tracking and the sketch after this list.
- failedEpoch for a mapping between failed executors and the epoch number when the failure was caught per executor
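To make the shape of cacheLocs concrete, here is a minimal data-structure sketch (not Spark's actual code; the TaskLocation case class below is a simplified stand-in for the internal TaskLocation):
import scala.collection.mutable

// Simplified stand-in for org.apache.spark.scheduler.TaskLocation (illustrative only).
final case class TaskLocation(host: String, executorId: Option[String] = None)

// cacheLocs: RDD id -> array indexed by partition number; each element is the set of
// locations where that partition is cached (empty when the partition is not cached).
val cacheLocs = mutable.HashMap.empty[Int, IndexedSeq[Seq[TaskLocation]]]

// RDD 3 has two partitions: partition 0 is cached on one executor, partition 1 is not.
cacheLocs(3) = IndexedSeq(
  Seq(TaskLocation("host-1", Some("exec-1"))),
  Seq.empty
)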
Caution
|
FIXME Review
|
resubmitFailedStages() goes over the failedStages collection (of failed stages) and submits them (using submitStage).
If the failed stages collection contains any stage, the following INFO message appears in the logs:
INFO Resubmitting failed stages
cacheLocs and failedStages are cleared, and the failed stages are submitted (using submitStage) one by one, ordered by their job ids (in increasing order).
Ultimately, all waiting stages are submitted (using submitWaitingStages).
When executed, DAGScheduler.runJob is given the following arguments:
- An RDD to run the job on.
- A function to run on each partition of the RDD.
- A set of partitions to run on (not all partitions are always required to compute a job for actions like first() or take()).
- A callback function resultHandler to pass the results of executing the function to.
- Properties to attach to the job.
It calls DAGScheduler.submitJob and then waits until a result comes using a JobWaiter object. A job can succeed or fail.
When a job succeeds, the following INFO shows up in the logs:
INFO Job [jobId] finished: [callSite], took [time] s
When a job fails, the following INFO shows up in the logs:
INFO Job [jobId] failed: [callSite], took [time] s
In the latter case, the method finishes by throwing an exception.
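For illustration, here is how runJob surfaces through the public API (a sketch assuming a local SparkContext; running on a subset of partitions is what actions like first() and take() rely on):
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("runjob-demo"))
val rdd = sc.parallelize(1 to 100, numSlices = 4)

// Run a function on partitions 0 and 1 only - SparkContext.runJob delegates to DAGScheduler.runJob.
val partialSums: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1))

println(partialSums.toSeq)
sc.stop()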
DAGScheduler.submitJob
is called by SparkContext.submitJob
and DAGScheduler.runJob.
When called, it does the following:
- Checks whether the set of partitions to run a function on is in the range of available partitions of the RDD.
- Increments the internal nextJobId job counter.
- Returns a 0-task JobWaiter when no partitions are passed in.
- Otherwise, posts a JobSubmitted event to dag-scheduler-event-loop and returns a JobWaiter.
You may see an exception thrown when the partitions in the set are outside the range:
Attempting to access a non-existent partition: [p]. Total number of partitions: [maxPartitions]
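For example, asking for a partition beyond the RDD's range triggers that check (a sketch assuming an active SparkContext sc):
val rdd = sc.parallelize(1 to 10, numSlices = 2)          // partitions 0 and 1 only
sc.runJob(rdd, (it: Iterator[Int]) => it.size, Seq(5))
// throws java.lang.IllegalArgumentException:
//   Attempting to access a non-existent partition: 5. Total number of partitions: 2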
You can listen for job completion or failure events after submitting a job to the DAGScheduler using JobListener
. It is a private[spark]
contract (a Scala trait) with the following two methods:
private[spark] trait JobListener {
def taskSucceeded(index: Int, result: Any)
def jobFailed(exception: Exception)
}
A job listener is notified each time a task succeeds (by def taskSucceeded(index: Int, result: Any)
), as well as if the whole job fails (by def jobFailed(exception: Exception)
).
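The contract is small enough to mirror in a standalone sketch (illustrative only - the real JobListener is private[spark], so this class merely mimics its shape, roughly the way JobWaiter behaves):
class CountingJobListener(totalTasks: Int) {
  private var finishedTasks = 0
  private var failure: Option[Exception] = None

  // Called once per successfully completed task (partition).
  def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    finishedTasks += 1
    if (finishedTasks == totalTasks) println("all tasks succeeded - job done")
  }

  // Called once if the whole job fails.
  def jobFailed(exception: Exception): Unit = synchronized {
    failure = Some(exception)
    println(s"job failed: ${exception.getMessage}")
  }
}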
An instance of JobListener is used in the following places:
- In ActiveJob as a listener to notify if tasks in this job finish or the job fails.
- In DAGScheduler.handleJobSubmitted
- In DAGScheduler.handleMapStageSubmitted
- In JobSubmitted
- In MapStageSubmitted
The following are the job listeners used:
- JobWaiter waits until DAGScheduler completes the job and passes the results of tasks to a resultHandler function.
- ApproximateActionListener…FIXME
A JobWaiter
is an extension of JobListener. It is used as the return value of DAGScheduler.submitJob and DAGScheduler.submitMapStage
. You can use a JobWaiter to block until the job finishes executing or to cancel it.
While the methods execute, JobSubmitted
and MapStageSubmitted
events are posted that reference the JobWaiter.
Since a JobWaiter object is a JobListener, it gets notifications about taskSucceeded and jobFailed. When the number of taskSucceeded notifications equals the total number of tasks (i.e. the number of partitions to compute), the JobWaiter instance is marked succeeded. A jobFailed notification marks the JobWaiter instance failed.
- FIXME Who’s using submitMapStage?
executorAdded(execId: String, host: String) method simply posts an ExecutorAdded event to eventProcessLoop.
taskEnded(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics): Unit
taskEnded
method simply posts a CompletionEvent event to the DAGScheduler’s internal event loop.
Note
|
DAGScheduler.taskEnded method is called by a TaskSetManager to report task completions, including failures.
|
Tip
|
Read about TaskMetrics in TaskMetrics.
|
The internal failJobAndIndependentStages
method…FIXME
Note
|
It is called by…FIXME |
DAGScheduler.eventProcessLoop (of type DAGSchedulerEventProcessLoop) is the event process loop to which Spark (via DAGScheduler.submitJob) posts jobs to schedule their execution. Later on, TaskSetManager talks back to DAGScheduler to inform it about the status of the tasks using the same "communication channel".
It allows Spark to release the current thread when posting happens and let the event loop handle events on a separate thread - asynchronously.
…IMAGE…FIXME
Internally, DAGSchedulerEventProcessLoop uses a java.util.concurrent.LinkedBlockingDeque, a blocking deque that grows indefinitely (i.e. can hold up to Integer.MAX_VALUE events).
The name of the single "logic" thread that reads events and takes decisions is dag-scheduler-event-loop.
"dag-scheduler-event-loop" #89 daemon prio=5 os_prio=31 tid=0x00007f809bc0a000 nid=0xc903 waiting on condition [0x0000000125826000]
The following are the current types of DAGSchedulerEvent events that are handled by DAGScheduler:
- JobSubmitted - posted when an action job is submitted to DAGScheduler (via submitJob or runApproximateJob).
- MapStageSubmitted - posted when a ShuffleMapStage is submitted (via submitMapStage).
- JobGroupCancelled
- AllJobsCancelled
- BeginEvent - posted when TaskSetManager reports that a task is starting. dagScheduler.handleBeginEvent is executed in turn.
- GettingResultEvent - posted when TaskSetManager reports that a task has completed and results are being fetched remotely. dagScheduler.handleGetTaskResult executes in turn.
- CompletionEvent - posted when TaskSetManager reports that a task has completed successfully or failed.
- ExecutorAdded - an executor (execId) has been spawned on a host (host). Remove it from the failed executors list if it was included, and submitWaitingStages().
- TaskSetFailed
- ResubmitFailedStages
Caution
|
FIXME
|
JobCancelled(jobId: Int) event is posted to cancel a job if it is scheduled or still running. It triggers execution of DAGScheduler.handleJobCancellation(jobId).
Note
|
It seems that although SparkContext.cancelJob(jobId: Int) calls DAGScheduler.cancelJob, no feature/code in Spark calls SparkContext.cancelJob(jobId: Int). Dead code?
|
When JobWaiter.cancel is called, it calls DAGScheduler.cancelJob
. You should see the following INFO message in the logs:
INFO Asked to cancel job [jobId]
It is a signal to the DAGScheduler to cancel the job.
Caution
|
FIXME |
ExecutorAdded(execId, host)
event triggers execution of DAGScheduler.handleExecutorAdded(execId: String, host: String)
.
It checks failedEpoch for the executor id (using execId
) and if it is found the following INFO message appears in the logs:
INFO Host added was in lost list earlier: [host]
The executor is removed from the list of failed nodes.
At the end, DAGScheduler.submitWaitingStages() is called.
ExecutorLost(execId)
event triggers execution of DAGScheduler.handleExecutorLost(execId: String, fetchFailed: Boolean, maybeEpoch: Option[Long] = None)
with fetchFailed
being false
.
The current epoch number could be provided (as maybeEpoch
) or it is calculated by requesting it from MapOutputTrackerMaster (using MapOutputTrackerMaster.getEpoch).
Recurring ExecutorLost events merely lead to the following DEBUG message in the logs:
DEBUG Additional executor lost message for [execId] (epoch [currentEpoch])
If however the executor is not in the list of lost executors (failedEpoch) or the recorded failure epoch is smaller than the current one, the executor is added to failedEpoch.
The following INFO message appears in the logs:
INFO Executor lost: [execId] (epoch [currentEpoch])
The executor execId
is removed (from BlockManagerMaster
on the driver).
If the external shuffle service is not used or the ExecutorLost event was for a map output fetch operation, all ShuffleMapStages (using shuffleToMapStage) are called (in order):
- ShuffleMapStage.removeOutputsOnExecutor(execId)
When there are no ShuffleMapStages (in shuffleToMapStage), MapOutputTrackerMaster.incrementEpoch is called.
cacheLocs is cleared.
At the end, DAGScheduler.submitWaitingStages() is called.
StageCancelled(stageId: Int)
event is posted to cancel a stage and all jobs associated with it. It triggers execution of DAGScheduler.handleStageCancellation(stageId)
.
It is the result of executing SparkContext.cancelStage(stageId: Int)
that is called from the web UI (controlled by spark.ui.killEnabled).
Caution
|
FIXME Image of the tab with kill |
DAGScheduler.handleStageCancellation(stageId)
checks whether the stageId
stage exists and for each job associated with the stage, it calls handleJobCancellation(jobId, s"because Stage [stageId] was cancelled")
.
Note
|
A stage knows what jobs it is part of using the internal set jobIds .
|
def handleJobCancellation(jobId: Int, reason: String = "")
checks whether the job exists in jobIdToStageIds
and if not, prints the following DEBUG to the logs:
DEBUG Trying to cancel unregistered job [jobId]
However, if the job exists, the job and all the stages that are only used by it are failed (using the internal failJobAndIndependentStages method).
For each running stage associated with the job (jobIdToStageIds
), if there is only one job for the stage (stageIdToStage
), TaskScheduler.cancelTasks is called, outputCommitCoordinator.stageEnd(stage.id)
, and SparkListenerStageCompleted is posted. The stage is no longer a running one (removed from runningStages
).
Caution
|
FIXME Image please with the call to TaskScheduler. |
- spark.job.interruptOnCancel (default: false) - controls whether or not to interrupt a job on cancel (see the example below).
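In user code the property is typically set via SparkContext.setJobGroup, whose interruptOnCancel flag translates into spark.job.interruptOnCancel for the jobs submitted in that group (a sketch assuming an active SparkContext sc):
sc.setJobGroup("nightly-etl", "nightly ETL jobs", interruptOnCancel = true)
// ... run actions belonging to the group ...
sc.cancelJobGroup("nightly-etl")   // running tasks receive Thread.interrupt() thanks to the flag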
In case TaskScheduler.cancelTasks completed successfully, JobListener is informed about job failure, cleanupStateForJobAndIndependentStages
is called, and SparkListenerJobEnd posted.
Caution
|
FIXME cleanupStateForJobAndIndependentStages code review.
|
Caution
|
FIXME Where are job.properties assigned to a job?
|
"Job %d cancelled %s".format(jobId, reason)
If no stage exists for stageId
, the following INFO message shows in the logs:
INFO No active jobs to kill for Stage [stageId]
At the end, DAGScheduler.submitWaitingStages() is called.
When a MapStageSubmitted event is posted, it triggers execution of DAGScheduler.handleMapStageSubmitted
method.
It is called with a job id (for a new job to be created), a ShuffleDependency, and a JobListener.
You should see the following INFOs in the logs:
Got map stage job %s (%s) with %d output partitions
Final stage: [finalStage] ([finalStage.name])
Parents of final stage: [finalStage.parents]
Missing parents: [list of stages]
SparkListenerJobStart event is posted to LiveListenerBus (so other event listeners know about the event - not only DAGScheduler).
The execution procedure of MapStageSubmitted events is then exactly (FIXME?) the same as for JobSubmitted.
Tip
|
The difference between handleMapStageSubmitted and handleJobSubmitted is that the former creates a ShuffleMapStage as the final stage while the latter creates a ResultStage.
|
When DAGScheduler receives JobSubmitted event it calls DAGScheduler.handleJobSubmitted
method.
handleJobSubmitted
has access to the final RDD, the partitions to compute, and the JobListener for the job, i.e. JobWaiter.
It creates a new ResultStage (as finalStage) and instantiates an ActiveJob.
Caution
|
FIXME review newResultStage
|
You should see the following INFO messages in the logs:
INFO DAGScheduler: Got job [jobId] ([callSite.shortForm]) with [partitions.length] output partitions
INFO DAGScheduler: Final stage: [finalStage] ([finalStage.name])
INFO DAGScheduler: Parents of final stage: [finalStage.parents]
INFO DAGScheduler: Missing parents: [getMissingParentStages(finalStage)]
Then, the finalStage
stage is given the ActiveJob instance and some housekeeping is performed to track the job (using jobIdToActiveJob
and activeJobs
).
SparkListenerJobStart message is posted to LiveListenerBus.
Caution
|
FIXME jobIdToStageIds and stageIdToStage - they’re already computed. When? Where?
|
When DAGScheduler executes a job it first submits the final stage (using submitStage).
Right before handleJobSubmitted
finishes, DAGScheduler.submitWaitingStages() is called.
CompletionEvent
event informs DAGScheduler about task completions. It is handled by handleTaskCompletion(event: CompletionEvent)
.
Note
|
CompletionEvent holds contextual information about the completed task.
|
The task knows about the stage it belongs to (using Task.stageId
), the partition it works on (using Task.partitionId
), and the stage attempt (using Task.stageAttemptId
).
OutputCommitCoordinator.taskCompleted
is called.
If the reason for task completion is not Success
, SparkListenerTaskEnd is posted to LiveListenerBus. The only difference with TaskEndReason: Success is how the stage attempt id is calculated. Here, it is Task.stageAttemptId
(not Stage.latestInfo.attemptId
).
Caution
|
FIXME What is the difference between stage attempt ids? |
If the stage the task belongs to has been cancelled, stageIdToStage
should not contain it, and the method quits.
The main processing begins now depending on TaskEndReason
- the reason for task completion (using event.reason
). The method skips processing TaskEndReasons
: TaskCommitDenied
, ExceptionFailure
, TaskResultLost
, ExecutorLostFailure
, TaskKilled
, and UnknownReason
, i.e. it does nothing.
SparkListenerTaskEnd is posted to LiveListenerBus.
The partition the task worked on is removed from pendingPartitions
of the stage.
The processing splits per task type - ResultTask or ShuffleMapTask - and submitWaitingStages() is called.
For ResultTask
, the stage is ResultStage. If there is no job active for the stage (using resultStage.activeJob
), the following INFO message appears in the logs:
INFO Ignoring result from [task] because its job has finished
Otherwise, check whether the task is marked as running for the job (using job.finished
) and proceed. The method skips execution when the task has already been marked as completed in the job.
Caution
|
FIXME When could a task that has just finished be ignored, i.e. the job has already marked finished ? Could it be for stragglers?
|
updateAccumulators(event) is called.
The partition is marked as finished (using job.finished) and the number of finished partitions is increased (using job.numFinished).
If the whole job has finished (when job.numFinished == job.numPartitions), then:
- markStageAsFinished is called
- cleanupStateForJobAndIndependentStages(job) is called
- SparkListenerJobEnd is posted to LiveListenerBus with JobSucceeded
The JobListener
of the job (using job.listener
) is informed about the task completion (using job.listener.taskSucceeded(rt.outputId, event.result)
). If the step fails, i.e. throws an exception, the JobListener is informed about it (using job.listener.jobFailed(new SparkDriverExecutionException(e))
).
Caution
|
FIXME When would job.listener.taskSucceeded throw an exception? How?
|
For ShuffleMapTask, the stage is ShuffleMapStage.
updateAccumulators(event) is called.
event.result
is MapStatus
that knows the executor id where the task has finished (using status.location.executorId
).
You should see the following DEBUG message in the logs:
DEBUG ShuffleMapTask finished on [execId]
If failedEpoch contains the executor and the epoch of the ShuffleMapTask is not greater than that in failedEpoch, you should see the following INFO message in the logs:
INFO Ignoring possibly bogus [task] completion from executor [executorId]
Otherwise, shuffleStage.addOutputLoc(smt.partitionId, status)
is called.
The method does more processing only if the internal runningStages
contains the ShuffleMapStage with no more pending partitions to compute (using shuffleStage.pendingPartitions
).
markStageAsFinished(shuffleStage)
is called.
The following INFO messages appear in the logs:
INFO looking for newly runnable stages
INFO running: [runningStages]
INFO waiting: [waitingStages]
INFO failed: [failedStages]
mapOutputTracker.registerMapOutputs with changeEpoch
is called.
cacheLocs is cleared.
If the map stage is ready, i.e. all partitions have shuffle outputs, map-stage jobs waiting on this stage (using shuffleStage.mapStageJobs) are marked as finished: MapOutputTrackerMaster.getStatistics(shuffleStage.shuffleDep) is called and markMapStageJobAsFinished(job, stats) is called for every map-stage job.
Otherwise, if the map stage is not ready, the following INFO message appears in the logs:
INFO Resubmitting [shuffleStage] ([shuffleStage.name]) because some of its tasks had failed: [missingPartitions]
submitStage(shuffleStage) is called.
Caution
|
FIXME All "…is called" above should be rephrased to use links to appropriate sections. |
For Resubmitted
case, you should see the following INFO message in the logs:
INFO Resubmitted [task], so marking it as still running
The task (by task.partitionId
) is added to the collection of pending partitions of the stage (using stage.pendingPartitions
).
Tip
|
A stage knows how many partitions are yet to be calculated. A task knows about the partition id for which it was launched. |
FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage)
comes with BlockManagerId
(as bmAddress
) and the other self-explanatory values.
Note
|
A task knows about the id of the stage it belongs to. |
When FetchFailed
happens, stageIdToStage
is used to access the failed stage (using task.stageId
and the task
is available in event
in handleTaskCompletion(event: CompletionEvent)
). shuffleToMapStage
is used to access the map stage (using shuffleId
).
If failedStage.latestInfo.attemptId != task.stageAttemptId
, you should see the following INFO in the logs:
INFO Ignoring fetch failure from [task] as it's from [failedStage] attempt [task.stageAttemptId] and there is a more recent attempt for that stage (attempt ID [failedStage.latestInfo.attemptId]) running
Caution
|
FIXME What does failedStage.latestInfo.attemptId != task.stageAttemptId mean?
|
And the case finishes. Otherwise, the case continues.
If the failed stage is in runningStages
, the following INFO message shows in the logs:
INFO Marking [failedStage] ([failedStage.name]) as failed due to a fetch failure from [mapStage] ([mapStage.name])
markStageAsFinished(failedStage, Some(failureMessage))
is called.
Caution
|
FIXME What does markStageAsFinished do?
|
If the failed stage is not in runningStages
, the following DEBUG message shows in the logs:
DEBUG Received fetch failure from [task], but its from [failedStage] which is no longer running
When disallowStageRetryForTest
is set, abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None)
is called.
Caution
|
FIXME Describe disallowStageRetryForTest and abortStage .
|
If the number of fetch failed attempts for the stage exceeds the allowed number (using Stage.failedOnFetchAndShouldAbort), the following method is called:
abortStage(failedStage, s"$failedStage (${failedStage.name}) has failed the maximum allowable number of times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. Most recent failure reason: ${failureMessage}", None)
If there are no failed stages reported (failedStages is empty), the following INFO shows in the logs:
INFO Resubmitting [mapStage] ([mapStage.name]) and [failedStage] ([failedStage.name]) due to fetch failure
And the following code is executed:
// Post ResubmitFailedStages to the event loop only after RESUBMIT_TIMEOUT milliseconds,
// so that fetch failures arriving shortly after this one are handled by a single resubmission.
messageScheduler.schedule(
  new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
Caution
|
FIXME What does the above code do? |
For all the cases, the failed stage and map stages are both added to failedStages set.
If mapId
(in the FetchFailed
object for the case) is provided, the map stage output is cleaned up (as it is broken) using mapStage.removeOutputLoc(mapId, bmAddress)
and MapOutputTrackerMaster.unregisterMapOutput(shuffleId, mapId, bmAddress) methods.
Caution
|
FIXME What does mapStage.removeOutputLoc do?
|
If bmAddress
(in the FetchFailed
object for the case) is provided, handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
is called. See ExecutorLost and handleExecutorLost (with fetchFailed being false).
Caution
|
FIXME What does handleExecutorLost do?
|
DAGScheduler.submitWaitingStages
method checks for waiting or failed stages that could now be eligible for submission.
The following TRACE
messages show in the logs when the method is called:
TRACE DAGScheduler: Checking for newly runnable parent stages
TRACE DAGScheduler: running: [runningStages]
TRACE DAGScheduler: waiting: [waitingStages]
TRACE DAGScheduler: failed: [failedStages]
The method clears the internal waitingStages set, which holds the stages waiting for their parent stages to finish.
It goes over the waiting stages sorted by job ids in increasing order and calls submitStage method.
Caution
|
FIXME |
DAGScheduler.submitStage(stage: Stage)
is called when stage
is ready for submission.
It recursively submits any missing parents of the stage.
There has to be an ActiveJob instance for the stage to proceed. Otherwise the stage and all the dependent jobs are aborted (using abortStage
) with the message:
Job aborted due to stage failure: No active job for stage [stage.id]
For a stage with an ActiveJob available, the following DEBUG message shows up in the logs:
DEBUG DAGScheduler: submitStage([stage])
Only when the stage is not in waiting (waitingStages
), running (runningStages
) or failed states can this stage be processed.
A list of missing parent stages of the stage is calculated (see Calculating Missing Parent Stages) and the following DEBUG message shows up in the logs:
DEBUG DAGScheduler: missing: [missing]
When the stage has no parent stages missing, it is submitted and the INFO message shows up in the logs:
INFO DAGScheduler: Submitting [stage] ([stage.rdd]), which has no missing parents
And submitMissingTasks is called. That finishes the stage submission.
If however there are missing parent stages for the stage, all stages are processed recursively (using submitStage), and the stage is added to waitingStages
set.
DAGScheduler.getMissingParentStages(stage: Stage)
calculates missing parent map stages for a given stage
.
It starts with the stage’s target RDD (as stage.rdd). If there are uncached partitions, it traverses the dependencies of the RDD (as RDD.dependencies), which can be instances of ShuffleDependency or NarrowDependency.
For each ShuffleDependency, the method searches for the corresponding ShuffleMapStage (using getShuffleMapStage
) and if unavailable, the method adds it to a set of missing (map) stages.
Caution
|
FIXME Review getShuffleMapStage
|
Caution
|
FIXME…IMAGE with ShuffleDependencies queried |
It continues traversing the chain for each NarrowDependency (using Dependency.rdd
).
A single stage can be re-executed in multiple attempts due to fault recovery. The number of attempts is configured (FIXME).
If TaskScheduler
reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent
with FetchFailed
, or an ExecutorLost event. DAGScheduler
will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets
for any lost stage(s) that compute the missing tasks.
Please note that tasks from the old attempts of a stage could still be running.
A stage object tracks multiple StageInfo
objects to pass to Spark listeners or the web UI.
The latest StageInfo
for the most recent attempt for a stage is accessible through latestInfo
.
DAGScheduler tracks which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
DAGScheduler is only interested in cache location coordinates, i.e. host and executor id, per partition of an RDD.
Caution
|
FIXME: A diagram, please |
If the storage level of an RDD is NONE, there is no caching and hence no partition cache locations are available. In such cases, whenever asked, DAGScheduler returns a collection with empty-location elements for each partition. The empty-location elements are to mark uncached partitions.
Otherwise, a collection of RDDBlockId instances, one for each partition, is created and BlockManagerMaster is asked for their locations (using BlockManagerMaster.getLocations). The result is then mapped to a collection of TaskLocation for host and executor id.
DAGScheduler computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
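The locality preferences DAGScheduler consults are also visible through the public RDD.preferredLocations method (a sketch assuming an active SparkContext sc and a hypothetical HDFS path):
val logs = sc.textFile("hdfs:///data/logs")   // hypothetical input path
logs.partitions.take(3).foreach { p =>
  // For HDFS-backed RDDs these are block hosts; for cached RDDs, executor locations.
  println(s"partition ${p.index} prefers: ${logs.preferredLocations(p).mkString(", ")}")
}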
See SPARK-9850 Adaptive execution in Spark for the design document. The work is currently in progress.
DAGScheduler.submitMapStage method is used for adaptive query planning, to run map stages and look at statistics about their outputs before submitting downstream stages.
DAGScheduler uses the following ScheduledThreadPoolExecutors (with the policy of removing cancelled tasks from a work queue at time of cancellation):
- dag-scheduler-message - a daemon thread pool using j.u.c.ScheduledThreadPoolExecutor with core pool size 1. It is used to post ResubmitFailedStages when FetchFailed is reported.
They are created using ThreadUtils.newDaemonSingleThreadScheduledExecutor
method that uses Guava DSL to instantiate a ThreadFactory.
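A sketch of how such a single-threaded daemon scheduled executor can be assembled with Guava's ThreadFactoryBuilder, similar in spirit to ThreadUtils.newDaemonSingleThreadScheduledExecutor (not the exact Spark code):
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}
import com.google.common.util.concurrent.ThreadFactoryBuilder

val threadFactory: ThreadFactory =
  new ThreadFactoryBuilder().setDaemon(true).setNameFormat("dag-scheduler-message").build()

val messageScheduler = new ScheduledThreadPoolExecutor(1, threadFactory)
// The policy mentioned above: drop cancelled tasks from the work queue at cancellation time.
messageScheduler.setRemoveOnCancelPolicy(true)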
submitMissingTasks(stage: Stage, jobId: Int): Unit
submitMissingTasks
is a private method that…FIXME
When executed, it prints the following DEBUG message out to the logs:
DEBUG DAGScheduler: submitMissingTasks([stage])
pendingPartitions
internal field of the stage is cleared (it is later filled out with the partitions to run tasks for).
The stage is asked for partitions to compute (see findMissingPartitions in Stages).
The method adds the stage to runningStages
.
OutputCommitCoordinator is informed that the stage started (using outputCommitCoordinator.stageStart).
Caution
|
FIXME Review outputCommitCoordinator.stageStart
|
The mapping between task ids and task preferred locations is computed (see getPreferredLocs - Computing Preferred Locations for Tasks and Partitions).
A new stage attempt is created (using Stage.makeNewStageAttempt
).
SparkListenerStageSubmitted is posted.
The stage is serialized and broadcast to workers using the SparkContext.broadcast method, i.e. Serializer.serialize is used to compute taskBinaryBytes - an array of bytes of (rdd, func) for a ResultStage and (rdd, shuffleDep) for a ShuffleMapStage.
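Illustrative only (a plain byte array stands in for the real taskBinaryBytes, and an active SparkContext sc is assumed): the broadcast step uses the public SparkContext.broadcast API.
// Stand-in for the serialized (rdd, func) or (rdd, shuffleDep) pair.
val taskBinaryBytes: Array[Byte] = "serialized stage payload".getBytes("UTF-8")
val taskBinary = sc.broadcast(taskBinaryBytes)   // shipped once per executor, referenced by every task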
Caution
|
FIXME Review taskBinaryBytes .
|
When serializing the stage fails, the stage is removed from the internal runningStages
set, abortStage
is called and the method stops.
Caution
|
FIXME Review abortStage .
|
At this point in time, the stage is on workers.
For each partition to compute for the stage, a collection of ShuffleMapTask for ShuffleMapStage or
ResultTask
for ResultStage is created.
Caution
|
FIXME Image with creating tasks for partitions in the stage. |
If there are tasks to launch (there are missing partitions in the stage), the following INFO and DEBUG messages are in the logs:
INFO DAGScheduler: Submitting [tasks.size] missing tasks from [stage] ([stage.rdd])
DEBUG DAGScheduler: New pending partitions: [stage.pendingPartitions]
All tasks in the collection become a TaskSet for TaskScheduler.submitTasks.
In case of no tasks to be submitted for a stage, a DEBUG message shows up in the logs.
For ShuffleMapStage:
DEBUG DAGScheduler: Stage [stage] is actually done; (available: ${stage.isAvailable},available outputs: ${stage.numAvailableOutputs},partitions: ${stage.numPartitions})
For ResultStage:
DEBUG DAGScheduler: Stage [stage] is actually done; (partitions: [numPartitions])
Note
|
submitMissingTasks is called when…
|
Caution
|
FIXME Review + why does the method return a sequence of TaskLocations? |
Note
|
Task ids correspond to partition ids. |
When a DAGScheduler stops (via stop()
), it stops the internal dag-scheduler-message
thread pool, dag-scheduler-event-loop, and TaskScheduler.
Spark’s DAGScheduler uses Spark Metrics System (via DAGSchedulerSource
) to report metrics about internal status.
Caution
|
FIXME What is DAGSchedulerSource ?
|
The name of the source is DAGScheduler.
It emits the following numbers:
- stage.failedStages - the number of failed stages
- stage.runningStages - the number of running stages
- stage.waitingStages - the number of waiting stages
- job.allJobs - the number of all jobs
- job.activeJobs - the number of active jobs
updateAccumulators(event: CompletionEvent): Unit
The private updateAccumulators
method merges the partial values of accumulators from a completed task into their "source" accumulators on the driver.
Note
|
It is called by handleTaskCompletion. |
For each AccumulableInfo in the CompletionEvent
, a partial value from a task is obtained (from AccumulableInfo.update
) and added to the driver’s accumulator (using Accumulable.++=
method).
For named accumulators with the update value being a non-zero value, i.e. not Accumulable.zero:
- stage.latestInfo.accumulables for the AccumulableInfo.id is set
- CompletionEvent.taskInfo.accumulables has a new AccumulableInfo added.
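A sketch of the driver-side merge described above, using the (pre-2.0-style) accumulator API the text refers to; the partial value would come from AccumulableInfo.update in a real CompletionEvent (an active SparkContext sc is assumed):
val sourceAcc = sc.accumulator(0, "records")   // the "source" accumulator on the driver
val partialUpdateFromTask = 42                 // stand-in for AccumulableInfo.update
sourceAcc ++= partialUpdateFromTask            // the merge performed by updateAccumulators (Accumulable.++=)
println(sourceAcc.value)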
Caution
|
FIXME Where are Stage.latestInfo.accumulables and CompletionEvent.taskInfo.accumulables used?
|