Hive性能调优方式及技巧全面总结

  • 李然辉

  • 2022-03-17

  • 来源:

Hive是将符合SQL语法的字符串解析生成,可以在Hadoop上执行的MapReduce的工具。Hive的调优也是数据资产管理的一部分,具有降低数据资产成本,提高数据及时性等诸多收益。

 

优化的根本思想

尽早尽量过滤数据,减少每个阶段的数据量

减少job数

解决数据倾斜问题

 

1 数据剪裁

1.1 列剪裁

Hive 在读数据的时候,可以只读取查询中所需要用到的列,而忽略其它列。

 

示例:

Select a,b

From Q

Where e<10;

 

在实施此项查询中,Q 表有5 列(a,b,c,d,e),Hive 只读取查询逻辑中真实需要的3 列a、b、e,而忽略列c,d;这样做节省了读取开销,中间表存储开销和数据整合。

 

1.2 分区裁剪

在查询的过程中减少不必要的分区。

 

示例:

Select count(orderid)

From order_table

where to_date(sale_time)=‘2014-03-03’

and hour(to_date(sale_time))=10

 

--优化后的示例代码如下:

Select count(orderid)

From order_table

where dt=‘2021-09-16’

and to_date(sale_time)=‘2014-03-03’

and hour(to_date(sale_time))=10

 

2 JOB优化

2.1 减少JOB数

不论是外关联outer join还是内关联inner join,如果Join的key相同,不管有多少个表,都会合并为一个MapReduce任务。

 

示例1:1个JOB

SELECT a.val, b.val, c.val

FROM a

JOIN b ON (a.key= b.key1)

JOIN c ON (c.key= b.key1 )

 

示例2:2个JOB

SELECT a.val, b.val, c.val

FROM a

JOIN b ON (a.key= b.key1)

JOIN c ON (c.key= b.key2)

 

2.2 JOB输入输出优化

善用muti-insert、union all,不同表的union all相当于multiple inputs,同一个表的union all,相当map一次输出多条。

 

示例:

insert overwrite table tmp1

select ... from a where 条件1;

insert overwrite table tmp2

select ... from a where 条件2;

 

--优化后的示例代码如下:

from a

insert overwrite table tmp1

select ... where 条件1

insert overwrite table tmp2

select ... where 条件2;

 

3 JOIN操作优化

3.1 避免笛卡尔积

示例:

SELECT ….

from woa_all_device_info_hisA

left outer join(

select *

from woa_all_info_hisB

where (B.mobile<> 'unknown' or B.imsi<> 'unknown')

and B.imei<> 'unknown'

and B.pt = '$data_desc'

) C

on A.app_id= C.app_idand A.imei= C.imei

 

3.2 数据过滤

在JOIN前过滤掉不需要的数据。

 

示例:

SELECT a.val, b.val

FROM a

LEFT OUTER JOIN b

ON (a.key=b.key)

WHERE a.ds='2009-07-07'

AND b.ds='2009-07-07'

 

--优化后的示例代码如下:

SELECT a.val, b.val FROM

(select key,valfrom a where a.ds=‘2009-07-07’ ) x

LEFT OUTER JOIN

(select key,valfrom b where b.ds=‘2009-07-07’ ) y

ON x.key=y.key

 

3.3 小表放前原则

在编写带有join操作的代码语句时,应该将条目少的表/子查询放在Join操作符的左边。因为在Reduce 阶段,位于Join 操作符左边的表的内容会被加载进内存,载入条目较少的表可以有效减少OOM(out of memory)即内存溢出。所以对于同一个key来说,对应的value值小的放前,大的放后。

 

示例:

A表1W 行

B表100W 行

Select a.key,b.value

From a

join b

on a.key= b.key

 

3.4 Mapjoin

当小表与大表JOIN时,采用mapjoin,即在map端完成。同时也可以避免小表与大表JOIN 产生的数据倾斜。

 

示例:

SELECT /*+ MAPJOIN(b) */

a.key,

a.value

FROM a

join b on a.key= b.key

 

3.5 LEFT SEMI JOIN

B表有重复值的情况下left semi join 产生一条,join 会产生多条。

 

示例:

通过left outer join 实现in查询:

select a.key,a.value

from a left outer

join b on a.key=b.key

where b.keyis not null

 

通过left semi join 实现in查询:

SELECT a.key, ,a.value

FROM a

LEFT SEMI

JOIN b on (a.key= b.key)

 

通过join 实现in查询:

SELECT a.key, ,a.value

FROM a

JOIN b on (a.key= b.key)

 

限制条件:

只能在ON 子句中设置过滤条件,在WHERE子句、SELECT 子句或其他地方过滤都不行。

 

