Java生产消费模式实例代码[已测试]


/////////////////////////////////////////////////////////// Test.java 测试代码

public class Test1 {

    /**
     * @param args
     */
    public static void main(String[] args) {

        //如有多出生产此处q必须为单例模式,如最后那个源文件!
        QueueThread q = new QueueThread();
        // 启动10条消费线程
        for (int i = 0; i < 5; i++) {
            Consumer c = new Consumer(q);
            c.start();
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // for (int i = 0; i < 50; i++) {

        Producer p = new Producer(q);
        p.setStr("生产线程产生ggggggggg  ");
        p.start();
        // }
        // 非生产线程产生数据
        for (int i = 0; i < 50; i++) {

            q.put(" 非生产线程产生数据  " + i);
        }

    }

}

/////////////////////////////////////////// QueueThread.java 线程池

import java.util.LinkedList;

public class QueueThread {

    final static int MaxLength = 10;// 等待队列大小

    private static LinkedList thread_pool = null;

    public QueueThread() {
        synchronized (this) {
            if (thread_pool == null) {
                System.out.println("初始化");
                thread_pool = new LinkedList();
            }
        }
    }

    /**
     * 判断是否已满
     *
     * @return
     */
    public boolean isFull() {

        return thread_pool.size() >= MaxLength ? true : false;

    }

    /**
     * 判断是否为空
     *
     * @return
     */
    public boolean isEmpty() {

        return thread_pool.isEmpty();

    }

    /**
     * 生产模式:插入数据到队列
     *
     * @param obj
     */
    public synchronized void put(Object obj) {
        while (isFull()) {
            try {
                System.out.println("满了等待。。。。。。");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 如果没满,插入
        // 插入到队尾
        System.out.println("没满。。。。。");
        thread_pool.addLast(obj);
        notify();// notifyAll();?
    }

    /**
     * 消费模式:从队列里面取出一个对象
     *
     * @return
     */
    public synchronized Object get() {
        // 如果是空的则等待
        while (isEmpty()) {
            System.out.println("没数据等待。。。。。");
            try {
                wait();
            } catch (InterruptedException e) {

                e.printStackTrace();
            }
        }
        System.out.println("有数据,干活。。。。。");
        notify();
        return thread_pool.removeFirst();

    }

}

///////////////////////////////////////// Consumer.java

/**
 * 消费线程
 *
 * @author Mygia.mai
 *
 */
public class Consumer extends Thread {
   
    private QueueThread q;
   
    public Consumer(QueueThread q){
        this.q=q;
    }

    public void run() {
        while (true) {
            System.out.println(getName() + " 开始执行消费!");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(getName() + q.get());
            System.out.println(getName() + " 消费结束!");
        }
    }
}

////////////////////////////////// Producer.java 生产者线程

public class Producer extends Thread {

    QueueThread q;

    private String str;

    Producer(QueueThread q) {
        this.q = q;
    }

    public String getStr() {
        return str;
    }

    public void setStr(String str) {
        this.str = str;
    }

    public void run() {
        for (int i = 0; i < 50; i++)
            q.put(str + i);
    }

}

////////////////////////////////////////// InitQueue.java 初始化消费线程池

public class InitQueue {

    private static QueueThread queue = null;

    public InitQueue() {
        synchronized (this) {
            if (queue == null) {
                System.out.println("初始化消费线程队列!启动5条");
                queue = new QueueThread();
                // 启动5条消费线程
                for (int i = 0; i < 5; i++) {
                    Consumer c = new Consumer(queue);
                    c.start();
                }
            }
        }
    }

    /**
     * 获取
     * @return
     */
    public QueueThread getQueue() {
        return queue;
    }

}

相关内容