手写生产者斲丧者模型

打印 上一主题 下一主题

主题 919|帖子 919|积分 2757

前言

生产者-斲丧者模式是一个非常经典的多线程并发协作模式,弄懂生产者-斲丧者问题能够让我们对并发编程的明确加深。
所谓的生产者-斲丧者,实际上包罗了两类线程,一种是生产者线程用于生产数据,另一种是斲丧者线程用于斲丧数据,为了解耦生产者和斲丧者的关系,通常会接纳共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心斲丧者的举动;而斲丧者只需要从共享数据区中获取数据,不需要关心生产者的举动。
这个共享数据区域中应该具备这样的线程间并发协作功能:

  • 如果共享数据区已满的话,阻塞生产者继续生产数据;
  • 如果共享数据区为空的话,阻塞斲丧者继续斲丧数据;
在实现生产者斲丧者问题时,可以接纳三种方式:
BlockingQueue 实现生产者-斲丧者

BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,斲丧者线程会被阻塞,直至队列非空时为止。
有了这个队列,生产者就只需要关注生产,而不用管斲丧者的斲丧举动,更不用期待斲丧者线程实行完;斲丧者也只管斲丧,不用管生产者是怎么生产的,更不用等着生产者生产。
  1. public class ProductorConsumer {
  2.     private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
  3.     public static void main(String[] args) {
  4.         ExecutorService service = Executors.newFixedThreadPool(15);
  5.         for (int i = 0; i < 5; i++) {
  6.             service.submit(new Productor(queue));
  7.         }
  8.         for (int i = 0; i < 10; i++) {
  9.             service.submit(new Consumer(queue));
  10.         }
  11.     }
  12.     static class Productor implements Runnable {
  13.         private BlockingQueue queue;
  14.         public Productor(BlockingQueue queue) {
  15.             this.queue = queue;
  16.         }
  17.         @Override
  18.         public void run() {
  19.             try {
  20.                 while (true) {
  21.                     Random random = new Random();
  22.                     int i = random.nextInt();
  23.                     System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
  24.                     queue.put(i);
  25.                 }
  26.             } catch (InterruptedException e) {
  27.                 e.printStackTrace();
  28.             }
  29.         }
  30.     }
  31.     static class Consumer implements Runnable {
  32.         private BlockingQueue queue;
  33.         public Consumer(BlockingQueue queue) {
  34.             this.queue = queue;
  35.         }
  36.         @Override
  37.         public void run() {
  38.             try {
  39.                 while (true) {
  40.                     Integer element = (Integer) queue.take();
  41.                     System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
  42.                 }
  43.             } catch (InterruptedException e) {
  44.                 e.printStackTrace();
  45.             }
  46.         }
  47.     }
  48. }
复制代码
synchronized 实现生产者-斲丧者

这着实也是手动实现阻塞队列的方式
  1. import java.util.LinkedList;
  2. import java.util.Queue;
  3. import java.util.concurrent.CyclicBarrier;
  4. public class MyBlockingQueue {
  5.     //队列
  6.     private final Queue<String> myQueue = new LinkedList<>();
  7.     //最大长度
  8.     private static final int MAXSIZE = 20;
  9.     private static final int MINSIZE = 0;
  10.     //获取队列长度
  11.     public int getSize() {
  12.         return myQueue.size();
  13.     }
  14.     //生产者
  15.     public void push(String str) throws Exception {
  16.         //拿到对象锁
  17.         synchronized (myQueue) {
  18.             //如果队列满了,则阻塞
  19.             while (getSize() == MAXSIZE) {
  20.                 myQueue.wait();
  21.             }
  22.             myQueue.offer(str);
  23.             System.out.println(Thread.currentThread().getName() + "放入元素" + str);
  24.             //唤醒消费者线程,消费者和生产者自己去竞争锁
  25.             myQueue.notify();
  26.         }
  27.     }
  28.     //消费者
  29.     public String pop() throws Exception {
  30.         synchronized (myQueue) {
  31.             String result = null;
  32.             //队列为空则阻塞
  33.             while (getSize() == MINSIZE) {
  34.                 myQueue.wait();
  35.             }
  36.             //先进先出
  37.             result = myQueue.poll();
  38.             System.out.println(Thread.currentThread().getName() + "取出了元素" + result);
  39.             //唤醒生产者线程,消费者和生产者自己去竞争锁
  40.             myQueue.notify();
  41.             return result;
  42.         }
  43.     }
  44.     public static void main(String args[]) {
  45.         MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
  46.         //两个线程,都执行完成了打印
  47.         CyclicBarrier barrier = new CyclicBarrier(2, () -> {
  48.             System.out.println("生产结束,下班了,消费者明天再来吧!");
  49.         });
  50.         //生产者线程
  51.         new Thread(() -> {
  52.             //50个辛勤的生产者循环向队列中添加元素
  53.             try {
  54.                 for (int i = 0; i < 50; i++) {
  55.                     myBlockingQueue.push("——" + i);
  56.                 }
  57.                 //生产完了
  58.                 barrier.await();
  59.             } catch (Exception e) {
  60.                 e.printStackTrace();
  61.             }
  62.         }, "生产者").start();
  63.         //消费者线程
  64.         new Thread(() -> {
  65.             //50个白拿的消费者疯狂向队列中获取元素
  66.             try {
  67.                 for (int j = 0; j < 50; j++) {
  68.                     myBlockingQueue.pop();
  69.                 }
  70.                 //消费完了
  71.                 barrier.await();
  72.             } catch (Exception e) {
  73.                 e.printStackTrace();
  74.             }
  75.         }, "消费者").start();
  76.     }
  77. }
复制代码
Condition 实现生产者-斲丧者
  1. public class BoundedQueue {
  2.     /**
  3.      * 生产者容器
  4.      */
  5.     private LinkedList<Object> buffer;
  6.     /**
  7.      * //容器最大值是多少
  8.      */
  9.     private int maxSize;
  10.    
  11.     private Lock lock;
  12.    
  13.     /**
  14.      * 满了
  15.      */
  16.     private Condition fullCondition;
  17.    
  18.     /**
  19.      * 不满
  20.      */
  21.     private Condition notFullCondition;
  22.     BoundedQueue(int maxSize) {
  23.         this.maxSize = maxSize;
  24.         buffer = new LinkedList<Object>();
  25.         lock = new ReentrantLock();
  26.         fullCondition = lock.newCondition();
  27.         notFullCondition = lock.newCondition();
  28.     }
  29.     /**
  30.      * 生产者
  31.      *
  32.      * @param obj
  33.      * @throws InterruptedException
  34.      */
  35.     public void put(Object obj) throws InterruptedException {
  36.         //获取锁
  37.         lock.lock();
  38.         try {
  39.             while (maxSize == buffer.size()) {
  40.                 //满了,添加的线程进入等待状态
  41.                 notFullCondition.await();
  42.             }
  43.             buffer.add(obj);
  44.             //通知
  45.             fullCondition.signal();
  46.         } finally {
  47.             lock.unlock();
  48.         }
  49.     }
  50.     /**
  51.      * 消费者
  52.      *
  53.      * @return
  54.      * @throws InterruptedException
  55.      */
  56.     public Object get() throws InterruptedException {
  57.         Object obj;
  58.         lock.lock();
  59.         try {
  60.             while (buffer.size() == 0) {
  61.                 //队列中没有数据了 线程进入等待状态
  62.                 fullCondition.await();
  63.             }
  64.             obj = buffer.poll();
  65.             //通知
  66.             notFullCondition.signal();
  67.         } finally {
  68.             lock.unlock();
  69.         }
  70.         return obj;
  71.     }
  72. }
复制代码
生产者-斲丧者模式的应用场景

生产者-斲丧者模式一般用于将生产数据的一方和斲丧数据的一方分割开来,将生产数据与斲丧数据的过程解耦开来。
Excutor 使命实行框架

通过将使命的提交和使命的实行解耦开来,提交使命的操作相当于生产者,实行使命的操作相当于斲丧者。
例如使用 Excutor 构建 Web 服务器,用于处理线程的哀求:生产者将使命提交给线程池,线程池创建线程处理使命,如果需要运行的使命数大于线程池的基本线程数,那么就把使命扔到阻塞队列(通过线程池+阻塞队列的方式比只使用一个阻塞队列的服从高很多,由于斲丧者能够处理就直接处理掉了,不用每个斲丧者都要先从阻塞队列中取出使命再实行)
消息中心件 MQ

双十一的时间,会产生大量的订单,那么不可能同时处理那么多的订单,需要将订单放入一个队列里面,然后由专门的线程处理订单。
这里用户下单就是生产者,处理订单的线程就是斲丧者;再比如 12306 的抢票功能,先由一个容器存储用户提交的订单,然后再由专门处理订单的线程慢慢处理,这样可以在短时间内支持高并发服务。
使命的处理时间比较长的情况下

比如上传附件并处理,那么这个时间可以将用户上传和处理附件分成两个过程,用一个队列临时存储用户上传的附件,然后立刻返回用户上传成功,然后有专门的线程处理队列中的附件。
生产者-斲丧者模式的优点:

  • 解耦:将生产者类和斲丧者类进行解耦,消除代码之间的依靠性,简化工作负载的管理
  • 复用:通过将生产者类和斲丧者类独立开来,对生产者类和斲丧者类进行独立的复用与扩展
  • 调整并发数:由于生产者和斲丧者的处理速率是不一样的,可以调整并发数,给予慢的一方多的并发数,来进步使命的处理速率
  • 异步:对于生产者和斲丧者来说能够各司其职,生产者只需要关心缓冲区是否还有数据,不需要期待斲丧者处理完;对于斲丧者来说,也只需要关注缓冲区的内容,不需要关注生产者,通过异步的方式支持高并发,将一个耗时的流程拆成生产和斲丧两个阶段,这样生产者由于实行 put 的时间比较短,可以支持高并发
  • 支持分布式:生产者和斲丧者通过队列进行通讯,所以不需要运行在同一台机器上,在分布式情况中可以通过 redis 的 list 作为队列,而斲丧者只需要轮询队列中是否有数据。同时还能支持集群的伸缩性,当某台机器宕掉的时间,不会导致整个集群宕掉
面试题专栏

面试题专栏已上线,接待访问。

  • 如果你不知道简历怎么写,简历项目不知道怎么包装;
  • 如果简历中有些内容你不知道该不该写上去;
  • 如果有些综合性问题你不知道怎么答;
那么可以私信我,我会尽我所能帮助你。
关于作者

来自一线程序员Seven的探索与实践,连续学习迭代中~
本文已收录于我的个人博客:https://www.seven97.top
公众号:seven97,接待关注~

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

道家人

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表