1. throw new IllegalThreadStateException(); 
  2.         group.add(this); 
  3.         boolean started = false
  4.         try { 
  5.         start0();    // 启动线程的native方法 
  6.         started = true
  7.         } finally { 
  8.             try { 
  9.                 if (!started) { 
  10.                     group.threadStartFailed(this); 
  11.                 } 
  12.             } catch (Throwable ignore) { 
  13.             } 
  14.         } 
  15.     } 

恩,只有是NEW状态才能够调用native方法启动一个线程。好吧,到这里了,就普及也自补一下jvm里的线程状态:

所有的线程状态::

NEW —— 还没有启动过

RUNNABLE —— 正在jvm上运行着

BLOCKED —— 正在等待锁/信号量被释放

WAITING —— 等待其他某个线程的某个特定动作

TIMED_WAITING —— A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.

TERMINATED —— 退出,停止

线程在某个时间点上只可能存在一种状态,这些状态是jvm里的,并不反映操作系统线程的状态。查一下Thread的API,没有对其状态进行修改的API。那么这条路是不通的吗?

仔细考虑一下……

如果把任务做成Runnable实现类,然后在把这个实现类丢进线程池调度器之前,利用此Runnable构造一个Thread,是不是这个Thread对象就能够控制这个runnable对象,进而控制在线程池中运行着的task了呢?非也!让我们看看Thread和ThreadPoolExecutor对Runnable的处理吧。

Thread

  1. /* What will be run. */ 
  2. private Runnable target; 

结合上面的start()方法,很容易猜出,start0()会把target弄成一个线程来进行运行。

