Spark 性能调优
本文介绍Spark 性能调优的一些策略
介绍一下join操作优化经验?
在Spark中,join操作是数据处理的核心操作之一,但其性能往往受到多种因素的影响。以下是一些优化join操作的经验:
选择合适的Join类型
- Inner Join:仅保留两个数据集中键匹配的记录。适用于需要精确匹配的场景。
- Outer Join:保留两个数据集中的所有记录,如果匹配不上则用null填充。包括Left Outer Join、Right Outer Join和Full Outer Join,根据实际需求选择。
- Left Semi Join:只返回左边数据集中存在匹配的记录,不返回右边数据集的字段。适用于只需要判断左边数据是否存在匹配的场景。
- Left Anti Join:只返回左边数据集中不存在匹配的记录,不返回右边数据集的字段。适用于需要找出左边数据中不匹配的部分。
处理数据倾斜
数据倾斜是join操作中常见的问题,会导致任务执行效率低下。以下是一些处理数据倾斜的方法:
-
增加Shuffle Partitions:通过增加shuffle partitions的数量,将数据划分得更加细致,减小每个partition中的数据量,从而减轻数据倾斜问题。
spark.conf.set("spark.sql.shuffle.partitions", "200")
-
使用Broadcast Join:对于一个小表和一个大表的join操作,可以使用Broadcast Join,将小表广播到每个Executor节点上,减少大表的数据传输和shuffle开销。
spark.sql("SELECT /*+ BROADCAST(table2) */ * FROM table1 JOIN table2 ON table1.key = table2.key")
-
对数据进行重新分区:使用
repartition
或coalesce
方法对数据进行重新分区,将数据均匀分布到不同的partition中。
调整并行度
并行度是指在集群中同时执行任务的数量,可以通过设置spark.default.parallelism
属性来控制。根据集群的资源情况和任务的复杂度来设置并行度,以提高处理效率。
spark.conf.set("spark.default.parallelism", "100")
另外,还可以通过在创建RDD时指定分区数,或者通过调用repartition()
或coalesce()
方法来调整RDD的分区数,从而控制并行度。
其他优化策略
- 选择合适的Join策略:Spark会根据数据集的大小和分区情况选择合适的join策略来执行连接操作,如Broadcast Hash Join、Shuffle Hash Join和Sort Merge Join。可以通过设置
joinHint
来指定连接策略的提示。 - 优化数据序列化:使用高效的序列化框架(如Kryo)来减少数据的序列化和反序列化开销。
- 避免频繁的Shuffle操作:合理设计数据分区和join操作,减少不必要的数据shuffle过程。
spark.storage.memoryFraction参数的含义,实际生产中如何调优?
spark.storage.memoryFraction
参数在Spark中用于设置RDD持久化数据在Executor内存中所占的比例。默认情况下,这个参数的值是0.6,意味着Executor的60%内存可以用于保存持久化的RDD数据。
参数含义
- RDD持久化:该参数决定了有多少内存可以用于存储持久化的RDD数据。持久化策略(如MEMORY_ONLY、MEMORY_AND_DISK等)会影响数据在内存和磁盘之间的存储方式。
- 内存分配:根据这个参数,Spark会动态地管理存储内存和执行内存之间的分配,以优化整体性能。
实际生产调优
在实际生产中,调优spark.storage.memoryFraction
参数需要考虑以下几个因素:
- 持久化操作频率:
- 如果Spark作业中有较多的RDD持久化操作,可以考虑提高这个参数的值,以确保持久化的数据能够容纳在内存中,避免数据写入磁盘带来的性能损失。
- Shuffle操作频率:
- 如果Spark作业中的Shuffle类操作比较多,而持久化操作相对较少,那么可以适当降低这个参数的值,以节约出更多的内存给JVM,减少GC(垃圾回收)的发生,从而改善程序的整体性能。
- GC表现:
- 通过Spark Web UI观察作业的GC耗时。如果发现GC时间很长,意味着Task执行用户代码的内存不够用,此时也可以考虑调低
spark.storage.memoryFraction
的值,以留出更多内存供Task执行使用。
- 通过Spark Web UI观察作业的GC耗时。如果发现GC时间很长,意味着Task执行用户代码的内存不够用,此时也可以考虑调低
- 数据规模和内存资源:
- 根据集群的内存资源和处理的数据规模来合理设置这个参数。如果内存资源充足,可以适当提高参数值以充分利用内存;如果内存资源紧张,则需要谨慎设置,以避免内存溢出或频繁的GC。
调优建议
- 监控和观察:在实际运行中,通过Spark Web UI和其他监控工具观察内存使用情况、GC表现以及作业执行时间等指标,以便及时调整参数。
- 逐步调整:不要一次性将参数值调整得过大或过小,而是逐步调整并观察效果,以找到最佳的参数值。
- 综合考虑:调优时需要综 合考虑多个因素,包括数据规模、作业类型、集群资源等,以制定最合适的调优策略。
spark.shuffle.memoryFraction参数的含义,以及优化经验?
-
spark.shuffle.memoryFraction
是shuffle调优中重要参数,shuffle从上一个task拉去数 据过来,要在Executor进行聚合操作,聚合操作时使用Executor内存的比例由该参数决定,默认 是20% 如果聚合时数据超过了该大小,那么就会spill到磁盘,极大降低性能; -
如果Spark作业中的 RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的 内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
Spark 调优都有哪些手段?
编码优化
1. RDD 复用
避免创建重复的RDD。在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不要创建多个RDD来代表同一份数据。
2. RDD缓存/持久化
-
当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算是对资源的极大浪费
-
对多次使用的RDD进行持久化,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据
-
RDD的持久化是可以进行序列化的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中
3. 巧用 filter
- 尽可能早的执行filter操作,过滤无用数据
- 在filter过滤掉较多数据后,使用 coalesce 对数据进行重分区
4. 使用高性能算子
-
避免使用groupByKey,根据场景选择使用高性能的聚合算子 reduceByKey、aggregateByKey
-
coalesce、repartition,选择没有shuffle的操作
-
foreachPartition 优化输出操作
-
map、mapPartitions,选择合理的选择算子mapPartitions性能更好,但数据量大时容易导致OOM
-
用 repartitionAndSortWithinPartitions 替代 repartition + sort 操作
-
合理使用 cache、persist、checkpoint,选择合理的数据存储级别
-
filter的使用
-
减少对数据源的扫描(算法复杂了)
5. 设置合理的并行度
- Spark作业中的并行度指各个stage的task的数量
- 设置合理的并行度,让并行度与资源相匹配。简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度
6. 广播大变量
- 默认情况下,task中的算子中如果使用了外部变量,每个task都会获取一份变量的复本,这会造多余的网络传输和内存消耗
- 使用广播变量,只会在每个Executor保存一个副本,Executor的所有task共用此广播变量,这样就节约了网络及内存资源
7. Kryo序列化
- 默认情况下,Spark使用Java的序列化机制。Java的序列化机制使用方便,不需要额外的配置。但Java序列化机制效率不高,序列化速度慢而且序 列化后的数据占用的空间大
- Kryo序列化机制比Java序列化机制性能提高10倍左右。Spark之所以没有默认使用Kryo作为序列化类库,是它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便。从Spark 2.0开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用 Kryo 序列化方式
8. 优化数据结构
Spark中有三种类型比较消耗内存:
- 对象。每个Java对象都有对象头、引用等额外的信息,占用了额外的内存空间
- 字符串。每个字符串内部都有一个字符数组以及长度等额外信息
- 集合类型。如HashMap、LinkedList等,集合类型内部通常会使用一些内部类来封装集合元素
Spark官方建议,在编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构。尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,尽可能地减少内存占用,从而降低GC频率,提升性能。
参数优化
1. shuffle 调优
开发过程中对 Shuffle 的优化:
-
减少Shuffle过程中的数据量
-
避免Shuffle
1.1 调节 map 端缓冲区大小
spark.shuffle.file.buffer
默认值为32K,shuffle write阶段buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲区,缓冲写满后才溢写到磁盘- 调节map端缓冲的大小,避免频繁的磁盘IO操作,进而提升任务整体性能
- 合理设置参数,性能会有 1%~5% 的提升
1.2 调节 reduce 端拉取数据缓冲区大小
spark.reducer.maxSizeInFlight
默认值为48M。设置shuffle read阶段buffer缓冲区大小,这个buffer缓冲决定了每次能够拉取多少数据- 在内存资源充足的情况下,可适当增加参数的大小(如96m),减少拉取数据的次数及网络传输次数,进而提升性能
- 合理设置参数,性能会有 1%~5% 的提升
1.3 调节 reduce 端拉取数据重试次数及等待间隔
- Shuffle read 阶段拉取数据时,如果因为网络异常导致拉取失败,会自动进行重试
spark.shuffle.io.maxRetries
,默认值3。最大重试次数spark.shuffle.io.retryWait
,默认值5s。每次重试拉取数据的等待间隔- 一般调高最大重试次数,不调整时间间隔
1.4 调节 Sort Shuffle 排序操作阈值
- 如果shuffle reduce task的数量小于阈值,则shuffle write过程中不会进行排序操作,而是直接按未经优化的Hash Shuffle方式写数据,最后将每个task产生的所有临时磁盘文件都合并成一个文件,并创建单独的索引文件
spark.shuffle.sort.bypassMergeThreshold
,默认值为200- 当使用SortShuffleManager时,如果的确不需要排序操作,建议将这个参数调大
1.5 调节 Shuffle 内存大小
- Spark给 Shuffle 阶段分配了专门的内存区域,这部分内存称为执行内存
- 如果内存充足,而且很少使用持久化操作,建议调高这个比例,给 shuffle 聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘
- 合理调节该参数可以将性能提升10%左右
2. 内存调优
3. 资源优化
在提交Spark任务时,可以根据任务的特点和需求设置适当的资源参数,以下是一些常见的资源参数:
- Executor Memory:每个Executor分配的内存大小。如果应用程序需要处理大量的数据或进行复杂的计算,可以适当增加Executor Memory的大小。
- Executor Cores:每个Executor分配的CPU核数。如果应用程序需要进行大量的并行计算,可以适当增加Executor Cores的数量。
- Driver Memory:Driver进程分配的内存大小。如果应用程序需要缓存大量的数据或进行大量的Shuffle操作,可以适当增加Driver Memory的大小。
- Executor Memory Overhead:每个Executor分配的内存空间,用于JVM堆外的存储和操作系统缓存。可以根据应用程序的需求进行适当的调整。
4. 动态资源分配
Spark 提供了一种机制,可以根据工作负载动态调整应用程序占用的资源。这意味着如果不再使用资源,您的应用程序可能会将资源返还给集群,并在以后有需求时再次请求它们。如果多个应用程序共享 Spark 集群中的资源,则此功能特别有用。
- 动态的资源分配是 executor 级
- 默认情况下禁用此功能,并在所有粗粒度集群管理器上可用 (standlone,Yarn,Mesos,K8s)
动态申请 executor
如果有新任务处于等待状态,并且等待时间超过Spark.dynamicAllocation.schedulerBacklogTimeout(默认1s),则会依次启动xecutor,每次启动1、2、4、8…个executor(如果有的话)。启动的间隔由spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
控制 (默认与schedulerBacklogTimeout相同)
动态移除executor
executor空闲时间超过 spark.dynamicAllocation.executorIdleTimeout
设置的值(默认60s),该executor会被移除,除非有缓存数据
相关参数
spark.dynamicAllocation.enabled = true
参考 -> https://spark.apache.org/docs/3.5.3/job-scheduling.html#dynamic-resource-allocation
5. 调节本地等待时间
- Spark总是倾向于让所有任务都具有最佳的数据本地性。遵循移动计算不移动数据的思想,Spark希望task能够运行在它要计算的数据所在的节点上,这样可以避免数据的网络传输
PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL > ANY
- 在某些情况下,可能会出现一些空闲的executor没有待处理的数据,那么Spark可能就会牺牲一些数据本地
- 如果对应节点资源用尽,Spark会等待一段时间(默认3s)。如果等待指定时间后仍无法在该节点运行,那么自动降级,尝试将task分配到比较差的本地化级别所对应的节点上;如果当前级别仍然不行,那么继续降级
- 调节本地等待时长。如果在等待时间内,目标节点处理完成了一部分 Task,那么等待运行的 Task 将有机会得到执行,获得较好的数据本地性,提高 Spark 作业整体性能
- 根据数据本地性不同,等待的时间间隔也不一致,不同数据本地性的等待时间设置参数
spark.locality.wait
:设置所有级别的数据本地性,默认是3000毫秒spark.locality.wait.process
:多长时间等不到PROCESS_LOCAL就降级,默认为${spark.locality.wait}
spark.locality.wait.node
:多长时间等不到NODE_LOCAL就降级,默认为${spark.locality.wait}
spark.locality.wait.rack
:多长时间等不到RACK_LOCAL就降级,默认为${spark.locality.wait}
6. 调节链接等待时长
提升作业稳定性
-
Spark作业中,如果executor在数据读取过程中由于JVM垃圾回收或其他原因暂时无法响应,调节连接等待时长可以避免因连接超时导致的任务失败和作业崩溃。
-
通过增加等待时长,可以给executor更多的时间来完成其当前操作,从而减少因网络或资源竞争导致的任务重试和作业延迟。
方法
- 调节连接等待时长主要通过设置Spark配置参数来实现,例如
spark.core.connection.ack.wait.timeout
。 - 可以在
spark-submit
脚本中使用--conf
选项来设置这些参数,或者在Spark配置文件中进行配置。
数据倾斜产生的原因和解决
Spark中的数据倾斜是指在分布式计算过程中,数据在各个节点或分区之间分布不均匀,导致某些节点或分区的数据量远大于其他节点或分区。
- 原因
- 键分布不均匀:
- 当输入数据集中的键(Key)分布不均匀时,会导致Spark任务的负载不均衡。例如,某个键的数据量远远大于其他键的数据量,这将导致负载不均衡和计算节点的压力过大。(例如参与计算的 key 有大量空值(null))
- 聚合操作:
- 聚合操作(如
groupByKey
、reduceByKey
等)是导致数据倾斜的另一个常见原因。当进行聚合操作时,Spark会将具有相同键的数据分发到同一个节点进行计算。如果某个键的数据量远远大于其他键的数据量,将导致负载不均衡。
- 聚合操作(如
- 自定义分区器:
- 如果自定义分区器的规则不当,也会导致数据倾斜。例如,如果自定义分区器将大量数据映射到同一个分区,将导致该分区的负载过大。
处理:
-
预处理:
- 过滤key中的空值。
- 消除数据源带来的数据倾斜(文件采用可切分的压缩方式)
-
避免 shuffle:
- Map端的join是典型的解决方案
- 可以完全消除Shuffle,进而解决数据倾斜
- 有很强的适用场景(大表和小表关联),典型的大表与小表的join,其他场景不合适
-
减少 Shuffle 过程中传输的数据:
- 使用高性能算子,避免使用groupByKey,用reduceByKey或aggregateByKey替代
- 没有从根本上解决数据分配不均的问题,收效有限,使用场景有限
-
提高shuffle操作的并行度
- 增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。
- 只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。
-
选择新的可用于聚合或join的Key:
- 从业务出发,使用新的key去做聚合或join。如当前key是【省 城市 日期】,在业务允许的情况下选择新的key【省 城市 区 日期】,有可能 解决或改善 数据倾斜问题
- 存在的问题:这样的key不好找;或者找到了新的key也不能解决问题
-
改变Reduce的并行度
key.hashCode % reduce个数 = 分区号
- 变更 reduce 的并行度。理论上分区数从 N 变为 N-1 有可能解决或改善数据倾斜
- 一般情况下这个方法不管用,数据倾斜可能是由很多key造成的,但是建议试试因为这个方法非常简单,成本极低
- 可能只是解决了这一次的数据倾斜问题,非长远之计
- 缺点:适用性不广;优点:简单
-
两阶段聚合(局部聚合+全局聚合)
- 对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案
- 这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)
- 将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果
- 对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。
- 仅 仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案
加盐:在键(Key)上添加一个随机前缀或后缀,这样可以将原本集中在某个或某几个节点上的数据分散到不同的节点或分区中
参考 :https://tech.meituan.com/2016/05/12/spark-tuning-pro.html