Hive性能调优方式及技巧全面总结
-
李然辉
-
2022-03-17
Hive是将符合SQL语法的字符串解析生成,可以在Hadoop上执行的MapReduce的工具。Hive的调优也是数据资产管理的一部分,具有降低数据资产成本,提高数据及时性等诸多收益。
优化的根本思想
尽早尽量过滤数据,减少每个阶段的数据量
减少job数
解决数据倾斜问题
1 数据剪裁
1.1 列剪裁
Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其它列。
示例:
Select a,b
From Q
Where e<10;
在实施此项查询中,Q 表有5 列(a,b,c,d,e),Hive 只读取查询逻辑中真实需要的3 列a、b、e,而忽略列c,d;这样做节省了读取开销,中间表存储开销和数据整合。
1.2 分区裁剪
在查询的过程中减少不必要的分区。
示例:
Select count(orderid)
From order_table
where to_date(sale_time)=‘2014-03-03’
and hour(to_date(sale_time))=10
--优化后的示例代码如下:
Select count(orderid)
From order_table
where dt=‘2021-09-16’
and to_date(sale_time)=‘2014-03-03’
and hour(to_date(sale_time))=10
2 JOB优化
2.1 减少JOB数
不论是外关联outer join还是内关联inner join,如果Join的key相同,不管有多少个表,都会合并为一个MapReduce任务。
示例1:1个JOB
SELECT a.val, b.val, c.val
FROM a
JOIN b ON (a.key= b.key1)
JOIN c ON (c.key= b.key1 )
示例2:2个JOB
SELECT a.val, b.val, c.val
FROM a
JOIN b ON (a.key= b.key1)
JOIN c ON (c.key= b.key2)
2.2 JOB输入输出优化
善用muti-insert、union all,不同表的union all相当于multiple inputs,同一个表的union all,相当map一次输出多条。
示例:
insert overwrite table tmp1
select ... from a where 条件1;
insert overwrite table tmp2
select ... from a where 条件2;
--优化后的示例代码如下:
from a
insert overwrite table tmp1
select ... where 条件1
insert overwrite table tmp2
select ... where 条件2;
3 JOIN操作优化
3.1 避免笛卡尔积
示例:
SELECT ….
from woa_all_device_info_hisA
left outer join(
select *
from woa_all_info_hisB
where (B.mobile<> 'unknown' or B.imsi<> 'unknown')
and B.imei<> 'unknown'
and B.pt = '$data_desc'
) C
on A.app_id= C.app_idand A.imei= C.imei
3.2 数据过滤
在JOIN前过滤掉不需要的数据。
示例:
SELECT a.val, b.val
FROM a
LEFT OUTER JOIN b
ON (a.key=b.key)
WHERE a.ds='2009-07-07'
AND b.ds='2009-07-07'
--优化后的示例代码如下:
SELECT a.val, b.val FROM
(select key,valfrom a where a.ds=‘2009-07-07’ ) x
LEFT OUTER JOIN
(select key,valfrom b where b.ds=‘2009-07-07’ ) y
ON x.key=y.key
3.3 小表放前原则
在编写带有join操作的代码语句时,应该将条目少的表/子查询放在Join操作符的左边。因为在Reduce 阶段,位于Join 操作符左边的表的内容会被加载进内存,载入条目较少的表可以有效减少OOM(out of memory)即内存溢出。所以对于同一个key来说,对应的value值小的放前,大的放后。
示例:
A表1W 行
B表100W 行
Select a.key,b.value
From a
join b
on a.key= b.key
3.4 Mapjoin
当小表与大表JOIN时,采用mapjoin,即在map端完成。同时也可以避免小表与大表JOIN 产生的数据倾斜。
示例:
SELECT /*+ MAPJOIN(b) */
a.key,
a.value
FROM a
join b on a.key= b.key
3.5 LEFT SEMI JOIN
B表有重复值的情况下left semi join 产生一条,join 会产生多条。
示例:
通过left outer join 实现in查询:
select a.key,a.value
from a left outer
join b on a.key=b.key
where b.keyis not null
通过left semi join 实现in查询:
SELECT a.key, ,a.value
FROM a
LEFT SEMI
JOIN b on (a.key= b.key)
通过join 实现in查询:
SELECT a.key, ,a.value
FROM a
JOIN b on (a.key= b.key)
限制条件:
只能在ON 子句中设置过滤条件,在WHERE子句、SELECT 子句或其他地方过滤都不行。
4 输入输出优化
4.1 合理使用动态分区
示例:
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
create table lxw_test1 (
sndaidstring,
mobile string
) partitioned by (ptSTRING)
stored as rcfile;
insert overwrite table lxw_test1 partition (pt)
select sndaid,mobile,ptfrom woa_user_info_mes_tmp1
主分区为dynamic partition列,而副分区为static partition列是不允许的,例如partition(pt, class=‘034’);是不允许的。
4.2 union all 优化
利用hive对UNION ALL的优化的特性,hive对union all优化只局限于非嵌套查询。
示例:--3个JOB
select * from
(select ci,c2,c3 from t1 Group by c1,c2,c3
Union all
Select c1,c2,c3 from t2 Group by c1,c2,c3
) t3;
--优化后的示例代码如下:--1个JOB
select * from
(select * from t1
Union all
Select * from t2
) t3
Group by c1,c2,c3;
4.3 合理使用union all
不同表太多的union ALL,不推荐使用;
通常采用建临时分区表,将不同表的结果insert到不同的分区(可并行),最后再统一处理;
示例:
INSERT overwrite TABLE lxw_test(flag = '1')
SELECT sndaid,mobileFROM lxw_test1;
INSERT overwrite TABLE lxw_test(flag = '2')
SELECT sndaid,mobileFROM lxw_test2;
INSERT overwrite TABLE lxw_test(flag = '3')
SELECT sndaid,mobileFROM lxw_test3;
5 数据去重与排序
5.1 DISTINCT 与 GROUP BY
尽量避免使用DISTINCT 进行排重,特别是大表操作,用GROUP BY 代替。
示例:
Select distinct key from a
--优化后的示例代码如下:
Select key from a group by key
5.2 排序优化
只有order by 产生的结果是全局有序的,可以根据实际场景进行选择排序。
Order by 实现全局排序,一个reduce实现,由于不能并发执行,所以效率偏低。
Sort by 实现部分有序,单个reduce输出的结果是有序的,效率高,通常和DISTRIBUTE BY关键字一起使用(DISTRIBUTE BY关键字可以指定map 到reduce端的分发key)CLUSTER BY col1 等价于DISTRIBUTE BY col1 SORT BY col1 但不能指定排序规则为asc或者desc。
5.3 多粒度计算优化
按不同维度进行订单汇总。
示例: 4个JOB
select * from (
select ‘1',province,sum(sales) from order_tablegroup by province
union all
select ‘2',city,sum(sales) from order_tablegroup by city union all
select ‘3',county,sum(sales) from order_tablegroup by county
) df
--优化后的示例代码如下:2个JOB
select type,code,sum(sales) from (
select part.split('_')[1] as type,
part.split('_')[0] as code ,sales from order_tableLATERAL VIEW
explode(split(concat(province,'_1-',city,'_2-',county,'_3'),'-')) adTableAS part
) dfgroup by type,code
5.4 count(列)、count(*)和count(1)优化
严格意义上来说,count(列)、count(*)和count(1)三者不等价,count(列)只是针对列的计数,另外两者则是针对表的计数,当列不为 null 时,count(列)和另外两者一致,但是count(列)还会涉及字段的筛选,以及数据序列化和反序列化,所以 count(*)和 count(1)的性能会更占优。当然,在不同数据存储格式里,上面结论不一定成立。例如,在ORC文件中,count算子可以直接读取索引中的统计信息,三者最后的表现性能差异不大。
6 数据倾斜
任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。最长时长远大于平均时长。
6.1 如何判断是否发生了数据倾斜?
将所有的作业分成两部分,然后比较这两部分的 task 数目以及其他一些相关的数据。具体的计算过程如下:
Let us define the following variables,
deviation: the deviation in input bytes between two groups
num_of_tasks: the number of map tasks
file_size: the average input size of the larger group
num_tasks_severity: List of severity thresholds for the number of tasks. e.g., num_tasks_severity = {10, 20, 50, 100}
deviation_severity: List of severity threshold values for the deviation of input bytes between two groups. e.g., deviation_severity: {2, 4, 8, 16}
files_severity: The severity threshold values for the fraction of HDFS block size. e.g. files_severity = { ⅛, ¼, ½, 1}
Let us define the following functions,
func avg(x): returns the average of a list x
func len(x): returns the length of a list x
func min(x,y): returns minimum of x and y
func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.
We’ll compute two groups recursively based on average memory consumed by them.
Let us call the two groups: group_1 and group_2
Without loss of generality, let us assume that,
avg(group_1) > avg(group_2) and len(group_1)< len(group_2) then,
deviation = avg(group_1) - avg(group_2) / min(avg(group_1)) - avg(group_2))
file_size = avg(group_1)
num_of_tasks = len(group_0)
The overall severity of the heuristic can be computed as,
severity = min(
getSeverity(deviation, deviation_severity)
, getSeverity(file_size,files_severity)
, getSeverity(num_of_tasks,num_tasks_severity)
)
6.2 参数调节
hive.map.aggr = true
Map 端部分聚合,相当于Combiner
hive.groupby.skewindata=true
数据倾斜的时候进行负载均衡,当项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
6.3 空值产生的数据倾斜
场景:如日志中,常会信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和用户表中的user_id 关联,会碰到数据倾斜的问题。
解决方法1:user_id为空的不参与关联(红色字体为修改后)
select *
from log a
join users b
on a.user_id is not null
and a.user_id = b.user_id
union all
select *
from log a
where a.user_id is null;
解决方法2 :赋与空值分新的key值
select *
from log a
left outer join users b
on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
结论:方法2比方法1效率更好,不但IO少了,而且作业数也少了。解决方法1中 log读取两次,jobs是2。解决方法2 job数是1 。这个优化适合无效 id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。
6.4 不同数据类型关联产生数据倾斜
场景:用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。
解决方法:把数字类型转换成字符串类型
select *
from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string)
6.5 小表不小不大,怎么用 map join 解决倾斜问题
使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理。
以下例子:
select *
from log a
left outer join users b
on a.user_id = b.user_id;
users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。
解决方法:
select /+mapjoin(x)/
from log a
left outer join (
select /+mapjoin(c)/d.
from (
select distinct user_id
from log ) c
join users d on c.user_id = d.user_id ) x
on a.user_id = b.user_id;
假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。
6.6 如果确认人为建表疏忽造成的倾斜,考虑以下的优化方案:
1.采样log表,哪些user_id比较倾斜,得到一个结果表tmp1。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样是并不可少的。
2.数据的分布符合社会学统计规则,贫富不均。倾斜的key不会太多,就像一个社会的富人不多,奇特的人不多一样。所以tmp1记录数会很少。把tmp1和users做map join生成tmp2,把tmp2读到distribute file cache。这是一个map过程。
3.map读入users和log,假如记录来自log,则检查user_id是否在tmp2里,如果是,输出到本地文件a,否则生成的key,value对,假如记录来自member,生成的key,value对,进入reduce阶段。
4.最终把a文件,把Stage3 reduce阶段输出的文件合并起写到hdfs。
6.7 如果确认业务需要这样倾斜的逻辑,考虑以下的优化方案:
1.对于join,在判断小表不大于1G的情况下,使用map join
2.对于group by或distinct,设定 hive.groupby.skewindata=true
3.尽量使用上述的SQL语句调节进行优化
7 使用Hive元数据做监控
7.1 监控普通表存储的文件的平均大小
对于大的文件块可能导致数据在读取时产生数据倾斜,影响集群任务的运行效率。下面的代码是对大于两倍HDFS文件块大小的表。
--整体逻辑通过DBS找到对应库下面的表TBLS
--再通过找到每个表对应的表属性,取得totalsize和numFiles两个属性,前者表示文件大小,后者表示文件数量
select tbl_name,avgfilesize'fileSize(MB)'
from (
select tp.totalSize/(1024*1024)/numFiles avgfilesize,
TBL NAME
from DBS d
/*DBS的主键DB_ID*/
inner join TBLS t on d.DB_ID=t.DB_ID
left join(
select TBL ID,
/*每个表存储的文件个数*/
max(case PARAM_KEY when 'numFiles'
then PARAM_VALUE else 0 end) numFiles,
/*文件存储的大小*/
max(case PARAM KEY when 'totalSize'
then PARAM_VALUE else 0 end) totalSize
/*TABLE PARAMS 记录的表属性*/
from TABLE PARAMS
GROUP BY TBL ID
)tp on t.TBL ID=tp.TBL_ID
where d.`NAME`='数据库名,
and tp.numFiles is not null
and tp.numFiles>0
)a where avgfilesize> hdfs的文件块大小*2
7.2 监控分区存储的文件平均大小
大于两倍HDFS文件块大小的分区,示例如下:
--先用DBS关联TBLS表,TBLS表关联PARTITIONS表
PARTITION 表关联 PARTITION PARAMS
select tbl name,PART_NAME, avgfilesize'fileSize (MB) ' from (
select pp.totalSize/(1024*1024)/numFiles avgfilesize,
TBL NAME,
part.PART NAME
from DBS d
inner join TBLS t on d.DB ID=t.DB_ID
inner join `PARTITIONS` part on t.TBL _ID=part.TBL_ID
left join (
select PART_ID,
/*每个表存储的文件个数*/
max(case PARAM_KEY when 'numFiles'
then PARAM VALUE else O end) numFiles,
/*文件存储的大小*/
max(case PARAM_KEY when 'totalSize'
then PARAM_VALUE else O end) totalSize
/*TABLE_PARAMS 记录的表属性*/
from PARTITION_PARAMS
GROUP BY PART_ID
)pp on part.PART_ID=pp.PART_ID
where d.`NAME`='要监控的数据库名,
and pp.numFiles is not null
and pp.numFiles>0
) a where avgfilesize> hdfs的文件块大小*2
7.3 监控大表不分区的表
对于大数据量的表,如果不进行分区,意味着程序在读取
相同的数据时需要遍历更多的文件块。下面是监控该示例的代码:
/*监控大表不分区的表*/
select t.TBL_NAME '表名',d.`NAME`'库名',
totalSize/1024/1024/1024 '文件大小(GB)'
from DBS d
/*DBS的主键DB_ID*/
inner join TBLS t on d.DB_ID=t.DB_ID
inner join (
select TBL ID,
/*文件存储的大小*/
max(case PARAM_KEY when 'totalSize'
then PARAM_VALUE else 0 end) totalSize
/*TABLE_PARAMS 记录的表属性*/
from TABLEPARAMS
GROUP BY TBL_ID
)tp on t.TBL ID=tp.TBL ID
left join(
select distinct TBL ID from`PARTITIONS`
) part on t.TBL ID=part.TBL ID
/*part.TBL_ID is null表示不存在分区*/
where d.`NAME`='需要监控的库名'
and part.TBL ID is nu11
/*数据量大于30GB的表*/
and totalSize/1024/1024/1024>30
7.4 监控分区数据不均匀的表
分区不均匀的数据,可能意味着自己的分区列设计存在问题,或者某个分区的数据写入业务有调整,导致数据急速上升或者下跌,这时我们需要做特别的关注。监控的示例如下:
select TBL_NAME,
max(totalSize),
min(totalSize),
avg(totalSize)
from (
select pp.totalSize,
TBL NAME,
part.PART_NAME
from DBS d
inner join TBLS t on d.DB_ID=t.DB_ID
inner join 'PARTITIONS' part on t.TBL_ID=part.TBL_ID inner join (
select PART_ID,
/*文件存储的大小*/
max(case PARAM_KEY when 'totalSize'
then PARAM_VALUE else 0 end)/1024/1024 totalSize
/*TABLE_PARAMS 记录的表属性*/
from PARTITION_PARAMS
GROUP BY PART_ID
)pp on part.PART_ID=pp.PART_ID
where d.`NAME`='default'
and pp.totalSize is not null
and pp.totalSize>0
) a group by TBL_NAME
having max(totalSize)>avg(totalSize)*5
总结
hive调优是比较大的专题,需要结合实际的业务,数据的类型,分布,质量状况等来实际的考虑如何进行系统性的优化,hive底层是mapreduce,所以hadoop调优也是hive调优的一个基础,hvie调优可以分为几个模块进行考虑,数据的压缩与存储,HSQL的优化,hive参数的优化,解决数据的倾斜等。可以利用hive元数据、YARN日志等数据开发相应的指标进行监控,例如,压缩格式是否规范、小文件占比、是否数据倾斜、是否僵尸任务等指标。
- hive
- SQL
- 观点
- 案例
- IT/互联网
- 科研/技术服务
- CTO
- CDO
- IT
推荐

李然辉
京东科技信息技术有限公司
数据资产管理团队负责人
京东科技信息技术有限公司 数据资产管理团队负责人
某大型互联网企业数据资产管理负责人,DAMA(国际数据管理协会)评选的中国十大数据治理专家,中国企业数据治理联盟成员,国家工程实验室特聘专家。合著的书籍有《变数·中国数字企业模型及实践》、《数据治理那些事》、《数据治理:工业企业数字化转型之道》等
我要评论