split graph into stages of tasks: The DAGScheduler receives the DAG built from RDDs and splits the chain of RDDs into stages. Depending on the stage type (currently ResultStage and ShuffleMapStage), it creates tasks of the matching type (currently ResultTask and ShuffleMapTask) for the stage's unfinished partitions, so each stage yields zero or more tasks according to how many partitions are still incomplete. Finally, the DAGScheduler hands the tasks of each stage to the TaskScheduler as a TaskSet for further processing (a concrete two-stage example follows this list).
launch tasks via cluster manager: The cluster manager is used for resource allocation and task scheduling, and failed tasks are retried under a fault-tolerance policy. The TaskScheduler receives TaskSets from the DAGScheduler, creates a TaskSetManager for each TaskSet, adds that TaskSetManager to the scheduling pool, and then delegates the actual scheduling of tasks to the scheduler backend (SchedulerBackend). The SchedulerBackend first asks the TaskScheduler to sort all TaskSetManagers in the scheduling pool according to the task scheduling algorithm (currently FIFO or FAIR), then assigns resources to the TaskSet following the best available data locality, and finally runs the tasks of the TaskSet on the assigned nodes.
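As a concrete illustration of the stage split, here is a minimal sketch (assuming Spark is on the classpath; the object name TwoStageJob and the local master are only for the example). The reduceByKey introduces a ShuffleDependency, so the DAGScheduler cuts the DAG there: the map side becomes a ShuffleMapStage and the count() action runs in a ResultStage.

import org.apache.spark.{SparkConf, SparkContext}

object TwoStageJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("two-stage").setMaster("local[*]"))
    val words  = sc.parallelize(Seq("a", "b", "a", "c"))   // narrow lineage, stays in the map stage
    val pairs  = words.map(w => (w, 1))                    // still narrow, same stage
    val counts = pairs.reduceByKey(_ + _)                  // ShuffleDependency: stage boundary
    println(counts.count())                                // action: the job is submitted to the DAGScheduler
    sc.stop()
  }
}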
/**
 * Return the ancestors of the given RDD that are related to it only through a sequence of
 * narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
 * no ordering on the RDDs returned.
 */
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
  // Set of narrow-dependency ancestors found so far
  val ancestors = new mutable.HashSet[RDD[_]]
  // Local helper method
  def visit(rdd: RDD[_]): Unit = {
    // Take the RDD's dependencies and keep only the narrow ones
    val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
    // The parent RDDs of those narrow dependencies
    val narrowParents = narrowDependencies.map(_.rdd)
    // Only the parents that have not been visited yet
    val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
    // Add each new parent to the set and recurse (DFS) into its own ancestors
narrowParentsNotVisited.foreach { parent =>
ancestors.add(parent)
visit(parent)
}
}
  // Start the traversal from this RDD
visit(this)
  // In case there is a cycle, do not include the root itself
  ancestors.filterNot(_ == this).toSeq
}
RDD dependencies

Narrow dependencies

In a narrow dependency, the partitions of the RDD map one-to-one to the partitions of its upstream (parent) RDD.
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition. A child partition may depend on
   * more than one parent partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  /** The upstream (parent) RDD. */
  override def rdd: RDD[T] = _rdd
}
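A quick way to see narrow dependencies in practice (a sketch assuming a running SparkContext named sc, e.g. in spark-shell): a plain map keeps a OneToOneDependency, which is a NarrowDependency, while groupByKey replaces it with a ShuffleDependency.

import org.apache.spark.{NarrowDependency, ShuffleDependency}

val base    = sc.parallelize(1 to 10, numSlices = 2)
val mapped  = base.map(_ * 2)                         // OneToOneDependency on base
val grouped = base.map(i => (i % 2, i)).groupByKey()  // ShuffleDependency on the pair RDD

println(mapped.dependencies.head.isInstanceOf[NarrowDependency[_]])         // true
println(grouped.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]) // true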
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
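To make the getParents arithmetic concrete, here is a standalone sketch (plain Scala, no Spark required; the helper rangeParents is hypothetical). UnionRDD, for example, creates one RangeDependency per parent: if the first parent of a union has 3 partitions and the second has 4, the dependency on the second parent has inStart = 0, outStart = 3 and length = 4.

def rangeParents(partitionId: Int, inStart: Int, outStart: Int, length: Int): List[Int] =
  if (partitionId >= outStart && partitionId < outStart + length) {
    List(partitionId - outStart + inStart)
  } else {
    Nil
  }

// Child partition 5 of the union maps back to partition 2 of the second parent: 5 - 3 + 0 = 2.
println(rangeParents(5, inStart = 0, outStart = 3, length = 4)) // List(2)
// Child partition 1 lies in the first parent's range, so this dependency reports no parents for it.
println(rangeParents(1, inStart = 0, outStart = 3, length = 4)) // List()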
/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  // If map-side combine is requested, an Aggregator must be provided
  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  // Fully qualified class name of the key type
  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  // Fully qualified class name of the value type
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  // Fully qualified class name of the combiner type
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  // A new shuffle id for this dependency
  val shuffleId: Int = _rdd.context.newShuffleId()

  // Shuffle handle, obtained by registering this shuffle with the ShuffleManager
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  // Register this shuffle with the ContextCleaner so its intermediate output can be cleaned up
  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
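These fields can be observed directly (a sketch assuming a running SparkContext named sc): reduceByKey builds a ShuffledRDD whose single dependency is a ShuffleDependency with a defined Aggregator and, by default, mapSideCombine = true.

import org.apache.spark.ShuffleDependency

val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1))).reduceByKey(_ + _)

counts.dependencies.head match {
  case dep: ShuffleDependency[_, _, _] =>
    println(s"shuffleId=${dep.shuffleId}, mapSideCombine=${dep.mapSideCombine}, " +
      s"aggregator defined=${dep.aggregator.isDefined}")
  case other =>
    println(s"unexpected dependency: $other")
}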
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  // Collect the first RDD and the varargs into a single Seq
  val rdds: Seq[RDD[_]] = (Seq(rdd) ++ others)
  // Keep only the RDDs that already have a partitioner with at least one partition
  val hasPartitioner: Seq[RDD[_]] = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
  // Among those, pick the RDD with the largest number of partitions
  val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
    Some(hasPartitioner.maxBy(_.partitions.length))
  } else {
    None
  }
  // Default number of partitions: spark.default.parallelism if set, otherwise the maximum
  // number of partitions among the input RDDs
  val defaultNumPartitions: Int = if (rdd.context.conf.contains("spark.default.parallelism")) {
    rdd.context.defaultParallelism
  } else {
    rdds.map(_.partitions.length).max
  }

  // If the existing max partitioner is an eligible one, or its partitions number is larger
  // than the default number of partitions, use the existing partitioner. Otherwise fall back
  // to a HashPartitioner with defaultNumPartitions partitions.
  if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
      defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
    hasMaxPartitioner.get.partitioner.get
  } else {
    new HashPartitioner(defaultNumPartitions)
  }
}
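The effect of this rule can be checked with a small sketch (assuming a running SparkContext named sc and no unusually large spark.default.parallelism): when one of the inputs already carries a usable partitioner, co-grouping operations reuse it instead of building a fresh HashPartitioner.

import org.apache.spark.{HashPartitioner, Partitioner}

val left  = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(new HashPartitioner(8))
val right = sc.parallelize(Seq(("a", "x"), ("c", "y")))

val chosen = Partitioner.defaultPartitioner(left, right)
println(chosen.numPartitions) // 8: the existing HashPartitioner from `left` is reused

val joined = left.join(right) // join goes through defaultPartitioner internally
println(joined.partitioner)   // Some(...): the joined RDD keeps that partitioner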
HashPartitioner
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    // Too many null keys may cause data skew, since they all land in partition 0
    case null => 0
    // Otherwise take the non-negative remainder of the key's hashCode modulo numPartitions
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
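A short sketch of how keys are mapped (HashPartitioner is part of Spark's public API, so this runs as-is in spark-shell): nonNegativeMod is just hashCode modulo numPartitions shifted into the non-negative range, and every null key lands in partition 0, which is why many null keys can skew a single partition.

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(4)
println(p.getPartition("spark")) // "spark".hashCode mod 4, always in [0, 4)
println(p.getPartition(null))    // 0: all null keys share one partition
println(p.getPartition(-7))      // negative hashCodes are shifted back into [0, 4)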
RangePartitioner
/**
 * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
 * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
 *
 * @note The actual number of partitions created by the RangePartitioner might not be the same
 * as the `partitions` parameter, in the case where the number of sampled records is less than
 * the value of `partitions`.
 */
Stage

/**
 * @param id Unique stage ID
 * @param rdd RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks
 *            on, while for a result stage, it's the target RDD that we ran an action on
 * @param numTasks Total number of tasks in stage; result stages in particular may not need to
 *                 compute all partitions, e.g. for first(), lookup(), and take().
 * @param parents List of stages that this stage depends on (through shuffle dependencies).
 * @param firstJobId ID of the first job this stage was part of, for FIFO scheduling.
 * @param callSite Location in the user program associated with this stage: either where the target
 *                 RDD was created, for a shuffle map stage, or where the action for a result stage
 *                 was called.
 */
private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite)
  extends Logging {
  // Number of partitions of this stage's RDD
  val numPartitions = rdd.partitions.length

  /** Set of jobs that this stage belongs to. */
  val jobIds = new HashSet[Int]

  /** The ID to use for the next new attempt for this stage. */
  private var nextAttemptId: Int = 0

  // Stage name: the short form of the call site
  val name: String = callSite.shortForm
  // Stage details: the long form of the call site
  val details: String = callSite.longForm

  /**
   * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
   * here, before any attempts have actually been created, because the DAGScheduler uses this
   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
   * have been created).
   */
  private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

  /**
   * Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid
   * endless retries if a stage keeps failing.
   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
   * multiple tasks from the same stage attempt fail (SPARK-5945).
   */
  val failedAttemptIds = new HashSet[Int]
makeNewStageAttempt
  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  def makeNewStageAttempt(
      numPartitionsToCompute: Int,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
    // Create and register the TaskMetrics for this attempt
    val metrics = new TaskMetrics
    metrics.register(rdd.sparkContext)
    // Build the StageInfo describing this new attempt
    _latestInfo = StageInfo.fromStage(
      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
    nextAttemptId += 1
}
The ResultStage implementation
/**
 * @param id Unique stage ID
 * @param rdd RDD that this stage runs on: the target RDD that we ran an action on
 * @param func The function used to compute each partition of the RDD
 * @param partitions Array of the indices of the RDD partitions to compute
 * @param parents List of stages that this stage depends on (through shuffle dependencies).
 * @param firstJobId ID of the first job this stage was part of, for FIFO scheduling.
 * @param callSite Location in the user program associated with this stage: where the action
 *                 for this result stage was called.
 */
private[spark] class ResultStage(
    id: Int,
    rdd: RDD[_],
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

  /**
   * The active job for this result stage. Will be empty if the job has already finished
   * (e.g., because the job was cancelled).
   */
  private[this] var _activeJob: Option[ActiveJob] = None

  /** The currently active job, if any. */
  def activeJob: Option[ActiveJob] = _activeJob

  // Set the active job
  def setActiveJob(job: ActiveJob): Unit = {
    _activeJob = Option(job)
  }

  // Remove the current active job
  def removeActiveJob(): Unit = {
    _activeJob = None
  }

  /**
   * Returns the sequence of partition ids that are missing (i.e. needs to be computed).
   *
   * This can only be called when there is an active job.
   */
  override def findMissingPartitions(): Seq[Int] = {
    // Get the currently active job
    val job = activeJob.get
    // Keep only the partitions that have not finished yet
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }

  override def toString: String = "ResultStage " + id
}
/**
 * @param id Unique stage ID
 * @param rdd RDD that this stage runs on: the RDD we run map tasks on
 * @param numTasks Total number of tasks in stage
 * @param parents List of stages that this stage depends on (through shuffle dependencies).
 * @param firstJobId ID of the first job this stage was part of, for FIFO scheduling.
 * @param callSite Location in the user program associated with this stage: where the target
 *                 RDD was created.
 * @param shuffleDep The shuffle dependency whose map output this stage produces
 * @param mapOutputTrackerMaster Master-side tracker of this stage's map output
 */
private[spark] class ShuffleMapStage(
    id: Int,
    rdd: RDD[_],
    numTasks: Int,
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    val shuffleDep: ShuffleDependency[_, _, _],
    mapOutputTrackerMaster: MapOutputTrackerMaster)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

  // Map-stage jobs that were submitted against this stage
  private[this] var _mapStageJobs: List[ActiveJob] = Nil

  /**
   * Partitions that either haven't yet been computed, or that were computed on an executor
   * that has since been lost, so should be re-computed. This variable is used by the
   * DAGScheduler to determine when a stage has completed. Task successes in both the active
   * attempt for the stage or in earlier attempts for this stage can cause partition ids to get
   * removed from pendingPartitions. As a result, this variable may be inconsistent with the pending
   * tasks in the TaskSetManager for the active attempt for the stage (the partitions stored here
   * will always be a subset of the partitions that the TaskSetManager thinks are pending).
   */
  val pendingPartitions = new HashSet[Int]

  override def toString: String = "ShuffleMapStage " + id

  /**
   * Returns the list of active jobs,
   * i.e. map-stage jobs that were submitted to execute this stage independently (if any).
   */
  def mapStageJobs: Seq[ActiveJob] = _mapStageJobs

  /** Adds the job to the active job list. */
  def addActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = job :: _mapStageJobs
  }

  /** Removes the job from the active job list. */
  def removeActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = _mapStageJobs.filter(_ != job)
  }

  /**
   * Number of partitions that have shuffle outputs.
   * When this reaches [[numPartitions]], this map stage is ready.
   */
  def numAvailableOutputs: Int = mapOutputTrackerMaster.getNumAvailableOutputs(shuffleDep.shuffleId)

  /**
   * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
   */
  def isAvailable: Boolean = numAvailableOutputs == numPartitions

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  override def findMissingPartitions(): Seq[Int] = {
    mapOutputTrackerMaster
      // Ask the tracker which partitions have not produced output yet
      .findMissingPartitions(shuffleDep.shuffleId)
      .getOrElse(0 until numPartitions)
  }
}
StageInfo
class StageInfo(
    val stageId: Int,
    @deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int,
    val name: String,
    val numTasks: Int,                   // number of tasks in the current stage
    val rddInfos: Seq[RDDInfo],          // RDDInfo of the stage's RDD and its narrow ancestors
    val parentIds: Seq[Int],             // ids of the parent stages
    val details: String,                 // detailed call-site / stack-trace information
    val taskMetrics: TaskMetrics = null,
    private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {

  /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
  var submissionTime: Option[Long] = None

  /** Time when all tasks in the stage completed or when the stage was cancelled. */
  var completionTime: Option[Long] = None

  /** If the stage failed, the reason why. */
  var failureReason: Option[String] = None

  /**
   * Terminal values of accumulables updated during this stage, including all the user-defined
   * accumulators.
   */
  val accumulables = HashMap[Long, AccumulableInfo]()
  def stageFailed(reason: String) {
failureReason =Some(reason)
completionTime =Some(System.currentTimeMillis)
}
  def attemptNumber(): Int = attemptId

  private[spark] def getStatusString: String = {
if (completionTime.isDefined) {
if (failureReason.isDefined) {
"failed"
} else {
"succeeded"
}
} else {
"running"
}
}
}
private[spark] object StageInfo {

  /**
   * Construct a StageInfo from a Stage.
   *
   * Each Stage is associated with one or many RDDs, with the boundary of a Stage marked by
   * shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
   * sequence of narrow dependencies should also be associated with this Stage.
   */
  def fromStage(
stage: Stage,
attemptId: Int,
      numTasks: Option[Int] = None,
      taskMetrics: TaskMetrics = null,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
    ): StageInfo = {
    val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
    val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
    new StageInfo(
stage.id,
attemptId,
stage.name,
numTasks.getOrElse(stage.numTasks),
rddInfos,
stage.parents.map(_.id),
stage.details,
taskMetrics,
taskLocalityPreferences)
}
}
private[spark] class JobWaiter[T](
    dagScheduler: DAGScheduler,
    val jobId: Int,
    totalTasks: Int,                  // total number of tasks we are waiting for
    resultHandler: (Int, T) => Unit)
  extends JobListener with Logging {

  // Number of tasks that have finished so far
  private val finishedTasks = new AtomicInteger(0)

  // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
  // partition RDDs), we set the jobResult directly to JobSucceeded.
  // jobPromise represents the job's outcome; if totalTasks is zero there is nothing to run,
  // so the promise is completed successfully right away.
  private val jobPromise: Promise[Unit] =
    if (totalTasks == 0) Promise.successful(()) else Promise()

  def jobFinished: Boolean = jobPromise.isCompleted

  def completionFuture: Future[Unit] = jobPromise.future

  /**
   * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled
   * asynchronously. After the low level scheduler cancels all the tasks belonging to this job, it
   * will fail this job with a SparkException.
   */
  def cancel() {
dagScheduler.cancelJob(jobId, None)
}
  override def taskSucceeded(index: Int, result: Any): Unit = {
    // resultHandler call must be synchronized in case resultHandler itself is not thread safe.
    synchronized {
resultHandler(index, result.asInstanceOf[T])
}
if (finishedTasks.incrementAndGet() == totalTasks) {
jobPromise.success(())
}
}
  override def jobFailed(exception: Exception): Unit = {
    if (!jobPromise.tryFailure(exception)) {
logWarning("Ignore failure", exception)
}
}
}
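The mechanism is easier to see in isolation. Below is a standalone sketch (plain Scala; the MiniWaiter class is hypothetical) of the Promise/Future pattern JobWaiter relies on: every task completion bumps an atomic counter, the promise is completed once the last task reports in, and that unblocks whoever is waiting on the future, which is what DAGScheduler.runJob does with completionFuture.

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

class MiniWaiter(totalTasks: Int) {
  private val finishedTasks = new AtomicInteger(0)
  // With zero tasks there is nothing to wait for, so the promise is completed immediately.
  private val promise = if (totalTasks == 0) Promise.successful(()) else Promise[Unit]()

  def taskSucceeded(): Unit = {
    if (finishedTasks.incrementAndGet() == totalTasks) {
      promise.success(())
    }
  }

  def awaitCompletion(): Unit = {
    Await.ready(promise.future, Duration.Inf)
  }
}

val waiter = new MiniWaiter(totalTasks = 3)
(1 to 3).foreach(_ => waiter.taskSucceeded())
waiter.awaitCompletion() // returns immediately: all 3 tasks have already finished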
ActiveJob in detail
private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {

  /**
   * Number of partitions we need to compute for this job. Note that result stages may not need
   * to compute all partitions in their target RDD, for actions like first() and lookup().
   */
  val numPartitions = finalStage match {
    // For a result stage, the number of partitions the action touches
    case r: ResultStage => r.partitions.length
    // For a map stage, the number of partitions of its RDD
    case m: ShuffleMapStage => m.rdd.partitions.length
  }

  /** Which partitions of the stage have finished */
  val finished = Array.fill[Boolean](numPartitions)(false)

  var numFinished = 0

  /** Resets the status of all partitions in this stage so they are marked as not finished. */
  def resetAllPartitions(): Unit = {
    (0 until numPartitions).foreach(finished.update(_, false))
    numFinished = 0
}
}
/**
 * The DAGScheduler's event loop, which processes DAGSchedulerEvent messages.
 * @param dagScheduler the owning DAGScheduler
 */
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    // Timer context used to measure how long processing this event takes
    val timerContext: Timer.Context = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}
  /**
   * Dispatch a DAGSchedulerEvent to the corresponding handler.
   */
  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // Pattern match on the event type and delegate to the matching handler
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
}
  override def onError(e: Throwable): Unit = {
logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
try {
dagScheduler.doCancelAllJobs()
} catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
}
dagScheduler.sc.stopInNewThread()
}
  override def onStop(): Unit = {
// Cancel any active jobs in postStop hook
dagScheduler.cleanUpAfterSchedulerStop()
}
}
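Stripped of Spark specifics, the pattern behind EventLoop/DAGSchedulerEventProcessLoop looks roughly like the sketch below (plain Scala; MiniEventLoop is hypothetical): callers post events onto a blocking queue and a single daemon thread drains the queue and dispatches each event to onReceive, so all of the DAGScheduler's bookkeeping happens on one thread.

import java.util.concurrent.LinkedBlockingDeque

abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          onReceive(queue.take()) // blocks until an event is available
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocking take()
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
}

// Usage: a loop that just logs each posted event.
val loop = new MiniEventLoop[String]("demo-loop") {
  override protected def onReceive(event: String): Unit = println(s"handling $event")
}
loop.start()
loop.post("JobSubmitted")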
private[spark] object DAGScheduler {
  // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
  // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
  // as more failure events come in
  val RESUBMIT_TIMEOUT = 200

  // Number of consecutive stage attempts allowed before a stage is aborted
  val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
}
The composition of DAGScheduler
private[spark] class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
this(
sc,
taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env)
}
  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

  private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)

  // Generator for the next job id
  private[scheduler] val nextJobId = new AtomicInteger(0)
  // Total number of jobs submitted so far
  private[scheduler] def numTotalJobs: Int = nextJobId.get()
  // Generator for the next stage id
  private val nextStageId = new AtomicInteger(0)

  // Mapping from job id to the ids of its stages
  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  // Mapping from stage id to the Stage itself
  private[scheduler] val stageIdToStage = new HashMap[Int, Stage]

  /**
   * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
   * that dependency. Only includes stages that are part of currently running job (when the job(s)
   * that require the shuffle stage complete, the mapping will be removed, and the only record of
   * the shuffle data will be in the MapOutputTracker).
   */
  private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]

  // Mapping from job id to its ActiveJob
  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

  // Stages we need to run whose parents aren't done
  private[scheduler] val waitingStages = new HashSet[Stage]

  // Stages we are running right now
  private[scheduler] val runningStages = new HashSet[Stage]

  // Stages that must be resubmitted due to fetch failures
  private[scheduler] val failedStages = new HashSet[Stage]

  private[scheduler] val activeJobs = new HashSet[ActiveJob]

  /**
   * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids
   * and its values are arrays indexed by partition numbers. Each array value is the set of
   * locations where that RDD partition is cached.
   *
   * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
   */
  // Caches the location of every partition of every RDD. Because each partition is stored as a
  // Block and the storage layer may replicate it, a partition's Block can live on several
  // BlockManagers, so each partition maps to a sequence of TaskLocations indexed by partition id.
  private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]

  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
  // every task. When we detect a node failing, we note the current epoch number and failed
  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
  //
  // TODO: Garbage collect information about failure epochs when we know there are no more
  // stray messages to detect.
  private val failedEpoch = new HashMap[String, Long]

  private[scheduler] val outputCommitCoordinator = env.outputCommitCoordinator

  // A closure serializer that we reuse.
  // This is only safe because DAGScheduler runs in a single thread.
  private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

  /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
  private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)

  /**
   * Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
   * this is set default to false, which means, we only unregister the outputs related to the exact
   * executor(instead of the host) on a FetchFailure.
   */
  private[scheduler] val unRegisterOutputOnHostOnFetchFailure =
    sc.getConf.get(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE)

  /**
   * Number of consecutive stage attempts allowed before a stage is aborted.
   */
  private[scheduler] val maxConsecutiveStageAttempts =
    sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
      DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)

  /**
   * Number of max concurrent tasks check failures for each barrier job.
   */
  private[scheduler] val barrierJobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]

  /**
   * Time in seconds to wait between a max concurrent tasks check failure and the next check.
   */
  private val timeIntervalNumTasksCheck = sc.getConf
    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL)

  /**
   * Max number of max concurrent tasks check failures allowed for a job before fail the job
   * submission.
   */
  private val maxFailureNumTasksCheck = sc.getConf
    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)

  private val messageScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

  private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

  taskScheduler.setDAGScheduler(this)
DAGScheduler and job submission

Submitting a job

A user-submitted job is first expressed as a chain of RDDs; only then is it handed to the DAGScheduler for processing.
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // Submit the job and get back a JobWaiter holding the job's state
  val waiter: JobWaiter[U] = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // Block until the job completes
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    // The job succeeded
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    // The job failed
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
// submitJob
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions: Int = rdd.partitions.length
  // Validate every requested partition id
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  // Allocate a new job id
  val jobId: Int = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  // Adapt the function to the untyped signature expected by the event
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // Create the JobWaiter the caller will block on
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // Post a JobSubmitted event to the event loop (actor-like, single-threaded processing)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
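For context on who calls runJob/submitJob: every RDD action ends up in SparkContext.runJob, which forwards to DAGScheduler.runJob above with the target RDD, the per-partition function and the list of partition ids to compute. A small sketch (assuming a running SparkContext named sc):

val rdd = sc.parallelize(1 to 100, numSlices = 4)
// Runs one task per partition and returns one result per partition via DAGScheduler.runJob.
val partitionSums: Array[Int] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)
println(partitionSums.mkString(", ")) // four values, one per partition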
Handling the submitted job
handleJobSubmitted
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
  // The ResultStage that finishes the job
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Create the ResultStage; any ShuffleDependency found along the way yields a ShuffleMapStage
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
    case e: BarrierJobSlotsNumberCheckFailed =>
      logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
        "than the total number of slots in the cluster currently.")
      // If jobId doesn't exist in the map, Scala converts its value null to 0: Int automatically.
      val numCheckFailures: Int = barrierJobIdToNumTasksCheckFailures.compute(jobId,
        new BiFunction[Int, Int, Int] {
          override def apply(key: Int, value: Int): Int = value + 1
})
if (numCheckFailures <= maxFailureNumTasksCheck) {
        // Retry: re-post the JobSubmitted event after a delay
messageScheduler.schedule(
          new Runnable {
            override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
// Job failed, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
return
}
    case e: Exception =>
logWarning("Creating new stage failed due to exception - job: "+ jobId, e)
listener.jobFailed(e)
return
}
// Job submitted, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
  // Create an ActiveJob for this job
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  // Clear the cached partition-location data
  clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: "+ finalStage +" ("+ finalStage.name +")")
logInfo("Parents of final stage: "+ finalStage.parents)
logInfo("Missing parents: "+ getMissingParentStages(finalStage))
  // Record the job submission time
  val jobSubmissionTime: Long = clock.getTimeMillis()
  // Register the job in jobIdToActiveJob
  jobIdToActiveJob(jobId) = job
  // ... and in activeJobs
  activeJobs += job
  // Attach the job to the final stage
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  // Collect the latest StageInfo of every stage of this job
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  // Notify listeners that the job has started
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // Submit the final stage
  submitStage(finalStage)
}
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  // Look up the ShuffleMapStage for this shuffle id; return it if it exists, otherwise create it
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
// Finally, create a stage for the given shuffle dependency.
createShuffleMapStage(shuffleDep, firstJobId)
}
}
createShuffleMapStage
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  val numTasks = rdd.partitions.length
  // Get or create the parent stages of this stage
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(
id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)
if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite}) as input to " +
      s"shuffle ${shuffleDep.shuffleId}")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
stage
}
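The stages that createResultStage/createShuffleMapStage register can be previewed from the lineage before the job runs. A sketch (assuming a running SparkContext named sc): toDebugString prints the RDD chain with its shuffle boundaries, and each shuffle corresponds to a ShuffleMapStage recorded in shuffleIdToMapStage.

val result = sc.parallelize(1 to 100, 4)
  .map(i => (i % 10, i))
  .reduceByKey(_ + _)              // first shuffle -> first ShuffleMapStage
  .map { case (k, v) => (v, k) }
  .sortByKey()                     // second shuffle -> second ShuffleMapStage

println(result.toDebugString)      // indented blocks mark the shuffle (stage) boundaries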