ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

1

2022-01-04 11:04:23  阅读:123  来源: 互联网

标签: set join -- reduce hive key


1.1 Map端文件合并减少Map任务数量

一般来说,HDFS的默认文件块大小是128M,如果在Hive执行任务时,发现Map端的任务过多,且执行时间多数不超过一分钟,建议通过参数,划分(split)文件的大小,合并小文件。如:

1 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
2 set mapreduce.input.fileinputformat.split.minsize=516000000; -- 516M
3 set mapreduce.input.fileinputformat.split.maxsize=1280000000; -- 1280M
4 set mapreduce.input.fileinputformat.split.minsize.per.node=516000000;
5 set mapreduce.input.fileinputformat.split.minsize.per.rack=516000000;

这样可以减小map的任务数,可以减少中间临时文件的产生,并且也较少reduce阶段的任务数和产生的文件数。

1.2 Reduce阶段数量调整
若指定mapred.reduce.tasks参数,则用该参数值;mapred.reduce.tasks默认值为-1,表示自动计算;

若未指定mapred.reduce.tasks,Hive会自动计算reduce个数,基于以下两个配置:

hive.exec.reducers.bytes.per.reducer:每个reduce任务处理的数据量,默认为1G
hive.exec.reducers.max:每个任务最大的reduce数,集群默认为50

reducer数的计算公式如下:

1 reduce_number = min(input_size / hive.exec.reducers.bytes.per.reducer, hive.exec.reducers.max)

建议一般使用下面2个配置进行reduce阶段任务数量控制:

set hive.exec.reducers.bytes.per.reducer = 1000000000; - (Generally, the default value of 1G is enough, generally no need to modify)
set hive.exec.reducers.max = 50; - (up to 100 reduce)

如果reduce阶段任务运行慢,可以考虑增加hive.exec.reducers.max的数量,但不宜过多,过多会产生小文件。

1.3 Reduce阶段内存调整

在我们减少Map端的任务数时 ,在Reduce阶段会存在任务数据量大的问题,可能会超过Reduce阶段默认的内存,从而导致OOM。

我们可以通过调整Reduce阶段的内存进行优化:

1 set mapreduce.reduce.memory.mb=8192; - The default value is 1024, the unit is M, here is set to 8192M
2 set mapreduce.reduce.java.opts=-Xmx6144m; - Set the memory of the Reduce phase jvm, generally 0.75 times the previous parameter

二、MapReduce压缩优化
2.1 Map端数据压缩,减少Reduce copy阶段时间
因为在Reduce过程,需要从Map阶段产生的文件的机器进行copy(非local机器)数据,这个过程是依赖网络带宽的,整个过程是非常缓慢,所以可以对Map端产生的文件进行压缩处理,降低带宽的限制。

1 set hive.exec.compress.intermediate = true; - Enable compression on intermediate data
2 set mapreduce.map.output.compress=true; - Whether to start compression (true/false)
3 set mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec; - Specify Snappy compression format

当然在开启Map压缩,会有一定的CPU消耗,但MapReduce任务是IO型的任务,消耗掉CPU也不是很要求,如果是计算密集型的计算,不建议开启CPU(计算资源宝贵呀)。

2.2 Reduce端数据压缩,减小文件块大小,节约存储资源
1 set hive.exec.compress.output=true; - Enable hive final output data compression function
2 set mapreduce.output.fileoutputformat.compress=true; - Turn on mapreduce final output data compression
3 set mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.SnappyCodec; - Set the compression method of mapreduce final data output
4 set mapreduce.output.fileoutputformat.compress.type=BLOCK; - Set mapreduce final data output compression to block compression

join导致的数据倾斜
如果数据表的key本身分布不匀,在join时,特别容易出现数据倾斜的问题。一般出现该问题有2种情况:

3.1.1 小表与大表进行join

一般建议,在表与表进行join时,我们将小表放在左边效率会更高些。

