一些算法的MapReduce实现——倒排索引实现


Introduction to the Inverted Index

有两种不同的反向索引形式:

 "a":      {2}
 "banana": {2}
 "is":     {0, 1, 2}
 "it":     {0, 1, 2}
 "what":   {0, 1}


"a":      {(2, 2)}
"banana": {(2, 3)}
"is":     {(0, 1), (0, 4), (1, 1), (2, 1)}
"it":     {(0, 0), (0, 3), (1, 2), (2, 0)} 
"what":   {(0, 2), (1, 0)}

MapReduce Implementation

class MAPPER
	procedure MAP(docid n, doc d)
		for all term t in doc d
			EMIT(term t, <n, 1>)
			
class REDUCER
	method INITIALIZE
		t(pre) <-- null
		P <-- new ASSOCIATIVE_SORTED_MAP
	procedure REDUCER(term t, postings[<docid n1, tf1>,<docid n2, tf2>....])
		if t(pre) != t AND t(pre) != null
			EMIT(t(pre), P)
			P.RESET
		for all posting<n, tf> in postings[....]
			P{n} <-- P{n} + tf
		t(pre) <-- t
	method CLOSE
		EMIT(term t(pre), P)



(tuple <term t, docid n>, tf)

/**
   * Mapper that turns one document into per-term frequency records.
   *
   * input format
   *    docid<tab>doc content
   *
   * output format
   *    (term:docid)<tab>(tf in this doc)
   *
   * The output key is the term, a ':' separator and the document id, so each
   * (term, document) pair is emitted exactly once per map call together with
   * the number of times the term occurs in that document.
   *
   * Note: a term may itself contain ':'; anything that parses the key should
   * split on the LAST ':' to recover the document id.
   */
  public static class InvertedListMap extends Mapper<IntWritable/*docid*/, Text/*doc content*/, Text, IntWritable> {

    /**
     * Counts the occurrences of every term in one document and emits one
     * record per distinct term.
     *
     * @param key     the document id
     * @param value   the document content
     * @param context the Hadoop context the (term:docid, tf) records are written to
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(IntWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Count with plain String/Integer; Writables are only created when emitting.
      Map<String, Integer> freqs = new HashMap<String, Integer>();

      // A real job would run the document through a proper analyzer/tokenizer
      // first. Here tokenization is simplified to splitting on runs of
      // whitespace (spaces, tabs, newlines), not just on a single space.
      for (String term : value.toString().split("\\s+")) {
        // A document that starts with whitespace yields one empty leading token.
        if (term.isEmpty()) {
          continue;
        }
        Integer tf = freqs.get(term);
        freqs.put(term, tf == null ? 1 : tf + 1);
      }

      String docId = key.toString();
      for (Map.Entry<String, Integer> entry : freqs.entrySet()) {
        context.write(new Text(entry.getKey() + ":" + docId),
            new IntWritable(entry.getValue()));
      }
    }

  }
  
  /**
   * Reducer that assembles the postings list of each term.
   *
   * Input keys have the form {@code term:docid} and the values are term
   * frequencies. Postings (docid -> tf) are buffered in a sorted map while
   * consecutive keys share the same term; the buffered list is emitted when
   * the term changes and once more in {@link #cleanup} for the last term.
   *
   * NOTE(review): this relies on all keys of one term reaching the same
   * reducer in consecutive calls. With partitioning on the whole
   * {@code term:docid} key that is presumably not guaranteed; the job likely
   * needs a partitioner that looks at the term part only. Confirm against the
   * job configuration.
   */
  public static class InvertedListReduce extends Reducer <Text, IntWritable, Text, Map<IntWritable, IntWritable>> {

    /** Term whose postings are currently being buffered; null before the first key. */
    private String term = null;
    /** Postings (docid -> tf) collected so far for {@link #term}, sorted by docid. */
    private Map<IntWritable, IntWritable> posting = null;

    /**
     * Resets the per-task state before the first key is processed.
     */
    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      term = null;
      posting = new TreeMap<IntWritable, IntWritable>();
    }

    /**
     * Emits the postings of the last term, if any key was seen at all.
     */
    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      // A reducer that received no input has no term; new Text(null) would throw.
      if (term != null) {
        context.write(new Text(term), posting);
      }
    }

    /**
     * Adds the posting of one {@code term:docid} key to the buffered list,
     * first flushing the previous term's list when the term changes.
     *
     * @param key     composite key {@code term:docid}
     * @param values  term frequencies recorded for this (term, docid) pair
     * @param context the Hadoop context completed postings lists are written to
     * @throws IOException           if the key has no ':' separator or writing fails
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if the docid part of the key is not an integer
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      String composite = key.toString();
      // Split on the last ':' so that terms containing ':' are kept intact.
      int separator = composite.lastIndexOf(':');
      if (separator < 0) {
        throw new IOException("Malformed key, expected term:docid but got: " + composite);
      }
      String currentTerm = composite.substring(0, separator);
      int docId = Integer.parseInt(composite.substring(separator + 1));

      // Exact comparison: keys are grouped by their exact text, so a
      // case-insensitive match would only merge case variants that happen to
      // be adjacent.
      if (term != null && !term.equals(currentTerm)) {
        // Flush the PREVIOUS term together with its postings.
        context.write(new Text(term), posting);
        // Start a fresh map instead of clearing, so an already written list
        // is never mutated afterwards.
        posting = new TreeMap<IntWritable, IntWritable>();
      }
      term = currentTerm;

      // Sum into a primitive: the framework may reuse the IntWritable instance
      // handed out by the iterator, so the value object must not be stored.
      int tf = 0;
      for (IntWritable val : values) {
        tf += val.get();
      }
      posting.put(new IntWritable(docId), new IntWritable(tf));
    }

  }



Reference

1、http://blog.csdn.net/hguisu/article/details/7962350
2、2010 - Jimmy Lin - Data-Intensive Text Processing with MapReduce
3、http://chandramanitiwary.wordpress.com/2012/08/19/map-reduce-design-patterns-pairs-stripes/

相关内容