跳到主要内容

Spark RDD

本文介绍 RDD,RDD是 Spark 的基石,是实现 Spark 数据处理的核心抽象。


什么是RDD ?

RDD(Resilient Distributed Dataset)是 Spark 中的核心概念,它是一个容错、可以并行执行的分布式数据集。

  • 一个分区的列表
  • 一个计算函数compute,对每个分区进行计算
  • 对其他RDDs的依赖(宽依赖、窄依赖)列表
  • 对key-value RDDs来说,存在一个分区器(Partitioner)【可选的】
  • 对每个分区有一个优先位置的列表【可选的】
  • 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值;
  • 一个对分区数据进行计算的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现 compute 函数以达到该目的。compute函数会对迭代器进行组合,不需要保存每次计算的结果;
  • RDD之间的存在依赖关系。RDD的每次转换都会生成一个新的RDD,RDD之间形成类似于流水线一样的前后依赖关系(lineage)。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算;
  • 对于 key-value 的RDD而言,可能存在分区器(Partitioner)。Spark 实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有 key-value 的RDD,才可能有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数决定了RDD本身的分片数量也决定了parent RDD Shuffle输出时的分片数量;
  • 一个列表,存储存储每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不移动计算”的理念,Spark在任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

RDD 的特点

1. 分区

RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个 compute 函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。

2. 只读

RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD;

一个RDD转换为另一个RDD,通过丰富的操作算子(map、filter、union、join、reduceByKey… …)实现,不再像MR那样只能写map和reduce了。

RDD的操作算子包括两类:

  • ransformation: 用来对RDD进行转化,延迟执行(Lazy);
  • action: 用来触发RDD的计算;得到相关计算结果或者将RDD保存的文件系统中;

3. 依赖

RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系(lineage),也称之为依赖。依赖包括两种:

  • 窄依赖。RDDs之间分区是一一对应的(1:1 或 n:1)
  • 宽依赖。子RDD每个分区与父RDD的每个分区都有关,是多对多的关系(即 n:m)。有shuffle发生

4. 缓存

可以控制存储级别(内存、磁盘等)来进行缓存。

如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。

5.checkpoint

虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。

但是于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。

RDD支持 checkpoint 将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从 checkpoint 处拿到数据。

RDD 算子都有哪些?

RDD的操作算子分为两类:

  • Transformation。用来对RDD进行转化,这个操作时延迟执行的(或者说是 Lazy 的);
  • Action。用来触发RDD的计算;得到相关计算结果 或者 将结果保存的外部系统中;

算子 https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

Transformation

算子名称依赖关系说明
map窄依赖对原RDD中每个元素运用func函数,并生成新的RDD
filter窄依赖对原RDD中每个元素使用func函数进行过滤,并生成新的RDD
flatMap窄依赖与map类似,但每一个输入的item被映射成0个或多个输出的items(func返回类型需要为Seq)
mapPartitions窄依赖与map类似,但函数单独在RDD的每个分区上运行,func函数的类型为Iterator<T> => Iterator<U>
mapPartitionsWithIndex窄依赖与mapPartitions类似,但func类型为(Int, Iterator<T>) => Iterator<U>,其中第一个参数为分区索引
sample窄依赖数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed)
union窄依赖合并两个RDD
intersection窄依赖求两个RDD的交集
distinct宽依赖去重

Action

算子名称依赖关系说明
reduce宽依赖先聚合分区内的数据,再聚合分区间的数据,最终返回一个结果
collect宽依赖在驱动程序中,以数组Array的形式返回数据集的所有元素,数据按照分区编号有序返回
count宽依赖计算RDD中元素的数量,先在每个分区内进行计数,然后将结果相加得到总数
first宽依赖返回数据集中的第一个元素,如果数据集为空,则抛出异常
take(num)宽依赖返回数据集中的前num个元素,结果以数组形式返回
takeOrdered(num, key)宽依赖返回按照指定key排序后的前num个元素,结果以数组形式返回
sum宽依赖(针对数值类型)计算数据集中所有元素的和,先在每个分区内进行局部聚合,然后进行全局聚合
fold(zeroValue)(op)宽依赖使用指定的初始值zeroValue和二元操作op,先局部聚合再全局聚合,返回最终结果
aggregate(zeroValue)(seqOp, combOp)宽依赖先使用seqOp对每个分区内的数据进行聚合,再使用combOp对分区间的结果进行聚合

map 与 mapPartitions 的区别?

  • map:每次处理一条数据
  • mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易 OOM

最佳实践:当内存资源充足时,建议使用mapPartitions,以提高处理效率

map和flatmap的区别?

