共享变量

chenxin
11
2024-10-30

广播变量

概念

个人理解
分区的任务是以线程的方式处理的,若一个Executor上有多个分区,且每一个分区都要对Driver上的List对象进行操作,
由于Executor是一个进程,进程中的线程共享资源,若Dirver把本地List对象向每一个分区线程都发送一份,那么不仅网络IO操作增加,并且还会占用更多的内存。
因此Spark引入了广播变量,当向一个Executor的一个分区线程发送过本地资源后,若此Executor还有其他分区也需要这个本地资源,Spark会先检查这个Executor是否已>经有了这个资源,若有则不在发送这个资源。
gpt精简版
在Spark中,广播变量用于减少网络IO和内存开销。当多个分区线程需要操作Driver上的同一个资源时,Spark只会在首次发送该资源给Executor。之后,若此Executor的>其他分区也需要此资源,则无需再次发送,从而优化了资源使用。

Spark广播变量
上图中,List若被标记为广播变量,则只会向两个Executor分别发送一份这个对象,否则会发送四次

语法

#list是Driver资源,声明广播变量,此时的broad不是一个rdd
broad = sc.broadcast(list)
# 使用广播变量
value = broad.value
# 先放进broad内部,用的时候取出,传输的是broad这个对象

示例

# coding:utf8
from pyspark import SparkConf, SparkContext
import os

if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    stu_info_list = [
        (1, "张大山", 11),
        (2, "王美丽", 13),
        (3, "张明亮", 11),
        (4, "王小力", 11)
    ]

    # 1. 将Python List广播为广播变量
    broadcast = sc.broadcast(stu_info_list)

    score_info_rdd = sc.parallelize([
        (1, "语文", 99),
        (2, "语文", 99),
        (3, "英语", 99),
        (4, "英语", 99),
        (1, "数学", 99),
        (2, "数学", 99),
        (3, "物理", 99),
        (4, "物理", 99),
        (1, "化学", 99),
        (2, "化学", 99)
    ])

    def map_func(data):
        id = data[0]
        name = ""
        # 配合本地list与分布式rdd中的学生ID 匹配成功即可获取当前学生的姓名
        # 在使用集群和海量数据的能力时,从广播变量中取出使用即可
        for stu_info in broadcast.value:
            stu_id = stu_info[0]
            if id == stu_id:
                name = stu_info[1]
        return (name, data[1], data[2])

    # Applying map function and collecting results
    print(score_info_rdd.map(map_func).collect())
'''
场景:本地集合对象 和 分布式集合对象(RDD)进行关联的时候
适用于本地集合不是很大的情况,否则会占用Driver的大量内存
两个 RDD 之间的关联需要通过 shuffle 来重新分配数据到适当的分区,这会带来较大的网络IO和计算开销。因此,对于小型的本地集合,与 RDD 关联更为高效。
'''

累加器

问题提出

# 需求是,统计这个rdd中有多少个元素
from pyspark import SparkConf, SparkContext
import os
if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
    count=0
    def map_func(data):
        global count
        count += 1
        print(count)
    rdd.map(map_func).collect()
    print(count)

这段代码实际上并不能统计出rdd中的所有元素个数,在sprak程序中,除了对rdd进行的操作外,其余都运行在driver中。
上述rdd被分成了两根分区,因此,dirver会把count分别发送给这两个分区,每个分区统计的结果是5,但这只是在一个分区的结果,也就是说对于driver的count并没
影响,dirver的count仍然是0。


运行结果
未使用累加器

累加器

在 Spark 中,累加器(Accumulator) 是一种共享变量,用于在分布式任务中聚合数据。累加器的特点包括:
累加操作:每个任务可以对累加器进行增量操作,结果在行动操作(如 collect())后汇总到驱动端。
只增不减:累加器只能执行累加操作,适合用于计数等用途。
延迟更新:累加器的值在任务完成后才会在驱动端可见,因此在任务中实时读取累加器值可能不准确。

语法

# 初始化累加器为0
acmlt=sc.accumulator(0)
# 累加操作
acmlt+=1

示例

from pyspark import SparkConf, SparkContext
import os
if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
  # 初始化累加器count为0
    count=sc.accumulator(0)
    def map_func(data):
        global count
        count += 1
        print(count)

    rdd.map(map_func).collect()
    print(count)
# 在使用了累加器后,打印driver上的count就正常显示10

累加器注意事项

rdd执行行动算子之后,就会消失,如果重新构建rdd,那么累加器仍然会工作,也就会导致累加器得到的结果出错。

动物装饰