Showing posts with label hadoop. Show all posts
Showing posts with label hadoop. Show all posts

June 20, 2011

How to pass job specific parameters in Hadoop

Say there is a parameter your mapper or reducer needs, and it is desirable to get this parameter from the user at the beginning of the job submission. Here is how to use "Configuration" to let the user set the parameter:

public class GenericReplace {

   public static final String IS_KEY_FIRST = "IsKeyFirstInMapFile";

   public static class GenerateLinks extends Mapper {

      public void map(Text key, Text value, Context context)  {
         if (context.getConfiguration().getInt(IS_KEY_FIRST, 1)) {
              //do this .. 
         }
         else{
              //do that .. 
         }
      }
   }

   public static void main(String[] args) throws Exception {

   Configuration conf = new Configuration();
   GenericReplace.graphPath = args[0];
   GenericReplace.outputPath = args[1];
   conf.setBoolean(IS_KEY_FIRST , Boolean.getBoolean(args[3]));
   Job job = Job.getInstance(new Cluster(conf), conf);
   ...
   }
}

Ways to write & read HDFS files

- Output Stream
FSDataOutputStream dos = fs.create(new Path("/user/tmp"), true); 
dos.writeInt(counter); 
dos.close();

- Buffered Writer/Reader
//Writer
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(new Path("/user/tmp"), true)));
bw.write(counter.toString());
bw.close();

//Reader
DataInputStream d = new DataInputStream(fs.open(new Path(inFile)));
BufferedReader reader = new BufferedReader(new InputStreamReader(d));
while ((line = reader.readLine()) != null){
...
}
reader.close();
  

- SequenceFile Reader and Writer (I think most preferable way for Hadoop jobs):
//writer
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(pathForCounters, context.getTaskAttemptID().toString()), Text.class, Text.class);
   writer.append(new Text(firtUrl.toString()+"__"+ context.getTaskAttemptID().getTaskID().toString()), new Text(counter+""));
   writer.close(); 

//reader
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(makeUUrlFileOffsetsPathName(FileInputFormat.getInputPaths(context)[0].toString())),  conf);
   while (reader.next(key, val)){
    offsets.put(key.toString(), Integer.parseInt(val.toString()));
   }

June 14, 2011

How to ensure each key in Reducer has sorted iterator ?

There are three properties control how values are partitioned and sorted for reducer's consumption. As mentioned at riccomini's blog Owen O'Malley explains with a very simple & nice example. By default intermediate pairs are partitioned using the key. To manipulate this behavior, custom Partitioner can be defined.

Once we ensure pairs belonging to same partition are sent to the same reducer, now there are two functions take care of their ordering and grouping of keys in each partition/reducer.
setOutputKeyComparatorClass defines the sort order of the keys and setOutputValueGroupingComparator defines the groups, which pairs will be grouped together to process once. Order of values at the reducer's iterator can be set using combination of these two.

public static class RemoveIdentifierAndPartition extends Partitioner< Text, Writable > {

