package com.thread.blockingqueue.test2;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class ProducerConsumerService { public static void main(String[] args) { //创建大小为10的 BlockingQueue BlockingQueuequeue = new ArrayBlockingQueue<>(10); Producer producer = new Producer(queue); //这里Thread.sleep(500);的休眠时间大于Consumer的时间 Consumer consumer = new Consumer(queue); //开启 producer线程向队列中生产消息 new Thread(producer).start(); //开启 consumer线程 中队列中消费消息 new Thread(consumer).start(); System.out.println("Producer and Consumer has been started"); } }
package com.thread.blockingqueue.test2;import java.util.concurrent.BlockingQueue;public class Producer implements Runnable { private BlockingQueuequeue; public Producer(BlockingQueue q){ this.queue=q; } @Override public void run() { //生产消息 for(int i=0; i<50; i++){ Message msg = new Message(""+i); try { Thread.sleep(500); queue.put(msg); System.out.println("Produced "+msg.getMsg()); } catch (InterruptedException e) { e.printStackTrace(); } } //添加退出消息 Message msg = new Message("exit"); try { queue.put(msg); } catch (InterruptedException e) { e.printStackTrace(); } }}
package com.thread.blockingqueue.test2;import java.util.concurrent.BlockingQueue;public class Consumer implements Runnable { private BlockingQueuequeue; public Consumer(BlockingQueue q) { this.queue = q; } @Override public void run() { try { Message msg; // 获取并处理消息直到接收到“exit”消息 while ((msg = queue.take()).getMsg() != "exit") { Thread.sleep(10); System.out.println("Consumed " + msg.getMsg()); } } catch (InterruptedException e) { e.printStackTrace(); } }}
package com.thread.blockingqueue.test2;public class Message { private String msg; public Message(String str){ this.msg=str; } public String getMsg() { return msg; } }