【RabbitMQ】当队列中消息数量超过最大长度的淘汰策略 ...

宝塔山  金牌会员 | 2023-7-18 03:13:54 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 891|帖子 891|积分 2673

【RabbitMQ】当队列中消息数量超过最大长度的淘汰策略

说明

最近在研究RabbitMQ如何实现延时队列时发现消息进入死信队列的情况之一就是当消息数量超过队列设置的最大长度时会被丢入死信队列,看到这时我就产生了一个疑问,到底是最后插入的消息还是最早插入的消息会被丢入死信队列呢?遗憾的是看了几篇博客都是模棱两可的答案,还有的说超过数量后该消息会被放入死信队列,看完之后还是对这个问题将信将疑。所以我决定去探究一下正确答案
答案

RabbitMQ官方文档
遇事不决肯定是先看官方文档最靠谱啦,在官网中扒拉了半天终于找到说明这个问题的页面了,就是上面引用的链接,重点如下:

翻译过来就是:在RabbitMQ中,当消息的数量或大小达到限制后,默认的操作是删除最旧的消息或将最旧的消息放入死信队列,这取决于该队列是否配置了死信队列。 我们可以通过使用overflow配置指定的处理策略,如果overflow被设置为reject-publish或reject-publish-dlx,那么会将最新插入的消息丢弃。如果该队列开启了confirm机制,发布者会收到nack的信息,如果一个消息被路由到多个队列,只要其中一个队列拒绝发布者就会收到nack消息,但是没被拒绝的队列可以正确接收到消息。reject-publish和reject-publish-dlx的区别是后者还会将拒绝的消息放入死信队列。
验证

下面我们使用demo来验证一下各个策略的现象
默认策略(drop-head)


  • application.yml配置如下:
  1.   rabbitmq:
  2.     host: 127.0.0.1
  3.     port: 5672
  4.     username: username
  5.     password: password
  6.     virtual-host: /test
  7.     publisher-confirm-type: correlated # 配置启用confirm机制
复制代码

  • 使用RabbitMQConfig创建业务队列和对应死信队列
  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. @Configuration
  7. public class RabbitMQConfig {
  8.     public static final String DELAY_EXCHANGE_NAME = "delay.business.exchange";
  9.     public static final String DELAY_QUEUE_NAME = "delay.business.queue";
  10.     public static final String DELAY_QUEUE_ROUTING_KEY = "delay.business.queue.routingKey";
  11.     public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
  12.     public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
  13.     public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "dead.letter.queue.routingKey";
  14.    
  15.     // 声明延迟队列交换机
  16.     @Bean("delayExchange")
  17.     public DirectExchange delayExchange(){
  18.         return new DirectExchange(DELAY_EXCHANGE_NAME);
  19.     }
  20.    
  21.     // 声明死信队列交换机
  22.     @Bean("deadLetterExchange")
  23.     public DirectExchange deadLetterExchange(){
  24.         return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
  25.     }
  26.    
  27.     // 声明延时队列
  28.     @Bean("delayQueue")
  29.     public Queue delayQueue(){
  30.         HashMap<String, Object> map = new HashMap<>();
  31.         // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
  32.         map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
  33.         // x-dead-letter-routing-key  这里声明当前队列的死信路由key
  34.         map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
  35.         // 设置该队列最大消息数
  36.         map.put("x-max-length", 10);
  37.         map.put("x-overflow", "reject-publish");
  38.         return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(map).build();
  39.     }
  40.    
  41.     // 声明死信队列
  42.     @Bean("deadLetterQueue")
  43.     public Queue deadLetterQueue(){
  44.         return new Queue(DEAD_LETTER_QUEUE_NAME);
  45.     }
  46.    
  47.     // 声明延时队列的绑定关系
  48.     @Bean
  49.     public Binding delayBinding(@Qualifier("delayExchange") DirectExchange directExchange,
  50.                                 @Qualifier("delayQueue") Queue queue){
  51.         return BindingBuilder.bind(queue).to(directExchange).with(DELAY_QUEUE_ROUTING_KEY);
  52.     }
  53.    
  54.     // 声明死信队列的绑定关系
  55.     @Bean
  56.     public Binding deadLetterBinding(@Qualifier("deadLetterExchange") DirectExchange directExchange,
  57.                                      @Qualifier("deadLetterQueue") Queue queue){
  58.         return BindingBuilder.bind(queue).to(directExchange).with(DEAD_LETTER_QUEUE_ROUTING_KEY);
  59.     }
  60. }
