June 14, 2011

How to ensure each key in the Reducer gets a sorted iterator?

Three properties control how values are partitioned and sorted for the reducer's consumption. As mentioned on riccomini's blog, Owen O'Malley explains this with a very simple and nice example. By default, intermediate pairs are partitioned by their key. To change this behavior, a custom Partitioner can be defined.
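To make the default concrete, here is a minimal plain-Java sketch (no Hadoop classes involved). It assumes a hypothetical key layout of "<node> <tag>" with a one-character tag, and the names naturalKey, defaultPartition and naturalKeyPartition are made up for illustration. String.hashCode stands in for the hash of the real key type, so the partition numbers will not match what Hadoop computes; only the formula is the same as the default hash partitioning.

```java
final class PartitionSketch {
    // Hypothetical key layout: "<node> <tag>", where <tag> is one character.
    static String naturalKey(String compositeKey) {
        return compositeKey.substring(0, compositeKey.length() - 2);
    }

    // Default behavior: hash the whole key, tag included.
    static int defaultPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Custom behavior: hash only the node part, so every tag of a node
    // ends up in the same partition.
    static int naturalKeyPartition(String compositeKey, int numReduceTasks) {
        return (naturalKey(compositeKey).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        for (String key : new String[] {"nodeA 1", "nodeA 2", "nodeB 1", "nodeB 2"}) {
            System.out.printf("%s default=%d natural=%d%n",
                    key, defaultPartition(key, reducers), naturalKeyPartition(key, reducers));
        }
    }
}
```

With the default formula, "nodeA 1" and "nodeA 2" can land on different reducers because the tag is part of the hash; hashing only the node part keeps them together.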

Once we ensure that pairs belonging to the same partition are sent to the same reducer, two functions take care of the ordering and grouping of keys within each partition/reducer.
setOutputKeyComparatorClass defines the sort order of the keys, and setOutputValueGroupingComparator defines the groups, i.e. which pairs are grouped together and processed in a single reduce call. (These are the JobConf method names; in the newer org.apache.hadoop.mapreduce API the equivalents are Job.setSortComparatorClass and Job.setGroupingComparatorClass.) The order of values in the reducer's iterator can be controlled by combining the two.
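The interplay of the two comparators can be simulated in plain Java. This is only a sketch of the idea, not Hadoop's implementation: SORT plays the role of the sort comparator, GROUP plays the role of the grouping comparator, and reduceGroups mimics what one reducer sees. The "<node> <tag>" key layout (node names without spaces, one-character tag) and all names in the block are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SecondarySortSketch {
    // Hypothetical key layout: "<node> <tag>", where <tag> is one character.
    static String naturalKey(String compositeKey) {
        return compositeKey.substring(0, compositeKey.length() - 2);
    }

    // Stand-in for the sort comparator: orders by the full composite key.
    static final Comparator<String> SORT = Comparator.naturalOrder();

    // Stand-in for the grouping comparator: only the node part counts.
    static final Comparator<String> GROUP =
            Comparator.comparing(SecondarySortSketch::naturalKey);

    // Sorts the keys of one partition, then opens a new group whenever
    // GROUP reports that the key differs from the previous one.
    static Map<String, List<String>> reduceGroups(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        Map<String, List<String>> groups = new LinkedHashMap<>();
        String previous = null;
        List<String> current = null;
        for (String key : sorted) {
            if (previous == null || GROUP.compare(previous, key) != 0) {
                current = new ArrayList<>();
                groups.put(naturalKey(key), current);
            }
            current.add(key);
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(reduceGroups(
                Arrays.asList("nodeB 2", "nodeA 2", "nodeB 1", "nodeA 1")));
    }
}
```

Each group corresponds to one reduce call, and inside a group the entries arrive in the order the sort comparator produced, which is how the iterator ends up sorted.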

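// Uses org.apache.hadoop.io.Text, org.apache.hadoop.io.Writable
// and org.apache.hadoop.mapreduce.Partitioner.
public static class RemoveIdentifierAndPartition extends Partitioner<Text, Writable> {

    @Override
    public int getPartition(Text key, Writable value, int numReduceTasks) {
        // removeKeyIdentifier is a helper defined elsewhere (not shown here);
        // it is expected to strip the identifier so that only the remaining
        // part of the key decides the partition.
        return (removeKeyIdentifier(key.toString()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}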

// Uses org.apache.hadoop.io.RawComparator and org.apache.hadoop.io.Text.
public static final class SortReducerByValuesValueGroupingComparator implements RawComparator<Text> {
    private static final Text.Comparator NODE_COMPARATOR = new Text.Comparator();

    // Object-level comparison: compares the full keys, identifier included.
    @Override
    public int compare(Text e1, Text e2) {
        return e1.compareTo(e2);
    }

    // Raw comparison on the serialized keys.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // skip the last 2 bytes (space 1 / space 2), i.e. the identifier
        int skip = 2;

        // compare only the Node part of the byte arrays, so keys that
        // differ only in the identifier fall into the same group
        return NODE_COMPARATOR.compare(b1, s1, l1 - skip, b2, s2, l2 - skip);
    }
}
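
The effect of shortening the compared length by two bytes in the raw comparison above can be tried out without Hadoop. The sketch below is an illustration under assumptions, not the library code: it works on the bare UTF-8 bytes of a key, whereas a serialized Text also carries a length prefix that Text.Comparator skips before comparing. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class RawPrefixCompareSketch {
    // Unsigned, byte-by-byte lexicographic comparison of two ranges.
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int a = b1[s1 + i] & 0xff;
            int b = b2[s2 + i] & 0xff;
            if (a != b) {
                return a - b;
            }
        }
        return l1 - l2;
    }

    // Compares two keys while ignoring their last `skip` bytes.
    static int compareIgnoringSuffix(String k1, String k2, int skip) {
        byte[] b1 = k1.getBytes(StandardCharsets.UTF_8);
        byte[] b2 = k2.getBytes(StandardCharsets.UTF_8);
        return compareBytes(b1, 0, b1.length - skip, b2, 0, b2.length - skip);
    }

    public static void main(String[] args) {
        // Same node, different identifier: treated as equal (same group).
        System.out.println(compareIgnoringSuffix("nodeA 1", "nodeA 2", 2));
        // Different nodes: negative, because "nodeA" sorts before "nodeB".
        System.out.println(compareIgnoringSuffix("nodeA 1", "nodeB 1", 2));
    }
}
```

Note that this only works if the identifier always occupies exactly two bytes at the end of the key; a longer identifier would need a different skip value.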