In the DBMS world it’s easy to generate a unique, auto-incrementing ID: use MySQL’s AUTO_INCREMENT attribute on a primary key, or MongoDB’s counters-collection pattern. But in a distributed, parallel processing framework like Hadoop MapReduce, it is not that straightforward. The best way to identify every record in such a framework is a UUID, but when an integer ID is required, it takes a few more steps.
Solution A: Single Reducer
This is the most obvious and simple one: set the number of reduce tasks to 1:
```java
job.setNumReduceTasks(1);
```
And the drawbacks are just as obvious:
- All mapper output is copied to a single task tracker.
- Only one process works on the shuffle & sort.
- Only one process produces the final output.
This is not a problem for small data sets, or at least small mapper outputs, and it is also the approach Pig and Hive take when they need to perform a total sort. But past a certain threshold, the copy and sort phases become unacceptably slow.
Solution B: Increment by Number of Tasks
Inspired by a hard-to-find mailing-list post, which is in turn inspired by the MySQL master-master setup (with auto_increment_increment and auto_increment_offset), there’s a brilliant way to generate a globally unique integer ID across mappers or reducers. Let’s take a mapper for example:
```java
public static class JobMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
```
The basic idea is simple:
- Set the initial ID to the current task’s ID.
- When mapping each row, increment the ID by the total number of tasks.
It’s also applicable to reducers.
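Filling in the rest of that mapper, a minimal sketch might look like the following. The field names and the way the task count is obtained (reading mapred.map.tasks from the configuration) are my assumptions, not necessarily what the original code does:

```java
public static class JobMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    private long id;        // next ID this task will assign
    private long increment; // total number of map tasks in the job

    @Override
    protected void setup(Context context) {
        // Task IDs are zero-based and unique within the job.
        id = context.getTaskAttemptID().getTaskID().getId();
        // Assumes the number of map tasks is known up front, e.g. via mapred.map.tasks.
        increment = context.getConfiguration().getInt("mapred.map.tasks", 1);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(new LongWritable(id), value);
        id += increment; // leap over the IDs assigned by the other tasks
    }
}
```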
Solution C: Sorted Auto-increment Id
Here’s a real scenario: we have several log files pulled from different machines, and we want to identify each row by an auto-incrementing ID, with the IDs following the rows’ time order.
We know Hadoop has a sort phase, so we can use the timestamp as the mapper output key and let the framework do the trick. But the sorting happens within each reducer (each partition, in fact), so when using multiple reduce tasks the result is not in total order. To achieve that, we can use the TotalOrderPartitioner.
What about the incremental ID? Even though the outputs are in total order, Solution B is not applicable here. So we take another approach: separate the job into two phases, use the reducer to do the sorting and counting, then use a second mapper to generate the IDs.
Here’s what we’re going to do:
- Use TotalOrderPartitioner, and generate the partition file.
- Parse logs in mapper A, use time as the output key.
- Let the framework do partitioning and sorting.
- Count the records in the reducer, and write the count with MultipleOutputs.
- In mapper B, use the counts as offsets, and increment by 1.
To simplify the situation, assume we have the following inputs and outputs:
```
Input                Output
```
Generate Partition File
To use TotalOrderPartitioner, we need a partition file (i.e. the boundaries) to tell the partitioner how to split the mapper outputs. Usually we’d use the InputSampler.RandomSampler class, but this time let’s write the partition file by hand.
```java
SequenceFile.Writer writer = new SequenceFile.Writer(fs, getConf(), partition,
        Text.class, NullWritable.class);
```
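Then the boundary keys get appended and the writer closed; a minimal sketch, assuming Text map output keys and the two boundaries described below:

```java
// Two boundary keys split the key space into three partitions.
writer.append(new Text("12:00"), NullWritable.get());
writer.append(new Text("13:00"), NullWritable.get());
writer.close();
```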
So basically, the partitioner will split the mapper outputs into three parts: the first part less than “12:00”, the second [“12:00”, “13:00”), and the third [“13:00”, ∞).
And then, tell the job to use this partition file:
```java
job.setPartitionerClass(TotalOrderPartitioner.class);
```
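The partition file itself is registered through TotalOrderPartitioner, and the number of reduce tasks should match the number of partitions; a sketch of how that setup might continue:

```java
// Point the partitioner at the partition file written above, and use
// one reduce task per partition (three partitions in this example).
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partition);
job.setNumReduceTasks(3);
```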
Use MultipleOutputs
In the reducer, we need to note down the row count of each partition. To do that, we’ll use the MultipleOutputs class, which lets us write extra result files besides the default “part-r-xxxxx”. The reducer’s code is as follows:
```java
public static class JobReducer extends Reducer<Text, Text, NullWritable, Text> {
```
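A minimal sketch of how such a reducer might be filled in; the field names and the “count” named output are assumptions of mine (they just have to line up with the notes below):

```java
public static class JobReducer extends Reducer<Text, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> outputs;
    private long count = 0;

    @Override
    protected void setup(Context context) {
        outputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Rows arrive sorted by time within this partition; write them out as-is.
        for (Text value : values) {
            context.write(NullWritable.get(), value);
            count++;
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Record this partition's row count in the "count" named output,
        // then close MultipleOutputs, otherwise the count file will be empty.
        outputs.write("count", NullWritable.get(), new Text(String.valueOf(count)));
        outputs.close();
    }
}
```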
There are several things to pay attention to:
- MultipleOutputs is declared as a class member, instantiated in Reducer#setup, and must be closed in Reducer#cleanup (otherwise the file will be empty).
- When instantiating the MultipleOutputs class, the generic types need to match the reducer’s output key/value classes.
- In order to use different output key/value classes for a named output, additional setup is needed in the job definition:
```java
Job job = new Job(getConf());
```
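That extra setup goes through MultipleOutputs.addNamedOutput; a sketch, where the output format and key/value classes for the “count” output are illustrative:

```java
// Register the "count" named output with its own format and key/value classes,
// in addition to the job's regular output settings.
MultipleOutputs.addNamedOutput(job, "count", TextOutputFormat.class,
        NullWritable.class, Text.class);
```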
For example, if the output folder is “/tmp/total-sort/”, there’ll be the following files when the job is done:
```
/tmp/total-sort/count-r-00001
```
Pass Start IDs to the Mapper
When the second mapper processes the inputs, we want each task to know the initial ID of its partition, which can be calculated from the count-* files produced earlier. To pass this information, we can use the job’s Configuration object.
```java
// Read and calculate the start id from those row-count files.
```
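Concretely, this can be a prefix sum over the per-partition counts, stored under per-partition configuration keys; a sketch, where readCount() is a hypothetical helper that parses one count-r-* file, and the “start.id.N” key names are my own:

```java
// Driver side: turn each partition's row count into the start ID of the
// next partition (a prefix sum), and pass them via the job Configuration.
Configuration conf = job.getConfiguration();
long offset = 1; // assuming IDs start at 1
for (int i = 0; i < numReduceTasks; i++) {
    conf.setLong("start.id." + i, offset);
    offset += readCount(fs, new Path(outputDir, String.format("count-r-%05d", i)));
}
```

In mapper B’s setup(), each task then reads back the key that corresponds to its input file’s partition and starts assigning IDs from there.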
Set the Input Non-splittable
When a file is bigger than a block or so (depending on some configuration entries), Hadoop will split it, which would break the per-file start IDs. So let’s define a new InputFormat class to disable the splitting behaviour:
```java
public static class NonSplitableSequence extends SequenceFileInputFormat<NullWritable, Text> {
```
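The class only needs to override isSplitable; a minimal sketch:

```java
public static class NonSplitableSequence extends SequenceFileInputFormat<NullWritable, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split the input, so each map task reads exactly one
        // reducer output file (i.e. one partition).
        return false;
    }
}
```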
And that’s it: we are able to generate unique, auto-incrementing IDs for a sorted collection, with Hadoop’s parallel computing capability. The process is rather involved and touches several Hadoop techniques, but it’s worthwhile to dig into.
A working example can be found in my GitHub repository. If you have a more straightforward approach, please do let me know.