复制代码
注意,这里我们并没有设置overflow参数,所以采用的是默认配置
3. 创建消费者监听死信队列
  1. import com.rabbitmq.client.Channel;
  2. import com.whs.edws.config.RabbitMQConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.io.IOException;
  8. @Slf4j
  9. @Component
  10. public class MaxLengthConsumer {
  11.     @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME)
  12.     public void receive(Message message, Channel channel) throws IOException {
  13.         String s = new String(message.getBody());
  14.         log.info("死信队列消费者接收到消息:" + s);
  15.         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  16.     }
  17. }
复制代码

  • 创建测试方法发送消息
  1. @Test
  2.     void maxLengthTestPublish(){
  3.         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  4.             /**
  5.              * @param correlationData 相关配置信息
  6.              * @param ack             消息队列是否成功收到消息
  7.              * @param cause           错误原因
  8.              */
  9.             @Override
  10.             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  11.                 if (ack) {
  12.                     logger.info("消息发送成功:" + correlationData.getId());
  13.                 } else {
  14.                     logger.error("消息发送失败:" + correlationData.getId());
  15.                     logger.error("错误原因:" + cause);
  16.                 }
  17.             }
  18.         });
  19.         for (int i = 0; i < 11; i++) {
  20.             CorrelationData correlationData = new CorrelationData();
  21.             correlationData.setId(String.valueOf(i));
  22.             rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_ROUTING_KEY, String.valueOf(i), correlationData);
  23.         }
  24.     }
复制代码

  • 运行结果
    看上面的代码可知,我们设置了队列大小为10,但是我们向队列发送了11条消息,最后日志打印如下:
    2023-07-18 02:37:52.941  INFO 24308 --- [ntContainer#1-1] com.edws.rabbitmq.MaxLengthConsumer  : 死信队列消费者接收到消息:0
    和官方文档说的一样,默认最旧的一条消息被放入死信队列
reject-publish策略

reject-publish策略的验证代码只需在默认策略的基础上加上配置即可,我们在定义队列的时候加上配置
  1. // 指定超过队列长度后的策略
  2. map.put("x-overflow", "reject-publish");
复制代码
执行方法,打印的结果为:
  1. 2023-07-18 02:45:07.242  INFO 22328 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : 消息发送失败:10
  2. 2023-07-18 02:45:07.242  INFO 22328 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : 错误原因:null
复制代码
通过日志可以看到,最新插入的消息被丢弃了。至于cause为什么是null,我没找到原因,如果了解的朋友可以在评论里讨论一下
reject-publish-dlx策略

reject-publish-dlx策略的代码也是只需要在默认代码中加一行配置即可
  1. // 指定超过队列长度后的策略
  2. map.put("x-overflow", "reject-publish-dlx");
复制代码
打印结果如下:
  1. 2023-07-18 02:49:13.246  INFO 10488 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : 消息发送失败:10
  2. 2023-07-18 02:49:13.246  INFO 10488 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : 错误原因:null
  3. 2023-07-18 02:49:13.252  INFO 10488 --- [ntContainer#1-1] com.whs.edws.rabbitmq.MaxLengthConsumer  : 死信队列消费者接收到消息:10
复制代码
通过日志可以看出,最新的一条消息被拒绝且被放入死信队列

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宝塔山

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表