Spark内核调度

chenxin
22
2024-10-30

title: Spark内核调度
id: 5bd794b9-a8da-496c-9c1e-e44a2dd4a777
date: 2024-10-30 17:12:47
auther: chenxin
cover:
excerpt: DAG DAG概念 DAG:有向无环图 有向:有方向 无环:没有闭环 DAG有方向没有形成闭环的一个执行流程图,用于标记代码逻辑执行流程 Job和Action 行动算子作为触发开关,会将行动算子之前的一串rdd依赖链条执行起来。
permalink: /archives/sparknei-he-diao-du
categories:

  • spark
  • bigdata
    tags:

DAG

DAG概念

DAG:有向无环图
有向:有方向
无环:没有闭环

DAG:有方向没有形成闭环的一个执行流程图,用于标记代码逻辑执行流程

Job和Action

行动算子作为触发开关,会将行动算子之前的一串rdd依赖链条执行起来。

一个行动算子会产生一个Job(一个应用程序的子任务),每一个Job都有各自自己的DAG图,上图可以看作两个DAG图。

1个ACTION=1个DAG=1个JOB
多Job的情况:如果一段代码中包含三个Action,则会产生三个Job,每个Job拥有自己的DAG图。
Spark应用程序(Application):在Spark中,运行一个完整的代码称为一个Application,其中可能包含多个Job。每个Job由一个Action触发,包含一个DAG。

DAG和分区

代码没有运行时,只能得到基础的DAG,运行后,Spark会在内部规划出带有分区关系的DAG图。

假设只有三个分区并行

DAG的宽窄依赖和阶段划分

宽窄依赖

窄依赖:父RDD的一个分区,全部将数据发给子RDD的一个分区
宽依赖:父RDD的一个分区,将数据发送给子RDD的多个分区,宽依赖也叫shuffle

左图为窄依赖,右图为宽依赖

阶段划分

DAG图从后向前,遇到宽依赖就划分出一个阶段,称为stage。
在stage的内部一定都是窄依赖。

基于宽依赖,将DAG划分为了两个stage

在Spark中,一个应用程序可能包含多个Job,一个Job的DAG图会包含多个Stage,每个Stage包含多个Task
Task是实际执行的最小单位。

内存迭代计算

个人理解
因为每一个阶段内都是窄依赖,所以Task1和Task2所在的线条互不依赖,因此三条线运行互不影响,相互独立。
上图中,RDD1分区1到RDD2分区1和RDD2分区1到RDD3分区1之间都可以用一个线程解决,但线程运行在Executor中,若这些线程不在同一个Executor中,那么数据
就要通过网络交互,会造成性能浪费。
Task1所在的线条,若只用一个线程解决,则RDD1分区1和RDD2分区1之间不需要网络IO,可以直接通过内存交互,那么Task1这条线就叫做内存计算管道
(PipeLine)
,则上图作业就可以由三个线程并行计算,只有在宽依赖的部分才会不可避免的通过网络IO进行数据交互,线程4和线程1也可能都在一个Executor中,因此也不是百分百走网络IO,但不能让线程4,5,6都在一个Executor中,那样线程1交换快,但线程2,3就会很慢。那如果这些线程都在一个Executor中呢?那样就变成了全内存
计算,失去了并行的属性,变成了Local模式。
最好的解决方案就是,线程1,2,线程3,4,线程5,6分别处于一个Executor中,那么确保了3个并行度,1/3的内存迭代,2/3的网络IO,是性能最优解。
gpt
内存计算管道(Pipeline):窄依赖的Task可以通过内存迭代在Executor中执行,不需要网络IO。Task1和Task2等彼此不依赖的线条可以在不同分区间独立执行,形成
内存计算管道。
网络IO的影响:在宽依赖处不可避免地会引发网络IO。合理分配Task到不同的Executor可以确保最大程度的内存计算和最小化的网络传输。
并行和内存计算:在每个Executor中合理分配Task,有助于平衡内存和网络IO的开销,最大化性能。最佳方案是将每组Task分配到不同Executor内,确保并行度和内存迭代的平衡。

因此,设置全局并行度后最好不要修改分区数,内存迭代管道长度会极大缩短,会对性能造成影响

Q:Spark怎么做内存计算?DAG的作用?Stage阶段划分的作用?

  • Saprk会产生DAG图
  • DAG图会基于分区和宽窄依赖关系划分阶段
  • 一个阶段内部全是窄依赖,形成前后1:1的分区对应关系,可以产生许多的内存迭代计算的管道
  • 这些内存迭代的管道,就是一个个具体的执行Task
  • 一个Task在一个线程中运行,任务在Executor的内存中执行,因此主要通过内存计算完成。

Q:Spark为什么比MapReduce快?

  • 算子丰富,MapReduce只有Map和Reduce两个算子
  • 内存计算,算子交互和计算上可以尽量多的内存计算而非磁盘迭代