Apache Spark中的mapflatMap都是转换操作(transformations),它们用于对RDD(弹性分布式数据集)中的每个元素应用一个函数,并返回一个新的RDD。它们的主要区别在于它们如何处理函数的输出:

  1. map

    • map操作接受一个函数作为参数,该函数会被应用于RDD中的每个元素。
    • 这个函数的输出是一个单一的值,即对于RDD中的每个元素,map操作会生成一个对应的输出元素。
    • 结果RDD中的元素数量与原始RDD中的元素数量相同,除非应用的函数返回了null,这种情况下对应的元素会被忽略。

    例如,如果你有一个RDD包含数字,并且你想将每个数字乘以2,你可以使用map操作:

    val rdd = sc.parallelize(List(1, 2, 3))
    val doubled = rdd.map(x => x * 2)
  2. flatMap

    • flatMap操作也接受一个函数作为参数,该函数会被应用于RDD中的每个元素。
    • 不同的是,这个函数可以返回一个序列(如列表、数组等),flatMap会将这个序列中的所有元素都添加到结果RDD中。
    • 这意味着对于RDD中的每个元素,flatMap操作可能会生成多个输出元素,因此结果RDD中的元素数量可能会比原始RDD中的元素数量多。

    例如,如果你有一个RDD包含字符串,并且你想将每个字符串拆分成单词,你可以使用flatMap操作:

    val rdd = sc.parallelize(List("hello world", "spark is fun"))
    val words = rdd.flatMap(_.split(" "))

在性能方面,flatMap可能会导致数据量显著增加,因为它可能会将每个输入元素映射成多个输出元素,这可能会导致后续操作的数据倾斜问题。因此,在使用flatMap时需要谨慎,以避免不必要的资源消耗和性能问题。

Spark的cache和persist的区别?它们是transformaiton算子还是action算子?

在Apache Spark中,cachepersist都是用来将RDD持久化到内存中的方法,以便后续的多次计算可以重用这些数据,从而提高性能。它们的主要区别在于持久化级别和使用场景:

  1. cache

    • cachepersist的一个特例,它使用默认的存储级别StorageLevel.MEMORY_ONLY,这意味着它会将数据以反序列化的Java对象的形式存储在内存中。
    • 如果内存不足以存储所有数据,cache会将部分数据存储到磁盘上。
    • cache通常用于那些会被多次使用的RDD,因为它会自动处理数据的缓存和失效。
  2. persist

    • persist允许你指定一个存储级别,这个存储级别定义了数据应该如何被存储(例如,仅在内存中,仅在磁盘上,或者两者的组合)。
    • 你可以使用不同的StorageLevel枚举值来控制数据的存储方式,比如MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY等。
    • persist提供了更细粒度的控制,允许你根据数据的大小和使用情况来选择最合适的存储级别。

关于它们是transformation算子还是action算子的问题:

  • 它们既不是transformation算子也不是action算子。
  • cachepersist实际上是持久化操作,它们不会立即触发计算,而是告诉Spark将数据保留在内存或磁盘中,以便后续的action操作(如count, collect, take等)可以更快地访问这些数据。
  • 当你调用cachepersist时,Spark不会立即执行任何计算,只有当你执行一个action操作时,Spark才会根据需要缓存或持久化数据。

总结来说,cachepersist是用于优化Spark应用程序性能的工具,它们通过减少数据的重复计算来提高效率。选择使用cache还是persist取决于你的具体需求,比如数据的大小、使用频率以及可用的内存和存储资源。

RDD 的缓存级别

在Apache Spark中,RDD的缓存级别决定了数据在被持久化时存储的位置和复制的策略。这些级别通过StorageLevel枚举来定义,它们影响数据存储的方式,包括是否存储在内存中、是否序列化、是否复制到磁盘等。以下是StorageLevel中定义的一些常见缓存级别:

  1. MEMORY_ONLY

    • 仅存储数据在内存中,不序列化。如果内存不足,数据将丢失而不是写入磁盘。
  2. MEMORY_AND_DISK

    • 优先存储数据在内存中,如果内存不足,溢出到磁盘。数据在内存中保持未序列化状态。
  3. MEMORY_ONLY_SER

    • 仅存储数据在内存中,但数据会被序列化。序列化可以减少内存的使用,但会增加CPU的开销。
  4. MEMORY_AND_DISK_SER

    • 优先存储数据在内存中,如果内存不足,溢出到磁盘。数据在内存中保持序列化状态。
  5. DISK_ONLY

    • 仅存储数据在磁盘上,不存储在内存中。
  6. MEMORY_ONLY_2MEMORY_AND_DISK_2MEMORY_ONLY_SER_2MEMORY_AND_DISK_SER_2

    • 这些级别与上述级别类似,但是数据会被复制到两个节点上,提供高可用性。
  7. OFF_HEAP(实验性):

    • 存储数据在非堆内存中,适用于Tungsten执行引擎。

