Implementing the classic "people who viewed this item also viewed" feature with MapReduce


Input:

date        cookie id    item id
xx          xx           xx

Output:

item id     item id list (ranked by association value, comma-separated)
xx          xx

For example:

id1         id3,id0,id4,id2
id2         id0,id5

The whole computation is broken into four steps.

1. Extract the date, cookie id, and item id from the raw logs and process them per day. The output format is:

item id-0    item id-1
xx           xx

One optimization is applied here: item id-0 is always smaller than item id-1, which halves the storage; the pairs are transposed back when the data is finally aggregated.

The reducer performs local sorting and deduplication.
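A minimal standalone sketch of this pair generation (plain Java; the class and method names here are illustrative and not part of the jobs below): one user's deduplicated, sorted item list for a day is expanded into pairs, and because the list is sorted, the smaller id always comes first.

import java.util.Arrays;
import java.util.List;

public class PairSketch {
    // Expand one user's sorted, deduplicated item ids for one day into pairs.
    // Because the list is sorted, the smaller id is always emitted first,
    // so only one orientation of each pair needs to be stored.
    static void emitPairs(List<String> sortedItems) {
        for (int i = 0; i < sortedItems.size() - 1; i++) {
            for (int j = i + 1; j < sortedItems.size(); j++) {
                System.out.println(sortedItems.get(i) + "\t" + sortedItems.get(j));
            }
        }
    }

    public static void main(String[] args) {
        emitPairs(Arrays.asList("id1", "id3", "id7"));
        // prints: id1 id3, id1 id7, id3 id7 (tab-separated)
    }
}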

 

2. Aggregate the previous step's output, again per day:

item id-0    item id-1    association value (the number of users who viewed both items that day)
xx           xx           xx
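Step 2 is essentially a word count over the pair rows produced by step 1. A standalone sketch of the same counting, using made-up rows rather than real data:

import java.util.HashMap;
import java.util.Map;

public class CountSketch {
    public static void main(String[] args) {
        // hypothetical "date <tab> item-0 <tab> item-1" rows from step 1,
        // one row per user per pair per day
        String[] rows = {
            "2014-01-01\tid1\tid3",
            "2014-01-01\tid1\tid3",
            "2014-01-01\tid1\tid4"
        };

        // the whole row is the key; the count is the number of users who
        // viewed both items on that day
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String row : rows) {
            Integer c = counts.get(row);
            counts.put(row, c == null ? 1 : c + 1);
        }

        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
        // e.g. 2014-01-01  id1  id3  2
    }
}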

 

3. Aggregate the last three months of data with time decay: the older a record, the less it contributes to the association value. Output the association value for every pair of items (including the transposed pairs).
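The decay used in the third job below is factor = 1 / (1 + (day - 1) / 10), where day is the number of days between the record's date and the day the job runs. A small standalone illustration of how quickly it falls off:

public class DecaySketch {
    // same decay as in HouseCountHz below: a record that is `day` days old
    // contributes its count scaled by 1 / (1 + (day - 1) / 10)
    static float decay(int day) {
        return 1f / (1f + (day - 1) / 10f);
    }

    public static void main(String[] args) {
        for (int day : new int[] {1, 11, 31, 61, 91}) {
            System.out.println(day + " days old -> factor " + decay(day));
        }
        // 1 day   -> 1.0
        // 11 days -> 0.5
        // 91 days -> 0.1
    }
}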

 

4. Row-to-column transformation: for each item, sort its related items by association value and emit the final recommendation list.
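In the fourth job this sorting is done by the MapReduce framework itself: the map output key is a composite of (item id, association value) and the sort comparator negates the comparison on the value, so the reducer receives each item's related items already in descending order and only has to join them. A minimal sketch of the same ordering and joining outside MapReduce (Row is a hypothetical helper type, not used by the jobs below):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SortSketch {
    // hypothetical row: one related item and its association value
    static class Row {
        final String itemId;
        final float weight;
        Row(String itemId, float weight) { this.itemId = itemId; this.weight = weight; }
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<Row>();
        rows.add(new Row("id4", 2.5f));
        rows.add(new Row("id3", 7.0f));
        rows.add(new Row("id0", 4.1f));

        // descending by association value, mirroring the negated compare in KeyComp
        Collections.sort(rows, new Comparator<Row>() {
            public int compare(Row a, Row b) {
                return -Float.compare(a.weight, b.weight);
            }
        });

        // join into the final comma-separated list, like the step-4 reducer does
        StringBuilder sb = new StringBuilder();
        for (Row r : rows) {
            if (sb.length() > 0) sb.append(',');
            sb.append(r.itemId);
        }
        System.out.println(sb); // id3,id0,id4
    }
}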

 

The first MR job

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;


/*
 * Input: raw log records (may contain duplicates)
 * date  cookie  property id
 * 
 * Output:
 * date  property-id-1  property-id-2   // property-id-1 is always smaller than property-id-2;
 *                                      // records are grouped by (date, cookie)
 */

public class HouseMergeAndSplit {
	
	public static class Partitioner1 extends Partitioner<TextPair, Text> {
		  // partition by (date, cookie), matching the grouping comparator below
		  @Override
		  public int getPartition(TextPair key, Text value, int numPartition) {
			  return Math.abs((new Text(key.getFirst().toString() + key.getSecond().toString())).hashCode() * 127) % numPartition;
		  }
	}
		  public static class Comp1 extends WritableComparator {
			  public Comp1() {
			   super(TextPair.class, true);
			  }
			  @SuppressWarnings("unchecked")
			  public int compare(WritableComparable a, WritableComparable b) {
			   TextPair t1 = (TextPair) a;
			   TextPair t2 = (TextPair) b;
			   int comp= t1.getFirst().compareTo(t2.getFirst());
			   if (comp!=0)
				   return comp;
			   return t1.getSecond().compareTo(t2.getSecond());
			  }
			}
	  public static class TokenizerMapper 
	       extends Mapper<LongWritable, Text, TextPair, Text>{
	    Text val = new Text("test"); // placeholder value; all useful data travels in the key
	    public void map(LongWritable key, Text value, Context context
	                    ) throws IOException, InterruptedException {
	      String s[] = value.toString().split("\001"); // fields separated by \001
	      TextPair tp = new TextPair(s[0], s[1], s[4] + s[3]); // thedate, cookie, city+houseid
	      context.write(tp, val);
	    }
	  }
	  
	  public static class IntSumReducer 
	       extends Reducer<TextPair,Text,Text,Text> {
	    private static String comparedColumn[] = new String[3];
	    ArrayList<String> houselist = new ArrayList<String>();
	    private static Text keyv = new Text();
	    private static Text valuev = new Text();
	    static Logger logger = Logger.getLogger(HouseMergeAndSplit.class.getName());

	    public void reduce(TextPair key, Iterable<Text> values, 
	                       Context context
	                       ) throws IOException, InterruptedException {
	      houselist.clear();
	      String thedate = key.getFirst().toString();
	      String cookie = key.getSecond().toString();

	      for (int i = 0; i < 3; i++)
	        comparedColumn[i] = "";

	      // (first, second) is the grouping key, so a new reduce call starts whenever
	      // (date, cookie) changes. Within one call the composite key advances through
	      // the sorted third field while the values are iterated, which lets us skip
	      // consecutive duplicates of the same property id (deduplication).
	      for (Text val : values)
	      {
	        if (thedate.equals(comparedColumn[0]) && cookie.equals(comparedColumn[1])
	            && !key.getThree().toString().equals(comparedColumn[2]))
	        {
	          // same (date, cookie) but a new property id: record it
	          houselist.add(key.getThree().toString());
	          comparedColumn[0] = key.getFirst().toString();
	          comparedColumn[1] = key.getSecond().toString();
	          comparedColumn[2] = key.getThree().toString();
	        }

	        if (!thedate.equals(comparedColumn[0]) || !cookie.equals(comparedColumn[1]))
	        {
	          // first record of this (date, cookie) group
	          houselist.add(key.getThree().toString());
	          comparedColumn[0] = key.getFirst().toString();
	          comparedColumn[1] = key.getSecond().toString();
	          comparedColumn[2] = key.getThree().toString();
	        }
	      }

	      keyv.set(comparedColumn[0]); // the date

	      // emit every pair of properties this cookie viewed on this day;
	      // the list is sorted, so the smaller id always comes first
	      for (int i = 0; i < houselist.size() - 1; i++)
	      {
	        for (int j = i + 1; j < houselist.size(); j++)
	        {
	          valuev.set(houselist.get(i) + "\t" + houselist.get(j)); // co-viewed property pair
	          context.write(keyv, valuev);
	        }
	      }
	    }
	  }

	  public static void main(String[] args) throws Exception {
	    Configuration conf = new Configuration();
	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: HouseMergeAndSplit <in> <out>");
	      System.exit(2);
	    }

	    // remove the output directory if it already exists
	    FileSystem fstm = FileSystem.get(conf);
	    Path outDir = new Path(otherArgs[1]);
	    fstm.delete(outDir, true);

	    conf.set("mapred.textoutputformat.separator", "\t"); // separator between key and value in the reduce output
	    Job job = new Job(conf, "HouseMergeAndSplit");
	    job.setNumReduceTasks(4);
	    job.setJarByClass(HouseMergeAndSplit.class);
	    job.setMapperClass(TokenizerMapper.class);

	    job.setMapOutputKeyClass(TextPair.class);
	    job.setMapOutputValueClass(Text.class);
	    // set the partitioner
	    job.setPartitionerClass(Partitioner1.class);
	    // group records by (date, cookie) after partitioning
	    job.setGroupingComparatorClass(Comp1.class);
	    // set the reducer and its output types
	    job.setReducerClass(IntSumReducer.class);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(Text.class);
	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	  }
}

TextPair

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {
	private Text first;
	private Text second;
	private Text three;
	public TextPair() {
	  set(new Text(), new Text(),new Text());
	}
	public TextPair(String first, String second,String three) {
	  set(new Text(first), new Text(second),new Text(three));
	}
	public TextPair(Text first, Text second, Text three) {
	  set(first, second, three);
	}
	public void set(Text first, Text second,Text three) {
	  this.first = first;
	  this.second = second;
	  this.three=three;
	}
	public Text getFirst() {
	  return first;
	}
	public Text getSecond() {
	  return second;
	}
	public Text getThree() {
		  return three;
		}
	public void write(DataOutput out) throws IOException {
	  first.write(out);
	  second.write(out);
	  three.write(out);
	}
	public void readFields(DataInput in) throws IOException {
	  first.readFields(in);
	  second.readFields(in);
	  three.readFields(in);
	}
	public int compareTo(TextPair tp) {
	  int cmp = first.compareTo(tp.first);
	  if (cmp != 0) {
	   return cmp;
	  }
	  cmp= second.compareTo(tp.second);
	  if (cmp != 0) {
		   return cmp;
		  }
	  return three.compareTo(tp.three);
	}
	}


TextPairSecond

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPairSecond implements WritableComparable<TextPairSecond> {
	private Text first;
	private FloatWritable second;
	public TextPairSecond() {
	  set(new Text(), new FloatWritable());
	}
	public TextPairSecond(String first, float second) {
	  set(new Text(first), new FloatWritable(second));
	}
	public TextPairSecond(Text first, FloatWritable second) {
	  set(first, second);
	}
	public void set(Text first, FloatWritable second) {
	  this.first = first;
	  this.second = second;
	}
	public Text getFirst() {
	  return first;
	}
	public FloatWritable getSecond() {
	  return second;
	}
	public void write(DataOutput out) throws IOException {
	  first.write(out);
	  second.write(out);
	}
	public void readFields(DataInput in) throws IOException {
	  first.readFields(in);
	  second.readFields(in);
	}
	public int compareTo(TextPairSecond tp) {
	  int cmp = first.compareTo(tp.first);
	  if (cmp != 0) {
	   return cmp;
	  }
	  return second.compareTo(tp.second);
	}

	}

 

The second MR job

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


/*
 * Count how many times each pair of properties was viewed together
 * Input:
 * date  property1  property2
 * 
 * Output:
 * date  property1  property2  number of co-occurrences
 * 
 */

public class HouseCount {
	

	  public static class TokenizerMapper 
	       extends Mapper<LongWritable, Text, Text, IntWritable>{
	    IntWritable iw = new IntWritable(1);
	    public void map(LongWritable key, Text value, Context context
	                    ) throws IOException, InterruptedException {
	      // the whole "date  property1  property2" line is the key; emit a count of 1
	      context.write(value, iw);
	    }
	  }
	  
	  public static class IntSumReducer 
	       extends Reducer<Text,IntWritable,Text,IntWritable> {
	    IntWritable result = new IntWritable();
	    public void reduce(Text key, Iterable<IntWritable> values, 
	                       Context context
	                       ) throws IOException, InterruptedException {
	      int sum = 0;
	      for (IntWritable iw : values)
	      {
	        sum += iw.get();
	      }
	      result.set(sum);
	      context.write(key, result);
	    }
	  }

	  public static void main(String[] args) throws Exception {
	    Configuration conf = new Configuration();
	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: HouseCount <in> <out>");
	      System.exit(2);
	    }

	    // remove the output directory if it already exists
	    FileSystem fstm = FileSystem.get(conf);
	    Path outDir = new Path(otherArgs[1]);
	    fstm.delete(outDir, true);

	    conf.set("mapred.textoutputformat.separator", "\t"); // separator between key and value in the reduce output
	    Job job = new Job(conf, "HouseCount");
	    job.setNumReduceTasks(2);
	    job.setJarByClass(HouseCount.class);
	    job.setMapperClass(TokenizerMapper.class);

	    job.setMapOutputKeyClass(Text.class);
	    job.setMapOutputValueClass(IntWritable.class);

	    // set the reducer and its output types
	    job.setReducerClass(IntSumReducer.class);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(IntWritable.class);
	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	  }
}


The third MR job

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


/*
 * Aggregate the co-occurrence counts of the last three months with a decay factor,
 * and finally also emit each pair (a, b) once more as (b, a)
 * Input:
 * date  property1  property2  number of co-occurrences
 * 
 * Output:
 * property1  property2  association value (with decay applied; the factor differs per day)
 * 
 */

public class HouseCountHz {
	

	  public static class HouseCountHzMapper 
	       extends Mapper<LongWritable, Text, Text, FloatWritable>{
	    Text keyv = new Text();
	    FloatWritable valuev = new FloatWritable();
	    public void map(LongWritable key, Text value, Context context
	                    ) throws IOException, InterruptedException {
	      String[] s = value.toString().split("\t");
	      keyv.set(s[1] + "\t" + s[2]); // property1, property2

	      // number of days between this record's date and today
	      Calendar now = Calendar.getInstance();
	      Calendar recordDay = Calendar.getInstance();
	      Date recordDate = null;
	      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
	      try {
	        recordDate = sdf.parse(s[0]);
	      } catch (ParseException e) {
	        e.printStackTrace();
	      }
	      recordDay.setTime(recordDate);
	      long diffMillis = now.getTimeInMillis() - recordDay.getTimeInMillis();
	      int day = (int) (diffMillis / (3600 * 24 * 1000));

	      float factor = 1 / (1 + (float) (day - 1) / 10); // decay factor
	      valuev.set(Float.parseFloat(s[3]) * factor);

	      context.write(keyv, valuev);
	    }
	  }
	  
	  public static class HouseCountHzReducer 
	       extends Reducer<Text,FloatWritable,Text,FloatWritable> {
	    FloatWritable result = new FloatWritable();
	    Text keyreverse = new Text();
	    public void reduce(Text key, Iterable<FloatWritable> values, 
	                       Context context
	                       ) throws IOException, InterruptedException {
	      float sum = 0;
	      for (FloatWritable iw : values)
	      {
	        sum += iw.get();
	      }
	      result.set(sum);
	      // also emit the transposed pair (property2, property1) so every property
	      // appears as the leading key in the next job
	      String[] keys = key.toString().split("\t");
	      keyreverse.set(keys[1] + "\t" + keys[0]);
	      context.write(key, result);
	      context.write(keyreverse, result);
	    }
	  }

	  public static void main(String[] args) throws Exception {
	    Configuration conf = new Configuration();
	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: HouseCountHz <in> <out>");
	      System.exit(2);
	    }

	    // remove the output directory if it already exists
	    FileSystem fstm = FileSystem.get(conf);
	    Path outDir = new Path(otherArgs[1]);
	    fstm.delete(outDir, true);

	    conf.set("mapred.textoutputformat.separator", "\t"); // separator between key and value in the reduce output
	    Job job = new Job(conf, "HouseCountHz");
	    job.setNumReduceTasks(2);
	    job.setJarByClass(HouseCountHz.class);
	    job.setMapperClass(HouseCountHzMapper.class);

	    job.setMapOutputKeyClass(Text.class);
	    job.setMapOutputValueClass(FloatWritable.class);

	    // set the reducer and its output types
	    job.setReducerClass(HouseCountHzReducer.class);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(FloatWritable.class);
	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	  }
}


The fourth MR job

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;



/*
 * Input:
 * property1  property2  association value
 * 
 * Output:
 * property1  property2,property3,property4  (sorted by association value)
 */

public class HouseRowToCol {
	
	public static class Partitioner1 extends Partitioner<TextPairSecond, Text> {
		  // partition on the property id only, so that it matches the grouping
		  // comparator below and all records for one property reach the same reducer
		  @Override
		  public int getPartition(TextPairSecond key, Text value, int numPartition) {
			  return Math.abs(key.getFirst().hashCode() * 127) % numPartition;
		  }
	}
	// grouping: records with the same property id form one reduce call
	public static class Comp1 extends WritableComparator {
		  public Comp1() {
			  super(TextPairSecond.class, true);
		  }
		  @SuppressWarnings("unchecked")
		  public int compare(WritableComparable a, WritableComparable b) {
			  TextPairSecond t1 = (TextPairSecond) a;
			  TextPairSecond t2 = (TextPairSecond) b;
			  return t1.getFirst().compareTo(t2.getFirst());
		  }
	}

	// sorting: by property id, then by association value in descending order
	public static class KeyComp extends WritableComparator {
		  public KeyComp() {
			  super(TextPairSecond.class, true);
		  }
		  @SuppressWarnings("unchecked")
		  public int compare(WritableComparable a, WritableComparable b) {
			  TextPairSecond t1 = (TextPairSecond) a;
			  TextPairSecond t2 = (TextPairSecond) b;
			  int comp = t1.getFirst().compareTo(t2.getFirst());
			  if (comp != 0)
				  return comp;
			  return -t1.getSecond().compareTo(t2.getSecond());
		  }
	}
	  public static class HouseRowToColMapper 
	       extends Mapper<LongWritable, Text, TextPairSecond, Text>{
	    Text houseid1 = new Text();
	    Text houseid2 = new Text();
	    FloatWritable weight = new FloatWritable();
	    public void map(LongWritable key, Text value, Context context
	                    ) throws IOException, InterruptedException {
	      String s[] = value.toString().split("\t");
	      weight.set(Float.parseFloat(s[2]));
	      houseid1.set(s[0]);
	      houseid2.set(s[1]);
	      // composite key (property1, weight) so the reducer receives the
	      // co-viewed properties already sorted by weight
	      TextPairSecond tp = new TextPairSecond(houseid1, weight);
	      context.write(tp, houseid2);
	    }
	  }
	  
	  public static class HouseRowToColReducer 
	       extends Reducer<TextPairSecond,Text,Text,Text> {
	    Text valuev = new Text();
	    public void reduce(TextPairSecond key, Iterable<Text> values, 
	                       Context context
	                       ) throws IOException, InterruptedException {
	      Text keyv = key.getFirst();
	      Iterator<Text> it = values.iterator();
	      StringBuilder sb = new StringBuilder(it.next().toString());
	      while (it.hasNext())
	      {
	        sb.append("," + it.next().toString());
	      }
	      valuev.set(sb.toString());
	      context.write(keyv, valuev);
	    }
	  }

	  public static void main(String[] args) throws Exception {
	    Configuration conf = new Configuration();
	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: HouseRowToCol <in> <out>");
	      System.exit(2);
	    }

	    // remove the output directory if it already exists
	    FileSystem fstm = FileSystem.get(conf);
	    Path outDir = new Path(otherArgs[1]);
	    fstm.delete(outDir, true);

	    conf.set("mapred.textoutputformat.separator", "\t"); // separator between key and value in the reduce output
	    Job job = new Job(conf, "HouseRowToCol");
	    job.setNumReduceTasks(4);
	    job.setJarByClass(HouseRowToCol.class);
	    job.setMapperClass(HouseRowToColMapper.class);

	    job.setMapOutputKeyClass(TextPairSecond.class);
	    job.setMapOutputValueClass(Text.class);
	    // set the partitioner
	    job.setPartitionerClass(Partitioner1.class);
	    // group by property id after partitioning
	    job.setGroupingComparatorClass(Comp1.class);
	    // sort by property id, then by association value descending
	    job.setSortComparatorClass(KeyComp.class);
	    // set the reducer and its output types
	    job.setReducerClass(HouseRowToColReducer.class);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(Text.class);
	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	  }
}




 

 
