RDD

chenxin
67
2024-10-08

RDD详解

RDD定义

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,Spark中最基本的数据抽象,代表一个不可变可分区,里面元素可并行计算的集合。

  • Dataset:用于存放数据的数据集合。比如List,Dict

  • Distributed:Rdd中的数据是分布式存储的,可以用于分布式计算。(RDD的数据是跨域机器存储的(不同进程))

  • Resilient:RDD的数据可以存储在内存或者磁盘中。

    RDD特性

RDD是有分区的

RDD是一个抽象对象,而分区是实际的物理概念

假设一个RDD有1至6个数字。
RDD分区

RDD的方法会作用在所有分区上

对RDD中的数据乘以10倍操作。
RDD的方法作用在每个分区上

RDD之间存在依赖关系(血缘关系)

RDD之间的转换可以练成一条线条,类似于血缘关系。

RDD之间的依赖关系

Key-Value型的RDD可以有分区器

kv型的RDD默认是Hash分区规则,我们可以手动设置分区器。
这个特性只针对Key-Value型的RDD。
kv型的RDD是存储的是二元元组的形式。

如有('hadoop',1),('hadoop',2),('hbase',1),('hbase',2)那么key为hadoop和key为hbase的各分在一个区。

RDD的分区规划会尽量靠近数据所在的服务器

在初始化RDD规划的时候,分区会尽量规划到存储数据所在的服务器上,可以减少网络读取的开销。
Spark会在确保并行能力计算能力的前提下,尽量确保本地读取。

RDD分区会尽量靠近服务器

WorldCount图示

WorldCount

RDD编程

SparkContext

SparkContext对象是Spark RDD编程是程序入口对象。
SparkContext对象主要功能就是创建出第一个的RDD。

RDD创建

并行化创建

本地集合->分布式RDD

paralllieze

# 参数1,集合对象
# 参数2,分区数,没有给定分区数的时候,默认分区数根据CPU核心数
rdd=sparkcontext.parallelize(参数1,参数2)

读取文件创建

textFile

可以读取本地文件,也可以读取hdfs文件系统数据。
testFile默认分区与CPU核心数无关,如果是本地文件系统则根据文件大小,若是hdfs文件系统则根据block块的数量。

textFile

# 参数1,文件路径
# 参数2,可选,最小分区数
# 参数2在Spark允许的范围之内生效
rdd=sparkcontext.textFile(参数1,参数2)

wholeTextFile

适合读取一堆小文件
偏向于少量分区读取数据,若小文件数据少且分区又多,那么shuffle的几率更高
尽量少分区读取数据

wholeTextFile读取文件夹内容时,不是把所有文件混杂在一起,而是对象的形式,每一个对象存储一个文件的内容,key为文件路径,value为文件内容,每个对象是一个小元组
例如(“D://text/1.txt”,"value")

#参数1,必选,文件路径文件夹,读取文件夹所有文件
#参数2,可选,最小分数量
#参数2话语权不足,分区数最多开到文件数量。
rdd=sparkcontext.wholeTextFile(参数1,参数2)
#收集文件内容
print(rdd.map(lambda x : x[1]).collect())

RDD算子

分布式集合对象上的api称为算子。
RDD的算子分为:
Transformation:转换算子
Action:动作(行动)算子

Transformation

RDD的算子,返回值仍然是一个RDD的,称为转换算子。
这类算子是lazy 懒加载,若没有action算子,transformation算子不工作。

Action

RDD的算子,返回值不是RDD

转换算子构建执行计划,行动算子触发任务

查看分区数

尽量不要增加分区,可能会破坏内存迭代的计算管道

# getNumPartitions是Python的函数,返回RDD或DataFrame的当前分区数量
# 创建一个 RDD
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=3)

# 获取分区数
num_partitions = rdd.getNumPartitions()

print("Number of partitions:", num_partitions)

转换算子(Transformation)

