一、RabbitMQ 消息重复消费的成因
消息重复消费主要由以下缘故原由引起:
- 消息确认机制
RabbitMQ 的消息确认机制分为手动确认和自动确认。消费者在处理消息时,如果未正确确认(ACK),RabbitMQ 会将消息重新放回队列,导致重复消费。
- 消费者异常
消费者在处理消息时抛出异常,导致未发送 ACK。
- 网络问题
在消费者与 RabbitMQ 之间的通信过程中,网络中断或耽误大概导致 RabbitMQ 认为消息未被确认。
- 服务重启
如果消费者服务在处理消息时重启,未确认的消息会被重新投递。
- 事件问题
消费者未能正确处理幂等逻辑或业务事件,导致消息重复处理。
二、消息重复消费的影响
- 数据冗余
消息重复处理大概导致数据库中出现重复数据。
- 业务逻辑异常
某些业务操纵不支持重复执行,如扣款、订单支付等。
- 资源浪费
消息的重复消费会增加系统的盘算和存储负担。
三、办理消息重复消费的策略
为了办理 RabbitMQ 消息重复消费的问题,可以从以下几方面入手:
3.1 确保消息的可靠传递
- 手动确认机制
利用手动确认(ACK)而非自动确认,确保消息在被成功处理后再确认。
- 死信队列
将消费失败的消息转入死信队列,避免不停重复消费。
3.2 消费幂等性设计
- 唯一标识
为每条消息设计一个唯一 ID,利用数据库或缓存记载消息处理状态。
- 分布式锁
利用 Redis 等工具实现分布式锁,确保同一消息在同一时刻只被处理一次。
3.3 网络和服务优化
- 超时重试机制
设置合理的重试机制,避免因网络波动导致消息重复投递。
- 服务高可用
加强消费者服务的高可用性,减少服务中断的大概。
四、RabbitMQ 消息重复消费的实践
以下通过 Java 示例展示如那边理 RabbitMQ 消息重复消费问题。
4.1 Maven 依靠
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.16.0</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
复制代码 4.2 配置 RabbitMQ
配置交换机、队列和绑定:
- package com.example.config;
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RabbitMQConfig {
- public static final String QUEUE_NAME = "test_queue";
- public static final String EXCHANGE_NAME = "test_exchange";
- public static final String ROUTING_KEY = "test_key";
- @Bean
- public Queue queue() {
- return QueueBuilder.durable(QUEUE_NAME).build();
- }
- @Bean
- public DirectExchange exchange() {
- return new DirectExchange(EXCHANGE_NAME);
- }
- @Bean
- public Binding binding(Queue queue, DirectExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
- }
- }
复制代码 4.3 消费者实现
1. 手动确认消息
- package com.example.consumer;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
- import java.nio.charset.StandardCharsets;
- @Service
- public class RabbitMQConsumer {
- @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
- public void consumeMessage(String message, Channel channel, org.springframework.amqp.core.Message amqpMessage) {
- long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
- try {
- // 模拟业务处理
- System.out.println("Processing message: " + message);
- // 手动确认消息
- channel.basicAck(deliveryTag, false);
- } catch (Exception e) {
- try {
- // 拒绝并重新投递
- channel.basicNack(deliveryTag, false, true);
- } catch (Exception nackEx) {
- nackEx.printStackTrace();
- }
- }
- }
- }
复制代码 2. 添加幂等性处理
利用 Redis 记载消息唯一 ID:
- package com.example.consumer;
- import com.example.config.RabbitMQConfig;
- import com.rabbitmq.client.Channel;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.stereotype.Service;
- import java.util.concurrent.TimeUnit;
- @Service
- public class IdempotentConsumer {
- @Autowired
- private StringRedisTemplate redisTemplate;
- @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
- public void consumeMessage(String message, Channel channel, org.springframework.amqp.core.Message amqpMessage) {
- long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
- String messageId = amqpMessage.getMessageProperties().getMessageId(); // 假设消息 ID 从属性中获取
- try {
- // 检查是否已处理
- if (redisTemplate.opsForValue().setIfAbsent(messageId, "processed", 10, TimeUnit.MINUTES)) {
- // 模拟业务处理
- System.out.println("Processing unique message: " + message);
- // 手动确认消息
- channel.basicAck(deliveryTag, false);
- } else {
- System.out.println("Duplicate message detected, skipping: " + messageId);
- channel.basicAck(deliveryTag, false); // 确认消息,但不重复处理
- }
- } catch (Exception e) {
- try {
- // 拒绝并重新投递
- channel.basicNack(deliveryTag, false, true);
- } catch (Exception nackEx) {
- nackEx.printStackTrace();
- }
- }
- }
- }
复制代码 4.4 生产者实现
- package com.example.producer;
- import com.example.config.RabbitMQConfig;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- @Service
- public class RabbitMQProducer {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void sendMessage(String message, String messageId) {
- rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message, msg -> {
- msg.getMessageProperties().setMessageId(messageId);
- return msg;
- });
- System.out.println("Message sent: " + message);
- }
- }
复制代码 五、验证与测试
- 启动 RabbitMQ 服务和消费者。
- 发送测试消息:
- RabbitMQProducer producer = new RabbitMQProducer();
- producer.sendMessage("Test Message 1", "msg-123");
- producer.sendMessage("Test Message 1", "msg-123"); // 发送相同的消息 ID
复制代码 - 预期结果:
- 消息只会被消费一次。
- 如果重复消息到达,将被辨认并跳过处理。
六、总结
RabbitMQ 消息重复消费是一个常见但可以有效办理的问题。通过手动确认消息、幂等性设计以及合理的网络和服务配置,可以显著减少消息重复消费带来的负面影响。本文通过 Java 示例展示了如何利用 Redis 和手动 ACK 实现幂等消费和重复消息的处理。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |