跳到主要内容

算子

本文主要介绍Spark的算子相关面试题


RDD 算子都有哪些?

RDD的操作算子分为两类:

  • Transformation。用来对RDD进行转化,这个操作时延迟执行的(或者说是 Lazy 的);
  • Action。用来触发RDD的计算;得到相关计算结果 或者 将结果保存的外部系统中;

算子 https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

Transformation

算子名称依赖关系说明
map(func)对原RDD中每个元素运用func函数,并生成新的RDD
filter(func)对原RDD中每个元素使用func函数进行过滤,并生成新的RDD
flatMap(func)与map类似,但每一个输入的item被映射成0个或多个输出的items(func返回类型需要为Seq)
mapPartitions(func)与map类似,但函数单独在RDD的每个分区上运行,func函数的类型为Iterator<T> => Iterator<U>
mapPartitionsWithIndex(func)与mapPartitions类似,但func类型为(Int, Iterator<T>) => Iterator<U>,其中第一个参数为分区索引
sample(withReplacement, fraction, seed)数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed)
union合并两个RDD
intersection求两个RDD的交集
distinct去重
groupByKey当对(K,V)对的数据集调用时,返回(K,Iterable)对的数据集。
注意事项:如果您分组是为了对每个键执行聚合(例如求和或平均),则使用 reduceByKeyaggregateByKey 将产生更好的性能。
注意:默认情况下,输出中的并行级别取决于父RDD的分区数量。您可以传递一个可选的 numPartitions 参数来设置不同数量的任务。
reduceByKey当在(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,其中每个键的值使用给定的reduce函数func聚合,该函数必须是(V,V)=> V类型。
aggregateByKey对键值对 RDD 进行聚合操作的算子。它允许用户指定一个初始值,并定义如何在分区内部聚合数据以及如何跨分区聚合数据。
sortByKey当在(K,V)对的数据集上调用时,其中K实现了Ordered,返回一个按键升序或降序排序的(K,V)对的数据集
join当对类型为(K,V)和(K,W)的数据集调用时,返回一个包含(K,(V,W))对的数据集,其中每个键都包含所有元素对。通过 leftOuterJoinrightOuterJoinfullOuterJoin 支持外部连接
cogroup当对类型为(K,V)和(K,W)的数据集调用时,返回一个(K,(Iterable,Iterable))元组的数据集。此操作也称为 groupWith
cartesian当对T和U类型的数据集调用时,返回一个(T,U)对的数据集(所有元素对)
pipe通过shell命令(例如Perl或bash脚本)对RDD的每个分区进行管道传输。RDD元素被写入进程的stdin,输出到其stdout的行作为字符串的RDD返回。
coalesce宽/窄将RDD中的分区数减少到numPartitions。用于在过滤大型数据集后更有效地运行操作。
repartition机重新排列RDD中的数据,以创建更多或更少的分区,并在它们之间进行平衡。这总是打乱网络上的所有数据。
repartitionAndSortWithinPartitions根据给定的分区程序对RDD进行重新分区,并在每个分区中按记录的键对记录进行排序。这比调用 repartition 然后在每个分区内排序更有效,因为它可以将排序推到shuffle机器中。

Action

算子名称依赖关系说明
reduce宽依赖先聚合分区内的数据,再聚合分区间的数据,最终返回一个结果
collect宽依赖在驱动程序中,以数组Array的形式返回数据集的所有元素,数据按照分区编号有序返回
count宽依赖计算RDD中元素的数量,先在每个分区内进行计数,然后将结果相加得到总数
first宽依赖返回数据集中的第一个元素,如果数据集为空,则抛出异常
take(num)宽依赖返回数据集中的前num个元素,结果以数组形式返回
takeSample返回一个数组,其中包含数据集的num个元素的随机样本,可以替换也可以不替换,可以选择预先指定一个随机数生成器种子
takeOrdered(num, key)宽依赖返回按照指定key排序后的前num个元素,结果以数组形式返回
takeOrdered使用自然顺序或自定义比较器返回RDD的前n个元素
saveAsTextFile将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS或任何其他Hadoop支持的文件系统中的给定目录中。Spark会在每个元素上调用toString,将其转换为文件中的一行文本。
saveAsSequenceFile将数据集的元素作为Hadoop SequenceFile写入本地文件系统、HDFS或任何其他Hadoop支持的文件系统中的给定路径。这在实现Hadoop的可扩展接口的键值对的RDD上可用。在Scala中,它也可以用于隐式转换为可转换的类型(Spark包括Int,Double,String等基本类型的转换)。
saveAsObjectFile使用Java序列化以简单格式编写数据集的元素,然后可以使用 SparkContext.objectFile() 加载。
countByKey()仅适用于(K,V)型RDD。返回(K,Int)对的散列表以及每个键的计数
foreach修改除累加器之外的变量可能会导致未定义的行为。有关更多详细信息,请参见了解闭包。

map 与 mapPartitions 的区别?

  • map:每次处理一条数据
  • mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易 OOM

最佳实践:当内存资源充足时,建议使用mapPartitions,以提高处理效率

map和flatmap的区别?

Apache Spark中的mapflatMap都是转换操作(transformations),它们用于对RDD(弹性分布式数据集)中的每个元素应用一个函数,并返回一个新的RDD。它们的主要区别在于它们如何处理函数的输出:

  1. map

    • map操作接受一个函数作为参数,该函数会被应用于RDD中的每个元素。
    • 这个函数的输出是一个单一的值,即对于RDD中的每个元素,map操作会生成一个对应的输出元素。
    • 结果RDD中的元素数量与原始RDD中的元素数量相同,除非应用的函数返回了null,这种情况下对应的元素会被忽略。

    例如,如果你有一个RDD包含数字,并且你想将每个数字乘以2,你可以使用map操作:

    val rdd = sc.parallelize(List(1, 2, 3))
    val doubled = rdd.map(x => x * 2)
  2. flatMap

    • flatMap操作也接受一个函数作为参数,该函数会被应用于RDD中的每个元素。
    • 不同的是,这个函数可以返回一个序列(如列表、数组等),flatMap会将这个序列中的所有元素都添加到结果RDD中。
    • 这意味着对于RDD中的每个元素,flatMap操作可能会生成多个输出元素,因此结果RDD中的元素数量可能会比原始RDD中的元素数量多。

    例如,如果你有一个RDD包含字符串,并且你想将每个字符串拆分成单词,你可以使用flatMap操作:

    val rdd = sc.parallelize(List("hello world", "spark is fun"))
    val words = rdd.flatMap(_.split(" "))

在性能方面,flatMap可能会导致数据量显著增加,因为它可能会将每个输入元素映射成多个输出元素,这可能会导致后续操作的数据倾斜问题。因此,在使用flatMap时需要谨慎,以避免不必要的资源消耗和性能问题。

区别总结

  • map:保留原集合的嵌套结构,每个元素被处理后仍然是独立的子集合。
  • flatMap:先处理元素,再将嵌套结构展开,最终得到一个一维集合。

Spark的cache和persist的区别?它们是transformaiton算子还是action算子?

在Apache Spark中,cachepersist都是用来将RDD持久化到内存中的方法,以便后续的多次计算可以重用这些数据,从而提高性能。它们的主要区别在于持久化级别和使用场景:

  1. cache

    • cachepersist的一个特例,它使用默认的存储级别StorageLevel.MEMORY_ONLY,这意味着它会将数据以反序列化的Java对象的形式存储在内存中。
    • 如果内存不足以存储所有数据,cache会将部分数据存储到磁盘上。
    • cache通常用于那些会被多次使用的RDD,因为它会自动处理数据的缓存和失效。
  2. persist

    • persist允许你指定一个存储级别,这个存储级别定义了数据应该如何被存储(例如,仅在内存中,仅在磁盘上,或者两者的组合)。
    • 你可以使用不同的StorageLevel枚举值来控制数据的存储方式,比如MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY等。
    • persist提供了更细粒度的控制,允许你根据数据的大小和使用情况来选择最合适的存储级别。

关于它们是transformation算子还是action算子的问题:

  • 它们既不是transformation算子也不是action算子。
  • cachepersist实际上是持久化操作,它们不会立即触发计算,而是告诉Spark将数据保留在内存或磁盘中,以便后续的action操作(如count, collect, take等)可以更快地访问这些数据。
  • 当你调用cachepersist时,Spark不会立即执行任何计算,只有当你执行一个action操作时,Spark才会根据需要缓存或持久化数据。

总结来说,cachepersist是用于优化Spark应用程序性能的工具,它们通过减少数据的重复计算来提高效率。选择使用cache还是persist取决于你的具体需求,比如数据的大小、使用频率以及可用的内存和存储资源。

reduceByKey和groupByKey的区别和作用?

在Apache Spark中,reduceByKeygroupByKey都是对键值对(pair)RDD进行操作的转换算子,但它们在处理数据的方式和性能上存在一些差异。以下是它们的区别和作用:

  1. groupByKey

    • groupByKey将所有具有相同键的值收集到一个列表中,形成一个(键,值列表)的对。
    • 它会返回一个新的RDD,其中每个键都映射到一个包含所有对应值的集合。
    • groupByKey是一个宽依赖操作,因为它需要将所有具有相同键的值从不同分区中收集到一起,这可能会导致大量的数据传输和潜在的性能问题。
    • 通常,groupByKey在数据量较大时会消耗更多的内存和网络资源,因为它需要将所有值收集到一个集合中。

    例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值收集到一个列表中,你可以使用groupByKey

    val rdd = sc.parallelize(List((1, "a"), (1, "b"), (2, "c"), (2, "d"), (3, "e")))
    val grouped = rdd.groupByKey()
  2. reduceByKey

    • reduceByKey对每个键的所有值应用一个二元函数(即函数接受两个参数),并将结果合并成一个值。
    • 它会返回一个新的RDD,其中每个键都映射到一个值,这个值是通过将所有对应值应用给定函数得到的。
    • reduceByKey是一个窄依赖操作,因为它只需要在每个分区内部对值进行合并,然后将合并后的结果在不同分区之间进行合并。
    • reduceByKey通常比groupByKey更高效,因为它减少了数据的传输和内存的使用,特别是在处理大量数据时。

    例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值相加,你可以使用reduceByKey

    val rdd = sc.parallelize(List((1, 2), (1, 3), (2, 4), (2, 5), (3, 6)))
    val reduced = rdd.reduceByKey(_ + _)

在选择使用groupByKey还是reduceByKey时,需要考虑数据的大小和计算的复杂性。如果每个键对应的值的数量很大,使用groupByKey可能会导致内存不足或网络拥塞,而reduceByKey则可以更有效地处理这种情况。此外,如果计算逻辑可以表示为一个二元函数,那么reduceByKey通常是更好的选择。

reduceByKey和reduce的区别?

在Apache Spark中,reduceByKeyreduce是两种不同的转换操作,它们在处理数据的方式和适用场景上存在一些差异。以下是它们的区别:

  1. reduceByKey

    • reduceByKey是针对键值对(pair)RDD的操作,它对每个键的所有值应用一个二元函数,并将结果合并成一个值。
    • reduceByKey会返回一个新的RDD,其中每个键都映射到一个值,这个值是通过将所有对应值应用给定函数得到的。
    • reduceByKey是一个窄依赖操作,因为它只需要在每个分区内部对值进行合并,然后将合并后的结果在不同分区之间进行合并。
    • reduceByKey通常比reduce更高效,因为它减少了数据的传输和内存的使用,特别是在处理大量数据时。

    例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值相加,你可以使用reduceByKey

    val rdd = sc.parallelize(List((1, 2), (1, 3), (2, 4), (2, 5), (3, 6)))
    val reduced = rdd.reduceByKey(_ + _)
  2. reduce

    • reduce是针对任何类型的RDD的操作,它对RDD中的所有元素应用一个二元函数,并将结果合并成一个值。
    • reduce会返回一个单一的值,这个值是通过将所有元素应用给定函数得到的。
    • reduce是一个宽依赖操作,因为它需要将所有元素从不同分区中收集到一起,这可能会导致大量的数据传输和潜在的性能问题。
    • reduce通常用于计算整个数据集的聚合值,如总和、最大值、最小值等。

    例如,如果你有一个RDD包含数字,并且你想计算所有数字的总和,你可以使用reduce

    val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
    val sum = rdd.reduce(_ + _)

在选择使用reduceByKey还是reduce时,需要考虑数据的类型和计算的目标。如果数据是键值对,并且你想对每个键的值进行合并,那么reduceByKey是更好的选择。如果数据是单一类型的元素,并且你想计算整个数据集的聚合值,那么reduce是更好的选择。

使用reduceByKey出现数据倾斜怎么办?

在使用Spark的reduceByKey操作时,如果出现数据倾斜问题,可以采取以下几种解决方案:

  1. 采样倾斜key并分拆join操作

    • 对于由单个key导致的数据倾斜,可以将这个key单独提取出来,组成一个RDD,然后与其他RDD单独进行join操作。这样,原本会导致倾斜的key对应的数据会在shuffle阶段被分散到多个task中去进行join操作。
  2. 使用随机数扩容进行join

    • 如果在进行join操作时,RDD中有大量的key导致数据倾斜,可以考虑对其中一个RDD数据进行扩容,另一个RDD进行稀释后再进行join。
  3. 两阶段聚合(局部聚合+全局聚合)

    • 这种方法适用于聚合类的shuffle操作导致的数据倾斜。首先,给倾斜的dataSkewRDD中每个key都打上一个随机前缀,进行局部聚合,然后去除随机前缀,再次进行全局聚合。
  4. 提高shuffle操作的并行度

    • 例如,使用groupByKey(100)来提高并行度,相当于增加了reduce task的数量,每一个reduce task中分区的数据量会减少,可以缓解数据倾斜。
  5. 过滤少数导致倾斜的key

    • 如果数据中有很多null或无效数据,可以在shuffle之前过滤掉这些数据。
  6. 将reduce join转为map join

    • 在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,如果join操作中的一个RDD或表的数据量比较小,可以考虑将reduce join转为map join。
  7. 对key添加随机前缀

    • 将原始的key转化为随机值+key,进行reduceByKey操作,然后将随机值+key转成key,再进行reduceByKey操作。

这些方法可以帮助减轻或解决在使用reduceByKey时出现的数据倾斜问题,提升Spark作业的性能。