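As advised here, it is better to combine input files so that there are fewer map tasks, each with a larger piece of data to process. Every map task carries a fixed startup and scheduling cost, so many tiny tasks spend a large share of their time on that overhead rather than on processing data.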
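To make the "fewer, larger map tasks" effect concrete, here is a simplified, Hadoop-free model. It assumes one map task per split and packs whole files greedily, in order, up to a maximum split size. The real CombineFileInputFormat also groups chunks by node and rack locality and takes its cap from setMaxSplitSize() or the job configuration; this sketch ignores locality, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model: one map task per split. Without combining, every file is its
// own split; with combining, whole files are packed greedily, in order, until the
// next file would push the split past maxSplitSize. Locality grouping is ignored.
class SplitPackingSketch {

    // One map task per input file.
    static int tasksWithoutCombining(long[] fileSizes) {
        return fileSizes.length;
    }

    // Packs file sizes into splits; a file larger than maxSplitSize gets its own split.
    static List<List<Long>> pack(long[] fileSizes, long maxSplitSize) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            if (!current.isEmpty() && currentBytes + size > maxSplitSize) {
                splits.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }
}
```

For example, with 1,000 files of 1 MB each and a 128 MB cap (sizes expressed in MB), pack() returns 8 splits, so the job would run 8 map tasks instead of 1,000.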
Here is how to combine input files.
(1) Pick a regular record reader class, such as LineRecordReader.
(2) Define your own record reader class for multi-file inputs that wraps an instance of (1).
(3) Define your own input format class that extends CombineFileInputFormat and, from createRecordReader(), returns a CombineFileRecordReader that creates an instance of (2) for each file in the split.
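The three steps above can be modeled without Hadoop to show how the pieces cooperate. In this sketch CombinedSplit, ChunkReader and perIndexReader are hypothetical stand-ins for CombineFileSplit, a regular record reader and the multi-file reader; readAll roughly mimics what the framework's CombineFileRecordReader does with the reader class it is given. Character offsets stand in for byte offsets, and chunks are assumed to start and end on line boundaries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class CombineDelegationSketch {

    // Stand-in for CombineFileSplit: chunk i is (paths[i], offsets[i], lengths[i]).
    static final class CombinedSplit {
        final String[] paths;
        final int[] offsets;
        final int[] lengths;

        CombinedSplit(String[] paths, int[] offsets, int[] lengths) {
            this.paths = paths;
            this.offsets = offsets;
            this.lengths = lengths;
        }

        int getNumPaths() {
            return paths.length;
        }
    }

    // Stand-in for a regular single-file record reader (step 1).
    interface ChunkReader {
        boolean nextKeyValue();
        String getCurrentValue();
    }

    // Step 2: a reader bound to one index of the combined split. It narrows the
    // combined split to chunk `index`, as the real class does by building a FileSplit.
    // Assumption: chunks start and end on line boundaries.
    static ChunkReader perIndexReader(Map<String, String> files, CombinedSplit split, int index) {
        String content = files.get(split.paths[index]);
        int start = split.offsets[index];
        String range = content.substring(start, start + split.lengths[index]);
        String[] lines = range.isEmpty() ? new String[0] : range.split("\n");
        return new ChunkReader() {
            private int pos = -1;

            @Override
            public boolean nextKeyValue() {
                pos++;
                return pos < lines.length;
            }

            @Override
            public String getCurrentValue() {
                return lines[pos];
            }
        };
    }

    // Drain the reader for chunk 0, then build and drain the reader for chunk 1, and so on.
    static List<String> readAll(CombinedSplit split,
                                BiFunction<CombinedSplit, Integer, ChunkReader> newReader) {
        List<String> records = new ArrayList<>();
        for (int idx = 0; idx < split.getNumPaths(); idx++) {
            ChunkReader reader = newReader.apply(split, idx);
            while (reader.nextKeyValue()) {
                records.add(reader.getCurrentValue());
            }
        }
        return records;
    }
}
```

The design point is that the per-index reader never sees the other files: the outer loop owns the iteration over chunks, so a regular single-file reader can be reused unchanged.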
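The trick is that a regular record reader works with a FileSplit, whereas a record reader used with CombineFileInputFormat receives a CombineFileSplit, which bundles chunks of several files. The multi-file reader therefore converts its own chunk into a FileSplit and passes that to the regular reader.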
The regular record reader class I use in this example is RawWebGraphRecordReader; its basic idea is similar to LineRecordReader.
Below is the code for the multi-file record reader, MultiFileRawWebGraphRecordReader, and the input format, RawGraphInputFormat:
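RawWebGraphRecordReader itself is not shown. Because the input format lets uncompressed files be split, a chunk can begin or end in the middle of a line, so a LineRecordReader-like reader needs LineRecordReader's boundary rule. The sketch below models that rule on an in-memory String, with character offsets standing in for byte offsets; how RawWebGraphRecordReader turns a line into a Text key and a Text value is not known from this post and is not modeled. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory sketch of the split-boundary rule used by LineRecordReader:
//  - a range that does not start at offset 0 skips everything up to and
//    including the first newline, since that line belongs to the previous range;
//  - a range keeps reading any line that starts at or before its end offset,
//    even when that line runs past the end.
// Together these rules give every line to exactly one range.
class LineBoundarySketch {

    static List<String> linesForRange(String data, int start, int length) {
        List<String> lines = new ArrayList<>();
        int end = start + length;
        int pos = start;
        if (start != 0) {
            int nl = data.indexOf('\n', start);
            pos = (nl == -1) ? data.length() : nl + 1;
        }
        while (pos <= end && pos < data.length()) {
            int nl = data.indexOf('\n', pos);
            int lineEnd = (nl == -1) ? data.length() : nl;
            lines.add(data.substring(pos, lineEnd));
            pos = lineEnd + 1;
        }
        return lines;
    }
}
```

For "aa\nbb\ncc\n" split at offset 3, the first range returns aa and bb (bb starts exactly at its end offset) and the second returns only cc, so no line is lost or read twice.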
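import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MultiFileRawWebGraphRecordReader extends RecordReader<Text, Text> {
    private static final Log LOG = LogFactory.getLog(MultiFileRawWebGraphRecordReader.class);

    private CombineFileSplit split;
    private TaskAttemptContext context;
    private int index;                   // which chunk of the combined split this reader handles
    private RecordReader<Text, Text> rr; // the regular single-file reader we delegate to

    // CombineFileRecordReader creates one instance per chunk via reflection,
    // so this exact (CombineFileSplit, TaskAttemptContext, Integer) signature is required.
    public MultiFileRawWebGraphRecordReader(CombineFileSplit split, TaskAttemptContext context,
            Integer index) throws IOException {
        this.split = split;
        this.context = context;
        this.index = index;
        rr = new RawWebGraphRecordReader();
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.split = (CombineFileSplit) genericSplit;
        this.context = context;
        if (null == rr) {
            rr = new RawWebGraphRecordReader();
        }
        // Turn chunk `index` of the combined split into an ordinary FileSplit
        // that the regular record reader understands.
        FileSplit fileSplit = new FileSplit(this.split.getPath(index),
                this.split.getOffset(index), this.split.getLength(index),
                this.split.getLocations());
        this.rr.initialize(fileSplit, this.context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return rr.nextKeyValue();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return rr.getCurrentKey();
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return rr.getCurrentValue();
    }

    /**
     * Get the progress within the current chunk of the split.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return rr.getProgress();
    }

    @Override
    public synchronized void close() throws IOException {
        if (rr != null) {
            rr.close();
            rr = null;
        }
    }
}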
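The getProgress() method above returns rr.getProgress(), which is progress within the current file only. The framework's CombineFileRecordReader, which wraps this class, reports progress for the whole split by, roughly, adding the lengths of the chunks already finished to the current reader's fraction weighted by its chunk length, then dividing by the total split length. A sketch of that arithmetic, with hypothetical names:

```java
// Split-level progress across the chunks of a combined split, modeled loosely
// on CombineFileRecordReader. chunkLengths holds the byte length of each chunk,
// current is the index of the chunk being read, and currentProgress (0..1) is
// what that chunk's own reader reports.
class CombinedProgressSketch {

    static float overallProgress(long[] chunkLengths, int current, float currentProgress) {
        long total = 0;
        long done = 0;
        for (int i = 0; i < chunkLengths.length; i++) {
            total += chunkLengths[i];
            if (i < current) {
                done += chunkLengths[i]; // chunks before `current` are fully read
            }
        }
        if (total == 0) {
            return 1.0f; // nothing to read
        }
        long sub = (long) (currentProgress * chunkLengths[current]);
        return Math.min(1.0f, (done + sub) / (float) total);
    }
}
```

With chunks of 100 and 300 bytes, being halfway through the second chunk gives (100 + 150) / 400 = 0.625, whereas the per-file reader alone would report 0.5.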
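import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

// Declared as a static nested class (e.g. inside the job's driver class);
// the imports above belong at the top of that enclosing file.
public static class RawGraphInputFormat extends CombineFileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        // CombineFileRecordReader walks the chunks of the CombineFileSplit and
        // creates one MultiFileRawWebGraphRecordReader per chunk.
        return new CombineFileRecordReader<Text, Text>(
                (CombineFileSplit) split, context, MultiFileRawWebGraphRecordReader.class);
    }

    // Uncompressed files may be split into chunks; compressed files are read whole.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = new CompressionCodecFactory(
                context.getConfiguration()).getCodec(file);
        return codec == null;
    }
}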
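The isSplitable() rule above treats every compressed file as unsplittable, because a reader generally cannot start decompressing in the middle of a stream. CompressionCodecFactory picks a codec from the file name's suffix. Hadoop's own TextInputFormat is slightly more lenient: it also splits files whose codec implements SplittableCompressionCodec (bzip2, for example). The Hadoop-free model below contrasts the two rules; the suffix lists are illustrative assumptions, not read from a real Hadoop configuration, and the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hadoop-free model of the two isSplitable() policies, using suffix-based
// codec lookup. The suffix lists are illustrative, not a real configuration.
class SplittabilitySketch {

    static final List<String> CODEC_SUFFIXES =
            Arrays.asList(".gz", ".bz2", ".deflate", ".snappy", ".lz4");
    // Assumption: among the codecs listed above, only bzip2 supports splitting.
    static final List<String> SPLITTABLE_SUFFIXES = Arrays.asList(".bz2");

    // Returns the matching codec suffix, or null when the file looks uncompressed.
    static String codecSuffix(String fileName) {
        for (String suffix : CODEC_SUFFIXES) {
            if (fileName.endsWith(suffix)) {
                return suffix;
            }
        }
        return null;
    }

    // The rule in RawGraphInputFormat: split only files with no codec.
    static boolean isSplitableStrict(String fileName) {
        return codecSuffix(fileName) == null;
    }

    // TextInputFormat-style rule: also split files whose codec can be split.
    static boolean isSplitableLenient(String fileName) {
        String suffix = codecSuffix(fileName);
        return suffix == null || SPLITTABLE_SUFFIXES.contains(suffix);
    }
}
```

The strict rule is the safer choice here: the lenient one only works if the per-file reader (RawWebGraphRecordReader in this post) knows how to start reading a compressed stream at a split boundary, as LineRecordReader does.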