读书人

广度优先BFS的MapReduce兑现

发布时间: 2012-07-19 16:02:19 作者: rapoo

广度优先BFS的MapReduce实现

社交网络中的图模型经常需要构造一棵树型结构:从一个特定的节点出发,例如,构造mary的朋友以及mary朋友的朋友的一棵树。

为构造这样的一棵树,最简单的方法是使用广度优先算法:

?

经常使用链表来表示图的节点以及节点之间的链接关系,如

?

frank -> {mary, jill}jill -> {frank, bob, james}mary -> {william, joe, erin} 

?

?表示,mary有3个朋友,分别是william,joe和erin

?

?将上述关系形式化表示为

?

0-> {1, 2}2-> {3, 4, 5}1-> {6, 7, 8} 

?

有了上述链表结构,我们可以得到:

单线程的BFS如下:

1、节点对象建模Node.java

?

import java.util.*;public class Node {public static enum Color {WHITE, GRAY, BLACK};private final int id;private int parent = Integer.MAX_VALUE;private int distance = Integer.MAX_VALUE;private List<Integer> edges = null;private Color color = Color.WHITE;public Node(int id) {this.id = id;}public int getId() {return this.id;}public int getParent() {return this.parent;}public void setParent(int parent) {this.parent = parent;}public int getDistance() {return this.distance;}public void setDistance(int distance) {this.distance = distance;}public Color getColor() {return this.color;}public void setColor(Color color) {this.color = color;}public List<Integer> getEdges() {return this.edges;}public void setEdges(List<Integer> vertices) {this.edges = vertices;}}

?

?

2、BFS算法 Graph.java

?

