多线程系列(十二) -生产者和消费者模子

打印 上一主题 下一主题

主题 918|帖子 918|积分 2754

一、简介

在 Java 多线程编程中,还有一个非常紧张的设计模式,它就是:生产者和消费者模子。
这种模子可以充分发挥 cpu 的多线程特性,通过一些平衡手段能有效的提升系统整体处理数据的速度,减轻系统负载,提高程序的效率和稳定性,同时实现模块之间的解耦。
那什么是生产者和消费者模子呢?
简单的说,生产者和消费者之间不直接进行交互,而是通过一个缓冲区来进行交互,生产者负责生成数据,然后存入缓冲区;消费者则负责处理数据,从缓冲区获取。
大致流程图如下:

对于最简单的生产者和消费者模子,总结下来,大概有以下几个特点:

  • 缓冲区为空的时候,消费者不能消费,会进入休眠状态,直到有新数据进入缓冲区,再次被唤醒
  • 缓冲区填满的时候,生产者不能生产,也会进入休眠状态,直到缓冲区有空间,再次被唤醒
生产者和消费者模子作为一个非常紧张的设计模子,它的优点在于:

  • 解耦:生产者和消费者之间不直接进行交互,即使生产者和消费者的代码发生变化,也不会对对方产生影响
  • 消峰:例如在某项工作中,如果 A 操作生产数据的速度很快,B 操作处理速度很慢,那么 A 操作就必须等待 B 操作完成才能竣事,反之亦然。如果将 A 操作和B 操作进行解耦,中间插入一个缓冲区,这样 A 操作将生产的数据存入缓冲区,就担当了;B 操作从缓冲区获取数据并进行处理,平衡好 A 操作和 B 操作之间的缓冲区,可以显著提升系统的数据处理能力
生产者和消费者模子的应用场景非常多,例如 Java 的线程池任务执行框架、消息中间件 rabbitMQ 等,因此掌握生产者和消费者模子,对于开辟者至关紧张。
下面我们通过几个案例,一起来了解一下生产者和消费者设计模子的实践思路。
二、代码实践

2.1、利用 wait / notify 方法实现思路

生产者和消费者模子,最简单的一种技能实践方案就是基于线程的 wait() / notify() 方法,也就是通知和唤醒机制,可以将两个操作实现解耦,具体代码实践如下。
  1. /**
  2. * 缓冲区容器类
  3. */
  4. public class Container {
  5.     /**
  6.      * 缓冲区最大容量
  7.      */
  8.     private int capacity = 3;
  9.     /**
  10.      * 缓冲区
  11.      */
  12.     private LinkedList<Integer> list = new LinkedList<Integer>();
  13.     /**
  14.      * 添加数据到缓冲区
  15.      * @param value
  16.      */
  17.     public synchronized void add(Integer value) {
  18.         if(list.size() >= capacity){
  19.             System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
  20.             try {
  21.                 // 进入等待状态
  22.                 wait();
  23.             } catch (InterruptedException e) {
  24.                 e.printStackTrace();
  25.             }
  26.         }
  27.         System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
  28.         list.add(value);
  29.         //唤醒其他所有处于wait()的线程,包括消费者和生产者
  30.         notifyAll();
  31.     }
  32.     /**
  33.      * 从缓冲区获取数据
  34.      */
  35.     public synchronized void get() {
  36.         if(list.size() == 0){
  37.             System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
  38.             try {
  39.                 // 进入等待状态
  40.                 wait();
  41.             } catch (InterruptedException e) {
  42.                 e.printStackTrace();
  43.             }
  44.         }
  45.         // 从头部获取数据,并移除元素
  46.         Integer val = list.removeFirst();
  47.         System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);
  48.         //唤醒其他所有处于wait()的线程,包括消费者和生产者
  49.         notifyAll();
  50.     }
  51. }
复制代码
  1. /**
  2. * 生产者类
  3. */
  4. public class Producer extends Thread{
  5.     private Container container;
  6.     public Producer(Container container) {
  7.         this.container = container;
  8.     }
  9.     @Override
  10.     public void run() {
  11.         for (int i = 0; i < 6; i++) {
  12.             container.add(i);
  13.         }
  14.     }
  15. }
复制代码
  1. /**
  2. * 消费者类
  3. */
  4. public class Consumer extends Thread{
  5.     private Container container;
  6.     public Consumer(Container container) {
  7.         this.container = container;
  8.     }
  9.     @Override
  10.     public void run() {
  11.         for (int i = 0; i < 6; i++) {
  12.             container.get();
  13.         }
  14.     }
  15. }