Spark并行度

Spark的并行:在同一时间内,有多少个task在同时运行。
并行度:并行能力的设置,设置并行度为6,则就是6个task并行在运行,rdd的分区就被规划为6个分区,先有并行度,再有分区规划

全局并行度设置

全局并行度优先级从高到低

  • 代码
  • 客户端提交参数
  • 配置文件
  • 默认(1,但不会全部按1,多数情况基于读取文件的分片数量来作为默认并行度)
    配置文件
spark.default.parallesize 100

客户端提交参数

bin/spark-submit --conf "spark.default.parallelism=100"

代码设置

conf=SparkConf()
conf.set("spark.default.parallelise","100")

推荐设置全局并行度,而不是针对RDD改分区,否则可能会影响内存迭代管道的构建或者产生额外的shuffle

根据集群规划并行度

设置为CPU总核心的2~10倍,确保是CPU核心数的整数倍,规划并行度只看集群CPU核心数

CPU核心同一时间只能干一件事,假设有100核心,设置100并行度,那么CPU是100%出力的
在这种情况下,若Task的负载不均衡,就会出现某些Task先完成,导致CPU核心空闲的状态
因此可以把Task数量变多,若300个Task并行,那么同一时间只有100个Task并行,由200个Task在等待
可以确保,当CPU核心空闲时,有Task补上来,最大化利用集群资源
但不能过大,否则Spark调度会困难

一个分区只能由一个Task处理,一个Task可以处理多个RDD一个分区

单独设置核心数并不会自动等于分区数,例如conf = SparkConf().setAppName("test").setMaster("local[4]"),并不是说划分四个分区,而是说Spark可以使用4个
CPU核心数,最多能有四个任务同时执行,此时若再设置并行度为8,那么就是说,首先会有4个Task同时运行,剩余四个Task会等待

核心数设置的具体含义

超线程技术不会增加 Spark 的并行任务数。设置 local[4] 时,最多允许 4 个任务同时运行,超线程只是提升核心的执行效率,但不会让 Spark 将 4 核当作 8 核用。要运行 8 个任务,需要将设置改为 local[8]。

本地模式 (local[n] 或 local[*]):

local[n]:Spark 只在本地运行,n 表示 Spark 可以使用的 CPU 核心数。例如,local[4] 允许 Spark 使用 4 个核心。
local[*]:Spark 在本地模式下使用机器的所有可用核心数,最大化并行能力。
集群模式:

集群总核心数:Spark 在集群模式下运行时,你可以控制每个 Executor 能使用的核心数以及集群中总的核心数。
通过 --executor-cores 参数或 spark.executor.cores 配置项指定每个 Executor 可以使用的核心数。这样可以在每个 Executor 内并行处理多个任务。
使用 --total-executor-cores 参数或配置项指定集群中可供 Spark 应用使用的核心总数,这可以限制 Spark 应用对集群资源的总体占用。

Spark任务调度

Spark的任务调度由Driver进行负责调度


Driver 触发 Action 操作:

  • Driver 端的应用程序触发了一个 Action 操作(如 runJob),启动 Spark 作业。
    SparkContext 提交 Job:
  • SparkContext 接收到 Action 请求后调用 DAGScheduler 的 runJob 方法,开始处理作业。
    DAGScheduler 划分 Stage 并提交 Task:
  • DAGScheduler 将作业划分为多个 Stage,并将每个 Stage 转换为一组 Task。
  • DAGScheduler 使用 submitTasks 方法将 Task 列表提交给 TaskScheduler。
    TaskScheduler 调度任务并请求资源:
  • TaskScheduler 接收 Task 列表后,通过 reviveOffers 将 Task 加入调度队列,并请求资源。
  • SchedulerBackend 开始与集群的资源管理器交互,分配资源并寻找可用的 Executor。
    SchedulerBackend 分配任务到 Executor:
  • SchedulerBackend 使用 launchTask 将任务调度到合适的 Executor 上。
  • Executor 端的 ExecutorBackend 接收任务并开始执行。
    Executor 执行任务:
  • ExecutorBackend 执行分配的任务,将结果返回给 Driver。

DAG调度器

DAG 调度器基于逻辑的 DAG 图处理作业,得到任务的划分。

  • DAG 调度器负责分析逻辑 DAG 图,将作业划分成逻辑上的 Task,生成执行计划。每个 Task 执行特定的计算任务,并明确了 Task 间的依赖关系(如上图中 Task1 与 Task4、Task5、Task6 的交互关系)。
  • 划分任务是不会参考Executor的数量的。
  • 在任务划分完成后,Spark 会根据 Executor 配置,将这些 Task 分配到不同的 Executor 上执行。
    Executor配置建议
  • Executor 的数量可以通过 --num-executors 参数设置,一般推荐将 Executor 数量设为与集群中机器数量相等。Spark 会根据集群管理策略、服务器可用资源和资源限制等参数合理分配这些 Executor。如果每台服务器的资源都足够,Spark 会将 Executor 平均分配到每台服务器上。

