hadoop编程小技巧(9)---二次排序(值排序),hadoop小技巧


代码测试环境:Hadoop2.4

应用场景:在Reducer端一般是key排序,而没有value排序,如果想对value进行排序,则可以使用此技巧。

应用实例描述:

比如针对下面的数据:

a,5
b,7
c,2
c,9
a,3
a,1
b,10
b,3
c,1
如果使用一般的MR的话,其输出可能是这样的:

a	1
a	3
a	5
b	3
b	10
b	7
c	1
c	9
c	2
从数据中可以看到其键是排序的,但是其值不是。通过此篇介绍的技巧可以做到下面的输出:

a	1
a	3
a	5
b	3
b	7
b	10
c	1
c	2
c	9
这个数据就是键和值都是排序的了。

二次排序原理:
1)自定义键类型,把值放入键中;

2)利用键的排序特性,可以顺便把值也排序了;

3)这时会有两个问题:

a. 数据传输到不同的Reducer会有异常;

b. 数据在Reducer中的分组不同;

针对这两个问题,需要使用自定义Partitioner、使用自定义GroupComparator来定义相应的逻辑;

实例:

driver类:

package fz.secondarysort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortDriver extends Configured implements Tool{

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		ToolRunner.run(new Configuration(), new SortDriver(),args);
	}

	@Override
	public int run(String[] args) throws Exception {
		Job job1 = Job.getInstance(getConf(), "secondary sort ");
		job1.setJarByClass(getClass());
		if(args.length!=5){
			System.err.println("Usage: <input> <output> <numReducers> <useSecondarySort> <useGroupComparator>");
			System.exit(-1);
		}
		Path out = new Path(args[1]);
		out.getFileSystem(getConf()).delete(out, true);
		FileInputFormat.setInputPaths(job1, new Path(args[0]));
		FileOutputFormat.setOutputPath(job1, out);
		if("true".equals(args[3])||"false".equals(args[3])){
			if("true".equals(args[3])){ // 使用二次排序
				job1.setMapperClass(Mapper1.class);
				job1.setReducerClass(Reducer1.class);
				job1.setMapOutputKeyClass(CustomKey.class);
				job1.setMapOutputValueClass(NullWritable.class);
				job1.setOutputKeyClass(CustomKey.class);
				job1.setOutputValueClass(NullWritable.class);
				if("true".equals(args[4])){
					job1.setGroupingComparatorClass(CustomGroupComparator.class);
				}else if("false".equals(args[4])){
					// do nothing 
				}else{
					System.err.println("Wrong Group Comparator argument!");
					System.exit(-1);
				}
				job1.setPartitionerClass(CustomPartitioner.class);
			}else{  // 不使用二次排序
				job1.setMapperClass(Mapper2.class);
				job1.setReducerClass(Reducer2.class);
				job1.setMapOutputKeyClass(Text.class);
				job1.setMapOutputValueClass(IntWritable.class);
				job1.setOutputKeyClass(Text.class);
				job1.setOutputValueClass(IntWritable.class);
			}
		}else{
			System.err.println("The fourth argument should be 'true' or 'false'");
			System.exit(-1);
		}
		job1.setInputFormatClass(TextInputFormat.class);
		job1.setOutputFormatClass(TextOutputFormat.class);
		
		job1.setNumReduceTasks(Integer.parseInt(args[2]));
		
		boolean job1success = job1.waitForCompletion(true);
		if(!job1success) {
			System.out.println("The CreateBloomFilter job failed!");
			return -1;
		}
		return 0;
	}
	

}

mapper1(二次排序mapper):

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 有二次排序mapper
 * @author fansy
 *
 */
public class Mapper1 extends
		Mapper<LongWritable, Text, CustomKey, NullWritable> {
	private String COMMA =",";
	private CustomKey newKey = new CustomKey();
	public void map(LongWritable key,Text value, Context cxt ) throws IOException,InterruptedException{
		String [] values = value.toString().split(COMMA);
		newKey.setSymbol(values[0]);
		newKey.setValue(Integer.parseInt(values[1]));
		cxt.write(newKey, NullWritable.get());
	}
}
Reducer1(二次排序Reducer)

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 二次排序Reducer
 * @author fansy
 *
 */
