ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【RabbitMQ | 第六篇】消息重复消费问题及解决方案 [打印本页]

作者: 耶耶耶耶耶    时间: 2024-7-13 21:17
标题: 【RabbitMQ | 第六篇】消息重复消费问题及解决方案


  
6.消息重复消费问题

6.1问题介绍

什么是消息重复消费?首先我们来看一下消息的传输流程。消息生产者–>MQ–>消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。
所以消息重复也就出现在 两个阶段
1 :生产者多发送了消息给MQ;
2 :MQ的一条消息被消费者消费了多次
具体场景如下:
6.2解决思绪

6.3将该消息存储到Redis

6.3.1将id存入string(单消费者场景)

(1)实现思绪


  1.     @RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
  2.     @RabbitHandler
  3.     public void receiveMessage1(Message message) throws UnsupportedEncodingException {
  4.         //获取唯一id
  5.         String messageId = message.getMessageProperties().getMessageId();
  6.         String msg = new String(message.getBody(),"utf-8");
  7.                
  8.         //获取redis中该队列名称对应的value值
  9.         String messageRedisValue = redisUtil.get("queueName4","");
  10.         
  11.         //检验唯一id是否存在
  12.         if (messageRedisValue.equals(messageId)) {
  13.             //存在
  14.             return;
  15.         }
  16.         System.out.println("消息:"+msg+", id:"+messageId);
  17.                
  18.         //以队列为key,id为value
  19.         redisUtil.set("queueName4",messageId);
  20.     }
复制代码
(2)问题

6.3.2将id存入list中(多消费场景)

(1)实现思绪


  1. @RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
  2. @RabbitHandler
  3. public void receiveMessage2(Message message) throws UnsupportedEncodingException {
  4.     String messageId = message.getMessageProperties().getMessageId();
  5.     String msg = new String(message.getBody(),"utf-8");
  6.        
  7.     //获取
  8.     List<String> messageRedisValue = redisUtil.lrange("queueName4");
  9.     if (messageRedisValue.contains(messageId)) {
  10.         return;
  11.     }
  12.     System.out.println("消息:"+msg+", id:"+messageId);
  13.     redisUtil.lpush("queueName4",messageId);//存入list
  14. }
复制代码
6.3.3将id以key增量存入string中并设置过期时间

(1)实现思绪

以消息id为key,消息内容为value存入string中,设置过期时间( 可蒙受的redis服务器非常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)
  1.     @RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
  2.     @RabbitHandler
  3.     public void receiveMessage2(Message message) throws UnsupportedEncodingException {
  4.         String messageId = message.getMessageProperties().getMessageId();
  5.         String msg = new String(message.getBody(),"utf-8");
  6.         String messageRedisValue = redisUtil.get(messageId,"");
  7.         if (msg.equals(messageRedisValue)) {
  8.             return;
  9.         }
  10.         System.out.println("消息:"+msg+", id:"+messageId);
  11.                
  12.         //以id为key,消息内容为value,过期时间10分钟
  13.         redisUtil.set(messageId,msg,10L);
  14.     }
复制代码
6.4总结

该篇文章介绍了消息重复消费问题及解决方案,问题大概产生的两个阶段(生产消息多发、消费者重复消息);解决方案:将消息发送时携带一个唯一id,消费方拿到消息时先去reids/db中有没有该数据,若没有则可以消费,否则不可以消费;并介绍了基于Redsi解决消息重复消费问题,①以队列名称为key,消息id为value,且value为string类型(得当只有一个消费方)②以队列名称为key,消息id为value,且value为list类型(得当有多个消费方场景)③以消息id为key,内容为value,并设置过期时间


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4