qidao123.com技术社区-IT企服评测·应用市场

标题: 9. RabbitMQ 消息队列幂等性,优先级队列,惰性队列的详细阐明 [打印本页]

作者: 南七星之家    时间: 2025-4-8 10:17
标题: 9. RabbitMQ 消息队列幂等性,优先级队列,惰性队列的详细阐明
9. RabbitMQ 消息队列幂等性,优先级队列,惰性队列的详细阐明

@
目录

1. RabbitMQ 消息队列的 “ 幂等性 ” 的题目

1.1 RabbitMQ 消息队列的“幂等性”的概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子:那就是付出,用户购买商品后付出,付出扣款成功,但是返回结果的时候网络非常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记载也酿成了两条。在从前的单应用系统中,我们只须要把数据操作放入到事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者非常等等。
消息消耗时的幂等性(消息不被重复消耗)
同一个消息,第一次吸收,正常处理业务,如果该消息第二次再吸收,那就不能再处理业务,否则就处理重复了;
幂等性是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是类似的,不能因为重复的请求而对该资源重复造成影响;
以接口幂等性举例:
接口幂等性是指: 一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的;
注册接口;
发送短信验证码接口; 比如同一个订单我付出两次,但是只会扣款一次,第二次付出不会扣款,这阐明这个付出接口是具有幂等性的;
如何避免消息的重复消耗题目?(消息消耗时的幂等性)
全局唯一 ID + Redis
生产者在发送消息时,为每条消息设置一个全局唯一的 messageId,消耗者拿到消息后,使用 setnx 下令,将 messageId 作为 key 放到 Redis 中:setnx(messageId, 1);
这里是利用 Redis 的一个 setnx 不可重复,原子性的特征。
消息重复消耗
消耗者在消耗 MQ 中的消息时,MQ 已把消息发送给消耗者,消耗者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消耗者,或者在网络重连后再次发送给该消耗者,但 实际上该消耗者已成功消耗了该条消息,造成消耗者消耗了重复的消息。
解决思路
MQ 消耗者的幂等性的解决一样平常使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消耗 者消耗 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按本身的规则天生一个全局唯一 id,每次消耗消 息时用该 id 先判断该消息是否已消耗过。
消耗端的幂等性保障
在海量订单天生的业务高峰期,生产端有可能就会重复发生了消息,这时候消耗端就要实现幂等性, 这就意味着我们的消息永远不会被消耗多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:
唯一 ID+指纹码机制
指纹码: 我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统天生的,基 本都是由我们的业务规则拼接而来,但是一定要包管唯一性,然后就利用查询语句进行判断这个 id 是否存 在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数 据库就会有写入性能瓶颈当然也可以采用分库分表提拔性能,但也不是我们最保举的方式。
Redis 原子性
关于 Redis 的内容,感兴趣的大家可以移步至:✏️✏️✏️ Redis_ChinaRainbowSea的博客-CSDN博客
利用 redis 执行 setnx 下令,天然具有幂等性。从而实现不重复消耗
2. RabbitMQ 消息队列的 “ 优先级队列 ” 的题目

在我们系统中有一个订单催付 的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提示,很简单的一个功能对吧,但是 , tmall 商家对我们来说,肯定是要分为大客户小客户 的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,它们的订单必须得到优先处理,而曾经我们的后端系统是使用 Redis 来存放的定时轮询,大家都知道 Redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用  RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认的优先级。
如何添加:



  1. Map<String, Object> params = new HashMap();
  2. params.put("x-max-priority", 10);
  3. channel.queueDeclare("hello", true, false, false, params);
复制代码

  1. AMQP.BasicProperties  properties = new  AMQP.BasicProperties().builder().priority(5).build();
