Hive性能优化
1.资源优化
- Hive 运行在yarn 集群上,对集群资源的利用率的提升,也会对hive优化有帮助作用。例如 cpu 超配,需要看cpu 的利用率
- 设置合理的内存,避免内存浪费,集群的并发度减少
2.建表优化
2.1 分区设计
Hive分区是将表数据按照某个或多个字段划分为多个目录存储的技术。每个分区对应HDFS上的一个目录,分区字段的值作为目录名。分区的意义在于:
- 缩小查询范围:查询时只需扫描相关分区,减少数据读取量。
- 优化数据管理:便于按照业务需求(如时间、地域)对数据进行分类存储。
- 提升查询性能:通过分区裁剪(Partition Pruning)减少MapReduce任务的输入数据量。
a. 分区的类型
Hive分区分为静态分区和动态分区:
- 静态分区:在加载数据时手动指定分区字段的值,适用于分区字段值已知的场景。
- 动态分区:根据数据内容自动创建分区,适用于分区字段值不确定的场景,需启用动态分区功能。
b. 分区的创建与管理
- 创建分区表:使用
PARTITIONED BY
关键字定义分区字段,分区字段不能与表中已有字段重复。 - 加载数据:通过
LOAD DATA
或INSERT
语句将数据加载到指定分区。 - 查看分区:使用
SHOW PARTITIONS
命令查看表的分区信息。 - 删除分区:使用
ALTER TABLE ... DROP PARTITION
删除指定分区。
c. 分区的优化与注意事项
- 分区字段选择:选择高基数字段(如时间、地域)作为分区字段,避免分区过多或过少。
- 分区数量控制:过多的分区会增加元数据管理开销,需根据数据量和查询需求合理设计分区策略。
- 编码规范:确保分区字段的命名和值符合业务逻辑,避免歧义。
d.. 分区的应用场景
- 日志管理:按照日期对日志数据进行分区,便于按时间范围查询。
- 数据分析:按照地域、类别等维度对数据进行分区,支持多维分析。
- 性能优化:通过分区裁剪减少查询数据量,提升大数据查询效率。
- 选择高基数(不同值较多)但又不至于产生过多小分区的列作为分区键
- 经常用于过滤条件的列适合作为分区列
- 避免过度分区,可能导致小文件问题
2.2 分桶设计
分桶是将数据基于某列的哈希值分散到固定数量的桶中的技术。
优势:
- 提高某些查询的效率,尤其是join操作
- 为抽样查询提供更高效的支持
- 可以实现更细粒度的数据组织
使用建议:
- 选 择具有较高基数的列作为分桶列
- 桶的数量通常选择2的幂次方(如64、128等)
- 在有大量join操作的场景下特别有用
2.3.数据格式优化
压缩效率 orc>parquet>text 一般来说,选择orc 和parquet 较多,但对于Apache Hudi 来说,parquet 的兼容性会更好一点,所以要根据链路技术栈去做选择,不单单从性能方面考虑。
2.4 压缩格式选择
压缩算法对比
压缩方式 | 压缩比 | 压缩速度 | 解压缩速度 | 是否可分割 |
---|---|---|---|---|
gzip | 13.4% | 21 MB/s | 118 MB/s | 否 |
bzip2 | 13.2% | 2.4MB/s | 9.5MB/s | 是 |
LZO | 20.5% | 135 MB/s | 410 MB/s | 是 |
snappy | 22.2% | 172 MB/s | 409 MB/s | 否 |
尽管 SNAPPY 不支持拆分压缩数据,但由于 ORC 的结构使得数据可以在块级别进行并行处理,再加上 SNAPPY 的高解压速度,使得这种组合在很多场景下仍然可以实现非常高效的查询和计算性能
3.HQL 和运行参数优化
3.1 列裁剪/分区裁剪
只读取所需要的列或者是分区,减少文件扫描的数据量
3.2 谓词下推
谓词下推(Predicate Pushdown,PPD)是 Hive SQL 中的一种重要优化技术。谓词指的是 SQL 查询里的过滤条件,像 WHERE
子句、JOIN
条件等。谓词下推的核心思想是把过滤条件尽可能地提前到数据源端进行处理,从而减少从数据源传输到后续处理阶段的数据量,提升查询效率。
默认参数开启
SET hive.optimize.ppd=false;
3.3 JVM 重用
Hive语句最终会转换为一系的MapReduce任务,每一个MapReduce任务是由一系的MapTask 和ReduceTask组成的,默认情况下,MapReduce中一个MapTask或者ReduceTask就会启动一个 JVM进程,一个Task执行完毕后,JVM进程就会退出。这样如果任务花费时间很短,又要多次启动 JVM的情况下,JVM的启动时间会变成一个比较大的消耗,这时,可以通过重用JVM来解决 。
set mapred.job.reuse.jvm.num.tasks=5;
JVM也是有缺点的,开启JVM重用会一直占用使用到的task的插槽,以便进行重用,直到任务完成后才 会释放。如果某个不平衡的job中有几个reduce task执行的时间要比其他 的reduce task消耗的时间 要多得多的话,那么保留的插槽就会一直空闲却无法被其他的job使用,直到所有的task都结束了才 会释放。
根据经验,一般来说可以使用一个cpu core启动一个JVM,假如服务器有16个cpu core,但是这个 节点,可能会启动32个 mapTask ,完全可以考虑:启动一个JVM,执行两个Task 。
3.4 并行执行
有的查询语句,Hive会将其转化为一个或多个阶段,包括:MapReduce阶段、抽样阶段、合并阶段、 limit阶段等。默认情况下,一次只执行一个阶段。但是,如果某些阶段不是互相依赖,是可以并行执行的。多阶段并行是比较耗系统资源的 。
一个 Hive SQL语句可能会转为多个MapReduce Job,每一个 job 就是一个 stage , 这些Job顺序执行,这个在 client 的运行日志中也可以看到。但是有时候这些任务之间并不是相互依赖的,如果集群资源允许的话,可以让多个并不相互依赖 stage 并发执行,这样就节约了时间,提高了执行速度,但是如 果集群资源匮乏时,启用并行化反倒是会导致各个 Job 相互抢占资源而导致整体执行性能的下降。启用 并行化:
##可以开启并发执行。
set hive.exec.parallei=true;
##同一个sql允许最大并行度,默认为8。
set hive.exec.paral1 el.thread.number=16;
3.5 推测执行
在分布式集群环境下,因为程序Bug(包括Hadoop本身的bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果 。
# 启动mapper阶段的推测执行机制
set mapreduce.map.speculative=true;
# 启动reducer阶段的推测执行机制
set mapreduce.reduce.speculative=true;
设置开启推测执行参数:Hadoop 的 mapred-site.xml 文件中进行配置:
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
<description>lf true, then multiple i nstances of some map tasks may be executed i n parallel.</description>
</property>
<property>
<name>mapreduce.reduce.speculati ve</name>
<value>true</value>
<descri pti on>lf true, then multi ple i nstances of some reduce tasks may be executed in parallel.
</description>
</property>
Hive本身也提供了配置项来控制reduce-side的推测执行
<property>
<name>hive.mapped.reduce.tasks.speculative.executi on</name>
<value>true</value>
<description>whether speculative execution for reducers should be turned on. </description>
</property>
建议:
如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要 执行长时间的MapTask或者ReduceTask的话,那么启动推测执行造成的浪费是非常巨大的。
3.6 Join 优化
Join优化整体原则:
1、优先过滤后再进行Join操作,最大限度的减少参与join的数据量
2、小表join大表,最好启动 mapjoin,hive自动启用mapjoin, 小表不能超过25M,可以更改
3.7 排序优化
order by 排序,最后只存在一个reduce,效率比较低。可以用sort by操作,然后结合distribute by作为reduce分区键。
3.7 union all
sql 逻辑中如果出现两张表union all 再插入一张表的情况时,可以使用 from ... insert into 示例
from all_student
insert into select * from A_student
insert into select * from B_student
3.8 distinct
Hive 3.0中新增了 count(distinct) 优化,通过配置 hive.optimize.countdistinct,即使真的出现数据倾斜也可以自动优化,自动改变SQL执行的逻辑。
3.小文件处理
产生小文件的原因
- 数据的频繁导入,例如 insert into 或者直接load,每次都会生成一个新的文件
- 分区或者桶表设计不合理
影响
- Hive 数据存储在HDFS中,过的小文件会导致Namenode 元数据变大,占用过多的内存,影响性能
- 对 hive 来说,在进行查询时,每个小文件都会当成一个块,启动一个Map任务来完成,而一个Map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的Map数量是受限的。
处理
1. 使用 hive 自带的 concatenate 命令,自动合并小文件
#对于非分区表
alter table A concatenate;
#对于分区表
alter table B partition(day=20250101) concatenate;
1、concatenate 命令只支持 RCFILE 和 ORC 文件类型。 2、使用concatenate命令合并小文件时不能指定合并后的文件数量,但可以多次执行该命令。 3、当多次使用concatenate后文件数量不在变化,这个跟参数 mapreduce.input.fileinputformat.split.minsize=256mb 的设置有关,可设定每个文件的最小size。
也可以重新查询并插入当前表
INSERT OVERWRITE TABLE t_stu
SELECT * FROM t_stu;
过大的文件或者过小的文件,都会重新整理一次,这个方法并不是一劳永逸,因为依赖hive 的配置参数,参数设置不合理,小文件还是会多
2.调整参数减少Map数量
#执行Map前进行小文件合并
#CombineHiveInputFormat底层是 Hadoop的 CombineFileInputFormat 方法
#此方法是在mapper中将多个文件合成一个split作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 默认
#每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000; -- 256M
#一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000; -- 100M
#一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000; -- 100M
设置map输出和reduce输出进行合并的相关参数:
#设置map端输 出进行合并,默认为true
set hive.merge.mapfiles = true;
#设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true;
#设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000; -- 256M
#当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge
set hive.merge.smallfiles.avgsize=16000000; -- 16M
启用压缩
# hive的查询结果输出是否进行压 缩
set hiv e.exec.compress.output=true;
# MapReduce Job的结果输出是否使用压 缩
set mapreduce.output.fileoutputformat.compress=true;
3. 减少Reduce的数量
reduce 的个数决定了输出的文件的个数,所以可以调整reduce的个数控制hive表的文件数量, #hive中的分区函数 distribute by 正好是控制MR中partition分区的, #然后通过设置reduce的数量,结合分区函数让数据均衡的进入每个reduce即可。
#设置reduce的数量有两种方式,第一种是直接设置reduce个数
set mapreduce.job.reduces=10;
#第二种是设置每个reduce的大小,Hive会根据数据总大小猜测确定一个reduce个数
set hive.exec.reducers.bytes.per.reducer=5120000000; -- 默认是1G,设置为5G
#执行以下语句,将数据均衡的分配到reduce中
set mapreduce.job.reduces=10;
insert overwrite table A partition(dt)
select * from B
distribute by rand();
解释:如设置reduce数量为10,则使用 rand(), 随机生成一个数 x % 10 ,
这样数据就会随机 进入 reduce 中,防止出现有的文件过大或过小
4.使用hadoop的archive将小文件归档
Hadoop Archive简称HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问
#用来控制归档是否可用
set hive.archive.enabled=true;
#通知Hive在创建归档时是否可以设置父目录
set hive.archive.har.parentdir.settable=true;
#控制需要归档文件的大小
set har.partfile.size=1099511627776;
#使用以下命令进行归档
ALTER TABLE A ARCHIVE PARTITION(dt='2020-12-24', hr='12');
对已归档的分区恢复为原文件
ALTER TABLE A UNARCHIVE PARTITION(dt='2020-12-24', hr='12');
- 归档的分区可以查看不能 insert overwrite,必须先 unarchive
- 归档文件是经过压缩的,在查询数据时,需要先对归档文件进行解压缩。这会增加一定的计算开销和时间成本,尤其是对于实时性要求较高的查询,可能会导致查询延迟增加
4.数据倾斜怎么解决
4.1 空值引发的数据倾斜
shuffle阶段的hash操作,只要key的hash结果是一样的,它们就会被拉到同一个reduce中
- 直接过滤
- 可以给null值随机赋值,例如使用case when then 赋值随机数
4.2不能数据类型引发的数据倾斜
对于两个表join,表a中需要join的字段key为int,表b中key字段既有string类型也有int类型。当按照key进行两个表的join操作时,默认的Hash操作会按int型的id来进行分配,这样所有的string类型都被分配成同一个id,结果就是所有的string类型的字段进入到一个reduce中,引发数据倾斜。
解决方案
如果key字段既有string类型也有int类型,默认的hash就都会按int类型来分配,那我们直接把int类型都转为string就好了,这样key字段都为string,hash时就按照string类型分配了:
SELECT *
FROM users a
LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string);
4.3 不可拆分的大文件
文件使用GZIP压缩等不支持文件分割操作的压缩方式,在日后有作业涉及读取压缩后的文件时,该压缩文件只会被一个任务所读取。
解决方案:
使用其他压缩格式
4.4 数据膨胀引发的数据倾斜
在多维聚合计算时,如果进行分组聚合的字段过多,如下:
select a,b,c,count(1)from log group by a,b,c with rollup;
注:对于最后的 with rollup
关键字不知道大家用过没,with rollup是用来在分组统计数据的基础上再进行统计汇总,即用来得到group by的汇总信息。
如果上面的log表的数据量很大,并且Map端的聚合不能很好地起到数据压缩的情况下,会导致Map端产出的数据急速膨胀,这种情况容易导致作业内存溢出的异常。如果log表含有数据倾斜key,会加剧Shuffle过程的数据倾斜。
解决方案:
可以拆分上面的sql,将 with rollup
拆分成如下几个sql:
SELECT a, b, c, COUNT(1)
FROM log
GROUP BY a, b, c;
SELECT a, b, NULL, COUNT(1)
FROM log
GROUP BY a, b;
SELECT a, NULL, NULL, COUNT(1)
FROM log
GROUP BY a;
SELECT NULL, NULL, NULL, COUNT(1)
FROM log;
但是,上面这种方式不太好,因为现在是对3个字段进行分组聚合,那如果是5个或者10个字段呢,那么需要拆解的SQL语句会更多。
在Hive中可以通过参数 hive.new.job.grouping.set.cardinality
配置的方式自动控制作业的拆解,该参数默认值是30。表示针对grouping sets/rollups/cubes这类多维聚合的操作,如果最后拆解的键组合大于该值,会启用新的任务去处理大于该值之外的组合。如果在处理数据时,某个分组聚合的列有较大的倾斜,可以适当调小该值。
4.4. 表连接时引发的数据倾斜
解决方案:
4.4.1 启用 MapJoin
如果其中一个表非常小,可以将其加载到内存中,使用 MapJoin 来避免数据倾斜。在 Hive 中,可以通过设置参数来开启 MapJoin。
-- 开启自动 MapJoin
SET hive.auto.convert.join = true;
-- 设置小表的最大大小(默认值为 25MB)
SET hive.mapjoin.smalltable.filesize = 25000000;
SELECT /*+ MAPJOIN(small_table) */ *
FROM small_table
JOIN big_table
ON small_table.key = big_table.key;
4.4.2
对于倾斜的键,可以通过加盐的方式将其数据分散到多个 Reduce 任务中。具体做法是在倾斜的键上添加随机数,然后进行两次连接。
-- 第一次连接:对倾斜的键加盐
SELECT /*+ BROADCASTJOIN(small_table) */
CASE WHEN a.key IN ('skewed_key1', 'skewed_key2') THEN CONCAT(a.key, CAST(FLOOR(RAND() * 10) AS STRING)) ELSE a.key END AS key,
a.col1,
b.col2
FROM big_table a
JOIN small_table b
ON CASE WHEN a.key IN ('skewed_key1', 'skewed_key2') THEN CONCAT(a.key, CAST(FLOOR(RAND() * 10) AS STRING)) ELSE a.key END = b.key;
-- 第二次连接:去除盐
SELECT
REGEXP_REPLACE(key, '[0-9]+$', '') AS original_key,
col1,
col2
FROM (
-- 第一次连接的结果
SELECT /*+ BROADCASTJOIN(small_table) */
CASE WHEN a.key IN ('skewed_key1', 'skewed_key2') THEN CONCAT(a.key, CAST(FLOOR(RAND() * 10) AS STRING)) ELSE a.key END AS key,
a.col1,
b.col2
FROM big_table a
JOIN small_table b
ON CASE WHEN a.key IN ('skewed_key1', 'skewed_key2') THEN CONCAT(a.key, CAST(FLOOR(RAND() * 10) AS STRING)) ELSE a.key END = b.key
) subquery;