Hadoop Reading Notes (12): Custom Sorting in MapReduce


Hadoop Reading Notes series: http://blog.csdn.net/caicongyang/article/category/2166855

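1. Description:

Sort the given two-column data in ascending order by the first column; when the first-column values are equal, sort by the second column in ascending order.

Data format (two tab-separated integer columns):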

3	3
3	2
3	1
2	2
2	1
1	1
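
As a quick sanity check on the ordering described above, here is a minimal plain-Java sketch (no Hadoop involved) that sorts the sample rows by the same rule. The class name `SampleSortSketch` and the method `sortPairs` are made up for illustration and are not part of the job itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical local helper, not part of the MapReduce job: sorts rows using
// the same rule (first column ascending, then second column ascending).
class SampleSortSketch {

    static List<long[]> sortPairs(List<long[]> rows) {
        List<long[]> sorted = new ArrayList<long[]>(rows);
        Collections.sort(sorted, new Comparator<long[]>() {
            @Override
            public int compare(long[] a, long[] b) {
                // Compare the first column; fall back to the second on a tie.
                if (a[0] != b[0]) {
                    return a[0] < b[0] ? -1 : 1;
                }
                if (a[1] != b[1]) {
                    return a[1] < b[1] ? -1 : 1;
                }
                return 0;
            }
        });
        return sorted;
    }

    public static void main(String[] args) {
        // The sample rows from the data listing above.
        List<long[]> rows = new ArrayList<long[]>();
        rows.add(new long[] {3, 3});
        rows.add(new long[] {3, 2});
        rows.add(new long[] {3, 1});
        rows.add(new long[] {2, 2});
        rows.add(new long[] {2, 1});
        rows.add(new long[] {1, 1});
        for (long[] row : sortPairs(rows)) {
            System.out.println(row[0] + "\t" + row[1]);
        }
    }
}
```

For the sample input this prints the rows in the order 1 1, 2 1, 2 2, 3 1, 3 2, 3 3, which is the order the job output should have when a single reducer is used.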


2. Code

SortApp.java
package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * 
 * <p> 
 * Title: SortApp.java 
 * Package sort 
 * </p>
 * <p>
 * Description: sort ascending by the first column; when the first column is equal, sort ascending by the second column
 * </p>
 * @author Tom.Cai
 * @created 2014-12-2 10:29:14 PM 
 * @version V1.0 
 *
 */
public class SortApp {
	private static final String INPUT_PATH = "hdfs://192.168.80.100:9000/sort_input";
	private static final String OUT_PATH = "hdfs://192.168.80.100:9000/sort_out";
	
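	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Delete the output directory if it already exists; otherwise the job fails on startup
		FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		if(fileSystem.exists(new Path(OUT_PATH))){
			fileSystem.delete(new Path(OUT_PATH),true);
		}
		Job job = new Job(conf,SortApp.class.getSimpleName());
		// Lets Hadoop locate the jar containing these classes when the job runs on a cluster
		job.setJarByClass(SortApp.class);
		//1.1 Specify the input path
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		// Specify the class used to parse the input files
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2 Specify the custom Mapper class
		job.setMapperClass(MyMapper.class);
		// Specify the types of the map output <k2,v2>
		job.setMapOutputKeyClass(newK2.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		//1.3 Specify the partitioner class
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);
		
		//1.4 Sorting and grouping: no comparator is set here, so newK2.compareTo is used
		
		//1.5 TODO (optional) combiner
		
		//2.2 Specify the custom Reducer class
		job.setReducerClass(MyReducer.class);
		// Specify the types of the final output <k3,v3>
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);
		
		//2.3 Specify where the output goes
		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		// Specify the class used to format the output files
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// Submit the job to the JobTracker, wait for it to finish, and report success or failure
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}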
	
	static class MyMapper extends Mapper<LongWritable,Text, newK2,LongWritable>{

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// Each input line holds two tab-separated numbers
			String[] fields = value.toString().split("\t");
			// Both columns go into the composite key so the framework sorts on both
			newK2 k2 = new newK2(Long.parseLong(fields[0]),Long.parseLong(fields[1]));
			final LongWritable v2 = new LongWritable(Long.parseLong(fields[1]));
			context.write(k2, v2);
		}
		
	}
	
	
	static class MyReducer extends Reducer<newK2, LongWritable, LongWritable, LongWritable>{

		@Override
		protected void reduce(newK2 key, Iterable<LongWritable> value, Context context) throws IOException, InterruptedException {
			// Keys arrive already sorted. Each distinct (first, second) key is written once,
			// so duplicate input rows are collapsed into a single output row.
			context.write(new LongWritable(key.first), new LongWritable(key.second));
		}
	}
	
	
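	static class newK2 implements WritableComparable<newK2>{
		// Primitive longs avoid null fields and boxed-reference comparisons
		long first;
		long second;
		
		public newK2(long first, long second) {
			this.first = first;
			this.second = second;
		}

		// Hadoop creates the key through this no-argument constructor before calling readFields()
		public newK2() {
		}

		@Override
		public void readFields(DataInput input) throws IOException {
			// Fields must be read in the same order that write() emits them
			this.first = input.readLong();
			this.second = input.readLong();
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(first);
			out.writeLong(second);
		}

		/**
		 * Ascending by the first column; when the first column is equal,
		 * ascending by the second column.
		 */
		@Override
		public int compareTo(newK2 o) {
			// Compare directly instead of casting a long difference to int,
			// which can overflow or truncate and give the wrong result
			if(this.first != o.first){
				return this.first < o.first ? -1 : 1;
			}
			if(this.second != o.second){
				return this.second < o.second ? -1 : 1;
			}
			return 0;
		}

		@Override
		public int hashCode() {
			return Long.valueOf(first).hashCode() + Long.valueOf(second).hashCode();
		}

		@Override
		public boolean equals(Object obj) {
			if(!(obj instanceof newK2)){
				return false;
			}
			newK2 k2 = (newK2)obj;
			// Value comparison on primitives, not reference comparison on boxed Long objects
			return (this.first == k2.first) && (this.second == k2.second);
		}
	}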

}
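
Two details of a key class like `newK2` are easy to get wrong: turning a `long` difference into an `int` with a cast, and comparing boxed `Long` values with `==`. The plain-Java sketch below (the class `KeyPitfallsSketch` and its method names are illustrative only, not part of the job) shows how each can misbehave once the values get large.

```java
// Hypothetical demo class, independent of Hadoop.
class KeyPitfallsSketch {

    // Subtract-and-cast comparison: the cast keeps only the low 32 bits
    // of the difference, so large gaps can come out as 0 or with the wrong sign.
    static int compareBySubtraction(long a, long b) {
        return (int) (a - b);
    }

    // Explicit comparison: no arithmetic, so nothing can overflow or be truncated.
    static int compareSafely(long a, long b) {
        if (a == b) {
            return 0;
        }
        return a < b ? -1 : 1;
    }

    // equals() compares the wrapped values; == on Long objects compares references.
    static boolean sameValue(Long a, Long b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        long big = 4294967296L; // 2^32, whose low 32 bits are all zero
        System.out.println(compareBySubtraction(big, 0L)); // prints 0, as if the values were equal
        System.out.println(compareSafely(big, 0L));        // prints 1

        Long x = Long.valueOf(1000L);
        Long y = Long.valueOf(1000L);
        // Reference comparison: typically false for values outside the small cached range.
        System.out.println(x == y);
        System.out.println(sameValue(x, y));               // prints true
    }
}
```

With the small values in the sample data neither problem shows up, which is why such bugs tend to go unnoticed until the keys grow.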


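Everyone is welcome to discuss and learn together!

Take whatever you find useful!

Record and share, so that we can all grow together! You are welcome to read my other posts; my blog: http://blog.csdn.net/caicongyang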



