口试官:谈谈RabbitMQ的队头壅闭题目?

打印 上一主题 下一主题

主题 669|帖子 669|积分 2007

RabbitMQ 耽误消息的队头壅闭题目是指,在利用死信队列(DLX)和 TTL(消息过期时间)实现耽误消息时,由于队列的先进先出(FIFO)特性,在队列头部消息未过期的环境下,即使后续消息已经过期也不能及时处理惩罚的环境

实现原理


RabbitMQ 耽误消息的实现方式有以下两种:


  • 死信队列+TTL
  • 利用 rabbitmq-delayed-message-exchange 插件

而我们本文要讨论的“RabbitMQ 耽误消息的队头壅闭题目”只会发生在死信队列+TTL 的实现方式中。
死信队列 + TTL 的实现流程如下:





  • 生产者先将设置了 TTL(过期时间)的消息发送到普通队列。
  • 普通队列没有消息者,所以肯定会过期,消息过期之后就会发送到死信队列。
  • 消费者订阅死信队列获取消息,并执行耽误任务。

代码实现


死信队列 + TTL 在 Spring Boot 项目中的实现代码如下。

1、定义死信交换器(DLX)和死信队列

  1. // Spring Boot 配置示例
  2. @Configuration
  3. public class RabbitConfig {
  4.     // 定义死信交换器
  5.     @Bean
  6.     public DirectExchange dlxExchange() {
  7.         return new DirectExchange("dlx.exchange");
  8.     }
  9.     // 定义死信队列
  10.     @Bean
  11.     public Queue dlxQueue() {
  12.         return new Queue("dlx.queue");
  13.     }
  14.     // 绑定死信队列到 DLX
  15.     @Bean
  16.     public Binding dlxBinding() {
  17.         return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key");
  18.     }
  19.     // 定义普通队列,设置死信交换器和路由键
  20.     @Bean
  21.     public Queue mainQueue() {
  22.         Map<String, Object> args = new HashMap<>();
  23.         args.put("x-dead-letter-exchange", "dlx.exchange");
  24.         args.put("x-dead-letter-routing-key", "dlx.routing.key");
  25.         // 可选:设置队列级别的 TTL(所有消息统一过期时间)
  26.         args.put("x-message-ttl", 10000); // 10秒
  27.         return new Queue("main.queue", true, false, false, args);
  28.     }
  29.     // 主队列绑定到默认交换器(根据需要调整)
  30.     @Bean
  31.     public Binding mainBinding() {
  32.         return BindingBuilder.bind(mainQueue()).to(new DirectExchange("default.exchange")).with("main.routing.key");
  33.     }
  34. }
复制代码

2、发送消息时设置 TTL(消息级别)

  1. // 发送延迟消息(消息级别 TTL)
  2. public void sendDelayedMessage(String message, int delayMs) {
  3.     rabbitTemplate.convertAndSend("default.exchange", "main.routing.key", message, msg -> {
  4.         // 设置消息过期时间(覆盖队列级别的 TTL)
  5.         msg.getMessageProperties().setExpiration(String.valueOf(delayMs));
  6.         return msg;
  7.     });
  8. }
复制代码

3、消费者监听死信队列

  1. @RabbitListener(queues = "dlx.queue")
  2. public void handleDelayedMessage(String message) {
  3. System.out.println("处理延迟消息: " + message);
  4. }
复制代码

所以说消息的过期时间 TTL 的设置方式有以下两种:

1、队列级别:通过设置队列的 x-message-ttl 参数,设置队列同一的过期时间。

  1. Map<String, Object> args = new HashMap<>();
  2. args.put("x-message-ttl", 60000); // 设置队列消息过期时间为 60 秒
  3. channel.queueDeclare(queueName, true, false, false, args);
复制代码

2、消息级别:通过给每个消息设置 expiration 属性,为每个消息设置过期时间。

  1. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  2.         .deliveryMode(2) // 消息持久化
  3.         .expiration("60000") // 设置消息过期时间为 60 秒
  4.         .build();
  5. channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
复制代码

如果同时设置了消息级 TTL 和队列级 TTL,消息的现实过期时间会取两者中的最小值。

造成队头壅闭的缘故原由


造成队头壅闭的缘故原由有以下两个:


  • 先进先出的队列特性:队列中的消息必须按顺序处理惩罚,即使反面的消息 TTL 较短且已过期,也必须等候队头的消息先被处理惩罚(或过期)。
  • TTL 查抄机制:RabbitMQ 默认仅在处理惩罚队头消息时查抄其 TTL,如果队头消息的 TTL 较长(例如 10 分钟),即使后续消息的 TTL 更短(例如 1 分钟),这些消息也会被壅闭,直到队头消息过期或被移除。

如下图所示:




办理方案



  • 为不同耽误时间创建独立队列:将相同 TTL 的消息放入同一队列,制止消息的过期时间不一致。
  • 利用耽误插件:利用 RabbitMQ 的耽误插件 rabbitmq_delayed_message_exchange,直接通过耽误交换机实现耽误消息,绕过死信队列的 FIFO 限制。耽误插件是通过将消息存储到内置数据库 Mnesia,再通过不断判断过期消息,实现耽误消息的投递和执行的,因此它不存在队列的先进先出和队头壅闭的题目。

小结


队头壅闭题目是发生在利用死信队列加 TTL 实现 RabbitMQ 耽误消息的场景中,造成的缘故原由是队列先进先出的特性,加上耽误消息的查抄机制导致的,我们可以利用 RabbitMQ 的耽误插件来制止此题目。

那么题目来了,利用耽误插件怎样实现耽误任务?它和死信队列的实现方式有哪些详细的区别呢?

   文章转载自:磊哥|www.javacn.site
  原文链接:口试官:谈谈RabbitMQ的队头壅闭题目? - 磊哥|www.javacn.site - 博客园
  体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦应逍遥

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表