MapReduce处理二次排序(分区-排序-分组),mapreduce二次


MapReduce二次排序原理

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReader的实现。
本例子中使用的时TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value。
这就是自定义Map的输入是<LongWritable,Text>的原因,然后调用自定义的Map的map方法,将一个个<LongWritable,Text>对输入的给Map的map方法。
注意输出应该符合自定义Map中定义的输出<IntPair,IntWritable>.最终是生成一个List<IntPair,IntWritable>,在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置Key比较函数类,则使用key的实现的compareTo方法。

在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序,然后开始构造一个key对应的value迭代器,这是就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,他们的value就放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和他的value迭代器)。同样注意输入与输出的类型必须与自定义的reducer中声明的一致。

 

核心总结
1.map最后阶段进行partition分区。一般使用job.setPartitionerClass设置的类,如果没有自定义的类,用key的hashcode()方法进行排序
2.每个分区内部调用job.setSortComparatorClass设置Key的比较函数类进行排序,如果没有则使用key的实现的compareTo方法。
3.当reduce接收到所有map传输过来的数据之后,调job.setSortComparatorClass设置的key比较函数类对所有数据对排序,如果没有则使用key的实现的compareTo方法
4.紧接着使用job.setGroupingComparatorClass设置的分组函数类,进行分组,同一个key的value放在一个迭代器里面

分区 --->  排序(二次)  --->  分组
分区默认的是key的hashcode()
排序默认的实key的compareTo()

-----------------------------------------

job.setPartitionerClass(Partitioner p); //设置分区。默认分区时hashcode()
job.setSortComparatorClass(RawComparator c);  //比较排序。shuffle阶段map输出之后,reduce之前。默认是key的compareTo()方法
job.setGroupingComparatorClass(RawComparator c); //分组。Reduce阶段

-----------------------------------------

案例

原始数据

2 12:12:34 2_hao123
3 09:10:34 3_baidu
1 15:02:41 1_google
3 22:11:34 3_sougou
1 19:23:23 1_baidu
2 13:56:60 2_soso

分别依据第一列和第二列对数据进行二次排序

1.分区类

package test.mr.seconderysort;

import org.apache.hadoop.io.Text;

/*
 * 分区类
 */
public class Partitioner extends
		org.apache.hadoop.mapreduce.Partitioner<StringPart, Text> {

	@Override
	public int getPartition(StringPart key, Text value, int numPartitions) {
		// TODO Auto-generated method stub
		return Math.abs(key.hashCode()) % numPartitions;
	}

}


 

2.自定义Map输出的key类,将原始数据要排序的两列作为该JavaBean的属性,实现WritableComparable接口,实现CompareTo()排序方法

Ps:WritableComparatable接口中的CompareTo()方法:在这个方法中,如果返回-1,则当前对象排前面,返回1,就排后面 ,0,就相等。

String类中的CompareTo()方法:

/*
  * compareTo()的返回值是整型,它是先比较对应字符的大小(ASCII码顺序),如果第一个字符和参数的第一个字符不等,结束比较,返回他们之间的差值,如果第一个字符和参数的第一个字符相等,则以第二个字符和参数的第二个字符做比较,以此类推,直至比较的字符或被比较的字符有一方全比较完,这时就比较字符的长度.
  *
  * 例:  String s1 = "abc";
  *     String s2 = "abcd";
  *     String s3 = "abcdfg";
  *     String s4 = "1bcdfg";
  *     String s5 = "cdfg";
  *     System.out.println( s1.compareTo(s2) );// -1 (前面相等,s1长度小1)
  *     System.out.println( s1.compareTo(s3) ); //-3 (前面相等,s1长度小3)
  *     System.out.println( s1.compareTo(s4) ); //48("a"的ASCII码是97,"1"的的ASCII码是49,所以返回48)
  *     System.out.println( s1.compareTo(s5) ); // -2 ("a"的ASCII码是97,"c"的ASCII码是99,所以返回-2)
  */

package test.mr.seconderysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/*
 * 自定义key
 */
/*
 *如果想对自己写的类排序,你就把自己写的这个类实现Comparable接口
 *然后写一个comparaTo方法来规定这个类的对象排序的顺序。
 *在这个方法中,如果返回-1,则当前对象排前面,返回1,就排后面 ,0,就相等
 */
