Monday, December 8, 2014

Writing an Elasticsearch river


I've recently been working a lot with Elasticsearch, the enterprise search engine based on Lucene. It is a very versatile tool. Although originally conceived to index documents or web sites and provide full-text search capabilities, it is widely used in other use cases such as log aggregation or structured data indexing, which leverage the flexibility of its query language and aggregation engine.

An interesting feature of Elasticsearch is the possibility of writing custom plugins in Java and deploying them in the cluster. A particular type of plugin, the river, allows us to move the data indexing process into the cluster using a polling approach: the river runs inside the Elasticsearch node, retrieving and indexing the data from our repository, instead of an external client in a separate process pushing the data to the cluster.

Writing a river is not a very difficult task. I recently wrote a river for indexing Oracle Coherence caches; you can find it in my repository. I will try to highlight the main steps I took, using it as an example.

The river is a type of plugin and must be loaded by the node when it starts, so our entry point, the CoherenceRiverPlugin class, needs to extend the AbstractPlugin class. We need to provide a name and a description of the plugin and, to let Elasticsearch know there is a new plugin, we must provide the plugin implementation class name in a mandatory file named es-plugin.properties.


plugin=org.elasticsearch.plugin.river.coherence.CoherenceRiverPlugin


When the node starts it loads the plugins using reflection from the plugins directory (which can be configured) and, if enabled, from the classpath. It also looks for an onModule method on each plugin implementation and invokes it, injecting a module parameter in the call. In our case we provide an implementation that expects the injection of the RiversModule so we can register our river:

public class CoherenceRiverPlugin extends AbstractPlugin
{

 @Override
 public String name()
 {
  return "river-coherence";
 }

 @Override
 public String description()
 {
  return "Coherence river plugin";
 }

 public void onModule(RiversModule module)
 {
  module.registerRiver("coherence", CoherenceRiverModule.class);
 }

}

In fact we could register several types of modules in our plugin; this "module" step is there due to the way the underlying dependency injection framework, Guice, works. The implementation of the module binds our custom implementation, CoherenceRiver, to the River interface:

public class CoherenceRiverModule extends AbstractModule
{

 @Override
 protected void configure()
 {
  bind(River.class).to(CoherenceRiver.class).asEagerSingleton();
 }

}

Now we arrive at the actual river implementation: CoherenceRiver. This class implements the River interface, extends the AbstractRiverComponent class and is instantiated by dependency injection when we register our river or, if already registered, when the node starts. The framework looks for a constructor annotated with Inject, to which it passes the desired context parameters (such as a client entry point to the node, the river settings ...), or for a parameter-less public constructor. In our constructor we gather and store the initialization settings.


@Inject
protected CoherenceRiver(RiverName riverName, RiverSettings settings, Client client)
{
 super(riverName, settings);
 this.client = client;
...
}

Since we are implementing the River interface we must implement the start and stop methods. The start method is also called when registering the river or, if previously registered, when the node starts. This method contains the logic of our river: it registers a listener on the configured cache and performs an initial query to get the current values, and the results are coalesced by key in an intermediate structure. Through this listener we receive all the currently stored keys plus any new insertions or deletions, and we request the actual values to index them using a configurable BulkProcessor that throttles our input. Don't forget that by using the river the data loading process runs inside the Elasticsearch node's virtual machine.
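The coalescing step can be sketched with plain Java collections. This is only an illustration under my own assumptions, not the code from the river: the class CoalescingBuffer, its methods and the batch size parameter are hypothetical names, and in a real river each drained batch would be handed to the BulkProcessor.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: keeps only the latest pending event per cache key. */
final class CoalescingBuffer {

    /** Kind of change reported by the cache listener. */
    enum Op { UPSERT, DELETE }

    // Insertion-ordered, so batches are drained in arrival order of the latest event.
    private final Map<String, Op> pending = new LinkedHashMap<>();

    /** Records a change; a later event for the same key replaces the earlier one. */
    synchronized void record(String key, Op op) {
        pending.remove(key); // re-insert so the key moves to the end of the queue
        pending.put(key, op);
    }

    /** Removes and returns up to maxBatch pending entries, oldest first. */
    synchronized List<Map.Entry<String, Op>> drain(int maxBatch) {
        List<Map.Entry<String, Op>> batch = new ArrayList<>();
        Iterator<Map.Entry<String, Op>> it = pending.entrySet().iterator();
        while (it.hasNext() && batch.size() < maxBatch) {
            Map.Entry<String, Op> e = it.next();
            batch.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
            it.remove();
        }
        return batch;
    }

    /** Number of keys with a pending change. */
    synchronized int size() {
        return pending.size();
    }
}
```

Bounding the batch size is what gives the throttling effect: however fast the cache changes, the indexing side only ever sees one pending operation per key and a limited number of keys per batch.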

In this case the river keeps running and updating the index with the changes to the cache. Other rivers perform an initial import and stop or even can be automatically removed to avoid subsequent executions.

Similarly we implement the stop method to release the river resources when the river is removed or the node is stopped.

Each river is a singleton inside the cluster: if the node executing the river fails, the river is started on another one. It's up to you to provide a resume mechanism or to re-synchronize the index if needed. Note that this makes the river a fault-tolerant component but not a scalable one: since only one instance exists in the cluster, the load cannot be distributed. Nonetheless, you can create several river instances to index independent stores inside a repository, such as different tables in a relational database or different key/value caches in an in-memory data grid.

To register our river we create a document in the special system index "_river", using a custom label as the type and the string "_meta" as the id, and we pass the parameters in JSON format in the request body. In particular, the field type tells Elasticsearch which river plugin we are registering. We can create different rivers of the same type with different initial parameters by using different index types.


curl -XPUT localhost:9200/_river/coherence_river/_meta -d '{
  "type":"coherence",
  "coherence":
    {
     "cache":"TEST_CACHE",
     "query":"key() between 500 and 800"
    }
 }'

Testing our River

 

Elasticsearch provides a powerful test framework that we can use simply by extending the class ElasticsearchIntegrationTest. Doing so starts a local cluster that we can access in our test through an inherited client instance. The testing dependencies needed in our pom.xml file are described in the documentation. In my experience I had to add some additional artifacts; you can find the pom file in the repository.

I faced some difficulties when developing the unit tests:

To test the river I needed it to be retrieved from the classpath and I found that by default the nodes started by the test framework only searched for plugins in the default plugins folder ignoring the classpath.

I had to set the load_classpath_plugins property to true using the framework nodeSettings method:


@Override
protected Settings nodeSettings(int nodeOrdinal)
{
 return ImmutableSettings.builder().put("plugins.load_classpath_plugins", "true").build();
}

Then I found out that with the default global cluster scope this method was not triggered, so I set the cluster scope to suite and it started working:


@ClusterScope(scope = Scope.SUITE, numDataNodes = 2)
public class CoherenceRiverTest extends ElasticsearchIntegrationTest

Lastly, to make sure that the river index has been created before your test logic and assertions run, don't forget to use the ensureGreen method:

ensureGreen(index);

Final Thoughts


All in all, and despite some minor difficulties, the implementation of an Elasticsearch river is pretty straightforward, and the powerful Elasticsearch test framework greatly simplifies automated testing. To summarize the main points:

Advantages:
  • Rivers are simple to embed and test in the elasticsearch framework. The business logic is similar to the stand alone client case.
  • By leveraging the elasticsearch cluster infrastructure you get fault tolerance for free. If the node fails, the river is started in another one.
  • You avoid monitoring and managing a separate process.
  • Rivers connect directly to the repository to index, so you usually save a network hop compared to an external client, which first has to retrieve the data from the repository and then push it to the cluster.
Disadvantages:
  • They are less flexible than an external client: you have to use Java.
  • Rivers are deployed as singletons in the cluster, so you cannot distribute the load to different nodes; rivers don't scale. On the other hand, a separate pushing client could be made to scale, but that is a complex task, and in most use cases a single node is enough.
  • You are running your custom data loading process inside the Elasticsearch node. This can penalize the node's performance. In most cases the main CPU-demanding step is the JSON serialization inside the river, so make sure you use a well-performing library. It is also advisable to implement a configurable throttling mechanism like the BulkProcessor.

I hope this post gives you some insights on the implications, pros and cons of using a river.

Saturday, November 15, 2014

Distributed Storage Concepts in Vertica and Cassandra


In this post I want to explore the topic of distributed storage by comparing two different products: Apache Cassandra, an open source operational database system, and HP Vertica, a proprietary analytic database system.

A distributed database is a system in which the information is distributed over and stored on several networked nodes.

Cassandra is a column-oriented distributed storage system with no single point of failure, capable of scaling out to hundreds and even thousands of nodes, highly available and resilient to network splits. It is designed to support a high ratio of random writes without sacrificing read efficiency, providing eventual consistency.

I've had previous experience working with Cassandra, but recently I became involved in a project where we needed a distributed storage solution to store time series and perform analytic queries on the data. While comparing different products we came across HP Vertica.

Vertica is the commercial version of the C-Store columnar storage system, an academic collaboration between different universities.

What sets Vertica apart from the new batch of NoSQL distributed storage solutions is that it is actually an RDBMS, ACID and fully SQL compliant, and capable of scaling out to large clusters. There are production clusters deployed in the range of hundreds of nodes and over a petabyte in size. Vertica is an analytic database, and as such is designed to ingest great amounts of data in batch processes and to support a relatively low number of very complex read transactions.

Both products are shared-nothing, elastic, scale-out architectures.


The Consistent Hash Ring


When we think about a distributed storage solution, our first step is to define the sharding strategy we are going to use to split the data among nodes of the cluster.

The simplest solution could be to arbitrarily map ranges of keys to nodes. This is a working approach used in some systems, but it has disadvantages, namely the need to store and maintain metadata tables (one per object/table/collection key type) with the mapping of ranges to nodes. It would probably also need a central point of coordination.

We can also simply compute the key value modulo the number of nodes, and the result would be the node containing the key. Since we need an integer value, and also want to avoid hotspots caused by skew in our key distribution, we first apply a hash function with good mixing behavior to the key: hash(key value) modulo number of nodes. A good candidate is the MD5 hash. Now we have a well-performing algorithm that distributes the data evenly in our cluster.
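As a minimal sketch of this placement rule, assuming string keys and nodes identified by an index from 0 to N-1 (the class and method names are mine, chosen for illustration):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ModuloSharding {

    /** Returns the MD5 digest of the key as a non-negative integer. */
    static BigInteger md5(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            return new BigInteger(1, md.digest(key.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** hash(key) modulo numberOfNodes gives the index of the owning node. */
    static int nodeFor(String key, int numberOfNodes) {
        return md5(key).mod(BigInteger.valueOf(numberOfNodes)).intValue();
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user:1", "user:2", "user:3"}) {
            System.out.println(key + " -> node " + nodeFor(key, 4));
        }
    }
}
```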

What happens if we add a new requirement? We need our cluster to be elastic: nodes can be added or removed as needed, and the data must be re-balanced in such cases. With our current strategy, when we add or remove nodes we need to re-hash and move a huge amount of data. We need a strategy decoupled from the number of nodes.
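To get a feeling for how much data moves, here is a small counting sketch. It assumes the hash values are perfectly spread and simply uses the integers 0..9999 in their place; the class name is hypothetical.

```java
final class RehashCost {

    /** Counts how many of the hash values 0..keys-1 change node when the cluster is resized. */
    static long movedKeys(long keys, int oldNodes, int newNodes) {
        long moved = 0;
        for (long h = 0; h < keys; h++) {
            if (h % oldNodes != h % newNodes) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        long keys = 10_000;
        long moved = movedKeys(keys, 4, 5);
        // prints "8000 of 10000 keys change node"
        System.out.println(moved + " of " + keys + " keys change node");
    }
}
```

Growing the cluster from 4 to 5 nodes relocates 80% of the keys, although ideally only the share taken over by the new node, about 20%, would have to move.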

We already introduced the hash function to make our solution independent of the key types and skew. Now we introduce the concept of consistent hashing: we apply the same hash function we use for the key value to the node id (the id can be a unique property such as the IP address). We map each node to a value in the hash range. Since this range, although huge, is limited, we can treat it as if it wraps around in a circle, forming a ring: when we reach the maximum value the next one is 0. Our nodes are represented as points on the ring. To find the node corresponding to a key, we apply the hash function to the key to get its point on the ring, then we move clockwise until we find the next node point; that node is the one storing the data for this key. The consistency means that when we add or remove a node we only need to relocate the data in the range owned by that node.
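A compact sketch of such a ring, assuming MD5 as the hash function and a sorted map to find the next node clockwise (the class HashRing and its methods are illustrative names, not taken from Cassandra or any other product):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

final class HashRing {

    // Ring positions (hash values) mapped to the node placed at that point.
    private final TreeMap<BigInteger, String> ring = new TreeMap<>();

    /** The same hash function is used for node ids and for keys. */
    static BigInteger hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    void addNode(String nodeId) {
        ring.put(hash(nodeId), nodeId);
    }

    void removeNode(String nodeId) {
        ring.remove(hash(nodeId));
    }

    /** Moves clockwise from the key's point to the next node, wrapping around at the end. */
    String nodeFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes in the ring");
        }
        Map.Entry<BigInteger, String> next = ring.ceilingEntry(hash(key));
        return (next != null ? next : ring.firstEntry()).getValue();
    }
}
```

When a fourth node joins a three-node ring, every key either keeps its previous owner or moves to the new node; no data is exchanged between the old nodes.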

This solution is considered the canonical consistent hash ring strategy. Each node is randomly assigned a point in the ring by applying the hash function to the node id, and the risk of collision with another node is negligible. The trade-off is that we don't know what range will be assigned to a node, and there can be great size differences, creating hot spots. This is particularly evident with a small number of nodes. Ranges tend to even out with a large number of nodes due to the good mixing behavior of the hash function.

The problem of load balancing in the cluster can be resolved using the virtual nodes strategy. We split the hash range, or continuum, into a large fixed number of equally sized slices called virtual nodes (vnodes). The keys are hashed and assigned to a virtual node by applying the modulo function. The same number of vnodes is assigned to each physical node, thus avoiding hot spots. We can even assign more vnodes to the most powerful servers if our cluster is not homogeneous. The mapping between nodes and vnodes is maintained in a metadata table. This is a hybrid of the first two proposals. The distribution depends on the number of vnodes, but that is a fixed value during the whole life of the cluster, and there is a single mapping metadata table to maintain.
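The idea can be sketched as a small table from vnode to physical node. This is a simplified illustration under my own assumptions: String.hashCode stands in for a well-mixing hash, the initial assignment is round-robin, and the rebalancing rule (take vnodes from the nodes that own more than the new fair share) is just one possible policy. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class VnodeTable {

    // Metadata table: index = vnode id, value = physical node owning it.
    private final String[] owner;

    VnodeTable(int vnodes, String... nodes) {
        owner = new String[vnodes];
        for (int v = 0; v < vnodes; v++) {
            owner[v] = nodes[v % nodes.length]; // round-robin initial assignment
        }
    }

    /** The vnode of a key depends only on the fixed vnode count, never on the node count. */
    int vnodeFor(String key) {
        return Math.floorMod(key.hashCode(), owner.length);
    }

    String nodeFor(String key) {
        return owner[vnodeFor(key)];
    }

    int countOwnedBy(String node) {
        int count = 0;
        for (String o : owner) {
            if (o.equals(node)) {
                count++;
            }
        }
        return count;
    }

    /** Hands vnodes over to a new node until it owns its fair share; returns how many moved. */
    int addNode(String newNode) {
        Map<String, Integer> load = new HashMap<>();
        for (String o : owner) {
            load.merge(o, 1, Integer::sum);
        }
        int target = owner.length / (load.size() + 1);
        int moved = 0;
        for (int v = 0; v < owner.length && moved < target; v++) {
            if (load.get(owner[v]) > target) {
                load.merge(owner[v], -1, Integer::sum);
                owner[v] = newNode;
                moved++;
            }
        }
        return moved;
    }
}
```

With 12 vnodes on three nodes, adding a fourth node moves exactly three vnodes, one from each existing node, and the key-to-vnode mapping never changes.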

Cassandra Partitioner

Cassandra initially used a partitioner to decide the node the data is stored on. (In fact the system even allows partitioning the data arbitrarily following the key ordering sequence, although this is strongly discouraged.)

From the documentation:

"A partitioner determines how data is distributed across the nodes in the cluster (including replicas). Basically, a partitioner is a hash function for computing the token (it's hash) of a row key. Each row of data is uniquely identified by a row key and distributed across the cluster by the value of the token"

At first Cassandra followed the consistent hash ring with pseudo random generated token ids strategy. As we saw previously this solution leads to a load balancing problem in the cluster. Beginning with version 1.2 Cassandra introduces vnodes:

"Prior to version 1.2, you had to calculate and assign a single token to each node in a cluster. Each token determined the node's position in the ring and its portion of data according to its hash value. Starting in version 1.2, Cassandra allows many tokens per node. The new paradigm is called virtual nodes (vnodes). Vnodes allow each node to own a large number of small partition ranges distributed throughout the cluster. Vnodes also use consistent hashing to distribute data but using them doesn't require token generation and assignment."

In the latest releases the virtual node strategy was adopted.

Vertica Segmentation


Vertica reserves the term partitioning for local intra-node tuple segregation, to distinguish it from inter-node segregation, which it calls segmentation.

From the original Vertica architecture paper:

"Vertica applies a default hash function to the columns chosen as segmentation keys, This function distributes the data using a normal statistical distribution. The node on which the tuple is stored is determined by this hash. The whole range of hash values is divided between the number of nodes, and each node is assigned a range beginning with the previous node maximum and covering the maxInt/numNodes (max Integer value is 2^64) following values."

This is the second approach we described. It depends on the number of nodes, and any node joining or leaving the cluster forces us to relocate almost all the data.
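The range division described in the quote can be sketched as follows. This is my reading of the paragraph, not Vertica code: I assume the hash is an unsigned 64-bit value stored in a Java long and that each node gets a contiguous range of about 2^64/numNodes values. The class name is hypothetical.

```java
final class RangeSegmentation {

    /** Maps an unsigned 64-bit hash to one of numNodes contiguous, equally sized ranges. */
    static int nodeFor(long unsignedHash, int numNodes) {
        if (numNodes == 1) {
            return 0; // a single node owns the whole range
        }
        // ceil(2^64 / numNodes), computed without overflowing a long
        long rangeSize = Long.divideUnsigned(-1L, numNodes) + 1;
        return (int) Long.divideUnsigned(unsignedHash, rangeSize);
    }

    public static void main(String[] args) {
        // With 4 nodes each range spans 2^62 values.
        System.out.println(nodeFor(0L, 4));             // 0
        System.out.println(nodeFor(1L << 62, 4));       // 1
        System.out.println(nodeFor(Long.MIN_VALUE, 4)); // 2 (bit pattern of 2^63)
        System.out.println(nodeFor(-1L, 4));            // 3 (bit pattern of 2^64 - 1)
    }
}
```

Since the range size is derived from the node count, every boundary shifts when a node is added or removed, which is why most of the data has to be relocated.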

Lately Vertica introduced a new feature called elastic cluster:

"To help make data re balancing due to cluster scaling more efficient, HP Vertica locally segments data storage on each node so it can be easily moved to other nodes in the cluster. When a new node is added to the cluster, existing nodes in the cluster give up some of their data segments to populate the new node and exchange segments to keep the number of nodes that any one node depends upon to a minimum"

"The alternative to elastic cluster is to re-segment all of the data in the projection and redistribute it to all of the nodes in the database evenly any time a node is added or removed. This method requires more processing and more disk space, since it requires all of the data in all projections to essentially be dumped and reloaded."

These local segments correspond to the virtual nodes strategy.

Conclusion


It seems that we have a winner: the virtual nodes solution. Vertica and Cassandra arrived at the same strategy to distribute the data in the cluster. In fact, this solution is widely used in similar systems like Voldemort or DynamoDb.

Revisiting the CAP theorem


You are probably familiar with the CAP theorem. In brief, it states that a distributed system cannot simultaneously guarantee the following three properties; instead it must pick two of them and neglect the third:

  • Consistency. All nodes see the same data at the same time.
  • Availability. Every request receives a response, either success or failure.
  • Partition tolerance. The system is resilient to network partitions and continues operating in such cases (split brain situation).
According to this, systems can be classified as CA, CP or AP.

The first time I heard of it I remember thinking that A was not possible in a real distributed system (see the fallacies of distributed computing) and that A and P seemed to overlap somewhat.

Later I came across this interesting post by Coda Hale that re-explains the theorem in a more sensible way:

Distributed systems are defined by the property they choose to guarantee when there is a network partition: consistency or availability. Systems can be:

CP: The system chooses consistency over availability on a network partition. In the event of a network split the system stops working or returns an error to all requests.
AP: The system chooses availability over consistency on a network partition, which means that nodes can keep serving requests independently. Once the connection is recovered, a synchronization mechanism restores a consistent view. Take into account that an AP system gives up strong consistency in favor of soft or eventual consistency for all its operational life, in order to avoid a service outage in the exceptional situation of a network split.

The corollary is that a distributed system cannot be CA and guarantee both consistency and availability in the event of a partition. Only a non-distributed system (a single node) can be CA.

Coda Hale also reasons that availability is preferable to consistency in most systems, since a service outage always has an economic cost and strong consistency is rarely required.

On the other hand, Michael Stonebraker, one of the creators of Vertica, favors consistency over availability. His argument is also solid: split brain situations are very rare, and by favoring availability over consistency in such cases you are also sacrificing consistency in normal operation, which is the vast majority of the time.

Cassandra is an AP system designed to be highly available, and that is a priority in an operational database system. Although it supports a tunable consistency model trading off latency for consistency, it is intended to work as an eventual consistency system and does not support row locking in any case.

It is no surprise that Vertica is a CP system: it provides an ACID consistency model, and since it is an analytical database where availability is not the priority, that seems like the right choice.

Storage Considerations

A great deal of the differences in behavior and performance come from the different local storage policies chosen. I'll leave that for a future post.


Tuesday, October 7, 2014

Pangool, The Hadoop Companion


Writing mapreduce jobs with Hadoop is no trivial task: the programming model has a lot of moving parts that have to work together, such as the mapper, combiner and reducer and, in most cases, a partitioner, comparators, and writable and writablecomparable objects.

As you know, there are tools that provide a higher level of abstraction allowing us to define the jobs by using a declarative (Hive) or procedural (Pig) notation and then generating and launching the java mapreduce tasks transparently.

This is enough in many cases. For example, when we are executing analytical or exploratory queries against a data set, the performance of the mapreduce job is not a priority; it is more important to be able to create these queries easily and with a familiar, SQL-like language. Not everybody is a Java expert, after all.

In other cases performance can be a key requirement and you need to roll up your sleeves and write the java code directly. This is particularly important in complex processes that involve several chained jobs. If you can reduce the number of jobs, the optimization is worth the effort.

It would be nice to have a tool that helps us abstract away the most frequent and tedious mapreduce patterns without losing the flexibility and tuning capabilities of the Hadoop Java API. Enter Pangool, a low-level Java mapreduce API built on top of Hadoop. By implementing an intermediate tuple-based schema it allows us to implement patterns like secondary sorting or reduce-side joins almost transparently, in a declarative fashion. This tuple schema frees us from the hassle of implementing custom writable objects and is leveraged via the provided tuple input and output formats to seamlessly build job pipelines. All of this is achieved without losing the flexibility and performance that the Hadoop low-level API provides.

To see an example of a job created with Pangool we can rewrite the job from the previous post. We will see how it simplifies the application of the secondary sort pattern and how we can still implement our own customizations (custom partitioner, input file format...)

The code is available at github

The first change we notice is the definition of tuple schemas:


private static final Schema INTERMEDIATE_SCHEMA = new Schema(
  "schema",
  Arrays.asList(new Field[] { Field.create(SPLIT_FIELD, Type.INT),
    Field.create(LINE_FIELD, Type.STRING, true),
    Field.create(SEQ_FIELD, Type.LONG), Field.create(TOTAL_FIELD, Type.LONG, true), }));

private static final Schema RESULT_SCHEMA = Mutator.subSetOf("mutated", INTERMEDIATE_SCHEMA,
  SEQ_FIELD, LINE_FIELD);

With these two definitions we describe the output of the mapper process (intermediate schema) and the output of the job (result schema). That's all: we don't need to create custom writables for the composite keys and values used in the secondary sort pattern, and we can support null values in a field, which lets us send the split total count with a line value of null (the boolean set to true in the field definition indicates null support).

Note that we can easily reuse our original schema to define the output using the Mutator class.

Now we want to apply the secondary sort, ordering by sequence and grouping by split:

mr.addIntermediateSchema(INTERMEDIATE_SCHEMA);
mr.setGroupByFields(SPLIT_FIELD);
mr.setOrderBy(new OrderBy().add(SPLIT_FIELD, Order.ASC).add(SEQ_FIELD, Order.ASC));

We indicate the schema, the group field and the ordering, and we are done: this is our implementation of the secondary sort pattern (in addition to the mapper and reducer, of course). Easy, don't you think? And all of this without losing the flexibility of the Hadoop low-level API; we still provide our custom input format and input split classes:

mr.addInput(new Path(input), new SequencerInputFormat(), new SequencerMap());

and the custom partitioner:

Job job = mr.createJob();
job.setPartitionerClass(SequencerPartitioner.class);

To retrieve the number of splits we have to use the classic api:

FileInputFormat.addInputPath(job, new Path(input));
job.getConfiguration().setInt(TOTAL_SPLIT_NUMBER,
  new TextInputFormat().getSplits(job).size());

There are some small additional code changes derived from the new api, for example to recover our custom split instance in the mapper:

SequencerFileSplit split = (SequencerFileSplit) ((TaggedInputSplit) context
  .getHadoopContext().getInputSplit()).getInputSplit();

But overall we are still using the mapreduce model, just with schema-defined tuples instead of key/value pairs, and we can still leverage the underlying framework.

I have used Pangool in several projects, and in my opinion it provides the right balance between abstraction and low-level capabilities. The advantages are more evident when you need to chain a sequence of jobs and perform joins; that could be a subject for a future post. The paradigm shift to the tuple model is small since it is an evolution of the previous API, so you will still take advantage of all your hard-learned mapreduce skills.

Sunday, September 21, 2014

Hadoop: Leveraging the framework


The Map-Reduce programming model has become very popular thanks to Hadoop. It provides a simple way to parallelize processing by distributing the computation to the nodes where the data is located. You are free from the hassle of traditional multi-threaded distributed programming (locks, signaling, etc.), since each process runs independently in its own virtual machine. This also has disadvantages, mainly the lack of communication between processes: Mappers and Reducers are completely oblivious to the status of the rest of the processes. As in a truly functional paradigm, the only input the Reducer process has is the intermediate output files from the map stage.

This can make the parallelization of some algorithms difficult or even infeasible. In other cases the limitations can come from the framework itself. For example, some information about the input must be shared by all the tasks and is not immediately available. With Hadoop this can often be overcome using internal mechanisms or by extending the framework, although sometimes not in a very straightforward way.

Let's elaborate with an actual case:

We have a huge input text file in HDFS and we want to process it, adding a unique sequence id to each line. These ids will start at 0 and increase sequentially in steps of one, so for a file of N lines we will add to each line an id starting at 0 and ending at N-1. The output will be the same file with each line prepended with its id.

Our requirements are:

- Input will be a text file.
- As said, the sequential ids will start at 0 and end at N-1 (N being the total number of lines of the file) in increments of 1.
- The output will be in the same order of the input, we don't want to shuffle lines.
- Only one map-reduce job will be used.
- Last but not least, the whole point of using Hadoop is to parallelize and scale, so no one-reducer jobs.

Summarizing, we are numbering the lines of a file in a distributed fashion using a Hadoop job.

The Job

 

- At the mapper phase, get an id of the file split currently processed; we need this id to be unique and sequentially ordered, starting with the first block of the file. Then iterate over the split's lines, incrementing a counter and emitting as key the pair (splitId, counter) and as value the pair (counter, line). As you can see, we repeat one field of the composite key in the composite value; we will need that to implement a secondary sorting pattern. This way all the lines with the same splitId are grouped together, carrying the sequence information we can later retrieve in the reducer to maintain the same ordering.

- In the mapper cleanup process we need to propagate the total number of lines of the processed split to the reducers, emitting a pair with key (splitId, currentsplit) and value (currentsplit, total count) for every target split. We only emit to the split ids greater than the current one. At the reducer stage we will need these previously calculated totals to set the initial sequence number.

 If we take the following splits:  
 Split 1:  
 A  
 B  
 C  
 Split2:  
 D   
 E  
 Mapper 1 emits: ((1,1) (1,A))  ((1,2) (2,B))  ((1,3) (3,C))  
 Then in the clean-up we inform the reducers processing the split 2 that the total count for split 1 is 3:  
 ((2,1)(1,3))  
 Similarly Mapper 2 emits:  
 ((2,1) (1,D))  ((2,2) (2,E))  
 In the clean-up process we don't need to emit anything, since this is the last split (2 of 2).  

- We apply a secondary sort pattern, grouping in the reducers by split id and receiving the pairs ordered by the counter sequence. We also need to identify the pairs carrying the mapper-calculated totals and direct them to each partition; more on that later.

- For each key the reducer checks the count totals coming from the splits with an id lower than the current key id, sums them and begins writing the lines to the output file, using that total as the start value and incrementing in steps of 1. If there are no splits with a lower id, we begin with 0 as the first sequence id.

 In the above example the reducers will receive two calls, (remember we group by split id and we have two splits):  
 (1, (1,A) (2,B) (3,C))  
 (2, (1,3) (1,D) (2,E))  
 For split 1 we have no previous split total so we start with 0 and write to the file:  
 (0, A)(1,B)(2,C)  
 for the next split 2, we have one key (1) lower than the current with a total of 3: (1,3), so we start on 3:  
 (3,D) (4,E)  
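The arithmetic of the whole process can be checked with a few lines of plain Java, outside Hadoop. This is only a single-process simulation of the idea (per-split totals, then an offset equal to the sum of the totals of the previous splits), with names chosen for illustration; it is not the job code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitNumbering {

    /** Numbers the lines of all splits globally, as the mapper/reducer pair would. */
    static List<String> number(List<List<String>> splits) {
        // "Mapper clean-up": each split reports its total number of lines.
        long[] totals = new long[splits.size()];
        for (int s = 0; s < splits.size(); s++) {
            totals[s] = splits.get(s).size();
        }

        List<String> out = new ArrayList<>();
        for (int s = 0; s < splits.size(); s++) {
            // "Reducer": the first id is the sum of the totals of all earlier splits.
            long next = 0;
            for (int p = 0; p < s; p++) {
                next += totals[p];
            }
            for (String line : splits.get(s)) {
                out.add(next + " " + line);
                next++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(
                Arrays.asList("A", "B", "C"),
                Arrays.asList("D", "E"));
        // prints [0 A, 1 B, 2 C, 3 D, 4 E]
        System.out.println(number(splits));
    }
}
```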

 

 Java code


You can find the source code in my github repository. There are three classes: two writable implementations for the composite key and value, and the job implementation class including all the moving parts (mapper, reducer, partitioner...): SplitSequenceWritable.java, SequenceLineWritable.java, SequencerJob.java

The key points here are:

- We need to emit a unique sequential split id in the mapper and we also need to know the total number of splits.

 

We can do that by extending the framework. The list of FileSplits for the input file is calculated in the getSplits method of the TextInputFormat class, which takes the job context as parameter, so we can extend that class and also the FileSplit class to create instances carrying the total count and the split sequence id:

public static class SequencerFileSplit extends FileSplit
{
 private int totalSplits;
 private int splitId;

 SequencerFileSplit()
 {
  super();
 }

 SequencerFileSplit(FileSplit split, int splitId, int totalSplits) throws IOException
 {
  super(split.getPath(), split.getStart(), split.getLength(), split.getLocations());
  this.splitId = splitId;
  this.totalSplits = totalSplits;
 }

 @Override
 public void readFields(DataInput in) throws IOException
 {
  super.readFields(in);
  splitId = in.readInt();
  totalSplits = in.readInt();
 }

 @Override
 public void write(DataOutput out) throws IOException
 {
  super.write(out);
  out.writeInt(splitId);
  out.writeInt(totalSplits);
 }

//Rest of methods
}


We add our custom fields to the split. Note that this is a writable object distributed in the cluster, so you need to make sure the new fields are serialized and also provide an empty constructor.

Our implementation of getSplits simply calls the parent method to retrieve all the split information and returns the splits wrapped in our custom split with the id and total information. We are assuming here that the framework returns the splits ordered by start offset, which is actually true:

public static class SequencerInputFormat extends TextInputFormat
{

 @Override
 public List<InputSplit> getSplits(JobContext context) throws IOException
 {
  // Let TextInputFormat compute the regular splits first.
  List<InputSplit> splits = super.getSplits(context);
  int total = splits.size();
  List<InputSplit> result = new ArrayList<InputSplit>(total);
  // Split ids start at 1 and follow the order returned by the parent.
  int counter = 1;
  for (InputSplit split : splits)
  {
   result.add(new SequencerFileSplit((FileSplit) split, counter++, total));
  }
  return result;
 }
}


- We need to group the lines by splitId, keeping the ordering of the input file. The output must maintain the initial order.

We are implementing the canonical secondary sort pattern with custom writable composite keys and values, a partitioner and a custom group comparator. The composite key contains the splitId and the sequence number, so we group by splitId and order by the input sequence. The group comparator compares by splitId only, and the partitioner also routes the pairs to the reducers based on the splitId. To maintain the input file ordering, the partitioner must send adjacent splits to the same reducer. For example, with 12 splits and 3 reducers we want splits 1, 2, 3, 4 to go to the first reducer, 5, 6, 7, 8 to the second and 9, 10, 11, 12 to the last one. To do that, the partitioner needs to know the total split count.
The partitioner is not aware of the job configuration by default, but Hadoop injects it automatically if the class implements the Configurable interface:

public static class SequencerPartitioner extends Partitioner<SplitSequenceWritable, SequenceLineWritable>
  implements Configurable
{

 private Configuration conf;
 private int totalSplitNumber;

 @Override
 public int getPartition(SplitSequenceWritable key, SequenceLineWritable value, int numPartitions)
 {
  return ((key.getSplit() - 1) * numPartitions) / totalSplitNumber;
 }

 @Override
 public Configuration getConf()
 {
  return conf;
 }

 @Override
 public void setConf(Configuration conf)
 {
  this.conf = conf;
  totalSplitNumber = conf.getInt(TOTAL_SPLIT_NUMBER, 0);
 }
}
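
As a quick check of the arithmetic, the sketch below applies the same formula as getPartition to the example of 12 splits and 3 reducers. PartitionSketch and partitionFor are illustrative names, not part of the job, and the code is plain Java with no Hadoop dependency:

```java
// Illustrative helper that mirrors the formula used in getPartition.
final class PartitionSketch
{
 // Split ids start at 1, partition numbers start at 0.
 static int partitionFor(int splitId, int numPartitions, int totalSplits)
 {
  return ((splitId - 1) * numPartitions) / totalSplits;
 }

 public static void main(String[] args)
 {
  // The example from the text: 12 splits spread over 3 reducers.
  for (int splitId = 1; splitId <= 12; splitId++)
  {
   System.out.println("split " + splitId + " -> reducer " + partitionFor(splitId, 3, 12));
  }
 }
}
```

Splits 1 to 4 map to reducer 0, splits 5 to 8 to reducer 1 and splits 9 to 12 to reducer 2, because the integer division rounds down. Note that setConf falls back to 0 when TOTAL_SPLIT_NUMBER is missing from the configuration, and in that case the division would throw an ArithmeticException, so the value has to be set before the job runs.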


Now we can make the total available in the job configuration, creating an instance of TextInputFormat to compute the value:

job.getConfiguration().setInt(TOTAL_SPLIT_NUMBER, new TextInputFormat().getSplits(job).size());


The reducer calculates the start number for our split id keys. Remember that we emitted those totals in the mapper as "meta-pairs" for all the splits:
 (split1, -currentsplit) (-1, currentSplitLineCount)
 (split2, -currentsplit) (-1, currentSplitLineCount)
 (split3, -currentsplit) (-1, currentSplitLineCount)
 ...

As you can see, the sequence value corresponding to a total was emitted as -1 in the mapper stage. This is an easy way to identify an aggregation pair, since all the line sequences are positive. Additionally, the reducer receives these totals as the first pairs because the iterable is ordered. Once we have the start sequence number, we can iterate over the rest of the pairs, which contain the lines and also arrive in the right order thanks to the secondary sort, and number them incrementally:

protected void reduce(SplitSequenceWritable key, Iterable<SequenceLineWritable> values,
  Context context) throws IOException, InterruptedException
{
 Iterator<SequenceLineWritable> iterator = values.iterator();
 long sequenceNumber = 0;
 SequenceLineWritable value = iterator.next();
 // The meta-pairs (negative sequence) come first: add up their line counts
 // to obtain the start number for this split.
 while (iterator.hasNext() && value.getSequence() < 0)
 {
  sequenceNumber += Long.valueOf(value.getLine());
  value = iterator.next();
 }
 // value now holds the first line of the split (assuming the split is not empty).
 sequence.set(sequenceNumber);
 line.set(value.getLine());
 context.write(sequence, line);
 sequenceNumber++;
 // Number the remaining lines incrementally.
 while (iterator.hasNext())
 {
  value = iterator.next();
  sequence.set(sequenceNumber);
  line.set(value.getLine());
  context.write(sequence, line);
  sequenceNumber++;
 }
}
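
The whole flow can be simulated in plain Java to see how the meta-pairs and the ordering interact. This is only a sketch under stated assumptions, not the code from the repository: SequencerSketch, Pair, map and number are hypothetical names, and I assume that each split sends its line count to every later split as a meta-pair, and that lines are sequenced from 1 within their split.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SequencerSketch
{
 // One emitted pair: composite key (split, order) plus value (sequence, text).
 // For meta-pairs, order is minus the emitting split id and sequence is -1.
 static final class Pair
 {
  final int split;
  final long order;
  final long sequence;
  final String text;

  Pair(int split, long order, long sequence, String text)
  {
   this.split = split;
   this.order = order;
   this.sequence = sequence;
   this.text = text;
  }
 }

 // Map side (assumed): each line is emitted under its own split, and each
 // split also sends its line count to every later split as a meta-pair.
 static List<Pair> map(List<List<String>> splits)
 {
  List<Pair> pairs = new ArrayList<>();
  for (int s = 1; s <= splits.size(); s++)
  {
   List<String> lines = splits.get(s - 1);
   for (int i = 0; i < lines.size(); i++)
   {
    pairs.add(new Pair(s, i + 1, i + 1, lines.get(i)));
   }
   for (int later = s + 1; later <= splits.size(); later++)
   {
    pairs.add(new Pair(later, -s, -1, String.valueOf(lines.size())));
   }
  }
  return pairs;
 }

 // Shuffle plus reduce: sort by composite key, then number the lines.
 static List<String> number(List<List<String>> splits)
 {
  List<Pair> pairs = map(splits);
  pairs.sort(Comparator.<Pair>comparingInt(p -> p.split).thenComparingLong(p -> p.order));
  List<String> output = new ArrayList<>();
  long sequenceNumber = 0;
  int currentSplit = -1;
  for (Pair pair : pairs)
  {
   if (pair.split != currentSplit)
   {
    // New group: the running total restarts, as in a fresh reduce() call.
    currentSplit = pair.split;
    sequenceNumber = 0;
   }
   if (pair.sequence < 0)
   {
    // Meta-pair: add the line count of an earlier split to the start number.
    sequenceNumber += Long.parseLong(pair.text);
   }
   else
   {
    output.add(sequenceNumber + "\t" + pair.text);
    sequenceNumber++;
   }
  }
  return output;
 }
}
```

For three splits holding the lines (a, b), (c) and (d, e), the group for split 3 receives the meta-pairs with counts 1 and 2 before its lines, so d is numbered 3 and e is numbered 4, following a, b and c with numbers 0, 1 and 2.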


We have extended the framework to be able to identify the current split in each stage (map, partitioner, comparator, reducer). Hadoop needs to calculate this information anyway, and by making it available we gain insight and flexibility in our jobs. Since the pairs are grouped by split in the reducers, the data is distributed evenly, which avoids hot-spot nodes and allows the job to scale linearly. More generally, this pattern can be used to apply transformations to the lines of a text file while maintaining their ordering.