MapReduce处理数据去重与数据排序,mapreduce排序


一:MapReduce处理数据去重

MapReduce在shuffle阶段会把map输出中key相同的记录归并到同一组,每个不同的key只会触发一次reduce调用,因此把整行数据作为key输出即可实现数据去重

/**
 * Mapper for the de-duplication job.
 *
 * <p>Emits every non-blank input line, trimmed, as the output key together with
 * an empty {@link Text} value. Only the key carries information; identical
 * lines therefore become identical keys, which the paired reducer can write
 * out once.
 */
public class DelsameMap extends Mapper<LongWritable, Text, Text, Text> {
	/** Output key instance reused across calls to avoid one allocation per record. */
	private final Text outKey = new Text();

	/** Empty placeholder value; the job only cares about the key. */
	private final Text emptyValue = new Text("");

	/**
	 * Emits the trimmed line as key, skipping blank lines.
	 *
	 * @param key     input key supplied by the input format (unused)
	 * @param value   one line of input text
	 * @param context sink for the (line, "") output pair
	 * @throws IOException          if writing the output pair fails
	 * @throws InterruptedException if the task is interrupted while writing
	 */
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		// Trim BEFORE the emptiness check: otherwise a whitespace-only line
		// would be emitted as an empty key and appear as a blank output record.
		String line = value.toString().trim();
		if (line.isEmpty()) {
			return;
		}
		outKey.set(line);
		context.write(outKey, emptyValue);
	}
}


 

/**
 * Reducer for the de-duplication job.
 *
 * <p>Writes the incoming key exactly once per {@code reduce} call, paired with
 * {@link NullWritable}; the grouped values are never inspected.
 */
public class DelsameRedu extends Reducer<Text, Text, Text, NullWritable> {
	/**
	 * Writes {@code key} with a null value.
	 *
	 * @param key     the line being de-duplicated
	 * @param values  placeholder values grouped under {@code key} (ignored)
	 * @param context sink for the (key, null) output pair
	 * @throws IOException          if writing the output pair fails
	 * @throws InterruptedException if the task is interrupted while writing
	 */
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		final NullWritable noValue = NullWritable.get();
		context.write(key, noValue);
	}
}
/**
 * Driver that configures and submits the de-duplication job
 * ({@link DelsameMap} + {@link DelsameRedu}).
 */
public class DelsameMain {
	/**
	 * Runs the job and exits with a status reflecting its outcome.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
	 * @throws Exception if the job cannot be configured or submitted
	 */
	public static void main(String[] args) throws Exception {
		// Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
		if (args.length < 2) {
			System.err.println("Usage: DelsameMain <input path> <output path>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		// Job.getInstance replaces the deprecated Job(Configuration) constructor.
		Job job = Job.getInstance(conf);
		job.setJarByClass(DelsameMain.class);

		job.setMapperClass(DelsameMap.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(DelsameRedu.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Propagate job failure to the caller through the process exit code.
		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}
}


二:MapReduce处理数据排序

将每行原始数据转换为int类型(IntWritable)作为map输出的key。MapReduce框架会在shuffle阶段自动地根据key进行排序

/**
 * Mapper for the sorting job.
 *
 * <p>Parses every non-blank input line as an {@code int} and emits it as the
 * output key with an empty {@link Text} value, so that the numbers travel as
 * {@link IntWritable} keys to the reducer.
 */
public class SortMap extends Mapper<LongWritable, Text, IntWritable, Text> {
	/** Output key instance reused across calls to avoid one allocation per record. */
	private final IntWritable outKey = new IntWritable();

	/** Empty placeholder value; the job only cares about the key. */
	private final Text emptyValue = new Text("");

	/**
	 * Emits the parsed integer as key, skipping blank lines.
	 *
	 * @param key     input key supplied by the input format (unused)
	 * @param value   one line of input text holding a decimal integer
	 * @param context sink for the (number, "") output pair
	 * @throws IOException           if writing the output pair fails
	 * @throws InterruptedException  if the task is interrupted while writing
	 * @throws NumberFormatException if a non-blank line is not a valid {@code int}
	 */
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, IntWritable, Text>.Context context)
			throws IOException, InterruptedException {
		// Trim BEFORE the emptiness check: otherwise a whitespace-only line
		// would reach Integer.parseInt("") and fail the task.
		String line = value.toString().trim();
		if (line.isEmpty()) {
			return;
		}
		outKey.set(Integer.parseInt(line));
		context.write(outKey, emptyValue);
	}
}


/**
 * Reducer for the sorting job.
 *
 * <p>Writes one {@code (rank, number)} pair for every value grouped under a
 * key, where {@code rank} is a counter that starts at 1 and increases by one
 * per written record. Duplicate numbers therefore each get their own rank.
 *
 * <p>The counter is an instance field, so ranks are only global when the whole
 * job runs through a single instance of this reducer.
 */
public class SortRedu extends
		Reducer<IntWritable, Text, IntWritable, IntWritable> {
	/** Running rank of the next record to write; mutated in place between writes. */
	private final IntWritable num = new IntWritable(1);

	/**
	 * Emits the current rank as output key and the number as output value, once
	 * per occurrence of the number.
	 *
	 * @param key     the number, as produced by the mapper
	 * @param values  one placeholder per occurrence of {@code key}; only the count matters
	 * @param context sink for the (rank, number) output pairs
	 * @throws IOException          if writing an output pair fails
	 * @throws InterruptedException if the task is interrupted while writing
	 */
	@Override
	protected void reduce(IntWritable key, Iterable<Text> values,
			Reducer<IntWritable, Text, IntWritable, IntWritable>.Context context)
			throws IOException, InterruptedException {
		// The placeholder values carry no data; iterate only to emit one record
		// per occurrence of the number.
		for (Text ignored : values) {
			context.write(num, key);
			// Advance the rank in place instead of allocating a new IntWritable.
			num.set(num.get() + 1);
		}
	}
}


/**
 * Driver that configures and submits the sorting job
 * ({@link SortMap} + {@link SortRedu}).
 */
public class SortMain {
	/**
	 * Runs the job and exits with a status reflecting its outcome.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
	 * @throws Exception if the job cannot be configured or submitted
	 */
	public static void main(String[] args) throws Exception {
		// Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
		if (args.length < 2) {
			System.err.println("Usage: SortMain <input path> <output path>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		// Job.getInstance replaces the deprecated Job(Configuration) constructor.
		Job job = Job.getInstance(conf);
		job.setJarByClass(SortMain.class);

		job.setMapperClass(SortMap.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(SortRedu.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		// SortRedu keeps a running rank in an instance field, so the ranks (and
		// the overall ordering) are only global when a single reducer sees all keys.
		job.setNumReduceTasks(1);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Propagate job failure to the caller through the process exit code.
		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}
}



 

相关内容