Sunday, December 13, 2015

Spark for Hadoop MR Refugees

Spark computational model can be confusing for someone coming from the Hadoop map reduce paradigm. Don’t get me wrong, I think that previous experience with Hadoop is definitely very good to understand a spark job execution and at the end at a very fundamental simplified level , spark DAG pipeline can be reduced to operations executed transforming data local to the node (map) and operations that trigger a shuffle of the data to be processed in other node (reduce). But I think that at the beginning you will find yourself wondering, I did this this or that way with Hadoop, how it is done with Spark?.


Distributed execution and lazy model


When writing a map reduce job, it is clear where each piece is executed and when. You have the driver program that is executed in the client/frontier node. Then you have mapper, reducer, combiner task classes that are distributed to the cluster and run in parallel processing the data. Those are the distributed code parts , that will be serialized, cannot communicate with the other tasks, etc…. You know that the distributed execution will be started with the method waitForCompletion on the driver, and the result will be written to a file using a configured outputformat and then the job ends.

With spark the code in the driver is more “interactive”, the driver can trigger distributed tasks, get some results, launch more distributed tasks and so on. Is a bit more difficult to identify when the code will be executed and when.

The  key here is that the inputs and outputs for the distributed tasks in spark are the RDD objects that abstract a distributed dataset managed by the framework that can be stored in the cluster either on disk or memory. That means that the code that will be distributed to the cluster will be the objects passed to the RDD methods. So all RDD operations are distributed, the parameter objects will be serialized and sent over the wire and you must take in account the closure etc...

The map reduce tasks that we know from Hadoop can be translated in spark as map/flatmap and reduceByKey. This is using key/value RDDs (Hadoop operates always with key/value pairs, spark support RDDs of simple types for example). Now, when does the driver executes these operations?. Spark uses a lazy evaluation model meaning that for certain operations called transformations the execution will be delayed till an actual result is demanded. These map / reduceByKey operations are transformations, so if you chain a set of map reduce jobs using them, they will be added to an internal execution graph pipeline (DAG) and delayed till a result is requested with another operation called action. When the execution is triggered the current graph is optimized/reordered and the tasks executed in the cluster.

In Hadoop these action operations correspond to the waitForCompletion call that usually will involve the output written to a file. There is an action for that in Spark for example saveAsTextFile. But you can request results from RDD transformations without explicitly writing to file using actions like collect or take. Moreover, once and RDD is transformed after an action you can issue new transformations calls to this RDD until a new action is triggered. With Hadoop that can only be done chaining several independent jobs.


Basic Map Reduce: Mapper/reducer/combiner equivalents


     These basic Hadoop tasks are now RDD transformations in Spark.

     The mapper task equivalent in spark using the map/flatmap transformation on an k/v pair RDD. Use map when we want to emit one output per input and flatmap when we want several outputs per input, flattening them afterwards.

     The reducer task equivalent in spark using the reduceByKey/groupByKey transformation on an k/v pair RDD. reduceByKey does not allow us to change the key and assumes that the reducer function is associative executing a previous combiner step before shuffling. groupByKey is more flexible in the sense that it allows to change the key it does not apply any reduce function and only groups the values by key, so there are potential risks of heavy I/O and out of memory.

The classic word count example:

val wordCounts = inputData.flatMap(line => line.split("\\s* \\s*"))
.filter(word => word.size>0)
.map(word => (word.toLowerCase,1))
.reduceByKey(_+_)
   
As we know with Hadoop there are cases when we cannot use the same reducer function as the combiner because the input and output does not match. By default the reduceByKey transformation executes the reduce function locally in the node as a combiner before shuffling and applying it again as a reducer. If we want to use a different function as combiner we need the combineByKey transformation where we can specify the reducer and combiner functions separately.

An example of different combiner and reducer, we are getting the words average length grouped by the first character:

val wordAvgLengthByFirstChars = firstCharAndCounts.combineByKey((_,1),
 (x:(Int,Int),y) => (x._1+y,x._2+1),
 (x:(Int,Int),y:(Int,Int)) => (x._1+y._1,x._2+y._2))
.mapValues(x => (x._1/x._2).toFloat)


Other concepts: Partitioner, GroupComparator, setup, cleanup


     In Hadoop and spark the default partitioner is based on the hash code of the key, as you know you can define a custom partitioner in Hadoop setting the custom partitioner class in the configuration of the job. In spark, guess what, there is an RDD transformation to define a custom partitioner call partitionBy. 

     Group comparators were used with Hadoop when implementing custom composite keys, like in the secondary sort pattern, and control the pairs will be grouped in the reducer phase. The same can be achieved with Spark using groupBy transformation with a custom comparator, as in the groupByKey transformation , use with case since all the values belonging to the same key must fit in memory

     Setup and clean up can be used in the mapper stage with the mapPartitions that run once for each partition and traverses an iterator of all the values, So you can execute your setp code before starting to iterate and the clean up one once the call to iterator has next returns false.

     For the reducer case you have to use the groupByKey transformation previously and then mapPartitions.


Some Map reduce patterns

 
I’m not going into much detail about what these basic Map Reduce patterns are, just giving an idea of how could they be implemented with Spark,

- In mapper combining.

     The reasons for this Hadoop mr pattern were mainly two: perform the combining step in memory and assure it was executed, since Hadoop did not guarantee the execution of the combining step and it was performed on the output file of the mapper. This is not the case with spark but still there could be an scenario for this pattern, when we want to use the groupByKey transformation, but we want to minimize the data shuffled performing a local combining in each partition.  Don't forget that the local aggregated data must fit in memory.

A word count example using in mapper combining and groupByKey:


val inMapperCombWordCount = inputData.mapPartitions { pIterator => val localComb = Map[String,Int]();
pIterator.foreach{_.split("\\s* \\s*")
  .map{x=>x.toLowerCase;
  localComb(x) = localComb.get(x).getOrElse(0)+1}};
localComb.toIterator}
.groupByKey.mapValues(_.sum)


- Secondary sort

     The elements needed for this pattern are the custom partitioner for the composite key and the custom group comparator. We can achieve that using the groupBy method with a custom partitioner and a custom comparator. This method is expensive in terms of the amount of data shuffled.

- Reducer side joins


     This pattern is already included in spark as the join transformation. In Hadoop this involves shuffling all the data belonging to the datasets being joined so the data belonging to the same key goes to the same partitioner. Spark adds a very important optimization, it keeps track if one of the datasets has been already partitioned and only shuffles the other, if both have been partitioned with the same partitioner and are cached in the same machines then no shuffling at all will occur. 

- Optimized Serialization

One of the key points in Hadoop is the provided optimized serialization system using writable objects that can be easily extended. Spark relies on a third party library, kryo.  

1 comment:

  1. really Good blog post.provided a helpful information.I hope that you will post more updates like thisBig Data Hadoop Training India

    ReplyDelete