算子
本文主要介绍Spark的算子相关面试题
RDD 算子都有哪些?
RDD的操作算子分为两类:
- Transformation。用来对RDD进行转化,这个操作时延迟执行的(或者说是 Lazy 的);
- Action。用来触发RDD的计算;得到相关计算结果 或者 将结果保存的外部系统中;
算子 https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
Transformation
算子名称 | 依赖关系 | 说明 |
---|---|---|
map(func) | 窄 | 对原RDD中每个元素运用func函数,并生成新的RDD |
filter(func) | 窄 | 对原RDD中每个元素使用func函数进行过滤,并生成新的RDD |
flatMap(func) | 窄 | 与map类似,但每一个输入的item被映射成0个或多个输出的items(func返回类型需要为Seq) |
mapPartitions(func) | 窄 | 与map类似,但函数单独在RDD的每个分区上运行,func函数的类型为Iterator<T> => Iterator<U> |
mapPartitionsWithIndex(func) | 窄 | 与mapPartitions类似,但func类型为(Int, Iterator<T>) => Iterator<U> ,其中第一个参数为分区索引 |
sample(withReplacement, fraction, seed) | 窄 | 数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed) |
union | 窄 | 合并两个RDD |
intersection | 窄 | 求两个RDD的交集 |
distinct | 宽 | 去重 |
groupByKey | 宽 | 当对(K,V)对的数据集调用时,返回(K,Iterable)对的数据集。 注意事项:如果您分组是为了对每个键执行聚合(例如求和或平均),则使用 reduceByKey 或 aggregateByKey 将产生更好的性能。注意:默认情况下,输出中的并行级别取决于父RDD的分区数量。您可以传递一个可选的 numPartitions 参数来设置不同数量的任务。 |
reduceByKey | 窄 | 当在(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,其中每个键的值使用给定的reduce函数func聚合,该函数必须是(V,V)=> V类型。 |
aggregateByKey | 宽 | 对键值对 RDD 进行聚合操作的算子。它允许用户指定一个初始值,并定义如何在分区内部聚合数据以及如何跨分区聚合数据。 |
sortByKey | 宽 | 当在(K,V)对的数据集上调用时,其中K实现了Ordered,返回一个按键升序或降序排序的(K,V)对的数据集 |
join | 宽 | 当对类型为(K,V)和(K,W)的数据集调用时,返回一个包含(K,(V,W))对的数据集,其中每个键都包含所有元素对。通过 leftOuterJoin 、 rightOuterJoin 和 fullOuterJoin 支持 外部连接 |
cogroup | 宽 | 当对类型为(K,V)和(K,W)的数据集调用时,返回一个(K,(Iterable,Iterable))元组的数据集。此操作也称为 groupWith 。 |
cartesian | 窄 | 当对T和U类型的数据集调用时,返回一个(T,U)对的数据集(所有元素对) |
pipe | 窄 | 通过shell命令(例如Perl或bash脚本)对RDD的每个分区进行管道传输。RDD元素被写入进程的stdin,输出到其stdout的行作为字符串的RDD返回。 |
coalesce | 宽/窄 | 将RDD中的分区数减少到numPartitions。用于在过滤大型数据集后更有效地运行操作。 |
repartition | 宽 | 机重新排列RDD中的数据,以创建更多或更少的分区,并在它们之间进行平衡。这总是打乱网络上的所有数据。 |
repartitionAndSortWithinPartitions | 宽 | 根据给定的分区程序对RDD进行重新分区,并在每个分区中按记录的键对记录进行排序。这比调用 repartition 然后在每个分区内排序更有效,因为它可以将排序推到shuffle机器中。 |
Action
算子名称 | 依赖关系 | 说明 |
---|---|---|
reduce | 宽依赖 | 先聚合分区内的数据,再聚合分区间的数据,最终返回一个结果 |
collect | 宽依赖 | 在驱动程序中,以数组Array的形式返回数据集的所有元素,数据按照分区编号有序返回 |
count | 宽依赖 | 计算RDD中元素的数量,先在每个分区内进行计数,然后将结果相加得到总数 |
first | 宽依赖 | 返回数据集中的第一个元素,如果数据集为空,则抛出异常 |
take(num) | 宽依赖 | 返回数据集中的前num个元素,结果以数组形式返回 |
takeSample | 返回一个数组,其中包含数据集的num个元素的随机样本,可以替换也可以不替换,可以选择预先指定一个随机数生成器种子 | |
takeOrdered(num, key) | 宽依赖 | 返回按照指定key排序后的前num个元素,结果以数组形式返回 |
takeOrdered | 使用自然顺序或自定义比较器返回RDD的前n个元素 | |
saveAsTextFile | 将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS或任何其他Hadoop支持的文件系统中的给定目录中。Spark会在每个元素上调用toString,将其转换为文件中的一行文本。 | |
saveAsSequenceFile | 将数据集的元素作为Hadoop SequenceFile写入本地文件系统、HDFS或任何其他Hadoop支持的文件系统中的给定路径。这在实现Hadoop的可扩展接口的键值对的RDD上可用。在Scala中,它也可以用于隐式转换为可转换的类型(Spark包括Int,Double,String等基本类型的转换)。 | |
saveAsObjectFile | 使用Java序列化以简单格式编写数据集的元素,然后可以使用 SparkContext.objectFile() 加载。 | |
countByKey() | 仅适用于(K,V)型RDD。返回(K,Int)对的散列表以及每个键的计数 | |
foreach | 修改除累加器之外的变量可能会导致未定义的行为。有关更多详细信息,请参见了解闭包。 |
map 与 mapPartitions 的区别?
- map:每次处理一条数据
- mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易 OOM
最佳实践:当内存资源充足时,建议使用mapPartitions,以提高处理效率
map和flatmap的区别?
Apache Spark中的map
和flatMap
都是转换操作(transformations),它们用于对RDD(弹性分布式数据集)中的每个元素应用一个函数,并返回一个新的RDD。它们的主要区别在于它们如何处理函数的输出:
-
map:
map
操作接受一个函数作为参数,该函数会被应用于RDD中的每个元素。- 这个函数的输出是一个单一的值,即对于RDD中的每个元素,
map
操作会生成一个对应的输出元素。 - 结果RDD中的元素数量与原始RDD中的元素数量相同,除非应用的函数返回了
null
,这种情况下对应的元素会被忽略。
例如,如果你有一个RDD包含数字,并且你想将每个数字乘以2,你可以使用
map
操作:val rdd = sc.parallelize(List(1, 2, 3))
val doubled = rdd.map(x => x * 2) -
flatMap:
flatMap
操作也接受一个函数作为参数,该函数会被应用于RDD中的每个元素。- 不同的是,这个函数可以返回一个序列(如列表、数组等),
flatMap
会将这个序列中的所有元素都添加到结果RDD中。 - 这意味着对于RDD中的每个元素,
flatMap
操作可能会生成多个输出元素,因此结果RDD中的元素数量可能会比原始RDD中的元素数量多。
例如,如果你有一个RDD包含字符串,并且你想将每个字符串拆分成单词,你可以使用
flatMap
操作:val rdd = sc.parallelize(List("hello world", "spark is fun"))
val words = rdd.flatMap(_.split(" "))
在性能方面,flatMap
可能会导致数据量显著增加,因为它可能会将每个输入元素映射成多个输出元素,这可能会导致后续操作的数据倾斜问题。因此,在使用flatMap
时需要谨慎,以避免不必要的资源消耗和性能问题。
区别总结
map
:保留原集合的嵌套结构,每个元素被处理后仍然是独立的子集合。flatMap
:先处理元素,再将嵌套结构展开,最终得到一个一维集合。
Spark的cache和persist的区别?它们是transformaiton算子还是action算子?
在Apache Spark中,cache
和persist
都是用来将RDD持久化到内存中的方法,以便后续的多次计算可以重用这些数据,从而提高性能。它们的主要区别在于持久化级别和使用场景:
-
cache:
cache
是persist
的一个特例,它使用默认的存储级别StorageLevel.MEMORY_ONLY
,这意味着它会将数据以反序列化的Java对象的形式存储在内存中。- 如果内存不足以存储所有数据,
cache
会将部分数据存储到磁盘上。 cache
通常用于那些会被多次使用的RDD,因为它会自动处理数据的缓存和失效。
-
persist:
persist
允许你指定一个存储级别,这个存储级别定义了数据应该如何被存储(例如,仅在内存中,仅在磁盘上,或者两者的组合)。- 你可以使用不同的
StorageLevel
枚举值来控制数据的存储方式,比如MEMORY_ONLY
,MEMORY_AND_DISK
,DISK_ONLY
等。 persist
提供了更细粒度的控制,允许你根据数据的大小和使用情况来选择最合适的存储级别。
关于它们是transformation算子还是action算子的问题:
- 它们既不是transformation算子也不是action算子。
cache
和persist
实际上是持久化操作,它们不会立即触发计算,而是告诉Spark将数据保留在内存或磁盘中,以便后续的action操作(如count
,collect
,take
等)可以更快地访问这些数据。- 当你调用
cache
或persist
时,Spark不会立即执行任何计算,只有当你执行一个action操作时,Spark才会根据需要缓存或持久化数据。
总结来说,cache
和persist
是用于优化Spark应用程序性能的工具,它们通过减少数据的重复计算来提高效率。选择使用cache
还是persist
取决于你的具体需求,比如数据的大小、使用频率以及可用的内存和存储资源。
reduceByKey和groupByKey的区别和作用?
在Apache Spark中,reduceByKey
和groupByKey
都是对键值对(pair)RDD进行操作的转换算子,但它们在处理数据的方式和性能上存在一些差异。以下是它们的区别和作用:
-
groupByKey:
groupByKey
将所有具有相同键的值收集到一个列表中,形成一个(键,值列表)的对。- 它会返回一个新的RDD,其中每个键都映射到一个包含所有对应值的集合。
groupByKey
是一个宽依赖操作,因为它需要将所有具有相同键的值从不同分区中收集到一起,这可能会导致大量的数据传输和潜在的性能问题。- 通常,
groupByKey
在数据量较大时会消耗更多的内存和网络资源,因为它需要将所有值收集到一个集合中。
例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值收集到一个列表中,你可以使用
groupByKey
:val rdd = sc.parallelize(List((1, "a"), (1, "b"), (2, "c"), (2, "d"), (3, "e")))
val grouped = rdd.groupByKey() -
reduceByKey:
reduceByKey
对每个键的所有值应用一个二元函数(即函数接受两个参数),并将结果合并成一个值。- 它会返回一个新的RDD,其中每个键都映射到一个值,这个值是通过将所有对应值应用给定函数得到的。
reduceByKey
是一个窄依赖操作,因为它只需要在每个分区内部对值进行合并,然后将合并后的结果在不同分区之间进行合并。reduceByKey
通常比groupByKey
更高效,因为它减少了数据的传输和内存的使用,特别是在处理大量数据时。
例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值相加,你可以使用
reduceByKey
:val rdd = sc.parallelize(List((1, 2), (1, 3), (2, 4), (2, 5), (3, 6)))
val reduced = rdd.reduceByKey(_ + _)
在选择使用groupByKey
还是reduceByKey
时,需要考虑数据的大小和计算的复杂性。如果每个键对应的值的数量很大,使用groupByKey
可能会导致内存不足或网络拥塞,而reduceByKey
则可以更有效地处理这种情况。此外,如果计算逻辑可以表示为一个二元函数,那么reduceByKey
通常是更好的选择。
reduceByKey和reduce的区别?
在Apache Spark中,reduceByKey
和reduce
是两种不同的转换操作,它们在处理数据的方式和适用场景上存在一些差异。以下是它们的区别:
-
reduceByKey:
reduceByKey
是针对键值对(pair)RDD的操作,它对每个键的所有值应用一个二元函数,并将结果合并成一个值。reduceByKey
会返回一个新的RDD,其中每个键都映射到一个值,这个值是通过将所有对应值应用给定函数得到的。reduceByKey
是一个窄依赖操作,因为它只需要在每个分区内部对值进行合并,然后将合并后的结果在不同分区之间进行合并。reduceByKey
通常比reduce
更高效,因为它减少了数据的传 输和内存的使用,特别是在处理大量数据时。
例如,如果你有一个RDD包含(键,值)对,并且你想将所有具有相同键的值相加,你可以使用
reduceByKey
:val rdd = sc.parallelize(List((1, 2), (1, 3), (2, 4), (2, 5), (3, 6)))
val reduced = rdd.reduceByKey(_ + _) -
reduce:
reduce
是针对任何类型的RDD的操作,它对RDD中的所有元素应用一个二元函数,并将结果合并成一个值。reduce
会返回一个单一的值,这个值是通过将所有元素应用给定函数得到的。reduce
是一个宽依赖操作,因为它需要将所有元素从不同分区中收集到一起,这可能会导致大量的数据传输和潜在的性能问题。reduce
通常用于计算整个数据集的聚合值,如总和、最大值、最小值等。
例如,如果你有一个RDD包含数字,并且你想计算所有数字的总和,你可以使用
reduce
:val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val sum = rdd.reduce(_ + _)
在选择使用reduceByKey
还是reduce
时,需要考虑数据的类型和计算的目标。如果数据是键值对,并且你想对每个键的值进行合并,那么reduceByKey
是更好的选择。如果数据是单一类型的元素,并且你想计算整个数据集的聚合值,那么reduce
是更好的选择。
使用reduceByKey出现数据倾斜怎么办?
在使用Spark的reduceByKey
操作时,如果出现数据倾斜问题,可以采取以下几种解决方案:
-
采样倾斜key并分拆join操作:
- 对于由单个key导致的数据倾斜,可以将这个key单独提取出来,组成一个RDD,然后与其他RDD单独进行join操作。这样,原本会导致倾斜的key对应的数据会在shuffle阶段被分散到多个task中去进行join操作。
-
使用随机数扩容进行join:
- 如果在进行join操作时,RDD中有大量的key导致数据倾斜,可以考虑对其中一个RDD数据进行扩容,另一个RDD进行稀释后再进行join。
-
两阶段聚合(局部聚合+全局聚合):
- 这种方法适用于聚合类的shuffle操作导致的数据倾斜。首先,给倾斜的dataSkewRDD中每个key都打上一个随机前缀,进行局部聚合,然后去除随机前缀,再次进行全局聚合。
-
提高shuffle操作的并行度:
- 例如,使用
groupByKey(100)
来提高并行度,相当于增加了reduce task的数量,每一个reduce task中分区的数据量会减少,可以缓解数据倾斜。
- 例如,使用
-
过滤少数导致倾斜的key:
- 如果数据中有很多null或无效数据,可以在shuffle之前过滤掉这些数据。
-
将reduce join转为map join:
- 在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,如果join操作中的一个RDD或表的数据量比较小,可以考虑将reduce join转为map join。
-
对key添加随机前缀:
- 将原始的key转化为随机值+key,进行reduceByKey操作,然后将随机值+key转成key,再进行reduceByKey操作。
这些方法可以帮助减轻或解决在使用reduceByKey
时出现的数据倾斜问题,提升Spark作业的性能。