public class StringPart implements WritableComparable<StringPart> {
	/*
	 * 两列排序
	 */
	private String first;
	private String second;

	public String getFirst() {
		return first;
	}

	public void setFirst(String first) {
		this.first = first;
	}

	public String getSecond() {
		return second;
	}

	public void setSecond(String second) {
		this.second = second;
	}

	public StringPart() {
		super();
		// TODO Auto-generated constructor stub
	}

	public StringPart(String first, String second) {
		super();
		this.first = first;
		this.second = second;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(first);
		out.writeUTF(second);

	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.first = in.readUTF();
		this.second = in.readUTF();
	}

	/*
	 * 排序
	 */
	/*
	 * compareTo()的返回值是整型,它是先比较对应字符的大小(ASCII码顺序),如果第一个字符和参数的第一个字符不等,结束比较,返回他们之间的
	 * 
	 * 差值,如果第一个字符和参数的第一个字符相等,则以第二个字符和参数的第二个字符做比较,以此类推,直至比较的字符或被比较的字符有一方
	 * 
	 * 全比较完,这时就比较字符的长度.
	 * 
	 * 例:  String s1 = "abc";
	 *     String s2 = "abcd"; 
	 *     String s3 = "abcdfg"; 
	 *     String s4 = "1bcdfg"; 
	 *     String s5 = "cdfg"; 
	 *     System.out.println( s1.compareTo(s2) );// -1 (前面相等,s1长度小1) 
	 *     System.out.println( s1.compareTo(s3) ); //-3 (前面相等,s1长度小3) 
	 *     System.out.println( s1.compareTo(s4) ); //48("a"的ASCII码是97,"1"的的ASCII码是49,所以返回48) 
	 *     System.out.println( s1.compareTo(s5) ); // -2 ("a"的ASCII码是97,"c"的ASCII码是99,所以返回-2)
	 */
	@Override
	public int compareTo(StringPart o) {
		if (!this.first.equals(o.getFirst())) {
			return first.compareTo(o.getFirst()); // 字符串的compareTo()方法
		} else {
			if (!this.second.equals(o.getSecond())) {
				return second.compareTo(o.getSecond());
			} else {
				return 0;
			}
		}
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((first == null) ? 0 : first.hashCode());
		result = prime * result + ((second == null) ? 0 : second.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		StringPart other = (StringPart) obj;
		if (first == null) {
			if (other.first != null)
				return false;
		} else if (!first.equals(other.first))
			return false;
		if (second == null) {
			if (other.second != null)
				return false;
		} else if (!second.equals(other.second))
			return false;
		return true;
	}

}


 

3.分组类(根据原始数据的第一列进行分组)

package test.mr.seconderysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/*
 * 实现分组
 */
public class Grouping extends WritableComparator {

	protected Grouping() {
		super(StringPart.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		StringPart a1 = (StringPart) a;
		StringPart b1 = (StringPart) b;
		// 只需要比较a1,b1的first字段即认为他们是否属于同组
		return a1.getFirst().compareTo(b1.getFirst());
	}

}


 

4.Map类
package test.mr.seconderysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SeconderyMap extends Mapper<LongWritable, Text, StringPart, Text> {
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, StringPart, Text>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		String[] str = line.split("\t");
		if (str.length == 3) {
			StringPart temp = new StringPart(str[0], str[1]);
			context.write(temp, value);
		}
	}

}


5.Reduce类

package test.mr.seconderysort;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SeconderyRedu extends
		Reducer<StringPart, Text, NullWritable, Text> {

	private static Text part = new Text("------------");

	@Override
	protected void reduce(StringPart key, Iterable<Text> values,
			Reducer<StringPart, Text, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		context.write(NullWritable.get(), part);
		for (Text t : values) {
			context.write(NullWritable.get(), t);
		}
	}
}


 

6.job类

package test.mr.seconderysort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SeconderyMain {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJarByClass(SeconderyMain.class);

		job.setGroupingComparatorClass(Grouping.class);
		job.setPartitionerClass(Partitioner.class);

		job.setMapperClass(SeconderyMap.class);
		job.setMapOutputKeyClass(StringPart.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(SeconderyRedu.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}
}


 

相关内容