public class Reducer1 extends
		Reducer<CustomKey, NullWritable, CustomKey, NullWritable> {
	private Logger log = LoggerFactory.getLogger(Reducer1.class);
	public void setup(Context cxt){
		log.info("reducer1*********************in setup()");
	}
	public void reduce(CustomKey key ,Iterable<NullWritable> values,Context cxt)throws IOException,InterruptedException{
		log.info("reducer1******* in reduce()");
		for(NullWritable v:values){
			log.info("key:"+key+"-->value:"+v);
			cxt.write(key, v);
		}
		log.info("reducer1****** in reduce() *******end");
	}
}
无排序mapper2

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 不是二次排序
 * @author fansy
 *
 */
public class Mapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
	private String COMMA =",";
	public void map(LongWritable key,Text value, Context cxt ) throws IOException,InterruptedException{
		String [] values = value.toString().split(COMMA);
		Text newKey =  new Text(values[0]);
		cxt.write(newKey, new IntWritable(Integer.parseInt(values[1])));
	}
}

无排序Reducer2

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 无二次排序
 * @author fansy
 *
 */
public class Reducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {
	private Logger log = LoggerFactory.getLogger(Reducer2.class);
	public void setup(Context cxt){
		log.info("reducer2*********************in setup()");
	}
	public void reduce(Text key ,Iterable<IntWritable> values,Context cxt)throws IOException,InterruptedException{
		log.info("reducer2******* in reduce()");
		for(IntWritable v:values){
			log.info("key:"+key+"-->value:"+v);
			cxt.write(key, v);
		}
		log.info("reducer2****** in reduce() *******end");
	}
}

自定义key

package fz.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
/**
 * symbol 是原始的key
 * value是原始的值
 * @author fansy
 *
 */
public class CustomKey implements WritableComparable<CustomKey> {

	private int value;
	private String symbol;
	
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(this.value);
		out.writeUTF(this.symbol);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.value=in.readInt();
		this.symbol= in.readUTF();
	}

	@Override
	public int compareTo(CustomKey o) {
		int result = this.symbol.compareTo(o.symbol);
		return result!=0 ? result :this.value-o.value;
	}

	@Override
	public String toString(){
		return this.symbol+"\t"+this.value;
	}
	public int getValue() {
		return value;
	}

	public void setValue(int value) {
		this.value = value;
	}

	public String getSymbol() {
		return symbol;
	}

	public void setSymbol(String symbol) {
		this.symbol = symbol;
	}

}
自定义GroupComparator

package fz.secondarysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * 只对比symbol,即原始的键
 * @author fansy
 *
 */
public class CustomGroupComparator extends WritableComparator {

	protected CustomGroupComparator(){
		super(CustomKey.class,true);
	}
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable a,WritableComparable b){
		CustomKey ak = (CustomKey) a;
		CustomKey bk = (CustomKey) b;
		return ak.getSymbol().compareTo(bk.getSymbol());
	}
}

自定义Partitioner

package fz.secondarysort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * 保持原始分组条件
 * @author fansy
 *
 * @param <K1>
 * @param <V1>
 */
public class CustomPartitioner<K1, V1> extends Partitioner<K1, V1> {

	@Override
	public int getPartition(K1 key, V1 value, int numPartitions) {
		CustomKey keyK= (CustomKey) key;
		Text tmpValue =new  Text(keyK.getSymbol());
		return (tmpValue.hashCode() & Integer.MAX_VALUE)%numPartitions;
	}

}


结果查看:

不使用二次排序


使用二次排序,同时使用自定义组分类器:


可以看到不管二次排序和非二次排序,在Reducer端都只有三个分组;同时二次排序的其值也是排序的;

如果在二次排序中不使用组分类器,那么会得到下面的结果:

从这个结果可以看到有大于3个分组,这样的结果可能是有问题的(对于键是a的分组 整合不了数据);

同时上面使用了自定义的Partitioner,这里看不出区别是因为只有一个Reducer,如果有多个就可以看出差别。


总结:使用二次排序可以对不单单是键排序,同时可以对值进行排序,即在Reducer每个组中接收的value值是排序的,这样在某些操作中是可以增加效率的。