复制代码
  1. /**
  2. * 测试类
  3. */
  4. public class MyThreadTest {
  5.     public static void main(String[] args) {
  6.         Container container = new Container();
  7.         Producer producer = new Producer(container);
  8.         Consumer consumer = new Consumer(container);
  9.         producer.start();
  10.         consumer.start();
  11.     }
  12. }
复制代码
运行结果如下:
  1. 生产者:Thread-0,add:0
  2. 生产者:Thread-0,add:1
  3. 生产者:Thread-0,add:2
  4. 生产者:Thread-0,缓冲区已满,生产者进入waiting...
  5. 消费者:Thread-1,value:0
  6. 消费者:Thread-1,value:1
  7. 消费者:Thread-1,value:2
  8. 消费者:Thread-1,缓冲区为空,消费者进入waiting...
  9. 生产者:Thread-0,add:3
  10. 生产者:Thread-0,add:4
  11. 生产者:Thread-0,add:5
  12. 消费者:Thread-1,value:3
  13. 消费者:Thread-1,value:4
  14. 消费者:Thread-1,value:5
复制代码
从日志上可以很清晰的看到,生产者线程生产一批数据之后,当缓冲区已经满了,会进入等待状态,此时会通知消费者线程;消费者线程处理完数据之后,当缓冲区没有数据时,也会进入等待状态,再次通知生产者线程。
2.2、利用 await / signal 方法实现思路

除此之外,我们还可以利用ReentrantLock和Condition类中的 await() / signal() 方法实现生产者和消费者模子。
缓冲区容器类,具体代码实践如下。
  1. /**
  2. * 缓冲区容器类
  3. */
  4. public class Container {
  5.     private Lock lock = new ReentrantLock();
  6.     private Condition condition = lock.newCondition();
  7.     private int capacity = 3;
  8.     private LinkedList<Integer> list = new LinkedList<Integer>();
  9.     /**
  10.      * 添加数据到缓冲区
  11.      * @param value
  12.      */
  13.     public void add(Integer value) {
  14.         boolean flag = false;
  15.         try {
  16.             flag = lock.tryLock(3, TimeUnit.SECONDS);
  17.             if(list.size() >= capacity){
  18.                 System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
  19.                 // 进入等待状态
  20.                 condition.await();
  21.             }
  22.             System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
  23.             list.add(value);
  24.             //唤醒其他所有处于wait()的线程,包括消费者和生产者
  25.             condition.signalAll();
  26.         } catch (Exception e) {
  27.             e.printStackTrace();
  28.         } finally {
  29.             if(flag){
  30.                 lock.unlock();
  31.             }
  32.         }
  33.     }
  34.     /**
  35.      * 从缓冲区获取数据
  36.      */
  37.     public void get() {
  38.         boolean flag = false;
  39.         try {
  40.             flag = lock.tryLock(3, TimeUnit.SECONDS);
  41.             if(list.size() == 0){
  42.                 System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
  43.                 // 进入等待状态
  44.                 condition.await();
  45.             }
  46.             // 从头部获取数据,并移除元素
  47.             Integer val = list.removeFirst();
  48.             System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);
  49.             //唤醒其他所有处于wait()的线程,包括消费者和生产者
  50.             condition.signalAll();
  51.         } catch (Exception e) {
  52.             e.printStackTrace();
  53.         } finally {
  54.             if(flag){
  55.                 lock.unlock();
  56.             }
  57.         }
  58.     }
  59. }
复制代码
生产者、消费者、测试类代码,跟上文同等,运行结果和上文介绍的也是一样。
2.3、多生产者和消费者的实现思路

上面介绍的都是一个生产者线程和一个消费者线程,模子比力简单。实际上,在业务开辟中,常常会出现多个生产者线程和多个消费者线程,按照以上的实现思路,会出现什么环境呢?
有大概会出现程序假死现象!下面我们来分析一下案例,如果有两个生产者线程 a1、a2,两个消费者线程 b1、b2,执行过程如下:

  • 1.生产者线程 a1 执行生产数据的操作,发现缓冲区数据已经填满了,然后进入等待阶段,同时向外发起通知,唤醒其它线程
  • 2.因为线程唤醒具有随机性,本应该唤醒消费者线程 b1,结果大概生产者线程 a2 被唤醒,查抄缓冲区数据已经填满了,又进入等待阶段,紧接向外发起通知,消费者线程得不到被执行的机会
  • 3.消费者线程 b1、b2,也有大概会出现这个现象,本应该唤醒生产者线程,结果唤醒了消费者线程
