耶耶耶耶耶 发表于 2024-7-13 21:17:58

【RabbitMQ | 第六篇】消息重复消费问题及解决方案

https://img-blog.csdnimg.cn/direct/f61c601436f449fc8bab253f6d592489.jpeg#pic_center


6.消息重复消费问题

6.1问题介绍

什么是消息重复消费?首先我们来看一下消息的传输流程。消息生产者–>MQ–>消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。
所以消息重复也就出现在 两个阶段
1 :生产者多发送了消息给MQ;
2 :MQ的一条消息被消费者消费了多次。
具体场景如下:

[*]生产者发送消息给MQ,在MQ确认的时候出现了网络颠簸,生产者没有收到确认,这时候生产者就会重新发送这条消息,导致MQ会接收到重复消息。
[*]消费者消费成功后,给MQ确认的时候出现了网络颠簸,MQ没有接收到确认,为了保证消息不丢失,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。由于重复消息是由于网络原因造成的,无法避免。
6.2解决思绪


[*]发送消息时让每个消息携带一个全局的唯一ID
[*]在消费消息时先判断消息是否已经被消费过,保证消息消费逻辑的幂等性。具体消费过程为:

[*]消费者获取到消息后先根据id去查询redis/db是否存在该消息
[*]如果不存在,则正常消费,消费完毕后写入redis/db
[*]如果存在,则证实消息被消费过,直接丢弃

6.3将该消息存储到Redis

6.3.1将id存入string(单消费者场景)

(1)实现思绪



[*]将id号存入value中,而且value类型为string
[*]即以队列名称为key,以消息id为值
[*]每次消息过来都覆盖之前的消息
    @RabbitListener(queues = "queueName4")//发送的队列名称   @RabbitListener注解到类和方法都可以
    @RabbitHandler
    public void receiveMessage1(Message message) throws UnsupportedEncodingException {
      //获取唯一id
      String messageId = message.getMessageProperties().getMessageId();
      String msg = new String(message.getBody(),"utf-8");
               
      //获取redis中该队列名称对应的value值
      String messageRedisValue = redisUtil.get("queueName4","");
      
      //检验唯一id是否存在
      if (messageRedisValue.equals(messageId)) {
            //存在
            return;
      }
      System.out.println("消息:"+msg+", id:"+messageId);
               
      //以队列为key,id为value
      redisUtil.set("queueName4",messageId);
    }
(2)问题


[*]并发辩说:如果多个消费者同时操作 Redis 中的已消费消息列表,由于 Redis 是单线程处置惩罚下令,大概会出现并发辩说导致数据不同等或丢失问题。特殊是在高并发情况下,使用字符串类型的 ID 大概会增加并发辩说的风险
[*]内存占用:字符串类型的 ID 在内存中占用空间相对较大,尤其是对于大量消息的情况下,会增加 Redis 的内存占用。
[*]比力服从:字符串类型的 ID 比力起来相对复杂,必要举行字符串比力操作。
6.3.2将id存入list中(多消费场景)

(1)实现思绪



[*]以该队列名称为key,id为value
[*]得当多消费场景的原因:

[*]顺序性:List 是一个有序聚集,可以按照消息的顺序存储消息 ID。在多消费者场景下,保持消息的顺序通常是很重要的,以确保消息按照精确的顺序被消费。
[*]原子性操作:Redis 的 List 提供了多个原子性操作,比如从列表两端推入/弹出元素,这些操作可以确保多个消费者同时访问列表时不会出现数据竞争和并发问题。
[*]支持壅闭操作:List 提供了壅闭式的弹出操作(如 BLPOP、BRPOP),可以在没有消息时壅闭等待新消息的到来,这对于实现消费者轮询机制非常有效。

@RabbitListener(queues = "queueName4")//发送的队列名称   @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(),"utf-8");
       
    //获取
    List<String> messageRedisValue = redisUtil.lrange("queueName4");
    if (messageRedisValue.contains(messageId)) {
      return;
    }
    System.out.println("消息:"+msg+", id:"+messageId);

    redisUtil.lpush("queueName4",messageId);//存入list
}
6.3.3将id以key增量存入string中并设置过期时间

(1)实现思绪

以消息id为key,消息内容为value存入string中,设置过期时间( 可蒙受的redis服务器非常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)
    @RabbitListener(queues = "queueName4")//发送的队列名称   @RabbitListener注解到类和方法都可以
    @RabbitHandler
    public void receiveMessage2(Message message) throws UnsupportedEncodingException {
      String messageId = message.getMessageProperties().getMessageId();
      String msg = new String(message.getBody(),"utf-8");

      String messageRedisValue = redisUtil.get(messageId,"");
      if (msg.equals(messageRedisValue)) {
            return;
      }
      System.out.println("消息:"+msg+", id:"+messageId);
               
      //以id为key,消息内容为value,过期时间10分钟
      redisUtil.set(messageId,msg,10L);
    }
6.4总结

该篇文章介绍了消息重复消费问题及解决方案,问题大概产生的两个阶段(生产消息多发、消费者重复消息);解决方案:将消息发送时携带一个唯一id,消费方拿到消息时先去reids/db中有没有该数据,若没有则可以消费,否则不可以消费;并介绍了基于Redsi解决消息重复消费问题,①以队列名称为key,消息id为value,且value为string类型(得当只有一个消费方)②以队列名称为key,消息id为value,且value为list类型(得当有多个消费方场景)③以消息id为key,内容为value,并设置过期时间
https://img-blog.csdnimg.cn/direct/03f5594d1df54f08aa4e57152ada9f6c.jpeg#pic_center

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【RabbitMQ | 第六篇】消息重复消费问题及解决方案