本文为您介绍产生数据倾斜的场景、产生原因及相应的处理措施。

问题现象

查看Logview时,发现有少数Fuxi Instance处理的数据量远远超过其他Fuxi Instance处理的数据量,从而导致少数Fuxi Instance的运行时长远远超过其他Fuxi Instance的平均运行时长,进而导致整个任务运行时间超长,造成任务延迟。例如,在历年双11的离线任务中,会遇到很多由于数据倾斜而导致的问题(主要是数据延迟),一方面是由于双11处理的数据量可能是平时的很多倍,另一方面业务上的“爆款”也在客观上一定程度地加大了数据倾斜发生的概率。

产生原因&处理措施

产生该问题的可能原因及对应的处理措施如下。

产生原因 描述 处理措施
group by倾斜 group bykey分布不均匀。

例如,在某一节日期间,某个店铺的单品PV量达4千万以上,店铺PV量达8千万以上,导致根据商品和店铺的PV量计算IPV时,发生数据倾斜。

在执行SQL语句前,设置防倾斜参数set odps.sql.groupby.skewindata=true;,与SQL语句一起提交。
说明 当对常量执行group by操作时,设置该参数不会生效,需要您自行修改代码逻辑,避免对常量执行group by操作。
join倾斜 join onkey分布不均匀。
  • 如果join两边的表中有一张是小表,可以将join改为mapjoin来处理。
  • 对易产生倾斜的key用单独的逻辑来处理。例如两边表的key中有大量NULL数据会导致倾斜,需要在join前先过滤掉NULL数据或补上随机数,然后再进行join

    例如,某张表中,有大量未登录用户的访问记录(user_id为NULL),如果直接和用户表关联的话,会产生倾斜。这时候可以做如下处理:select * from table_a a left outer join table_b b on case when a.user_id is null then concat('dp_hive',rand() ) else a.user_id end = b.user_id;。通常情况下,可能倾斜的值不是NULL,而是有意义的数据,这时候就需要对这类数据进行单独处理。

count distinct倾斜 特殊值过多,常见于固定的特殊值比较多的场景,和join中易产生倾斜的key类似。 先过滤特殊值,在count结果的基础上加上特殊值的个数。或根据具体场景进行具体分析。
例如,统计用户日购买UV、周购买UV和月购买UV:
--该方式会导致数据倾斜。
select
     count(distinct if(num_alipay_1days>0,user_id,null)) as cat_users_1days ,
     count(distinct if(num_alipay_7days>0,user_id,null)) as cat_users_7days ,
     count(distinct if(num_alipay_30days>0,user_id,null)) as cat_users_30days
     from table_a t1 where ds='20200625';
--可改写为如下语句。
select
     count(if(num_user_1days=1,1,null)) as cat_users_1days ,
     count(if(num_user_7days=1,1,null)) as cat_users_7days ,
     count(if(num_user_30days=1,1,null)) as cat_users_30days
     from
     (select user_id ,
     if(sum(num_alipay_1days)>0,1,0) as num_user_1days,
     if(sum(num_alipay_7days)>0,1,0) as num_user_7days,
     if(sum(num_alipay_30days)>0,1,0) as num_user_30days
     from table_a
     where ds='20200625' group by user_id
     )tmp;
错误使用动态分区 在不需要使用动态分区的场景,使用了动态分区。例如,某段代码示例如下:
insert overwrite table table_a partition(dt)
select
    split_part(content,'\t',1) as nurl,
    ca_j_get_host(split_part(content,'\t',1)) as host,
    split_part(content,'\t',2) as stat,
    dt
    from table_b
    where dt='20200502';
在该示例中,没有必要使用动态分区,使用动态分区后,会启动Reduce Task,不仅浪费资源,还可能发生数据倾斜,应该使用固定分区,正确示例如下:
insert overwrite table table_a partition(dt='20200502')
select
    split_part(content,'\t',1) as nurl,
    ca_j_get_host(split_part(content,'\t',1)) as host,
    split_part(content,'\t',2) as stat
    from table_b
    where dt='20200502';
正确使用动态分区。如果必须使用动态分区,需要在执行SQL语句前,设置set odps.sql.reshuffle.dynamicpt=false;(默认值是False),与SQL语句一起提交。
未合理使用odps.sql.reducer.instances参数 例如,某段代码示例如下:
set odps.sql.reducer.instances = 931;
insert overwrite table  my_tableA  partition( pt )
select
  col1,
  col2,
  col3,
  col4,
  my_udf( col5) as pt
from
  my_tableB
where ds= '20200701' ;

在该示例中,my_udf( col5) as pt作为动态分区,而实际上my_udf( col5)只会有少数几个值,但odps.sql.reducer.instances却设置了931个,这意味着有大量Instance处理的数据量为0,会导致数据倾斜。

从代码中删除odps.sql.reducer.instances参数设置。