Implementing a Minimum Spanning Tree with MapReduce


Given the input graph below, compute its minimum spanning tree and the tree's total weight:


The corresponding input file is stored on HDFS in a tab-separated format: each line holds one edge as weight, source node, destination node.
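The original file listing appears only as a screenshot in the source post. Below is a hypothetical sample in the same format; the node names A–D and the weights are invented for illustration (chosen so that the minimum spanning tree weighs 14, matching the result reported at the end of this article):

3	A	B
4	B	C
7	C	D
8	A	D
9	A	C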


The code is as follows:

package MST;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;


public class MST {

	// The input/output paths are hard-coded below; adjust them as needed.
	
	static final String INPUT_PATH = "hdfs://localhost:9000/input1/MST";
	static final String OUTPUT_PATH = "hdfs://localhost:9000/outputMst";
	
	static enum MSTCounters{
		totalWeight	// accumulates the total weight of the edges accepted into the MST
	}
	
	// Driver: configures and submits the job.
	public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		
		// note: new Job(conf, name) is deprecated in newer Hadoop releases;
		// Job.getInstance(conf, name) is the modern equivalent
		Job job = new Job(conf,MST.class.getSimpleName());
		
		
		final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH),conf);
		final Path outPath = new Path(OUTPUT_PATH);
		
		// delete the output directory if it already exists, so the job can be rerun
		if(filesystem.exists(outPath)){
			filesystem.delete(outPath, true);
		}
		
		// input file location
		FileInputFormat.setInputPaths(job,INPUT_PATH);
		job.setJarByClass(MST.class);
		
		// configure the mapper and its output types
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setMapperClass(MSTMapper.class);
		
		//1.3 partitioning
		job.setPartitionerClass(HashPartitioner.class);
		// a single reducer yields one globally sorted stream of edge weights
		job.setNumReduceTasks(1);
		
		//1.4 sort (performed by the framework: keys arrive in ascending order)
		
		//1.5 combine (not used here)
		
		//2.1 shuffle: copy map output to the reducer over the network
		
		//2.2 specify the reducer class
		job.setReducerClass(MSTReducer.class);
		
		//2.3 output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		// output file path
		FileOutputFormat.setOutputPath(job, outPath);
		
		// run the job and wait for it to finish
		job.waitForCompletion(true);
		
		// read the counter back and print the total MST weight to the terminal
		
		Counters jobCounters = job.getCounters();
	
		long totalWeight = jobCounters.findCounter(MSTCounters.totalWeight).getValue();
		
		System.out.println("the total weight of the MST is "+totalWeight);
		
	}
	/**
	 * The MST algorithm used in this experiment is Kruskal's algorithm;
	 * a detailed description is in this blog post:
	 * blog.csdn.net/xd_122/article/details/40684223
	 */
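	/*
	 * Note: Kruskal's first step, sorting all edges by weight, is delegated
	 * to the MapReduce framework itself. The mapper emits each edge's weight
	 * as an IntWritable key, the shuffle phase sorts keys in ascending order,
	 * and setNumReduceTasks(1) ensures a single global order, so the reducer
	 * receives the edges from lightest to heaviest.
	 */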
	static class MSTMapper extends Mapper <LongWritable , Text , IntWritable , Text>{
		protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			Text srcDestPair = new Text();
			// each input line is: weight <TAB> source <TAB> destination
			String[] inputTokens = value.toString().split("\t");
			String weight = inputTokens[0];
			int wt = Integer.parseInt(weight);
			
			// emit (weight, "src:dest"); the shuffle will sort the edges by weight
			srcDestPair.set(inputTokens[1]+":"+inputTokens[2]);
			
			context.write(new IntWritable(wt),srcDestPair);
		}
	}
	
	static class MSTReducer extends Reducer <IntWritable , Text , Text , Text >{
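		/*
		 * Note: this field belongs to the Reducer instance rather than to a
		 * single reduce() call. Because the job runs exactly one reducer, the
		 * same instance handles every weight key, so the component information
		 * gathered here persists across all reduce() invocations.
		 */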
		
		Map<String,Set<String>> node_AssociatedSet = new HashMap<String, Set<String>>();		// maps each node to the set of nodes in its component
		
		protected void reduce(IntWritable key , Iterable<Text> values, Context context) throws IOException, InterruptedException{
			// the edge weight (the incoming key) doubles as the output key
			Text outputKey = new Text(key.toString());
			
			for(Text val : values){
				// check whether the edge's endpoints already belong to the same tree
				boolean ignoreSameEdgeSet1 = false;
				boolean ignoreSameEdgeSet2 = false;
				boolean ignoreSameEdgeSet3 = false;
				
				Set<String> nodeSet = new HashSet<String>();
				
				String[] srcDest = val.toString().split(":");
				
				String src = srcDest[0];
				String dest = srcDest[1];
				
				ignoreSameEdgeSet1 = isSameSet(src,dest);
				
				nodeSet.add(src);
				nodeSet.add(dest);
				
				ignoreSameEdgeSet2 = unionSet(nodeSet,src,dest);
				ignoreSameEdgeSet3 = unionSet(nodeSet,dest,src);
				
				if(!ignoreSameEdgeSet1 && !ignoreSameEdgeSet2 && !ignoreSameEdgeSet3){
					long weight_value = Long.parseLong(outputKey.toString());
					
					// the edge joins two different trees: add its weight to the totalWeight counter
					context.getCounter(MSTCounters.totalWeight).increment(weight_value);
					
					context.write(outputKey,val);
					
				}
			}
			
		}

		// Records the new edge in the component sets. Returns true when the
		// edge would connect two nodes that are already in the same component.
		private boolean unionSet(Set<String> nodeSet, String src, String dest) {
			boolean ignoreEdge = false;
			
			if(!node_AssociatedSet.containsKey(src)){
				node_AssociatedSet.put(src, nodeSet);
			}else{
				Set<String> associatedSet = node_AssociatedSet.get(src);
				// iterate over a copy, since the loops below may modify the underlying sets
				Set<String> nodeset = new HashSet<String>();
				nodeset.addAll(associatedSet);
				
				Iterator<String> nodeItr = nodeset.iterator();
				Iterator<String> duplicateCheckItr = nodeset.iterator();
				
				// if any node in src's component can already reach dest, the edge closes a cycle
				while(duplicateCheckItr.hasNext()){
					String n = duplicateCheckItr.next();
					if(node_AssociatedSet.get(n).contains(dest)){
						ignoreEdge = true;
					}
				}
				// propagate the merged node set to every member of src's component
				while(nodeItr.hasNext()){
					String nextNode = nodeItr.next();
					if(!node_AssociatedSet.containsKey(nextNode)){
						node_AssociatedSet.put(nextNode, nodeSet);
					}
					node_AssociatedSet.get(nextNode).addAll(nodeSet);
				}
			}
			return ignoreEdge;
		}

		// Returns true when src and dest already appear together in some recorded
		// component, i.e. when adding this edge would create a cycle.
		private boolean isSameSet(String src, String dest) {
			boolean ignoreEdge = false;
			
			for(Map.Entry<String, Set<String>> node_AssociatedSetValue : node_AssociatedSet.entrySet()){
				Set<String> nodesInSameSet = node_AssociatedSetValue.getValue();
				if(nodesInSameSet.contains(src) && nodesInSameSet.contains(dest)){
					ignoreEdge = true;
				}
			}		
			return ignoreEdge;
		}
		
	}

}

Related background (union-find and similar ideas) is covered in this blog post: http://blog.csdn.net/xd_122/article/details/40659179
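The isSameSet/unionSet pair above emulates union-find by copying and scanning whole node sets, which touches every recorded set for each edge. A classical disjoint-set structure with path compression and union by rank answers the same cycle question in near-constant amortized time. The following is only an illustrative sketch, not the code used in this experiment; the class name DisjointSet and its methods are invented for the example:

import java.util.HashMap;
import java.util.Map;

public class DisjointSet {

	private final Map<String, String> parent = new HashMap<String, String>();
	private final Map<String, Integer> rank = new HashMap<String, Integer>();

	// Returns the representative of node's set, creating a singleton set the
	// first time a node is seen, and compressing paths along the way.
	public String find(String node) {
		if (!parent.containsKey(node)) {
			parent.put(node, node);
			rank.put(node, 0);
			return node;
		}
		String root = node;
		while (!root.equals(parent.get(root))) {
			root = parent.get(root);
		}
		// path compression: point every visited node directly at the root
		while (!node.equals(root)) {
			String next = parent.get(node);
			parent.put(node, root);
			node = next;
		}
		return root;
	}

	// Merges the sets containing a and b. Returns false when they are already
	// in the same set, i.e. when the edge a-b would close a cycle.
	public boolean union(String a, String b) {
		String rootA = find(a);
		String rootB = find(b);
		if (rootA.equals(rootB)) {
			return false;
		}
		// union by rank: attach the shallower tree under the deeper one
		if (rank.get(rootA) < rank.get(rootB)) {
			String tmp = rootA; rootA = rootB; rootB = tmp;
		}
		parent.put(rootB, rootA);
		if (rank.get(rootA).equals(rank.get(rootB))) {
			rank.put(rootA, rank.get(rootA) + 1);
		}
		return true;
	}
}

With such a structure, the loop in reduce() shrinks to a single test per edge: accept the edge and increment the counter whenever union(src, dest) returns true.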

The output of the job (one accepted edge per line, written as weight <TAB> src:dest) is as follows:
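The original screenshot is not reproduced here. For the hypothetical input shown earlier, the reducer would write the three accepted edges:

3	A:B
4	B:C
7	C:D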


Viewing the result (for example with hadoop fs -cat on the part file under hdfs://localhost:9000/outputMst):


The corresponding total weight is printed to the terminal: 14


For a more detailed explanation, see: http://hadooptutorial.wikispaces.com/Sorting+feature+of+MapReduce
