广播变量
概念
个人理解
分区的任务是以线程的方式处理的,若一个Executor上有多个分区,且每一个分区都要对Driver上的List对象进行操作,
由于Executor是一个进程,进程中的线程共享资源,若Dirver把本地List对象向每一个分区线程都发送一份,那么不仅网络IO操作增加,并且还会占用更多的内存。
因此Spark引入了广播变量,当向一个Executor的一个分区线程发送过本地资源后,若此Executor还有其他分区也需要这个本地资源,Spark会先检查这个Executor是否已>经有了这个资源,若有则不在发送这个资源。
gpt精简版
在Spark中,广播变量用于减少网络IO和内存开销。当多个分区线程需要操作Driver上的同一个资源时,Spark只会在首次发送该资源给Executor。之后,若此Executor的>其他分区也需要此资源,则无需再次发送,从而优化了资源使用。
上图中,List若被标记为广播变量,则只会向两个Executor分别发送一份这个对象,否则会发送四次
语法
#list是Driver资源,声明广播变量,此时的broad不是一个rdd
broad = sc.broadcast(list)
# 使用广播变量
value = broad.value
# 先放进broad内部,用的时候取出,传输的是broad这个对象
示例
# coding:utf8
from pyspark import SparkConf, SparkContext
import os
if __name__ == '__main__':
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
stu_info_list = [
(1, "张大山", 11),
(2, "王美丽", 13),
(3, "张明亮", 11),
(4, "王小力", 11)
]
# 1. 将Python List广播为广播变量
broadcast = sc.broadcast(stu_info_list)
score_info_rdd = sc.parallelize([
(1, "语文", 99),
(2, "语文", 99),
(3, "英语", 99),
(4, "英语", 99),
(1, "数学", 99),
(2, "数学", 99),
(3, "物理", 99),
(4, "物理", 99),
(1, "化学", 99),
(2, "化学", 99)
])
def map_func(data):
id = data[0]
name = ""
# 配合本地list与分布式rdd中的学生ID 匹配成功即可获取当前学生的姓名
# 在使用集群和海量数据的能力时,从广播变量中取出使用即可
for stu_info in broadcast.value:
stu_id = stu_info[0]
if id == stu_id:
name = stu_info[1]
return (name, data[1], data[2])
# Applying map function and collecting results
print(score_info_rdd.map(map_func).collect())
'''
场景:本地集合对象 和 分布式集合对象(RDD)进行关联的时候
适用于本地集合不是很大的情况,否则会占用Driver的大量内存
两个 RDD 之间的关联需要通过 shuffle 来重新分配数据到适当的分区,这会带来较大的网络IO和计算开销。因此,对于小型的本地集合,与 RDD 关联更为高效。
'''
累加器
问题提出
# 需求是,统计这个rdd中有多少个元素
from pyspark import SparkConf, SparkContext
import os
if __name__ == '__main__':
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
count=0
def map_func(data):
global count
count += 1
print(count)
rdd.map(map_func).collect()
print(count)
这段代码实际上并不能统计出rdd中的所有元素个数,在sprak程序中,除了对rdd进行的操作外,其余都运行在driver中。
上述rdd被分成了两根分区,因此,dirver会把count分别发送给这两个分区,每个分区统计的结果是5,但这只是在一个分区的结果,也就是说对于driver的count并没
影响,dirver的count仍然是0。
运行结果
累加器
在 Spark 中,累加器(Accumulator) 是一种共享变量,用于在分布式任务中聚合数据。累加器的特点包括:
累加操作:每个任务可以对累加器进行增量操作,结果在行动操作(如 collect())后汇总到驱动端。
只增不减:累加器只能执行累加操作,适合用于计数等用途。
延迟更新:累加器的值在任务完成后才会在驱动端可见,因此在任务中实时读取累加器值可能不准确。
语法
# 初始化累加器为0
acmlt=sc.accumulator(0)
# 累加操作
acmlt+=1
示例
from pyspark import SparkConf, SparkContext
import os
if __name__ == '__main__':
os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
# 初始化累加器count为0
count=sc.accumulator(0)
def map_func(data):
global count
count += 1
print(count)
rdd.map(map_func).collect()
print(count)
# 在使用了累加器后,打印driver上的count就正常显示10
累加器注意事项
rdd执行行动算子之后,就会消失,如果重新构建rdd,那么累加器仍然会工作,也就会导致累加器得到的结果出错。