  @Override
  public int getPartition(Text key, Writable value, int numReduceTasks) {
   return (removeKeyIdentifier(key.toString()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
 }

 public static final class SortReducerByValuesValueGroupingComparator implements RawComparator< Text >  {
     private static Text.Comparator NODE_COMPARATOR = new Text.Comparator();

     @Override
     public int compare(Text e1, Text e2) {
         return e1.compareTo(e2);
     }

     @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

         // skip last 2 bytes ( space 1 / space 2)
         
         int skip = 2;
         int stringsize1 = 0;
         int stringsize2 = 0;

         // compare the byte array of Node first
         return NODE_COMPARATOR.compare(b1, s1 , l1-skip, b2, s2 , l2-skip);
     }
 }

How to set number of Maps with Hadoop

Setting number of map tasks is not simple like the reduce tasks. User can not explicitly give a fixed number, FileInputFormat decides how to split input files using various parameters.

First one is isSplitable, determines whether file is splittable or not.
Next three variables, mapred.min.split.size, mapred.max.split.size, dfs.block.size determine the actual split size used if input is splittable. By default, min split size is 0 and max split size is Long.MAX and block size 64MB. For actual split size; minSplitSize&blockSize set the lower bound and blockSize&maxSplitSize together sets the upper bound. Here is the function to calculate:
max(minsplitsize, min(maxsplitsize, blocksize))
Note: compressed input files (eg. gzip) are not splittable, there are patches * * available.

April 21, 2011

Hadoop - Incompatible namespaceIDs Error

After formatting the namenode, restarting Hadoop fails - more specifically namenode does not start with Incompatible namespaceIDs Error.
bin/hadoop namenode -format
..
bin/start-dfs.sh
..
ERROR org.apache.hadoop.hdfs.server.datanode.DataNode:
  java.io.IOException: Incompatible namespaceIDs in /hadoop21/hdfs/datadir: 
  namenode namespaceID = 515704843; datanode namespaceID = 572408927
  ..
Why? -datanodes have the old version number after formatting the namenode.
Solution? - hacking in to <hdfs-data-path>/datadir/current/VERSION file and changing the version number with the new one (which is 572408927 in this example) solves the problem. Make sure to change it for every data-node in the cluster.

WARNING: most probably you will be loosing the data in HDFS. even though it is not deleted, not accessible with the new version.

To avoid such a boring case, be careful before formatting. Take a look at this

April 19, 2011

Hadoop MultipleOutputs

Want to generate various types of output files. For example, I have a huge linkgraph with includes timestamp and the outlink information. I want put these two constrains into seperate files. Here is how to use MultipleOutputFormat for this purpose:
public static class FinalLayersReducer extends Reducer<IntWritable, Text, WritableComparable,Writable> 
{
     public void setup(Context context) 
     {
          mos = new MultipleOutputs(context);
     }
  
     public void reduce(IntWritable key, Iterable<text> values, Context context) throws IOException, InterruptedException {
          for ( Text val : values) {
          // some sort of a computation ..
          }
          mos.write("outlink", key, outlink_text);
          mos.write("timestamp", key, timestamp_text);
     }

     protected void cleanup(Context context) throws IOException, InterruptedException {
          mos.close();
     }
}

public static void main(String[] args) throws Exception {

     Job job = new Job(conf, "prepare final layer files");
     
     // other job settings ..

     MultipleOutputs.addNamedOutput(job, "outlink", TextOutputFormat.class , IntWritable.class, Text.class);
     MultipleOutputs.addNamedOutput(job, "timestamp", TextOutputFormat.class , IntWritable.class, Text.class);
}

Facing zero sized output files OR lines in the 2 separate outputs do not match when they supposed to OR can not unzip the output files -> these are signs are telling that you forget to close() the MultipleOutputs object at the end - in the cleanup() function.

April 18, 2011

Hadoop Intermediate Data Compression

To enable intermediate data compression, setup corresponding variables in mapred-site.xml.
<!-- mapred-site.xml -->   
<property>
    <name> mapreduce.map.output.compress </name> 
    <value> true</value> 
</property>
<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>

Setting up LZO compression is a bit tricky. First of all, should install LZO package on all nodes. I built this package and followed instructions here.

Having difficulty while building  eg: "BUILD FAILED make sure $JAVA_HOME set correctly." - then take a look at here.

At the end, this is how my config files look like:
<!-- mapred-site.xml -->
<property>
    <name> mapreduce.map.output.compress </name> 
    <value> true</value> 
</property>
<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

<!-- core-site.xml -->
<property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
<property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

To compress final output data, Job object should be set to output compressed data before its execution.

Compressing Hadoop Output usinig Gzip and Lzo

In most of the cases, writing out output files in compressed format is faster - less amount of data will be written. To have a faster computation, compression algorithm should perform well - so time is saved even though there is an extra compression time overhead.

Compressing regular output formats with Gzip, use:
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setCompressOutput(job, true);
TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
...

For Lzo Output compression, download this package by @kevinweil. Then following should work:
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setCompressOutput(job, true);
TextOutputFormat.setOutputCompressorClass(job, LzoCodec.class);
...

In terms of space efficiency, Gzip compresses better. However, in terms of time Lzo i smuch faster. Also, it is possible to split Lzo files, splittable Gzip is not available.
Keep in mind that these two techniques will only compress the final outputs of a Hadoop job. To be able to compress intermediate data, parameters in mapred-site.xml should be configured.

April 11, 2011

LZO build problem

Trying to built the lzo library for Hadoop, it failed with "make sure $JAVA_HOME" set correctly message. Here is the full error log:
....     
   [exec] checking jni.h usability... no
   [exec] checking jni.h presence... no
   [exec] checking for jni.h... no
   [exec] configure: error: Native java headers not found. 
   Is $JAVA_HOME set correctly?
BUILD FAILED make sure $JAVA_HOME set correctly. 

This means build is using incorrect java installation. To make sure JAVA_HOME is pointing to the correct one use apt-file search - searches in all packages installed in your system.
apt-file search jni.h
And then set JAVA_HOME accordingly.

Common Hadoop HDFS exceptions with large files

Big data in HDFS, so many disk problems. First of all, make sure there are at least ~20-30% free space in each node. There are two other problems I faced recently:

all datanodes are bad
This error could be cause because of there are too many open files. limit is 1024 by default. To increase this use
ulimit -n newsize
For more information click!

error in shuffle in fetcher#k 
This is another problem - here is full error log:
2011-04-11 05:59:45,744 WARN org.apache.hadoop.mapred.Child: 
Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: 
error in shuffle in fetcher#2
 at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:124)
 at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:362)
 at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:416)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
 at org.apache.hadoop.mapred.Child.main(Child.java:211)