转换算子不会立即计算,只是定义新的RDD对象,直至遇到行动算子才会执行计算

map

  • 作用:对RDD的每一个元素应用函数,返回一个新的RDD
  • 语法: rdd.map(function)
rdd = sc.parallelize([1,2,3,4,5])
map_rdd = rdd.map(lambda x : x+1)
print(map_rdd.collect()) #输出[2,3,4,5,6]

flatMap

  • 作用:对每个元素都应用一个函数,并展开返回多个元素,展平为一个RDD,实现RDD的扁平化
  • 语法:rdd.flatMap(function)
rdd = sc.parallelize(["hello world", "foo bar"])
flat_rdd = rdd.flatMap(lambda line : line.split(" "))
print(flat_mapped_rdd.collect())  # 输出:['hello', 'world', 'foo', 'bar']

filter

  • 作用:根据条件过滤每一个元素,满足条件的元素组成一个新的RDD并返回这个RDD
  • 语法:rdd.filter(function)
rdd = sc.parallelize([1, 2, 3, 4, 5])
filter_rdd = rdd.filter(lambda x : x%2 == 0)
print(filtered_rdd.collect())  # 输出:[2, 4]

distinct

  • 作用:去除 RDD 中的重复元素。
  • 语法:rdd.distinct()
rdd = sc.parallelize([1,2,2,4,4])
distinct_rdd = rdd.distinct()
print(distinct_rdd.collect())  # 输出:[1, 2, 3, 4]

groupBy

  • 作用:将 RDD 中的元素按指定条件进行分组。
  • 语法:rdd.groupBy(function)
rdd = sc.parallelize([1, 2, 3, 4, 5])
grouped_rdd = rdd.groupBy(lambda x : x%2) #根据元素除以2的余数分组,分组后的RDD的每个元素是k:v形式的键值对,k为余数,v为其分组元素
print({k:list(v) for k,v in grouped_rdd.collect()})# 输出:{1: [1, 3, 5], 0: [2, 4]}

groupByKey

groupByKey 是 Spark RDD 的一种按键分组的方法,通常用于将相同键的值集合在一起。

groupByKey 的功能
基本功能:将一个 (key, value) 形式的 RDD 按键分组,生成 (key, Iterable[values]) 的 RDD。
数据结构:结果中的每个键值对由键和一个可迭代对象(Iterable)构成,包含与该键关联的所有值。

groupByKey 的优缺点
优点:

保留完整数据:能够保留每个键的所有值,为后续操作提供更多信息。
灵活性:适合需要对每个键的所有值进行操作的场景。
缺点:

性能问题:groupByKey 可能会导致高内存使用和网络开销,特别是对于大规模数据。由于要将所有值拉到一个分区进行分组,在数据倾斜情况下可能出现内存不足的问题。
数据倾斜:当某些键的值特别多时,某些分区的负载可能会显著增大,导致执行效率降低。

rdd = sc.sc.parallelize([('math',1),('math',2),('music',2),('music',7)])
grouped_rdd = rdd.groupByKey().mapValues(list)
print(grouped_rdd.collect())
# [('math',[1,2]),('music',[2,7])]

reduceByKey

  • 作用:对 (key, value) RDD 中相同 key 的元素进行合并。
  • 语法:rdd.reduceByKey(function)
# 对值进行操作
rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
reduced_rdd = rdd.reduceByKey(lambda x,y : x+y)
print(reduced_rdd.collect())  # 输出:[('a', 2), ('b', 1)]

join

  • 作用:对两个 (key, value) 形式的 RDD 进行内连接。
  • 语法:rdd1.join(rdd2)
rdd1 = sc.parallelize([('a', 1), ('b', 2)])
rdd2 = sc.parallelize([('a', 3), ('b', 4)])
joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())  # 输出:[('a', (1, 3)), ('b', (2, 4))]

