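public static class FinalLayersReducer extends Reducer<IntWritable, Text, WritableComparable, Writable>
{
// Shared by setup(), reduce() and cleanup(); it must be closed in cleanup().
private MultipleOutputs<WritableComparable, Writable> mos;
public void setup(Context context)
{
mos = new MultipleOutputs<WritableComparable, Writable>(context);
}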
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
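// Placeholders for the two values written below; the computation in the loop fills them in.
Text outlink_text = new Text();
Text timestamp_text = new Text();
for (Text val : values) {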
// some sort of a computation ..
}
mos.write("outlink", key, outlink_text);
mos.write("timestamp", key, timestamp_text);
}
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "prepare final layer files");
// other job settings ..
// Register every named output that the reducer passes to mos.write().
MultipleOutputs.addNamedOutput(job, "outlink", TextOutputFormat.class, IntWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "timestamp", TextOutputFormat.class, IntWritable.class, Text.class);
}
If you get zero-sized output files, or the lines in the two separate outputs do not match when they are supposed to, or you cannot unzip the output files, these are signs that you forgot to close() the MultipleOutputs object at the end, in the cleanup() function.
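The "cannot unzip" symptom can be reproduced without Hadoop. The sketch below is only an analogy built on plain java.util.zip, not Hadoop's actual code path. It assumes that a compressed named output behaves like a GZIPOutputStream, which writes its remaining data and trailer only when it is closed. The class and method names (CloseDemo, writeRecord, canUnzip) and the sample record are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-alone demo; it does not use any Hadoop classes.
class CloseDemo {

    // Writes one record through a gzip stream into memory.
    // The stream is closed only when 'close' is true.
    public static byte[] writeRecord(boolean close) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try {
            GZIPOutputStream gz = new GZIPOutputStream(sink);
            gz.write("1\thttp://example.org\n".getBytes(StandardCharsets.UTF_8));
            if (close) {
                // close() finishes the compressed data and writes the gzip trailer.
                gz.close();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return sink.toByteArray();
    }

    // Returns true if the bytes decompress cleanly up to the end of the stream.
    public static boolean canUnzip(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[256];
            while (in.read(buf) != -1) {
                // Discard the content; only readability matters here.
            }
            return true;
        } catch (IOException e) {
            // A truncated stream surfaces as an IOException (typically EOFException).
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("closed:   " + canUnzip(writeRecord(true)));
        System.out.println("unclosed: " + canUnzip(writeRecord(false)));
    }
}
```

The closed stream reads back cleanly, while the unclosed one is truncated and fails to decompress. Calling mos.close() in cleanup() plays the same role for each named output's writer.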