一、简介
在 Java 多线程编程中,还有一个非常紧张的设计模式,它就是:生产者和消费者模子。
这种模子可以充分发挥 cpu 的多线程特性,通过一些平衡手段能有效的提升系统整体处理数据的速度,减轻系统负载,提高程序的效率和稳定性,同时实现模块之间的解耦。
那什么是生产者和消费者模子呢?
简单的说,生产者和消费者之间不直接进行交互,而是通过一个缓冲区来进行交互,生产者负责生成数据,然后存入缓冲区;消费者则负责处理数据,从缓冲区获取。
大致流程图如下:
对于最简单的生产者和消费者模子,总结下来,大概有以下几个特点:
- 缓冲区为空的时候,消费者不能消费,会进入休眠状态,直到有新数据进入缓冲区,再次被唤醒
- 缓冲区填满的时候,生产者不能生产,也会进入休眠状态,直到缓冲区有空间,再次被唤醒
生产者和消费者模子作为一个非常紧张的设计模子,它的优点在于:
- 解耦:生产者和消费者之间不直接进行交互,即使生产者和消费者的代码发生变化,也不会对对方产生影响
- 消峰:例如在某项工作中,如果 A 操作生产数据的速度很快,B 操作处理速度很慢,那么 A 操作就必须等待 B 操作完成才能竣事,反之亦然。如果将 A 操作和B 操作进行解耦,中间插入一个缓冲区,这样 A 操作将生产的数据存入缓冲区,就担当了;B 操作从缓冲区获取数据并进行处理,平衡好 A 操作和 B 操作之间的缓冲区,可以显著提升系统的数据处理能力
生产者和消费者模子的应用场景非常多,例如 Java 的线程池任务执行框架、消息中间件 rabbitMQ 等,因此掌握生产者和消费者模子,对于开辟者至关紧张。
下面我们通过几个案例,一起来了解一下生产者和消费者设计模子的实践思路。
二、代码实践
2.1、利用 wait / notify 方法实现思路
生产者和消费者模子,最简单的一种技能实践方案就是基于线程的 wait() / notify() 方法,也就是通知和唤醒机制,可以将两个操作实现解耦,具体代码实践如下。- /**
- * 缓冲区容器类
- */
- public class Container {
- /**
- * 缓冲区最大容量
- */
- private int capacity = 3;
- /**
- * 缓冲区
- */
- private LinkedList<Integer> list = new LinkedList<Integer>();
- /**
- * 添加数据到缓冲区
- * @param value
- */
- public synchronized void add(Integer value) {
- if(list.size() >= capacity){
- System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
- try {
- // 进入等待状态
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
- list.add(value);
- //唤醒其他所有处于wait()的线程,包括消费者和生产者
- notifyAll();
- }
- /**
- * 从缓冲区获取数据
- */
- public synchronized void get() {
- if(list.size() == 0){
- System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
- try {
- // 进入等待状态
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- // 从头部获取数据,并移除元素
- Integer val = list.removeFirst();
- System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);
- //唤醒其他所有处于wait()的线程,包括消费者和生产者
- notifyAll();
- }
- }
复制代码- /**
- * 生产者类
- */
- public class Producer extends Thread{
- private Container container;
- public Producer(Container container) {
- this.container = container;
- }
- @Override
- public void run() {
- for (int i = 0; i < 6; i++) {
- container.add(i);
- }
- }
- }
复制代码- /**
- * 消费者类
- */
- public class Consumer extends Thread{
- private Container container;
- public Consumer(Container container) {
- this.container = container;
- }
- @Override
- public void run() {
- for (int i = 0; i < 6; i++) {
- container.get();
- }
- }
- }
复制代码- /**
- * 测试类
- */
- public class MyThreadTest {
- public static void main(String[] args) {
- Container container = new Container();
- Producer producer = new Producer(container);
- Consumer consumer = new Consumer(container);
- producer.start();
- consumer.start();
- }
- }
复制代码 运行结果如下:- 生产者:Thread-0,add:0
- 生产者:Thread-0,add:1
- 生产者:Thread-0,add:2
- 生产者:Thread-0,缓冲区已满,生产者进入waiting...
- 消费者:Thread-1,value:0
- 消费者:Thread-1,value:1
- 消费者:Thread-1,value:2
- 消费者:Thread-1,缓冲区为空,消费者进入waiting...
- 生产者:Thread-0,add:3
- 生产者:Thread-0,add:4
- 生产者:Thread-0,add:5
- 消费者:Thread-1,value:3
- 消费者:Thread-1,value:4
- 消费者:Thread-1,value:5
复制代码 从日志上可以很清晰的看到,生产者线程生产一批数据之后,当缓冲区已经满了,会进入等待状态,此时会通知消费者线程;消费者线程处理完数据之后,当缓冲区没有数据时,也会进入等待状态,再次通知生产者线程。
2.2、利用 await / signal 方法实现思路
除此之外,我们还可以利用ReentrantLock和Condition类中的 await() / signal() 方法实现生产者和消费者模子。
缓冲区容器类,具体代码实践如下。- /**
- * 缓冲区容器类
- */
- public class Container {
- private Lock lock = new ReentrantLock();
- private Condition condition = lock.newCondition();
- private int capacity = 3;
- private LinkedList<Integer> list = new LinkedList<Integer>();
- /**
- * 添加数据到缓冲区
- * @param value
- */
- public void add(Integer value) {
- boolean flag = false;
- try {
- flag = lock.tryLock(3, TimeUnit.SECONDS);
- if(list.size() >= capacity){
- System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
- // 进入等待状态
- condition.await();
- }
- System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
- list.add(value);
- //唤醒其他所有处于wait()的线程,包括消费者和生产者
- condition.signalAll();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if(flag){
- lock.unlock();
- }
- }
- }
- /**
- * 从缓冲区获取数据
- */
- public void get() {
- boolean flag = false;
- try {
- flag = lock.tryLock(3, TimeUnit.SECONDS);
- if(list.size() == 0){
- System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
- // 进入等待状态
- condition.await();
- }
- // 从头部获取数据,并移除元素
- Integer val = list.removeFirst();
- System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);
- //唤醒其他所有处于wait()的线程,包括消费者和生产者
- condition.signalAll();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if(flag){
- lock.unlock();
- }
- }
- }
- }
复制代码 生产者、消费者、测试类代码,跟上文同等,运行结果和上文介绍的也是一样。
2.3、多生产者和消费者的实现思路
上面介绍的都是一个生产者线程和一个消费者线程,模子比力简单。实际上,在业务开辟中,常常会出现多个生产者线程和多个消费者线程,按照以上的实现思路,会出现什么环境呢?
有大概会出现程序假死现象!下面我们来分析一下案例,如果有两个生产者线程 a1、a2,两个消费者线程 b1、b2,执行过程如下:
- 1.生产者线程 a1 执行生产数据的操作,发现缓冲区数据已经填满了,然后进入等待阶段,同时向外发起通知,唤醒其它线程
- 2.因为线程唤醒具有随机性,本应该唤醒消费者线程 b1,结果大概生产者线程 a2 被唤醒,查抄缓冲区数据已经填满了,又进入等待阶段,紧接向外发起通知,消费者线程得不到被执行的机会
- 3.消费者线程 b1、b2,也有大概会出现这个现象,本应该唤醒生产者线程,结果唤醒了消费者线程
碰到这种环境,应该怎样解决呢?
因为ReentrantLock和Condition的结合,编程具有高度灵活性,我们可以采用这种组合解决多生产者和多消费者中的假死问题。
具体实现逻辑如下:- /**
- * 缓冲区容器类
- */
- public class ContainerDemo {
- private Lock lock = new ReentrantLock();
- private Condition producerCondition = lock.newCondition();
- private Condition consumerCondition = lock.newCondition();
- private int capacity = 3;
- private LinkedList<Integer> list = new LinkedList<Integer>();
- /**
- * 添加数据到缓冲区
- * @param value
- */
- public void add(Integer value) {
- boolean flag = false;
- try {
- flag = lock.tryLock(3, TimeUnit.SECONDS);
- if(list.size() >= capacity){
- System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
- // 生产者进入等待状态
- producerCondition.await();
- }
- System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
- list.add(value);
- // 唤醒所有消费者处于wait()的线程
- consumerCondition.signalAll();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if(flag){
- lock.unlock();
- }
- }
- }
- /**
- * 从缓冲区获取数据
- */
- public void get() {
- boolean flag = false;
- try {
- flag = lock.tryLock(3, TimeUnit.SECONDS);
- if(list.size() == 0){
- System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
- // 消费者进入等待状态
- consumerCondition.await();
- }
- // 从头部获取数据,并移除元素
- Integer val = list.removeFirst();
- System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);
- // 唤醒所有生产者处于wait()的线程
- producerCondition.signalAll();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if(flag){
- lock.unlock();
- }
- }
- }
- }
复制代码- /**
- * 生产者
- */
- public class Producer extends Thread{
- private ContainerDemo container;
- private Integer value;
- public Producer(ContainerDemo container, Integer value) {
- this.container = container;
- this.value = value;
- }
- @Override
- public void run() {
- container.add(value);
- }
- }
复制代码- /**
- * 消费者
- */
- public class Consumer extends Thread{
- private ContainerDemo container;
- public Consumer(ContainerDemo container) {
- this.container = container;
- }
- @Override
- public void run() {
- container.get();
- }
- }
复制代码- /**
- * 测试类
- */
- public class MyThreadTest {
- public static void main(String[] args) {
- ContainerDemo container = new ContainerDemo();
- List<Thread> threadList = new ArrayList<>();
- // 初始化6个生产者线程
- for (int i = 0; i < 6; i++) {
- threadList.add(new Producer(container, i));
- }
- // 初始化6个消费者线程
- for (int i = 0; i < 6; i++) {
- threadList.add(new Consumer(container));
- }
- // 启动线程
- for (Thread thread : threadList) {
- thread.start();
- }
- }
- }
复制代码 运行结果如下:- 生产者:Thread-0,add:0
- 生产者:Thread-1,add:1
- 生产者:Thread-2,add:2
- 生产者:Thread-3,缓冲区已满,生产者进入waiting...
- 生产者:Thread-4,缓冲区已满,生产者进入waiting...
- 生产者:Thread-5,缓冲区已满,生产者进入waiting...
- 消费者:Thread-6,value:0
- 消费者:Thread-7,value:1
- 生产者:Thread-3,add:3
- 生产者:Thread-4,add:4
- 生产者:Thread-5,add:5
- 消费者:Thread-8,value:2
- 消费者:Thread-9,value:3
- 消费者:Thread-10,value:4
- 消费者:Thread-11,value:5
复制代码 通过ReentrantLock定义两个Condition,一个表示生产者的Condition,一个表示消费者的Condition,唤醒的时候调用对应的signalAll()方法就可以解决假死现象。
三、小结
最后我们来总结一下,对于生产者和消费者模子,通过合理的编程实现,可以充分充分发挥 cpu 多线程的特性,显著的提升系统处理数据的效率。
对于生产者和消费者模子中的假死现象,可以利用ReentrantLock定义两个Condition,进行交错唤醒,以解决假死问题。
四、参考
1、https://www.cnblogs.com/xrq730/p/4855663.html
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |