深入理解和应用RabbitMQ的Work Queues模型

打印 上一主题 下一主题

主题 227|帖子 227|积分 681

当你在处理消息时,大概会碰到如许的问题:消息的生产速度远宏大于消费速度,导致消息堆积。这时候,Work Queues(工作队列)模型就能派上用场。简单来说,Work Queues 让多个消费者绑定到一个队列,共同消费队列中的消息,从而加快消息处理速度。
1. 场景模拟

我们来模拟一个如许的场景。首先,在控制台创建一个名为 work.queue 的队列。
2. 消息发送

我们通过循环发送大量消息来模拟消息堆积的征象。在 publisher 服务中的 SpringAmqpTest 类中添加一个测试方法:
  1. @Test
  2. public void testWorkQueue() throws InterruptedException {
  3.     // 队列名称
  4.     String queueName = "simple.queue";
  5.     // 消息
  6.     String message = "hello, message_";
  7.     for (int i = 0; i < 50; i++) {
  8.         // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
  9.         rabbitTemplate.convertAndSend(queueName, message + i);
  10.         Thread.sleep(20);
  11.     }
  12. }
复制代码
3. 消息接收

为了模拟多个消费者绑定同一个队列,我们在 consumer 服务的 SpringRabbitListener 中添加两个新的方法:
  1. @RabbitListener(queues = "work.queue")
  2. public void listenWorkQueue1(String msg) throws InterruptedException {
  3.     System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
  4.     Thread.sleep(20);
  5. }
  6. @RabbitListener(queues = "work.queue")
  7. public void listenWorkQueue2(String msg) throws InterruptedException {
  8.     System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
  9.     Thread.sleep(200);
  10. }
复制代码
留意到这两个消费者都设置了 Thread.sleep 来模拟任务耗时:


  • 消费者1:Thread.sleep(20),相当于每秒处理50个消息。
  • 消费者2:Thread.sleep(200),相当于每秒处理5个消息。
4. 测试

启动 ConsumerApplication 后,实行 publisher 服务中编写的发送测试方法 testWorkQueue。结果如下:
  1. 消费者1接收到消息:【hello, message_0】21:06:00.869555300
  2. 消费者2........接收到消息:【hello, message_1】21:06:00.884518
  3. ...
  4. 消费者1接收到消息:【hello, message_48】21:06:01.920702500
  5. 消费者2........接收到消息:【hello, message_49】21:06:05.723106700
复制代码
可以看到,消费者1和消费者2各自消费了25条消息:


  • 消费者1快速完成了任务。
  • 消费者2则缓慢处理任务。
消息是平均分配给每个消费者的,并没有思量到各个消费者的处理能力,导致一个消费者空闲,另一个繁忙。这显然是低效的。
5. 能者多劳

在 spring 中,可以通过简单配置办理这个问题。修改 consumer 服务的 application.yml 文件,添加如下配置:
  1. spring:
  2.   rabbitmq:
  3.     listener:
  4.       simple:
  5.         prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
复制代码
再次测试,结果如下:
  1. 消费者1接收到消息:【hello, message_0】21:12:51.659664200
  2. 消费者2........接收到消息:【hello, message_1】21:12:51.680610
  3. ...
  4. 消费者2........接收到消息:【hello, message_49】21:12:52.746299900
复制代码
这次,消费者1处理了更多的消息,消费者2则处理了较少的消息,总耗时在1秒左右,大大提升了效率。这充实使用了每一个消费者的处理能力,有效避免了消息积压问题。
6. 总结

Work Queues 模型的使用要点:


  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。
  • 通过设置 prefetch 来控制消费者预取的消息数量。
如允许以更高效地使用资源,提高消息处理速度。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

李优秀

高级会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表