Tuesday, October 7, 2014

Pangool, The Hadoop Companion


Writing MapReduce jobs with Hadoop is no trivial task. The programming model has a lot of moving parts that have to work together: mapper, combiner, reducer and, in most cases, partitioner, comparators, Writable and WritableComparable objects.

As you know, there are tools that provide a higher level of abstraction. They let us define jobs in a declarative (Hive) or procedural (Pig) notation, and then generate and launch the Java MapReduce jobs transparently.

In many cases this is enough. When we are running analytical or exploratory queries against a data set, for example, the performance of the MapReduce job is not a priority; it is more important to be able to write the queries easily in a familiar, SQL-like language. Not everybody is a Java expert, after all.

In other cases performance is a key requirement and you need to roll up your sleeves and write the Java code directly. This is particularly important in complex processes that involve several chained jobs: if you can reduce the number of jobs, the optimization is worth the effort.

It would be nice to have a tool that helps us abstract away the most frequent and tedious MapReduce patterns without losing the flexibility and tuning capabilities of the Hadoop Java API. Enter Pangool, a low-level Java MapReduce API built on top of Hadoop. By introducing an intermediate tuple-based schema, it allows us to implement patterns like secondary sorting or reduce-side joins almost transparently, in a declarative fashion. This tuple schema frees us from the hassle of implementing custom Writable objects, and the provided tuple input and output formats use it to build job pipelines seamlessly. All of this is achieved without losing the flexibility and performance that the Hadoop low-level API provides.

To see an example of a job created with Pangool, we can rewrite the job from the previous post. We will see how it simplifies the secondary sort pattern and how we can still apply our own customizations (custom partitioner, input file format...).

The code is available on GitHub.

The first change we notice is the definition of tuple schemas:


private static final Schema INTERMEDIATE_SCHEMA = new Schema(
  "schema",
  Arrays.asList(
    Field.create(SPLIT_FIELD, Type.INT),
    Field.create(LINE_FIELD, Type.STRING, true),    // true = field may be null
    Field.create(SEQ_FIELD, Type.LONG),
    Field.create(TOTAL_FIELD, Type.LONG, true)));   // true = field may be null

private static final Schema RESULT_SCHEMA = Mutator.subSetOf("mutated", INTERMEDIATE_SCHEMA,
  SEQ_FIELD, LINE_FIELD);

With these two declarations we define the output of the mapper (the intermediate schema) and the output of the job (the result schema). That's all. We don't need to create custom Writables for the composite keys and values used in the secondary sort pattern, and because fields can be nullable we can send the split's total count in a tuple whose line value is null (the boolean set to true in the field definition indicates null support).

Note that we can easily reuse our original schema to define the output using the Mutator class.

Now we want to apply the secondary sort, ordering by sequence and grouping by split:

mr.addIntermediateSchema(INTERMEDIATE_SCHEMA);
mr.setGroupByFields(SPLIT_FIELD);
mr.setOrderBy(new OrderBy().add(SPLIT_FIELD, Order.ASC).add(SEQ_FIELD, Order.ASC));
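
To make the effect of these three calls more concrete, here is a minimal plain-Java sketch of what "order by split and sequence, group by split" means for the tuples that reach a reducer. It does not use Pangool or Hadoop at all: the Row class and the sortAndGroup method are hypothetical names invented for this illustration, and the real framework performs the sort during the shuffle rather than in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SecondarySortSketch {

    /** Hypothetical stand-in for an intermediate tuple (split, seq, line). */
    static final class Row {
        final int split;
        final long seq;
        final String line;

        Row(int split, long seq, String line) {
            this.split = split;
            this.seq = seq;
            this.line = line;
        }
    }

    /**
     * Sorts rows by (split ASC, seq ASC) and then groups them by split.
     * Each map entry plays the role of one reduce call: the key is the
     * group-by field, the value is the ordered list of tuples in that group.
     */
    static Map<Integer, List<Row>> sortAndGroup(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingInt((Row r) -> r.split)
                .thenComparingLong(r -> r.seq));

        // LinkedHashMap keeps the groups in the order the sorted rows produce them.
        Map<Integer, List<Row>> groups = new LinkedHashMap<>();
        for (Row r : sorted) {
            groups.computeIfAbsent(r.split, k -> new ArrayList<>()).add(r);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row(1, 1L, "second line of split 1"),
                new Row(0, 0L, "first line of split 0"),
                new Row(1, 0L, "first line of split 1"),
                new Row(0, 1L, "second line of split 0"));

        for (Map.Entry<Integer, List<Row>> group : sortAndGroup(rows).entrySet()) {
            System.out.println("split " + group.getKey());
            for (Row r : group.getValue()) {
                System.out.println("  seq=" + r.seq + " line=" + r.line);
            }
        }
    }
}
```

The point of the sketch is only the contract: within each group the reducer sees tuples already ordered by the secondary field, without buffering and sorting them itself.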

We specify the schema, the group-by field and the ordering, and we are done: this is our whole implementation of the secondary sort pattern (in addition to the mapper and reducer, of course). Easy, don't you think? And we do not lose the flexibility of Hadoop's low-level API. We still provide our custom input format and input split classes:

mr.addInput(new Path(input), new SequencerInputFormat(), new SequencerMap());

and the custom partitioner:

Job job = mr.createJob();
job.setPartitionerClass(SequencerPartitioner.class);

To retrieve the number of splits we have to use the classic Hadoop API:

FileInputFormat.addInputPath(job, new Path(input));
job.getConfiguration().setInt(TOTAL_SPLIT_NUMBER,
  new TextInputFormat().getSplits(job).size());

There are some small additional code changes that stem from the new API. For example, to recover our custom split instance in the mapper:

SequencerFileSplit split = (SequencerFileSplit) ((TaggedInputSplit) context
  .getHadoopContext().getInputSplit()).getInputSplit();

Overall, though, we are still using the MapReduce model, just with schema-defined tuples instead of key-value pairs, and we can still leverage the underlying framework.

I have used Pangool in several projects, and in my opinion it strikes the right balance between abstraction and low-level capabilities. The advantages are more evident when you need to chain a sequence of jobs and perform joins; that could be the subject of a future post. The shift to the tuple model is small, since it is an evolution of the existing API, and you still get to use all your hard-learned MapReduce skills.