选择缓存级别时,需要考虑以下几点:

  • 内存与磁盘:如果数据集可以完全放入内存中,使用MEMORY_ONLYMEMORY_ONLY_SER可以提供最快的访问速度。如果数据集太大,需要考虑MEMORY_AND_DISKMEMORY_AND_DISK_SER以避免内存不足。

  • 序列化:序列化可以减少内存的使用,但是会增加CPU的开销。如果内存资源紧张,可以考虑使用序列化的存储级别。

  • 数据丢失:如果数据非常重要,不能丢失,那么应该避免使用MEMORY_ONLY级别的存储,因为它在内存不足时不会将数据写入磁盘。

  • 高可用性:如果需要高可用性,可以考虑使用带有复制的存储级别,如MEMORY_ONLY_2

在实际应用中,选择最合适的缓存级别取决于具体的应用场景和资源限制。通常,MEMORY_AND_DISK是一个比较平衡的选择,因为它结合了内存的速度和磁盘的持久性。

如何优化RDD的缓存策略以提高性能?

优化RDD的缓存策略以提高性能,可以考虑以下几个方面:

  1. 选择合适的存储级别

    • 如果内存足够大,可以完全存放下整个RDD的数据,并且没有发生内存溢出,那么使用MEMORY_ONLY级别,因为它可以最大程度提高CPU效率,使RDD上的操作以最快速度运行。
    • 如果内存不足以存放RDD,考虑使用MEMORY_ONLY_SER级别,通过序列化对象来节省内存空间,虽然增加了序列化和反序列化的开销,但总体性能仍然较高。
    • 对于大数据量,如果纯内存级别都无法使用,建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略,因为序列化后的数据占用空间更小。
  2. 避免数据倾斜

    • 数据倾斜是导致性能问题的一个常见原因。可以通过使用reduceByKeygroupByKey的变体,如combineByKey,来减轻数据倾斜。
    • 使用repartitioncoalesce来重新分区RDD,以平衡数据分布。
  3. 合理设置分区数

    • RDD的分区数决定了并行度和性能。根据数据规模和集群资源,选择合理的分区数是非常重要的。
  4. 使用持久化来避免重复计算

    • 通过persist方法将RDD或DataFrame缓存到内存中,可以减少重复计算的开销,尤其对于多次使用同一个RDD的情况非常有用。
  5. 数据压缩

    • Spark支持在计算任务中对数据进行压缩,可以降低数据传输和存储的成本,并提高计算性能。选择最适合数据集和计算任务的压缩格式,并将其配置为Spark的默认压缩格式。
  6. 监控和调优

    • 监控Spark作业的性能指标,及时发现和解决性能问题。不断学习和研究Spark的性能优化技巧,了解最新的优化技术和最佳实践。
  7. 使用高效的算法和操作

    • Spark提供了很多高效的算法和操作,包括缓存、广播、过滤、聚合等。使用这些高效的算法和操作可以显著提高Spark的性能。

通过上述措施,可以有效地优化RDD的缓存策略,提高Spark应用程序的性能。

reduceByKey和groupByKey的区别和作用?

在Apache Spark中,reduceByKeygroupByKey都是对键值对(pair)RDD进行操作的转换算子,但它们在处理数据的方式和性能上存在一些差异。以下是它们的区别和作用:

  1. groupByKey

    • groupByKey将所有具有相同键的值收集到一个列表中,形成一个(键,值列表)的对。
    • 它会返回一个新的RDD,其中每个键都映射到一个包含所有对应值的集合。
    • groupByKey是一个宽依赖操作,因为它需要将所有具有相同键的值从不同分区中收集到一起,这可能会导致大量的数据传输和潜在的性能问题。
    • 通常,groupByKey在数据量较大时会消耗更多的内存和网络资源,因为它需要将所有值收集到一个集合中。

    例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值收集到一个列表中,你可以使用groupByKey

    val rdd = sc.parallelize(List((1, "a"), (1, "b"), (2, "c"), (2, "d"), (3, "e")))
    val grouped = rdd.groupByKey()
  2. reduceByKey

    • reduceByKey对每个键的所有值应用一个二元函数(即函数接受两个参数),并将结果合并成一个值。
    • 它会返回一个新的RDD,其中每个键都映射到一个值,这个值是通过将所有对应值应用给定函数得到的。
    • reduceByKey是一个窄依赖操作,因为它只需要在每个分区内部对值进行合并,然后将合并后的结果在不同分区之间进行合并。
    • reduceByKey通常比groupByKey更高效,因为它减少了数据的传输和内存的使用,特别是在处理大量数据时。

    例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值相加,你可以使用reduceByKey

    val rdd = sc.parallelize(List((1, 2), (1, 3), (2, 4), (2, 5), (3, 6)))
    val reduced = rdd.reduceByKey(_ + _)

