I've been working with Oracle Coherence for several months. Coherence is an in-memory data grid originally developed by Tangosol, which Oracle acquired in 2007. If you remember my last post about an Elasticsearch river, it indexed Coherence caches.
What makes Coherence more interesting than a simple distributed cache manager is its parallel processing capability: in particular, the possibility of co-locating processing agents on each node so that they access the data locally. These agents are entry processors, also called scalar agents.
Out of the box, and drawing the analogy to the MapReduce paradigm, these processors are like map tasks running on each node; there is no reduce phase.
In addition to the scalar agents, there is also support for entry aggregators, which perform an operation against a subset of entries to obtain a single result. They define two methods: one for processing the cache entries (map) and another for aggregating the results (reduce).
This model has several limitations: all the results of processing the entries have to be kept in memory (not as part of the cache, but as objects in the Java heap) and sent to the requesting node, which performs the aggregation on its own. To extend the analogy, this is similar to a job with a single reducer: not very scalable.
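To make the single-reducer bottleneck concrete, here is a plain-Java simulation of the aggregator model for a word count. It does not use Coherence: the "nodes" are hypothetical lists of words. Each node computes a partial map, and the requesting node has to hold every partial map and merge them alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation of the aggregator model: partial results per node, one final merge.
// Plain Java only; the "nodes" are hypothetical lists of words, not Coherence members.
public class SingleReducerAggregation {

    // Runs on each node: count the words held locally (the "map" side).
    static Map<String, Integer> partialCount(List<String> localWords) {
        Map<String, Integer> partial = new HashMap<>();
        for (String word : localWords) {
            partial.merge(word, 1, Integer::sum);
        }
        return partial;
    }

    // Runs on the requesting node only: every partial map has to be received
    // and merged in this one JVM (the "reduce" side).
    static Map<String, Integer> aggregate(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, count) -> total.merge(word, count, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> nodes = List.of(
                List.of("a", "b", "a"),
                List.of("b", "c"),
                List.of("a", "c", "c"));
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (List<String> node : nodes) {
            partials.add(partialCount(node));
        }
        System.out.println(aggregate(partials));
    }
}
```

The merge step plays the role of the single reducer: its memory and CPU cost grow with the number of distinct keys across the whole cluster, however many nodes take part.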
Coming from the Hadoop world, I immediately thought of developing a more scalable framework: grouping the intermediate data by key, minimizing network traffic and parallelizing the reduce phase. This should also make it possible to port many algorithms already implemented in the Hadoop MapReduce style.
You can find the first version of this framework in my GitHub repository. It also includes word count and inverted index MapReduce jobs implemented as JUnit tests.
The basics
If you recall the Hadoop model, you basically need to supply an input folder, an output folder, a mapper and a reducer implementation. There are also staging directories, where the intermediate files involved in the shuffle and sort phase between mapping and reducing are stored. In Coherence we model the folder locations as caches and the map and reduce tasks as processors.
In our case the entry point of the framework is a MapReduce class. Its constructor takes three cache names (input, staging and output) plus a mapper and a reducer implementing the interfaces we provide. These interfaces are similar to the ones we already know from Hadoop: they are generic, and the type parameters correspond to the input and output key/value pairs of each phase:
```java
public static interface Mapper<MKI extends Comparable<MKI>, MVI, K extends Comparable<K>, V>
        extends Serializable, PortableObject {
    public void map(MKI key, MVI value, Context<K, V> context);
}

public static interface Reducer<K, V, RKO extends Comparable<RKO>, RVO>
        extends Serializable, PortableObject {
    public void reduce(K key, Iterator<V> values, Context<RKO, RVO> context);
}

public MapReduce(String input, String staging, String output,
        Mapper<?, ?, K, V> mapper, Reducer<K, V, ?, ?> reducer) {
    this.input = input;
    this.staging = staging;
    this.output = output;
    this.mapper = mapper;
    this.reducer = reducer;
}
```
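As an illustration, here is how a word-count job might look against these interfaces. This is a sketch, not the code from the repository: PortableObject is dropped so it runs without Coherence, Context is a hypothetical interface exposing only write(key, value) (mirroring JobContext.write), and run() is a toy single-JVM driver standing in for the cluster.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Word count against simplified Mapper/Reducer interfaces (no Coherence dependency).
public class WordCountSketch {

    // Hypothetical minimal context: the mapper and reducer only need write().
    interface Context<K, V> {
        void write(K key, V value);
    }

    interface Mapper<MKI extends Comparable<MKI>, MVI, K extends Comparable<K>, V> extends Serializable {
        void map(MKI key, MVI value, Context<K, V> context);
    }

    interface Reducer<K, V, RKO extends Comparable<RKO>, RVO> extends Serializable {
        void reduce(K key, Iterator<V> values, Context<RKO, RVO> context);
    }

    // Input: document id -> text. Emits (word, 1) for every occurrence.
    static class WordMapper implements Mapper<Integer, String, String, Integer> {
        public void map(Integer docId, String text, Context<String, Integer> context) {
            for (String word : text.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    context.write(word, 1);
                }
            }
        }
    }

    // Sums the values emitted for each word.
    static class SumReducer implements Reducer<String, Integer, String, Integer> {
        public void reduce(String word, Iterator<Integer> values, Context<String, Integer> context) {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next();
            }
            context.write(word, sum);
        }
    }

    // Toy driver: map everything, group by key in sorted order, then reduce each group.
    static Map<String, Integer> run(Map<Integer, String> input) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        WordMapper mapper = new WordMapper();
        input.forEach((docId, text) -> mapper.map(docId, text,
                (word, one) -> grouped.computeIfAbsent(word, w -> new ArrayList<>()).add(one)));
        Map<String, Integer> output = new TreeMap<>();
        SumReducer reducer = new SumReducer();
        grouped.forEach((word, ones) -> reducer.reduce(word, ones.iterator(), output::put));
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(Map.of(1, "to be or not", 2, "to be"))); // {be=2, not=1, or=1, to=2}
    }
}
```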
The mapper code is executed inside a mapper processor, modeled as an entry processor: it iterates over the input cache entries and stores the results of the map function in the staging cache.
```java
@Override
public Object process(final Entry paramEntry) {
    mapper.map((Comparable<?>) paramEntry.getKey(), paramEntry.getValue(), context);
    return null;
}
```
Similarly, the reducer code is executed inside another processor, which takes the staging cache as input and stores the results in the output cache.
```java
@Override
public Map processAll(Set entrySet) {
    if (entrySet.isEmpty()) {
        return null;
    }
    final BackingMapContext bmctx = ((BinaryEntry) entrySet.iterator().next())
            .getBackingMapContext();
    this.context = new JobContext<>(bmctx.getManagerContext().getCacheService().getCluster()
            .getLocalMember().getId(), output);
    // Index contents: output key -> set of (serialized) staging keys holding its values
    final Set<Map.Entry<K, Set<Binary>>> entries = getIndexedValues(bmctx);
    Map<Binary, Binary> bMap = bmctx.getBackingMap();
    // Backing map values are in internal (serialized) form: convert them with the value converter
    Converter converter = bmctx.getManagerContext().getValueFromInternalConverter();
    for (Map.Entry<K, Set<Binary>> entry : entries) {
        reducer.reduce(entry.getKey(), new ValuesIterator<>(entry.getValue(), converter, bMap), context);
    }
    context.flush();
    return null;
}
```
We need the mapper output to be grouped by output key: all the values for a given key must be stored on the same node, so that the reducer running on that node processes that key. Coherence provides a key association feature that does exactly this, by defining a common associated key for a group of keys. Here we create a synthetic composite key, NodeAwareKey, holding the output key, the mapper node id and a sequence number; the node id and the sequence avoid collisions among keys generated on different nodes. These keys are associated by the mapper output key, so they all end up grouped on the same node in the staging cache. We leave the distribution of the keys to the default Coherence partitioner, always grouped by the key associator.
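Key association can be modeled in a few lines. In this sketch the partition count, the hash-modulo placement and the round-robin ownership table are simplifying assumptions (Coherence's own partitioning strategy differs in detail), and NodeAwareKey is a hypothetical record with the three fields described above. What matters is that placement depends only on the associated key.

```java
import java.util.Objects;

// Simplified model of key association (requires Java 16+ for records).
public class KeyAssociationSketch {

    static final int PARTITIONS = 257; // assumed partition count
    static final int MEMBERS = 4;      // assumed cluster size

    // Hypothetical NodeAwareKey: output key, mapper member id, per-member sequence.
    record NodeAwareKey(String outputKey, int memberId, long sequence) {
        // Plays the role of KeyAssociation.getAssociatedKey(): only the output key counts.
        Object associatedKey() {
            return outputKey;
        }
    }

    // Placement depends on the associated key only, never on memberId or sequence.
    static int partitionOf(NodeAwareKey key) {
        return Math.floorMod(Objects.hashCode(key.associatedKey()), PARTITIONS);
    }

    // Toy ownership table: partitions spread round-robin over the members.
    static int ownerOf(int partition) {
        return partition % MEMBERS;
    }

    public static void main(String[] args) {
        NodeAwareKey fromMember1 = new NodeAwareKey("coherence", 1, 0);
        NodeAwareKey fromMember3 = new NodeAwareKey("coherence", 3, 42);
        System.out.println(fromMember1.equals(fromMember3)); // false: two distinct cache entries
        System.out.println(ownerOf(partitionOf(fromMember1)) == ownerOf(partitionOf(fromMember3))); // true: same node
    }
}
```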
We need the reducer to retrieve this data ordered and grouped by key. To do that, I've defined a sorted index on the output key part of the NodeAwareKey (the output key implements Comparable). The values are then retrieved through this index:
```java
private Set<Map.Entry<K, Set<Binary>>> getIndexedValues(BackingMapContext context) {
    return ((MapIndex) context.getIndexMap().get(MapReduce.KEY_EXTRACTOR)).getIndexContents()
            .entrySet();
}
```
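Conceptually, the sorted index hands the reducer a SortedMap from each output key to the set of staging keys that carry its values. The following plain-Java model builds that structure over a local map and drives a summing reducer from it; the "outputKey#member#sequence" string keys are a hypothetical stand-in for NodeAwareKey.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java model of a sorted index: extracted output key -> set of cache keys.
public class SortedIndexSketch {

    // Build the index over a local "backing map" (cache key -> value).
    static SortedMap<String, Set<String>> buildIndex(Map<String, Integer> backingMap) {
        SortedMap<String, Set<String>> index = new TreeMap<>();
        for (String cacheKey : backingMap.keySet()) {
            // The "extractor": keep only the output-key part of the composite key
            String outputKey = cacheKey.substring(0, cacheKey.indexOf('#'));
            index.computeIfAbsent(outputKey, k -> new LinkedHashSet<>()).add(cacheKey);
        }
        return index;
    }

    // Reducer loop: one call per output key, in key order, values looked up by cache key.
    static List<String> reduceAll(Map<String, Integer> backingMap) {
        List<String> results = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : buildIndex(backingMap).entrySet()) {
            int sum = 0;
            for (String cacheKey : entry.getValue()) {
                sum += backingMap.get(cacheKey);
            }
            results.add(entry.getKey() + "=" + sum);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Integer> staging = new HashMap<>();
        staging.put("to#1#0", 1);
        staging.put("be#1#1", 1);
        staging.put("to#2#0", 1);
        System.out.println(reduceAll(staging)); // [be=1, to=2]
    }
}
```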
A better solution could be to implement a custom Binary comparator (a raw comparator), sort the backing map and iterate over it. Maybe something to check in the future.
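A raw comparator orders keys by their serialized bytes without deserializing them, which only works if the encoding preserves order. The sketch below assumes such an encoding for int keys (4 big-endian bytes with the sign bit flipped; this is not POF) and sorts them with an unsigned lexicographic byte comparison, Arrays.compareUnsigned (Java 9+).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Raw comparison: order serialized keys byte by byte, never deserializing them.
// Assumed order-preserving encoding: flipping the sign bit maps signed int order onto
// unsigned order, and big-endian bytes compare like the unsigned value.
public class RawComparatorSketch {

    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value ^ Integer.MIN_VALUE).array();
    }

    static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt() ^ Integer.MIN_VALUE;
    }

    // The raw comparator itself: lexicographic, unsigned byte comparison.
    static int compareRaw(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    static List<Integer> sortViaRawBytes(List<Integer> keys) {
        List<byte[]> raw = new ArrayList<>();
        for (int key : keys) {
            raw.add(encode(key));
        }
        raw.sort(RawComparatorSketch::compareRaw); // sorting touches bytes only
        List<Integer> decoded = new ArrayList<>();
        for (byte[] bytes : raw) {
            decoded.add(decode(bytes)); // decoded here only to show the resulting order
        }
        return decoded;
    }

    public static void main(String[] args) {
        System.out.println(sortViaRawBytes(List.of(5, -3, 1000, 0, Integer.MIN_VALUE)));
        // [-2147483648, -3, 0, 5, 1000]
    }
}
```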
As a last step, the output of the reducer is also stored using composite keys to avoid collisions.
```java
public class CompositeKey<K1 extends Comparable<K1>, K2 extends Comparable<K2>>
        implements KeyAssociation, Serializable, PortableObject {

    protected K1 key1;
    protected K2 key2;
    protected long sequence;

    public CompositeKey() {
    }

    public CompositeKey(K1 key1, K2 key2, long sequence) {
        this.key1 = key1;
        this.key2 = key2;
        this.sequence = sequence;
    }

    public K1 getKey1() {
        return key1;
    }

    public K2 getKey2() {
        return key2;
    }

    // Keys sharing the same key2 are stored in the same partition
    @Override
    public Object getAssociatedKey() {
        return key2;
    }

    // PortableObject methods (readExternal/writeExternal) and the rest of the class are not shown
}
```
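The reason for the member id and sequence fields can be shown with an ordinary HashMap standing in for the staging cache. Key is a hypothetical, simplified record version of the composite key (a record gives value-based equals and hashCode).

```java
import java.util.HashMap;
import java.util.Map;

// Why composite keys: with plain output keys, values emitted for the same word
// by different mappers (or twice by the same mapper) overwrite each other.
public class KeyCollisionSketch {

    // Hypothetical simplification of the composite key (Java 16+ record).
    record Key(String outputKey, int memberId, long sequence) {}

    static int storedWithPlainKeys() {
        Map<String, Integer> staging = new HashMap<>();
        staging.put("to", 1); // emitted by member 1
        staging.put("to", 1); // emitted by member 2: overwrites member 1's value
        staging.put("to", 1); // emitted again by member 1: overwritten again
        return staging.size();
    }

    static int storedWithCompositeKeys() {
        Map<Key, Integer> staging = new HashMap<>();
        staging.put(new Key("to", 1, 0), 1);
        staging.put(new Key("to", 2, 0), 1); // different member
        staging.put(new Key("to", 1, 1), 1); // same member, next sequence number
        return staging.size();
    }

    public static void main(String[] args) {
        System.out.println(storedWithPlainKeys() + " vs " + storedWithCompositeKeys()); // 1 vs 3
    }
}
```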
Location, location, location
In Hadoop, CPU is considered a cheap resource, local disk access affordable, and network I/O something to avoid whenever possible. Hadoop is known for its high latency, which partly hides network costs; in an in-memory distributed store like Coherence, where everything else is fast, the impact of network traffic is much more noticeable.
Hadoop introduces an additional phase between mapping and reducing: combining. This stage reduces the data sent over the wire. It is basically a partial reduce step on the local node, so the data sent to the final reducer has already been aggregated. This partial reduce step is not possible in all cases: it only works when the reduce operation can be applied piecewise, as with sums or counts (but not, for example, an average computed from partial averages).
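The effect of a combiner on network traffic can be counted for a word count. In this plain-Java sketch each inner list is the hypothetical mapper output on one node: without a combiner every (word, 1) record is shipped, with a combiner each node ships one pre-summed record per distinct local word. This is valid here because addition is associative and commutative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Counts records shipped to the reducers for word count, with and without a combiner.
public class CombinerSketch {

    // Without a combiner, every (word, 1) record crosses the network.
    static int shippedWithoutCombiner(List<List<String>> nodes) {
        int shipped = 0;
        for (List<String> node : nodes) {
            shipped += node.size();
        }
        return shipped;
    }

    // Partial reduce on one node: one (word, localCount) record per distinct local word.
    static Map<String, Integer> combine(List<String> node) {
        Map<String, Integer> local = new HashMap<>();
        for (String word : node) {
            local.merge(word, 1, Integer::sum);
        }
        return local;
    }

    // With a combiner, each node ships only its combined records.
    static int shippedWithCombiner(List<List<String>> nodes) {
        int shipped = 0;
        for (List<String> node : nodes) {
            shipped += combine(node).size();
        }
        return shipped;
    }

    // Final reduce over the combined records gives the same totals as over the raw ones.
    static Map<String, Integer> totals(List<List<String>> nodes) {
        Map<String, Integer> totals = new HashMap<>();
        for (List<String> node : nodes) {
            combine(node).forEach((word, count) -> totals.merge(word, count, Integer::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<List<String>> nodes = List.of(List.of("a", "a", "a", "b"), List.of("a", "b", "b"));
        System.out.println(shippedWithoutCombiner(nodes) + " records vs " + shippedWithCombiner(nodes)); // 7 records vs 4
        System.out.println(totals(nodes));
    }
}
```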
To implement this step we need another reducer implementation used as a combiner, with one particularity: it must receive and emit the same key and value types, namely the mapper's output types, which are also the reducer's input types.
We need the output of the mapper to be stored on the same node it runs on. To do that we again use a composite key, replacing the node id (we don't need it here, since the results stay on the local node) with a SimplePartitionKey for a partition owned by this same node, and associating the keys with it. SimplePartitionKey is a class provided by Coherence: instances are created through a factory method from a partition id, and Coherence ensures the key is assigned to that partition. We store these mapper results in the output cache, using it as temporary storage.
Then we apply the reducer processor to this cache, executing the combiner implementation. The results are then grouped, distributed across the cluster and stored in the staging cache as in the previous step. Finally, the reducer is applied to this combined input in the same way.
```java
public void mapReduce() {
    NamedCache inputCache = CacheFactory.getCache(input);
    NamedCache stagingCache = CacheFactory.getCache(staging);
    NamedCache outputCache = CacheFactory.getCache(output);
    stagingCache.clear();
    outputCache.clear();

    // Map phase: with a combiner, results go to node-local partitions of the output cache;
    // otherwise they go straight to the staging cache
    inputCache.invokeAll(AlwaysFilter.INSTANCE,
            new MapperProcessor<K, V>(staging, output, this.mapper, this.combiner != null));

    // Sorted indexes on the output-key part of the composite keys
    stagingCache.addIndex(KEY_EXTRACTOR, true, null);
    outputCache.addIndex(KEY_EXTRACTOR, true, null);

    Filter filter = AlwaysFilter.INSTANCE;
    if (this.combiner != null) {
        // Combine phase: partial reduce on each node, results shuffled into the staging cache
        outputCache.invokeAll(filter, new ReducerProcessor<K, V>(staging, this.combiner));
        outputCache.clear();
        outputCache.removeIndex(KEY_EXTRACTOR);
    }

    // Reduce phase: each node reduces the keys it owns in the staging cache
    stagingCache.invokeAll(filter, new ReducerProcessor<K, V>(output, this.reducer));
}
```
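The local-partition trick used for the combiner input can be sketched as follows. It is a plain-Java model under stated assumptions: a toy partition-to-member ownership table replaces Coherence's partition assignment, and an int partition id in a hypothetical LocalKey record replaces the SimplePartitionKey. Because every key emitted by a member is associated with a partition that member owns, the combiner input never leaves it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Keeping mapper output local when a combiner is used (Java 16+ for records).
public class LocalPartitionSketch {

    // Hypothetical key: output key + local partition id (instead of a member id) + sequence.
    record LocalKey(String outputKey, int partition, long sequence) {}

    // Pick a partition owned by the given member from a toy ownership table.
    static int firstOwnedPartition(Map<Integer, Integer> ownerByPartition, int member) {
        for (Map.Entry<Integer, Integer> e : ownerByPartition.entrySet()) {
            if (e.getValue() == member) {
                return e.getKey();
            }
        }
        throw new IllegalStateException("member " + member + " owns no partition");
    }

    // Every key emitted by this member is tied to that local partition.
    static List<LocalKey> keysFor(List<String> words, Map<Integer, Integer> ownerByPartition, int member) {
        int partition = firstOwnedPartition(ownerByPartition, member);
        List<LocalKey> keys = new ArrayList<>();
        long sequence = 0;
        for (String word : words) {
            keys.add(new LocalKey(word, partition, sequence++));
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> owners = new TreeMap<>(Map.of(0, 1, 1, 2, 2, 1, 3, 2));
        for (LocalKey key : keysFor(List.of("to", "be", "to"), owners, 2)) {
            System.out.println(key + " -> member " + owners.get(key.partition()));
        }
    }
}
```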
Saving resources
To minimize memory usage, it is better to rely on the backing structures provided by Coherence and defer deserialization as much as possible:
Writes to the cache from the mapper and the reducer are handled by a context that buffers them and uses putAll to flush them once the buffer limit is reached.
```java
public JobContext(int memberId, String output) {
    this.memberId = memberId;
    this.output = output;
    this.values = new HashMap<>();
}

// Writes whatever is still buffered
public void flush() {
    store(output);
    values.clear();
}

// Each write gets a unique key: output key + this member's id + a local sequence
protected Object getKey(K key) {
    return new NodeAwareKey<K>(key, memberId, count++);
}

public void write(K key, V value) {
    if (BUFFER_SIZE == values.size()) {
        // Buffer full: push it to the cache in a single putAll call
        store(output);
        values.clear();
    }
    values.put(getKey(key), value);
}

private void store(String target) {
    CacheFactory.getCache(target).putAll(values);
}
```
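The buffering behaviour of JobContext can be exercised without a cluster by replacing the cache with a Consumer that receives each batch. BUFFER_SIZE = 4 and the string key format are assumptions for the sketch; unlike the excerpt above, flush() here skips empty batches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Buffered writes in the style of JobContext, with the cache replaced by a Consumer.
public class BufferedContextSketch {

    static final int BUFFER_SIZE = 4; // assumed limit

    private final int memberId;
    private final Consumer<Map<String, Integer>> putAll; // stand-in for a cache's putAll
    private final Map<String, Integer> values = new HashMap<>();
    private long count;

    BufferedContextSketch(int memberId, Consumer<Map<String, Integer>> putAll) {
        this.memberId = memberId;
        this.putAll = putAll;
    }

    void write(String key, int value) {
        if (values.size() == BUFFER_SIZE) {
            flush(); // one bulk call instead of BUFFER_SIZE single puts
        }
        // Unique key per write: output key + member id + local sequence
        values.put(key + "#" + memberId + "#" + count++, value);
    }

    void flush() {
        if (!values.isEmpty()) {
            putAll.accept(new HashMap<>(values)); // copy, because the buffer is reused
            values.clear();
        }
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BufferedContextSketch context = new BufferedContextSketch(1, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 10; i++) {
            context.write("word" + i, 1);
        }
        context.flush();
        System.out.println(batchSizes); // [4, 4, 2]
    }
}
```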
In the reducer, the input value list is handled by an iterator that fetches each value from the backing map (removing it) and deserializes it on demand, one at a time.
```java
public ValuesIterator(Set<Binary> currentValues, Converter converter, Map<Binary, Binary> bMap) {
    this.currentValues = currentValues.iterator();
    this.converter = converter;
    this.bMap = bMap;
}

@Override
public boolean hasNext() {
    return currentValues.hasNext();
}

@SuppressWarnings("unchecked")
@Override
public V next() {
    return (V) converter.convert(bMap.remove(currentValues.next()));
}
```
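The same lazy, destructive iteration can be reproduced with ordinary collections. In this sketch byte[] UTF-8 strings stand in for Coherence Binary values and a static counter makes the deserializations visible; both are assumptions for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Lazy value iteration: each value is removed from the backing map and decoded
// only when the reducer asks for it.
public class LazyValuesSketch {

    static int decodeCalls = 0; // counts "deserializations"

    static Integer decode(byte[] raw) {
        decodeCalls++;
        return Integer.valueOf(new String(raw, StandardCharsets.UTF_8));
    }

    static byte[] encode(int value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    static final class ValuesIterator implements Iterator<Integer> {
        private final Iterator<String> keys;
        private final Map<String, byte[]> backingMap;

        // keys must be a separate set (like the index contents), not a view of
        // backingMap, because next() removes entries from the backing map
        ValuesIterator(Set<String> keys, Map<String, byte[]> backingMap) {
            this.keys = keys.iterator();
            this.backingMap = backingMap;
        }

        @Override
        public boolean hasNext() {
            return keys.hasNext();
        }

        @Override
        public Integer next() {
            // remove() returns the raw value and frees it from the map in one step
            return decode(backingMap.remove(keys.next()));
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> backingMap = new HashMap<>();
        Set<String> keys = new LinkedHashSet<>();
        for (int i = 1; i <= 3; i++) {
            backingMap.put("k" + i, encode(i * 10));
            keys.add("k" + i);
        }
        Iterator<Integer> values = new ValuesIterator(keys, backingMap);
        System.out.println(values.next());                         // 10
        System.out.println(decodeCalls + " " + backingMap.size()); // 1 2
    }
}
```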
The serialization used by the cache service is POF (Portable Object Format), which is much more compact than standard Java serialization and analogous to Hadoop Writables. Mappers and reducers need to implement the PortableObject interface.
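Why a field-by-field format is smaller can be seen by comparing standard Java serialization with a hand-written DataOutputStream encoding of the same small object. The hand-written encoding is not POF (POF also records type and property indexes); it only illustrates the cost of the class metadata that standard serialization writes into the stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Standard Java serialization vs a hand-written field-by-field encoding (NOT POF).
public class CompactEncodingSketch {

    // A small value object, e.g. a word-count result.
    static final class WordCount implements Serializable {
        private static final long serialVersionUID = 1L;
        final String word;
        final int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Standard serialization: stream header and class descriptor besides the data.
    static int javaSerializedSize(WordCount wc) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(wc);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Field-by-field: only the data; the reader must know the field order and types.
    static int compactSize(WordCount wc) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(wc.word);  // 2-byte length + modified UTF-8 bytes
            out.writeInt(wc.count); // 4 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        WordCount wc = new WordCount("hello", 3);
        System.out.println("java: " + javaSerializedSize(wc) + " bytes, compact: " + compactSize(wc) + " bytes");
    }
}
```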
Conclusion and Further work
By translating the Hadoop framework concepts to the Coherence infrastructure, we have achieved a scalable in-memory MapReduce model.
The key concepts here are key co-location, using the key affinity functionality, and key comparison and ordering, using a sorted index. Cluster distribution and balancing are delegated to the standard Coherence partitioner, based on a consistent hash ring.
An immediate improvement would be to allow grouping by parts of a composite key (enabling secondary sort).
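Secondary sort means ordering by the whole composite key while grouping by only part of it, so each reduce call receives its values already sorted. A plain-Java sketch, with a hypothetical Key record (group, order):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Secondary sort: sort by (group, order), group by group only (Java 16+ for records).
public class SecondarySortSketch {

    // Hypothetical composite key: natural key (group) plus secondary sort field (order).
    record Key(String group, int order) implements Comparable<Key> {
        @Override
        public int compareTo(Key other) {
            int byGroup = group.compareTo(other.group);
            return byGroup != 0 ? byGroup : Integer.compare(order, other.order);
        }
    }

    // One entry per group, as one reduce call would see it: values in secondary-key order.
    static Map<String, List<String>> groupSorted(Map<Key, String> records) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Map.Entry<Key, String> e : new TreeMap<>(records).entrySet()) {
            groups.computeIfAbsent(e.getKey().group(), g -> new ArrayList<>()).add(e.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<Key, String> records = new HashMap<>();
        records.put(new Key("b", 2), "b2");
        records.put(new Key("a", 9), "a9");
        records.put(new Key("b", 1), "b1");
        records.put(new Key("a", 3), "a3");
        System.out.println(groupSorted(records)); // {a=[a3, a9], b=[b1, b2]}
    }
}
```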
As a longer-term evolution, some monitoring, metrics and error-handling capabilities are a must. The current implementation follows a fail-fast model: there are no task retries, so the job either ends successfully or is cancelled by the first exception. This may be acceptable for the time being, given the lower latency and much smaller input data sizes compared to Hadoop.