Sunday, December 13, 2015

Spark for Hadoop MR Refugees

Spark's computational model can be confusing for someone coming from the Hadoop MapReduce paradigm. Don't get me wrong: previous experience with Hadoop definitely helps to understand how a Spark job executes, and at a very fundamental, simplified level a Spark DAG pipeline can be reduced to operations that transform data local to the node (map) and operations that trigger a shuffle of the data to be processed on another node (reduce). But at the beginning you will find yourself wondering: I did this or that in a certain way with Hadoop, how is it done with Spark?


Distributed execution and lazy model


When writing a MapReduce job, it is clear where and when each piece is executed. You have the driver program, which is executed on the client/edge node. Then you have the mapper, reducer and combiner task classes, which are distributed to the cluster and run in parallel processing the data. Those are the distributed parts of the code: they will be serialized, cannot communicate with other tasks, etc. You know that the distributed execution is started by the waitForCompletion method in the driver, the result is written to a file using the configured OutputFormat, and then the job ends.

With Spark the code in the driver is more "interactive": the driver can trigger distributed tasks, get some results, launch more distributed tasks and so on. It is a bit more difficult to identify where and when the code will be executed.

The key here is that the inputs and outputs of the distributed tasks in Spark are RDD objects, which abstract a distributed dataset managed by the framework that can be stored in the cluster either on disk or in memory. That means that the code distributed to the cluster is the objects (functions) passed to the RDD methods. So all RDD operations are distributed: the parameter objects will be serialized and sent over the wire, and you must take into account what their closures capture.

The map and reduce tasks that we know from Hadoop can be translated in Spark as map/flatMap and reduceByKey. This assumes key/value RDDs (Hadoop always operates with key/value pairs, whereas Spark also supports RDDs of simple types, for example). Now, when does the driver execute these operations? Spark uses a lazy evaluation model, meaning that for certain operations, called transformations, the execution is delayed until an actual result is demanded. The map and reduceByKey operations are transformations, so if you chain a set of map/reduce steps using them, they are added to an internal execution graph (the DAG) and delayed until a result is requested with another kind of operation, called an action. When the execution is triggered, the current graph is optimized/reordered and the tasks are executed in the cluster.

In Hadoop these action operations correspond to the waitForCompletion call, which usually involves writing the output to a file. Spark has actions for that too, for example saveAsTextFile. But you can also request results from RDD transformations without explicitly writing to a file, using actions like collect or take. Moreover, once an action has run on a transformed RDD, you can keep issuing new transformation calls on that RDD until a new action is triggered. With Hadoop that can only be done by chaining several independent jobs.
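The transformation/action split can be illustrated without a cluster. The following is only an analogy, not Spark code: it uses plain Java streams, whose intermediate operations are lazy in a similar way, and the class name LazyPipelineSketch is made up for the example. It records when the mapping function actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: Java streams stand in for RDD transformations and actions.
class LazyPipelineSketch {
    // Records the order in which things actually happen.
    static final List<String> log = new ArrayList<>();

    static List<Integer> run() {
        log.clear();
        // "Transformation": the pipeline is only described, nothing runs yet.
        Stream<Integer> lengths = Stream.of("spark", "hadoop")
                .map(word -> { log.add("map " + word); return word.length(); });
        log.add("pipeline defined");
        // "Action": the terminal operation triggers the whole pipeline.
        return lengths.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println(log);
    }
}
```

The log shows "pipeline defined" before any "map ..." entry: the mapping function only runs once the terminal operation asks for a result, which is the same behavior you observe with Spark transformations and actions.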


Basic Map Reduce: Mapper/reducer/combiner equivalents


These basic Hadoop tasks are now RDD transformations in Spark.

The mapper task equivalent in Spark is the map/flatMap transformation on a k/v pair RDD. Use map when you want to emit one output per input and flatMap when you want several outputs per input, flattening them afterwards.

The reducer task equivalent in Spark is the reduceByKey/groupByKey transformation on a k/v pair RDD. reduceByKey does not allow us to change the key, and it assumes that the reducer function is associative and commutative, because it runs a combiner step before shuffling. groupByKey is more flexible: it applies no reduce function and only groups the values by key, so the transformation that follows is free to change the key. The price is a risk of heavy I/O and out-of-memory errors.

The classic word count example:

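val wordCounts = inputData
  .flatMap(line => line.split("\\s+"))   // several outputs (words) per input line
  .filter(word => word.nonEmpty)         // drop empty tokens
  .map(word => (word.toLowerCase, 1))    // emit (word, 1) pairs
  .reduceByKey(_ + _)                    // sum per key, combining locally first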
   
As we know from Hadoop, there are cases where we cannot use the same reducer function as the combiner because the input and output types do not match. By default the reduceByKey transformation executes the reduce function locally on the node as a combiner before shuffling, and then applies it again as a reducer. If we want to use a different function as the combiner we need the combineByKey transformation, where we can specify the reducer and combiner functions separately.

An example with a different combiner and reducer, where we compute the average word length grouped by first character:

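// firstCharAndCounts is assumed to hold (first character, word length) pairs
val wordAvgLengthByFirstChars = firstCharAndCounts.combineByKey(
    (len: Int) => (len, 1),                                        // first value of a key: (sum, count)
    (acc: (Int, Int), len: Int) => (acc._1 + len, acc._2 + 1),     // combiner: add a value locally
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))  // reducer: merge partial results
  .mapValues { case (sum, count) => sum.toFloat / count }          // convert before dividing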
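
To make the combiner and reducer roles concrete, here is a minimal plain Java sketch (no Spark dependency) that mimics the same computation over in-memory lists standing in for partitions. The class and method names are invented for the illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain Java imitation of the combiner/reducer split; not Spark API.
class CombineByKeySketch {
    // Combiner step: runs inside one partition and builds {sum, count} per key.
    static Map<Character, int[]> combineLocally(List<String> partition) {
        Map<Character, int[]> acc = new HashMap<>();
        for (String word : partition) {
            int[] sumCount = acc.computeIfAbsent(word.charAt(0), k -> new int[2]);
            sumCount[0] += word.length(); // add the word length to the sum
            sumCount[1] += 1;             // and count the word
        }
        return acc;
    }

    // Reducer step: merges the per-partition {sum, count} pairs, then averages.
    static Map<Character, Float> averageLengths(List<List<String>> partitions) {
        Map<Character, int[]> merged = new HashMap<>();
        for (List<String> partition : partitions) {
            combineLocally(partition).forEach((key, sc) -> {
                int[] total = merged.computeIfAbsent(key, k -> new int[2]);
                total[0] += sc[0];
                total[1] += sc[1];
            });
        }
        Map<Character, Float> averages = new HashMap<>();
        merged.forEach((key, sc) -> averages.put(key, (float) sc[0] / sc[1]));
        return averages;
    }

    public static void main(String[] args) {
        System.out.println(averageLengths(Arrays.asList(
                Arrays.asList("spark", "scala", "hive"),
                Arrays.asList("shuffle", "hadoop"))));
    }
}
```

Note that the combiner takes an Int and produces a pair, while the reducer merges two pairs: the types differ, which is exactly the case where reduceByKey is not enough.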


Other concepts: Partitioner, GroupComparator, setup, cleanup


In both Hadoop and Spark the default partitioner is based on the hash code of the key. As you know, in Hadoop you can define a custom partitioner by setting the partitioner class in the job configuration. In Spark, guess what, there is an RDD transformation to apply a custom partitioner, called partitionBy.
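
As a rough sketch of the idea (plain Java, not the actual Hadoop or Spark classes), a hash partitioner boils down to a non-negative modulo of the key's hash code, and a custom partitioner simply replaces that rule. Both method names below are hypothetical.

```java
// Sketch of hash-based and custom partitioning rules; names are illustrative.
class HashPartitionSketch {
    // Default-style rule: hash code modulo the number of partitions,
    // kept non-negative even for negative hash codes.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical custom rule: route by the first character only, so all
    // words starting with the same letter land in the same partition.
    static int partitionByFirstChar(String word, int numPartitions) {
        return Math.floorMod(Character.toLowerCase(word.charAt(0)), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("spark", 4));
        System.out.println(partitionByFirstChar("Spark", 8));
        System.out.println(partitionByFirstChar("scala", 8));
    }
}
```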

Group comparators were used with Hadoop when implementing custom composite keys, as in the secondary sort pattern, and they control how the pairs are grouped in the reducer phase. The same can be achieved in Spark with the groupBy transformation, whose grouping function plays the role of the custom comparator. As with the groupByKey transformation, use it with care, since all the values belonging to the same key must fit in memory.

Setup and cleanup can be reproduced in the mapper stage with mapPartitions, which runs once for each partition and traverses an iterator over all its values. So you can execute your setup code before starting to iterate, and the cleanup code once the iterator's hasNext returns false.

For the reducer case you first have to apply the groupByKey transformation and then mapPartitions.
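
The shape of such a per-partition function can be sketched in plain Java. This is not the Spark API: processPartition is a made-up stand-in for the function you would hand to mapPartitions, and for simplicity it materializes the output in a list, whereas a lazy version would wrap the iterator instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Stand-in for a function passed to mapPartitions; names are illustrative.
class PartitionLifecycleSketch {
    static final List<String> events = new ArrayList<>();

    // Called once per partition with an iterator over all of its values.
    static Iterator<String> processPartition(Iterator<String> values) {
        events.add("setup");                  // e.g. open a connection
        List<String> out = new ArrayList<>();
        while (values.hasNext()) {
            out.add(values.next().toUpperCase());
        }
        events.add("cleanup");                // hasNext() returned false
        return out.iterator();
    }

    public static void main(String[] args) {
        Iterator<String> out = processPartition(Arrays.asList("a", "b").iterator());
        out.forEachRemaining(System.out::println);
        System.out.println(events);
    }
}
```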


Some Map reduce patterns

 
I'm not going into much detail about what these basic MapReduce patterns are; I just want to give an idea of how they could be implemented with Spark.

- In-mapper combining.

There were mainly two reasons for this Hadoop MR pattern: to perform the combining step in memory and to make sure it was executed, since Hadoop did not guarantee the execution of the combining step and performed it on the output file of the mapper. This is not the case with Spark, but there can still be a scenario for this pattern: when we want to use the groupByKey transformation but also want to minimize the data shuffled, by performing a local combining step in each partition. Don't forget that the locally aggregated data must fit in memory.

A word count example using in-mapper combining and groupByKey:


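import scala.collection.mutable
val inMapperCombWordCount = inputData.mapPartitions { pIterator =>
    val localComb = mutable.Map[String, Int]()   // per-partition counts, must fit in memory
    pIterator.foreach { line =>
      line.split("\\s+").filter(_.nonEmpty).map(_.toLowerCase)
        .foreach(w => localComb(w) = localComb.getOrElse(w, 0) + 1)
    }
    localComb.iterator                           // emit the locally combined (word, count) pairs
  }
  .groupByKey().mapValues(_.sum)                 // add up the partial counts of each word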


- Secondary sort

The elements needed for this pattern are a custom partitioner for the composite key and a custom group comparator. We can achieve that using the groupBy method with a custom partitioner and a custom grouping function. This method is expensive in terms of the amount of data shuffled.
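
The three ingredients of the pattern (composite key, partitioning by the natural key only, sorting by the full key before grouping) can be sketched in plain Java over an in-memory list. Nothing here is Spark API; Reading, sensor and timestamp are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of secondary sort; all names are illustrative.
class SecondarySortSketch {
    // Composite key: natural key (sensor) plus the field to sort by (timestamp).
    static final class Reading {
        final String sensor;
        final long timestamp;
        Reading(String sensor, long timestamp) {
            this.sensor = sensor;
            this.timestamp = timestamp;
        }
    }

    // Custom partitioner: only the natural key decides the partition.
    static int partitionFor(Reading r, int numPartitions) {
        return Math.floorMod(r.sensor.hashCode(), numPartitions);
    }

    // Sort by the full composite key, then group by the natural key only.
    static Map<String, List<Long>> sortAndGroup(List<Reading> partition) {
        List<Reading> sorted = new ArrayList<>(partition);
        sorted.sort(Comparator.comparing((Reading r) -> r.sensor)
                .thenComparingLong(r -> r.timestamp));
        Map<String, List<Long>> grouped = new LinkedHashMap<>();
        for (Reading r : sorted) {
            grouped.computeIfAbsent(r.sensor, k -> new ArrayList<>()).add(r.timestamp);
        }
        return grouped;
    }

    public static void main(String[] args) {
        System.out.println(sortAndGroup(Arrays.asList(
                new Reading("b", 2), new Reading("a", 3), new Reading("a", 1))));
    }
}
```

Each group receives its values already ordered by the secondary field, which is the whole point of the pattern.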

- Reducer side joins


This pattern is already included in Spark as the join transformation. In Hadoop it involves shuffling all the data of the datasets being joined, so that the data belonging to the same key goes to the same partition. Spark adds a very important optimization: it keeps track of whether one of the datasets has already been partitioned and only shuffles the other one. If both have been partitioned with the same partitioner and are cached on the same machines, no shuffling occurs at all.

- Optimized Serialization

One of the key points of Hadoop is its optimized serialization system, based on Writable objects that can be easily extended. Spark uses Java serialization by default and can be configured to use a third-party library, Kryo, instead.

Monday, March 16, 2015

It's time to take another look at MongoDB

MongoDB is one of the most popular database systems today and certainly the most widely known document store. Since the latest version (3.0) was released this month, I've decided to write a post about what I consider its strengths and weaknesses, plus some tips for writing Mongo client applications.

The bad news

 

Despite the hype, a strong wave of criticism has emerged in recent years, pointing out several important flaws:

Unlike most database systems, durability is configurable in MongoDB and, more importantly, the default setting does not guarantee it. By durability I mean the basic guarantee that if a write to the database is reported as successful the data has been stored, and if it was not stored due to some problem the client is duly notified (we are not talking here about replica-based failovers during a crash or eventual consistency between replicas). The universal mechanism for this is a write-ahead log that keeps track of the operations pending to be processed: a write is only successful once it has been recorded in the log. In MongoDB this mechanism is called the journal, and by default clients do not wait for the journal to be written before returning from a write operation. The operation stays in the server's memory until a thread flushes it to the journal asynchronously, so there is a time window in which a server crash causes undetected data loss. Fortunately you can configure the client to wait for the journal write before reporting success, but this is not the default behavior and it carries a performance hit. Most of the initial MongoDB benchmarks were run without journaling, which made the comparison with other systems a bit unfair.

Initially MongoDB's write lock was global, meaning that the whole mongod server instance was blocked for each write, effectively serializing all insertions. In later releases the lock moved to database level, which in most cases forced you to model the data with a single collection per database in order to parallelize data insertion.

Scalability. Mongo is not the ideal repository for web-scale requirements, certainly not when compared with systems designed to store distributed data sets in the petabyte range like HBase or Cassandra. Despite the marketing and the brand name (from "humongous"), it does not seem to have been designed initially with cluster distribution in mind. When using sharding and replication, each shard needs its own set of replica nodes: for example, a total of 9 different nodes is needed to support 3 shards with a replica factor of 2 (each shard being a primary plus 2 replicas). The configuration is also complicated: it needs separate managing processes (mongos), also configured redundantly for HA, and extra processes to store and serve the cluster metadata (the config servers), deployed as a cluster of 3 nodes. On top of that, database functionality is limited when using sharding.
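
The node arithmetic above can be written down as a tiny Java sketch. The method names are made up, and the figure of 2 mongos routers in the usage is only an assumption for illustration (the text just says they are deployed redundantly).

```java
// Back-of-the-envelope sizing of a sharded deployment; names are illustrative.
class ShardTopologySketch {
    // Data-bearing nodes: every shard is its own replica set
    // (1 primary + replicasPerShard secondaries).
    static int dataNodes(int shards, int replicasPerShard) {
        return shards * (1 + replicasPerShard);
    }

    // Adds the config servers and the mongos routers on top.
    static int totalProcesses(int shards, int replicasPerShard, int configServers, int routers) {
        return dataNodes(shards, replicasPerShard) + configServers + routers;
    }

    public static void main(String[] args) {
        System.out.println(dataNodes(3, 2));            // the 9 nodes from the text
        System.out.println(totalProcesses(3, 2, 3, 2)); // assuming 2 routers
    }
}
```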

Storage. MongoDB is schema-free but not schema-less: storing the documents in BSON format means that each document is stored with its own schema (field names, structures, hierarchy relations). This is a large storage overhead compared to schema-bound solutions.

The good news 

 

Obviously a product this successful has several strong points:

The document format, BSON, can be mapped directly from JSON. This is a huge advantage for front-end development based on JavaScript: the AJAX messages can be stored directly in the repository. There is no relational mapping and no JPA needed.

Schema flexibility means that schema changes (new fields, field removal...) are not a problem. One of the most painful scenarios we can find when using a relational database is gone.

This is possible in part because there are no relations between collections, and no multi-collection transactions or joins.

New developments


This month MongoDB published the new 3.0 release, which includes a new storage engine, WiredTiger, that goes some way towards fixing two of the most criticized points:

WiredTiger supports compression (two codecs, snappy and zlib). This is a huge storage and disk I/O improvement. Mongo claims that data storage can be reduced by 70%. In our particular case I measured a size reduction of more than 80% using snappy. (Snappy aims for fast compression with a reasonable compression ratio, whereas zlib provides maximum compression but is slower.) The tests also showed an improvement in the average write speed: the reduced disk I/O compensated for the CPU time spent on compression. My case is especially favorable since the documents are very big and have a high degree of redundancy.

WiredTiger write locking is done at document level, providing the highest throughput. Even if you use the former MMAPv1 engine, locking is now done at collection level.

Building our stack

 

MongoDB is a good solution when you have to store unstructured data. It also gives you horizontal scaling and high availability out of the box. Compression was a critical improvement since data redundancy is inherent to the model: there are no relations or joins, so all the needed data has to be stored in each collection, even if duplicated, in a denormalized fashion; and the schema is free, meaning it is stored with each document. The reduced storage needs mean that sharding can be minimized or avoided altogether, and that the scale-out limits are raised.

There is no excuse for not using Object Document Mapping

When using MongoDB from Java the first thing we notice is the impedance mismatch between the Java object and the BSON document. This can be addressed using an object document mapper. With ORM the impedance mismatch with the relational model can be very high, and there are advocates both of using SQL directly and of using a mapping framework like JPA. Here there is no such discussion: the best approach is to use a document mapper, since each document is stored in one collection, not normalized, with no relations, no foreign keys and no more than one table involved.

Use Jackson

Usually the only variable measured when choosing a JSON mapper is performance. It is a main factor for sure, but the feature set provided is equally important. Jackson is among the fastest serializers and is, hands down, the most flexible and configurable, providing the richest feature set. With Jackson you don't even need to annotate the classes directly: you can do it externally using mix-ins.

Leverage the existing frameworks

There are two frameworks that provide data binding using Jackson as the JSON mapper: mongojack and jongo. Both serialize directly to BSON, removing an intermediate step. I opted for mongojack since its approach is to map a collection to a generic type at creation, whereas jongo needs to receive the destination object class on each query call, which can be useful if you need to change it for different queries. Either once at creation or on each query call, the class must be provided because of Java type erasure.

Keep it simple, your data is never so unstructured

Use the class definition as the schema of the collection (that's the approach with mongojack), mapping one type to one collection. Leverage inheritance so that instances of subclasses can be stored in that collection, representing evolving schemas. Jackson supports storing the class type as metadata in the document to provide polymorphic deserialization, and unknown fields can be configured to be ignored. I have tested it with mongojack and it works perfectly.

Use ObjectId as the key for all the collections. Look at it as a sequence or auto-incremental synthetic id that is guaranteed to be unique in the cluster and contains the document creation timestamp.
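
The embedded timestamp can be recovered without any driver: the first 4 bytes of an ObjectId (the first 8 characters of its 24-character hex form) are the creation time in seconds since the Unix epoch. A minimal sketch, with a made-up class name and an arbitrary sample id:

```java
import java.time.Instant;

// Extracts the creation time embedded in an ObjectId's hex representation.
class ObjectIdTimestampSketch {
    static Instant creationTime(String objectIdHex) {
        if (objectIdHex.length() != 24) {
            throw new IllegalArgumentException("an ObjectId has 24 hex characters");
        }
        // The leading 8 hex characters are the seconds since the Unix epoch.
        long seconds = Long.parseLong(objectIdHex.substring(0, 8), 16);
        return Instant.ofEpochSecond(seconds);
    }

    public static void main(String[] args) {
        System.out.println(creationTime("507f1f77bcf86cd799439011"));
    }
}
```

In practice the driver's ObjectId class already exposes this value; the sketch only shows why the id can double as a creation timestamp.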

Write in batches

Although MongoDB provides a configurable durability model, the reality is that the vast majority of applications need some storage guarantees, that is, a journaled write concern.

This is implemented on the MongoDB server side as a scheduled thread that periodically writes the operations queued in memory to the journal. If we write in batches, aside from the benefit of minimizing network round trips, we maximize the number of operations that will be saved to the journal in the next period.
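
On the client side, batching mostly means slicing the pending documents into chunks before handing each chunk to the driver's bulk insert call with a journaled write concern. The driver call itself is not shown here; the following is just a sketch of the slicing step, with hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Splits pending documents into batches; the actual insert call is left out.
class BatchSketch {
    // Returns consecutive batches of at most batchSize elements.
    static <T> List<List<T>> toBatches(List<T> docs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < docs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, docs.size());
            batches.add(new ArrayList<>(docs.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Each inner list would be sent to the server in a single round trip.
        System.out.println(toBatches(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```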

Your mileage may vary

I arrived at these guidelines after analyzing the different storage requirements of several components in our architecture and examining MongoDB's capabilities and limitations. I designed a general data abstraction layer using these ideas, targeting both ease of use and performance.

While I think that these ideas can be applied to most cases, your scenario may need a different approach (or a different database solution). In any case I think that, even if you discarded Mongo before, with the improvements added in the latest release it may be worth a reassessment.

Thursday, February 26, 2015

Considerations on a Hadoop ETL

The first step when we want to process some data with Hadoop is to load it into HDFS. HDFS is a robust, fault-tolerant distributed file system. It gives you unified, seamless access to a set of distributed hard disk partitions and also automatic replication, all of it running on commodity and even heterogeneous hardware. For all the capabilities it provides, it has its caveats:

- It is written in Java on top of the OS file system, so it demands much more CPU and memory than a traditional file system.
- It needs an orchestrating master process running on one machine, the NameNode. This is both a single point of failure and a bottleneck.

You need to keep these ideas in mind when loading data into HDFS.

Don't stream the files directly to HDFS

This is generally a bad idea whatever system you are loading into. You need some basic ETL proxy application to at least keep track of the data already streamed. The process has to be monitored so it can be resumed, and it is also nice to have a fault-tolerant implementation.

In addition, with HDFS you have to avoid storing a lot of small files. The NameNode has to keep track of every file in the system and it is memory-limited: the more files you create in HDFS, the more memory the NameNode uses.

On top of that, the mapper tasks of a Hadoop job are spawned per file split, and every file has at least one split, so at least one mapper task will be spawned per file. To process 10 files of 10 megabytes, 10 mapper tasks will be needed, instead of 1 for a single file of 100 megabytes.
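
The mapper arithmetic can be written as a small Java sketch. It is a simplification (one mapper per split, splits never shared between files, split size equal to the 128 MB block size) and the names are invented for the example.

```java
import java.util.Arrays;

// Simplified estimate of the number of mapper tasks for a set of files.
class MapperCountSketch {
    // One mapper per split; every file yields at least one split.
    static long mappersFor(long[] fileSizesMb, long splitSizeMb) {
        long mappers = 0;
        for (long size : fileSizesMb) {
            long splits = (size + splitSizeMb - 1) / splitSizeMb; // ceiling division
            mappers += Math.max(1, splits);
        }
        return mappers;
    }

    public static void main(String[] args) {
        long[] tenSmallFiles = new long[10];
        Arrays.fill(tenSmallFiles, 10L);
        System.out.println(mappersFor(tenSmallFiles, 128));     // ten 10 MB files
        System.out.println(mappersFor(new long[] {100L}, 128)); // one 100 MB file
    }
}
```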

The current standard for this proxy application is Apache Flume, a project originally developed by Cloudera.

Use some proxy streaming tool like Apache Flume

Out of the box it seems that Flume can cover all our ingestion needs, and specifically two critical points:

- The input files are processed line by line and grouped into new files in HDFS. We can configure the desired size of these target files to match the default HDFS block size of 128 MB, which will also be the mapper task file split.

- We can also configure the tool to persist to disk a buffer of the lines read (events in Flume parlance) that are still to be processed, in case of sudden network slowness or failures, NameNode maintenance downtime, etc.

On top of that, Flume supports some degree of HA and load balancing, allowing the deployment of different "sinks" that write to HDFS on different machines. The data is sent from the source serialized with Avro.

Some Flume pitfalls. Introducing Apache Kafka.

In our experience working with Apache Flume we found a number of different problems.

- The tool seems a bit immature in several aspects:

Duplicated files are not handled very well. Imagine that you have no control over your input source directory, your client can leave there the same file you processed an hour ago, and you are keeping your processed files (auto-renamed with the .PROCESSED suffix). That causes Flume to enter an unrecoverable state, and you literally have to restart the offending agent:
 https://issues.apache.org/jira/browse/FLUME-2119

Zero-byte files are also a problem. When processing a zero-byte file, the agent also goes into an unrecoverable state. To resume you have to delete the offending file and restart the agent. This seems to be resolved in the latest version:
https://issues.apache.org/jira/browse/FLUME-2525

Spurious "java.io.IOException: Not a data file" exceptions. We did not find the exact cause, but it was related to the metadata generated for the Avro serialization: removing that metadata solved the issue.

- Performance was not good, in two aspects in particular:
    Storing to the persistent channel.
    File aggregation in HDFS.

So, how to address these issues? Part of the solution is to use a fast, reliable, highly available persistent event store like Apache Kafka as the Flume sink. It is fast, provides robust high availability, and the HDFS ingestion can be parallelized using Camus.

Flume can then be configured as a plain file streamer (no HA, no Avro, no persistent channel) that inserts the file lines into Kafka, and Kafka in turn is polled by the Camus job to retrieve the data.

Not all is covered out of the box, not even common cases


One of our ETL cases was fairly common: a client FTPs log files to a server, and those files are the input for the ETL. As you can imagine, in these cases you have little control over the file format, delivery policy or frequency. At least that was our case, and we found several important issues we had to deal with:

- Zero-byte files. Since Flume did not like this type of file, we had a staging folder where the files were FTPed to, and there we periodically cleaned out these files before moving the rest to the Flume input folder.
- Duplicate files. Again, Flume will choke on these; the same application that removed the zero-byte files checked for and removed duplicates.
- Compressed files. Flume does not support compressed formats (rar in our case), so we also had to decompress in the staging folder.
- Determining whether a file transfer has finished. We found no reliable mechanism in Linux to indicate that the FTP transfer of a file was finished. In the end the solution was to check the last time the file was modified in the staging folder, and process only the files not touched for a significant period (for example 30 minutes).
- File content overlap. In our experience, some log files can contain entries already present in previously processed ones; more on this in the next section.
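
Two of the staging checks above (zero-byte files and the "not touched for a while" heuristic) are easy to sketch in Java. This is not our actual staging application, just a minimal illustration with invented names; the 30-minute quiet period is the example value from the text.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;

// Decides whether a staged file can be moved to the Flume input folder.
class StagingFilterSketch {
    // Ready means: not empty, and not modified for at least quietPeriod
    // (the FTP upload is then assumed to be finished).
    static boolean isReady(long sizeBytes, Instant lastModified, Duration quietPeriod, Instant now) {
        return sizeBytes > 0 && !lastModified.plus(quietPeriod).isAfter(now);
    }

    // Convenience overload that reads size and modification time from disk.
    static boolean isReady(Path file, Duration quietPeriod) {
        try {
            return isReady(Files.size(file), Files.getLastModifiedTime(file).toInstant(),
                    quietPeriod, Instant.now());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (String name : args) {
            System.out.println(name + " ready: " + isReady(Paths.get(name), Duration.ofMinutes(30)));
        }
    }
}
```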

Hive can help


One of the points of a Hadoop ETL is parallelization. We have already seen how to leverage Hadoop by using the Camus MapReduce job.

We can also do some parallel filtering using Hadoop. In our case we use it to remove the data redundancy caused by file overlap. We use Hive to do that:

  • Set up two staging directories in HDFS, one with the finished files of the current ingestion batch and the other with those of the previous one.
  • Create two external tables on these directories using a regex SerDe.
  • Periodically execute a Hive query on these two tables that outputs the rows present in the current table and not in the previous one. This output is then stored as a columnar RCFile binary file compressed with snappy. Then we remove the files in the old batch folder and move the current folder's files there.
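
The filtering that the Hive query performs is, logically, an anti-join between the current and the previous batch. In Hive it runs in parallel over the whole tables; the following single-machine Java sketch (hypothetical names, rows as plain strings) only illustrates the logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Single-machine illustration of "rows in current but not in previous".
class BatchDiffSketch {
    static List<String> newRows(List<String> current, List<String> previous) {
        Set<String> alreadySeen = new HashSet<>(previous);
        List<String> fresh = new ArrayList<>();
        for (String row : current) {
            if (!alreadySeen.contains(row)) {
                fresh.add(row); // keep only rows absent from the previous batch
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        System.out.println(newRows(
                Arrays.asList("10:00 GET /a", "10:01 GET /b", "10:02 GET /c"),
                Arrays.asList("09:59 GET /z", "10:00 GET /a")));
    }
}
```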

We have now completed the HDFS storage process: our files are compressed by column and have the size of an HDFS block, ready for optimal MapReduce job processing.

A word on monitoring


It's essential to monitor your ETL process. This works at two levels:

  • Monitor the processes involved (Flume, Kafka, custom processes) with a proper tool and also implement an alert system, for example Ganglia + Nagios.
  • Monitor the data itself to check whether there are problems in the ingestion. The difficulty here is setting thresholds for the alerts. You can base them on previously gathered statistics, like the average data ingested per day, or, if your data is a time series like a periodic log, on detecting time gaps.
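
For the time-series case, gap detection can be as simple as comparing consecutive timestamps against a threshold. A minimal Java sketch, with invented names and timestamps expressed as epoch seconds:

```java
import java.util.ArrayList;
import java.util.List;

// Finds suspicious holes in a sorted series of log timestamps.
class TimeGapSketch {
    // Returns the timestamp at which each gap longer than maxGapSeconds starts.
    // The input must be sorted in ascending order.
    static List<Long> gapStarts(long[] epochSeconds, long maxGapSeconds) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < epochSeconds.length; i++) {
            if (epochSeconds[i] - epochSeconds[i - 1] > maxGapSeconds) {
                gaps.add(epochSeconds[i - 1]);
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // One entry per minute, then a 13-minute hole after t=120.
        System.out.println(gapStarts(new long[] {0, 60, 120, 900, 960}, 300));
    }
}
```

The threshold itself still has to come from what you know about the data, for example the expected logging period.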

Conclusions


  • Know your weaknesses to overcome them: HDFS is not good at dealing with a lot of small files.
  • Use your strong points: Parallelize everything you can.
  • Watch the performance and use an HA persistent buffered proxy.
  • Don't neglect monitoring.

Saturday, January 10, 2015

An in memory map reduce framework on top of Oracle Coherence


I've been working with Oracle Coherence for several months. Coherence is an in-memory data grid originally developed by Tangosol, which was acquired by Oracle in 2007. If you remember my last post about an Elasticsearch river, it indexed Coherence caches.

What makes Coherence more interesting than a simple distributed cache manager is its parallel processing capabilities, in particular the possibility of co-locating processor agents on each node so they access the data locally. These agents are entry processors, also called scalar agents.
Out of the box, and making the analogy with the MapReduce paradigm, these processors are like mapper tasks running on each node; there is no reduce phase.
In addition to the scalar agents there is also support for entry aggregators, which perform operations against a subset of entries to obtain a single result. They define two methods, one for processing the entries of the cache (map) and another for aggregating the results (reduce).
This model has several limitations: all the results of processing the entries have to be kept in memory (not as part of the cache but as objects in the Java heap) and sent to the requesting node, which performs the aggregation. The aggregation runs on a single node. Making the analogy again, this is similar to a one-reducer job, which is not very scalable.

Coming from the Hadoop world I immediately thought of developing a more scalable framework: grouping the data by key/value pairs, minimizing the network traffic and parallelizing the reducer phase. This should also make it possible to port a lot of algorithms already implemented in the Hadoop MapReduce style.
You can find the first version of this framework in my GitHub repository. There are also samples of word count and inverted index MapReduce jobs implemented as JUnit tests.

The basics

 

If you recall the Hadoop model, you basically need to supply an input folder, an output folder, a mapper and a reducer implementation. There are also staging directories where the intermediate files involved in the shuffle and sort phase between mapping and reducing are stored. In Coherence we will model the folder locations as caches and the map and reduce tasks as processors.

In our case the entry point of the framework is a MapReduce class. The constructor needs three cache names (input, staging and output) plus a mapper and a reducer object implementing the interfaces we provide. These interfaces are similar to the ones we already know from Hadoop: they are generic, and the type parameters correspond to the different key/value input and output pairs:



 public static interface Mapper<MKI extends Comparable<MKI>, MVI, K extends Comparable<K>, V>
   extends Serializable, PortableObject
 {
  public void map(MKI key, MVI value, Context<K, V> context);
 }

 public static interface Reducer<K, V, RKO extends Comparable<RKO>, RVO> extends Serializable, PortableObject
 {
  public void reduce(K key, Iterator<V> values, Context<RKO, RVO> context);
 }

 public MapReduce(String input, String staging, String output, Mapper<?, ?, K, V> mapper,
   Reducer<K, V, ?, ?> reducer)
 {
  this.input = input;
  this.staging = staging;
  this.output = output;
  this.mapper = mapper;
  this.reducer = reducer;
 }

The mapper code is executed inside a mapper processor modeled as an entry processor. It iterates over the input cache entries, storing the result of the map function in the staging cache.


 @Override
 public Object process(final Entry paramEntry)
 {
  mapper.map((Comparable<?>) paramEntry.getKey(), paramEntry.getValue(), context);
  return null;
 }

Similarly, the reducer code is executed inside another processor, which receives the staging cache as input and stores the results in the output cache.


 @Override
 public Map processAll(Set arg0)
 {
  if (arg0.size() == 0)
  {
   return null;
  }
  final BackingMapContext bmctx = ((BinaryEntry) arg0.iterator().next())
    .getBackingMapContext();
  this.context = new JobContext<>(bmctx.getManagerContext().getCacheService().getCluster()
    .getLocalMember().getId(), output);
  final Set<Map.Entry<K, Set<Binary>>> entries = getIndexedValues(bmctx);
  Map<Binary, Binary> bMap = bmctx.getBackingMap();
  Converter converter = bmctx.getManagerContext().getKeyFromInternalConverter();
  for (Map.Entry<K, Set<Binary>> entry : entries)
  {
   reducer.reduce(entry.getKey(), new ValuesIterator<>(entry.getValue(), converter, bMap),
     context);
  }
  context.flush();
  return null;
 }

We need the mapper output to be grouped by output key, so that each key has its associated set of values stored on the same node and the reducer running on that node can process each key. Coherence provides a key association feature that allows us to do that by defining a common key for a group of keys. In this case we create a synthetic composite key, NodeAwareKey, made of the output key, the mapper node id and a sequence number. We need the last two to avoid collisions among keys generated on different nodes, and we associate these keys by the mapper output key so they are all grouped on the same node in the staging cache. We let the default Coherence partitioner distribute the keys, but always grouped by the key associator.

We need the reducer to retrieve this information ordered and grouped by equal keys. To do that I've defined a sorted index on the output key part of the NodeAwareKey (this key implements Comparable). The values are then retrieved through this index:


 private Set<Map.Entry<K, Set<Binary>>> getIndexedValues(BackingMapContext context)
 {
  return ((MapIndex) context.getIndexMap().get(MapReduce.KEY_EXTRACTOR)).getIndexContents()
    .entrySet();
 }

A better solution could be to implement a custom Binary comparator (a raw comparator), sort the backing map and iterate over it. Maybe something to check in the future.

As a last step, the output of the reducer is also stored using composite keys to avoid collisions.


public class CompositeKey<K1 extends Comparable<K1>, K2 extends Comparable<K2>> implements
  KeyAssociation, Serializable, PortableObject
{
 protected K1 key1;
 protected K2 key2;
 protected long sequence;

 public CompositeKey()
 {
 }

 public CompositeKey(K1 key1, K2 key2, long sequence)
 {
  this.key1 = key1;
  this.key2 = key2;
  this.sequence = sequence;
 }

 public K1 getKey1()
 {
  return key1;
 }

 public K2 getKey2()
 {
  return key2;
 }

 @Override
 public Object getAssociatedKey()
 {
  // entries sharing key2 are stored in the same partition
  return key2;
 }
}


Location, location, location

 


In Hadoop cpu is considered a cheap resource, local disk access affordable and network i/o something to avoid if possible. Hadoop is known for its high latency, so the impact of network traffic is much more noticeable in an in memory distributed storage like coherence.

Hadoop introduces an additional phase, between mapping and reducing: combining. This stage reduces the data sent over the wire, it is basically a partial reducer step on the local node, so the data sent to the final reducer has been previously aggregated. This partial reduce step is not possible in all cases.

To implement this step we need another reducer implementation used as a combiner with the particularity that it receives and emits the same key and value types that are the output of the mapper and the input of the reducer.
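
A minimal word-count sketch of this idea in plain Java, independent of Coherence and Hadoop (CombinerSketch and its methods are hypothetical names). The same summing function is used as combiner on each node's output and as final reducer, which is possible because its input and output types are the same.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CombinerSketch {
    // Map step: emits one (word, 1) pair per input word.
    static List<Map.Entry<String, Integer>> map(List<String> words) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : words) {
            pairs.add(new SimpleEntry<>(word, 1));
        }
        return pairs;
    }

    // Sums the values per key. Input and output types are identical, so this
    // can run as a combiner (per node) and again as the final reducer.
    static List<Map.Entry<String, Integer>> sum(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            totals.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        List<Map.Entry<String, Integer>> result = new ArrayList<>();
        for (Map.Entry<String, Integer> total : totals.entrySet()) {
            result.add(new SimpleEntry<>(total.getKey(), total.getValue()));
        }
        return result;
    }
}
```

Applying sum to each node's mapper output before shipping it means at most one pair per distinct word leaves the node, while the final totals stay the same as reducing the raw mapper output directly.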

We need the output of the mapper to be stored in the same node it is running on. To do that we again use a composite key, replacing the node id value (we don't need it in this case, since the results are stored in the local node) with a SimplePartitionKey object for a partition owned by this same node, and associating the keys to it. SimplePartitionKey is a class provided by Coherence: instances are created through a factory method from a partition id, and Coherence ensures the key is assigned to that partition. We store these mapper results in the output cache, using it as temporary storage.

Then we apply the reducer processor to this cache, executing the combiner implementation. The results are grouped, distributed in the cluster and stored in the staging cache as in the previous step. Finally the reducer is applied to this combined input in the same way.


 public void mapReduce()
 {
  NamedCache inputCache = CacheFactory.getCache(input);
  NamedCache stagingCache = CacheFactory.getCache(staging);
  NamedCache outputCache = CacheFactory.getCache(output);
  stagingCache.clear();
  outputCache.clear();

  inputCache.invokeAll(AlwaysFilter.INSTANCE, new MapperProcessor<K, V>(staging, output,
    this.mapper, this.combiner != null));

  stagingCache.addIndex(KEY_EXTRACTOR, true, null);
  outputCache.addIndex(KEY_EXTRACTOR, true, null);
  
  Filter filter = AlwaysFilter.INSTANCE;
  if (this.combiner != null)
  {
   outputCache.invokeAll(filter, new ReducerProcessor<K, V>(staging, this.combiner));
   outputCache.clear();
   outputCache.removeIndex(KEY_EXTRACTOR);
  }
  stagingCache.invokeAll(filter, new ReducerProcessor<K, V>(output, this.reducer));
 }


Saving resources



To minimize memory usage it is better to rely on the backing structures provided by Coherence and defer de-serialization as much as possible.

Writes to the cache from the mapper and the reducer are handled by a context that buffers them and uses putAll to flush them once the buffer limit is reached:


 public JobContext(int memberId, String output)
 {
  this.memberId = memberId;
  this.output = output;
  this.values = new HashMap<>();
 }

 public void flush()
 {
  store(output);
  values.clear();
 }

 protected Object getKey(K key)
 {
  return new NodeAwareKey<K>(key, memberId, count++);
 }

 public void write(K key, V value)
 {
  if (BUFFER_SIZE == values.size())
  {
   store(output);
   values.clear();
  }
  values.put(getKey(key), value);
 }

 private void store(String target)
 {
  CacheFactory.getCache(target).putAll(values);
 }


In the reducer, the input value list is handled using an iterator that gets and de-serializes the value from the backing map on demand, one at a time.


public ValuesIterator(Set<Binary> currentValues, Converter converter, Map<Binary, Binary> bMap)
 {
  this.currentValues = currentValues.iterator();
  this.converter = converter;
  this.bMap = bMap;
 }

 @Override
 public boolean hasNext()
 {
  return currentValues.hasNext();
 }

 @SuppressWarnings("unchecked")
 @Override
 public V next()
 {
  return (V) converter.convert(bMap.remove(currentValues.next()));
 }


The serialization used by the cache service is Portable Object Format (POF), much more compact than standard Java serialization and analogous to Hadoop Writables. Mappers and reducers need to implement the PortableObject interface.


Conclusion and Further work



By translating the Hadoop framework concepts to the Coherence infrastructure we have achieved an in-memory, scalable map reduce model.

The key concepts here are key co-location, using the key affinity functionality, and key comparison and ordering, using a sorted index. Cluster distribution and balancing are delegated to the standard Coherence partitioner based on a consistent hash ring.

An immediate improvement would be to give the possibility of grouping by subgroups of a complex key (allowing secondary sort).
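
To illustrate what that improvement would mean, here is a small plain-Java sketch of secondary sort. It is not part of the current implementation, and PairKey and SecondarySortSketch are hypothetical names. Keys are sorted by the whole composite key, but grouped only by the first part, so each group receives its values already ordered by the second part.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SecondarySortSketch {
    // Hypothetical complex key: a grouping part plus a secondary ordering part.
    static class PairKey {
        final String group;
        final int order;

        PairKey(String group, int order) {
            this.group = group;
            this.order = order;
        }
    }

    // Sort comparator: uses the full key (group first, then order).
    static final Comparator<PairKey> SORT =
        Comparator.comparing((PairKey k) -> k.group).thenComparingInt(k -> k.order);

    // Groups by the first part only; values arrive sorted by the second part.
    static Map<String, List<Integer>> groupSorted(List<PairKey> keys) {
        List<PairKey> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (PairKey key : sorted) {
            groups.computeIfAbsent(key.group, g -> new ArrayList<>()).add(key.order);
        }
        return groups;
    }
}
```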

As a long-term evolution, some monitoring, metrics and error control capabilities are a must. The current implementation follows a fail-fast model: there are no task retries, and the job either ends successfully or is cancelled by the first exception. This could be acceptable for the time being, given the lower latency and much lower input data size compared to Hadoop.

Monday, December 8, 2014

Writing an Elasticsearch river


I've recently been working a lot with Elasticsearch, the enterprise search engine based on Lucene. It is a very versatile tool. Although originally conceived to index documents or web sites to provide full text search capabilities, it is widely used in other use cases, like log aggregation or structured data indexing, to leverage the flexibility of its query language and aggregation engine.

An interesting feature of Elasticsearch is the possibility of writing custom plugins in Java and deploying them in the cluster. A particular type of plugin, the river, allows us to move the data indexing process into the cluster using a polling approach: the river runs inside the Elasticsearch node, retrieving and indexing the data from our repository, instead of an external client implementation in a separate process pushing the data to the cluster.

Writing a river is not a very difficult task. I recently wrote a river for indexing Oracle Coherence caches; you can find it in my repository. I will try to highlight the main steps I took, using it as an example.

The River is a type of plugin and must be loaded by the node when it starts, so we need our entry point, the CoherenceRiverPlugin class to extend the AbstractPlugin class. We need to provide a name and a description of the plugin and, to let Elasticsearch know there is a new plugin, we must provide the plugin implementation class name in a mandatory file named es-plugin.properties.


plugin=org.elasticsearch.plugin.river.coherence.CoherenceRiverPlugin


When the node starts it loads the plugins using reflection from the plugins directory (this can be configured) and, if enabled, from the classpath. It also looks for an onModule method on each plugin implementation and invokes it, injecting a module parameter in the call. In our case we provide an implementation that expects the injection of the RiversModule so we can register our river:

public class CoherenceRiverPlugin extends AbstractPlugin
{

 @Override
 public String name()
 {
  return "river-coherence";
 }

 @Override
 public String description()
 {
  return "Coherence river plugin";
 }

 public void onModule(RiversModule module)
 {
  module.registerRiver("coherence", CoherenceRiverModule.class);
 }

}

In fact we could register several types of modules in our plugin. This "module" step is there due to the way the underlying dependency injection framework, Guice, works. The implementation of the module binds our custom implementation, CoherenceRiver, to the River interface:

public class CoherenceRiverModule extends AbstractModule
{

 @Override
 protected void configure()
 {
  bind(River.class).to(CoherenceRiver.class).asEagerSingleton();
 }

}

Now we arrive at the actual River implementation: CoherenceRiver. This class implements the River interface, extends the AbstractRiverComponent class and is instantiated by dependency injection when we register our river or, if already registered, when the node starts. The framework looks for a constructor annotated with Inject, to which it passes the desired context parameters (like a client entry point to the node, the river settings ...), or for a parameter-less public constructor. In our constructor we gather and store the initialization settings.


@Inject
protected CoherenceRiver(RiverName riverName, RiverSettings settings, Client client)
{
 super(riverName, settings);
 this.client = client;
...
}

Since we are implementing the River interface we must implement the start and stop methods. The start method is also called when registering the river or, if previously registered, when the node starts. This method contains the logic of our river: it registers a listener on the configured cache and performs an initial query to get the current values, and the results are coalesced by key in an intermediate structure. Through this listener we receive all the currently stored keys plus the new insertions or deletions, and we request the actual values to index them, using a configurable BulkProcessor to throttle our inputs. Don't forget that, by using the river, the data loading process runs inside the Elasticsearch node's virtual machine.
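
The coalescing step can be sketched in plain Java as follows. This is not the code of the river itself: CoalescingBuffer, Change, record and drain are hypothetical names, and the sketch assumes that only the latest change per key matters because the actual value is fetched later, when the batch is indexed.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical intermediate structure: keeps one pending change per cache key.
class CoalescingBuffer<K> {
    enum Change { UPSERT, DELETE }

    // LinkedHashMap keeps keys in the order they were first seen.
    private final Map<K, Change> pending = new LinkedHashMap<>();

    // Called from the listener: a newer event for a key replaces the older one.
    synchronized void record(K key, Change change) {
        pending.put(key, change);
    }

    // Called from the indexing side: removes and returns up to max changes.
    synchronized List<Map.Entry<K, Change>> drain(int max) {
        List<Map.Entry<K, Change>> batch = new ArrayList<>();
        Iterator<Map.Entry<K, Change>> it = pending.entrySet().iterator();
        while (it.hasNext() && batch.size() < max) {
            // Copy the entry before removing it from the underlying map.
            batch.add(new SimpleImmutableEntry<>(it.next()));
            it.remove();
        }
        return batch;
    }

    synchronized int size() {
        return pending.size();
    }
}
```

With this shape, a key that is inserted and then deleted before the next drain produces a single delete, so the bulk requests sent to the index stay small even when the cache is updated frequently.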

In this case the river keeps running and updating the index with the changes to the cache. Other rivers perform an initial import and stop, or can even be automatically removed to avoid subsequent executions.

Similarly, we implement the stop method to release the river resources when the river is removed or the node is stopped.

Each river is a singleton inside the cluster: if the node executing the river fails, it is started in another one. It's up to you to provide a resume mechanism or re-synchronize the index if needed. Note that this makes the river a fault-tolerant component but not a scalable one: since only one instance exists in the cluster, the load cannot be distributed. Nonetheless, you can create several river instances to index independent stores inside a repository, like different tables in a relational database or key/value caches in an in-memory data grid.

To register our river we create a document in the special system index "_river", using a custom label as type and the string "_meta" as id, and we pass the parameters in JSON format in the request body. In particular, the field type tells Elasticsearch the specific river plugin we are registering. We can create different rivers of the same type with different initial parameters by using different index types.


curl -XPUT localhost:9200/_river/coherence_river/_meta -d '{
  "type":"coherence",
  "coherence":
    {
     "cache":"TEST_CACHE",
     "query":"key() between 500 and 800"
    }
 }'

Testing our River

 

Elasticsearch provides a powerful test framework that we can use simply by extending the class ElasticsearchIntegrationTest. By doing that, a local cluster is started and we can access it in our test using an inherited client instance. Some testing dependencies, described in the documentation, are needed in our pom.xml file. In my experience I had to add additional artifacts; you can find the pom file in the repository.

I faced some difficulties when developing the unit tests:

To test the river I needed it to be retrieved from the classpath and I found that by default the nodes started by the test framework only searched for plugins in the default plugins folder ignoring the classpath.

I had to set the load_classpath_plugins property to true using the framework nodeSettings method:


@Override
protected Settings nodeSettings(int nodeOrdinal)
{
 return ImmutableSettings.builder().put("plugins.load_classpath_plugins", "true").build();
}

Then I found out that with the default global cluster scope this method was not triggered, so I set the cluster scope to suite and it started working:


@ClusterScope(scope = Scope.SUITE, numDataNodes = 2)
public class CoherenceRiverTest extends ElasticsearchIntegrationTest

Lastly, to make sure that the river index is created as a precondition for your test and assertion logic, don't forget to use the ensureGreen method:

ensureGreen(index);

Final Thoughts


All in all, and despite some minor difficulties, the implementation of an Elasticsearch river is pretty straightforward, and the powerful Elasticsearch test framework greatly simplifies automated testing. To summarize the main points:

Advantages:
  • Rivers are simple to embed and test in the elasticsearch framework. The business logic is similar to the stand alone client case.
  • By leveraging the elasticsearch cluster infrastructure you get fault tolerance for free. If the node fails, the river is started in another one.
  • You avoid monitoring and managing a separate process.
  • Rivers connect directly to the repository to index, so you usually save a network hop compared to an external client, which first has to retrieve the data from the repository and then push it to the cluster.
Disadvantages:
  • They are less flexible than an external client: you have to use Java.
  • Rivers are deployed as singletons in the cluster, so you cannot distribute the load to different nodes; thus, rivers don't scale. On the other hand, a separate pushing client could be made to scale, but it is a complex task, and in most use cases a single node is enough.
  • You are running your custom data loading process inside the Elasticsearch node, which can penalize the node's performance. In most cases the most CPU-demanding part is the JSON serialization inside the river, so make sure you use a well-performing library. It is also advisable to implement a configurable throttling mechanism like the BulkProcessor.

I hope this post gives you some insights on the implications, pros and cons of using a river.