Caused by: java.lang.OutOfMemoryError: Java heap space
 at org.apache.hadoop.io.BoundedByteArrayOutputStream.(BoundedByteArrayOutputStream.java:58)
 at org.apache.hadoop.io.BoundedByteArrayOutputStream.(BoundedByteArrayOutputStream.java:45)
 at org.apache.hadoop.mapreduce.task.reduce.MapOutput.(MapOutput.java:104)
 at org.apache.hadoop.mapreduce.task.reduce.MergeManager.unconditionalReserve(MergeManager.java:267)
 at org.apache.hadoop.mapreduce.task.reduce.MergeManager.reserve(MergeManager.java:257)
 at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:305)
 at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:251)
 at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:149)

One way to go around this problem is making sure there are not too many map tasks for small input files. If possible you can cat input files manually to create bigger chunks or push hadoop to combine multiple tiny input files for a single mapper. For more details, take a look at here.

Also, at Hadoop discussions groups, it is mentioned that default value of dfs.datanode.max.xcievers parameter, the upper bound for the number of files an HDFS DataNode can serve, is too low and causes ShuffleError. In hdfs-site.xml, I set this value to 2048 and worked in my case.
<property>
        <name>dfs.datanode.max.xcievers</name>
        <value>2048</value>
  </property>

Update: Default value for dfs.datanode.max.xcievers is updated with this JIRA.

April 03, 2011

java.io.EOFException with Hadoop

My code runs smoothly with a smaller dataset, however whenever I run it with a larger one, it fails with java.io.EOFException I've been trying to figure out the problem.

11/03/31 01:13:55 INFO mapreduce.Job: 
  Task Id: attempt_201103301621_0025_m_000634_0, Status : FAILED
