MapReduce实现倒排索引,mapreduce实现索引


使用到Combiner编程(可插拔式)

在map端对输出先做合并,最基本是实现本地key合并,具有本地reduce功能

如果不用Combiner,所有结果的合并都要由reduce端完成,效率会低下

Combiner的输入输出类型应该完全一致,并且都与map的输出类型相同(适合实现累加、最大值等功能);框架可能执行零次或多次Combiner,因此结果的正确性不能依赖于它是否被执行

job.setCombinerClass();

倒排索引基本实现

package cn.MapReduce.px;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverIndex {
	/*
	 * dao pai suoyin
	 */
	public static class InverIndexMap extends
			Mapper<LongWritable, Text, Text, Text> {

		private Text k2 = new Text();
		private Text v2 = new Text();

		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String words[] = line.split(" ");
			// get fileObject
			FileSplit inputSplit = (FileSplit) context.getInputSplit();
			String path = inputSplit.getPath().toString();
			for (String word : words) {
				String WordAndPath = word + "->" + path;
				k2.set(WordAndPath);
				v2.set("1");
				context.write(k2, v2);
			}

		}
	}

	public static class InverIndexCombiner extends
			Reducer<Text, Text, Text, Text> {

		private Text k4 = new Text();
		private Text v4 = new Text();

		protected void reduce(Text k3, Iterable<Text> v3s, Context context)
				throws IOException, InterruptedException {
			String word = k3.toString().split("->")[0];
			String path = k3.toString().split("->")[1];
			Integer count = 0;
			for (Text t : v3s) {
				count += Integer.parseInt(t.toString());
			}
			String PathAndCount = path + "->" + count;
			k4.set(word);
			v4.set(PathAndCount);
			context.write(k4, v4);
		}

	}

	public static class InverIndexReduce extends
			Reducer<Text, Text, Text, Text> {

		private Text v6 = new Text();

		protected void reduce(Text k5, Iterable<Text> v5s, Context context)
				throws IOException, InterruptedException {
			String result = "";
			for (Text t : v5s) {
				result += t.toString() + "\t";
			}
			v6.set(result);
			context.write(k5, v6);
		}

	}

	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(InverIndex.class);
		job.setMapperClass(InverIndexMap.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		job.setCombinerClass(InverIndexCombiner.class);
		job.setReducerClass(InverIndexReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}

}


 

 

相关内容