当前位置 博文首页 > colorant的专栏:spark internal - 作业调度

    colorant的专栏:spark internal - 作业调度

    作者:[db:作者] 时间:2021-08-27 16:13

    作者:刘旭晖 Raymond 转载请注明出处

    Email:colorant at 163.com

    BLOG:http://blog.csdn.net/colorant/


    Spark中作业调度的相关类最重要的就是DAGSchedulerDAGScheduler顾名思义就是基于DAG图的Scheduler


    DAG全称 Directed Acyclic Graph,有向无环图。简单的来说,就是一个由顶点和有方向性的边构成的图中,从任意一个顶点出发,没有任何一条路径会将其带回到出发的顶点。

    ?

    在作业调度系统中,调度的基础就在于判断多个作业任务的依赖关系,这些任务之间可能存在多重的依赖关系,也就是说有些任务必须先获得执行,然后另外的相关依赖任务才能执行,但是任务之间显然不应该出现任何直接或间接的循环依赖关系,所以本质上这种关系适合用DAG有向无环图来表示。

    ?

    概括地描述DAGSchedulerTaskScheduler(关于TaskScheduler的相关细节,在我之前的关于Spark运行模式的文章中有)的功能划分就是:TaskScheduler负责实际每个具体任务的物理调度,DAGScheduler负责将作业拆分成不同阶段的具有依赖关系的多批任务,可以理解为DAGScheduler负责任务的逻辑调度。

    ?

    ?

    基本概念

    ?

    Task任务 :单个分区数据集上的最小处理流程单元

    TaskSet任务集:一组关联的,但是互相之间没有Shuffle依赖关系的任务所组成的任务集

    Stage调度阶段:一个任务集所对应的调度阶段

    Job作业:一次RDD Action生成的一个或多个Stage所组成的一次计算作业

    ?

    ?

    运行方式

    ?

    DAGSchedulerSparkContext初始化过程中实例化,一个SparkContext对应一个DAGSchedulerDAGScheduler的事件循环逻辑基于Akka Actor的消息传递机制来构建,在DAGSchedulerStart函数中创建了一个eventProcessActor用来处理各种DAGSchedulerEvent,这些事件包括作业的提交,任务状态的变化,监控等等


    private[scheduler]case class JobSubmitted(
        jobId: Int,
        finalRDD: RDD[_],
        func: (TaskContext, Iterator[_]) => _,
        partitions: Array[Int],
        allowLocal: Boolean,
        callSite: String,
        listener: JobListener,
        properties: Properties = null)
      extends DAGSchedulerEvent
     
    private[scheduler]case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
    private[scheduler]case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
    private[scheduler]case object AllJobsCancelled extends DAGSchedulerEvent
    private[scheduler]
    case classBeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
     
    private[scheduler]
    case classGettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
     
    private[scheduler]case class CompletionEvent(
        task: Task[_],
        reason: TaskEndReason,
        result: Any,
        accumUpdates: Map[Long, Any],
        taskInfo: TaskInfo,
        taskMetrics: TaskMetrics)
      extends DAGSchedulerEvent
     
    private[scheduler]case class ExecutorAdded(execId: String, host: String) extendsDAGSchedulerEvent
    private[scheduler]case class ExecutorLost(execId: String) extends DAGSchedulerEvent
    private[scheduler]  caseclass TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
    private[scheduler]case object ResubmitFailedStages extends DAGSchedulerEvent
    private[scheduler]case object StopDAGScheduler extends DAGSchedulerEvent

    ?

    不论是Client还是TaskSchedulerDAGScheduler的交互方式基本上都是通过DAGScheduler暴露的函数接口间接的给eventProcessActor发送相关消息

    ?

    如前面所说,DAGScheduler最重要的任务之一就是计算作业和任务的依赖关系,制定调度逻辑

    ?

    DAGScheduler作业调度的两个主要入口是submitJob runJob,两者的区别在于前者返回一个Jobwaiter对象,可以用在异步调用中,用来判断作业完成或者取消作业,runJob在内部调用submitJob,阻塞等待直到作业完成(或失败)

    ?

    具体往DAGScheduler提交作业的操作,基本都是封装在RDD的相关Action操作里面,不需要用户显式的提交作业

    ?

    用户代码都是基于RDD的一系列计算操作,实际运行时,这些计算操作是Lazy执行的,并不是所有的RDD操作都会触发SparkCluster上提交实际作业,基本上只有一些需要返回数据或者向外部输出的操作才会触发实际计算工作,其它的变换操作基本上只是生成对应的RDD记录依赖关系。

    ?

    DAGScheduler内部维护了各种 task / stage / job之间的映射关系表

    ?

    工作流程

    ?

    提交并运行一个Job的基本流程,包括以下步骤

    ?

    划分Stage

    ?

    当某个操作触发计算,向DAGScheduler提交作业时,DAGScheduler需要从RDD依赖链最末端的RDD出发,遍历整个RDD依赖链,划分Stage任务阶段,并决定各个Stage之间的依赖关系。Stage的划分是以ShuffleDependency为依据的,也就是说当某个RDD的运算需要将数据进行Shuffle时,这个包含了Shuffle依赖关系的RDD将被用来作为输入信息,构建一个新的Stage,由此为依据划分Stage,可以确保有依赖关系的数据能够按照正确的顺序得到处理和运算。

    ?

    GroupByKey操作为例,该操作返回的结果实际上是一个ShuffleRDD,当DAGScheduler遍历到这个ShuffleRDD的时候,因为其Dependency是一个ShuffleDependency,于是这个ShuffleRDD的父RDD以及shuffleDependency等对象就被用来构建一个新的Stage,这个Stage的输出结果的分区方式,则由ShuffleDependency中的Partitioner对象来决定。

    ?

    可以看到,尽管划分和构建Stage的依据是ShuffleDependency,对应的RDD也就是这里的ShuffleRDD,但是这个Stage所处理的数据是从这个shuffleRDD的父RDD开始计算的,只是最终的输出结果的位置信息参考了ShuffleRDD返回的ShuffleDependency里所包含的内容。而shuffleRDD本身的运算操作(其实就是一个获取shuffle结果的过程),是在下一个Stage里进行的。

    ?

    生成Job,提交Stage

    ?

    上一个步骤得到一个或多个有依赖关系的Stage,其中直接触发JobRDD所关联的Stage作为FinalStage生成一个Job实例,这两者的关系进一步存储在resultStageToJob映射表中,用于在该Stage全部完成时做一些后续处理,如报告状态,清理Job相关数据等。

    ?

    具体提交一个Stage时,首先判断该Stage所依赖的父Stage的结果是否可用,如果所有父Stage的结果都可用,则提交该Stage,如果有任何一个父Stage的结果不可用,则迭代尝试提交父Stage 所有迭代过程中由于所依赖Stage的结果不可用而没有提交成功的Stage都被放到waitingStages列表中等待将来被提交

    ?

    什么时候waitingStages中的Stage会被重新提交呢,当一个属于中间过程Stage的任务(这种类型的任务所对应的类为ShuffleMapTask)完成以后,DAGScheduler会检查对应的Stage的所有任务是否都完成了,如果是都完成了,则DAGScheduler将重新扫描一次waitingStages中的所有Stage,检查他们是否还有任何依赖的Stage没有完成,如果没有就可以提交该Stage

    ?

    此外每当完成一次DAGScheduler的事件循环以后,也会触发一次从等待和失败列表中扫描并提交就绪Stage的调用过程

    ?

    任务集的提交

    ?

    每个Stage的提交,最终是转换成一个TaskSet任务集的提交,DAGScheduler通过TaskScheduler接口提交TaskSet,这个TaskSet最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个TaskSet的生命周期,对于DAGScheduler来说提交Stage的工作到此就完成了。而TaskScheduler的具体实现则会在得到计算资源的时候,进一步通过TaskSetManager调度具体的Task到对应的Executor节点上进行运算

    ?

    任务作业完成状态的监控

    ?

    要保证相互依赖的job/stage能够得到顺利的调度执行,DAGScheduler就必然需要监控当前Job / Stage乃至Task的完成情况。这是通过对外(主要是对TaskScheduler)暴露一系列的回调函数来实现的,对于TaskScheduler来说,这些回调函数主要包括任务的开始结束失败,任务集的失败,DAGScheduler根据这些Task的生命周期信息进一步维护JobStage的状态信息。

    ?

    此外TaskScheduler还可以通过回调函数通知DAGScheduler具体的Executor的生命状态,如果某一个Executor崩溃了,或者由于任何原因与Driver失去联系了,则对应的