ToB企服应用市场:ToB评测及商务社交产业平台

标题: rabbitmq消息的重复消费问题 [打印本页]

作者: 郭卫东    时间: 2025-1-4 11:09
标题: rabbitmq消息的重复消费问题

一、RabbitMQ 消息重复消费的成因

消息重复消费主要由以下缘故原由引起:

二、消息重复消费的影响


三、办理消息重复消费的策略

为了办理 RabbitMQ 消息重复消费的问题,可以从以下几方面入手:
3.1 确保消息的可靠传递

3.2 消费幂等性设计

3.3 网络和服务优化


四、RabbitMQ 消息重复消费的实践

以下通过 Java 示例展示如那边理 RabbitMQ 消息重复消费问题。
4.1 Maven 依靠

  1. <dependency>
  2.     <groupId>com.rabbitmq</groupId>
  3.     <artifactId>amqp-client</artifactId>
  4.     <version>5.16.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.springframework.boot</groupId>
  8.     <artifactId>spring-boot-starter-data-redis</artifactId>
  9. </dependency>
复制代码
4.2 配置 RabbitMQ

配置交换机、队列和绑定:

  1. package com.example.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitMQConfig {
  7.     public static final String QUEUE_NAME = "test_queue";
  8.     public static final String EXCHANGE_NAME = "test_exchange";
  9.     public static final String ROUTING_KEY = "test_key";
  10.     @Bean
  11.     public Queue queue() {
  12.         return QueueBuilder.durable(QUEUE_NAME).build();
  13.     }
  14.     @Bean
  15.     public DirectExchange exchange() {
  16.         return new DirectExchange(EXCHANGE_NAME);
  17.     }
  18.     @Bean
  19.     public Binding binding(Queue queue, DirectExchange exchange) {
  20.         return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
  21.     }
  22. }
复制代码

4.3 消费者实现

1. 手动确认消息

  1. package com.example.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Service;
  5. import java.nio.charset.StandardCharsets;
  6. @Service
  7. public class RabbitMQConsumer {
  8.     @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
  9.     public void consumeMessage(String message, Channel channel, org.springframework.amqp.core.Message amqpMessage) {
  10.         long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
  11.         try {
  12.             // 模拟业务处理
  13.             System.out.println("Processing message: " + message);
  14.             // 手动确认消息
  15.             channel.basicAck(deliveryTag, false);
  16.         } catch (Exception e) {
  17.             try {
  18.                 // 拒绝并重新投递
  19.                 channel.basicNack(deliveryTag, false, true);
  20.             } catch (Exception nackEx) {
  21.                 nackEx.printStackTrace();
  22.             }
  23.         }
  24.     }
  25. }
复制代码
2. 添加幂等性处理

利用 Redis 记载消息唯一 ID:
  1. package com.example.consumer;
  2. import com.example.config.RabbitMQConfig;
  3. import com.rabbitmq.client.Channel;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.data.redis.core.StringRedisTemplate;
  6. import org.springframework.stereotype.Service;
  7. import java.util.concurrent.TimeUnit;
  8. @Service
  9. public class IdempotentConsumer {
  10.     @Autowired
  11.     private StringRedisTemplate redisTemplate;
  12.     @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
  13.     public void consumeMessage(String message, Channel channel, org.springframework.amqp.core.Message amqpMessage) {
  14.         long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
  15.         String messageId = amqpMessage.getMessageProperties().getMessageId(); // 假设消息 ID 从属性中获取
  16.         try {
  17.             // 检查是否已处理
  18.             if (redisTemplate.opsForValue().setIfAbsent(messageId, "processed", 10, TimeUnit.MINUTES)) {
  19.                 // 模拟业务处理
  20.                 System.out.println("Processing unique message: " + message);
  21.                 // 手动确认消息
  22.                 channel.basicAck(deliveryTag, false);
  23.             } else {
  24.                 System.out.println("Duplicate message detected, skipping: " + messageId);
  25.                 channel.basicAck(deliveryTag, false); // 确认消息,但不重复处理
  26.             }
  27.         } catch (Exception e) {
  28.             try {
  29.                 // 拒绝并重新投递
  30.                 channel.basicNack(deliveryTag, false, true);
  31.             } catch (Exception nackEx) {
  32.                 nackEx.printStackTrace();
  33.             }
  34.         }
  35.     }
  36. }
复制代码

4.4 生产者实现

  1. package com.example.producer;
  2. import com.example.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class RabbitMQProducer {
  8.     @Autowired
  9.     private RabbitTemplate rabbitTemplate;
  10.     public void sendMessage(String message, String messageId) {
  11.         rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message, msg -> {
  12.             msg.getMessageProperties().setMessageId(messageId);
  13.             return msg;
  14.         });
  15.         System.out.println("Message sent: " + message);
  16.     }
  17. }
复制代码

五、验证与测试


六、总结

RabbitMQ 消息重复消费是一个常见但可以有效办理的问题。通过手动确认消息、幂等性设计以及合理的网络和服务配置,可以显著减少消息重复消费带来的负面影响。本文通过 Java 示例展示了如何利用 Redis 和手动 ACK 实现幂等消费和重复消息的处理。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4