June 20, 2011
How to pass job specific parameters in Hadoop
Say there is a parameter your mapper or reducer needs, and it is desirable to get this parameter from the user at job submission time. Here is how to use "Configuration" to let the user set the parameter:

public class GenericReplace {

    public static final String IS_KEY_FIRST = "IsKeyFirstInMapFile";

    public static String graphPath;
    public static String outputPath;

    public static class GenerateLinks extends Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value, Context context) {
            // read the job-specific flag back from the configuration
            if (context.getConfiguration().getBoolean(IS_KEY_FIRST, true)) {
                // do this ..
            } else {
                // do that ..
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        GenericReplace.graphPath = args[0];
        GenericReplace.outputPath = args[1];
        // parse the user-supplied flag and store it in the job configuration
        conf.setBoolean(IS_KEY_FIRST, Boolean.parseBoolean(args[3]));
        Job job = Job.getInstance(new Cluster(conf), conf);
        ...
    }
}
June 19, 2011
Ways to write & read HDFS files
- Output Stream
FSDataOutputStream dos = fs.create(new Path("/user/tmp"), true);
dos.writeInt(counter);
dos.close();
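The writer above has a matching reader; a minimal sketch, assuming the same FileSystem handle fs and that a single int was written to the file (the variable name dis is mine):

FSDataInputStream dis = fs.open(new Path("/user/tmp"));
int counter = dis.readInt();   // read back the value written above
dis.close();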
- Buffered Writer/Reader
//Writer
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(new Path("/user/tmp"), true)));
bw.write(counter.toString());
bw.close();

//Reader
DataInputStream d = new DataInputStream(fs.open(new Path(inFile)));
BufferedReader reader = new BufferedReader(new InputStreamReader(d));
String line;
while ((line = reader.readLine()) != null) {
    ...
}
reader.close();
- SequenceFile Reader and Writer (I think most preferable way for Hadoop jobs):
//Writer
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
        new Path(pathForCounters, context.getTaskAttemptID().toString()), Text.class, Text.class);
writer.append(new Text(firstUrl.toString() + "__" + context.getTaskAttemptID().getTaskID().toString()),
        new Text(counter + ""));
writer.close();

//Reader
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
        new Path(makeUUrlFileOffsetsPathName(FileInputFormat.getInputPaths(context)[0].toString())), conf);
Text key = new Text();
Text val = new Text();
while (reader.next(key, val)) {
    offsets.put(key.toString(), Integer.parseInt(val.toString()));
}
reader.close();
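All of the snippets above assume an already-opened FileSystem handle fs; a minimal sketch of obtaining one from the job configuration (inside a task you would use context.getConfiguration() instead of a fresh Configuration):

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);   // handle to the default (HDFS) file system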
June 14, 2011
How to ensure each key in Reducer has a sorted iterator?
There are three properties that control how values are partitioned and sorted for the reducer's consumption. As mentioned on riccomini's blog, Owen O'Malley explains this with a very simple and nice example. By default, intermediate pairs are partitioned using the key; to change this behavior, a custom Partitioner can be defined. Once we ensure that pairs belonging to the same partition are sent to the same reducer, two functions take care of the ordering and grouping of keys within each partition/reducer: setOutputKeyComparatorClass defines the sort order of the keys, and setOutputValueGroupingComparator defines the groups, i.e. which pairs are grouped together and processed in one reduce call. The order of the values in the reducer's iterator can be set using a combination of these two.

public static class RemoveIdentifierAndPartition extends Partitioner<Text, Writable> {
    @Override
    public int getPartition(Text key, Writable value, int numReduceTasks) {
        // partition on the key with its identifier suffix stripped off
        return (removeKeyIdentifier(key.toString()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

public static final class SortReducerByValuesValueGroupingComparator implements RawComparator<Text> {
    private static final Text.Comparator NODE_COMPARATOR = new Text.Comparator();

    @Override
    public int compare(Text e1, Text e2) {
        return e1.compareTo(e2);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // skip the last 2 bytes (space 1 / space 2) and compare the byte array of the Node first
        int skip = 2;
        return NODE_COMPARATOR.compare(b1, s1, l1 - skip, b2, s2, l2 - skip);
    }
}
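For completeness, a minimal sketch of how these classes might be wired into the job; the key sort comparator name is hypothetical, and note that with the newer Job API the setters are setSortComparatorClass and setGroupingComparatorClass (setOutputKeyComparatorClass / setOutputValueGroupingComparator are the JobConf equivalents mentioned above):

job.setPartitionerClass(RemoveIdentifierAndPartition.class);
job.setSortComparatorClass(MyKeyComparator.class);                                  // hypothetical key sort comparator
job.setGroupingComparatorClass(SortReducerByValuesValueGroupingComparator.class);   // group values per stripped key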
How to set number of Maps with Hadoop
Setting the number of map tasks is not as simple as setting the number of reduce tasks. The user cannot explicitly give a fixed number; FileInputFormat decides how to split the input files using several parameters.
The first one, isSplitable, determines whether a file is splittable at all. The next three variables, mapred.min.split.size, mapred.max.split.size and dfs.block.size, determine the actual split size used if the input is splittable. By default the min split size is 0, the max split size is Long.MAX_VALUE, and the block size is 64 MB. For the actual split size, minSplitSize and blockSize set the lower bound, and blockSize and maxSplitSize together set the upper bound. Here is the function used to calculate it:

max(minSplitSize, min(maxSplitSize, blockSize))
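So, to get larger splits (and hence fewer map tasks), the lower bound can be raised; a minimal sketch, assuming the old-style property names quoted above (the sizes are arbitrary examples):

Configuration conf = new Configuration();
conf.setLong("mapred.min.split.size", 128 * 1024 * 1024L);   // raise the lower bound to 128 MB
conf.setLong("mapred.max.split.size", 512 * 1024 * 1024L);   // cap the split size at 512 MB
// effective split size = max(minSplitSize, min(maxSplitSize, blockSize))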
Note: compressed input files (e.g. gzip) are not splittable; there are patches available for this.