碰到这种环境,应该怎样解决呢?
因为ReentrantLock和Condition的结合,编程具有高度灵活性,我们可以采用这种组合解决多生产者和多消费者中的假死问题。
具体实现逻辑如下:
  1. /**
  2. * 缓冲区容器类
  3. */
  4. public class ContainerDemo {
  5.     private Lock lock = new ReentrantLock();
  6.     private Condition producerCondition = lock.newCondition();
  7.     private Condition consumerCondition = lock.newCondition();
  8.     private int capacity = 3;
  9.     private LinkedList<Integer> list = new LinkedList<Integer>();
  10.     /**
  11.      * 添加数据到缓冲区
  12.      * @param value
  13.      */
  14.     public void add(Integer value) {
  15.         boolean flag = false;
  16.         try {
  17.             flag = lock.tryLock(3, TimeUnit.SECONDS);
  18.             if(list.size() >= capacity){
  19.                 System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
  20.                 // 生产者进入等待状态
  21.                 producerCondition.await();
  22.             }
  23.             System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
  24.             list.add(value);
  25.             // 唤醒所有消费者处于wait()的线程
  26.             consumerCondition.signalAll();
  27.         } catch (Exception e) {
  28.             e.printStackTrace();
  29.         } finally {
  30.             if(flag){
  31.                 lock.unlock();
  32.             }
  33.         }
  34.     }
  35.     /**
  36.      * 从缓冲区获取数据
  37.      */
  38.     public void get() {
  39.         boolean flag = false;
  40.         try {
  41.             flag = lock.tryLock(3, TimeUnit.SECONDS);
  42.             if(list.size() == 0){
  43.                 System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
  44.                 // 消费者进入等待状态
  45.                 consumerCondition.await();
  46.             }
  47.             // 从头部获取数据,并移除元素
  48.             Integer val = list.removeFirst();
  49.             System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);
  50.             // 唤醒所有生产者处于wait()的线程
  51.             producerCondition.signalAll();
  52.         } catch (Exception e) {
  53.             e.printStackTrace();
  54.         } finally {
  55.             if(flag){
  56.                 lock.unlock();
  57.             }
  58.         }
  59.     }
  60. }
复制代码
  1. /**
  2. * 生产者
  3. */
  4. public class Producer extends Thread{
  5.     private ContainerDemo container;
  6.     private Integer value;
  7.     public Producer(ContainerDemo container, Integer value) {
  8.         this.container = container;
  9.         this.value = value;
  10.     }
  11.     @Override
  12.     public void run() {
  13.         container.add(value);
  14.     }
  15. }
复制代码
  1. /**
  2. * 消费者
  3. */
  4. public class Consumer extends Thread{
  5.     private ContainerDemo container;
  6.     public Consumer(ContainerDemo container) {
  7.         this.container = container;
  8.     }
  9.     @Override
  10.     public void run() {
  11.         container.get();
  12.     }
  13. }
复制代码
  1. /**
  2. * 测试类
  3. */
  4. public class MyThreadTest {
  5.     public static void main(String[] args) {
  6.         ContainerDemo container = new ContainerDemo();
  7.         List<Thread> threadList = new ArrayList<>();
  8.         // 初始化6个生产者线程
  9.         for (int i = 0; i < 6; i++) {
  10.             threadList.add(new Producer(container, i));
  11.         }
  12.         // 初始化6个消费者线程
  13.         for (int i = 0; i < 6; i++) {
  14.             threadList.add(new Consumer(container));
  15.         }
  16.         // 启动线程
  17.         for (Thread thread : threadList) {
  18.             thread.start();
  19.         }
  20.     }
  21. }
复制代码
运行结果如下:
  1. 生产者:Thread-0,add:0
  2. 生产者:Thread-1,add:1
  3. 生产者:Thread-2,add:2
  4. 生产者:Thread-3,缓冲区已满,生产者进入waiting...
  5. 生产者:Thread-4,缓冲区已满,生产者进入waiting...
  6. 生产者:Thread-5,缓冲区已满,生产者进入waiting...
  7. 消费者:Thread-6,value:0
  8. 消费者:Thread-7,value:1
  9. 生产者:Thread-3,add:3
  10. 生产者:Thread-4,add:4
  11. 生产者:Thread-5,add:5
  12. 消费者:Thread-8,value:2
  13. 消费者:Thread-9,value:3
  14. 消费者:Thread-10,value:4
  15. 消费者:Thread-11,value:5
复制代码
通过ReentrantLock定义两个Condition,一个表示生产者的Condition,一个表示消费者的Condition,唤醒的时候调用对应的signalAll()方法就可以解决假死现象。
三、小结

最后我们来总结一下,对于生产者和消费者模子,通过合理的编程实现,可以充分充分发挥 cpu 多线程的特性,显著的提升系统处理数据的效率。
对于生产者和消费者模子中的假死现象,可以利用ReentrantLock定义两个Condition,进行交错唤醒,以解决假死问题。
四、参考

1、https://www.cnblogs.com/xrq730/p/4855663.html

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

慢吞云雾缓吐愁

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表