Hadoop读书笔记(十三)MapReduce中Top算法,hadoopmapreduce


Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855

1.说明:

从给定的文件中的找到最大值

2.代码:

TopApp.java

package suanfa;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TestMiniMRClientCluster.MyReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * <p> 
 * Title: TopApp.java 
 * Package suanfa 
 * </p>
 * <p>
 * Description: 从算1000w个数据中找到最大值
 * <p>
 * @author Tom.Cai
 * @created 2014-12-2 下午10:28:33 
 * @version V1.0 
 *
 */
public class TopApp {
	private static final String INPUT_PATH = "hdfs://192.168.80.100:9000/top_input";
	private static final String OUT_PATH = "hdfs://192.168.80.100:9000/top_out";
	
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}
		
		final Job job = new Job(conf , TopApp.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(NullWritable.class);
		FileOutputFormat.setOutputPath(job, outPath);
		job.waitForCompletion(true);
	}
	
	static class MyMapper extends  Mapper<LongWritable, Text, LongWritable, NullWritable>{
		long max = Long.MAX_VALUE;
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			long temp = Long.parseLong(value.toString());
			if(temp>max){
				max = temp;
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(max), NullWritable.get());
		}
	}
	
	
	static class MyReduceR extends  Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
		long max = Long.MAX_VALUE;
		@Override
		protected void reduce(LongWritable key, Iterable<NullWritable> value, Context context) throws IOException, InterruptedException {
			long temp = Long.parseLong(value.toString());
			if(temp>max){
				max = temp;
			}
		}
		
		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(max), NullWritable.get());
		}
		
	}
}


欢迎大家一起讨论学习!

有用的自己收!

记录与分享,让你我共成长!欢迎查看我的其他博客;我的博客地址:http://blog.csdn.net/caicongyang


相关内容