Java中的多线程


  简介

  为了给并发程序开发提供更好的支持,Java不仅提供了Thread类、Runnable接口等简单的多线程支持工具,还提供了用于多线程管理的线程池,用于提高并发程序的性能。

无限制线程的缺陷

  多线程的软件设计方法确实可以提高多核处理器的计算能力,提高系统的性能和吞吐量,但是如果不加限制的使用多线程,对于系统性能不仅不能提升,反而会下降产生不利影响。

  简单的线程创建方法new Thread().start(),通过thread来启动线程,并且由系统自动的回收线程。在简单的系统这样做并没有问题,但是在真实的生产系统中,在某一时刻会有大量的请求,这样就需要为每一个请求创建一个线程,而当线程数量过大时,会耗尽CPU和内存资源。

  虽然线程是一种轻量级的工具,但是其创建和销毁也需要消耗一定的时间;其次,线程本身也是需要占用内存空间的,大量的线程会抢占内存只有,导致Out of memory异常,并且大量线程的回收会给GC带来很大压力,延长GC停顿的时间。

  因此,对线程的使用必须控制一个度,在适当的范围内使用会提供系统性能,但是,一旦超过了这个范围,大量的线程就会拖垮整个系统。在生产环境下,必须要对其进行管理和控制。

简单线程池的实现

  前面介绍在多线程中不断的创建和销毁线程会带来额外的开销,这样就需要引入一种线程复用机制,即线程池。线程池的基本功能就是进行线程复用,当系统接受一个提交的任务,需要一个线程时,并不急于去创建一个线程,而是去现场池中寻找,是否有闲置的线程,若有,直接使用线程池中的线程工作,如没有,再去创建新的线程。待任务完成后,也不是简单的销毁线程,而是将线程放回线程池中,以便下次复用。

  上面已经把线程池实现的原理简单说明了一下,下面我们自己实现一个线程,来了解一下线程池实现的核心功能,有助于理解线程池的实现。

  线程池实现代码:

public class ThreadPool {
    private static ThreadPool instance = null;
    //空闲线程队列
    private List<PThread> idelThreads;
    //已有的线程总数
    private int threadCounter;
    private boolean isShutdown = false;
   
    public ThreadPool() {
        idelThreads = new Vector<PThread>(5);
        threadCounter=0;
    }

    public synchronized int getCreatedThreadsCount() {
       
        return threadCounter;
    }
   
    //取得线程池实例
    public synchronized static ThreadPool getInstatce(){
        if(instance==null){
            instance = new ThreadPool();
        }
        return instance;
    }
   
    //把线程重新放回到池中
   
    public synchronized void repool(PThread repoolThread){
        if(!isShutdown){
            idelThreads.add(repoolThread);
        }else{
            repoolThread.shutdown();
        }
    }
   
    //停止池中所有线程
    public synchronized void shutdown(){
        isShutdown = true;
        for (int i = 0; i < idelThreads.size(); i++) {
            PThread pthread = idelThreads.get(i);
            pthread.shutdown();
        }
    }
    //执行任务
    public synchronized void start(Runnable target){
        PThread pthread = null;
        //如果有闲置线程
        if(idelThreads.size()>0){
            int index = idelThreads.size()-1;
            pthread=idelThreads.get(index);
            idelThreads.remove(index);
            pthread.setTarget(target);
        }else{//如果没有闲置线程
            threadCounter++;
            PThread p = new PThread(instance, target, "PThread#"+threadCounter);
            p.start();
        }
    }
   
   
   
}

  从代码中可以看出,线程池中有一个闲置线程的队列,在执行任务时,如果有闲置线程,则从线程池中取线程执行任务,如果没有现在线程,则创建新的线程,并且在现场使用完毕后,会将线程重新放回到闲置线程队列中。

  另外,线程池的使用还需要一个永不退出的线程的配合使用,该线程在手动关闭前永不结束,并且一直等待新任务的到来。代码如下:

public class PThread extends Thread{
    //线程池
    private ThreadPool pool;
    //任务
    private Runnable target;
    private boolean isShutDown = false;
    private boolean isIdle = false;
    public PThread(ThreadPool pool, Runnable target,String name) {
        super(name);
        this.pool = pool;
        this.target = target;
    }
   
    public synchronized Runnable getTarget() {
        return target;
    }
    public synchronized boolean isIdle() {
        return isIdle;
    }

    @Override
    public void run() {
       
        while(!isShutDown){
            isIdle = false;
           
            if(target!=null){
                //运行任务
                target.run();
            }
            //任务结束,闲置任务
            isIdle=true;
           
            try {
                pool.repool(this);
                synchronized (this) {
                    //线程闲置,等待任务到来
                    wait();
                }
            } catch (Exception e) {
                // TODO: handle exception
            }
           
            isIdle=false;
        }
   
    }
   
    public synchronized void setTarget(Runnable target){
        this.target=target;
        //设置任务之后,通知run方法,开始执行
        notifyAll();
    }
   
    public synchronized void shutdown(){
        isShutDown=true;
        notifyAll();
    }
   
   
}

执行线程:

public class MyThread implements Runnable{
   
   
    private String name;
   
   
   
    public MyThread() {
       
    }


    public MyThread(String name) {
        this.name = name;
    }


    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
   
}

测试代码:

public class TestClient {
    public static void main(String[] args) {
       
        ExecutorService executor = Executors.newCachedThreadPool();
       
        long begin = System.currentTimeMillis();
       
        for (int i = 0; i < 1000; i++) {
            //new Thread(new MyThread("testnopoolThread"+i)).start();
           
            ThreadPool.getInstatce().start(new MyThread("testpoolThread"+i));
           
            //executor.execute(new MyThread("executorpoolThread"+i));
        }
       
        System.out.println(System.currentTimeMillis()-begin);
   
    }
}

   线程池能减少线程频繁调度的开销,线程的复用,对系统性能的提升效果比较明显。

•Executor框架

  JDK提供了一整套的Executor框架,帮助开发人员有效的进行线程控制。ThreadPoolExecutor表示一个线程池,Executors类扮演着线程池工厂的角色,通过Executor可以取得一个特定功能的线程池。

  newFixedThreadPool():该方法返回一个固定线程数量的线程池,该线程池中线程的数量始终保持不变。当���个任务提交后,线程池中若有空闲线程则立即执行,若没有,新任务会被保存在一个任务队列中,待有现车空闲时,便处理任务队列中的任务。

  newSingleThreadExecutor():该方法返回只有一个线程的线程池。若多余的任务被提交到该线程池,任务会被保存到一个任务队列中,若线程空闲,按先进先出的顺序执行队列中的任务。

  newCacheThreadPool():该方法返回一个根据实际情况调整线程数量的线程池。若有空闲线程可以复用,则优先选择使用可复用的线程,否则,创建新的线程处理新任务。

  newSingleThreadScheduledExecutor()方法:该方法返回一个ScheduledExecutorService对象,线程池大小为1,在给定时间内执行某一任务。

•自定义线程池

  newFixedThreadPool、newSingleThreadExecutor、newCacheThreadPool的内部实现都实现了ThreadPoolExecutor。在ThreadPoolExecutor中有一个最主要的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

更多详情见请继续阅读下一页的精彩内容

  • 1
  • 2
  • 下一页

相关内容