title: Spark内核调度
id: 5bd794b9-a8da-496c-9c1e-e44a2dd4a777
date: 2024-10-30 17:12:47
auther: chenxin
cover:
excerpt: DAG DAG概念 DAG:有向无环图 有向:有方向 无环:没有闭环 DAG有方向没有形成闭环的一个执行流程图,用于标记代码逻辑执行流程 Job和Action 行动算子作为触发开关,会将行动算子之前的一串rdd依赖链条执行起来。
permalink: /archives/sparknei-he-diao-du
categories:
- spark
- bigdata
tags:
DAG
DAG概念
DAG:有向无环图
有向:有方向
无环:没有闭环
DAG:有方向没有形成闭环的一个执行流程图,用于标记代码逻辑执行流程
Job和Action
行动算子作为触发开关,会将行动算子之前的一串rdd依赖链条执行起来。
一个行动算子会产生一个Job(一个应用程序的子任务),每一个Job都有各自自己的DAG图,上图可以看作两个DAG图。
1个ACTION=1个DAG=1个JOB
多Job的情况:如果一段代码中包含三个Action,则会产生三个Job,每个Job拥有自己的DAG图。
Spark应用程序(Application):在Spark中,运行一个完整的代码称为一个Application,其中可能包含多个Job。每个Job由一个Action触发,包含一个DAG。
DAG和分区
代码没有运行时,只能得到基础的DAG,运行后,Spark会在内部规划出带有分区关系的DAG图。
DAG的宽窄依赖和阶段划分
宽窄依赖
窄依赖:父RDD的一个分区,全部将数据发给子RDD的一个分区
宽依赖:父RDD的一个分区,将数据发送给子RDD的多个分区,宽依赖也叫shuffle
左图为窄依赖,右图为宽依赖
阶段划分
DAG图从后向前,遇到宽依赖就划分出一个阶段,称为stage。
在stage的内部一定都是窄依赖。
基于宽依赖,将DAG划分为了两个stage
在Spark中,一个应用程序可能包含多个Job,一个Job的DAG图会包含多个Stage,每个Stage包含多个Task。
Task是实际执行的最小单位。
内存迭代计算
个人理解
因为每一个阶段内都是窄依赖,所以Task1和Task2所在的线条互不依赖,因此三条线运行互不影响,相互独立。
上图中,RDD1分区1到RDD2分区1和RDD2分区1到RDD3分区1之间都可以用一个线程解决,但线程运行在Executor中,若这些线程不在同一个Executor中,那么数据
就要通过网络交互,会造成性能浪费。
Task1所在的线条,若只用一个线程解决,则RDD1分区1和RDD2分区1之间不需要网络IO,可以直接通过内存交互,那么Task1这条线就叫做内存计算管道
(PipeLine),则上图作业就可以由三个线程并行计算,只有在宽依赖的部分才会不可避免的通过网络IO进行数据交互,线程4和线程1也可能都在一个Executor中,因此也不是百分百走网络IO,但不能让线程4,5,6都在一个Executor中,那样线程1交换快,但线程2,3就会很慢。那如果这些线程都在一个Executor中呢?那样就变成了全内存
计算,失去了并行的属性,变成了Local模式。
最好的解决方案就是,线程1,2,线程3,4,线程5,6分别处于一个Executor中,那么确保了3个并行度,1/3的内存迭代,2/3的网络IO,是性能最优解。
gpt
内存计算管道(Pipeline):窄依赖的Task可以通过内存迭代在Executor中执行,不需要网络IO。Task1和Task2等彼此不依赖的线条可以在不同分区间独立执行,形成
内存计算管道。
网络IO的影响:在宽依赖处不可避免地会引发网络IO。合理分配Task到不同的Executor可以确保最大程度的内存计算和最小化的网络传输。
并行和内存计算:在每个Executor中合理分配Task,有助于平衡内存和网络IO的开销,最大化性能。最佳方案是将每组Task分配到不同Executor内,确保并行度和内存迭代的平衡。
因此,设置全局并行度后最好不要修改分区数,内存迭代管道长度会极大缩短,会对性能造成影响
Q:Spark怎么做内存计算?DAG的作用?Stage阶段划分的作用?
- Saprk会产生DAG图
- DAG图会基于分区和宽窄依赖关系划分阶段
- 一个阶段内部全是窄依赖,形成前后1:1的分区对应关系,可以产生许多的内存迭代计算的管道
- 这些内存迭代的管道,就是一个个具体的执行Task
- 一个Task在一个线程中运行,任务在Executor的内存中执行,因此主要通过内存计算完成。
Q:Spark为什么比MapReduce快?
- 算子丰富,MapReduce只有Map和Reduce两个算子
- 内存计算,算子交互和计算上可以尽量多的内存计算而非磁盘迭代
Spark并行度
Spark的并行:在同一时间内,有多少个task在同时运行。
并行度:并行能力的设置,设置并行度为6,则就是6个task并行在运行,rdd的分区就被规划为6个分区,先有并行度,再有分区规划
全局并行度设置
全局并行度优先级从高到低
- 代码
- 客户端提交参数
- 配置文件
- 默认(1,但不会全部按1,多数情况基于读取文件的分片数量来作为默认并行度)
配置文件
spark.default.parallesize 100
客户端提交参数
bin/spark-submit --conf "spark.default.parallelism=100"
代码设置
conf=SparkConf()
conf.set("spark.default.parallelise","100")
推荐设置全局并行度,而不是针对RDD改分区,否则可能会影响内存迭代管道的构建或者产生额外的shuffle
根据集群规划并行度
设置为CPU总核心的2~10倍,确保是CPU核心数的整数倍,规划并行度只看集群CPU核心数
CPU核心同一时间只能干一件事,假设有100核心,设置100并行度,那么CPU是100%出力的
在这种情况下,若Task的负载不均衡,就会出现某些Task先完成,导致CPU核心空闲的状态
因此可以把Task数量变多,若300个Task并行,那么同一时间只有100个Task并行,由200个Task在等待
可以确保,当CPU核心空闲时,有Task补上来,最大化利用集群资源
但不能过大,否则Spark调度会困难
一个分区只能由一个Task处理,一个Task可以处理多个RDD一个分区
单独设置核心数并不会自动等于分区数,例如conf = SparkConf().setAppName("test").setMaster("local[4]"),并不是说划分四个分区,而是说Spark可以使用4个
CPU核心数,最多能有四个任务同时执行,此时若再设置并行度为8,那么就是说,首先会有4个Task同时运行,剩余四个Task会等待
核心数设置的具体含义
超线程技术不会增加 Spark 的并行任务数。设置 local[4] 时,最多允许 4 个任务同时运行,超线程只是提升核心的执行效率,但不会让 Spark 将 4 核当作 8 核用。要运行 8 个任务,需要将设置改为 local[8]。
本地模式 (local[n] 或 local[*]):
local[n]:Spark 只在本地运行,n 表示 Spark 可以使用的 CPU 核心数。例如,local[4] 允许 Spark 使用 4 个核心。
local[*]:Spark 在本地模式下使用机器的所有可用核心数,最大化并行能力。
集群模式:
集群总核心数:Spark 在集群模式下运行时,你可以控制每个 Executor 能使用的核心数以及集群中总的核心数。
通过 --executor-cores 参数或 spark.executor.cores 配置项指定每个 Executor 可以使用的核心数。这样可以在每个 Executor 内并行处理多个任务。
使用 --total-executor-cores 参数或配置项指定集群中可供 Spark 应用使用的核心总数,这可以限制 Spark 应用对集群资源的总体占用。
Spark任务调度
Spark的任务调度由Driver进行负责调度
Driver 触发 Action 操作:
- Driver 端的应用程序触发了一个 Action 操作(如 runJob),启动 Spark 作业。
SparkContext 提交 Job: - SparkContext 接收到 Action 请求后调用 DAGScheduler 的 runJob 方法,开始处理作业。
DAGScheduler 划分 Stage 并提交 Task: - DAGScheduler 将作业划分为多个 Stage,并将每个 Stage 转换为一组 Task。
- DAGScheduler 使用 submitTasks 方法将 Task 列表提交给 TaskScheduler。
TaskScheduler 调度任务并请求资源: - TaskScheduler 接收 Task 列表后,通过 reviveOffers 将 Task 加入调度队列,并请求资源。
- SchedulerBackend 开始与集群的资源管理器交互,分配资源并寻找可用的 Executor。
SchedulerBackend 分配任务到 Executor: - SchedulerBackend 使用 launchTask 将任务调度到合适的 Executor 上。
- Executor 端的 ExecutorBackend 接收任务并开始执行。
Executor 执行任务: - ExecutorBackend 执行分配的任务,将结果返回给 Driver。
DAG调度器
DAG 调度器基于逻辑的 DAG 图处理作业,得到任务的划分。
- DAG 调度器负责分析逻辑 DAG 图,将作业划分成逻辑上的 Task,生成执行计划。每个 Task 执行特定的计算任务,并明确了 Task 间的依赖关系(如上图中 Task1 与 Task4、Task5、Task6 的交互关系)。
- 划分任务是不会参考Executor的数量的。
- 在任务划分完成后,Spark 会根据 Executor 配置,将这些 Task 分配到不同的 Executor 上执行。
Executor配置建议 - Executor 的数量可以通过 --num-executors 参数设置,一般推荐将 Executor 数量设为与集群中机器数量相等。Spark 会根据集群管理策略、服务器可用资源和资源限制等参数合理分配这些 Executor。如果每台服务器的资源都足够,Spark 会将 Executor 平均分配到每台服务器上。
三台服务器,每台一个 Executor:推荐这种配置,每台服务器只开一个 Executor。各 Task 的数据交互通过本地内存完成,效率较高。
两台服务器,4 个 Executor
- 若同一台服务器上有多个 Executor,进程间无法直接通过内存交互,而需通过本地回环网络通信(localhost),效率略低于内存通信。因此,建议每台服务器上仅运行一个 Executor 进程,以减少进程间通信开销,提升资源利用率。
- 因此设定Exectutor的数量时,要合理分配核心数和资源隔离,才能发挥多核并行的性能优势。
Task调度器
将DAG调度器的结果分配到具体的Executor上执行,并监控这些Task的工作进度
- TaskScheduler 将来自 DAGScheduler 的 Task 分配到具体的 Executor 上。
- TaskScheduler 负责管理任务队列,并在任务执行过程中监控任务的执行状态。它会根据需要重试失败的任务,确保任务执行的容错性。
Spark名词汇总
名词解释
- Application:一个 Spark 应用程序,包括一个 Driver 程序和在集群上运行的多个 Executor。每次代码运行时会启动一个新的 Spark Application。
- Job:每次在 RDD(弹性分布式数据集)或 DataFrame 上调用 Action 操作(如 count、collect)时都会触发一个 Job。Job 是一个计算任务的逻辑单元,由多个 Task 组成。
- Stage:每个 Job 被划分为多个 Stage。Stage 是一组可以并行执行的任务集合,Stage 划分基于数据依赖性,通常由窄依赖和宽依赖来决定。
- Task:Stage 中的基本执行单元。每个 Task 处理 RDD 或 DataFrame 的一个分区,并由 Executor 执行。
- DAG(Directed Acyclic Graph):作业在 Spark 中的逻辑执行计划,表示计算的依赖关系。Spark 会将 DAG 图划分为多个 Stage,并进一步转化为 Task。
- DAGScheduler:负责将 Job 转换为 DAG 图,进一步划分 Stage,并提交到 TaskScheduler。
- TaskScheduler:负责将 Task 分配到具体的 Executor 上,并监控其执行状态。
- Executor:在集群的 Worker 节点上运行的进程,负责执行 Task,并将计算结果返回给 Driver。Executor 同时负责数据的缓存和存储。
- Driver:运行 Spark 应用主程序的进程,负责 Spark 上下文的创建、任务的调度和集群资源的管理。Driver 是 Application 的控制器,包含了 SparkContext。
- SparkContext:Spark 应用的入口,负责与集群交互,资源申请和管理,维护所有 Job、Stage 和 Task 的调度。SparkContext 是 Application 的生命周期管理单元。
- Cluster Manager:集群资源管理器,Spark 支持 YARN、Mesos 和 Standalone 作为集群管理器,负责分配资源给 Spark 应用。
- RDD(Resilient Distributed Dataset):弹性分布式数据集,Spark 中的核心数据抽象。RDD 是不可变的、可分区的数据集,具有容错机制。
- Partition:RDD 或 DataFrame 的分区,Task 的基本处理单元。每个分区可以由一个 Task 并行处理。
- Action:触发 Spark 执行的操作,如 collect、count 等,生成 Job,并返回结果。
- Transformation:对 RDD 或 DataFrame 的延迟操作,如 map、filter 等,不会立即执行,只有 Action 触发时才执行。
- Wide Dependency:宽依赖,指 RDD 或 DataFrame 的每个分区依赖于多个父分区,通常会引发 Shuffle 操作。
- Narrow Dependency:窄依赖,指每个分区仅依赖一个父分区,不需要 Shuffle 操作,数据依赖较轻。
层级关系梳理
- 一个 Spark 环境可以运行多个 Application
- 一个代码运行起来,会成为一个 Application
- Application 内部可以有多个 Job
- 每个 Job 由一个 Action 产生,并且每个 Job 有自己的 DAG 执行图
- 一个 Job 的 DAG 图会基于宽窄依赖划分成不同的阶段
- 不同阶段内基于分区数量,形成多个内存迭代管道
- 每一个内存迭代管道形成一个 Task(DAG 调度器划分将 Job 内划分出具体的 task 任务,一 个 Job 被划分出来的 task 在逻辑上称之为这个 job 的 taskset)
总结
DAG的作用?
DAG是有向无环图,描述任务的执行流程,主要作用是协助DAG调度器构建Task分配用于任务管理
内存迭代\阶段划分?
基于DAG的宽窄依赖划分节点,每个阶段内部的都是窄依赖可以构建内存迭代管道
DAG调度器是?
构建Task分配用于做任务管理