sortBy

  • 作用:对RDD元素按照指定的键进行排序
  • 语法:rdd.sortBy(function,ascending=True),True表示升序排序,False表示降序排序
rdd = sc.parallelize([('a', 3), ('b', 1), ('c', 2)])
sorted_rdd = rdd.sortBy(lambda x : x[1],ascending=True) #指定根据元素的index为1的键按照升序排序
print(sorted_rdd.collect())  # 输出:[('b', 1), ('c', 2), ('a', 3)]

union

  • 作用:合并两个 RDD,返回包含两个 RDD 的并集。
  • 语法:rdd1.union(rdd2)
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
union_rdd = rdd1.union(rdd2)
print(union_rdd.collect())  # 输出:[1, 2, 3, 3, 4, 5]

sample

  • 作用:从 RDD 中随机抽样,返回一个新 RDD。
  • 语法:rdd.sample(withReplacement, fraction, seed)
    #withReplacement:布尔值,True则有放回抽样,False则为不放回抽样
    #fraction:浮点数,表示抽样的比例,如0.4就代表抽取40%的元素
    #seed:可选参数,整数值。用于指定随机数生成器的种子,以确保抽样结果的可重复性。相同的 seed 值会生成相同的随机样本。
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    sample_rdd = rdd.sample(False, 0.4) # 不放回抽样,抽取40%个元素
    print(sample_rdd.collect())  # 可能输出:[2, 4]
    

mapPartitions

  • 作用:对每个分区元素进行批量处理
  • 语法:rdd.mapPartitions(function)
rdd=sc.parallelize([1,2,3,4],2)#创建一个RDD,包含四个元素,并分为两个分区
#iter即为迭代器,这个迭代器包含一个分区的所有元素,sum(iter)计算分区中的所有元素的和,[sum(iter)] 将结果包裹在一个列表,
#因为mapPartitions也返回一个迭代器,而列表本身就是一个迭代器
partition_sum = rdd.mapPartitions(lambda iter : [sum(iter)]) 
#对第一个分区 [1, 2],sum([1, 2]) 返回 3,结果是 [3]。
#对第二个分区 [3, 4],sum([3, 4]) 返回 7,结果是 [7]。
#collect() 将所有分区中的结果收集到一个列表中。
#由于每个分区计算的和分别是 [3] 和 [7],最终结果是 [3, 7]。
print(partition_sums.collect())  # 输出:[3, 7]

mapValues

  • 作用:对键值对 RDD(键-值对形式的数据)中的 进行变换的转换算子,只对值进行操作,保持键不变。
  • 语法:rdd.mapValues(func)
#rdd:这是一个键值对形式的 RDD(通常是形如 (key, value) 的二元组)。
#func:要应用于每个值的函数。
rdd = sc.parallelize([("apple", 1), ("banana", 2), ("apple", 3), ("banana", 4), ("cherry", 5)])
result = rdd.mapValues(lambda value: value * 10)
print(result.collect())  # 输出: [('apple', 10), ('banana', 20), ('apple', 30), ('banana', 40), ('cherry', 50)]

aggregateByKey

  • 作用:对于同一个键的值进行聚合,可以自定义分区内和跨分区聚合逻辑
  • 语法:rdd.aggregateByKey(zeroValue,seqFunc,comFunc)
    eg:求相同key的value的和
# zeroValue:初始值,用于防止空指的聚合初始值。
# seqFunc:分区内聚合函数,将本分区内相同键的值聚合到一起
# comFunc:分区间聚合函数,合并不同分区的聚合结果
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)])
result_rdd = rdd.aggregateByKey(
  0,#初始值,表示对每个键在每个分区中初始化累加器的初始值。
  lambda acc,value:acc+value,#分区内聚合逻辑
  lambda acc1,acc2:acc1+acc2#分区键聚合逻辑
)
print(result_rdd.collect())#输出[('a', 9), ('b', 6)]

