Spark RDD
本文介绍 RDD,RDD是 Spark 的基石,是实现 Spark 数据处理的核心抽象。
什么是RDD ?
RDD(Resilient Distributed Dataset)是 Spark 中的核心概念,它是一个容错、可以并行执行的分布式数据集。
- 一个分区的列表
- 一个计算函数compute,对每个分区进行计算
- 对其他RDDs的依赖(宽依赖、窄依赖)列表
- 对key-value RDDs来说,存在一个分区器(Partitioner)【可选的】
- 对每个分区有一个优先位置的列表【可选的】
- 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值;
- 一个对分区数据进行计算的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现 compute 函数以达到该目的。compute函数会对迭代器进行组合,不需要保存每次计算的结果;
- RDD之间的存在依赖关系。RDD的每次转换都会生成一个新的RDD,RDD之间形成类似于流水线一样的前后依赖关系(lineage)。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算;
- 对于 key-value 的RDD而言,可能存在分区器(Partitioner)。Spark 实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有 key-value 的RDD,才可能有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数决定了RDD本身的分片数量也决定了parent RDD Shuffle输出时的分片数量;
- 一个列表,存储存储每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不移动计算”的理念,Spark在任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
RDD 的特点
1. 分区
RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个 compute 函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。
2. 只读
RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD;
一个RDD转换为另一个RDD,通过丰富的操作算子(map、filter、union、join、reduceByKey… …)实现,不再像MR那样只能写map和reduce了。
RDD的操作算子包括两类:
- ransformation: 用来对RDD进行转化,延迟执行(Lazy);
- action: 用来触发RDD的计算;得到相关计算结果或者将RDD保存的文件系统中;
3. 依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系(lineage),也称之为依赖。依赖包括两种:
- 窄依赖。RDDs之间分区是一一对应的(1:1 或 n:1)
- 宽依赖。子RDD每个分区与父RDD的每个分区都有关,是多对多的关系(即 n:m)。有shuffle发生
4. 缓存
可以控制存储级别(内存、磁盘等)来进行缓存。
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取 而不用再根据血缘关系计算,这样就加速后期得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。
5.checkpoint
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。
但是于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。
RDD支持 checkpoint 将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从 checkpoint 处拿到数据。
RDD 算子都有哪些?
RDD的操作算子分为两类:
- Transformation。用来对RDD进行转化,这个操作时延迟执行的(或者说是 Lazy 的);
- Action。用来触发RDD的计算;得到相关计算结果 或者 将结果保存的外部系统中;
算子 https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
Transformation
算子名称 | 依赖关系 | 说明 |
---|---|---|
map | 窄依赖 | 对原RDD中每个元素运用func函数,并生成新的RDD |
filter | 窄依赖 | 对原RDD中每个元素使用func函数进行过滤,并生成新的RDD |
flatMap | 窄依赖 | 与map类似,但每一个输入的item被映射成0个或多个输出的items(func返回类型需要为Seq) |
mapPartitions | 窄依赖 | 与map类似,但函数单独在RDD的每个分区上运行,func函数的类型为Iterator<T> => Iterator<U> |
mapPartitionsWithIndex | 窄依赖 | 与mapPartitions类似,但func类型为(Int, Iterator<T>) => Iterator<U> ,其中第一个参数为分区索引 |
sample | 窄依赖 | 数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed) |
union | 窄依赖 | 合并两个RDD |
intersection | 窄依赖 | 求两个RDD的交集 |
distinct | 宽依赖 | 去重 |
Action
算子名称 | 依赖关系 | 说明 |
---|---|---|
reduce | 宽依赖 | 先聚合分区内的数据,再聚合分区间的数据,最终返回一个结果 |
collect | 宽依赖 | 在驱动程序中,以数组Array的形式返回数据集的所有元素,数据按照分区编号有序返回 |
count | 宽依赖 | 计算RDD中元素的数量,先在每个分区内进行计数,然后将结果相加得到总数 |
first | 宽依赖 | 返回数据集中的第一个元素,如果数据集为空,则抛出异常 |
take(num) | 宽依赖 | 返回数据集中的前num个元素,结果以数组形式返回 |
takeOrdered(num, key) | 宽依赖 | 返回按照指定key排序后的前num个元素,结果以数组形式返回 |
sum | 宽依赖(针对数值类型) | 计算数据集中所有元素的和,先在每个分区内进行局部聚合,然后进行全局聚合 |
fold(zeroValue)(op) | 宽依赖 | 使用指定的初始值zeroValue和二元操作op,先局部聚合再全局聚合,返回最终结果 |
aggregate(zeroValue)(seqOp, combOp) | 宽依赖 | 先使用seqOp对每个分区内的数据进行聚合,再使用combOp对分区间的结果进行聚合 |
map 与 mapPartitions 的区别?
- map:每次处理一条数据
- mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易 OOM
最佳实践:当内存资源充足时,建议使用mapPartitions,以提高处理效率
map和flatmap的区别?
Apache Spark中的map
和flatMap
都是转换操作(transformations),它们用于对RDD(弹性分布式数据集)中的每个元素应用一个函数,并返回一个新的RDD。它们的主要区别在于它们如何处理函数的输出:
-
map:
map
操作接受一个函数作为参数,该函数会被应用于RDD中的每个元素。- 这个函数的输出是一个单一的值,即对于RDD中的每个元素,
map
操作会生成一个对应的输出元素。 - 结果RDD中的元素数量与原始RDD中的元素数量相同,除非应用的函数返回了
null
,这种情况下对应的元素会被忽略。
例如,如果你有一个RDD包含数字,并且你想将每个数字乘以2,你可以使用
map
操作:val rdd = sc.parallelize(List(1, 2, 3))
val doubled = rdd.map(x => x * 2) -
flatMap:
flatMap
操作也接受一个函数作为参数,该函数会被应用于RDD中的每个元素。- 不同的是,这个函数可以返回一个序列(如列表、数组等),
flatMap
会将这个序列中的所有元素都添加到结果RDD 中。 - 这意味着对于RDD中的每个元素,
flatMap
操作可能会生成多个输出元素,因此结果RDD中的元素数量可能会比原始RDD中的元素数量多。
例如,如果你有一个RDD包含字符串,并且你想将每个字符串拆分成单词,你可以使用
flatMap
操作:val rdd = sc.parallelize(List("hello world", "spark is fun"))
val words = rdd.flatMap(_.split(" "))
在性能方面,flatMap
可能会导致数据量显著增加,因为它可能会将每个输入元素映射成多个输出元素,这可能会导致后续操作的数据倾斜问题。因此,在使用flatMap
时需要谨慎,以避免不必要的资源消耗和性能问题。
Spark的cache和persist的区别?它们是transformaiton算子还是action算子?
在Apache Spark中,cache
和persist
都是用来将RDD持久化到内存中的方法,以便后续的多次计算可以重用这些数据,从而提高性能。它们的主要区别在于持久化级别和使用场景:
-
cache:
cache
是persist
的一个特例,它使用默认的存储级别StorageLevel.MEMORY_ONLY
,这意味着它会将数据以反序列化的Java对象的形式存储在内存中。- 如果内存不足以存储所有数据,
cache
会将部分数据存储到磁盘上。 cache
通常用于那些会被多次使用的RDD,因为它会自动处理数据的缓存和失效。
-
persist:
persist
允许你指定一个存储级别,这个存储级别定义了数据应该如何被存储(例如,仅在内存中,仅在磁盘上,或者两者的组合)。- 你可以使用不同的
StorageLevel
枚举值来控制数据的存储方式,比如MEMORY_ONLY
,MEMORY_AND_DISK
,DISK_ONLY
等。 persist
提供了更细粒度的控制,允许你根据数据的大小和使用情况来选择最合适的存储级别。
关于它们是transformation算子还是action算子的问题:
- 它们既不是transformation算子也不是action算子。
cache
和persist
实际上是持久化操作,它们不会立即触发计算,而是告诉Spark将数据保留在内存或磁盘中,以便后续的action操作(如count
,collect
,take
等)可以更快地访问这些数据。- 当你调用
cache
或persist
时,Spark不会立即执行任何计算,只有当你执行一个action 操作时,Spark才会根据需要缓存或持久化数据。
总结来说,cache
和persist
是用于优化Spark应用程序性能的工具,它们通过减少数据的重复计算来提高效率。选择使用cache
还是persist
取决于你的具体需求,比如数据的大小、使用频率以及可用的内存和存储资源。
RDD 的缓存级别
在Apache Spark中,RDD的缓存级别决定了数据在被持久化时存储的位置和复制的策略。这些级别通过StorageLevel
枚举来定义,它们影响数据存储的方式,包括是否存储在内存中、是否序列化、是否复制到磁盘等。以下是StorageLevel
中定义的一些常见缓存级别:
-
MEMORY_ONLY:
- 仅存储数据在内存中,不序列化。如果内存不足,数据将丢失而不是写入磁盘。
-
MEMORY_AND_DISK:
- 优先存储数据在内存中,如果内存不足,溢出到磁盘。数据在内存中保持未序列化状态。
-
MEMORY_ONLY_SER:
- 仅存储数据在内存中,但数据会被序列化。序列化可以减少内存的使用,但会增加CPU的开销。
-
MEMORY_AND_DISK_SER:
- 优先存储数据在内存中,如果内存不足,溢出到磁盘。数据在内存中保持序列化状态。
-
DISK_ONLY:
- 仅存储数据在磁盘上,不存储在内存中。
-
MEMORY_ONLY_2,MEMORY_AND_DISK_2,MEMORY_ONLY_SER_2,MEMORY_AND_DISK_SER_2:
- 这些级别与上述级别类似,但是数据会被复制到两个节点上,提供高可用性。
-
OFF_HEAP(实验性):
- 存储数据在非堆内存中,适用于Tungsten执行引擎。
选择缓存级别时,需要考虑以下几点:
-
内存与磁盘:如果数据集可以完全放入内存中,使用
MEMORY_ONLY
或MEMORY_ONLY_SER
可以提供最快的访问速度。如果数据集太大,需要考虑MEMORY_AND_DISK
或MEMORY_AND_DISK_SER
以避免内存不足。 -
序列化:序列化可以减少内存的使用,但是会增加CPU的开销。如果内存资源紧张,可以考虑使用序列化的存储级别。
-
数据丢失:如果数据非常重要,不能丢失,那么应该避免使用
MEMORY_ONLY
级别的存储,因为它在内存不足时不会将数据写入磁盘。 -
高可用性:如果需要高可用性,可以考虑使用带有复制的存储级别,如
MEMORY_ONLY_2
。
在实际应用中,选择最合适的缓存级别取决于具体的应用场景和资源限制。通常,MEMORY_AND_DISK
是一个比较平衡的选择,因为它结合了内存的速度和磁盘的持久性。
如何优化RDD的缓存策略以提高性能?
优化RDD的缓存策略以提高性能,可以考虑以下几个方面:
-
选择合适的存储级别:
- 如果内存足够大, 可以完全存放下整个RDD的数据,并且没有发生内存溢出,那么使用
MEMORY_ONLY
级别,因为它可以最大程度提高CPU效率,使RDD上的操作以最快速度运行。 - 如果内存不足以存放RDD,考虑使用
MEMORY_ONLY_SER
级别,通过序列化对象来节省内存空间,虽然增加了序列化和反序列化的开销,但总体性能仍然较高。 - 对于大数据量,如果纯内存级别都无法使用,建议使用
MEMORY_AND_DISK_SER
策略,而不是MEMORY_AND_DISK
策略,因为序列化后的数据占用空间更小。
- 如果内存足够大, 可以完全存放下整个RDD的数据,并且没有发生内存溢出,那么使用
-
避免数据倾斜:
- 数据倾斜是导致性能问题的一个常见原因。可以通过使用
reduceByKey
或groupByKey
的变体,如combineByKey
,来减轻数据倾斜。 - 使用
repartition
或coalesce
来重新分区RDD,以平衡数据分布。
- 数据倾斜是导致性能问题的一个常见原因。可以通过使用
-
合理设置分区数:
- RDD的分区数决定了并行度和性能。根据数据规模和集群资源,选择合理的分区数是非常重要的。
-
使用持久化来避免重复计算:
- 通过
persist
方法将RDD或DataFrame缓存到内存中,可以减少重复计算的开销,尤其对于多次使用同一个RDD的情况非常有用。
- 通过
-
数据压缩:
- Spark支持在计算任务中对数据进行压缩,可以降低数据传输和存储的成本,并提高计算性能。选择最适合数据集和计算任务的压缩格式,并将其配置为Spark的默认压缩格式。
-
监控和调优:
- 监控Spark作业的性能指标,及时发现和解决性能问题。不断学习和研究Spark的性能优化技巧,了解最新的优化技术和最佳实践。
-
使用高效的算法和操作:
- Spark提供了很多高效的算法和操作,包括缓存、广播、过滤、聚合等。使用这些高效的算法和操作可以显著提高Spark的性能。
通过上述措施,可以有效地优化RDD的缓存策略,提高Spark应用程序的性能。
reduceByKey和groupByKey的区别和作用?
在Apache Spark中,reduceByKey
和groupByKey
都是对键值对(pair)RDD进行操作的转换算子,但它们在处理数据的方式和性能上存在一些差异。以下是它们的区别和作用:
-
groupByKey:
groupByKey
将所有具有相同键的值收集到一个列表中,形成一个(键,值列表)的对。- 它会返回一个新的RDD,其中每个键都映射到一个包含所有对应值的集合。
groupByKey
是一个宽依赖操作,因为它需要将所有具有相同键的值从不同分区中收集到一起,这可能会导致大量的数据传输和潜在的性能问题。- 通常,
groupByKey
在数据量较大时会消耗更多的内存和网络资源,因为它需要将所有值收集到一个集合中。
例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值收集到一个列表中,你可以使用
groupByKey
:val rdd = sc.parallelize(List((1, "a"), (1, "b"), (2, "c"), (2, "d"), (3, "e")))
val grouped = rdd.groupByKey() -
reduceByKey:
reduceByKey
对每个键的所有值应用一个二元函数(即函数接受两个参数),并将结果合并成一个值。- 它会返回一个新的RDD,其中每个键都映射到一个值,这个值是通过将所有对应值应用给定函数得到的。
reduceByKey
是一个窄依赖操作,因为它只需要在每个分区内部对值进行合并,然后将合并后的结果在不同分区之间进行合并。reduceByKey
通常比groupByKey
更高效,因为它减少了数据的传输和内存的使用,特别是在处理大量数据时。
例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值相加,你可以使用
reduceByKey
:val rdd = sc.parallelize(List((1, 2), (1, 3), (2, 4), (2, 5), (3, 6)))
val reduced = rdd.reduceByKey(_ + _)
在选择使用groupByKey
还是reduceByKey
时,需要考虑数据的大小和计算的复杂性。如果每个键对应的值的数量很大,使用groupByKey
可能会导致内存不足或网络拥塞,而reduceByKey
则可以更有效地处理这种情况。此外,如果计算逻辑可以表示为一个二元函数,那么reduceByKey
通常是更好的选择。
reduceByKey和reduce的区别?
在Apache Spark中,reduceByKey
和reduce
是两种不同的转换操作,它们在处理数据的方式和适用场景上存在一些差异。以下是它们的区别:
-
reduceByKey:
reduceByKey
是针对键值对(pair)RDD的操作,它对每个键的所有值应用一个二元函数,并将结果合并成一个值。reduceByKey
会返回一个新的RDD,其中每个键都映射到一个值,这个值是通过将所有对应值应用给定函数得到的。reduceByKey
是一个窄依赖操作,因为它只需要在每个分区内部对值进行合并,然后将合并后的结果在不同分区之间进行合并。reduceByKey
通常比reduce
更高效,因为它减少了数据的传输和内存的使用,特别是在处理大量数据时。
例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值相加,你可以使用
reduceByKey
:val rdd = sc.parallelize(List((1, 2), (1, 3), (2, 4), (2, 5), (3, 6)))
val reduced = rdd.reduceByKey(_ + _) -
reduce:
reduce
是针对任何类型的RDD的操作,它对RDD中的所有元素应用一个二元函数,并将结果合并成一个值。reduce
会返回一个单一的值,这个值是通过将所有元素应用给定函数得到的。reduce
是一个宽依赖操作,因为它需要将所有元素从不同分区中收集到一起,这可能会导致大量的数据传输和潜在的性能问题。reduce
通常用于计算整个数据集的聚合值,如总和、最大值、最小值等。
例如,如果你有一个RDD包含数字,并且你想计算所有数字的总和,你可以使用
reduce
:val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val sum = rdd.reduce(_ + _)
在选择使用reduceByKey
还是reduce
时,需要考虑数据的类型和计算的目标。如果数据是键值对,并且你想对每个键的值进行合并,那么reduceByKey
是更好的选择。如果数据是单一类型的元素,并且你想计算整个数据集的聚合值,那么reduce
是更好的选择。
使用reduceByKey出现数据倾斜怎么办?
在使用Spark的reduceByKey
操作时,如果出现数据倾斜问题,可以采取以下几种解决方案:
-
采样倾斜key并分拆join操作:
- 对于由单个key导致的数据倾斜,可以将这个key单独提取出来,组成一个RDD,然后与其他RDD单独进行join操作。这样,原本会导致倾斜的key对应的数据会在shuffle阶段被分散到多个task中去进行join操作。
-
使用随机数扩容进行join:
- 如果在进行join操作时,RDD中有大量的key导致数据倾斜,可以考虑对其中一个RDD数据进行扩容,另一个RDD进行稀释后再进行join。
-
两阶段聚合(局部聚合+全局聚合):
- 这种方法适用于聚合类的shuffle操作导致的数据倾斜。首先,给倾斜的dataSkewRDD中每个key都打上一个随机前缀,进行局部聚合,然后去除随机前缀,再次进行全局聚合。
-
提高shuffle操作的并行度:
- 例如,使用
groupByKey(100)
来提高并行度,相当于增加了reduce task的数量,每一个reduce task中分区的数据量会减少,可以缓解数据倾斜。
- 例如,使用
-
过滤少数导致倾斜的key:
- 如果数据中有很多null或无效数据,可以在shuffle之前过滤掉这些数据。
-
将reduce join转为map join:
- 在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,如果join操作中的一个RDD或表的数据量比较小,可以考虑将reduce join转为map join。
-
对key添加随机前缀:
- 将原始的key转化为随机值+key,进行reduceByKey操作,然后将随机值+key转成key,再进行reduceByKey操作。
这些方法可以帮助减轻或解决在使用reduceByKey
时出现的数据倾斜问题,提升Spark作业的性能。