复制代码
要让队列实现优先级须要做的事变有如下事变:队列须要设置为优先级队列,消息须要设置消息的优先级,消耗者须要等待消息已经发送到队列中才去消耗,因为:这样才有时机对消息进行排序
简单的来说就是:我们须要让生产者(含有优先级)发送的消息,必须让其存放在队列当中,并且要是到达一定的量,让其在队列当中进行可以有一个时间进行一个(根据优先级上进行一个排序),否则,如果只有一个,就不须要将队列进行排序,因为只有一个就不存在一个优先级的题目。只有存在多个的时候,这样才有时机对消息进行一个排序。
实战:消息队列的“优先级队列” 的具体代码实现:
  1. package com.rainbowsea.rabbitmq.nine;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Channel;
  4. import com.rainbowsea.rabbitmq.utils.RabbitMQUtils;
  5. public class Producer {
  6.     private static final String QUEUE_NAME = "hello";
  7.     public static void main(String[] args) throws Exception {
  8.         try (Channel channel = RabbitMQUtils.getChannel();) {
  9. //给消息赋予一个 priority 属性
  10.             AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
  11.             for (int i = 1; i < 11; i++) {
  12.                 String message = "info" + i;
  13.                 if (i == 5) {
  14.                     channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
  15.                 } else {
  16.                     channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  17.                 }
  18.                 System.out.println("发送消息完成:" + message);
  19.             }
  20.         }
  21.     }
  22. }
复制代码
  1. package com.rainbowsea.rabbitmq.nine;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import com.rainbowsea.rabbitmq.utils.RabbitMQUtils;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. public class Consumer {
  9.     private static final String QUEUE_NAME = "hello";
  10.     public static void main(String[] args) throws Exception {
  11.         Channel channel = RabbitMQUtils.getChannel();
  12. //设置队列的最大优先级  最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
  13.         Map<String, Object> params = new HashMap();
  14.         params.put("x-max-priority", 10);
  15.         channel.queueDeclare(QUEUE_NAME, true, false, false, params);
  16.         System.out.println("消费者启动等待消费......");
  17.         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  18.             String receivedMessage = new String(delivery.getBody());
  19.             System.out.println("接收到消息:" + receivedMessage);
  20.         };
  21.         channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {
  22.             System.out.println("消费者无法消费消息时调用,如队列被删除");
  23.         });
  24.     }
  25. }
复制代码
2.1 创建交换机

exchange.test.priority

2.2 创建队列

queue.test.priority
x-max-priority


2.3 队列绑定交换机



2.4 RabbitMQ 结合 Spring Boot (分模块微服务)实现 “优先级队列”

生产者发送消息
1. 配置POM
  1. <parent>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-parent</artifactId>
  4.     <version>3.1.5</version>
  5. </parent>
  6. <dependencies>
  7.     <dependency>
  8.         <groupId>org.springframework.boot</groupId>
  9.         <artifactId>spring-boot-starter-amqp</artifactId>
  10.     </dependency>
  11.     <dependency>
  12.         <groupId>org.springframework.boot</groupId>
  13.         <artifactId>spring-boot-starter-test</artifactId>
  14.     </dependency>
  15.     <dependency>
  16.         <groupId>org.projectlombok</groupId>
  17.         <artifactId>lombok</artifactId>
  18.     </dependency>
  19. </dependencies>
复制代码
2. 配置 YAML 也可以使用 properties
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.200.100
  4.     port: 5672
  5.     username: guest
  6.     password: 123456
  7.     virtual-host: /
复制代码
3. 主启动类
  1. package com.rainbowsea.mq;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class RabbitMQPriorityProducer {
  6.     public static void main(String[] args) {
  7.         SpringApplication.run(RabbitMQPriorityProducer.class, args);
  8.     }
  9. }
复制代码
4. 发送消息
第一次发送优先级为1的消息
  1. package com.rainbowsea.mq.test;
  2. import jakarta.annotation.Resource;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. @SpringBootTest
  7. public class RabbitMQTest {
  8.     public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
  9.     public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
  10.     @Resource
  11.     private RabbitTemplate rabbitTemplate;
  12.     @Test
  13.     public void testSendMessage() {
  14.         rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{
  15.             message.getMessageProperties().setPriority(1);
  16.             return message;
  17.         });
  18.     }
  19. }
复制代码
第二次发送优先级为2的消息
  1. @Test
  2. public void testSendMessage() {
  3.     rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2.", message->{
  4.         message.getMessageProperties().setPriority(2);
  5.         return message;
  6.     });
  7. }
复制代码
第三次发送优先级为3的消息
  1. @Test
  2. public void testSendMessage() {
  3.     rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3.", message->{
  4.         message.getMessageProperties().setPriority(3);
  5.         return message;
  6.     });
  7. }