combineByKey

  • 作用:对同一个键的值进行更灵活的聚合操作,允许自定义创建累加器,分区内累加和分区间累加
  • 方法签名:rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
    eg:求计算每个键的平均值
# createCombiner:将第一个值转换为累加器的的函数
#mergeValue:分区内聚合函数,将新值合并到累加器
#mergeCombiners:分区间聚合函数,合并不同分区累加器
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)])
result_rdd=rdd.combineByKey(
  lambda value : (value,1),#对每一个键的第一个值,创建累加器为二元组将其转换为元组 (value, 1),value为当前值,1是计数
  lambda acc,value:(acc[0]+value,acc[1]+1),#分区内聚合,acc为累加器,value为key相同的值,因此需要值加上value,出现次数加一
  lambda acc1:acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])# acc1和acc2都是累加器,key相同的值和次数全部聚合
)
avg_rdd=result_rdd.mapValues(lambda acc : acc[0]/acc[1])
print(avg_rdd.collect())# 输出: [('a', 3.0), ('b', 3.0)]
  

行动算子(Actions)

行动算子触发 RDD 的计算,并返回结果或将结果写入存储。

collect

  • 作用:将 RDD 中的所有元素收集到一个列表中。
  • 语法:rdd.collect()
rdd = sc.parallelize([1, 2, 3])
print(rdd.collect())  # 输出:[1, 2, 3]

count

  • 作用:返回 RDD 中元素的总数。
  • 语法:rdd.count()
rdd = sc.parallelize([1, 2, 3])
print(rdd.count())  # 输出:3

first

  • 作用:返回 RDD 中的第一个元素。
  • 语法:rdd.first()
rdd = sc.parallelize([1, 2, 3])
print(rdd.first())  # 输出:1

take

  • 作用:返回 RDD 中前 n 个元素。
  • 语法:rdd.take(n)
rdd = sc.parallelize([1, 2, 3])
print(rdd.take(2))  # 输出:[1, 2]

reduce

  • 作用:通过二元操作聚合 RDD 中的所有元素,返回一个结果。
  • 语法:rdd.reduce(function)
rdd = sc.parallelize([1, 2, 3])
result = rdd.reduce(lambda x, y: x + y)
print(result)  # 输出:6

saveAsTestFile

直接由Executor执行后输出而不会把结果发送到Driver上

  • 作用:将 RDD 保存为文本文件。
  • 语法:rdd.saveAsTextFile(path)
rdd = sc.parallelize([1, 2, 3])
rdd.saveAsTextFile("/path/to/output")

takeSample

  • 作用:从 RDD 中随机抽样,返回一个列表。
  • 语法:rdd.takeSample(withReplacement, num, seed)
#与转换算子sample语法类似,num是抽取元素的个数
rdd = sc.parallelize([1, 2, 3, 4, 5])
sampled = rdd.takeSample(False, 3)
print(sampled)  # 可能输出:[2, 4, 5]

foreach

直接由Executor执行后输出而不会把结果发送到Driver上

  • 作用:对 RDD 中的每个元素执行一个操作(通常用于副作用,如写入数据库)。
  • 语法:rdd.foreach(function)
rdd = sc.parallelize([1, 2, 3])
rdd.foreach(lambda x: print(x))  # 输出:1 2 3

mean

mean 是一个用于计算 RDD 平均值的算子。它可以方便地对 RDD 中的数值进行平均计算,避免手动计算总和和计数后再求平均的步骤。

  • 用法:mean 是 RDD 的一个行动算子,它会触发 Spark 作业执行,并返回 RDD 中元素的平均值。适用于包含数值的 RDD,不适用于键值对 RDD。
rdd = sc.parallelize([1, 2, 3, 4, 5])
average = rdd.mean()
print("平均值:", average)  # 输出: 平均值: 3.0

