March 07, 2011

many small input files

If the MR input directory consists of many small files (a couple of MBs each), there will be a separate map task for each file, and each of these map tasks will probably last only 1-2 seconds. It kills the performance! So much scheduling and/or initialization overhead ..

As advised here, it is better to combine input files - so there will be fewer map tasks, each with a larger piece of data to process.

Here is how to combine input files. 
(1) Pick a regular record reader class, like LineRecordReader. 
(2) Define your own record reader class for multi-file inputs using an instance of (1)
(3) Define your own input format class which extends CombineFileInputFormat and returns (2)

The trick is that a regular record reader uses a FileSplit instance, whereas a record reader to be used with CombineFileInputFormat should use a CombineFileSplit. Here is the code: 

The regular record reader class I use in this example is RawWebGraphRecordReader. Its basic idea is similar to LineRecordReader. 
Below is the code for the multi-file record reader - MultiFileRawWebGraphRecordReader - and the input format - RawGraphInputFormat.
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MultiFileRawWebGraphRecordReader extends 
                                       RecordReader<Text, Text> {
 private static final Log LOG = LogFactory.getLog(MultiFileRawWebGraphRecordReader.class);

 private CombineFileSplit split;
 private TaskAttemptContext context;
 private int index;
 private RecordReader< Text, Text > rr;

 public MultiFileRawWebGraphRecordReader(CombineFileSplit split,
                                         TaskAttemptContext context, 
                                         Integer index) throws IOException {
  this.split = split;
  this.context = context;
  this.index = index;
  rr = new RawWebGraphRecordReader();
 }
 
 
 public void initialize(InputSplit genericSplit, TaskAttemptContext context)
 throws IOException, InterruptedException {

  this.split = (CombineFileSplit) genericSplit;
  this.context = context;

  if (null == rr) {
   rr = new RawWebGraphRecordReader();
  }

  FileSplit fileSplit = new FileSplit(this.split.getPath(index), 
                                      this.split.getOffset(index), 
                                      this.split.getLength(index), 
                                      this.split.getLocations());
  this.rr.initialize(fileSplit, this.context);
 }
 
 public boolean nextKeyValue() throws IOException, InterruptedException {
  return rr.nextKeyValue();
 }

 @Override
 public Text getCurrentKey() throws IOException, InterruptedException {
  return rr.getCurrentKey();
 }

 @Override
 public Text getCurrentValue() throws IOException, InterruptedException {
  return rr.getCurrentValue();
 }

 /**
  * Get the progress within the split
  * @throws InterruptedException 
  * @throws IOException 
  */
 @Override
 public float getProgress() throws IOException, InterruptedException {
  return rr.getProgress();
 }

 public synchronized void close() throws IOException {
  if (rr != null) {
   rr.close();
   rr = null;
  }
 }

public static class RawGraphInputFormat extends 
                                 CombineFileInputFormat< Text, Text > {

  @Override
  public RecordReader< Text, Text > 
                     createRecordReader(InputSplit split, 
                                        TaskAttemptContext context)throws IOException {
   return new CombineFileRecordReader< Text, Text >( 
                                 (CombineFileSplit) split, 
                                  context, 
                                  MultiFileRawWebGraphRecordReader.class);
  }
  
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
   CompressionCodec codec = new CompressionCodecFactory(    
                                context.getConfiguration()).getCodec(file);
   return codec == null;
  }

 }
}
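To wire this into a job, the driver only needs to point the job at RawGraphInputFormat and cap the combined split size. Below is a rough driver sketch under that assumption - the 128 MB cap is an arbitrary example value, and RawGraphDriver is a hypothetical class name, not part of the code above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class RawGraphDriver {
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf, "combine small input files");
  job.setJarByClass(RawGraphDriver.class);

  // Use the combining input format so many small files share one split
  job.setInputFormatClass(
      MultiFileRawWebGraphRecordReader.RawGraphInputFormat.class);

  // Cap each combined split at 128 MB (example value); without a max
  // size, CombineFileInputFormat may lump far more files into one split
  FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  // ... set mapper, reducer, and output classes/paths as usual ...
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}
```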