rabbitmq消息的重复消费问题

打印 上一主题 下一主题

主题 857|帖子 857|积分 2571


一、RabbitMQ 消息重复消费的成因

消息重复消费主要由以下缘故原由引起:

  • 消息确认机制
    RabbitMQ 的消息确认机制分为手动确认和自动确认。消费者在处理消息时,如果未正确确认(ACK),RabbitMQ 会将消息重新放回队列,导致重复消费。
  • 消费者异常
    消费者在处理消息时抛出异常,导致未发送 ACK。
  • 网络问题
    在消费者与 RabbitMQ 之间的通信过程中,网络中断或耽误大概导致 RabbitMQ 认为消息未被确认。
  • 服务重启
    如果消费者服务在处理消息时重启,未确认的消息会被重新投递。
  • 事件问题
    消费者未能正确处理幂等逻辑或业务事件,导致消息重复处理。

二、消息重复消费的影响


  • 数据冗余
    消息重复处理大概导致数据库中出现重复数据。
  • 业务逻辑异常
    某些业务操纵不支持重复执行,如扣款、订单支付等。
  • 资源浪费
    消息的重复消费会增加系统的盘算和存储负担。

三、办理消息重复消费的策略

为了办理 RabbitMQ 消息重复消费的问题,可以从以下几方面入手:
3.1 确保消息的可靠传递


  • 手动确认机制
    利用手动确认(ACK)而非自动确认,确保消息在被成功处理后再确认。
  • 死信队列
    将消费失败的消息转入死信队列,避免不停重复消费。
3.2 消费幂等性设计


  • 唯一标识
    为每条消息设计一个唯一 ID,利用数据库或缓存记载消息处理状态。
  • 分布式锁
    利用 Redis 等工具实现分布式锁,确保同一消息在同一时刻只被处理一次。
3.3 网络和服务优化


  • 超时重试机制
    设置合理的重试机制,避免因网络波动导致消息重复投递。
  • 服务高可用
    加强消费者服务的高可用性,减少服务中断的大概。

四、RabbitMQ 消息重复消费的实践

以下通过 Java 示例展示如那边理 RabbitMQ 消息重复消费问题。
4.1 Maven 依靠

  1. <dependency>
  2.     <groupId>com.rabbitmq</groupId>
  3.     <artifactId>amqp-client</artifactId>
  4.     <version>5.16.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.springframework.boot</groupId>
  8.     <artifactId>spring-boot-starter-data-redis</artifactId>
  9. </dependency>
复制代码
4.2 配置 RabbitMQ

配置交换机、队列和绑定:

  1. package com.example.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitMQConfig {
  7.     public static final String QUEUE_NAME = "test_queue";
  8.     public static final String EXCHANGE_NAME = "test_exchange";
  9.     public static final String ROUTING_KEY = "test_key";
  10.     @Bean
  11.     public Queue queue() {
  12.         return QueueBuilder.durable(QUEUE_NAME).build();
  13.     }
  14.     @Bean
  15.     public DirectExchange exchange() {
  16.         return new DirectExchange(EXCHANGE_NAME);
  17.     }
  18.     @Bean
  19.     public Binding binding(Queue queue, DirectExchange exchange) {
  20.         return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
  21.     }
  22. }
复制代码

4.3 消费者实现

1. 手动确认消息

  1. package com.example.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Service;
  5. import java.nio.charset.StandardCharsets;
  6. @Service
  7. public class RabbitMQConsumer {
  8.     @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
  9.     public void consumeMessage(String message, Channel channel, org.springframework.amqp.core.Message amqpMessage) {
  10.         long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
  11.         try {
  12.             // 模拟业务处理
  13.             System.out.println("Processing message: " + message);
  14.             // 手动确认消息
  15.             channel.basicAck(deliveryTag, false);
  16.         } catch (Exception e) {
  17.             try {
  18.                 // 拒绝并重新投递
  19.                 channel.basicNack(deliveryTag, false, true);
  20.             } catch (Exception nackEx) {
  21.                 nackEx.printStackTrace();
  22.             }
  23.         }
  24.     }
  25. }
复制代码
2. 添加幂等性处理

利用 Redis 记载消息唯一 ID:
  1. package com.example.consumer;
  2. import com.example.config.RabbitMQConfig;
  3. import com.rabbitmq.client.Channel;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.data.redis.core.StringRedisTemplate;
  6. import org.springframework.stereotype.Service;
  7. import java.util.concurrent.TimeUnit;
  8. @Service
  9. public class IdempotentConsumer {
  10.     @Autowired
  11.     private StringRedisTemplate redisTemplate;
  12.     @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
  13.     public void consumeMessage(String message, Channel channel, org.springframework.amqp.core.Message amqpMessage) {
  14.         long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
  15.         String messageId = amqpMessage.getMessageProperties().getMessageId(); // 假设消息 ID 从属性中获取
  16.         try {
  17.             // 检查是否已处理
  18.             if (redisTemplate.opsForValue().setIfAbsent(messageId, "processed", 10, TimeUnit.MINUTES)) {
  19.                 // 模拟业务处理
  20.                 System.out.println("Processing unique message: " + message);
  21.                 // 手动确认消息
  22.                 channel.basicAck(deliveryTag, false);
  23.             } else {
  24.                 System.out.println("Duplicate message detected, skipping: " + messageId);
  25.                 channel.basicAck(deliveryTag, false); // 确认消息,但不重复处理
  26.             }
  27.         } catch (Exception e) {
  28.             try {
  29.                 // 拒绝并重新投递
  30.                 channel.basicNack(deliveryTag, false, true);
  31.             } catch (Exception nackEx) {
  32.                 nackEx.printStackTrace();
  33.             }
  34.         }
  35.     }
  36. }
复制代码

4.4 生产者实现

  1. package com.example.producer;
  2. import com.example.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class RabbitMQProducer {
  8.     @Autowired
  9.     private RabbitTemplate rabbitTemplate;
  10.     public void sendMessage(String message, String messageId) {
  11.         rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message, msg -> {
  12.             msg.getMessageProperties().setMessageId(messageId);
  13.             return msg;
  14.         });
  15.         System.out.println("Message sent: " + message);
  16.     }
  17. }
复制代码

五、验证与测试


  • 启动 RabbitMQ 服务和消费者
  • 发送测试消息
    1. RabbitMQProducer producer = new RabbitMQProducer();
    2. producer.sendMessage("Test Message 1", "msg-123");
    3. producer.sendMessage("Test Message 1", "msg-123"); // 发送相同的消息 ID
    复制代码
  • 预期结果

    • 消息只会被消费一次。
    • 如果重复消息到达,将被辨认并跳过处理。


六、总结

RabbitMQ 消息重复消费是一个常见但可以有效办理的问题。通过手动确认消息、幂等性设计以及合理的网络和服务配置,可以显著减少消息重复消费带来的负面影响。本文通过 Java 示例展示了如何利用 Redis 和手动 ACK 实现幂等消费和重复消息的处理。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

郭卫东

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表