java.io.EOFException
 at java.io.DataInputStream.readFully(DataInputStream.java:197)
 at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:68)
 at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:106)
 at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1999)
 at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2131)
 ...
 ...
 ...
 at org.apache.hadoop.mapred.MapTask$
  NewTrackingRecordReader.nextKeyValue(MapTask.java:465)
 at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
 at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:90)
 at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
 at org.apache.hadoop.mapreduce.lib.input.DelegatingMapper.run
  (Delegatin

So, EOFException means something wrong with your input files. If files are not written & closed correctly, this exception is thrown - the file systems thinks there are more to read but actually number of bytes left are less than expected.
To solve the problem, dig into the input files and make sure they are created carefully without any corruption. Also if MultipleOutputs is used to prepare input files, make sure it is also closed at the end!

March 29, 2011

WARNING : There are about 1 missing blocks. Please check the log or run fsck.

hadoop fsck /
hadoop fsck -delete / 
hadoop fsck -move / 
-move option moves under /lost+found
-delete option deleted all corrupted files
For more options: http://developer.yahoo.com/hadoop/tutorial/module2.html

March 09, 2011

Hadoop - MapReduce without reducer to avoid sorting

MR job can be defined with no reducer. In this case, all the mappers write their outputs under specified job output directory. So; there will be no sorting and no partitioning.
Just set the number of reduces to 0.
job.setNumReduceTasks(0);

March 07, 2011

many small input files

If MR input directory consists of many small files (couple MBs), then there will be a seperate map task for each and probably these map tasks will last only for 1-2 secs. it kills the performance ! so much scheduling and/or initialization overhead .. 

As advised here, it is better to combine input files - so there will be less number of map tasks with larger piece of data to process in each one. 

Here is how to combine input files. 
(1) Pick a regular record reader class, like LineRecorReader. 
(2) Define your own record record reader class for multi file inputs using an instance of (1)
(3) Define your own input format class which extends CombineFileInputFormat and returns (2)

The trick is regular recordReader uses fileSplit instance, however record reader to be used with combineFileInputFormat should you be using CombineFileSplit. Here is the code: 

Name of the regular record reader class I use in this example is RawWebGraphRecordReader. Its basic idea is similar to LineRecordReader. 
Below is the code for multi file record reader -MultiFileRawWebGraphRecordReader and input format -RawGraphInputFormat
public class MultiFileRawWebGraphRecordReader extends 
                                       RecordReader < Text, Text > {
 private static final Log LOG = LogFactory.getLog(MultiFileRawWebGraphRecordReader.class);

 private CombineFileSplit split;
 private TaskAttemptContext context;
 private int index;
 private RecordReader< Text, Text > rr;

 public MultiFileRawWebGraphRecordReader(CombineFileSplit split,
                                         TaskAttemptContext context, 
                                         Integer index) throws IOException {
  this.split = split;
  this.context = context;
  this.index = index;
  rr = new RawWebGraphRecordReader();
 }
 
 
 public void initialize(InputSplit genericSplit, TaskAttemptContext context)
 throws IOException, InterruptedException {

  this.split = (CombineFileSplit) genericSplit;
  this.context = context;

  if (null == rr) {
   rr = new RawWebGraphRecordReader();
  }

  FileSplit fileSplit = new FileSplit(this.split.getPath(index), 
                                      this.split.getOffset(index), 
                                      this.split.getLength(index), 
                                      this.split.getLocations());
  this.rr.initialize(fileSplit, this.context);
 }
 
 public boolean nextKeyValue() throws IOException, InterruptedException {
  return rr.nextKeyValue();
 }

 @Override
 public Text getCurrentKey() throws IOException, InterruptedException {
  return rr.getCurrentKey();
 }

 @Override
 public Text getCurrentValue() throws IOException, InterruptedException {
  return rr.getCurrentValue();
 }

 /**
  * Get the progress within the split
  * @throws InterruptedException 
  * @throws IOException 
  */
 @Override
 public float getProgress() throws IOException, InterruptedException {
  return rr.getProgress();
 }

 public synchronized void close() throws IOException {
  if (rr != null) {
   rr.close();
   rr = null;
  }
 }

public static class RawGraphInputFormat extends 
                                 CombineFileInputFormat< Text, Text > {

  @Override
  public RecordReader< Text, Text > 
                     createRecordReader(InputSplit split, 
                                        TaskAttemptContext context)throws IOException {
   return new CombineFileRecordReader< Text, Text >( 
                                 (CombineFileSplit) split, 
                                  context, 
                                  MultiFileRawWebGraphRecordReader.class);
  }
  
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
   CompressionCodec codec = new CompressionCodecFactory(    
                                context.getConfiguration()).getCodec(file);
   return codec == null;
  }

 }

Hadoop - Java Heap Space Error

"Error: Java Heap space" means I'm trying to allocate more memory then available in the system.
how to go around? (1) better configuration (2) look for unnecessarily allocated objects
Configuration

mapred.map.child.java.opts : heap size for map tasks
mapred.reduce.child.java.opts: heap size for reduce tasks

mapred.tasktracker.map.tasks.maximum: max map tasks can run simultaneously per node
mapred.tasktracker.reduce.tasks.maximum: max reduce tasks can run simultaneously per node

Make sure ((num_of_maps * map_heap_size) + (num_of_reducers * reduce_heap_size)) is not larger than memory available in the system. Max number of mappers & reducers can also be tuned looking at available system resources.

io.sort.factor: max # of streams to merge at once for sorting. Used both in map and reduce.

io.sort.mb: map side memory buffer size used while sorting
mapred.job.shuffle.input.buffer.percent: Reduce side buffer related - The percentage of memory to be allocated from the maximum heap size for storing map outputs during the shuffle

NOTE: Using fs.inmemory.size.mb is very bad idea!
Unnecessary memory allocation

Simply look for new keyword and make sure there is no unnecessary allocation. A very common tip is using set() method of Writable objects rather than re-allocating a new object at every map or reduce.
Here is a simple count example to show the trick:

public static class UrlReducer extends Reducer{
  IntWritable sumw = new IntWritable();
  int sum;

  public void reduce(Text key,Iterable<IntW> vals,Context context){
    sum=0;
    for (IntWritable val : vals) {
      sum += val.get();
    }
    sumw.set(sum);
    context.write(key, sumw);
  }
}

note: There are couple more tips here for resolving common errors in Hadoop.

December 03, 2010

make sure start-dfs.sh script is being executed on the master !

everything looks fine when start-all.sh is executed but there is no Namenode process apprear on the jps results. why ? also when I check the logs I see following networking exceptions:

ERROR org.apache.hadoop.hdfs.server.namenode.NameNode: java.net.BindException: Problem binding to hostname/ipaddress:host : Cannot assign requested address
        at org.apache.hadoop.ipc.Server.bind(Server.java:190)
        at org.apache.hadoop.ipc.Server$Listener.(Server.java:253)
        at org.apache.hadoop.ipc.Server.(Server.java:1026)
        at org.apache.hadoop.ipc.RPC$Server.(RPC.java:488)

when I try to do an ls on dfs, I get following:
ipc.Client: Retrying connect to server:  hostname/ipaddress:host
...
...
Bad connection to FS. command aborted

I spent lots of time trying to figure out the "networking" problem, checked if the port is already in use, ip4/ip6 conflict etc ..

at the end, I realized that I'm running start-all.sh script on a random node. When it is executed on the master node, works just fine ! simple fix ..

November 24, 2010

hadoop - wrong key class exception

Usually happens because of mismatch between Map or Reduce class signature and configuration settings.

But also be careful about the combiner ! Check if you are using the same class as reducer and combiner. If reducer's input key-value pair is not same as its output key-value pair, then it can not be used as a combiner -because combiner's output will became input on the reducer side !

here is an example:
reducer input key val : < IntWritable, IntWritable >
reducer output key val: < Text, Text >

if this reducer is used as combiner, then the combiner will output <text, text> and reducer will receive <text, text> as input - and boom - wrong key class exception !

November 23, 2010

java.lang.InstantiationException hadoop

java.lang.InstantiationException definition:
Thrown when an application tries to create an instance of a class using the newInstance method in class Class, but the specified class object cannot be instantiated because it is an interface or is an abstract class.

I get this exception for setting input reader to FileInputFormat
FileInputFormat is an abstract class !
job.setInputFormatClass(FileInputFormat.class)

Default is TextInputFormat and it can be used instead..
job.setInputFormatClass(TextInputFormat.class)

exception:
Exception in thread "main" java.lang.RuntimeException: java.lang.InstantiationException
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:123)
        at org.apache.hadoop.mapreduce.lib.input.MultipleInputs.getInputFormatMap(MultipleInputs.java:109)
        at org.apache.hadoop.mapreduce.lib.input.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:58)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:401)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:418)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:338)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:960)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:976)
        at nyu.cs.webgraph.LinkGraphUrlIdReplacement.phase1(LinkGraphUrlIdReplacement.java:326)
        at nyu.cs.webgraph.LinkGraphUrlIdReplacement.main(LinkGraphUrlIdReplacement.java:351)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:616)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:192)
