RabbitMQ消息重复消耗

打印 上一主题 下一主题

主题 538|帖子 538|积分 1614

RabbitMQ消息重复消耗问题

同一条消息被一个消耗者消耗多次大概被多个消耗者消耗。大概导致系统相关业务重复实验和数据不同等问题。
1.场景模拟

生产者
  1. public String sendMessage() {
  2.         for (int i = 1; i <= 100; i++) {
  3.             //生成消息id
  4.             String messageId = UUID.randomUUID().toString();
  5.             //消息内容
  6.             String message = "rabbitMQ测试消息!" + i;
  7.             //发送消息
  8.             rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, DIRECT_ROUTE_KEY,message,msg -> {
  9.                 msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
  10.                 msg.getMessageProperties().setMessageId(messageId);//设置消息id
  11.                 return msg;
  12.             });
  13.             System.out.println("已发送消息:id=" + messageId + " message="+ message);
  14.         }
  15.         return "消息发送成功!";
  16.     }
复制代码
消耗者
  1.            @RabbitListener(queues = "directQueue")
  2.     public void spendMessage(String msg, Channel channel, Message message) throws IOException {
  3.         String messageId = message.getMessageProperties().getMessageId();
  4.         System.out.println("接收到消息:id=" + messageId+ " message=" + msg);
  5.         try {
  6.             //模拟消费耗时
  7.             Thread.sleep(100);
  8.         } catch (InterruptedException e) {
  9.             e.printStackTrace();
  10.         }
  11.         //channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  12.     }
复制代码
测试步骤

生产者发送100条消息

启动消耗者

由于最后一条消息没有调用basicAck方法,消息并没有消耗成功,当我们重启消耗者服务时,消息会被再次消耗。
重启消耗者

2.解决方案

由于每条消息都有自己的id(唯一标识),可根据这个id来判断消息是否被消耗过。
消耗消息前先获取消息id—>查询缓存是否存在此id,判断id对应的值—>为1则体现该消息被消耗过,为0则体现消耗中
以下示例使用redis作为介质
消耗者
  1.     @RabbitListener(queues = "directQueue")
  2.     public void spendMessage(String msg, Channel channel, Message message) throws IOException {
  3.         String messageId = message.getMessageProperties().getMessageId();
  4.          //messageId对应的缓存值为0时表示消息消费中,1表示消费完成
  5.         if(Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(messageId, "0", 30L, TimeUnit.SECONDS))){//消息第一次被消费
  6.             try {
  7.                 //模拟消费耗时
  8.                 System.out.println("接收到消息:id=" + messageId+ " message=" + msg);
  9.                 Thread.sleep(100);
  10.                 //业务执行完成后标识消息消费完成
  11.                 redisTemplate.opsForValue().set(messageId,"1",30L,TimeUnit.SECONDS);
  12.                 //消息消费确认
  13.                 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  14.                 //消费成功后删除缓存
  15.                 redisTemplate.delete(messageId);
  16.             } catch (Exception e) {
  17.                 //丢弃消息(关联了死信队列的话可以放入死信队列处理)
  18.                 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
  19.                 //删除缓存
  20.                 redisTemplate.delete(messageId);
  21.             }
  22.         }else{
  23.             String value = redisTemplate.opsForValue().get(messageId);
  24.             if("0".equals(value)){
  25.                 return;
  26.             }
  27.             channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  28.         }
  29.     }
复制代码
再次按照以上步骤测试,消息不会被重复消耗。

最后一条被消耗的消息为49,再次启动后未重复消耗。
本示例中缓存过期时间为30s,若启动消耗者的时间间隔凌驾30s,则消息仍会被重复消耗

3.死信队列

