农民 发表于 2024-8-1 03:16:18

RabbitMQ消息重复消耗

RabbitMQ消息重复消耗问题

同一条消息被一个消耗者消耗多次大概被多个消耗者消耗。大概导致系统相关业务重复实验和数据不同等问题。
1.场景模拟

生产者
public String sendMessage() {
      for (int i = 1; i <= 100; i++) {
            //生成消息id
            String messageId = UUID.randomUUID().toString();
            //消息内容
            String message = "rabbitMQ测试消息!" + i;
            //发送消息
            rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, DIRECT_ROUTE_KEY,message,msg -> {
                msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
                msg.getMessageProperties().setMessageId(messageId);//设置消息id
                return msg;
            });
            System.out.println("已发送消息:id=" + messageId + " message="+ message);
      }
      return "消息发送成功!";
    }
消耗者
         @RabbitListener(queues = "directQueue")
    public void spendMessage(String msg, Channel channel, Message message) throws IOException {
      String messageId = message.getMessageProperties().getMessageId();
      System.out.println("接收到消息:id=" + messageId+ " message=" + msg);
      try {
            //模拟消费耗时
            Thread.sleep(100);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      //channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
测试步骤

生产者发送100条消息
https://img-blog.csdnimg.cn/direct/54fd797a66cf448b8343e217e477c91b.png
启动消耗者
https://img-blog.csdnimg.cn/direct/624d99280a4741c29c8d4766df290279.png
由于最后一条消息没有调用basicAck方法,消息并没有消耗成功,当我们重启消耗者服务时,消息会被再次消耗。
重启消耗者
https://img-blog.csdnimg.cn/direct/947d202c782540cfb94efefa86cd81ff.png
2.解决方案

由于每条消息都有自己的id(唯一标识),可根据这个id来判断消息是否被消耗过。
消耗消息前先获取消息id—>查询缓存是否存在此id,判断id对应的值—>为1则体现该消息被消耗过,为0则体现消耗中
以下示例使用redis作为介质
消耗者
    @RabbitListener(queues = "directQueue")
    public void spendMessage(String msg, Channel channel, Message message) throws IOException {
      String messageId = message.getMessageProperties().getMessageId();
         //messageId对应的缓存值为0时表示消息消费中,1表示消费完成
      if(Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(messageId, "0", 30L, TimeUnit.SECONDS))){//消息第一次被消费
            try {
                //模拟消费耗时
                System.out.println("接收到消息:id=" + messageId+ " message=" + msg);
                Thread.sleep(100);
                //业务执行完成后标识消息消费完成
                redisTemplate.opsForValue().set(messageId,"1",30L,TimeUnit.SECONDS);
                //消息消费确认
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                //消费成功后删除缓存
                redisTemplate.delete(messageId);
            } catch (Exception e) {
                //丢弃消息(关联了死信队列的话可以放入死信队列处理)
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                //删除缓存
                redisTemplate.delete(messageId);
            }
      }else{
            String value = redisTemplate.opsForValue().get(messageId);
            if("0".equals(value)){
                return;
            }
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
      }
    }
再次按照以上步骤测试,消息不会被重复消耗。
https://img-blog.csdnimg.cn/direct/a35c2888d9d6497882fea1937f96d462.png
最后一条被消耗的消息为49,再次启动后未重复消耗。
本示例中缓存过期时间为30s,若启动消耗者的时间间隔凌驾30s,则消息仍会被重复消耗
https://img-blog.csdnimg.cn/direct/1eed22866c124edaadc8bc493dc3ca3b.png
3.死信队列

死信队列就是一个普通队列,可以使用恣意种交换机,业务队列可通过绑定死信交换机和路由键自动将被nack和reject且不重新入队的消息发送给对应的死信队列。大概通过设置业务队列的消息过期时间实现延时消息。
测试配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    private static final String DIRECT_EXCHANGE_NAME = "directExchange";//交换机名称
    private static final String DIRECT_ROUTE_KEY = "directRoute";//路由键
    private static final String DIRECT_QUEUE_NAME = "directQueue";//队列名称

    /**
   * 业务交换机
   * @return direct交换机
   * 名称,是否持久化,无队列自动删除
   */
    @Bean
    public DirectExchange directExchange(){
      return new DirectExchange(DIRECT_EXCHANGE_NAME,true,false);
    }

    /**
   * 队列
   * @return 队列
   * 名称,是否持久化,是否独占,是否自动删除
   */
    @Bean
    public Queue directQueue(){
      Map<String, Object> args = new HashMap<>(3);
      //队列绑定的死信交换机
      args.put("x-dead-letter-exchange","dead_exchange");
      //队列的死信路由key
      args.put("x-dead-letter-routing-key", "dead_route");
      //消息过期时间
      //args.put("x-message-ttl",4000);
      return QueueBuilder.durable(DIRECT_QUEUE_NAME).withArguments(args).build();
    }

    /**
   * 绑定交换机和队列
   * @param directExchange 交换机
   * @param queue 队列
   * @return
   */
    @Bean
    public Binding bindingExchangeWithQueue(DirectExchange directExchange, @Qualifier("directQueue") Queue queue){
      return BindingBuilder.bind(queue).to(directExchange).with(DIRECT_ROUTE_KEY);
    }

    /**
   * 死信交换机
   * @return direct交换机
   * 名称,是否持久化,无队列自动删除
   */
    @Bean
    public DirectExchange deadExchange(){
      return new DirectExchange("dead_exchange",true,false);
    }

    @Bean
    public Queue deadQueue(){
      return new Queue("dead_queue",true,false,false);
    }

    @Bean
    public Binding bindingDeadExchangeWithQueue(DirectExchange deadExchange, Queue deadQueue){
      return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead_route");
    }

}

