MapReduce TopK


Problem statement: keep a log of the IPs that access google each day, and from that log compute the top K IP addresses by access count for the day.

For this problem, first define a custom IP-record data type that implements the WritableComparable interface.

package reverseIndex;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class ipAndcount implements WritableComparable<ipAndcount>{
	private Text ip;
	private IntWritable count;

	public ipAndcount(){
		//Writable types need a no-arg constructor so the framework can
		//instantiate the object before calling readFields()
		this.ip = new Text("");
		this.count = new IntWritable(1);
	}
	public ipAndcount(Text ip,IntWritable count){
		this.ip = ip;
		this.count = count;
	}
	@Override
	public void readFields(DataInput input) throws IOException {
		ip.readFields(input);
		count.readFields(input);
	}
	@Override
	public void write(DataOutput output) throws IOException {
		ip.write(output);
		count.write(output);
	}
	@Override
	public int compareTo(ipAndcount o) {
		//Descending by count; ties broken by ip in ascending order
		int cmp = o.count.compareTo(count);
		return cmp == 0 ? ip.compareTo(o.ip) : cmp;
	}
	@Override
	public boolean equals(Object o){
		if(!(o instanceof ipAndcount)){
			return false;
		}
		ipAndcount other = (ipAndcount)o;
		return ip.equals(other.ip) && count.equals(other.count);
	}
	@Override
	public int hashCode(){
		//Keep hashCode consistent with equals (the default partitioner also uses it)
		return ip.hashCode() * 31 + count.hashCode();
	}
	@Override
	public String toString(){
		StringBuilder buf = new StringBuilder("IP=");
		buf.append(ip.toString());
		buf.append(",Count=");
		buf.append(count.toString());
		buf.append(";");
		return buf.toString();
	}
	public Text getIp(){
		return ip;
	}
	public IntWritable getCount(){
		return count;
	}
	public void setCount(IntWritable count){
		this.count = count;
	}
}
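The descending-by-count order defined in compareTo is what the second job relies on. As a quick local sanity check (a minimal sketch; the class name ipAndcountSortDemo is made up for illustration), sorting a list of ipAndcount instances with Collections.sort shows the expected order:

package reverseIndex;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

//Quick local check of the ipAndcount ordering: higher counts come first,
//ties are broken by IP in ascending order
public class ipAndcountSortDemo {
	public static void main(String[] args) {
		List<ipAndcount> list = new ArrayList<ipAndcount>();
		list.add(new ipAndcount(new Text("10.0.0.2"), new IntWritable(3)));
		list.add(new ipAndcount(new Text("10.0.0.1"), new IntWritable(7)));
		list.add(new ipAndcount(new Text("10.0.0.3"), new IntWritable(7)));
		Collections.sort(list);
		for (ipAndcount item : list) {
			System.out.print(item);
		}
		//Prints: IP=10.0.0.1,Count=7;IP=10.0.0.3,Count=7;IP=10.0.0.2,Count=3;
	}
}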

This problem should be done as two jobs: one counts the IPs and aggregates their totals (similar to WordCount), the other selects the top K for output:

package reverseIndex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


//Done as two jobs: one counts each day's visiting IPs, the other selects the top K most-visited IPs
public class firstK {
	
	public static class FindIpMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
		private IntWritable one = new IntWritable(1);
		//Each input line is assumed to hold exactly one IP address; emit (ip, 1)
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			context.write(value,one);
		}
	}
	//Sum the counts for each IP, WordCount-style
	public static class IpReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
		public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException, InterruptedException{
			int sum = 0;
			for(IntWritable val : values){
				sum += val.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}
	//Job 2 reads job 1's output via KeyValueTextInputFormat: key = ip, value = count
	public static class beforeSortIpmapper extends Mapper<Text,Text,ipAndcount,Text>{
		public void map(Text key,Text value,Context context) throws IOException, InterruptedException{
			ipAndcount tmp = new ipAndcount(key,new IntWritable(Integer.valueOf(value.toString())));
			context.write(tmp,new Text());
		}
	}
	//Keys arrive sorted by descending count (see ipAndcount.compareTo), so the
	//first K keys seen by the single reducer are the global top K
	public static class selectTopKReducer extends Reducer<ipAndcount,Text,ipAndcount,Text>{
		int count = 0;
		int k = 10;	//K is hard-coded here
		public void reduce(ipAndcount key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
			if(count<k){
				//TextOutputFormat writes only the key when the value is null
				context.write(key, null);
				count++;
			}
		}
	}
	public static void main(String[] args) throws IOException, InterruptedException {
		Configuration conf = new Configuration();
		Job job1 = new Job(conf,"sum ip");
		job1.setJarByClass(firstK.class);
		
		//Default input/output formats
		job1.setInputFormatClass(TextInputFormat.class);
		job1.setOutputFormatClass(TextOutputFormat.class);
		
		//Input path and output path
		Path in = new Path(args[0]);
		Path out = new Path(args[1]);
		
		FileInputFormat.addInputPath(job1,in);
		FileOutputFormat.setOutputPath(job1,out);
	
		//Map and reduce output types
		job1.setMapOutputKeyClass(Text.class);
		job1.setMapOutputValueClass(IntWritable.class);
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(IntWritable.class);
		//Processing classes
		job1.setMapperClass(FindIpMapper.class);
		job1.setReducerClass(IpReducer.class);
		//Number of reduce tasks
		job1.setNumReduceTasks(7);
		
		//Configuration for job 2 (everything below must be set on job2, not job1)
		Configuration conf2 = new Configuration();
		Job job2 = new Job(conf2,"select K");
		job2.setJarByClass(firstK.class);
		job2.setInputFormatClass(KeyValueTextInputFormat.class);
		job2.setOutputFormatClass(TextOutputFormat.class);
		
		Path in2 = new Path(args[1]);
		Path out2 = new Path(args[2]);
		FileInputFormat.addInputPath(job2,in2);
		FileOutputFormat.setOutputPath(job2,out2);
		
		job2.setMapOutputKeyClass(ipAndcount.class);
		job2.setMapOutputValueClass(Text.class);
		job2.setOutputKeyClass(ipAndcount.class);
		job2.setOutputValueClass(Text.class);
		job2.setMapperClass(beforeSortIpmapper.class);
		job2.setReducerClass(selectTopKReducer.class);
		//A single reduce task so the top K is global
		job2.setNumReduceTasks(1);
		
		//Chain the two jobs with JobControl
		JobControl jc = new JobControl("select k ip");
		
		ControlledJob cjob1 = new ControlledJob(conf);
		cjob1.setJob(job1);
		ControlledJob cjob2 = new ControlledJob(conf2);
		cjob2.setJob(job2);
		
		jc.addJob(cjob1);
		jc.addJob(cjob2);
		//Dependency: job2 starts only after job1 succeeds
		cjob2.addDependingJob(cjob1);
		
		//JobControl.run() loops until stop() is called, so run it on its own
		//thread and stop it once all jobs have finished
		Thread runner = new Thread(jc);
		runner.start();
		while(!jc.allFinished()){
			Thread.sleep(500);
		}
		jc.stop();
	}
}
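A note on the single reduce task in job 2: because ipAndcount sorts descending by count, the lone reducer sees every key in global order and can simply emit the first K it encounters. With more than one reduce task you would only get a per-partition top K, which would then need a further merge step.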



Using the files produced by one MapReduce job as the input of the next job: how do you split the fields?

Your first job uses TextOutputFormat, so by default its output is a key-value text file. When that file is read back in, the default TextInputFormat makes the key each line's byte offset rather than the key the previous job wrote, so you need to explicitly set the second job's input format to KeyValueTextInputFormat.
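Concretely, job 1's TextOutputFormat writes key<TAB>value lines, and KeyValueTextInputFormat splits each line at the first separator (tab by default), so job 2's map key is the IP and its value is the count string. A sketch of the relevant driver lines, reusing job2 from the code above (the separator setting is only needed if it differs from the default tab; the config key shown is the Hadoop 2.x name):

job2.setInputFormatClass(KeyValueTextInputFormat.class);
//Optional: the separator between key and value; defaults to "\t",
//which matches what TextOutputFormat writes
job2.getConfiguration().set(
		"mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");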
 

Can the file names of MapReduce output be customized?

Of course you can; the API shows how (MultipleOutputs supports this). Try it out once you have an environment set up~
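As a minimal sketch with org.apache.hadoop.mapreduce.lib.output.MultipleOutputs (the reducer below and the "ipcounts" prefix are illustrative, not part of the code above): the write(key, value, baseOutputPath) overload replaces the default "part" prefix, so the output files become ipcounts-r-00000 and so on.

package reverseIndex;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

//Illustrative reducer that writes its output to files named ipcounts-r-xxxxx
//instead of the default part-r-xxxxx
public class NamedOutputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	private MultipleOutputs<Text, IntWritable> mos;

	@Override
	protected void setup(Context context) {
		mos = new MultipleOutputs<Text, IntWritable>(context);
	}

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {
			sum += val.get();
		}
		//baseOutputPath "ipcounts" replaces the default "part" file prefix
		mos.write(key, new IntWritable(sum), "ipcounts");
	}

	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		mos.close();
	}
}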
 
