Spark RDD
本文介绍 RDD,RDD是 Spark 的基石,是实现 Spark 数据处理的核心抽象。
什么是RDD ?
RDD(Resilient Distributed Dataset)是 Spark 中的核心概念,它是一个容错、可以并行执行的分布式数据集。
- 一个分区的列表
- 一个计算函数compute,对每个分区进行计算
- 对其他RDDs的依赖(宽依赖、窄依赖)列表
- 对key-value RDDs来说,存在一个分区器(Partitioner)【可选的】
- 对每个分区有一个优先位置的列表【可选的】
- 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值;
- 一个对分区数据进行计算的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现 compute 函数以达到该目的。compute函数会对迭代器进行组合,不需要保存每次计算的结果;
- RDD之间的存在依赖关系。RDD的每次转换都会生成一个新的RDD,RDD之间形成类似于流水线一样的前后依赖关系(lineage)。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算;
- 对于 key-value 的RDD而言,可能存在分区器(Partitioner)。Spark 实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有 key-value 的RDD,才可能有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数决定了RDD本身的分片数量也决定了parent RDD Shuffle输出时 的分片数量;
- 一个列表,存储存储每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不移动计算”的理念,Spark在任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
RDD 的特点
1. 分区
RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个 compute 函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。
2. 只读
RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD;
一个RDD转换为另一个RDD,通过丰富的操作算子(map、filter、union、join、reduceByKey… …)实现,不再像MR那样只能写map和reduce了。
RDD的操作算子包括两类:
- ransformation: 用来对RDD进行转化,延迟执行(Lazy);
- action: 用来触发RDD的计算;得到相关计算结果或者将RDD保存的文件系统中;
3. 依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系(lineage),也称之为依赖。依赖包括两种:
- 窄依赖。RDDs之间分区是一一对应的(1:1 或 n:1)
- 宽依赖。子RDD每个分区与父RDD的每个分区都有关,是多对多的关系(即 n:m)。有shuffle发生
4. 缓存
可以控制存储级别(内存、磁盘等)来进行缓存。
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。
5.checkpoint
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。
但是于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。
RDD支持 checkpoint 将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从 checkpoint 处拿到数据。
RDD 算子都有哪些?
RDD的操作算子分为两类:
- Transformation。用来对RDD进行转化,这个操作时延迟执行的(或者说是 Lazy 的);
- Action。用来触发RDD的计算;得到相关计算结果 或者 将结果保存的外部系统中;
算子 https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
Transformation
算子名称 | 依赖关系 | 说明 |
---|---|---|
map | 窄依赖 | 对原RDD中每个元素运用func函数,并生成新的RDD |
filter | 窄依赖 | 对原RDD中每个元素使用func函数进行过滤,并生成新的RDD |
flatMap | 窄依赖 | 与map类似,但每一个输入的item被映射成0个或多个输出的items(func返回类型需要为Seq) |
mapPartitions | 窄依赖 | 与map类似,但函数单独在RDD的每个分区上运行,func函数的类型为Iterator<T> => Iterator<U> |
mapPartitionsWithIndex | 窄依赖 | 与mapPartitions类似,但func类型为(Int, Iterator<T>) => Iterator<U> ,其中第一个参数为分区索引 |
sample | 窄依赖 | 数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比 (fraction)、随机数生成器的种子(seed) |
union | 窄依赖 | 合并两个RDD |
intersection | 窄依赖 | 求两个RDD的交集 |
distinct | 宽依赖 | 去重 |
Action
算子名称 | 依赖关系 | 说明 |
---|---|---|
reduce | 宽依赖 | 先聚合分区内的数据,再聚合分区间的数据,最终返回一个结果 |
collect | 宽依赖 | 在驱动程序中,以数组Array的形式返回数据集的所有元素,数据按照分区编号有序返回 |
count | 宽依赖 | 计算RDD中元素的数量,先在每个分区内进行计数,然后将结果相加得到总数 |
first | 宽依赖 | 返回数据集中的第一个元素,如果数据集为空,则抛出异常 |
take(num) | 宽依赖 | 返回数据集中的前num个元素,结果以数组形式返回 |
takeOrdered(num, key) | 宽依赖 | 返回按照指定key排序后的前num个元素,结果以数组形式返回 |
sum | 宽依赖(针对数值类型) | 计算数据集中所有元素的和,先在每个分区内进行局部聚合,然后进行全局聚合 |
fold(zeroValue)(op) | 宽依赖 | 使用指定的初始值zeroValue和二元操作op,先局部聚合再全局聚合,返回最终结果 |
aggregate(zeroValue)(seqOp, combOp) | 宽依赖 | 先使用seqOp对每个分区内的数据进行聚合,再使用combOp对分区间的结果进行聚合 |
map 与 mapPartitions 的区别?
- map:每次处理一条数据
- mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易 OOM
最佳实践:当内存资源充足时,建议使用mapPartitions,以提高处理效率
map和flatmap的区别?
Apache Spark中的map
和flatMap
都是转换操作(transformations),它们用于对RDD(弹性分布式数据集)中的每个元素应用一个函数,并返回一个新的RDD。它们的主要区别在于它们如何处理函数的输出:
-
map:
map
操作接受一个函数作为参数,该函数会被应用于RDD中的每个元素。- 这个函数的输出是一个单一的值,即对于RDD中的每个元素,
map
操作会生成一个对应的输出元素。 - 结果RDD中的元素数量与原始RDD中的元素数量相同,除非应用的函数返回了
null
,这种情况下对应的元素会被忽略。
例如,如果你有一个RDD包含数字,并且你想将每个数字乘以2,你可以使用
map
操作:val rdd = sc.parallelize(List(1, 2, 3))
val doubled = rdd.map(x => x * 2) -
flatMap:
flatMap
操作也接受一个函数作为参数,该函数会被应用于RDD中的每个元素。- 不同的是,这个函数可以返回一个序列(如列表、数组等),
flatMap
会将这个序列中的所有元素都添加到结果RDD中。 - 这意味着对于RDD中的每个元素,
flatMap
操作可能会生成多个输出元素,因此结果RDD中的元素数量可能会比原始RDD中的元素数量多。
例如,如果你有一个RDD包含字符串,并且你想将每个字符串拆分成单词,你可以使用
flatMap
操作:val rdd = sc.parallelize(List("hello world", "spark is fun"))
val words = rdd.flatMap(_.split(" "))
在性能方面,flatMap
可能会导致数据量显著增加,因为它可能会将每个输入元素映射成多个输出元素,这可能会导致后续操作的数据倾斜问题。因此,在使用flatMap
时需要谨慎,以避免不必要的资源消耗和性能问题。
Spark的cache和persist的区别?它们是transformaiton算子还是action算子?
在Apache Spark中,cache
和persist
都是用来将RDD持久化到内存中的方法,以便后续的多次计算可以重用这些数据,从而提高性能。它们的主要区别在于持久化级别和使用场景:
-
cache:
cache
是persist
的一个特例,它使用默认的存储级别StorageLevel.MEMORY_ONLY
,这意味着它会将数据以反序列化的Java对象的形式存储在内存中。- 如果内存不足以存储所有数据,
cache
会将部分数据存储到磁盘上。 cache
通常用于那些会被多次使用的RDD,因为它会自动处理数据的缓存和失效。
-
persist:
persist
允许你指定一个存储级别,这个存储级别定义了数据应该如何被存储(例如,仅在内存中,仅在磁盘上,或者两者的组合)。- 你可以使用不同的
StorageLevel
枚举值来控制数据的存储方式,比如MEMORY_ONLY
,MEMORY_AND_DISK
,DISK_ONLY
等。 persist
提供了更细粒度的控制,允许你根据数据的大小和使用情况来选择最合适的存储级别。
关于它们是transformation算子还是action算子的问题:
- 它们既不是transformation算子也不是action算子。
cache
和persist
实际上是持久化操作,它们不会立即触发计算,而是告诉Spark将数据保留在内存或磁盘中,以便后续的action操作(如count
,collect
,take
等)可以更快地访问这些数据。- 当你调用
cache
或persist
时,Spark不会立即执行任何计算,只有当你执行一个action操作时,Spark才会根据需要缓存或持久化数据。
总结来说,cache
和persist
是用于优化Spark应用程序性能的工具,它们通过减少数据的重复计算来提高效率。选择使用cache
还是persist
取决于你的具体需求,比如数据的大小、使用频率以及可用的内存和存储资源。
RDD 的缓存级别
在Apache Spark中,RDD的缓存级别决定了数据在被持久化时存储的位置和复制的策略。这些级别通过StorageLevel
枚举来定义,它们影响数据存储的方式,包括是否存储在内存中、是否序列化、是否复制到磁盘等。以下是StorageLevel
中定义的一些常见缓存级别:
-
MEMORY_ONLY