hadoop编程小技巧(1)---map端聚合,hadoop---map


测试hadoop版本:2.4 

Map端聚合的应用场景:当我们只关心所有数据中的部分数据时,并且数据可以放入内存中。

使用的好处:可以大大减小网络数据的传输量,提高效率;

一般编程思路:在Mapper的map函数中读入所有数据,然后添加到一个List(队列)中,然后在cleanup函数中对list进行处理,输出我们关系的少量数据。

实例:

在map函数中使用空格分隔每行数据,然后把每个单词添加到一个堆栈中,在cleanup函数中输出堆栈中单词次数比较多的单词以及次数;

package fz.inmap.aggregation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.PriorityQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class InMapArrgegationDriver extends Configured implements Tool{
	public static Logger log = LoggerFactory.getLogger(InMapArrgegationDriver.class);
	/**
	 * @throws Exception 
	 * 
	 */
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new InMapArrgegationDriver(),args);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		if(arg0.length!=3){
			System.err.println("Usage:\nfz.inmap.aggregation.InMapArrgegationDriver <in> <out> <maxNum>");
			return -1;
		}
		Configuration conf = getConf();
		
//		System.out.println(conf.get("fs.defaultFS"));
		Path in = new Path(arg0[0]);
		Path out= new Path(arg0[1]);
		out.getFileSystem(conf).delete(out, true);
		conf.set("maxResult", arg0[2]);
		Job job = Job.getInstance(conf,"in map arrgegation job");
		job.setJarByClass(getClass());
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		job.setMapperClass(InMapMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
//		job.setOutputKeyClass(LongWritable.class);
//		job.setOutputValueClass(VectorWritable.class);
		job.setNumReduceTasks(0);
//		System.out.println(job.getConfiguration().get("mapreduce.job.reduces"));
//		System.out.println(conf.get("mapreduce.job.reduces"));
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		
		return job.waitForCompletion(true)?0:-1;
	}
	
	protected static class InMapMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
		private ArrayList<Word> words = new ArrayList<Word>();
		private PriorityQueue<Word> queue;
		private int maxResult;
		
		protected void setup(Context cxt){
			maxResult = cxt.getConfiguration().getInt("maxResult", 10);
		}
		
		protected void map(LongWritable key, Text value,Context cxt){
			String  [] line = value.toString().split(" "); // use blank to split
			for(String word:line){
				Word curr = new Word(word,1);
				if(words.contains(curr)){
					// increase the exists word's frequency
					for(Word w:words){
						if(w.equals(curr)){
							w.frequency++;
							break;
						}
					}
				}else{
					words.add(curr);
				}
			}
		}
		protected void cleanup(Context cxt) throws InterruptedException,IOException{
			Text outputKey = new Text();
			IntWritable outputValue = new IntWritable();
			
			queue = new PriorityQueue<Word>(words.size());
			queue.addAll(words);
			for(int i=0;i< maxResult;i++){
				Word tail = queue.poll();
				if(tail!=null){
					outputKey.set(tail.value);
					outputValue.set(tail.frequency);
					log.info("key is {},value is {}", outputKey,outputValue);
					cxt.write(outputKey, outputValue);
					
				}
			}
		}
	}

}

使用到的Word类

package fz.inmap.aggregation;

public class Word implements Comparable<Word>{

	public String value;
	public int frequency;
	
	public Word(String value,int frequency){
		this.value=value;
		this.frequency=frequency;
	}
	@Override
	public int compareTo(Word o) {
		return o.frequency-this.frequency;
	}
	@Override
	public boolean equals(Object obj){
		if(obj instanceof Word){
			return value.equalsIgnoreCase(((Word)obj).value);
		}else{
			return false;
		}
	}
}

查看输出结果,可以看日志(由于在程序中输出了日志,所以在日志中也可以查看到);


或者查看输出结果:



总结:使用map端聚合,虽然可以大大减小网络数据传输量,提高效率,但是我们在应用的时候还是需要考虑实际的应用环境。比如,如果使用上面的算法来计算最大单词频率的前10个,然后还是使用上面的代码,就会有问题。每个mapper会处理并输出自己的单词词频最大的10个单词,并没有考虑到所有数据,这样在reducer端整合的时候就会可能会忽略部分数据,造成最终结果的错误。



分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990





相关内容

    暂无相关文章