Sunday, September 21, 2014

Hadoop: Leveraging the framework


Map-Reduce programming model has become very popular thanks to Hadoop. It provides a simple way to parallelize processing through the distribution of the computation to the nodes where the data is located. You are free from the hassle of traditional multi-threaded distributed programming (locks, signaling, etc..), since each process runs independently in his own virtual machine. This has also disadvantages, mainly the lack of communication between processes, Mappers and Reducers are completely oblivious of the status of the rest of processes. As a truly functional paradigm, the only input the Reducer process have are the intermediate output files from the map stage.

This can prove a difficulty for the parallelization of some algorithms or even render it infeasible. In other cases the limitations can come from the framework itself. For example, some information on the input must be shared by all the tasks and is not immediately available. With Hadoop this is can be often overcome using internal mechanisms or extending the framework, although not very straightforwardly sometimes.

Let's elaborate with an actual case:

We have a huge input text file in HDFS and we want to process it adding unique sequence ids to each line, This ids will start with 0 and will be sequentially increasing in steps of one, so for a file of N lines we will add to each line an id starting in 0 and ending in N-1. The output will be the same file but having each line prepended with the id.

Our requirements are:

- Input will be a text file.
- As said the sequential ids will start in 0 and end in N-1 (N being the total number of lines of the file) in increments on 1
- The output will be in the same order of the input, we don't want to shuffle lines.
- Only one map-reduce job will be used.
- Last and not least, the whole point of using Hadoop is to parallelize and scale so no one-reducer jobs.

Summarizing, we are numbering the lines of a file in a distributed fashion using a Hadoop job.

The Job

 

- At the mapper phase, get an id of the file split currently processed, we need this id to be unique and sequentially ordered starting with the first block of the file. Then iterate this split lines incrementing a counter and emitting as key a pair of (splitId, counter). and as value the pair (counter, line) . As you can see we repeat one field of the composite key on the composite value, we will need that for implementing a secondary sorting pattern. The mapper will group all the lines with the same splitId adding the sequence information we can later retrieve in the reducer to maintain the same ordering.

- In the mapper cleanup process we need to propagate the total number of lines of the processed split to all the reducers emitting a pair with key (splitId, currentsplit) and value (currentsplit, total count) for every split. We only emit the to the splits Id greater than the current one.  At the reducer stage we will need this previously calculated totals to set the initial sequence number.

 If we take the following splits:  
 Split 1:  
 A  
 B  
 C  
 Split2:  
 D   
 E  
 Mapper 1 emits: ((1,1) (1,A))  ((1,2) (2,B))  ((1,3) (3,C))  
 Then in the clean-up we inform the reducers processing the split 2 that the total count for split 1 is 3:  
 ((2,1)(1,3))  
 Similarly Mapper 2 emits:  
 ((2,1) (1,D))  ((2,2) (2,E))  
 In the clean-up process we don't need to emit anything, since this is the last split (2 of 2).  

-We apply a secondary sort pattern, grouping in the reducers for split id and receiving the pairs ordered by the counter sequence, we need also to identify pairs with the mapper calculated totals and direct them to each partition, more on that later:

- For each key the reducer checks the count totals coming from the splits with an id lesser than the current key id, sums them and begin writing to the output file the lines using that total as the start value and incrementing with steps of 1. If there are no splits with a lower id, we begin with 0 as the first sequence id.

 In the above example the reducers will receive two calls, (remember we group by split id and we have two splits):  
 (1, (1,A) (2,B) (3,C))  
 (2, (1,3) (1,D) (2,E))  
 For split 1 we have no previous split total so we start with 0 and write to the file:  
 (0, A)(1,B)(2,C)  
 for the next split 2, we have one key (1) lower than the current with a total of 3: (1,3), so we start on 3:  
 (3,D) (4,E)  

 

 Java code


You can find the source code in my github repository, There are three classes: two writable implementations for the composite key and value and the job implementation class including all the moving parts (mapper, reducer, partitioner...): SplitSequenceWritable.java, SequenceLineWritable.java, SequencerJob.java

The key points here are:

- We need to emit an unique sequential split Id in the mapper and we need also to know the total number of splits. 

 

We can do that extending the framework. The list of FileSplits for the input file is calculated in the getSplits method of the TextInputFormatClass using as parameter the job context, so we can extend that class and also the FileSplit class to create instances with the total count and the split sequence id information:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static class SequencerFileSplit extends FileSplit
{
 private int totalSplits;
 private int splitId;

 SequencerFileSplit()
 {
  super();
 }

 SequencerFileSplit(FileSplit split, int splitId, int totalSplits) throws IOException
 {
  super(split.getPath(), split.getStart(), split.getLength(), split.getLocations());
  this.splitId = splitId;
  this.totalSplits = totalSplits;
 }

 @Override
 public void readFields(DataInput in) throws IOException
 {
  super.readFields(in);
  splitId = in.readInt();
  totalSplits = in.readInt();
 }

 @Override
 public void write(DataOutput out) throws IOException
 {
  super.write(out);
  out.writeInt(splitId);
  out.writeInt(totalSplits);
 }

//Rest of methods
}


