PageRank算法的MapReduce实现,pagerankmapreduce


假设目前需要排名计算的网页只有4个:数据如下:

baidu	10.00 google,sina,nefu
google	10.00 baidu
sina	10.00 google
nefu	10.00 sina,google

1. baidu  存在三个外链接

2.google 存在1个外链接

3.sina 存在1个外链接

4.nefu. 存在2个外链接

由数据可以看出:所有链接都指向了google,所以google的PR应该最高,而由google指向的baidu的PR值 应该也很高。

代码如下:

package PageRank;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PageRank {
	/**
	 * @author XD
	 */
	static enum PageCount{
		Count,TotalPR
	}
	public static class  Map extends Mapper < LongWritable , Text , Text , Text >{
		protected void map(LongWritable key, Text value , Context context) throws IOException, InterruptedException{
			context.getCounter(PageCount.Count).increment(1);
			String[] kv = value.toString().split("\t");
			String _key = kv[0];
			String _value = kv[1];
			String _PRnLink[] = _value.split(" ");
			String pr = _PRnLink[0];
			String link = _PRnLink[1];
			context.write(new Text(_key),new Text(link));
			String[] site = link.split(",");
			float score = Float.valueOf(pr)/(site.length)*1.0f;
			for(int i=0;i<site.length;i++){
				context.write(new Text(site[i]), new Text(String.valueOf(score)));
			}
		}
	}
	public static class Reduce extends Reducer < Text , Text , Text, Text>{
		protected void reduce(Text key , Iterable<Text> values ,Context context) throws IOException, InterruptedException{
			StringBuilder sb = new StringBuilder();
			float factor  = 0.85f;	//阻尼因子
			float pr = 0f;
			for(Text val : values){
				String value = val.toString();
				int s = value.indexOf(".");
				if(s != -1){
					pr += Float.valueOf(value);
				}else{
					String site[] = value.split(",");
					int _len = site.length;
					for(int k=0;k<_len;k++){
						sb.append(site[k]);
						sb.append(",");
					}
				}
			}
			pr = ((1-factor)+(factor*(pr)));
			context.getCounter(PageCount.TotalPR).increment((int)(pr*1000));
			String output = pr+" "+sb.toString();
			context.write(key, new Text(output));
		}
	}
	public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		String input,output;
		int threshold = 100;
		int iteration = 0;
		int iterationLimit = 10;
	
		boolean status = false;
		
		while(iteration < iterationLimit){
			//展开反复迭代  注意 输入输出的路径 
			if((iteration % 2) == 0){
				input = "hdfs://localhost:9000/output_pr/p*";
				output = "hdfs://localhost:9000/output_pr2";
			}else{
				input = "hdfs://localhost:9000/output_pr2/p*";
				output = "hdfs://localhost:9000/output_pr";
			}
			Configuration conf = new Configuration();
			final FileSystem filesystem = FileSystem.get(new URI(input),conf);
			final Path outPath = new Path(output);
			if(filesystem.exists(outPath)){
				filesystem.delete(outPath, true);
			}
			Job job = new Job(conf,PageRank.class.getSimpleName());
			
			//1.1 读取文件 位置
			FileInputFormat.setInputPaths(job, input);
			
			//1.2指定的map类//1.3 map输出的key value 类型 要是和最终的输出类型是一样的 可以省略
			job.setMapperClass(Map.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);
			job.setJarByClass(PageRank.class);
			
			//1.3 分区
			job.setPartitionerClass(HashPartitioner.class);
			
			job.setReducerClass(Reduce.class);
			//指定 reduce的输出类型
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			
			//指定写出到什么位置
			FileOutputFormat.setOutputPath(job, new Path(output));
			status = job.waitForCompletion(true);
			iteration++;
			long count = job.getCounters().findCounter(PageCount.Count).getValue();
			long TotalPr = job.getCounters().findCounter(PageCount.TotalPR).getValue();
			System.out.println("PageCount:"+count);
			System.out.println("TotalPR:"+TotalPr);
			double per_pr = TotalPr/(count*1.0d);
			System.out.println("PEr_er:"+per_pr);
			if((int)per_pr == threshold){
				System.out.println("Iteration:"+iteration);
				break;
			}	
		}
        System.exit(status?0:1);
	}
}

最后输出结果如下:



相关内容