复制代码

消耗端吸收消息:
  1. <parent>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-parent</artifactId>
  4.     <version>3.1.5</version>
  5. </parent>
  6. <dependencies>
  7.     <dependency>
  8.         <groupId>org.springframework.boot</groupId>
  9.         <artifactId>spring-boot-starter-amqp</artifactId>
  10.     </dependency>
  11.     <dependency>
  12.         <groupId>org.springframework.boot</groupId>
  13.         <artifactId>spring-boot-starter-web</artifactId>
  14.     </dependency>
  15.     <dependency>
  16.         <groupId>org.projectlombok</groupId>
  17.         <artifactId>lombok</artifactId>
  18.     </dependency>
  19. </dependencies>
复制代码
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.200.100
  4.     port: 5672
  5.     username: guest
  6.     password: 123456
  7.     virtual-host: /
复制代码
3. 主启动类
  1. package com.rainbowsea.mq;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class RabbitMQPriorityConsumer {
  6.     public static void main(String[] args) {
  7.         SpringApplication.run(RabbitMQPriorityConsumer.class, args);
  8.     }
  9. }
复制代码
4.监听器
  1. package com.rainbowsea.mq.listener;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.*;
  6. import org.springframework.stereotype.Component;
  7. @Slf4j
  8. @Component
  9. public class MyMessageProcessor {
  10.     public static final String QUEUE_PRIORITY = "queue.test.priority";
  11.     @RabbitListener(queues = {QUEUE_PRIORITY})
  12.     public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {
  13.         log.info(data);
  14.         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  15.     }
  16. }
复制代码
5. 测试效果
对于已经滞留服务器的消息,只要消耗端一启动,就能够收到消息队列的投递,打印效果如下:

3. RabbitMQ 消息队列的 “ 惰性队列 ” 的题目

3.1  RabbitMQ 消息队列 “ 惰性队列 ”的概念

“惰性队列” 的应用场景:
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消 费者消耗到相应的消息时才会被加载到内存中,它的一个重要的计划目的是能够支持更长的队列,即支持 更多的消息存储。当消耗者由于各种各样的原因(比如消耗者下线、宕机亦或者是由于维护而关闭等)而致 使长时间内不能消耗消息造成堆积时,惰性队列就很有须要了。
​      默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加速速的将消息发送给消耗者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ 须要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会泯灭较长的 时间,也会阻塞队列的操作,进而无法吸收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特殊大的时候。
官网阐明

队列可以创建为默认或惰性模式,模式指定方式是:
如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。
3.2  基于策略方式设定
  1. # 登录Docker容器
  2. docker exec -it rabbitmq /bin/bash
  3. # 运行rabbitmqctl命令
  4. rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
复制代码
下令解读:
如果须要修改队列模式可以执行如下下令(不必删除队列再重建):
  1. rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues
复制代码
3.3 在声明队列时使用参数设定

Java代码原生API设置方式:
  1. Map<String, Object> args = new HashMap<String, Object>();
  2. args.put("x-queue-mode", "lazy");
  3. channel.queueDeclare("myqueue", false, false, false, args);
复制代码
Java代码注解设置方式:
  1. @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {
  2.         @Argument(name = "x-queue-mode", value = "lazy")
  3. })
复制代码
队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过 “x-queue-mode” 参数来设置队列的模式,取值为“default”和 “lazy” 。下面示 例中演示了一个惰性队列的声明细节:
  1. Map<String, Object> args = new HashMap<String, Object>();
  2. args.put("x-queue-mode", "lazy");
  3. channel.queueDeclare("myqueue", false, false, false, args);
复制代码
内存开销对比:

在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB
3.4 实操演练

生产者端代码
配置POM
  1.     <parent>
  2.         <groupId>org.springframework.boot</groupId>
  3.         <artifactId>spring-boot-starter-parent</artifactId>
  4.         <version>3.1.5</version>
  5.     </parent>
  6.     <dependencies>
  7.         <dependency>
  8.             <groupId>org.springframework.boot</groupId>
  9.             <artifactId>spring-boot-starter-amqp</artifactId>
  10.         </dependency>
  11.         <dependency>
  12.             <groupId>org.springframework.boot</groupId>
  13.             <artifactId>spring-boot-starter-test</artifactId>
  14.         </dependency>
  15.         <dependency>
  16.             <groupId>org.projectlombok</groupId>
  17.             <artifactId>lombok</artifactId>
  18.         </dependency>
  19.     </dependencies>