We add our custom fields to the split, note that this is a writable object distributed in the cluster so you need to make sure the new fields are serialized and also provide an empty constructor.

Our implementation of getSplits simply calls to the parent method to retrieve all the splits information and returns them wrapped in out custom split with the id and total information. We are assuming here that the framework returns the splits ordered by start offset and that is actually true:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public static class SequencerInputFormat extends TextInputFormat
{

 @Override
 public List<InputSplit> getSplits(JobContext arg0) throws IOException
 {
  List<InputSplit> splits = super.getSplits(arg0);
  int total = splits.size();
  List<InputSplit> result = new ArrayList<InputSplit>(total);
  int counter = 1;
  for (InputSplit split : splits)
  {
   result.add(new SequencerFileSplit((FileSplit) split, counter++, total));
  }
  return result;
 }
}


- We need to group the lines by splitIds with the same ordering of the input file. The output must maintain the initial order.

 

We are implementing a canonical secondary sort pattern with custom writable composite keys and values, a partitioner and a custom GroupComparator. The composite key containing splitid and sequence number means that we will group by splitId and order by the input sequence. We need to implement a groupcomparator by splitId only and a partitioner to send the pairs to the reducers based also on the splitId. To maintain the input file ordering the partitioner must send the adjacent splits to the same reducer. For example imagine we have 12 splits and 3 reducers, we want splits 1,2,3,4 to go to one reducer, 5,6,7,8 to another and 9,10,11,12 to the last one. To do that we need to know the total split count value in the partitioner. 
This class is not aware of the job configuration by default, but we can make Hadoop to inject it automatically implementing the Configurable interface:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static class SequencerPartitioner extends Partitioner<SplitSequenceWritable, SequenceLineWritable>
  implements Configurable
{

 private Configuration conf;
 private int totalSplitNumber;

 @Override
 public int getPartition(SplitSequenceWritable key, SequenceLineWritable value, int numPartitions)
 {
  return ((key.getSplit() - 1) * numPartitions) / totalSplitNumber;
 }

 @Override
 public Configuration getConf()
 {
  return conf;
 }

 @Override
 public void setConf(Configuration conf)
 {
  this.conf = conf;
  totalSplitNumber = conf.getInt(TOTAL_SPLIT_NUMBER, 0);
 }
}


And now we can make it available in the job configuration, creating an instance of TextInputFormat to retrieve the value:

1
job.getConfiguration().setInt(TOTAL_SPLIT_NUMBER, new TextInputFormat().getSplits(job).size());


The reducer calculates the start number for out split id keys, remember we emitted that totals in the mapper as "meta-pairs" for all the splits:
 (split1, -currentsplit) (-1, currentSplitLineCount) 
 (split2, -currentsplit) (-1, currentSplitLineCount)
 (split3, -currentsplit) (-1, currentSplitLineCount)
.
.

As you see the sequence value corresponding to the total was emitted as -1 in the mapper stage. This is an easy way to identify an aggregation pair since all sequences are positives. Additionally the reducer will receive these totals as the first pairs since the iterable is ordered. Once we have the start sequence number we can iterate the rest of pairs (also coming in the right order thanks to the secondary sorting) containing the lines and number them incrementally:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void reduce(SplitSequenceWritable key, Iterable<SequenceLineWritable> values,
  Context context) throws IOException, InterruptedException
{
 Iterator<SequenceLineWritable> iterator = values.iterator();
 long sequenceNumber = 0;
 SequenceLineWritable value = iterator.next();
 while (iterator.hasNext() && value.getSequence() < 0)
 {
  sequenceNumber += Long.valueOf(value.getLine());
  value = iterator.next();
 }
 sequence.set(sequenceNumber);
 line.set(value.getLine());
 context.write(sequence, line);
 sequenceNumber++;
 while (iterator.hasNext())
 {
  value = iterator.next();
  sequence.set(sequenceNumber);
  line.set(value.getLine());
  context.write(sequence, line);
  sequenceNumber++;
 }
}


We have extended the framework to be able to identify the current split in each stage (map, partitioner, comparator, reducer). This information needs to be calculated by Hadoop anyway and by making it available we gain insight and flexibility in our jobs. Since the pairs are grouped by split in the reducers, the data is uniformly distributed to avoid hot spots nodes allowing for a linear scaling. More generally this pattern could be used to apply transformations to the lines of a text file maintaining the ordering.