ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spark内核_05

2021-11-27 16:01:30  阅读:148  来源: 互联网

标签:Task 05 Driver 调度 内存 Executor Spark 内核


目录

1.Spark内核

1.1核心组件

Driver在Spark作业执行时主要负责:

  1. 将用户程序转化为作业(Job);
  2. 在Executor之间调度任务(Task);
  3. 跟踪Executor的执行情况;
  4. 通过UI展示查询运行情况;

Executor对象是负责在Spark作业中运行具体任务

  1. 负责运行组成Spark应用的任务
  2. 要求缓存的 RDD 提供内存式存储

1.2Spark通用运行流程概述

  1. 任务提交后,都会先启动Driver程序;
  2. 随后Driver向集群管理器注册应用程序;
  3. 之后集群管理器根据此任务的配置文件分配Executor并启动;
  4. Driver开始执行main函数,Spark查询为懒执行,当执行到Action算子时开始反向推算,根据宽依赖进行Stage的划分,随后每一个Stage对应一个Taskset,
    Taskset中有多个Task,查找可用资源Executor进行调度;
  5. 根据本地化原则,Task会被分发到指定的Executor去执行,在任务执行的过程中,Executor也会不断与Driver进行通信,报告任务运行情况。

1.3Standalone模式

Standalone集群有2个重要组成部分,分别是:

1) Master(RM):是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责;

2) Worker(NM):是一个进程,一个Worker运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储RDD的某个或某些partition;
另一个是启动其他进程和线程(Executor),对RDD上的partition进行并行的处理和计算。

  1. 在Standalone Cluster模式下,任务提交后,Master会找到一个Worker启动Driver。
  2. Driver启动后向Master注册应用程序,Master在Worker启动Executor
  3. Worker上的Executor启动后会向Driver反向注册,
  4. 所有的Executor注册完成后,Driver开始执行main函数,之后执行到Action算子时,开始划分Stage,每个Stage生成对应的taskSet,之后将Task分发到各个Executor上执行。

在Standalone Client模式下,Driver在任务提交的本地机器上运行

1.4YARN调度

资源调度和分配交给了YARN来处理

YARN-CLENT

YARN-CLUSTER

2.Spark通讯架构

资料来源尚硅谷

相关知识

  1. BIO(Blocking I/O):阻塞式IO

    假设去饭店吃饭:老板在给前面先来的人做饭,自己就找个位置坐下等着

  2. NIO(New I/O):非阻塞式IO

    老板在给前面先来的人做饭,自己去干别的事情,过一段时间来询问老板饭是否做好。干别的事情不安宁,总要记着这个事情。

  3. AIO(Asynchronous I/O):异步非阻塞式IO

​ 老板在给前面先来的人做饭,和老板约定好什么时候给我饭,专心干别的事情

Spark基于Netty通信

Driver于Exceutor通信的方式,发件箱和收件箱,发件箱与服务通信

3. Spark任务调度机制

Driver线程主要是初始化SparkContext对象,准备运行所需的上下文

  1. 一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,

  2. 另一方面调度任务,将任务下发到Executor上。

资源调度与任务分配

  1. 当ResourceManager向ApplicationMaster返回Container资源时
  2. ApplicationMaster就尝试在对应的Container上启动Executor进程,
  3. Executor进程起来后,会向Driver反向注册,
  4. 注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver。

3.1Spark任务调度概述

Job、Stage以及Task

  1. 遇到一个Action方法则触发一个Job
  2. Stage以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分
  3. 一个Stage对应一个TaskSet

Spark RDD通过其Transactions操作,形成了RDD血缘(依赖)关系图,即DAG,最后通过Action的调用,触发Job并调度执行,
执行过程中会创建两个调度器:DAGScheduler和TaskScheduler。

  1. DAGScheduler负责Stage级的调度,主要是将job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。
  2. TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet,分发到Executor上执行

Driver初始化SparkContext过程中,会分别初始化DAGScheduler、TaskScheduler

3.2 Spark Stage级调度

当遇到一个Action操作后就会触发一个Job的计算,并交给DAGScheduler来提交,根据DAG进行切分,将一个Job划分为若干Stages

  1. 划分的Stages分两类,一类叫做ResultStage,为DAG最下游的Stage,由Action方法决定,
  2. 一类叫做ShuffleMapStage,为下游Stage准备数据