复制代码
配置YAML
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.200.100
  4.     port: 5672
  5.     username: guest
  6.     password: 123456
  7.     virtual-host: /
复制代码
主启动类
  1. package com.rainbowsea.mq;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class RabbitMQLazyProducer {
  6.     public static void main(String[] args) {
  7.         SpringApplication.run(RabbitMQLazyProducer.class, args);
  8.     }
  9. }
复制代码
发送消息
  1. package com.rainbowsea.mq.test;
  2. import jakarta.annotation.Resource;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. @SpringBootTest
  7. public class RabbitMQTest {
  8.     public static final String EXCHANGE_LAZY_NAME = "exchange.rainbowsea.lazy";
  9.     public static final String ROUTING_LAZY_KEY = "routing.key.rainbowsea.lazy";
  10.     @Resource
  11.     private RabbitTemplate rabbitTemplate;
  12.     @Test
  13.     public void testSendMessage() {
  14.         rabbitTemplate.convertAndSend(EXCHANGE_LAZY_NAME, ROUTING_LAZY_KEY, "I am a message for test lazy queue.");
  15.     }
  16. }
复制代码
消耗者端代码
配置POM
  1.     <parent>
  2.         <groupId>org.springframework.boot</groupId>
  3.         <artifactId>spring-boot-starter-parent</artifactId>
  4.         <version>3.1.5</version>
  5.     </parent>
  6.     <dependencies>
  7.         <dependency>
  8.             <groupId>org.springframework.boot</groupId>
  9.             <artifactId>spring-boot-starter-amqp</artifactId>
  10.         </dependency>
  11.         <dependency>
  12.             <groupId>org.springframework.boot</groupId>
  13.             <artifactId>spring-boot-starter-web</artifactId>
  14.         </dependency>
  15.         <dependency>
  16.             <groupId>org.projectlombok</groupId>
  17.             <artifactId>lombok</artifactId>
  18.         </dependency>
  19.     </dependencies>
复制代码
配置YAML
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.200.100
  4.     port: 5672
  5.     username: guest
  6.     password: 123456
  7.     virtual-host: /
复制代码
主启动类
  1. package com.rainbowsea.mq;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class RabbitMQLazyConsumerMainType {
  6.     public static void main(String[] args) {
  7.         SpringApplication.run(RabbitMQLazyConsumerMainType.class, args);
  8.     }
  9.    
  10. }
复制代码
监听器
  1. package com.rainbowsea.mq.listener;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.*;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. @Slf4j
  9. public class MyLazyMessageProcessor {
  10.     public static final String EXCHANGE_LAZY_NAME = "exchange.rainbowsea.lazy";
  11.     public static final String ROUTING_LAZY_KEY = "routing.key.rainbowsea.lazy";
  12.     public static final String QUEUE_LAZY_NAME = "queue.rainbowsea.lazy";
  13.     @RabbitListener(bindings = @QueueBinding(
  14.         value = @Queue(value = QUEUE_LAZY_NAME, durable = "true", autoDelete = "false", arguments = {
  15.             @Argument(name = "x-queue-mode", value = "lazy")
  16.         }),
  17.         exchange = @Exchange(value = EXCHANGE_LAZY_NAME, durable = "true", autoDelete = "false"),
  18.         key = {ROUTING_LAZY_KEY}
  19.     ))
  20.     public void processMessageLazy(String data, Message message, Channel channel) {
  21.         log.info("消费端接收到消息:" + data);
  22.     }
  23. }
复制代码
测试:

4. 最后:

“在这个最后的篇章中,我要表达我对每一位读者的感激之情。你们的关注和复兴是我创作的动力源泉,我从你们身上吸取了无尽的灵感与勇气。我会将你们的鼓励留在心底,继续在其他的领域奋斗。感谢你们,我们总会在某个时刻再次相遇。”


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 qidao123.com技术社区-IT企服评测·应用市场 (https://dis.qidao123.com/) Powered by Discuz! X3.4