MapReduce Implementations of Some Algorithms: BFS Traversal of a Graph
First, a quick review of the classic breadth-first search procedure:

BFS(G, s)
  for each vertex u ∈ V[G] - {s}
    do color[u] ← WHITE        // every vertex except the source starts white,
       d[u] ← ∞                // with its distance d[u] set to infinity
       π[u] ← NIL              // and its parent set to NIL
  color[s] ← GRAY              // the source s is gray: it is discovered as the procedure starts
  d[s] ← 0                     // initialize d[s] to 0
  π[s] ← NIL                   // the source has no parent
  Q ← Ø
  ENQUEUE(Q, s)                // initialize the queue Q to contain only the source s
  while Q ≠ Ø
    do u ← DEQUEUE(Q)          // take the gray vertex u at the head of Q and remove it from Q
       for each v ∈ Adj[u]     // examine each vertex v in u's adjacency list
         do if color[v] = WHITE
              then color[v] ← GRAY    // color v gray
                   d[v] ← d[u] + 1    // its distance is d[u] + 1
                   π[v] ← u           // record u as its parent
                   ENQUEUE(Q, v)      // and enqueue it
       color[u] ← BLACK        // u's adjacency list is fully examined: color u black
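Before moving to MapReduce, it may help to see the same procedure as ordinary single-machine Java. The following is a minimal sketch, not part of the original code base; the adjacency-list representation (a Map from node id to neighbor list) is an assumption made for illustration:

import java.util.*;

public class SequentialBFS {
  // Colors mirror the pseudocode: WHITE = undiscovered, GRAY = discovered,
  // BLACK = fully processed.
  enum Color { WHITE, GRAY, BLACK }

  // Runs BFS from the given source over an adjacency list and returns the
  // distance of every node from the source (Integer.MAX_VALUE = unreachable).
  static Map<String, Integer> bfs(Map<String, List<String>> adj, String source) {
    Map<String, Color> color = new HashMap<>();
    Map<String, Integer> dist = new HashMap<>();
    for (String u : adj.keySet()) {
      color.put(u, Color.WHITE);
      dist.put(u, Integer.MAX_VALUE);
    }
    color.put(source, Color.GRAY); // the source is discovered immediately
    dist.put(source, 0);

    Deque<String> queue = new ArrayDeque<>();
    queue.add(source);
    while (!queue.isEmpty()) {
      String u = queue.remove(); // dequeue the gray vertex at the head
      for (String v : adj.getOrDefault(u, Collections.<String>emptyList())) {
        if (color.get(v) == Color.WHITE) {
          color.put(v, Color.GRAY);
          dist.put(v, dist.get(u) + 1);
          queue.add(v);
        }
      }
      color.put(u, Color.BLACK); // all of u's neighbors have been examined
    }
    return dist;
  }

  public static void main(String[] args) {
    // The five-node sample graph used throughout this article (1 is the source).
    Map<String, List<String>> adj = new HashMap<>();
    adj.put("1", Arrays.asList("2", "3"));
    adj.put("2", Arrays.asList("1", "3", "4", "5"));
    adj.put("3", Arrays.asList("1", "4", "2"));
    adj.put("4", Arrays.asList("2", "3"));
    adj.put("5", Arrays.asList("2"));
    System.out.println(bfs(adj, "1")); // distances: 1->0, 2->1, 3->1, 4->2, 5->2
  }
}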
The input graph is stored one node per line, in the format nodeID<tab>list_of_adjacent_nodes|distance_from_the_source|color|parent. Node 1 is the source, so it starts GRAY at distance 0 with the parent marker "source":

1<tab>2,3|0|GRAY|source
2<tab>1,3,4,5|Integer.MAX_VALUE|WHITE|null
3<tab>1,4,2|Integer.MAX_VALUE|WHITE|null
4<tab>2,3|Integer.MAX_VALUE|WHITE|null
5<tab>2|Integer.MAX_VALUE|WHITE|null
We process this data with three reducers. As the algorithm review above suggests, a single MapReduce pass cannot traverse the whole graph; the job must be run iteratively, with each pass expanding the search frontier by one level, until every node has been visited, that is, until every node is colored BLACK, at which point the iteration stops.
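The article's BaseDriver (not shown here) handles the job setup, so exactly where the reducer count is configured is an assumption; with the standard Hadoop API it is a single call on the Job:

// Fragment only; assumes org.apache.hadoop.conf.Configuration and
// org.apache.hadoop.mapreduce.Job are imported.
Job job = Job.getInstance(new Configuration(), "BFSSearch");
job.setNumReduceTasks(3); // yields part-r-00000, part-r-00001 and part-r-00002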
Output of the first iteration: the GRAY source node 1 expands to nodes 2 and 3, which become GRAY at distance 1, while node 1 itself turns BLACK.

Reducer 1: (part-r-00000)
2<tab>1,3,4,5,|1|GRAY|1
5<tab>2,|Integer.MAX_VALUE|WHITE|null
Reducer 2: (part-r-00001)
3<tab>1,4,2,|1|GRAY|1
Reducer 3: (part-r-00002)
1<tab>2,3,|0|BLACK|source
4<tab>2,3,|Integer.MAX_VALUE|WHITE|null
Output of the second iteration: nodes 2 and 3 expand, discovering nodes 4 and 5 (GRAY, distance 2) and turning BLACK themselves.

Reducer 1: (part-r-00000)
2<tab>1,3,4,5,|1|BLACK|1
5<tab>2,|2|GRAY|2
Reducer 2: (part-r-00001)
3<tab>1,4,2,|1|BLACK|1
Reducer 3: (part-r-00002)
1<tab>2,3,|0|BLACK|source
4<tab>2,3,|2|GRAY|2
Output of the third iteration: nodes 4 and 5 have no WHITE neighbors left to discover, so they simply turn BLACK.

Reducer 1: (part-r-00000)
2<tab>1,3,4,5,|1|BLACK|1
5<tab>2,|2|BLACK|2
Reducer 2: (part-r-00001)
3<tab>1,4,2,|1|BLACK|1
Reducer 3: (part-r-00002)
1<tab>2,3,|0|BLACK|source
4<tab>2,3,|2|BLACK|2
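At this point every node is BLACK, so no GRAY nodes are emitted and the iteration terminates. Reading off the final records gives the BFS distances from source node 1: d(1)=0, d(2)=1, d(3)=1, d(4)=2, d(5)=2, with each parent field tracing a shortest path back to the source. This matches what the sequential Java sketch above computes.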
static enum MoreIterations { numberOfIterations }
Above we define an enum that serves as a Hadoop counter. Each reduce pass increments the counter once for every node that is still GRAY when it is written out; if the counter is nonzero after a job finishes, the driver knows the frontier is still expanding and launches another iteration.
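The counter is exercised in two places, shown here in isolation (both fragments appear in the reducer and driver code below):

// In the reducer: a GRAY output node means the search frontier is still growing.
if (outNode.getColor() == Node.Color.GRAY)
  context.getCounter(MoreIterations.numberOfIterations).increment(1L);

// In the driver: after the job completes, read the counter to decide
// whether another iteration is needed.
long terminationValue = job.getCounters()
    .findCounter(MoreIterations.numberOfIterations).getValue();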
package com.joey.mapred.graph.utils;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;

public class Node {

  /**
   * Three possible colors a node can have, to keep track of the
   * visiting status of the nodes during graph search.
   */
  public static enum Color {
    WHITE, // unvisited
    GRAY,  // visited, unprocessed
    BLACK  // processed
  };

  private String id;    // id of the node
  private int distance; // distance of the node from the source node
  // list of the edges
  private List<String> edges = new ArrayList<String>();
  private Color color = Color.WHITE;
  // parent/predecessor of the node;
  // the parent of the source is marked "source" to leave it unchanged
  private String parent;

  public Node() {
    distance = Integer.MAX_VALUE;
    color = Color.WHITE;
    parent = null;
  }

  public Node(String nodeInfo) {
    String[] inputVal = nodeInfo.split("\t");
    String key = "";
    String val = "";

    try {
      key = inputVal[0]; // node id
      // the list of adjacent nodes, distance, color, parent
      val = inputVal[1];
    } catch (Exception e) {
      e.printStackTrace();
      System.exit(1);
    }

    String[] tokens = val.split("\\|");
    this.id = key;

    for (String s : tokens[0].split(",")) {
      if (s.length() > 0) edges.add(s);
    }

    if (tokens[1].equalsIgnoreCase("Integer.MAX_VALUE")) {
      this.distance = Integer.MAX_VALUE;
    } else {
      this.distance = Integer.parseInt(tokens[1]);
    }

    this.color = Color.valueOf(tokens[2]);
    this.parent = tokens[3];
  }

  public Text getNodeInfo() {
    StringBuilder sb = new StringBuilder();
    for (String v : edges) {
      sb.append(v).append(",");
    }
    sb.append("|");

    if (this.distance < Integer.MAX_VALUE) {
      sb.append(this.distance).append("|");
    } else {
      sb.append("Integer.MAX_VALUE").append("|");
    }

    sb.append(color.toString()).append("|");
    sb.append(getParent());

    return new Text(sb.toString());
  }

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }

  public int getDistance() { return distance; }
  public void setDistance(int distance) { this.distance = distance; }

  public List<String> getEdges() { return edges; }
  public void setEdges(List<String> edges) { this.edges = edges; }

  public Color getColor() { return color; }
  public void setColor(Color color) { this.color = color; }

  public String getParent() { return parent; }
  public void setParent(String parent) { this.parent = parent; }
}
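To make the record format concrete, here is a small usage sketch (hypothetical test code, assuming the Node class above is on the classpath), round-tripping one record of the input format:

// Parse a record, inspect its fields, then serialize it back.
Node n = new Node("2\t1,3,4,5,|Integer.MAX_VALUE|WHITE|null");
System.out.println(n.getId());       // 2
System.out.println(n.getDistance()); // 2147483647 (Integer.MAX_VALUE)
System.out.println(n.getColor());    // WHITE
System.out.println(n.getParent());   // null
System.out.println(n.getNodeInfo()); // 1,3,4,5,|Integer.MAX_VALUE|WHITE|null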
package com.joey.mapred.graph;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import com.joey.mapred.graph.utils.Node;
import com.joey.mapred.graph.utils.Node.Color;

public class TraverseGraph {

  /**
   * Description : Mapper class that implements the map part of the
   * breadth-first search algorithm. Nodes colored WHITE or BLACK are emitted
   * as-is. For each node that is colored GRAY, a new node is emitted for
   * every adjacent node, with the distance incremented by one and the color
   * set to GRAY. The original GRAY node is colored BLACK and is also emitted.
   *
   * Input format <key, value> : <line offset in the input file (automatically
   * assigned),
   * nodeID<tab>list_of_adjacent_nodes|distance_from_the_source|color|parent>
   *
   * Output format <key, value> : <nodeID, (updated)
   * list_of_adjacent_nodes|distance_from_the_source|color|parent>
   *
   * Reference :
   * http://www.johnandcailin.com/blog/cailin/breadth-first-graph-search
   * -using-iterative-map-reduce-algorithm
   */
  // the type parameters are the input key type, the input value type, the
  // output key type, and the output value type
  public static class TraverseMapper extends Mapper<Object, Text, Text, Text> {

    protected void map(Object key, Text value, Context context, Node inNode)
        throws IOException, InterruptedException {
      if (inNode.getColor() == Color.GRAY) {
        for (String neighbor : inNode.getEdges()) {
          Node adjacentNode = new Node();
          adjacentNode.setId(neighbor);
          adjacentNode.setDistance(inNode.getDistance() + 1);
          adjacentNode.setColor(Node.Color.GRAY);
          adjacentNode.setParent(inNode.getId());
          context.write(new Text(adjacentNode.getId()), adjacentNode.getNodeInfo());
        }
        // this node is done, color it black
        inNode.setColor(Node.Color.BLACK);
      }
      context.write(new Text(inNode.getId()), inNode.getNodeInfo());
    }
  }

  /**
   * Description : Reducer class that implements the reduce part of the
   * parallel breadth-first search algorithm. For each key (node id) it
   * builds a new node that combines all the information emitted for that
   * id: the full list of edges, the minimum distance, the darkest color,
   * and the parent/predecessor node.
   *
   * Input format <key, value> : <nodeID,
   * list_of_adjacent_nodes|distance_from_the_source|color|parent>
   *
   * Output format <key, value> : <nodeID,
   * (updated) list_of_adjacent_nodes|distance_from_the_source|color|parent>
   */
  public static class TraverseReducer extends Reducer<Text, Text, Text, Text> {

    protected Node reduce(Text key, Iterable<Text> values, Context context,
        Node outNode) throws IOException, InterruptedException {
      // set the node id as the key
      outNode.setId(key.toString());

      for (Text value : values) {
        Node inNode = new Node(key.toString() + "\t" + value.toString());

        // only the record carrying the original node data has an edge list
        if (inNode.getEdges().size() > 0) {
          outNode.setEdges(inNode.getEdges());
        }

        // keep the minimum distance, together with the parent it came from
        if (inNode.getDistance() < outNode.getDistance()) {
          outNode.setDistance(inNode.getDistance());
          outNode.setParent(inNode.getParent());
        }

        // keep the darkest color (WHITE < GRAY < BLACK by ordinal)
        if (inNode.getColor().ordinal() > outNode.getColor().ordinal()) {
          outNode.setColor(inNode.getColor());
        }
      }

      context.write(key, new Text(outNode.getNodeInfo()));
      return outNode;
    }
  }
}
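As a worked illustration of the merge rules, consider node 2 during the first iteration. The map phase produces two records for key 2: its original data, and a frontier record emitted when the GRAY source node 1 expanded its adjacency list (a node emitted this way carries no edges, so its serialized form starts with the "|" separator):

2<tab>1,3,4,5,|Integer.MAX_VALUE|WHITE|null   (original node data)
2<tab>|1|GRAY|1                               (emitted by GRAY node 1)

The reducer keeps the full edge list from the first record, the minimum distance (1) and parent ("1") from the second, and the darker color GRAY, producing exactly the record seen in the first iteration's output:

2<tab>1,3,4,5,|1|GRAY|1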
package com.joey.mapred.graph;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;

import com.joey.mapred.BaseDriver;
import com.joey.mapred.graph.TraverseGraph.TraverseMapper;
import com.joey.mapred.graph.TraverseGraph.TraverseReducer;
import com.joey.mapred.graph.utils.Node;

/**
 * Description : MapReduce program to solve the single-source shortest path
 * problem using parallel breadth-first search. This program also illustrates
 * how to perform iterative MapReduce.
 *
 * The single-source shortest path is implemented using the breadth-first
 * search concept.
 *
 * Reference :
 * http://www.johnandcailin.com/blog/cailin/breadth-first-graph-search
 * -using-iterative-map-reduce-algorithm
 */
public class BFSearchDriver extends BaseDriver {

  static enum MoreIterations {
    numberOfIterations
  }

  static class SearchMapperSSSP extends TraverseMapper {

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      Node inNode = new Node(value.toString());
      // calls the map method of the superclass TraverseMapper
      super.map(key, value, context, inNode);
    }
  }

  static class SearchReducerSSSP extends TraverseReducer {

    // the parameters are the type of the input key, the values associated
    // with the key, and the Context object through which the reducer
    // communicates with the Hadoop framework
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // create a new out node and set its values
      Node outNode = new Node();

      // call the reduce method of the superclass TraverseReducer
      outNode = super.reduce(key, values, context, outNode);

      // if the color of the node is gray, the execution has to continue;
      // this is signalled by incrementing the counter
      if (outNode.getColor() == Node.Color.GRAY) {
        context.getCounter(MoreIterations.numberOfIterations).increment(1L);
      }
    }
  }

  public int run(String[] args) throws Exception {
    int iterationCount = 0; // ordinal number of the intermediate outputs
    Job job;
    long terminationValue = 1;

    // while there are more gray nodes to process
    while (terminationValue > 0) {
      job = getJobConf(args); // get the job configuration
      String input, output;

      // Set the input and output files for each iteration: the first
      // iteration reads the user-specified file, while every subsequent
      // iteration reads the output of the previous one.
      if (iterationCount == 0) {
        input = args[0];
      } else {
        input = args[1] + iterationCount;
      }
      output = args[1] + (iterationCount + 1);

      FileInputFormat.setInputPaths(job, new Path(input));
      FileOutputFormat.setOutputPath(job, new Path(output));

      job.waitForCompletion(true);

      Counters jobCntrs = job.getCounters();
      terminationValue = jobCntrs
          .findCounter(MoreIterations.numberOfIterations).getValue();
      iterationCount++;
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // validate the arguments before launching the job
    if (args.length != 2) {
      System.err.println("Usage: <in> <output name>");
      System.exit(2);
    }
    int res = ToolRunner.run(new Configuration(), new BFSearchDriver(), args);
    System.exit(res);
  }

  @Override
  protected Job getJobConf(String[] args) throws Exception {
    JobInfo jobInfo = new JobInfo() {
      @Override
      public Class<?> getJarByClass() {
        return BFSearchDriver.class;
      }

      @Override
      public Class<? extends Mapper> getMapperClass() {
        return SearchMapperSSSP.class;
      }

      @Override
      public Class<? extends Reducer> getCombinerClass() {
        return null;
      }

      @Override
      public Class<? extends Reducer> getReducerClass() {
        return SearchReducerSSSP.class;
      }

      @Override
      public Class<?> getOutputKeyClass() {
        return Text.class;
      }

      @Override
      public Class<?> getOutputValueClass() {
        return Text.class;
      }

      @Override
      public Class<? extends InputFormat> getInputFormatClass() {
        return TextInputFormat.class;
      }

      @Override
      public Class<? extends OutputFormat> getOutputFormatClass() {
        return TextOutputFormat.class;
      }
    };

    return setupJob("BFSSearch", jobInfo);
  }
}
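Assuming the classes are packaged into a jar (the jar name here is hypothetical), a run might look like: hadoop jar bfs-example.jar com.joey.mapred.graph.BFSearchDriver input/graph.txt output/bfs. Following the logic in run(), the first iteration reads input/graph.txt and writes output/bfs1, the second reads output/bfs1 and writes output/bfs2, and so on until a job finishes with the counter at zero, meaning no GRAY nodes remain.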