Practicing MapReduce: Building an Inverted Index


An inverted index is a data structure commonly used in document retrieval systems and is widely applied in full-text search engines.

Typically, an inverted index consists of a word or phrase together with a list of related documents; each entry in that list is either an ID that identifies a document or a URI that points to the document's location.

In practice, each document is usually also assigned a weight indicating how relevant it is to the search term; in this exercise, the weight is simply the word's frequency within the document.
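As a point of reference, here is a minimal in-memory sketch of that structure (illustration only; the class and method names are my own and are not part of the MapReduce job below):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class InvertedIndex {
 // term -> (document name -> weight); here the weight is a simple term count
 private final Map<String, Map<String, Integer>> index = new HashMap<String, Map<String, Integer>>();

 // Record one occurrence of term in doc.
 public void add(String term, String doc) {
  Map<String, Integer> postings = index.get(term);
  if (postings == null) {
   postings = new HashMap<String, Integer>();
   index.put(term, postings);
  }
  Integer count = postings.get(doc);
  postings.put(doc, count == null ? 1 : count + 1);
 }

 // All documents containing term, with their weights.
 public Map<String, Integer> postings(String term) {
  Map<String, Integer> p = index.get(term);
  return p == null ? Collections.<String, Integer>emptyMap() : p;
 }
}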

In my example, the input files contain the following:

hadoop11:/home/in/win1 # hadoop fs -cat /user/root/in1/words.txt
mapreduce is simple
hadoop11:/home/in/win1 # hadoop fs -cat /user/root/in1/words1.txt
mapreduce is powerfull and is simple
hadoop11:/home/in/win1 # hadoop fs -cat /user/root/in1/words2.txt
cat: File does not exist: /user/root/in1/words2.txt

My target result (the run below also indexed a words3.txt, which is not shown above):

and words1.txt:1;
bye words3.txt:1;
hello words3.txt:1;
is words.txt:1;words1.txt:2;
mapreduce words1.txt:1;words3.txt:2;words.txt:1;
powerfull words1.txt:1;
simple words1.txt:1;words.txt:1;
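The job below produces this format in three stages. Tracing the word "is" through the two files shown above:

map output:     ("is:words.txt", "1")  ("is:words1.txt", "1")  ("is:words1.txt", "1")
combine output: ("is", "words.txt:1")  ("is", "words1.txt:2")
reduce output:  ("is", "words.txt:1;words1.txt:2;")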

The code listing follows; I have filled in the imports. EJob is a small helper class (not shown here) that packages the compiled classes into a temporary jar so the job can be submitted from the IDE:

 

import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class indexSum {
 private static final Log log = LogFactory.getLog(indexSum.class);

 

 // Stage 1 (map): for every word, emit the pair ("word:filename", "1").
 public static class TokenizerMapper1 extends
   Mapper<Object, Text, Text, Text> {

  private Text key1 = new Text();
  private Text value1 = new Text();

  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {

   // The input split tells us which file this record came from.
   FileSplit split = (FileSplit) context.getInputSplit();

   StringTokenizer itr = new StringTokenizer(value.toString());

   while (itr.hasMoreTokens()) {
    // Key carries both the word and its source file: "word:filename".
    key1.set(itr.nextToken() + ":" + split.getPath().getName());

    value1.set("1");

    context.write(key1, value1);
   }
  }
 }

 // Stage 2 (combiner): sum the 1s for each "word:filename" key, then
 // re-split the key so that the final reduce groups by word alone:
 // ("word:filename", [1, 1, ...]) -> ("word", "filename:count").
 // Note: Hadoop does not guarantee that a combiner runs, so moving the
 // key split into the combiner is a shortcut that works for this exercise
 // but is not safe in general.
 public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
  private Text result = new Text();

  public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
   int sum = 0;

   for (Text val : values) {
    sum += Integer.parseInt(val.toString());
   }
   // Split "word:filename" back apart and emit ("word", "filename:count").
   int splitIndex = key.toString().indexOf(":");
   result.set(key.toString().substring(splitIndex + 1) + ":" + sum);
   key.set(key.toString().substring(0, splitIndex));
   context.write(key, result);
  }

 }

 // Stage 3 (reduce): concatenate every "filename:count" posting for a word.
 public static class IntSumReducer3 extends Reducer<Text, Text, Text, Text> {
  private Text result = new Text();

  public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {

   // Build the posting list, e.g. "words.txt:1;words1.txt:2;".
   StringBuilder postings = new StringBuilder();
   for (Text value : values) {
    postings.append(value.toString()).append(";");
   }

   result.set(postings.toString());

   context.write(key, result);
  }

 }

 

 public static void main(String[] args) throws Exception {
  log.info("===============Begin at : " + new Date());
 
  // Hard-coded input/output paths so the job can be launched from the IDE;
  // they override whatever was passed on the command line.
  String[] argsTemp = { "hdfs://hadoop11:8020/user/root/in1", "out4" };
  args = argsTemp;

  // EJob (helper class, not shown) packs the compiled classes under bin/
  // into a temporary jar for remote submission.
  File jarFile = EJob.createTempJar("bin");
  Configuration conf = new Configuration();
  conf.set("hadoop.job.ugi", "root,root");
  conf.set("fs.default.name", "hdfs://hadoop11:8020/");
  conf.set("mapred.job.tracker", "hadoop11:8021");

  String[] otherArgs = new GenericOptionsParser(conf, args)
    .getRemainingArgs();
  if (otherArgs.length != 2) {
   System.err.println("Usage: wordcount <in> <out>");
   System.exit(2);
  }
  Job job = new Job(conf, "inverted index");
  job.setJarByClass(indexSum.class);
  ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

 
  job.setMapperClass(TokenizerMapper1.class);
  // The per-file counting happens in the combiner; see the note on IntSumReducer.
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer3.class);

  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  // Wait for the job, log the finish time, then exit.
  boolean success = job.waitForCompletion(true);
  log.info("===============End at : " + new Date());
  System.exit(success ? 0 : 1);
 }
}
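Once the job finishes, the result can be inspected the same way the input files were listed above. A sketch, assuming the default reducer output file name part-r-00000:

hadoop11:/home/in/win1 # hadoop fs -cat /user/root/out4/part-r-00000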

Adapted from the example on the official Apache website.