import java.util.*;public class Graph {  private Map<Integer, Node> nodes;  public Graph() {    this.nodes = new HashMap<Integer, Node>();  }  public void breadthFirstSearch(int source) {    // Set the initial conditions for the source node    Node snode = nodes.get(source);    snode.setColor(Node.Color.GRAY);    snode.setDistance(0);    Queue<Integer> q = new LinkedList<Integer>();    q.add(source);    while (!q.isEmpty()) {      Node unode = nodes.get(q.poll());      for (int v : unode.getEdges()) {        Node vnode = nodes.get(v);        if (vnode.getColor() == Node.Color.WHITE) {          vnode.setColor(Node.Color.GRAY);          vnode.setDistance(unode.getDistance() + 1);          vnode.setParent(unode.getId());          q.add(v);        }      }      unode.setColor(Node.Color.BLACK);    }  }  public void addNode(int id, int[] edges) {    // A couple lines of hacky code to transform our    // input integer arrays (which are most comprehensible    // write out in our main method) into List<Integer>    List<Integer> list = new ArrayList<Integer>();    for (int edge : edges)      list.add(edge);    Node node = new Node(id);    node.setEdges(list);    nodes.put(id, node);  }    public void print() {    for (int v : nodes.keySet()) {      Node vnode = nodes.get(v);      System.out.printf("v = %2d parent = %2d distance = %2d \n", vnode.getId(), vnode.getParent(),          vnode.getDistance());    }  }  public static void main(String[] args) {    Graph graph = new Graph();    graph.addNode(1, new int[] { 2, 5 });    graph.addNode(2, new int[] { 1, 5, 3, 4 });    graph.addNode(3, new int[] { 2, 4 });    graph.addNode(4, new int[] { 2, 5, 3 });    graph.addNode(5, new int[] { 4, 1, 2 });    graph.breadthFirstSearch(1);    graph.print();  }}

?

?

?

但是以上BFS单线程构造树形结构对于大数据的时候,显得苍白无力。

对此,下面提出基于MapReduce的BFS并行构造社交网络中的树图算法

?

使用MapReduce计算图模型,基本思想是在每个Map slot的迭代中“makes a mess” 而在 Reduce slot中“cleans up the mess”

假设,我们用如下方式表示一个节点:

?

ID    EDGES|DISTANCE_FROM_SOURCE|COLOR|

?

?其中,EDGES是一个用“,”隔开的链接到本节点的其他节点链表List,对于我们不知道链表中的节点到本节点的距离,

使用Integer.MAX_VALUE表示"unknown"。

从COLOR,我们可以知道本节点我们计算过没有,WHITE表示计算过。

假设,我们的输入数据如下,我们从节点1开始广度优先搜索,因此,初始时,标记节点1的距离为0,color为GRAY

?

1       2,5|0|GRAY|2       1,3,4,5|Integer.MAX_VALUE|WHITE|3       2,4|Integer.MAX_VALUE|WHITE|4       2,3,5|Integer.MAX_VALUE|WHITE|5       1,2,4|Integer.MAX_VALUE|WHITE|

?

?map slot负责找出所有COLOR为GEAY的节点。而,对于每个我们计算过的节点,即COLOR为GRAY的节点,对应地,map slot的输出为一个COLOR为BLACK的节点,其中的DISTANCE = DISTANCE + 1。同时,map slot也输出所有不是GEAY的节点,其中距离不变。

因此,上述输入的输出形式如下:

?

1       2,5|0|BLACK|2       NULL|1|GRAY|5       NULL|1|GRAY|2       1,3,4,5|Integer.MAX_VALUE|WHITE|3       2,4|Integer.MAX_VALUE|WHITE|4       2,3,5|Integer.MAX_VALUE|WHITE|5       1,2,4|Integer.MAX_VALUE|WHITE|

?

?

在reduce slot获取的数据都具有同一个key。例如,获取key=2的reduce slot的对应values值为:

?

2       NULL|1|GRAY|2       1,3,4,5|Integer.MAX_VALUE|WHITE|

?

?reduce slot的任务是从获取到的数据,经过采用:

?

?1、有邻接节点的节点

?2、所有有邻接节点的节点中的最小距离

?3、所有有邻接节点中颜色最深的节点

构造出新的输出,如,经过第一次MapReduce过程,我们得到如下形式的数据:

?

1       2,5,|0|BLACK2       1,3,4,5,|1|GRAY3       2,4,|Integer.MAX_VALUE|WHITE4       2,3,5,|Integer.MAX_VALUE|WHITE5       1,2,4,|1|GRAY

?

?第二次MapReduce过程,采用上述输出作为输入,以相同的逻辑运算,得到如下结果:

?

1       2,5,|0|BLACK2       1,3,4,5,|1|BLACK3       2,4,|2|GRAY4       2,3,5,|2|GRAY5       1,2,4,|1|BLACK

?

?第三次的输出为:

?

1       2,5,|0|BLACK2       1,3,4,5,|1|BLACK3       2,4,|2|BLACK4       2,3,5,|2|BLACK5       1,2,4,|1|BLACK

?

?

MapReduce迭代过程直到所有节点不为GRAY为止。

而如果有节点没有连接到源节点,那么可能迭代过程每次都有COLOR为WHITE的节点。

?

?

MapReduce的代码如下:

1、节点对象建模:Node.java

?

package org.apache.hadoop.examples;import java.util.*;import org.apache.hadoop.io.Text;public class Node {public static enum Color {WHITE, GRAY, BLACK};private final int id;private int distance;private List<Integer> edges = new ArrayList<Integer>();private Color color = Color.WHITE;public Node(String str) {String[] map = str.split("\t");String key = map[0];String value = map[1];String[] tokens = value.split("\\|");this.id = Integer.parseInt(key);for (String s : tokens[0].split(",")) {if (s.length() > 0) {edges.add(Integer.parseInt(s));}}if (tokens[1].equals("Integer.MAX_VALUE")) {this.distance = Integer.MAX_VALUE;} else {this.distance = Integer.parseInt(tokens[1]);}this.color = Color.valueOf(tokens[2]);}public Node(int id) {this.id = id;}public int getId() {return this.id;}public int getDistance() {return this.distance;}public void setDistance(int distance) {this.distance = distance;}public Color getColor() {return this.color;}public void setColor(Color color) {this.color = color;}public List<Integer> getEdges() {return this.edges;}public void setEdges(List<Integer> edges) {this.edges = edges;}public Text getLine() {StringBuffer s = new StringBuffer();for (int v : edges) {s.append(v).append(",");}s.append("|");if (this.distance < Integer.MAX_VALUE) {s.append(this.distance).append("|");} else {s.append("Integer.MAX_VALUE").append("|");}s.append(color.toString());return new Text(s.toString());}}
?

?

2、MapRecue广度优先搜索:

?

package org.apache.hadoop.examples;import java.io.IOException;import java.util.Iterator;import java.util.List;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** * This is an example Hadoop Map/Reduce application. * * It inputs a map in adjacency list format, and performs a breadth-first search. * The input format is * ID   EDGES|DISTANCE|COLOR * where * ID = the unique identifier for a node (assumed to be an int here) * EDGES = the list of edges emanating from the node (e.g. 3,8,9,12) * DISTANCE = the to be determined distance of the node from the source * COLOR = a simple status tracking field to keep track of when we're finished with a node * It assumes that the source node (the node from which to start the search) has * been marked with distance 0 and color GRAY in the original input.  All other * nodes will have input distance Integer.MAX_VALUE and color WHITE. */public class GraphSearch extends Configured implements Tool {  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.examples.GraphSearch");  /**   * Nodes that are Color.WHITE or Color.BLACK are emitted, as is. For every   * edge of a Color.GRAY node, we emit a new Node with distance incremented by   * one. The Color.GRAY node is then colored black and is also emitted.   */  public static class MapClass extends MapReduceBase implements      Mapper<LongWritable, Text, IntWritable, Text> {    public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output,        Reporter reporter) throws IOException {      Node node = new Node(value.toString());      // For each GRAY node, emit each of the edges as a new node (also GRAY)      if (node.getColor() == Node.Color.GRAY) {        for (int v : node.getEdges()) {          Node vnode = new Node(v);          vnode.setDistance(node.getDistance() + 1);          vnode.setColor(Node.Color.GRAY);          output.collect(new IntWritable(vnode.getId()), vnode.getLine());        }        // We're done with this node now, color it BLACK        node.setColor(Node.Color.BLACK);      }      // No matter what, we emit the input node      // If the node came into this method GRAY, it will be output as BLACK      output.collect(new IntWritable(node.getId()), node.getLine());    }  }  /**   * A reducer class that just emits the sum of the input values.   */  public static class Reduce extends MapReduceBase implements      Reducer<IntWritable, Text, IntWritable, Text> {    /**     * Make a new node which combines all information for this single node id.     * The new node should have      * - The full list of edges      * - The minimum distance      * - The darkest Color     */    public void reduce(IntWritable key, Iterator<Text> values,        OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {      List<Integer> edges = null;      int distance = Integer.MAX_VALUE;      Node.Color color = Node.Color.WHITE;      while (values.hasNext()) {        Text value = values.next();        Node u = new Node(key.get() + "\t" + value.toString());        // One (and only one) copy of the node will be the fully expanded        // version, which includes the edges        if (u.getEdges().size() > 0) {          edges = u.getEdges();        }        // Save the minimum distance        if (u.getDistance() < distance) {          distance = u.getDistance();        }        // Save the darkest color        if (u.getColor().ordinal() > color.ordinal()) {          color = u.getColor();        }      }      Node n = new Node(key.get());      n.setDistance(distance);      n.setEdges(edges);      n.setColor(color);      output.collect(key, new Text(n.getLine()));         }  }  static int printUsage() {    System.out.println("graphsearch [-m <num mappers>] [-r <num reducers>]");    ToolRunner.printGenericCommandUsage(System.out);    return -1;  }  private JobConf getJobConf(String[] args) {    JobConf conf = new JobConf(getConf(), GraphSearch.class);    conf.setJobName("graphsearch");    // the keys are the unique identifiers for a Node (ints in this case).    conf.setOutputKeyClass(IntWritable.class);    // the values are the string representation of a Node    conf.setOutputValueClass(Text.class);    conf.setMapperClass(MapClass.class);    conf.setReducerClass(Reduce.class);    for (int i = 0; i < args.length; ++i) {      if ("-m".equals(args[i])) {        conf.setNumMapTasks(Integer.parseInt(args[++i]));      } else if ("-r".equals(args[i])) {        conf.setNumReduceTasks(Integer.parseInt(args[++i]));      }    }    return conf;  }  /**   * The main driver for word count map/reduce program. Invoke this method to   * submit the map/reduce job.   *    * @throws IOException   *           When there is communication problems with the job tracker.   */  public int run(String[] args) throws Exception {    int iterationCount = 0;    while (keepGoing(iterationCount)) {      String input;      if (iterationCount == 0)        input = "input-graph";      else        input = "output-graph-" + iterationCount;      String output = "output-graph-" + (iterationCount + 1);      JobConf conf = getJobConf(args);      FileInputFormat.setInputPaths(conf, new Path(input));      FileOutputFormat.setOutputPath(conf, new Path(output));      RunningJob job = JobClient.runJob(conf);      iterationCount++;    }    return 0;  }    private boolean keepGoing(int iterationCount) {    if(iterationCount >= 4) {      return false;    }        return true;  }  public static void main(String[] args) throws Exception {    int res = ToolRunner.run(new Configuration(), new GraphSearch(), args);    System.exit(res);  }}

?

参考:

breadth-first graph search using an iterative map-reduce algorithm


读书人网 >行业软件

热点推荐