4 输入输出优化

4.1 合理使用动态分区

 

示例:

SET hive.exec.dynamic.partition=true;

SET hive.exec.dynamic.partition.mode=nonstrict;

create table lxw_test1 (

sndaidstring,

mobile string

) partitioned by (ptSTRING)

stored as rcfile;

insert overwrite table lxw_test1 partition (pt)

select sndaid,mobile,ptfrom woa_user_info_mes_tmp1

 

主分区为dynamic partition列,而副分区为static partition列是不允许的,例如partition(pt, class=‘034’);是不允许的。

 

4.2 union all 优化

利用hive对UNION ALL的优化的特性,hive对union all优化只局限于非嵌套查询。

 

示例:--3个JOB

select * from

(select ci,c2,c3 from t1 Group by c1,c2,c3

Union all

Select c1,c2,c3 from t2 Group by c1,c2,c3

) t3;

 

--优化后的示例代码如下:--1个JOB

select * from

(select * from t1

Union all

Select * from t2

) t3

Group by c1,c2,c3;

 

4.3 合理使用union all

不同表太多的union ALL,不推荐使用;

通常采用建临时分区表,将不同表的结果insert到不同的分区(可并行),最后再统一处理;

 

示例:

INSERT overwrite TABLE lxw_test(flag = '1')

SELECT sndaid,mobileFROM lxw_test1;

INSERT overwrite TABLE lxw_test(flag = '2')

SELECT sndaid,mobileFROM lxw_test2;

INSERT overwrite TABLE lxw_test(flag = '3')

SELECT sndaid,mobileFROM lxw_test3;

 

5 数据去重与排序

5.1 DISTINCT 与 GROUP BY

尽量避免使用DISTINCT 进行排重,特别是大表操作,用GROUP BY 代替。

 

示例:

Select distinct key from a

 

--优化后的示例代码如下:

Select key from a group by key

 

5.2 排序优化

只有order by 产生的结果是全局有序的,可以根据实际场景进行选择排序。

 

Order by 实现全局排序,一个reduce实现,由于不能并发执行,所以效率偏低。

 

Sort by 实现部分有序,单个reduce输出的结果是有序的,效率高,通常和DISTRIBUTE BY关键字一起使用(DISTRIBUTE BY关键字可以指定map 到reduce端的分发key)CLUSTER BY col1 等价于DISTRIBUTE BY col1 SORT BY col1 但不能指定排序规则为asc或者desc。

 

5.3 多粒度计算优化

按不同维度进行订单汇总。

 

示例: 4个JOB

select * from (

select ‘1',province,sum(sales) from order_tablegroup by province

union all

select ‘2',city,sum(sales) from order_tablegroup by city union all

select ‘3',county,sum(sales) from order_tablegroup by county

) df

 

--优化后的示例代码如下:2个JOB

select type,code,sum(sales) from (

select part.split('_')[1] as type,

part.split('_')[0] as code ,sales from order_tableLATERAL VIEW

explode(split(concat(province,'_1-',city,'_2-',county,'_3'),'-')) adTableAS part

) dfgroup by type,code

 

5.4 count(列)、count(*)和count(1)优化

严格意义上来说,count(列)、count(*)和count(1)三者不等价,count(列)只是针对列的计数,另外两者则是针对表的计数,当列不为 null 时,count(列)和另外两者一致,但是count(列)还会涉及字段的筛选,以及数据序列化和反序列化,所以 count(*)和 count(1)的性能会更占优。当然,在不同数据存储格式里,上面结论不一定成立。例如,在ORC文件中,count算子可以直接读取索引中的统计信息,三者最后的表现性能差异不大。

 

6 数据倾斜

任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。最长时长远大于平均时长。

 

6.1 如何判断是否发生了数据倾斜?

将所有的作业分成两部分,然后比较这两部分的 task 数目以及其他一些相关的数据。具体的计算过程如下:

 

Let us define the following variables,

 

    deviation: the deviation in input bytes between two groups

    num_of_tasks: the number of map tasks

    file_size: the average input size of the larger group

 

    num_tasks_severity: List of severity thresholds for the number of tasks. e.g., num_tasks_severity = {10, 20, 50, 100}

    deviation_severity: List of severity threshold values for the deviation of input bytes between two groups. e.g., deviation_severity: {2, 4, 8, 16}

    files_severity: The severity threshold values for the fraction of HDFS block size. e.g. files_severity = { ⅛, ¼, ½, 1}

 

Let us define the following functions,

 

    func avg(x): returns the average of a list x

    func len(x): returns the length of a list x

    func min(x,y): returns minimum of x and y

    func getSeverity(x,y): Compares value x with severity threshold values in y and returns the severity.

 

