java使用默认线程池踩过的坑(1)(2)
- throw new IllegalThreadStateException();
- group.add(this);
- boolean started = false;
- try {
- start0(); // 启动线程的native方法
- started = true;
- } finally {
- try {
- if (!started) {
- group.threadStartFailed(this);
- }
- } catch (Throwable ignore) {
- }
- }
- }
恩,只有是NEW状态才能够调用native方法启动一个线程。好吧,到这里了,就普及也自补一下jvm里的线程状态:
所有的线程状态::
●NEW —— 还没有启动过
●RUNNABLE —— 正在jvm上运行着
●BLOCKED —— 正在等待锁/信号量被释放
●WAITING —— 等待其他某个线程的某个特定动作
●TIMED_WAITING —— A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
●TERMINATED —— 退出,停止
线程在某个时间点上只可能存在一种状态,这些状态是jvm里的,并不反映操作系统线程的状态。查一下Thread的API,没有对其状态进行修改的API。那么这条路是不通的吗?
仔细考虑一下……
如果把任务做成Runnable实现类,然后在把这个实现类丢进线程池调度器之前,利用此Runnable构造一个Thread,是不是这个Thread对象就能够控制这个runnable对象,进而控制在线程池中运行着的task了呢?非也!让我们看看Thread和ThreadPoolExecutor对Runnable的处理吧。
●Thread
- /* What will be run. */
- private Runnable target;
结合上面的start()方法,很容易猜出,start0()会把target弄成一个线程来进行运行。
●ThreadPoolExecutor
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
- }
- private boolean addWorker(Runnable firstTask, boolean core) {
- …
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- final ReentrantLock mainLock = this.mainLock;
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- mainLock.lock();
- try {
- int c = ctl.get();
- int rs = runStateOf(c);
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
那么Worker又是怎样的呢?
●Worker
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- final Thread thread;
- Runnable firstTask;
- volatile long completedTasks;
- Worker(Runnable firstTask) {
- setState(-1); //调用runWorker之前不可以interrupt
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
- public void run() {
- runWorker(this);
- }
- ……
- …….
- void interruptIfStarted() {
- Thread t;
- if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
可见worker里既包装了Runnable对象——task,又包装了一个Thread对象——以自己作为初始化参数,因为worker也是Runnable对象。然后对外提供了运行与停止接口,run()和interruptIfStarted()。回顾上面使用Thread的例子不禁有了新的领悟,我们把一个Thread对象交给ThreadPoolExecutor执行后,实际的调用是对Thread(FileTask())对象,我们暂时称之为workerWrapper。那么我们在池外进行FileTask.interrupt()操作影响的是FileTask对象,而不是workerWrapper。所以可能上面对于start()方法二次调用不是特别适当。更恰当的应该是在fileTask.interrupt()的时候就跑出异常,因为从来没有对fileTask对象执行过start()方法,这时候去interrupt就会出现错误。具体如下图:
分析到此,我们已经明确除了调用ThreadPoolExecutor了的interruptWorkers()方法别无其他途径操作这些worker了。
- private void interruptWorkers() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers)
- w.interruptIfStarted();
- } finally {
- mainLock.unlock();
- }
- }
重启线程池
●TaskManager
- public class TaskManager implements Runnable {
- …..
- public TaskManager (Set<FileTask> runners) {
- super();
- this.runners = runners;
- executeTasks(runners);
- }
- private void executeTasks(Set<FileTask> runners) {
- for (FileTask task : runners) {
- pool.execute(task);
- System.out.println(task.getClass().getSimpleName() + " has been started");
- }
- }
@Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- long current = System.currentTimeMillis();
- for (FileTask wrapper : runners) {
- if (wrapper.getLastExecTime() != 0 && current - wrapper.getLastExecTime() > wrapper.getInterval() * 5 * 1000) { // 开始忘了乘以1000
- wrapper.interrupt();
- if (wrapper.getFiles() != null){
- for (File file : wrapper.getFiles()) {
- file.delete();
- }
- }
- System.out.println("Going to shutdown the thread pool");
- List<Runnable> shutdownNow = pool.shutdownNow(); // 不等当前pool里的任务执行完,直接关闭线程池
- for (Runnable run : shutdownNow) {
- System.out.println(run + " going to be shutdown");
- }
- while (pool.awaitTermination(1, TimeUnit.SECONDS)) {
- System.out.println("The thread pool has been shutdown " + new Date());
- executeTasks(runners); // 重新执行
- Thread.sleep(200);
- }
- }
- }
- } catch (Exception e1) {
- e1.printStackTrace();
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- }
- }
- }
- public static void main(String[] args) {
- Set<FileTask> tasks = new HashSet<FileTask>();
- FileTask task = new FileTask();
- task.setInterval(1);
- task.setName("task-1");
- tasks.add(task);
- FileTask task1 = new FileTask();
- task1.setInterval(2);
- task.setName("task-2");
- tasks.add(task1);
- TaskManager codeManager = new TaskManager (tasks);
- new Thread(codeManager).start();
- }
- }
成功!把整个的ThreadPoolExector里所有的worker全部停止,之后再向其队列里重新加入要执行的两个task(注意这里并没有清空,只是停止而已)。这样做虽然能够及时处理task,但是一个很致命的缺点在于,如果不能明确的知道ThreadPoolExecutor要执行的task,就没有办法重新执行这些任务。
评论暂时关闭