并发遍历二叉树 Java 实现


单线程遍历二叉树是数据结构书中必讲的算法, 但多线程访问呢?

我最近写了一个多线程遍历二叉树的算法, 当然既然是多线程,就不保证一定的顺序, 只要节点能访问到就算数.

算法的基本思路

1) 使用 java.util.concurrent.ThreadPoolExecutor 构造线程池

2) 当线程池有空余线程把树的右子树拆分给另外一个线程处理, 否则自己处理右子数

要考虑的情况

1) 如果二叉树非常不平衡(左子树很深,右子树很浅), 会出现一个线程忙碌,而另外一个线程早结束的情况

工作线程过一定时间间隔就会检查线程池是否有空闲线程, 有则分离当前节点的右子树并委托给ThreadPool

2)判断所有工作都结束

启动工作任务时用 HashSet 记录, 工作任务结束时清楚HashSet 记录, 当HashSet size 为0 时,所有工作结束.

注意事项

1) 即使任务结束,ThreadPool 还是在运行的( waiting task ), 要调用 shutdown 通知 ThreadPool 结束

2) 主线程结束, 程序并不会结束, 想让主线程等待 ThreadPool 结束, 要调用ThreadPool 的 awaitTermination 方法

程序

package com.pnp.javathreadpool;

import java.util.HashSet;
import java.util.Random;
import java.util.Stack;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConcurTravBTree {
 class BTreeNode<T extends Comparable> {

  public BTreeNode<T> left;
  public BTreeNode<T> right;
  public T value;

  public BTreeNode(BTreeNode<T> left, BTreeNode<T> right, T value) {
   this.left = left;
   this.right = right;
   this.value = value;
  }

  public BTreeNode(T value) {
   this(null, null, value);
  }
 }

 public class BTree<T extends Comparable> {
  String dumpStr;

  public BTreeNode<T> root;

  public BTree() {
  }

  public BTree(BTreeNode<T> root) {
   this.root = root;
  }

  public void orderAdd(T v) {
   if (root == null)
    root = new BTreeNode(v);
   else
    orderAdd(root, v);
  }

  public void orderAdd(BTreeNode<T> node, T v) {
   if (node.value.compareTo(v) > 0) {
    if (node.left == null)
     node.left = new BTreeNode(v);
    else
     orderAdd(node.left, v);
   } else {
    if (node.right == null)
     node.right = new BTreeNode(v);
    else
     orderAdd(node.right, v);
   }
  }
 
  public void preOrderTraverse() {
   preOrderTraverse(root);
  }
 
  public void preOrderTraverse( BTreeNode<T> node) {
   if (node != null) {
    //System.out.println(node.value);
    try {
     Thread.sleep(100);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    preOrderTraverse(node.left);
    preOrderTraverse(node.right);
   }
  } 
 }

 static int global_id = 0;
 public static final int POOLSIZE = 4;
 
 ThreadPoolExecutor threadPool;
 BTree<Integer> tree;
 HashSet<Integer> tasks = new HashSet<Integer>();
 

 public void init() {
  tree = new BTree<Integer>();

  /*
  int[] data = new int[] { 1, 3, 5 };
  for (int i = 0; i < data.length; i++)
   tree.orderAdd(data[i]);
  */
 
  Random r = new Random();
  for( int i=0; i< 500; i++) {
   tree.orderAdd( r.nextInt(500));
  }

  threadPool = new ThreadPoolExecutor(POOLSIZE, // corePoolSize
    POOLSIZE + 2, // maximumPoolSize
    60, // keepAliveTime
    TimeUnit.SECONDS, // keepAliveTime unit
    new ArrayBlockingQueue<Runnable>(POOLSIZE + 2), // workQueue
    new ThreadPoolExecutor.CallerRunsPolicy()); // handler
 }

 public void start() {
  threadPool.execute(new Visitor(tree.root));
 }

 class Visitor implements Runnable {
  private int id;
  private BTreeNode<Integer> node;

  public Visitor(BTreeNode<Integer> node) {
   this.id = ++global_id;
   this.node = node;
  }

  public boolean handleNode(BTreeNode<Integer> node) {
   if (node == null)
    return true;

   if (ConcurTravBTree.this.threadPool.getActiveCount() <= ConcurTravBTree.this.threadPool
     .getCorePoolSize()) {
    System.out.println("--- ActiveCount "
      + ConcurTravBTree.this.threadPool.getActiveCount());
    ConcurTravBTree.this.threadPool.execute(new Visitor(node));
    return true;
   }
   return false;
  }

  public void run() {
   System.out.println("[ job" + id + " ] ---start  ");
   tasks.add(id);
   if (handleNode(node.right)) // assign Node to a task
    node.right = null; //

   // pre-order traverse
   Stack<BTreeNode<Integer>> stack = new Stack<BTreeNode<Integer>>();
   while (node != null || !stack.isEmpty()) {
    long start = System.currentTimeMillis();
    while (node != null) {
     //System.out.println("[ job" + id + " ]" + node.value);
     try {
      Thread.sleep(100);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }

     stack.push(node);
     node = node.left;
    }
    node = stack.pop();
    node = node.right;

    long cur = System.currentTimeMillis();
    if (cur - start > 1000) {
     if (handleNode(node)) {
      node = null;
     }
    }
   }
   System.out.println("[ job" + id + " ] ---end");
   tasks.remove(id);
   if ( tasks.size() == 0) {
    threadPool.shutdown();
   }
  }
 }

 public static void main(String[] args) {
  ConcurTravBTree tt = new ConcurTravBTree();
  System.out.println("init tree...");
  tt.init();

  System.out.println("start preOrderTraverse traverse...");
  long start = System.currentTimeMillis();
  tt.tree.preOrderTraverse();
  long end = System.currentTimeMillis();
  System.out.println("---- preOrderTraverse cost " + (end - start));
 
  System.out.println("start concurrent traverse...");
  long cstart = System.currentTimeMillis();
  tt.start();
  try {
   tt.threadPool.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  long cend = System.currentTimeMillis();
  System.out.println("---- current traverse cost " + (cend - cstart));
 }
}

 

  • 1
  • 2
  • 下一页

相关内容