三台服务器,每台一个 Executor:推荐这种配置,每台服务器只开一个 Executor。各 Task 的数据交互通过本地内存完成,效率较高。

两台服务器,4 个 Executor

  • 若同一台服务器上有多个 Executor,进程间无法直接通过内存交互,而需通过本地回环网络通信(localhost),效率略低于内存通信。因此,建议每台服务器上仅运行一个 Executor 进程,以减少进程间通信开销,提升资源利用率。
  • 因此设定Exectutor的数量时,要合理分配核心数和资源隔离,才能发挥多核并行的性能优势。

Task调度器

将DAG调度器的结果分配到具体的Executor上执行,并监控这些Task的工作进度

  • TaskScheduler 将来自 DAGScheduler 的 Task 分配到具体的 Executor 上。
  • TaskScheduler 负责管理任务队列,并在任务执行过程中监控任务的执行状态。它会根据需要重试失败的任务,确保任务执行的容错性。

Spark名词汇总

名词解释

  • Application:一个 Spark 应用程序,包括一个 Driver 程序和在集群上运行的多个 Executor。每次代码运行时会启动一个新的 Spark Application。
  • Job:每次在 RDD(弹性分布式数据集)或 DataFrame 上调用 Action 操作(如 count、collect)时都会触发一个 Job。Job 是一个计算任务的逻辑单元,由多个 Task 组成。
  • Stage:每个 Job 被划分为多个 Stage。Stage 是一组可以并行执行的任务集合,Stage 划分基于数据依赖性,通常由窄依赖和宽依赖来决定。
  • Task:Stage 中的基本执行单元。每个 Task 处理 RDD 或 DataFrame 的一个分区,并由 Executor 执行。
  • DAG(Directed Acyclic Graph):作业在 Spark 中的逻辑执行计划,表示计算的依赖关系。Spark 会将 DAG 图划分为多个 Stage,并进一步转化为 Task。
  • DAGScheduler:负责将 Job 转换为 DAG 图,进一步划分 Stage,并提交到 TaskScheduler。
  • TaskScheduler:负责将 Task 分配到具体的 Executor 上,并监控其执行状态。
  • Executor:在集群的 Worker 节点上运行的进程,负责执行 Task,并将计算结果返回给 Driver。Executor 同时负责数据的缓存和存储。
  • Driver:运行 Spark 应用主程序的进程,负责 Spark 上下文的创建、任务的调度和集群资源的管理。Driver 是 Application 的控制器,包含了 SparkContext。
  • SparkContext:Spark 应用的入口,负责与集群交互,资源申请和管理,维护所有 Job、Stage 和 Task 的调度。SparkContext 是 Application 的生命周期管理单元。
  • Cluster Manager:集群资源管理器,Spark 支持 YARN、Mesos 和 Standalone 作为集群管理器,负责分配资源给 Spark 应用。
  • RDD(Resilient Distributed Dataset):弹性分布式数据集,Spark 中的核心数据抽象。RDD 是不可变的、可分区的数据集,具有容错机制。
  • Partition:RDD 或 DataFrame 的分区,Task 的基本处理单元。每个分区可以由一个 Task 并行处理。
  • Action:触发 Spark 执行的操作,如 collect、count 等,生成 Job,并返回结果。
  • Transformation:对 RDD 或 DataFrame 的延迟操作,如 map、filter 等,不会立即执行,只有 Action 触发时才执行。
  • Wide Dependency:宽依赖,指 RDD 或 DataFrame 的每个分区依赖于多个父分区,通常会引发 Shuffle 操作。
  • Narrow Dependency:窄依赖,指每个分区仅依赖一个父分区,不需要 Shuffle 操作,数据依赖较轻。

层级关系梳理

  • 一个 Spark 环境可以运行多个 Application
  • 一个代码运行起来,会成为一个 Application
  • Application 内部可以有多个 Job
  • 每个 Job 由一个 Action 产生,并且每个 Job 有自己的 DAG 执行图
  • 一个 Job 的 DAG 图会基于宽窄依赖划分成不同的阶段
  • 不同阶段内基于分区数量,形成多个内存迭代管道
  • 每一个内存迭代管道形成一个 Task(DAG 调度器划分将 Job 内划分出具体的 task 任务,一 个 Job 被划分出来的 task 在逻辑上称之为这个 job 的 taskset)

总结

DAG的作用?

DAG是有向无环图,描述任务的执行流程,主要作用是协助DAG调度器构建Task分配用于任务管理

内存迭代\阶段划分?

基于DAG的宽窄依赖划分节点,每个阶段内部的都是窄依赖可以构建内存迭代管道

DAG调度器是?

构建Task分配用于做任务管理

动物装饰