分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990




hadoop的mapreduce常见算法案例有几种

基本MapReduce模式

计数与求和
问题陈述:
有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中的出现次数或者这些字段的其他什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,需要计算出平均响应时间。
解决方案:
让我们先从简单的例子入手。在下面的代码片段里,Mapper每遇到指定词就把频次记1,Reducer一个个遍历这些词的集合然后把他们的频次加和。

1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Reducer
7 method Reduce(term t, counts [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)

这种方法的缺点显而易见,Mapper提交了太多无意义的计数。它完全可以通过先对每个文档中的词进行计数从而减少传递给Reducer的数据量:

1 class Mapper
2 method Map(docid id, doc d)
3 H = new AssociativeArray
4 for all term t in doc d do
5 H{t} = H{t} + 1
6 for all term t in H do
7 Emit(term t, count H{t})

如果要累计计数的的不只是单个文档中的内容,还包括了一个Mapper节点处理的所有文档,那就要用到Combiner了:

1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Combiner
7 method Combine(term t, [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
12
13 class Reducer
14 method Reduce(term t, counts [c1, c2,...])
15 sum = 0
16 for all count c in [c1, c2,...] do
17 sum = sum + c
18 Emit(term t, count sum)

应用:Log 分析, 数据查询

整理归类

问题陈述:
有一系列条目,每个条目都有几个属性,要把具有同一属性值的条目都保存在一个文件里,或者把条目按照属性值分组。 最典型的应用是倒排索引。
解决方案:
解决方案很简单。 在 Mapper 中以每个条......余下全文>>
 

C++题或代码

9、大数的乘法
Time Limit:1000MS Memory Limit:32768KDescription:给出一些整数对,一个为可能接近100位的大数,另一个为1位数,求这些数对的乘积。 Sample Input:1 1123 012345678910 8123456789101234567891012345678910 7Sample Output:1098765431280864197523708641975237086419752370 十、按长度排序
Time Limit:1000MS Memory Limit:32768KDescription:是一些整数,它的结构是:第一行为一个整数N,表示后面有N个整数需要排序输出,先按长度排,如长度一样则按大小排,若遇到N等于零,则运行结束。 Sample Input:31231233332100000000000000010Sample Output:121233333 11000000000000000 十一、排列学生成绩
Time Limit:1000MS Memory Limit:32768KDescription:一些向量数据,表示许多学生的各门成绩。编程,从中挑出平均分数在60分以上的学生进行排序输出。 Sample Input:Zhangsan 90 85 77 25 63Lisi 58 73 66 85 90Wangwu 70 80 90 51 52Qianliu 80 63 58 52 50Chenqi 90 28 38 48 58Zhouba 40 70 70 33 55Sample Output:1 Lisi2 Wangwu3 Zhangsan 12、十二、倒置排序
Time Limit:1000MS Memory Limit:32768KDescription:将一些整数按倒置值排序后输出. 所谓倒置,是指把整数各位倒过来构成一个新数,例如:13倒置成了31. Input:第一行的整数N表示后面列出的组数。每组数的第一个整数n表示后面将有n个整数。(每组数据量不超80) Output:将每组数按倒置值进行排序输出.其每组数的结果占一行.行末无空格. Sample Input:24 83 13 24 364 99 100 123 12345Sample Output:13 83 24 36100 99 123 12345 十三、绩点计算
Time Limit:1000MS Memory Limit:32768KDescription:有一些班级学生的三门课程成绩,编程计算其绩点,并按绩点分数的高低排列。绩点计算公式为:(成绩小于60分,则该门课程的绩点为0) [(课程1 – 50)÷10×3 +(课程2 – 50)÷10×3 +(课程3 – 50)÷10×4]÷10 Sample Input:张三 89 62 71李四 98 50 80王五 67 88 91Sample Output:王五 67 88 91李四 98 50 80张三 89 62 71 十四、按日期排序
Time Limit:1000MS Memory Limit:32768KDescription:有一些日期,日期格式为“MM/DD/YYYY”。编程将其按日期大小排列。 Sample Input:12/31/200510/21/200302/12/200415/12/19......余下全文>>
 

相关内容

    暂无相关文章