ToB企服应用市场:ToB评测及商务社交产业平台
标题:
rabbitmq消息的重复消费问题
[打印本页]
作者:
郭卫东
时间:
2025-1-4 11:09
标题:
rabbitmq消息的重复消费问题
一、RabbitMQ 消息重复消费的成因
消息重复消费主要由以下缘故原由引起:
消息确认机制
RabbitMQ 的消息确认机制分为手动确认和自动确认。消费者在处理消息时,如果未正确确认(ACK),RabbitMQ 会将消息重新放回队列,导致重复消费。
消费者异常
消费者在处理消息时抛出异常,导致未发送 ACK。
网络问题
在消费者与 RabbitMQ 之间的通信过程中,网络中断或耽误大概导致 RabbitMQ 认为消息未被确认。
服务重启
如果消费者服务在处理消息时重启,未确认的消息会被重新投递。
事件问题
消费者未能正确处理幂等逻辑或业务事件,导致消息重复处理。
二、消息重复消费的影响
数据冗余
消息重复处理大概导致数据库中出现重复数据。
业务逻辑异常
某些业务操纵不支持重复执行,如扣款、订单支付等。
资源浪费
消息的重复消费会增加系统的盘算和存储负担。
三、办理消息重复消费的策略
为了办理 RabbitMQ 消息重复消费的问题,可以从以下几方面入手:
3.1 确保消息的可靠传递
手动确认机制
利用手动确认(ACK)而非自动确认,确保消息在被成功处理后再确认。
死信队列
将消费失败的消息转入死信队列,避免不停重复消费。
3.2 消费幂等性设计
唯一标识
为每条消息设计一个唯一 ID,利用数据库或缓存记载消息处理状态。
分布式锁
利用 Redis 等工具实现分布式锁,确保同一消息在同一时刻只被处理一次。
3.3 网络和服务优化
超时重试机制
设置合理的重试机制,避免因网络波动导致消息重复投递。
服务高可用
加强消费者服务的高可用性,减少服务中断的大概。
四、RabbitMQ 消息重复消费的实践
以下通过 Java 示例展示如那边理 RabbitMQ 消息重复消费问题。
4.1 Maven 依靠
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
复制代码
4.2 配置 RabbitMQ
配置交换机、队列和绑定:
package com.example.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "test_queue";
public static final String EXCHANGE_NAME = "test_exchange";
public static final String ROUTING_KEY = "test_key";
@Bean
public Queue queue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
}
复制代码
4.3 消费者实现
1. 手动确认消息
package com.example.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
@Service
public class RabbitMQConsumer {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void consumeMessage(String message, Channel channel, org.springframework.amqp.core.Message amqpMessage) {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
try {
// 模拟业务处理
System.out.println("Processing message: " + message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
// 拒绝并重新投递
channel.basicNack(deliveryTag, false, true);
} catch (Exception nackEx) {
nackEx.printStackTrace();
}
}
}
}
复制代码
2. 添加幂等性处理
利用 Redis 记载消息唯一 ID:
package com.example.consumer;
import com.example.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class IdempotentConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void consumeMessage(String message, Channel channel, org.springframework.amqp.core.Message amqpMessage) {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
String messageId = amqpMessage.getMessageProperties().getMessageId(); // 假设消息 ID 从属性中获取
try {
// 检查是否已处理
if (redisTemplate.opsForValue().setIfAbsent(messageId, "processed", 10, TimeUnit.MINUTES)) {
// 模拟业务处理
System.out.println("Processing unique message: " + message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
} else {
System.out.println("Duplicate message detected, skipping: " + messageId);
channel.basicAck(deliveryTag, false); // 确认消息,但不重复处理
}
} catch (Exception e) {
try {
// 拒绝并重新投递
channel.basicNack(deliveryTag, false, true);
} catch (Exception nackEx) {
nackEx.printStackTrace();
}
}
}
}
复制代码
4.4 生产者实现
package com.example.producer;
import com.example.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message, String messageId) {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message, msg -> {
msg.getMessageProperties().setMessageId(messageId);
return msg;
});
System.out.println("Message sent: " + message);
}
}
复制代码
五、验证与测试
启动 RabbitMQ 服务和消费者
。
发送测试消息
:
RabbitMQProducer producer = new RabbitMQProducer();
producer.sendMessage("Test Message 1", "msg-123");
producer.sendMessage("Test Message 1", "msg-123"); // 发送相同的消息 ID
复制代码
预期结果
:
消息只会被消费一次。
如果重复消息到达,将被辨认并跳过处理。
六、总结
RabbitMQ 消息重复消费是一个常见但可以有效办理的问题。通过手动确认消息、幂等性设计以及合理的网络和服务配置,可以显著减少消息重复消费带来的负面影响。本文通过 Java 示例展示了如何利用 Redis 和手动 ACK 实现幂等消费和重复消息的处理。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4