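// Imports belong at the top of the enclosing class file
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public static class FinalLayersReducer
        extends Reducer<IntWritable, Text, WritableComparable, Writable> {

    // One MultipleOutputs instance per reduce task: created in setup(), closed in cleanup()
    private MultipleOutputs<WritableComparable, Writable> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<WritableComparable, Writable>(context);
    }

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            // some sort of a computation ..
        }
        // outlink_text and timestamp_text are the Text results of the computation above
        mos.write("outlink", key, outlink_text);
        mos.write("timestamp", key, timestamp_text);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Flushes and closes the writers behind every named output
        mos.close();
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "prepare final layer files");
    // other job settings ..
    MultipleOutputs.addNamedOutput(job, "outlink", TextOutputFormat.class, IntWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, "timestamp", TextOutputFormat.class, IntWritable.class, Text.class);
}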
If you are facing zero-sized output files, OR lines in the 2 separate outputs that do not match when they are supposed to, OR output files that cannot be unzipped, these are signs that you forgot to close() the MultipleOutputs object at the end, in the cleanup() function.
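To see why a missing close() produces exactly these symptoms, here is a minimal sketch in plain Java, with no Hadoop involved. It assumes the writers behind MultipleOutputs behave like ordinary buffered and gzip streams, which hold data in memory until they are closed; that is an assumption for illustration, not a description of Hadoop internals. The class name CloseDemo and its methods are made up for this example.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CloseDemo {

    // Writes one record through a buffered (optionally gzip) writer and returns the file size.
    // closeWriter == false mimics a task that ends without close(): only the raw file
    // handle is released, so whatever is still buffered never reaches the file.
    static long write(Path path, boolean gzip, boolean closeWriter) throws IOException {
        OutputStream raw = Files.newOutputStream(path);
        OutputStream sink = gzip ? new GZIPOutputStream(raw) : raw;
        Writer writer = new BufferedWriter(new OutputStreamWriter(sink, StandardCharsets.UTF_8));
        writer.write("1\toutlink-a\n");
        if (closeWriter) {
            writer.close();   // flushes the buffer, finishes the gzip stream, closes the file
        } else {
            raw.close();      // buffered data is silently dropped
        }
        return Files.size(path);
    }

    // Returns true only if the whole file can be read back as valid gzip.
    static boolean canUnzip(Path path) {
        try (InputStream in = new GZIPInputStream(Files.newInputStream(path))) {
            byte[] buf = new byte[1024];
            while (in.read(buf) != -1) {
                // drain the stream; a truncated file throws before reaching the end
            }
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("close-demo");
        System.out.println("plain, not closed: " + write(dir.resolve("a.txt"), false, false) + " bytes");
        System.out.println("plain, closed:     " + write(dir.resolve("b.txt"), false, true) + " bytes");
        write(dir.resolve("c.gz"), true, false);
        write(dir.resolve("d.gz"), true, true);
        System.out.println("gzip, not closed, can unzip: " + canUnzip(dir.resolve("c.gz")));
        System.out.println("gzip, closed, can unzip:     " + canUnzip(dir.resolve("d.gz")));
    }
}
```

The unclosed plain file stays empty and the unclosed gzip file cannot be read back, while the closed ones are fine. The same reasoning explains mismatched line counts between two named outputs: each output loses a different amount of buffered data when close() is skipped.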