错误重试:
只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务,其他类型的Task失败会在TaskScheduler的调度过程中重试。

3.3Spark Task级调度

TaskScheduler会将TaskSet封装为TaskSetManager

TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。

3.3.1调度策略

一种是FIFO:将TaskSetManager按照先来先到的方式入队,出队时直接拿出最先进队的TaskSetManager

一种是FAIR:TaskSetMagager进行排序,要排序的TaskSetMagager对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,
综合考量三值进行排序

TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task。

Spark调度总是会尽量让每个task以最高的本地性级别来启动

  • 同一个Executor
  • 同一个节点
  • 同一个机架的两个节点上

3.2 失败重试

  1. Task被提交到Executor启动执行后,
  2. Executor会将执行状态上报给SchedulerBackend,
  3. SchedulerBackend则告诉TaskScheduler,TaskScheduler
  4. 到该Task对应的TaskSetManager,并通知到该TaskSetManager

TaskSetManager就知道Task的失败与成功状态,对于失败的Task,会记录它失败的次数

在记录Task失败次数过程中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会使用黑名单机制,避免它被调度到上一次失败的节点上

4. Spark Shuffle解析

1.ShuffleMapStage的结束伴随着shuffle文件的写磁盘。
2.ResultStage对应action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job的运行结束

4.1HashShuffle

1.未优化的

Task 开始那边各自进行 Hash 计算,每个task得到3个分类

2.优化的

启用合并机制,合并机制就是复用buffer

在同一个进程中,无论是有多少过Task,都会把同样的Key放在同一个Buffer里

然后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据)

4.2SortShuffle

1.普通SortShuffle

  • 在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中
  • 也就是说一个Task过程会产生多个临时文件
  • 最后在每个Task中,将所有的临时文件合并,这就是merge过程
  • 此过程将所有临时文件读取出来,一次写入到最终文件

意味着一个Task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。

2.bypass SortShuffle

触发条件

  • shuffle reduce task数量小于触发参数的阈值
  • 不是聚合类的shuffle算子

此时task会为每个reduce端的task都创建一个临时磁盘文件

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已

而该机制与普通SortShuffleManager运行机制的不同在于:不会进行排序,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销

5.Spark内存管理

5.1堆内内存和堆外内存

堆内内存受到JVM统一管理,

堆外内存是直接向操作系统进行内存的申请和释放。

Spark对堆内内存的规划

  • 这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,

  • 这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存

  • 剩余的部分不做特殊规划


Spark对堆内内存的管理是一种逻辑上的”规划式”的管理,对象实例占用内存的申请和释放都由JVM完成

Spark只能在申请后和释放前记录这些内存。

​ 申请内存流程如下:

  • Spark 在代码中 new 一个对象实例;

  • JVM 从堆内内存分配空间,创建对象并返回对象引用;

  • Spark 保存该对象的引用,记录该对象占用的内存。

    释放内存流程如下:

  • Spark记录该对象释放的内存,删除该对象的引用;

  • 等待JVM的垃圾回收机制释放该对象占用的堆内内存。

Spark 通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在一定程度上
可以提升内存的利用率,减少异常的出现。

堆外内存是直接向操作系统进行内存的申请和释放。

为了进一步优化内存的使用以及提高Shuffle时排序的效率,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间

5.2统一内存管理

存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域

双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间

Task在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查Checkpoint 或按照血统重新计算

Driver端的Master负责整个Spark应用程序的Block的元数据信息的管理和维护,而Executor端的Slave需要将Block的更新等状态上报到Master,
同时接收Master 的命令,例如新增或删除一个RDD。

RDD 在缓存到存储内存之后,Partition 被转换成Block,Record在堆内或堆外存储内存中占用一块连续的空间。将Partition由不连续的存储空间
转换为连续存储空间的过程,Spark称之为"展开"(Unroll)。

Spark的存储内存和执行内存有着截然不同的管理方式:

  • 对于存储内存来说,Spark用一个LinkedHashMap来集中管理所有的Block,Block由需要缓存的 RDD的Partition转化而成;

  • 而对于执行内存,Spark用AppendOnlyMap来存储 Shuffle过程中的数据,在Tungsten排序中甚至抽象成为页式内存管理,开辟了全新的
    JVM内存管理机制。

2021.11.27 15:43

标签:Task,05,Driver,调度,内存,Executor,Spark,内核
来源: https://www.cnblogs.com/wrcc/p/15612143.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有