实际搞过离线数据处理的同学都知道,Hive SQL 的各种优化方法都是和数据倾斜密切相关的,所以我会先来聊一聊 “「数据倾斜」” 的基本概念,然后再在此基础上为大家介绍各种场景下的 Hive 优化方案。
从项目实际来说, join 相关的优化其实占据了 Hive 优化的大部分内容,而 join 相关的优化又分为 mapjoin 可以解决的 join 优化和 mapjoin 无法解决的 join 优化。
对于分布式数据处理来说,我们希望数据平均分布到每个处理节点,但是实际上由于业务数据本身的问题或者分布算法的问题,每个节点分配到的数据量很可能并不是我们预想的那样。
也就是说,只有待分到最多数据的节点处理完数据,整个数据处理任务才能完成,时分布式的意义就大打折扣 ,想想那个卡死的 99% 。
实际上,即使每个节点分配到的数据量大致相同,数据仍可能倾斜,比如考虑统计词频的极端问题,如果某个节点分配到的词都是一个词,那么显此节点需要的耗时将很长,即使其数据量和其他节点的数据量相同。
其实在实际 Hive SQL 开发的过程中, Hive SQL 性能的问题上实际只有一小部分和数据倾相关。
很多时候, Hive SQL 运行得慢是由开发人员对于使用的数据了解不够以及一些不良的使用习惯引起的。
「需要计算的指标真的需要从数据仓库的公共明细层来自行汇总么?」 是不是数据公共层团队开发的公共汇总层已经可以满足自己的需求?对于大众的、 KPI 相关的指标等通常设计良好的数据仓库公共层肯定已经包含了,直接使用即可。 「真的需要扫描这么多分区么?」 比如对于销售明细事务表来说,扫描一年的分区和扫描一周的分区所带来的计算、 IO 开销完全是两个量级,所耗费的时间肯定也是不同的。作为开发人员,我们需要仔细考虑业务的需求,尽量不要浪费计算和存储资源! 「尽量不要使用 select * from your_table 这样的方式,用到哪些列就指定哪些列。」 如 select coll, col2 from your_table ,另外, where 条件中也尽量添加过滤条件,以去掉无关的数据行,从而减少整个 MapReduce 任务中需要处理、分发的数据量 「输入文件不要是大量的小文件。」 Hive 的默认 Input Split 是 128MB (可配置),小文件可先合并成大文件。
在保证了上述几点之后,有的时候发现 Hive SQL 还是要运行很长时间,【企闻】外资减仓内资增仓:五粮液市值破万亿机构分甚至运行不出来, 这时就需要真正的 Hive 优化技术了!
group by 引起的倾斜主要是输入数据行按照 「group by 列分布不均匀」 引起的。
比如,假设按照供应商对销售明细事实表来统计订单数,那么部分大供应商的订单量显然非常多,而多数供应商的订单量就一般,由于 group by 的时候是按照供应商的 ID 分发到每个 Reduce Task ,那么此时分配到大供应商的 Reduce Task 就分配了更多的订单,从而导致数据倾斜。
此时Hive 在数据倾斜的时候会进行负载均衡,生成的查询计划会有两个 MapReduce Job。
第一个 MapReduce Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个Reduce 做部分聚合操作并输出结果。这样处理的结果是相同的 GroupBy Key 有可能被分布到不同的 Reduce 中,从而达到负载均衡的目的; 第二个 MapReduce Job 再根据预处理的数据结果,按照 GroupBy Key 分布到 Reduce 中(这过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。
在 Hive 开发过程中,应该小心使用 count distinct ,因为很容易引起性能问题,比如下面的 SQL:
由于必须去重,因此 Hive 将会把 Map 阶段的输出全部分布到 Reduce Task 上,此时很容易引起性能问题。对于这种情况,可以通过先 group by 再 count 的方式来优化,优化后的 SQL 如下:
join 相关的优化主要分为 mapjoin 可以解决的优化 ( 即大表 join 小表) 和 mapjoin 无法解决的优化( 即大表 join 大表 )。大表 join 小表相对容易解决,大表 join 大表相对复杂和难以解决,但也不是不可解决的,只是相对比较麻烦而已。
首先介绍大表 join 小表优化 。仍以销售明细事实表为例来说明大表 join 小表的场景。
假如供应商会进行评级,比如(五星、四星、 两星、 一星),此时业务人员希望能够分析各供应商星级的每天销售情况及其占比。
但正如上述所言,现实世界的二八准则将导致订单集中在部分供应商上,而好的供应商的评级通常会更高,此时更加剧了数据倾斜的程度。如果不加以优化,上述 SQL 将会耗费很长时间,甚至运行不出结果!
通常来说,供应商是有限的,比如上千家、上万家,数据量不会很大,而销售明细事实表比较大,这就是典型的大表 join 小表问题,可以通过 mapjoin 的方式来优化,只需添加 mapjoin hint 即可,优化后的 SQL 如下:
mapjoin 优化是在 Map 阶段进行 join ,而不是像通常那样在 Reduce 阶段按照 join 列进行分发后在每个 Reduce 任务节点上进行 join ,不需要分发也就没有倾斜的问题,相反 Hive 会将小表全量复制到每个 Map 任务节点(对于本例是 dim_seller ,当然仅全量复制 b表 sql 指定的列),然后每个 Map 任务节点执行 lookup 小表即可。
A 表为一个汇总表,汇总的是卖家买家最近 N 天交易汇总信息,即对于每个卖家最近 N 天,其每个买家共成交了多少单、总金额是多少,我们这里 N 先只取 90 天,汇总值仅取成交单数 。A 表的字段有:buyer_id 、seller_id 和 pay_cnt_90d 。 B 表为卖家基本信息表,其中包含卖家的一个分层评级信息,比如把卖家分为 6 个级别:S0、S1、S2、S3、S4、S5、S6 。
但是此 SQL 会引起数据倾斜,原因在于卖家的二八准则。某些卖家 90 天内会有几百万甚至上千万的买家,但是大部分卖家 90 天内的买家数目并不多, join table_A 和table_B 的时候 ODPS 会按照 Seller_id 进行分发, table_A 的大卖家引起了数据倾斜。
「但是本数据倾斜问题无法用 mapjoin table_B 解决,因为卖家有超过千万条、文件大小几个GB ,超过了 mapjoin 表最大 1GB 的限制。」
此方案在一些情况下可以起作用,但很多时候还是无法解决上述问题,因为大部分卖家尽管 90 买家不多 ,但还是有一些的,过滤后的 B 表仍然很大。
将这些引起倾斜的值随机分发到Reduce,其主要核心逻辑在于 join 时对这些特殊值concat 随机数,从而达到随机分发的目的。核心逻辑如下:
Hive已对此进行了优化,不需要修改SQL,只需要设置参数;比如 table_B 的值 0 和 1 引起倾斜,只需要如下设置:
是建立一个numbers表,其值只有一列int行,比如从1到10(具体根据倾斜程度确定),然后放大B表10倍,再取模join。
思路核心在于:既然按照seller_id分发会倾斜,那么再人工增加一列进行分发,这样之前倾斜的值的倾斜程度会减少为原来的1/10。可以通过配置numbers表修改放大倍数来降低倾斜程度,但弊端就是B表会膨胀N倍。
首先需要知道大卖家的名单,即先建立一个临时表动态存放每日最新的大卖家(比如dim_big_seller),同时此表的大卖家要膨胀预先设定的倍数(比如1000倍)。
在A表和 B表中分别新建一个 join 列,其逻辑为:如果是大卖家,那么 concat 一个随 机分配正整数(0到预定义的倍数之间,本例为0~1000 );如果不是,保持不变。
相比通用方案,专用方案的运行效率明显好了很多,因为只是将B表中大卖家的行数放大了 1000 倍,其他卖家的行数保持不变,但同时也可以看到代码也复杂了很多,而且必须首先建立大卖家表。
实际上方案 2 和 3 都用到了一分为二的思想,但是都不彻底,对于 mapjoin 不能解决的 问题,终极解决方案就是动态一分为 ,即对倾斜的键值和不倾斜的键值分开处理,不倾 斜的正常 join 即可,倾斜的把它们找出来然后做 mapjoin ,最后 union all 其结果即可。
总结起来,方案 1、2 以及方案 3 中的通用方案不能保证解决大表 join 大表问题,因为它们都存在种种不同的限制和特定的使用场景。
而方案 3 的专用方案和方案 4 是比较推荐的优化方案,但是它们都需要新建一个临时表来存放每日动态变化的大卖家 。
相对方案 4 来说,方案 3 的专用方案不需要对代码框架进行修改,但是 B 表会被放大,所以一定要是维度表,不然统计结果会是错误的 。方案 4 的解决方案最通用,自由度最高,但是对代码的更改也最大,甚至需要更改代码框架,可作为终极方案来使用。

企业资料通过认证