死信队列就是一个普通队列,可以使用恣意种交换机,业务队列可通过绑定死信交换机和路由键自动将被nack和reject且不重新入队的消息发送给对应的死信队列。大概通过设置业务队列的消息过期时间实现延时消息。
测试配置类
  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. @Configuration
  8. public class RabbitMQConfig {
  9.     private static final String DIRECT_EXCHANGE_NAME = "directExchange";//交换机名称
  10.     private static final String DIRECT_ROUTE_KEY = "directRoute";//路由键
  11.     private static final String DIRECT_QUEUE_NAME = "directQueue";//队列名称
  12.     /**
  13.      * 业务交换机
  14.      * @return direct交换机
  15.      * 名称,是否持久化,无队列自动删除
  16.      */
  17.     @Bean
  18.     public DirectExchange directExchange(){
  19.         return new DirectExchange(DIRECT_EXCHANGE_NAME,true,false);
  20.     }
  21.     /**
  22.      * 队列
  23.      * @return 队列
  24.      * 名称,是否持久化,是否独占,是否自动删除
  25.      */
  26.     @Bean
  27.     public Queue directQueue(){
  28.         Map<String, Object> args = new HashMap<>(3);
  29.         //队列绑定的死信交换机
  30.         args.put("x-dead-letter-exchange","dead_exchange");
  31.         //队列的死信路由key
  32.         args.put("x-dead-letter-routing-key", "dead_route");
  33.         //消息过期时间
  34.         //args.put("x-message-ttl",4000);
  35.         return QueueBuilder.durable(DIRECT_QUEUE_NAME).withArguments(args).build();
  36.     }
  37.     /**
  38.      * 绑定交换机和队列
  39.      * @param directExchange 交换机
  40.      * @param queue 队列
  41.      * @return
  42.      */
  43.     @Bean
  44.     public Binding bindingExchangeWithQueue(DirectExchange directExchange, @Qualifier("directQueue") Queue queue){
  45.         return BindingBuilder.bind(queue).to(directExchange).with(DIRECT_ROUTE_KEY);
  46.     }
  47.     /**
  48.      * 死信交换机
  49.      * @return direct交换机
  50.      * 名称,是否持久化,无队列自动删除
  51.      */
  52.     @Bean
  53.     public DirectExchange deadExchange(){
  54.         return new DirectExchange("dead_exchange",true,false);
  55.     }
  56.     @Bean
  57.     public Queue deadQueue(){
  58.         return new Queue("dead_queue",true,false,false);
  59.     }
  60.     @Bean
  61.     public Binding bindingDeadExchangeWithQueue(DirectExchange deadExchange, Queue deadQueue){
  62.         return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead_route");
  63.     }
  64. }
复制代码
在第二步的代码中添加模拟报错代码
  1. if(msg.endsWith("2")){//模拟消息处理出错
  2.     throw new RuntimeException();
  3.   }
复制代码
消耗者消耗完所有消息后,控制台可以看到死信队列里有10条消息

新建死信队列监听
  1.     @RabbitListener(queues = "dead_queue")
  2.     public void deadMessage(String msg, Channel channel, Message message) throws IOException {
  3.         String bodyMessage = new String(message.getBody());
  4.         System.out.println("deadMessage = " + bodyMessage);
  5.         System.out.println("处理死信消息:" + msg);
  6.         channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  7.     }
复制代码
启动消耗者后死信队列消息被处理

4.延时消息(建议使用延时消息插件)

在生产者消息发送方法内添加如下参数,设置当前发送消息的过期时间,消息过期后根据队列绑定的死信交换机和路由键将消息发送到死信队列,死信队列消耗者消耗消息即完成了消息延时消耗。
**注意:**这种延时消息局限性较大,由于假如先发送一条消息设置过期时间为30s,随后发送一条过期时间为10s的消息,仍会是第一条消息过期后第二条消息才能进入死信队列。
发送10条消息到队列,依次设置过期时间为10s到1s
  1.     public String sendMessage() {
  2.         for (int i = 10; i >= 1; i--) {
  3.             //生成消息id
  4.             String messageId = UUID.randomUUID().toString();
  5.             //消息内容
  6.             String message = "rabbitMQ测试消息!" + i;
  7.             int time = i;
  8.             //发送消息
  9.             rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, DIRECT_ROUTE_KEY,message,msg -> {
  10.                 msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
  11.                 msg.getMessageProperties().setMessageId(messageId);//设置消息id
  12.                 msg.getMessageProperties().setExpiration(time + "000");
  13.                 return msg;
  14.             });
  15.             System.out.println("已发送消息:id=" + messageId + " message="+ message);
  16.         }
  17.         return "消息发送成功!";
  18.     }
复制代码

消耗者解释消耗普通队列的代码,启动消耗者观察死信队列消息消耗次序。
前10s都是没有处理消息的,由于第一条消息未过期,后续的消息也不会进入死信队列。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

农民

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表