Thread

  1. @Override 
  2. protected Object clone() throws CloneNotSupportedException { 
  3. throw new CloneNotSupportedException(); 

答案显而易见,线程是不支持clone 的。我们需要重新new 一个Thread来重新运行。其实我们只需要将原来的Worker里的Runnable换成我们自己的task,然后将访问权限适当放开就可以了。还有,就是让我们的CustomThreadPoolExecutor继承Thread,因为它需要定时监控自己的所有的worker里Thread的运行状态。

CustomThreadPoolExecutor

  1. public class CustomThreadPoolExecutor extends ThreadPoolExecutor implements Runnable {  
  2. public void execute(Testask command) { 
  3. ….//将执行接口改为接收我们的业务类 
  4. … 
  5. … 
  6. private final class Worker 
  7. extends AbstractQueuedSynchronizer 
  8. implements Runnable 
  9. … 
  10. Testask firstTask; //将Runnable改为我们的业务类,方便查看状态 
  11. … 
  12. Worker(Testask firstTask) { 
  13. …//同样将初始化参数改为我们的业务类 
  14.  
  15. public static void main(String[] args) { 
  16. CustomThreadPoolExecutor pool = new CustomThreadPoolExecutor(0, Integer.MAX_VALUE, 
  17. 60L, TimeUnit.SECONDS, 
  18. new SynchronousQueue<Runnable>()); 
  19.          
  20. Testask task = new Testask(); 
  21. task.setInterval(1); 
  22. pool.execute(task); 
  23.          
  24. Testask task1 = new Testask(); 
  25. task1.setInterval(2); 
  26. pool.execute(task1); 
  27.          
  28. new Thread(pool).start(); 
  29.  
  30. @Override 
  31. public void run() { 
  32. while (!Thread.currentThread().isInterrupted()) { 
  33. try { 
  34. long current = System.currentTimeMillis(); 
  35. Set<Testask> toReExecute = new HashSet<Testask>(); 
  36. System.out.println("\t number is " + number); 
  37. for (Worker wrapper : workers) { 
  38. Testask tt = wrapper.firstTask; 
  39. if (tt != null) { 
  40. if (current - tt.getLastExecTime() > tt.getInterval() * 5 * 1000) { 
  41. wrapper.interruptIfStarted(); 
  42. remove(tt); 
  43. if (tt.getFiles() != null) { 
  44. for (File file : tt.getFiles()) { 
  45. file.delete(); 
  46. System.out.println("THread is timeout : " + tt + " " + new Date()); 
  47. toReExecute.add(tt); 
  48. if (toReExecute.size() > 0) { 
  49. mainLock.lock(); 
  50. try { 
  51. for (Testask tt : toReExecute) { 
  52. execute(tt);    // execute this task again 
  53. }  
  54. } finally { 
  55. mainLock.unlock(); 
  56. } catch (Exception e1) { 
  57. System.out.println("Error happens when we trying to interrupt and restart a code task "); 
  58. try { 
  59. Thread.sleep(500); 
  60. } catch (InterruptedException e) { 

Testask

  1. class Testask implements Runnable { 
  2. ….. 
  3.  
  4. @Override 
  5. public void run() { 
  6. while (!Thread.currentThread().isInterrupted()) { 
  7. lastExecTime = System.currentTimeMillis(); 
  8. System.out.println(Thread.currentThread().getName() + " is running -> " + new Date()); 
  9. try { 
  10. CustomThreadPoolExecutor.number++; 
  11. Thread.sleep(getInterval() * 6 * 1000); 
  12.                 System.out.println(Thread.currentThread().getName() + " after sleep"); 
  13. } catch (InterruptedException e) { 
  14. Thread.currentThread().interrupt(); 
  15. System.out.println("InterruptedException happens"); 
  16. System.out.println("Going to die"); 

最终方案

综上,最稳妥的就是使用JDK自带的ThreadPoolExecutor, 如果需要对池里的task进行任意时间的控制,可以考虑全面更新,全方面,360度无死角的定制自己的线程池当然是最好的方案,但是一定要注意对于共享对象的处理,适当的处理好并发访问共享对象的方法。

鉴于我们的场景,由于时间紧,而且需要了解的task并不多,暂时选用全部重新更新的策略。上线后,抽时间把自己定制的ThreadPoolExecutor搞定,然后更新上去!




相关内容