郭卫东 发表于 2025-1-4 11:09:00

rabbitmq消息的重复消费问题

一、RabbitMQ 消息重复消费的成因

消息重复消费主要由以下缘故原由引起:

[*] 消息确认机制
RabbitMQ 的消息确认机制分为手动确认和自动确认。消费者在处理消息时,如果未正确确认(ACK),RabbitMQ 会将消息重新放回队列,导致重复消费。
[*] 消费者异常
消费者在处理消息时抛出异常,导致未发送 ACK。
[*] 网络问题
在消费者与 RabbitMQ 之间的通信过程中,网络中断或耽误大概导致 RabbitMQ 认为消息未被确认。
[*] 服务重启
如果消费者服务在处理消息时重启,未确认的消息会被重新投递。
[*] 事件问题
消费者未能正确处理幂等逻辑或业务事件,导致消息重复处理。
二、消息重复消费的影响


[*] 数据冗余
消息重复处理大概导致数据库中出现重复数据。
[*] 业务逻辑异常
某些业务操纵不支持重复执行,如扣款、订单支付等。
[*] 资源浪费
消息的重复消费会增加系统的盘算和存储负担。
三、办理消息重复消费的策略

为了办理 RabbitMQ 消息重复消费的问题,可以从以下几方面入手:
3.1 确保消息的可靠传递


[*] 手动确认机制
利用手动确认(ACK)而非自动确认,确保消息在被成功处理后再确认。
[*] 死信队列
将消费失败的消息转入死信队列,避免不停重复消费。
3.2 消费幂等性设计


[*] 唯一标识
为每条消息设计一个唯一 ID,利用数据库或缓存记载消息处理状态。
[*] 分布式锁
利用 Redis 等工具实现分布式锁,确保同一消息在同一时刻只被处理一次。
3.3 网络和服务优化


[*] 超时重试机制
设置合理的重试机制,避免因网络波动导致消息重复投递。
[*] 服务高可用
加强消费者服务的高可用性,减少服务中断的大概。
四、RabbitMQ 消息重复消费的实践

以下通过 Java 示例展示如那边理 RabbitMQ 消息重复消费问题。
4.1 Maven 依靠

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
4.2 配置 RabbitMQ

配置交换机、队列和绑定:

package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_NAME = "test_queue";
    public static final String EXCHANGE_NAME = "test_exchange";
    public static final String ROUTING_KEY = "test_key";

    @Bean
    public Queue queue() {
      return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public DirectExchange exchange() {
      return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
4.3 消费者实现

1. 手动确认消息

package com.example.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

@Service
public class RabbitMQConsumer {

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void consumeMessage(String message, Channel channel, org.springframework.amqp.core.Message amqpMessage) {
      long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
      try {
            // 模拟业务处理
            System.out.println("Processing message: " + message);

            // 手动确认消息
            channel.basicAck(deliveryTag, false);
      } catch (Exception e) {
            try {
                // 拒绝并重新投递
                channel.basicNack(deliveryTag, false, true);
            } catch (Exception nackEx) {
                nackEx.printStackTrace();
            }
      }
    }
}
2. 添加幂等性处理

利用 Redis 记载消息唯一 ID:
package com.example.consumer;

import com.example.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class IdempotentConsumer {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void consumeMessage(String message, Channel channel, org.springframework.amqp.core.Message amqpMessage) {
      long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
      String messageId = amqpMessage.getMessageProperties().getMessageId(); // 假设消息 ID 从属性中获取

      try {
            // 检查是否已处理
            if (redisTemplate.opsForValue().setIfAbsent(messageId, "processed", 10, TimeUnit.MINUTES)) {
                // 模拟业务处理
                System.out.println("Processing unique message: " + message);

                // 手动确认消息
                channel.basicAck(deliveryTag, false);
            } else {
                System.out.println("Duplicate message detected, skipping: " + messageId);
                channel.basicAck(deliveryTag, false); // 确认消息,但不重复处理
            }
      } catch (Exception e) {
            try {
                // 拒绝并重新投递
                channel.basicNack(deliveryTag, false, true);
            } catch (Exception nackEx) {
                nackEx.printStackTrace();
            }
      }
    }
}
4.4 生产者实现

package com.example.producer;

import com.example.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message, String messageId) {
      rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message, msg -> {
            msg.getMessageProperties().setMessageId(messageId);
            return msg;
      });
      System.out.println("Message sent: " + message);
    }
}
五、验证与测试


[*]启动 RabbitMQ 服务和消费者。
[*]发送测试消息:RabbitMQProducer producer = new RabbitMQProducer();
producer.sendMessage("Test Message 1", "msg-123");
producer.sendMessage("Test Message 1", "msg-123"); // 发送相同的消息 ID

[*]预期结果:

[*]消息只会被消费一次。
[*]如果重复消息到达,将被辨认并跳过处理。

六、总结

RabbitMQ 消息重复消费是一个常见但可以有效办理的问题。通过手动确认消息、幂等性设计以及合理的网络和服务配置,可以显著减少消息重复消费带来的负面影响。本文通过 Java 示例展示了如何利用 Redis 和手动 ACK 实现幂等消费和重复消息的处理。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: rabbitmq消息的重复消费问题