如果小表足够小(如:一张维度表),是可以加载到内存中的。直接在Map端进行join,避免数据shuffle过程,导致数据倾斜问题。不过需要设置下面参数:

1 set hive.auto.convert.join = true; -- 将小表进行map-join操作
2 set hive.mapjoin.smalltable.filesize = 2500000;-- 默认值25M,如果表的大小小于此值就会被加载进内存中

也可以使用两个配置参数下面的2个配置,启用自动联接,不再需要在查询中提供 Map 联接提示。

1 set hive.auto.convert.join.noconditionaltask = true; -- 表示启用了自动转换
2 set hive.auto.convert.join.noconditionaltask.size = 10000000; 

3.1.2 两张同量级的表进行join

如果是2张同量级的表进行join产生数据倾斜问题,可以尽量从业务数据层面去解决,其次是调整Hive参数。

1.一般来讲,这种情况是某些key在join时,数据量大导致分布不匀,将key加上一个随机数在进行join。如:

1 select a.key as key,value1,value2 
2 from 
3 (select key,value1,concat(key,cast(rand() * 10 as int)) as key_1 
4 from table1) a
5 left join
6 (select key,value2,concat(key,cast(rand() * 10 as int)) as key_1
7 from table2) b
8 on a.key_1 = b.key_1;

或者是拆成2个部分,将key量大的部分过滤出来,在分别处理。

不过在此之前对数据先进行预处理,过滤些缺失值和无效值,防止这种数据导致数据倾斜。

2.通过Hive参数处理,设置参数:

1 set hive.optimize.skewjoin = true; -- 有数据倾斜的时候进行负载均衡
2 set hive.skewjoin.key = 100000; -- 默认是100000,默认的1个reduce 只处理1G 

hive.optimize.skewjoin参数是对join时,key超过设置的hive.skewjoin.key的数量是,就会将该key的数据存储至HDFS,然后运行一个map任务进行处理。防止数据倾斜。

与此类似的有group by操作的数据倾斜处理:hive.groupby.skewindata=true,不过这个参数不适用在多个维度distinct,不然会报错。

3.2 group by导致的数据倾斜
一般可以通过下面参数调节:

1 set hive.map.aggr=true; -- Map 端部分聚合,相当于Combiner
2 set hive.groupby.skewindata=true;

hive.groupby.skewindata=true时,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

不过这个参数不适用在多个维度distinct,不然会报错。

四、动态分区插入优化
动态分区参数:

1 set hive.exec.dynamic.partition = true; -- 设置开启动态分区
2 set hive.exec.dynamic.partition.mode = nonstrict; --设置为非严格模式
3 set hive.exec.max.dynamic.partitions.pernode=100; -- 默认值 100
4 set hive.exec.max.dynamic.partitions = 1000; -- 设置动态分区最大个数
5 set hive.exec.max.created.files = 100000; --设置整个MR Job中最大可以创建多少个HDFS文件,默认值100000

当我们insert overwrite时,有时会插入动态分区中,当有的分区的数量非常大是,这时会导致reduce阶段的某task任务非常的慢,可以考虑使用分桶加随机数的方式,进行优化,如:

1 DISTRIBUTE BY 动态分区 + cast(rand()*10 as int) -- 生成reduce阶段
2 
3 示例:
4 insert overwrite table_name partiton(dt,country)
5 select value,dt,country from temp_table 
6 DISTRIBUTE BY dt,country,cast(rand() * 10 as int); -- 随机数可以根据数据倾斜程度设置

但有一个缺点,会导致中间文件的增多,不过中间文件,一般在跑完Job会进行删除(中间文件个数 = 动态分区数 * 随机数)。

ps: 严格模式下的动态分区插入,必须保证至少有一个静态分区。

标签:,set,join,--,reduce,hive,key
来源: https://www.cnblogs.com/blogwolf/p/15761687.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有