Java生产消费模式实例代码[已测试]
Java生产消费模式实例代码[已测试]
/////////////////////////////////////////////////////////// Test.java 测试代码
public class Test1 {
/**
* @param args
*/
public static void main(String[] args) {
//如有多出生产此处q必须为单例模式,如最后那个源文件!
QueueThread q = new QueueThread();
// 启动10条消费线程
for (int i = 0; i < 5; i++) {
Consumer c = new Consumer(q);
c.start();
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// for (int i = 0; i < 50; i++) {
Producer p = new Producer(q);
p.setStr("生产线程产生ggggggggg ");
p.start();
// }
// 非生产线程产生数据
for (int i = 0; i < 50; i++) {
q.put(" 非生产线程产生数据 " + i);
}
}
}
/////////////////////////////////////////// QueueThread.java 线程池
import java.util.LinkedList;
public class QueueThread {
final static int MaxLength = 10;// 等待队列大小
private static LinkedList thread_pool = null;
public QueueThread() {
synchronized (this) {
if (thread_pool == null) {
System.out.println("初始化");
thread_pool = new LinkedList();
}
}
}
/**
* 判断是否已满
*
* @return
*/
public boolean isFull() {
return thread_pool.size() >= MaxLength ? true : false;
}
/**
* 判断是否为空
*
* @return
*/
public boolean isEmpty() {
return thread_pool.isEmpty();
}
/**
* 生产模式:插入数据到队列
*
* @param obj
*/
public synchronized void put(Object obj) {
while (isFull()) {
try {
System.out.println("满了等待。。。。。。");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 如果没满,插入
// 插入到队尾
System.out.println("没满。。。。。");
thread_pool.addLast(obj);
notify();// notifyAll();?
}
/**
* 消费模式:从队列里面取出一个对象
*
* @return
*/
public synchronized Object get() {
// 如果是空的则等待
while (isEmpty()) {
System.out.println("没数据等待。。。。。");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("有数据,干活。。。。。");
notify();
return thread_pool.removeFirst();
}
}
///////////////////////////////////////// Consumer.java
/**
* 消费线程
*
* @author Mygia.mai
*
*/
public class Consumer extends Thread {
private QueueThread q;
public Consumer(QueueThread q){
this.q=q;
}
public void run() {
while (true) {
System.out.println(getName() + " 开始执行消费!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(getName() + q.get());
System.out.println(getName() + " 消费结束!");
}
}
}
////////////////////////////////// Producer.java 生产者线程
public class Producer extends Thread {
QueueThread q;
private String str;
Producer(QueueThread q) {
this.q = q;
}
public String getStr() {
return str;
}
public void setStr(String str) {
this.str = str;
}
public void run() {
for (int i = 0; i < 50; i++)
q.put(str + i);
}
}
////////////////////////////////////////// InitQueue.java 初始化消费线程池
public class InitQueue {
private static QueueThread queue = null;
public InitQueue() {
synchronized (this) {
if (queue == null) {
System.out.println("初始化消费线程队列!启动5条");
queue = new QueueThread();
// 启动5条消费线程
for (int i = 0; i < 5; i++) {
Consumer c = new Consumer(queue);
c.start();
}
}
}
}
/**
* 获取
* @return
*/
public QueueThread getQueue() {
return queue;
}
}
评论暂时关闭