November 03, 2010

Using Distributed Cache in Hadoop

The distributed cache allows static data to be shared among all nodes in the cluster. In order to use this functionality, the data location must be set before the MapReduce job starts.
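
For example, the file can be registered with the distributed cache in the job driver before the job is submitted. Below is a minimal driver sketch; the path /user/hadoop/id-url-mapping.txt, the class name UrlReplacementJob, and the property name "idurl.cache.file" are just placeholders. It uses the DistributedCache class from org.apache.hadoop.filecache:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.Job;

public class UrlReplacementJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "url replacement");
        job.setJarByClass(UrlReplacementJob.class);

        // register the HDFS file so every task node gets a local copy of it
        // ("/user/hadoop/id-url-mapping.txt" is a placeholder path)
        DistributedCache.addCacheFile(new URI("/user/hadoop/id-url-mapping.txt"),
                job.getConfiguration());

        // the same path can also be put into the configuration so the mapper
        // knows where to read from; "idurl.cache.file" is an example property name
        job.getConfiguration().set("idurl.cache.file", "/user/hadoop/id-url-mapping.txt");

        job.setMapperClass(ReplacementMapper.class);
        // ... set input/output formats, key/value classes and paths as usual ...

        job.waitForCompletion(true);
    }
}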

Here is an example usage of the distributed cache. While working on a web-graph problem, I replace URLs with unique ids. If I have the url-id mapping in memory, I can easily replace each URL with its corresponding id. Here is the sample mapper:

public static class ReplacementMapper extends Mapper<Text, Text, Text, Text> {

    // id-url mapping kept in memory for fast lookups during map()
    private HashMap<String, String> idmap;

    @Override
    public void setup(Context context) {
        loadIdUrlMapping(context);
    }

    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        ....
    }

The id-url mapping is loaded at the beginning of each map task. The example below simply reads the file from HDFS and stores the data in a HashMap for quick access. Here is the function:
private void loadIdUrlMapping(Context context) {

    FSDataInputStream in = null;
    BufferedReader br = null;
    try {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        // cacheFileLocation holds the HDFS path of the id-url mapping file
        Path path = new Path(cacheFileLocation);
        in = fs.open(path);
        br = new BufferedReader(new InputStreamReader(in));

        this.idmap = new HashMap<String, String>();
        String line;
        // each line is "id <TAB> url"; store it as url -> id for fast lookup
        while ((line = br.readLine()) != null) {
            String[] arr = line.split("\t");
            if (arr.length == 2)
                idmap.put(arr[1], arr[0]);
        }
    } catch (FileNotFoundException e) {
        e.printStackTrace();
        System.out.println("read from distributed cache: file not found!");
    } catch (IOException e) {
        e.printStackTrace();
        System.out.println("read from distributed cache: IO exception!");
    } finally {
        try {
            if (br != null)
                br.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
}

This is one way of accessing shared data among Hadoop nodes. Another way is to access it through the local file system, since the framework copies the cached files onto each node's local disk before the tasks start. Here is a great article about how to distribute cache data automatically among the nodes and access it later.
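
For completeness, here is a rough sketch of how that local access could look: reading the cached file from the task node's local disk in setup(), assuming the file was registered with DistributedCache.addCacheFile() in the driver as shown above (it also needs java.io.FileReader and org.apache.hadoop.filecache.DistributedCache on the import list):

@Override
public void setup(Context context) throws IOException, InterruptedException {
    this.idmap = new HashMap<String, String>();

    // local copies of all files registered with the distributed cache
    Path[] localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    if (localFiles == null || localFiles.length == 0)
        return;

    // in this example the first (and only) cached file is the id-url mapping
    BufferedReader br = new BufferedReader(new FileReader(localFiles[0].toString()));
    try {
        String line;
        while ((line = br.readLine()) != null) {
            String[] arr = line.split("\t");
            if (arr.length == 2)
                idmap.put(arr[1], arr[0]);
        }
    } finally {
        br.close();
    }
}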