应用场景

  • 数值型 RDD:可以直接对一个包含数值的 RDD 使用 mean,例如计算一列数据的平均值。
  • 组合键值对 RDD:如果是键值对 RDD (key, value),需要先用 map 提取值部分,或者使用 mapValues 之后计算每个键的平均值。
  • 如果 RDD 中包含浮点数,mean 的结果也会保留浮点精度,否则返回整数平均值
    在键值对 RDD 中,不能直接使用 mean。可以使用 mapValues 转换后计算平均值:
rdd = sc.parallelize([('math', 80), ('math', 90), ('science', 85), ('science', 75)])
# 将键值对 RDD 转化为每个科目的平均分
average_scores = rdd.mapValues(lambda x: (x, 1)) \
                    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
                    .mapValues(lambda x: x[0] / x[1])

print(average_scores.collect())
# 结果: [('math', 85.0), ('science', 80.0)]

面试题:groupByKey vs reduceByKey

在实现分组聚合功能中,groupByKey与reduceBykey有较大的性能差距。
功能区别

groupByKey仅仅有分组功能。
reduceByKey除了有分组功能外,还有reduce聚合功能,是一个分组+聚合一体化的算子。

如果对数据执行分组+聚合,那么两个算子性能差别较大

groupByKey

groupByKey

如图,groupByKey+聚合逻辑的执行流程,groupByKey只能分组,所以执行上是先分组(shuffle)后聚合

reduceByKey

先在分区内做预聚合
然后再走分组流程(shuffle)
分组后再做最终聚合

reduceByKey分组聚合流程

reduceByKey 只传输预聚合后的局部结果,而不是所有的原始数据,因此在 Shuffle 阶段需要传输的数据量更少,减少网络 I/O,相比于 groupByKey 更高效。
在需要聚合的场景中,reduceByKey 通常优于 groupByKey。

RDD持久化

RDD的数据是过程数据

RDD之间的迭代,当新的RDD生成时,旧的RDD就会消失。
RDD的数据是过程数据,只在处理的过程中存在,处理完成就消失。
老旧RDD使用后,就从内存中清理掉,为后续的计算腾出空间。
RDD的数据是过程数据
RDD3第一次使用后就会消失,第二次使用基于血缘关系从RDD1重新执行构建出RDD3让RDD5使用。
也就是说,蓝色部分的RDD转换会被执行两次。

RDD的缓存

上图中,若RDD3不会消失,则无需执行两次蓝色部分,因此Spark为我们提供了缓存API,可以将制定的数据保留在内存或者硬盘上。

缓存相关API

rdd.cache() #缓存到内存中
rdd.persist(StorageLevel.MEMORY_ONLY)       # 仅内存缓存
rdd.persist(StorageLevel.MEMORY_ONLY_2)     # 仅内存缓存,2 个副本
rdd.persist(StorageLevel.DISK_ONLY)         # 仅缓存硬盘上
rdd.persist(StorageLevel.DISK_ONLY_2)       # 仅缓存硬盘上,2 个副本
rdd.persist(StorageLevel.DISK_ONLY_3)       # 仅缓存硬盘上,3 个副本
rdd.persist(StorageLevel.MEMORY_AND_DISK)   # 先放内存,不够放硬盘
rdd.persist(StorageLevel.MEMORY_AND_DISK_2) # 先放内存,不够放硬盘,2 个副本
rdd.persist(StorageLevel.OFF_HEAP)          # 堆外内存(系统内存)
# 一般建议使用 rdd.persist(StorageLevel.MEMORY_AND_DISK)
# 如果内存存任务的集群,建议使用 rdd3.persist(StorageLevel.DISK_ONLY) 或者就别用缓存了 用 CheckPoint
# 主动清理缓存的 API
rdd3.unpersist()
import time

