当前位置 博文首页 > colorant的专栏:Spark internal - 多样化的运行模式 (下)

    colorant的专栏:Spark internal - 多样化的运行模式 (下)

    作者:[db:作者] 时间:2021-08-27 16:14

    作者:刘旭晖 Raymond 转载请注明出处

    Email:colorant at 163.com

    BLOG:http://blog.csdn.net/colorant/


    上一篇中介绍了Spark的各种运行模式的基本流程和相关实现,这里主要分析一下各种运行模式中涉及到的一些细节问题的流程和实现

    ?

    Spark的各种运行模式虽然启动方式,运行位置,调度手段有所不同,但它们所要完成的任务基本都是一致的,就是在合适的位置安全可靠的根据用户的配置和Job的需要管理和运行Task,这里粗略的列举一下在运行调度过程中各种需要考虑的问题

    ?

    • 环境变量的传递
    • Jar包和各种依赖文件的分发
    • Task的管理和序列化等
    • 用户参数配置
    • 用户及权限控制

    ?

    ?

    环境变量的传递

    ?

    Spark的运行参数有很大一部分是通过环境变量来设置的,例如Executor的内存设置,Library路径等等。Local模式当然不存在环境变量的传递问题,在Cluster模式下,就需要将环境变量传递到远端JVM环境中去

    ?

    SparkContext在初始化过程中 需要传递给Executor的环境变量,会在executorEnvs变量中(HashMap)中收集起来

    ?

    而具体如何将这些变量设置到Executor的环境中,取决于ExecutorLaunch方式

    ?

    Spark Standalone模式中,这些变量被封装在org.apache.spark.deploy.Command中,交给AppClient启动远程ExecutorCommand经由Spark Master通过Actor再次转发给合适的WorkerWorker通过ExecutorRunner构建Java.lang.Process运行ExecutorBackend,环境变量在ExecutorRunner中传递给java.lang.ProcessBuilder.environment完成整个传递过程

    ?

    Mesos相关模式中,这些环境变量被设置到org.apache.mesos.Protos.Environment中,在通过MesosLaunch Task时交给Mesos完成分发工作

    ?

    yarn-standalone模式中,这些环境变量首先要通过Yarn Client 设置到Spark AM的运行环境中,基本就是Client类运行环境中以SPARK开头的环境变量全部设置到ContainerLaunchContext中,AM通过WorkerRunnable进一步将它们设置到运行Executor所用的ContainerLaunchContext

    ?

    Yarn-client模式与yarn-standalone模式大致相同,虽然SparkContext运行在本地,executor所需的环境变量还是通过ContainerLaunchContextAM中转发给Executor

    ?

    可以注意到,在Yarn相关模式中,并没有使用到SparkContext收集的executorEnvs,主要是因为Yarn Standalone模式下Sparkcontext本身就是在远程运行的,因此在Yarn Client中单独实现了相关代码

    ?

    Jar包和各种依赖文件的分发

    ?

    Spark程序的运行依赖大致分两类, 一是Spark runtime及其依赖,二是应用程序自身的额外依赖


    对于Local模式而言,不存在Jar包分发的问题


    对于第一类依赖

    ?

    Spark Standalone模式中,整个环境随Spark部署到各个节点中,因此也不存在runtime Jar包分发的问题

    ?

    Mesos相关模式下,Mesos本身需要部署到各个节点,SparkRuntime可以和Standalone模式一样部署到各个节点中,也可以上传到Mesos可以读取的地方比如HDFS上,然后通过配置spark.executor.uri通知Mesos相关的SchedulerBackend,它们会将该URL传递给MesosMesosLaunch任务时会从指定位置获取相关文件

    ?

    Spark 应用程序所额外依赖的文件,在上述模式中可以通过参数将URL传递给SparkContext,对于本地文件SparkContext将启动一个HttpServer用于其它节点读取相关文件,其它如HDFS和外部HTTP等地址上的文件则原封不动,然后这些额外依赖文件的URLTaskSetmanager中和Task本身一起被序列化后发送给ExecutorExecutor再反序列化得到URL并传递给ExecutorURLClassLoader使用

    ?

    Yarn相关模式中,Runtime和程序运行所依赖的文件首先通过HDFS Client API上传到Job.sparkStaging目录下,然后将对应的文件和URL映射关系通过containerLaunchContext.setLocalResources函数通知YarnYarnNodeManagerLaunch container的时候会从指定URL处下载相关文件作为运行环境的一部分。上面的步骤对于Spark AM来说是充分的,而对于需要进一步分发到Executor的运行环境中的文件来说,AM还需要在创建ExecutorContainer的时候同样调用setLocalResources函数,AM是如何获得对应的文件和URL列表的呢,其实就是SparkYarn Client将这些文件的相关属性如URL,时间戳,尺寸等信息打包成字符串,通过特定的环境变量(SPARK_YARN_CACHE_XXX )传递给AMAM再把它们从环境变量中还原成所需文件列表

    ?

    ?

    Task管理和序列化

    ?

    Task的运行要解决的问题不外乎就是如何以正确的顺序,有效地管理和分派任务,如何将Task及运行所需相关数据有效地发送到远端,以及收集运行结果

    ?

    Task的派发源起于DAGScheduler调用TaskScheduler.submitTasks将一个Stage相关的一组Task一起提交调度。

    ?

    TaskSchedulerImpl中,这一组Task被交给一个新的TaskSetManager实例进行管理,所有的TaskSetManager经由SchedulableBuilder根据特定的调度策略进行排序,在TaskSchedulerImplresourceOffers函数中,当前被选择的TaskSetManagerResourceOffer函数被调用并返回包含了序列化任务数据的TaskDescription,最后这些TaskDescription再由SchedulerBackend派发到ExecutorBackend去执行

    ?

    系列化的过程中,上一节中所述App依赖文件相关属性URL等通过DataOutPutStream写出,而Task本身通过可配置的Serializer来序列化,当前可配制的Serializer包括如JavaSerializer KryoSerializer

    ?

    Task的运行结果在Executor端被序列化并发送回SchedulerBackend,由于受到Akka Frame Size尺寸的限制,如果运行结果数据过大,结果会存储到BlockManager中,这时候发送到SchedulerBackend的是对应数据的BlockIDTaskScheduler最终会调用TaskResultGetter在线程池中以异步的方式读取结果,TaskSetManager再根据运行结果更新任务状态(比如失败重试等)并汇报给DAGScheduler

    ?

    ?

    用户参数配置

    ?

    Spark的用户参数配置途径很多,除了环境变量以外,可以通过Spark.conf文件设置,也可以通过修改系统属性设置 "spark.*"

    ?

    而这些配置参数的使用环境也很多样化,有些在Sparkcontext本地使用(除了yarn-standalone模式),有些需要分发到Cluster集群中去

    ?

    SparkContext中解析和使用,比如spark.masterspark.app.names, spark.jars等等,通常用于配置SparkContext运行参数,创建Executor启动环境等

    ?

    发送给Executor的参数又分两部分

    ?

    一部分在ExecutorBackend初始化过程中需要使用的系统变量,会通过SparkContext在初始化过程中读取并设置到环境变量中去,在通过前面所述的方式,使用对应的底层资源调度系统设置到运行容器的环境变量中

    ?