We’ll compute two groups recursively based on average memory consumed by them.

 

Let us call the two groups: group_1 and group_2

 

Without loss of generality, let us assume that,

    avg(group_1) > avg(group_2) and len(group_1)< len(group_2) then,

 

    deviation = avg(group_1) - avg(group_2) / min(avg(group_1)) - avg(group_2))

    file_size = avg(group_1)

    num_of_tasks = len(group_0)

 

The overall severity of the heuristic can be computed as,

    severity = min(

        getSeverity(deviation, deviation_severity)

        , getSeverity(file_size,files_severity)

        , getSeverity(num_of_tasks,num_tasks_severity)

)

 

6.2 参数调节

hive.map.aggr = true

Map 端部分聚合,相当于Combiner

hive.groupby.skewindata=true

 

数据倾斜的时候进行负载均衡,当项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

 

6.3 空值产生的数据倾斜

场景:如日志中,常会信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和用户表中的user_id 关联,会碰到数据倾斜的问题。

解决方法1:user_id为空的不参与关联(红色字体为修改后)

select * 

from log a 

join users b 

on a.user_id is not null 

and a.user_id = b.user_id

union all

select * 

from log a 

where a.user_id is null;

解决方法2 :赋与空值分新的key值

select * 

from log a 

left outer join users b 

on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;

 

结论:方法2比方法1效率更好,不但IO少了,而且作业数也少了。解决方法1中 log读取两次,jobs是2。解决方法2 job数是1 。这个优化适合无效 id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。

 

6.4 不同数据类型关联产生数据倾斜

场景:用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。

解决方法:把数字类型转换成字符串类型

select * 

from users a 

left outer join logs b 

on a.usr_id = cast(b.user_id as string)

 

6.5 小表不小不大,怎么用 map join 解决倾斜问题

使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理。

 

以下例子:

select *

from log a

left outer join users b

on a.user_id = b.user_id;

 

users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

 

解决方法:

select /+mapjoin(x)/

from log a

left outer join (

select /+mapjoin(c)/d.

from (

select distinct user_id

from log ) c

join users d on c.user_id = d.user_id ) x

on a.user_id = b.user_id;

 

假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。

 

6.6 如果确认人为建表疏忽造成的倾斜,考虑以下的优化方案:

1.采样log表,哪些user_id比较倾斜,得到一个结果表tmp1。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样是并不可少的。

 

2.数据的分布符合社会学统计规则,贫富不均。倾斜的key不会太多,就像一个社会的富人不多,奇特的人不多一样。所以tmp1记录数会很少。把tmp1和users做map join生成tmp2,把tmp2读到distribute file cache。这是一个map过程。

 

3.map读入users和log,假如记录来自log,则检查user_id是否在tmp2里,如果是,输出到本地文件a,否则生成的key,value对,假如记录来自member,生成的key,value对,进入reduce阶段。

 

4.最终把a文件,把Stage3 reduce阶段输出的文件合并起写到hdfs。

 

6.7 如果确认业务需要这样倾斜的逻辑,考虑以下的优化方案:

1.对于join,在判断小表不大于1G的情况下,使用map join

 

2.对于group by或distinct,设定 hive.groupby.skewindata=true

 

3.尽量使用上述的SQL语句调节进行优化

 

7 使用Hive元数据做监控

7.1 监控普通表存储的文件的平均大小

对于大的文件块可能导致数据在读取时产生数据倾斜,影响集群任务的运行效率。下面的代码是对大于两倍HDFS文件块大小的表。

 

--整体逻辑通过DBS找到对应库下面的表TBLS

--再通过找到每个表对应的表属性,取得totalsize和numFiles两个属性,前者表示文件大小,后者表示文件数量

select tbl_name,avgfilesize'fileSize(MB)'

from (

select tp.totalSize/(1024*1024)/numFiles avgfilesize,

TBL NAME

from DBS d

/*DBS的主键DB_ID*/

inner join TBLS t on d.DB_ID=t.DB_ID

left join(

select TBL ID,

/*每个表存储的文件个数*/

max(case PARAM_KEY when 'numFiles'

then PARAM_VALUE else 0 end) numFiles,

/*文件存储的大小*/

max(case PARAM KEY when 'totalSize'

then PARAM_VALUE else 0 end) totalSize

/*TABLE PARAMS 记录的表属性*/

from TABLE PARAMS

GROUP BY TBL ID

)tp on t.TBL ID=tp.TBL_ID

where d.`NAME`='数据库名,

and tp.numFiles is not null

and tp.numFiles>0

)a where avgfilesize> hdfs的文件块大小*2

 