在第二步的代码中添加模拟报错代码
if(msg.endsWith("2")){//模拟消息处理出错
    throw new RuntimeException();
}
消耗者消耗完所有消息后,控制台可以看到死信队列里有10条消息
https://img-blog.csdnimg.cn/direct/615d174829a44ce980835c624276c7b5.png
新建死信队列监听
    @RabbitListener(queues = "dead_queue")
    public void deadMessage(String msg, Channel channel, Message message) throws IOException {
      String bodyMessage = new String(message.getBody());
      System.out.println("deadMessage = " + bodyMessage);
      System.out.println("处理死信消息:" + msg);
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
启动消耗者后死信队列消息被处理
https://img-blog.csdnimg.cn/direct/616f2e5d9e67401ebfdec5e1540ac0a6.png
4.延时消息(建议使用延时消息插件)

在生产者消息发送方法内添加如下参数,设置当前发送消息的过期时间,消息过期后根据队列绑定的死信交换机和路由键将消息发送到死信队列,死信队列消耗者消耗消息即完成了消息延时消耗。
**注意:**这种延时消息局限性较大,由于假如先发送一条消息设置过期时间为30s,随后发送一条过期时间为10s的消息,仍会是第一条消息过期后第二条消息才能进入死信队列。
发送10条消息到队列,依次设置过期时间为10s到1s
    public String sendMessage() {
      for (int i = 10; i >= 1; i--) {
            //生成消息id
            String messageId = UUID.randomUUID().toString();
            //消息内容
            String message = "rabbitMQ测试消息!" + i;
            int time = i;
            //发送消息
            rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, DIRECT_ROUTE_KEY,message,msg -> {
                msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
                msg.getMessageProperties().setMessageId(messageId);//设置消息id
                msg.getMessageProperties().setExpiration(time + "000");
                return msg;
            });
            System.out.println("已发送消息:id=" + messageId + " message="+ message);
      }
      return "消息发送成功!";
    }

https://img-blog.csdnimg.cn/direct/9660569003f941ada877d400053bb074.png
消耗者解释消耗普通队列的代码,启动消耗者观察死信队列消息消耗次序。
前10s都是没有处理消息的,由于第一条消息未过期,后续的消息也不会进入死信队列。
https://img-blog.csdnimg.cn/direct/59c2fe84b8424b1ea6bdd5045a4419a2.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RabbitMQ消息重复消耗