from pyspark import SparkConf,SparkContext
import os
if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    conf=SparkConf().setAppName("cache").setMaster("local[*]")
    sc=SparkContext(conf=conf)
    nums=[1,2,3]
    rdd1=sc.parallelize(nums)
    rdd2=rdd1.map(lambda x:10)
    # rdd2.cache()
    rdd3=rdd2.map(lambda x:x+1)
    print(rdd3.collect())
    rdd4=rdd2.map(lambda x:x-1)
    print(rdd4.collect())
    time.sleep(50000000)

没有缓存的DAG图

上图即为没有对rdd2进行缓存的情况下,rdd4的DAG图,可以看到,是从parallelize开始,也就是重新获取RDD1后执行操作。

import time

from pyspark import SparkConf,SparkContext
from pyspark.storagelevel import StorageLevel
import os
if __name__ == '__main__':
    os.environ['PYSPARK_PYTHON'] = "D:\dev\python\python3.11.4\python.exe"
    conf=SparkConf().setAppName("cache").setMaster("local[*]")
    sc=SparkContext(conf=conf)
    nums=[1,2,3]
    rdd1=sc.parallelize(nums)
    rdd2=rdd1.map(lambda x:10)
    # 启用缓存
    rdd2.cache()
    rdd3=rdd2.map(lambda x:x+1)
    print(rdd3.collect())
    rdd4=rdd2.map(lambda x:x-1)
    print(rdd4.collect())
    # 释放缓存
    rdd2.unpersist()

    time.sleep(50000000)

使用了缓存的DAG图

将rdd2.cache()注释解开重新运行程序,则rdd4的DAG图多了一个绿点,绿色的点实际上就是已经缓存过的rdd2,rdd4任务是从绿色点之后开始执行。

缓存原理

缓存可以将过程RDD数据,持久化到内存或者硬盘上
这个保存在设定上是认为不安全的
缓存的数据在设计上是认为有丢失风险的
因此,缓存的特点就是:保留被缓存的RDD的血缘关系,分散存储
一旦缓存丢失,则基于血缘关系重新计算这个RDD的数据。

分散存储

RDD的CheckPoint

CheckPoint技术,将RDD的数据保存起来,仅支持硬盘存储

  • 被设计认为是安全的

  • 不保留血缘关系

CheckPoint仅支持硬盘存储,因此可以把数据存储到hdfs文件系统中
CheckPoint保存数据

checkpoint集中收集各个分区数据进行存储,而缓存是分散存储

checkpoint 对比 缓存

  • CheckPoint无论分区数量是多少,风险一样。而缓存,分区越多,风险越高(分区多,缓存各自存各自的,失去一份,整个缓存失效)。
  • CheckPoint支持写入HDFS,缓存不行,HDFS是高可靠存储,因此CheckPoint被认为是安全的。
  • CheckPoint不支持内存,缓存若写内存性能比CheckPoint好。
  • CheckPoint设计认为是安全的,所以不保留血缘关系,而缓存设计上认为不安全,保留血缘关系
#设置CheckPoint,首先要选择保存路径
#Local模式下可以使用本地文件系统,集群可以使用hdfs系统
sc.setCheckpointDir("file:///D:/CheckPoint")
#使用checkpoint,直接调用即可
rdd.checkpoint()

从checkpoint的Rdd开始的DAG图

由图可知,起点是checkpoint,证明没有保留血缘关系。

总结

Cache和CheckPoint区别

Cache 是轻量化保存 RDD 数据,可存储在内存和硬盘,是分散存储,设计上数据是不安全的(保留 RDD 血缘关系)。
Checkpoint 是重置级保存 RDD 数据,是集中存储,只能存储在硬盘(HDFS)上,设计上是安全的(不保留 RDD 血缘关系)。

Cache和CheckPoint性能对比

Cache 性能更好,因为是分散存储,各个 Executor 并行执行,效率高,可以保存到内存中(占内存),更快。
Checkpoint 比较慢,因为是集中存储,涉及到网络 IO,但存储到 HDFS 上更加安全(多副本)。

动物装饰