Caused by: java.lang.InstantiationException
        at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:532)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:121)
        ... 14 more

November 03, 2010

Using Distributed Cache in Hadoop

Distributed cache allows to share static data among all nodes. In order use this functionality, the data location should be set before MR job starts.

Here is an example usage for distributed cache. While working on web-graph problem I replace URLs with unique id's. If I have the url-Id mapping in memory, I can easily replace URLs with their corresponding ids. So here is the sample usage:

public static class ReplacementMapper extends Mapper<Text, Text, Text, Text> {

    private HashMap<String, String> idmap;

    @Override
    public void setup(Context context) {
     LoadIdUrlMapping(context);
    }

    @Override
    public void map(Text key, Text value, Context context) throws InterruptedException {
        ....
    }

id-url mapping is loaded at the beginning of each Map task. Below example simply reads the file out of HDFS and stores the data in a hashmap for quick access. Here is the function:
private void loadIdUrlMapping(Context context) {
   
 FSDataInputStream in = null;
 BufferedReader br = null;
 try {
  FileSystem fs = FileSystem.get(context.getConfiguration());
  Path path = new Path(cacheFileLocation);
  in = fs.open(path);
  br  = new BufferedReader(new InputStreamReader(in));
 } catch (FileNotFoundException e1) {
  e1.printStackTrace();
  System.out.println("read from distributed cache: file not found!");
 } catch (IOException e1) {
  e1.printStackTrace();
  System.out.println("read from distributed cache: IO exception!");
 }
 try {
  this.idmap = new HashMap< string, string >();
  String line = "";
  while ( (line = br.readLine() )!= null) {
   String[] arr = line.split("\t");
   if (arr.length == 2)
    idmap.put(arr[1], arr[0]);
  }
  in.close();
 } catch (IOException e1) {
  e1.printStackTrace();
  System.out.println("read from distributed cache: read length and instances");
 }
   }
}

This is one way of accessing shared data among hadoop nodes. Other way of accessing it is through local file system. Here is a great article about how to throw cache data automatically among nodes and access it later.