在《MapReduce Design Patterns》一书中,作者给出了Reduce-side Join的实现方法,大致步骤如下:
- 使用MultipleInputs指定不同的来源表和相应的Mapper类;
- Mapper输出的Key为Join的字段内容,Value为打了来源表标签的记录;
- Reducer在接收到同一个Key的记录后,执行以下两步:
- 遍历Values,根据标签将来源表的记录分别放到两个List中;
- 遍历两个List,输出Join结果。
具体实现可以参考这段代码。但是这种实现方法有一个问题:如果同一个Key的记录数过多,存放在List中就会占用很多内存,严重的会造成内存溢出(Out of Memory, OOM)。这种方法在一对一的情况下没有问题,而一对多、多对多的情况就会有隐患。那么,Hive在做Reduce-side Join时是如何避免OOM的呢?两个关键点:
- Reducer在遍历Values时,会将前面的表缓存在内存中,对于最后一张表则边扫描边输出;
- 如果前面几张表内存中放不下,就写入磁盘。
按照我们的实现,Mapper输出的Key是product_id
,Values是打了标签的产品表(Product)和订单表(Order)的记录。从数据量来看,应该缓存产品表,扫描订单表。这就要求两表记录到达Reducer时是有序的,产品表在前,边扫描边放入内存;订单表在后,边扫描边结合产品表的记录进行输出。要让Hadoop在Shuffle&Sort阶段先按product_id
排序、再按表的标签排序,就需要用到二次排序。
二次排序的概念很简单,将Mapper输出的Key由单一的product_id
修改为product_id+tag
的复合Key就可以了,但需通过以下几步实现:
自定义Key类型
原来product_id
是Text类型,我们的复合Key则要包含product_id
和tag
两个数据,并实现WritableComparable
接口:
1 | public class TaggedKey implements WritableComparable<TaggedKey> { |
可以看到,在比较两个TaggedKey时,会先比较joinKey(即product_id
),再比较tag
。
自定义分区方法
默认情况下,Hadoop会对Key进行哈希,以保证相同的Key会分配到同一个Reducer中。由于我们改变了Key的结构,因此需要重新编 写分区函数:
1 | public class TaggedJoiningPartitioner extends Partitioner<TaggedKey, Text> { |
自定义分组方法
同理,调用reduce函数需要传入同一个Key的所有记录,这就需要重新定义分组函数:
1 | public class TaggedJoiningGroupingComparator extends WritableComparator { |
配置Job
1 | job.setMapOutputKeyClass(TaggedKey.class); |
MapReduce过程
最后,我们在Mapper阶段使用TaggedKey,在Reducer阶段按照tag进行不同的操作就可以了:
1 |
|
遍历values时,开始都是tag=1的记录,之后都是tag=2的记录。以上代码可以在这里查看。
对于第二个问题,超过缓存大小的记录(默认25000条)就会存入临时文件,由Hive的RowContainer类实现,具体可以看这个链接。
需要注意的是,Hive默认是按SQL中表的书写顺序来决定排序的,因此应该将大表放在最后。如果要人工改变顺序,可以使用STREAMTABLE配置:
1 | SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1) |
但不要将这点和Map-side Join混淆,在配置了hive.auto.convert.join=true
后,是不需要注意表的顺序的,Hive会自动将小表缓存在Mapper的内存中。