MapReduce Ninja Moves: Combiners, Shuffle & Doing A Sort

Who's driving this car? At first glance it appears that, as a developer, you have little if any control over how MapReduce behaves. In some regards this is an accurate assessment: you have no control over when or where a MapReduce job runs, what data a specific map task will process, or which reducer will handle the map's intermediate output. Feeling helpless yet?

Don't worry: the truth is that, despite all that, there are a number of ninja techniques you can use to take control of how data moves through your MapReduce job and influence the ultimate outcome.

Local Aggregation & Combiners

When a mapper runs and produces its intermediate output, that output is first written to local disk before being sent over the network through the shuffle/sort and on to the reducer. The two pain points should be evident: local disk I/O on the mapper and, of course, network traffic.

We can use the typical word count example to better illustrate. If we were to do a word count on the book Pride and Prejudice, our mapper would read in the text line-by-line and emit a key/value pair consisting of an individual word as the key and a value of 1. Nothing unexpected here, except the practical issue that all those key/value pairs are first written to disk before being sent along over the network. For one book that may not be a problem, but we are talking Big Data here, so this is sub-optimal.
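To make that concrete, a bare-bones word-count mapper might look something like the sketch below. The class name and the use of LongWritable counts are illustrative, and it assumes the classic mapred API; the point is simply that every occurrence of every word is written out as its own pair.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

   private static final LongWritable ONE = new LongWritable(1);
   private final Text word = new Text();

   @Override
   public void map(LongWritable offset, Text line,
                   OutputCollector<Text, LongWritable> output, Reporter reporter)
       throws IOException {
      // Emit every word with a count of 1 -- each pair is written out individually
      for (String token : line.toString().split("\\s+")) {
         if (token.isEmpty()) continue;
         word.set(token);
         output.collect(word, ONE);
      }
   }
}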

To work around this issue we can use a concept known as local aggregation, which simply means that we consolidate the data before writing it to disk. Local aggregation can be implemented in two ways. First, we could use internal structures to store data directly in the mapper, as sketched below. The downside to this approach is memory pressure and the potential to exceed the amount of memory allocated to the JVM the map task runs in.
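A minimal sketch of that first approach (illustrative class name, classic mapred API) keeps the running counts in a HashMap and only emits them when the mapper is closed, which is exactly where the memory pressure comes from:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class InMapperWordCount extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

   private final Map<String, Long> counts = new HashMap<>();
   private OutputCollector<Text, LongWritable> collector;

   @Override
   public void map(LongWritable offset, Text line,
                   OutputCollector<Text, LongWritable> output, Reporter reporter) {
      collector = output;   // keep a handle so close() can emit the totals
      for (String token : line.toString().split("\\s+")) {
         if (!token.isEmpty()) {
            counts.merge(token, 1L, Long::sum);   // aggregate in memory, nothing emitted yet
         }
      }
   }

   @Override
   public void close() throws IOException {
      if (collector == null) return;
      // Emit one pair per distinct word -- the whole map lives in the JVM heap until now
      for (Map.Entry<String, Long> entry : counts.entrySet()) {
         collector.collect(new Text(entry.getKey()), new LongWritable(entry.getValue()));
      }
   }
}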

A better method is to make use of a combiner. Combiners act as local reducers, aggregating data by key while it is in memory. The difference between the two methods is that combiners will spill, or write to disk, as buffer limits are reached, which resolves the potential out-of-memory issue. It can, however, result in duplicate keys being emitted to the shuffle/sort, which is generally not a problem considering where we started from.

Note that if the reduce function is both associative and commutative (e.g. a sum of word counts), the same class can serve as both the reducer and the combiner.
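For the word-count case, a sum reducer like the illustrative one below can be registered as both the reducer and the combiner, because adding up partial sums gives the same answer as adding up the raw ones:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SumReducer extends MapReduceBase
    implements Reducer<Text, LongWritable, Text, LongWritable> {

   @Override
   public void reduce(Text word, Iterator<LongWritable> counts,
                      OutputCollector<Text, LongWritable> output, Reporter reporter)
       throws IOException {
      long sum = 0;
      while (counts.hasNext()) {
         sum += counts.next().get();   // partial sums from a combiner add up the same way
      }
      output.collect(word, new LongWritable(sum));
   }
}

Wiring it up is then just a matter of passing the same class to both setReducerClass() and setCombinerClass() on the job configuration, as shown in the wrap-up below.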

Shuffle

After all the map tasks have finished, the shuffle runs. The shuffle partitions the data by key and ensures that all records for a given key, from all mappers, are sent to a single reducer. This works because the default partitioner is the HashPartitioner, which calculates a hash of the key and ensures that all key/value pairs with the same hash are sent to the same reducer. When you are working primarily with keys that are primitive data types, the built-in partitioning will normally suffice. When you begin to work with composite keys and complex types, things get interesting and it's time for another ninja move.
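For reference, the stock HashPartitioner boils down to the following (a sketch rather than the actual source, but the behaviour is the same): hash the key, mask off the sign bit, and mod by the number of reducers.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Roughly what the built-in HashPartitioner does for any key/value types K and V
public class HashLikePartitioner<K, V> implements Partitioner<K, V> {

   @Override
   public void configure(JobConf job) { /* no configuration needed */ }

   @Override
   public int getPartition(K key, V value, int numReduceTasks) {
      // Mask off the sign bit so the result is non-negative, then mod by the reducer count
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
   }
}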

The Custom Partitioner

The Partitioner<K,V> lets you take control of the shuffle process and direct your data to a reducer of your choosing. The work happens in a single method, getPartition(), whose job is to determine which reducer a specific record should be directed to. The key, the value and the number of reducers are passed in as arguments, which gives you everything you need to partition your data and direct it to the most appropriate place.

To better illustrate, let's look at an example. Our map job outputs a composite key that consists of a date in YYYYMMDD format and a weather station identifier, delimited by a pipe character. If the desired behavior is to send all intermediate output for a given year (the natural key) to a single reducer, the default partitioner will not work.

Take two keys, [20130312|006852] and [20130313|007051], for example. The default partitioner will calculate a hash over the entire key, resulting in different hashes and the potential that the records are sent to separate reducers. To ensure that both records are sent to the same reducer, let's implement a custom partitioner.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class YearPartitioner implements Partitioner<Text, LongWritable> {

   @Override
   public void configure(JobConf job) { /* no configuration needed */ }

   @Override
   public int getPartition(Text key, LongWritable value, int numReduceTasks) {
      // Split the year (the natural key) out of the composite key and convert to int
      int year = Integer.parseInt(key.toString().substring(0, 4));

      // Use mod to balance years across the # of available reducers
      return year % numReduceTasks;
   }
}

In the rather arbitrary example above, we took command of the shuffle by splitting out the year (a.k.a. the natural key) and then simply using the modulus operator to balance the years across the available reducers. Pretty simple and pretty straight-forward, right? Not so fast, kung-fu master... To guarantee that all records sharing a natural key are grouped together and handed to a single call of the reduce function, you must also implement a grouping comparator that considers only the natural key (in this example the year), as seen below.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class YearGroupingComparator extends WritableComparator {

   public YearGroupingComparator() {
      super(Text.class, true);   // compare deserialized Text keys
   }

   @Override
   public int compare(WritableComparable key1, WritableComparable key2) {
      // Group on the natural key only: the first four characters are the year
      int year1 = Integer.parseInt(key1.toString().substring(0, 4));
      int year2 = Integer.parseInt(key2.toString().substring(0, 4));

      return Integer.compare(year1, year2);
   }
}

Before you run off and write your own partitioner, though, note that as with anything, great power requires great responsibility. Since you are taking control of the partitioning process, MapReduce does nothing to ensure you distribute your data effectively over the reducers. This means that a poorly written partitioner can quickly take away all of the embarrassingly parallel abilities MapReduce has given you.
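To make the point concrete, here is a deliberately bad (and entirely hypothetical) partitioner. It compiles and the job will run, but every record lands on reducer 0, so one reducer does all the work while the rest sit idle:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class EverythingToOnePartitioner implements Partitioner<Text, LongWritable> {

   @Override
   public void configure(JobConf job) { }

   @Override
   public int getPartition(Text key, LongWritable value, int numReduceTasks) {
      // Every key maps to the same partition -- the job is no longer parallel on the reduce side
      return 0;
   }
}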

Sort

Data that is delivered to the reducer is guaranteed to be grouped either by key or by the partitioner/grouping comparator combination described above. Suppose, though, that we wanted to calculate a moving average over the data set. In that case, it's important that the records are delivered to the reduce function in a specific order (i.e. ordered by date). The last ninja skill we will look at is the ability to do a secondary sort using another custom comparator, this time one that looks at the entire composite key.

The Secondary Sort

Using the example from above to illustrate how a secondary sort works, suppose again that we are processing large amounts of weather data spread across multiple inputs. Our partitioner and grouping comparator ensure that all the weather station data for 2013, from all the mappers, is sent to a single reducer. But there is a high probability that the data will arrive out of order. Sure, we could build large arrays or other in-memory structures to hold and then sort the data inside the reducer's JVM, but that option doesn't scale well.

Instead, we can define a sort comparator that evaluates the natural key first and then the rest of the composite key, correctly ordering the key/value pairs by date, as seen in the example below:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SortComparator extends WritableComparator {

   public SortComparator() {
      super(Text.class, true);   // compare deserialized Text keys
   }

   @Override
   public int compare(WritableComparable key1, WritableComparable key2) {
      // The composite key is YYYYMMDD|station; compare the natural key (year) first
      int year1 = Integer.parseInt(key1.toString().substring(0, 4));
      int year2 = Integer.parseInt(key2.toString().substring(0, 4));

      if (year1 == year2) {
         // Within the same year, order by the full YYYYMMDD date
         int date1 = Integer.parseInt(key1.toString().substring(0, 8));
         int date2 = Integer.parseInt(key2.toString().substring(0, 8));
         return Integer.compare(date1, date2);
      }

      return Integer.compare(year1, year2);
   }
}

Wrap-Up

To use your new ninja moves, you must configure the MapReduce job correctly. The combiner, partitioner and comparators are all defined in the job configuration, using the classes from the examples above, as seen below:

conf.setCombinerClass(SumReducer.class);                              // reducer reused as the combiner
conf.setPartitionerClass(YearPartitioner.class);                      // custom partitioner
conf.setOutputValueGroupingComparator(YearGroupingComparator.class);  // grouping comparator
conf.setOutputKeyComparatorClass(SortComparator.class);               // secondary sort comparator

I hope this post has been helpful in expanding your knowledge of MapReduce and has given you some tools to take hold of how execution occurs within the framework.

Till next time!

Chris