ThreadPoolExecutor

  1. public void execute(Runnable command) { 
  2.         if (command == null) 
  3.             throw new NullPointerException(); 
  4.         int c = ctl.get(); 
  5.         if (workerCountOf(c) < corePoolSize) { 
  6.             if (addWorker(command, true)) 
  7.                 return; 
  8.             c = ctl.get(); 
  9.         } 
  10.         if (isRunning(c) && workQueue.offer(command)) { 
  11.             int recheck = ctl.get(); 
  12.             if (! isRunning(recheck) && remove(command)) 
  13.                 reject(command); 
  14.             else if (workerCountOf(recheck) == 0) 
  15.                 addWorker(null, false); 
  16.         } 
  17.         else if (!addWorker(command, false)) 
  18.             reject(command); 
  19. private boolean addWorker(Runnable firstTask, boolean core) { 
  20. … 
  21. boolean workerStarted = false
  22. boolean workerAdded = false
  23. Worker w = null
  24. try { 
  25. final ReentrantLock mainLock = this.mainLock; 
  26. w = new Worker(firstTask); 
  27. final Thread t = w.thread; 
  28. if (t != null) { 
  29. mainLock.lock(); 
  30. try { 
  31. int c = ctl.get(); 
  32. int rs = runStateOf(c); 
  33. if (rs < SHUTDOWN || 
  34. (rs == SHUTDOWN && firstTask == null)) { 
  35. if (t.isAlive()) // precheck that t is startable 
  36. throw new IllegalThreadStateException(); 
  37. workers.add(w); 
  38. int s = workers.size(); 
  39. if (s > largestPoolSize) 
  40. largestPoolSize = s; 
  41. workerAdded = true
  42. } finally { 
  43. mainLock.unlock(); 
  44. if (workerAdded) { 
  45. t.start(); 
  46. workerStarted = true
  47. } finally { 
  48. if (! workerStarted) 
  49. addWorkerFailed(w); 
  50. return workerStarted; 

那么Worker又是怎样的呢?

Worker

  1. private final class Worker 
  2. extends AbstractQueuedSynchronizer 
  3. implements Runnable 
  4. final Thread thread; 
  5. Runnable firstTask; 
  6. volatile long completedTasks; 
  7. Worker(Runnable firstTask) { 
  8. setState(-1); //调用runWorker之前不可以interrupt 
  9. this.firstTask = firstTask; 
  10. this.thread = getThreadFactory().newThread(this); 
  11. public void run() { 
  12. runWorker(this); 
  13. ……   
  14. ……. 
  15. void interruptIfStarted() { 
  16. Thread t; 
  17. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 
  18. try { 
  19. t.interrupt(); 
  20. } catch (SecurityException ignore) { 

可见worker里既包装了Runnable对象——task,又包装了一个Thread对象——以自己作为初始化参数,因为worker也是Runnable对象。然后对外提供了运行与停止接口,run()和interruptIfStarted()。回顾上面使用Thread的例子不禁有了新的领悟,我们把一个Thread对象交给ThreadPoolExecutor执行后,实际的调用是对Thread(FileTask())对象,我们暂时称之为workerWrapper。那么我们在池外进行FileTask.interrupt()操作影响的是FileTask对象,而不是workerWrapper。所以可能上面对于start()方法二次调用不是特别适当。更恰当的应该是在fileTask.interrupt()的时候就跑出异常,因为从来没有对fileTask对象执行过start()方法,这时候去interrupt就会出现错误。具体如下图:

分析到此,我们已经明确除了调用ThreadPoolExecutor了的interruptWorkers()方法别无其他途径操作这些worker了。

  1. private void interruptWorkers() { 
  2. final ReentrantLock mainLock = this.mainLock; 
  3. mainLock.lock(); 
  4. try { 
  5. for (Worker w : workers) 
  6. w.interruptIfStarted(); 
  7. } finally { 
  8. mainLock.unlock(); 

重启线程池

●TaskManager

  1. public class TaskManager  implements Runnable { 
  2. ….. 
  3. public TaskManager (Set<FileTask> runners) { 
  4. super(); 
  5. this.runners = runners; 
  6. executeTasks(runners); 
  7. private void executeTasks(Set<FileTask> runners) { 
  8. for (FileTask task : runners) { 
  9. pool.execute(task); 
  10. System.out.println(task.getClass().getSimpleName() + " has been started"); 

@Override

  1. public void run() { 
  2. while (!Thread.currentThread().isInterrupted()) { 
  3. try { 
  4. long current = System.currentTimeMillis(); 
  5. for (FileTask wrapper : runners) { 
  6. if (wrapper.getLastExecTime() != 0 && current - wrapper.getLastExecTime() > wrapper.getInterval() * 5 * 1000) {    // 开始忘了乘以1000 
  7. wrapper.interrupt(); 
  8. if (wrapper.getFiles() != null){ 
  9. for (File file : wrapper.getFiles()) { 
  10. file.delete(); 
  11. System.out.println("Going to shutdown the thread pool"); 
  12. List<Runnable> shutdownNow = pool.shutdownNow();    // 不等当前pool里的任务执行完,直接关闭线程池 
  13. for (Runnable run : shutdownNow) { 
  14. System.out.println(run + " going to be shutdown"); 
  15. while (pool.awaitTermination(1, TimeUnit.SECONDS)) {   
  16. System.out.println("The thread pool has been shutdown " + new Date()); 
  17. executeTasks(runners); // 重新执行 
  18. Thread.sleep(200); 
  19. } catch (Exception e1) { 
  20. e1.printStackTrace(); 
  21. try { 
  22. Thread.sleep(500); 
  23. } catch (InterruptedException e) { 
  24. public static void main(String[] args) { 
  25. Set<FileTask> tasks = new HashSet<FileTask>(); 
  26.         
  27. FileTask task = new FileTask(); 
  28. task.setInterval(1); 
  29. task.setName("task-1"); 
  30. tasks.add(task); 
  31.                
  32. FileTask task1 = new FileTask(); 
  33. task1.setInterval(2); 
  34. task.setName("task-2"); 
  35. tasks.add(task1); 
  36.         
  37. TaskManager  codeManager = new TaskManager (tasks); 
  38. new Thread(codeManager).start(); 
  39. }    

成功!把整个的ThreadPoolExector里所有的worker全部停止,之后再向其队列里重新加入要执行的两个task(注意这里并没有清空,只是停止而已)。这样做虽然能够及时处理task,但是一个很致命的缺点在于,如果不能明确的知道ThreadPoolExecutor要执行的task,就没有办法重新执行这些任务。




相关内容