7.2 监控分区存储的文件平均大小

大于两倍HDFS文件块大小的分区,示例如下:

--先用DBS关联TBLS表,TBLS表关联PARTITIONS表

PARTITION 表关联 PARTITION PARAMS

select tbl name,PART_NAME, avgfilesize'fileSize (MB) ' from (

select pp.totalSize/(1024*1024)/numFiles avgfilesize,

TBL NAME,

part.PART NAME

from DBS d

inner join TBLS t on d.DB ID=t.DB_ID

inner join `PARTITIONS` part on t.TBL _ID=part.TBL_ID

left join (

select PART_ID,

/*每个表存储的文件个数*/

max(case PARAM_KEY when 'numFiles'

then PARAM VALUE else O end) numFiles,

/*文件存储的大小*/

max(case PARAM_KEY when 'totalSize'

then PARAM_VALUE else O end) totalSize

/*TABLE_PARAMS 记录的表属性*/

from PARTITION_PARAMS

GROUP BY PART_ID

)pp on part.PART_ID=pp.PART_ID

where d.`NAME`='要监控的数据库名,

and pp.numFiles is not null

and pp.numFiles>0

) a where avgfilesize> hdfs的文件块大小*2

 

7.3 监控大表不分区的表

对于大数据量的表,如果不进行分区,意味着程序在读取

相同的数据时需要遍历更多的文件块。下面是监控该示例的代码:

/*监控大表不分区的表*/

select t.TBL_NAME '表名',d.`NAME`'库名',

totalSize/1024/1024/1024 '文件大小(GB)'

from DBS d

/*DBS的主键DB_ID*/

inner join TBLS t on d.DB_ID=t.DB_ID

inner join (

select TBL ID,

/*文件存储的大小*/

max(case PARAM_KEY when 'totalSize'

then PARAM_VALUE else 0 end) totalSize

/*TABLE_PARAMS 记录的表属性*/

from TABLEPARAMS

GROUP BY TBL_ID

 )tp on t.TBL ID=tp.TBL ID

left join(

select distinct TBL ID from`PARTITIONS`

) part on t.TBL ID=part.TBL ID

/*part.TBL_ID is null表示不存在分区*/

where d.`NAME`='需要监控的库名'

and part.TBL ID is nu11

/*数据量大于30GB的表*/

and totalSize/1024/1024/1024>30

 

7.4 监控分区数据不均匀的表

分区不均匀的数据,可能意味着自己的分区列设计存在问题,或者某个分区的数据写入业务有调整,导致数据急速上升或者下跌,这时我们需要做特别的关注。监控的示例如下:

select TBL_NAME,

max(totalSize),

min(totalSize),

avg(totalSize)

from (

select pp.totalSize,

TBL NAME,

part.PART_NAME

from DBS d

inner join TBLS t on d.DB_ID=t.DB_ID

inner join 'PARTITIONS' part on t.TBL_ID=part.TBL_ID inner join (

select PART_ID,

/*文件存储的大小*/

max(case PARAM_KEY when 'totalSize'

then PARAM_VALUE else 0 end)/1024/1024 totalSize

/*TABLE_PARAMS 记录的表属性*/

from PARTITION_PARAMS

GROUP BY PART_ID

)pp on part.PART_ID=pp.PART_ID

where d.`NAME`='default'

and pp.totalSize is not null

and pp.totalSize>0

) a group by TBL_NAME

having max(totalSize)>avg(totalSize)*5

 

总结

hive调优是比较大的专题,需要结合实际的业务,数据的类型,分布,质量状况等来实际的考虑如何进行系统性的优化,hive底层是mapreduce,所以hadoop调优也是hive调优的一个基础,hvie调优可以分为几个模块进行考虑,数据的压缩与存储,HSQL的优化,hive参数的优化,解决数据的倾斜等。可以利用hive元数据、YARN日志等数据开发相应的指标进行监控,例如,压缩格式是否规范、小文件占比、是否数据倾斜、是否僵尸任务等指标。

  • 观点
  • 案例
  • IT/互联网
  • 科研/技术服务
  • CTO
  • CDO
  • IT
  • hive
  • SQL

推荐

我要评论

李然辉

京东科技信息技术有限公司

数据资产管理团队负责人

京东科技信息技术有限公司 数据资产管理团队负责人

某大型互联网企业数据资产管理负责人,DAMA(国际数据管理协会)评选的中国十大数据治理专家,中国企业数据治理联盟成员,国家工程实验室特聘专家。合著的书籍有《变数·中国数字企业模型及实践》、《数据治理那些事》、《数据治理:工业企业数字化转型之道》等

相关文章