在选择使用groupByKey还是reduceByKey时,需要考虑数据的大小和计算的复杂性。如果每个键对应的值的数量很大,使用groupByKey可能会导致内存不足或网络拥塞,而reduceByKey则可以更有效地处理这种情况。此外,如果计算逻辑可以表示为一个二元函数,那么reduceByKey通常是更好的选择。

reduceByKey和reduce的区别?

在Apache Spark中,reduceByKeyreduce是两种不同的转换操作,它们在处理数据的方式和适用场景上存在一些差异。以下是它们的区别:

  1. reduceByKey

    • reduceByKey是针对键值对(pair)RDD的操作,它对每个键的所有值应用一个二元函数,并将结果合并成一个值。
    • reduceByKey会返回一个新的RDD,其中每个键都映射到一个值,这个值是通过将所有对应值应用给定函数得到的。
    • reduceByKey是一个窄依赖操作,因为它只需要在每个分区内部对值进行合并,然后将合并后的结果在不同分区之间进行合并。
    • reduceByKey通常比reduce更高效,因为它减少了数据的传输和内存的使用,特别是在处理大量数据时。

    例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值相加,你可以使用reduceByKey

    val rdd = sc.parallelize(List((1, 2), (1, 3), (2, 4), (2, 5), (3, 6)))
    val reduced = rdd.reduceByKey(_ + _)
  2. reduce

    • reduce是针对任何类型的RDD的操作,它对RDD中的所有元素应用一个二元函数,并将结果合并成一个值。
    • reduce会返回一个单一的值,这个值是通过将所有元素应用给定函数得到的。
    • reduce是一个宽依赖操作,因为它需要将所有元素从不同分区中收集到一起,这可能会导致大量的数据传输和潜在的性能问题。
    • reduce通常用于计算整个数据集的聚合值,如总和、最大值、最小值等。

    例如,如果你有一个RDD包含数字,并且你想计算所有数字的总和,你可以使用reduce

    val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
    val sum = rdd.reduce(_ + _)

在选择使用reduceByKey还是reduce时,需要考虑数据的类型和计算的目标。如果数据是键值对,并且你想对每个键的值进行合并,那么reduceByKey是更好的选择。如果数据是单一类型的元素,并且你想计算整个数据集的聚合值,那么reduce是更好的选择。

使用reduceByKey出现数据倾斜怎么办?

在使用Spark的reduceByKey操作时,如果出现数据倾斜问题,可以采取以下几种解决方案:

  1. 采样倾斜key并分拆join操作

    • 对于由单个key导致的数据倾斜,可以将这个key单独提取出来,组成一个RDD,然后与其他RDD单独进行join操作。这样,原本会导致倾斜的key对应的数据会在shuffle阶段被分散到多个task中去进行join操作。
  2. 使用随机数扩容进行join

    • 如果在进行join操作时,RDD中有大量的key导致数据倾斜,可以考虑对其中一个RDD数据进行扩容,另一个RDD进行稀释后再进行join。
  3. 两阶段聚合(局部聚合+全局聚合)

    • 这种方法适用于聚合类的shuffle操作导致的数据倾斜。首先,给倾斜的dataSkewRDD中每个key都打上一个随机前缀,进行局部聚合,然后去除随机前缀,再次进行全局聚合。
  4. 提高shuffle操作的并行度

    • 例如,使用groupByKey(100)来提高并行度,相当于增加了reduce task的数量,每一个reduce task中分区的数据量会减少,可以缓解数据倾斜。
  5. 过滤少数导致倾斜的key

    • 如果数据中有很多null或无效数据,可以在shuffle之前过滤掉这些数据。
  6. 将reduce join转为map join

    • 在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,如果join操作中的一个RDD或表的数据量比较小,可以考虑将reduce join转为map join。
  7. 对key添加随机前缀

    • 将原始的key转化为随机值+key,进行reduceByKey操作,然后将随机值+key转成key,再进行reduceByKey操作。

这些方法可以帮助减轻或解决在使用reduceByKey时出现的数据倾斜问题,提升Spark作业的性能。