RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性) ...

打印 上一主题 下一主题

主题 675|帖子 675|积分 2025

一、springboot整合RabbitMQ(jdk17)(创建两个项目,一个生产者项目,一个消费者项目)


  • 上面利用原生JAVA操作RabbitMQ较为繁琐,很多的代码都是重复誊写的,利用springboot可以简化代码的编写。
生产者项目


第一步:创建springboot工程,然后引入rabbitmq的依赖

  1. <!-- RabbitMQ起步依赖 -->
  2. <dependency>
  3.   <groupId>org.springframework.boot</groupId>
  4.   <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
复制代码
第二步:编写配置文件

  1. spring:
  2.   rabbitmq:
  3.    host: 192.168.70.130  # 虚拟机的地址
  4.    port: 5672
  5.    username: admin
  6.    password: admin
  7.    virtual-host: /
  8. #日志格式
  9. logging:
  10.   pattern:
  11.    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第三步:编写RabbitMQ的配置类

  1. @Configuration
  2. public class RabbitmqConfig1 {
  3.     private final String EXCHANGE_NAME = "boot_exchange";
  4.     private final String QUEUE_NAME = "boot_queue";
  5.     private final String ROUTE_NAME = "boot_route";
  6.     //创建交换机
  7.     @Bean(EXCHANGE_NAME)
  8.     public Exchange getExchange(){
  9.         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  10.     }
  11.     //创建队列
  12.     @Bean(QUEUE_NAME)
  13.     public Queue getQueue(){
  14.         return new Queue(QUEUE_NAME);
  15.     }
  16.     //交换机和队列绑定
  17.     @Bean
  18.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  19.         return BindingBuilder.bind(queue).to(exchange).with(ROUTE_NAME).noargs();
  20.     }
  21. }
复制代码
第四步:编写发送消息测试类

  1. //编写发送消息测试类
  2. @SpringBootTest
  3. public class RabbitmqTest {
  4.     // 注入RabbitTemplate工具类
  5.     @Autowired
  6.     private RabbitTemplate rabbitTemplate;
  7.     @Test
  8.     public void testSendMessage(){
  9.         /**
  10.          * 发送消息
  11.          * 参数1:交换机
  12.          * 参数2:路由key
  13.          * 参数3:要发送的消息
  14.          */
  15.         rabbitTemplate.convertAndSend("boot_exchange","boot_route","你好我有一个毛衫");
  16.         System.out.println("发送消息成功");
  17.     }
  18. }
复制代码
消费者项目


第一步:创建springboot工程,然后引入rabbitmq的依赖

  1. <!-- RabbitMQ起步依赖 -->
  2. <dependency>
  3.   <groupId>org.springframework.boot</groupId>
  4.   <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
复制代码
第二步:编写配置文件

  1. spring:
  2.   rabbitmq:
  3.    host: 192.168.70.130  # 虚拟机的地址
  4.    port: 5672
  5.    username: admin
  6.    password: admin
  7.    virtual-host: /
  8. #日志格式
  9. logging:
  10.   pattern:
  11.    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第三步:编写消费者,监听队列

  1. @Component
  2. public class Consumer1 {
  3.     /**
  4.      * 监听队列
  5.      * @param message
  6.      * queues表示监听的队列的名称
  7.      */
  8.     @RabbitListener(queues = "boot_queue")
  9.     public void listener(String message){
  10.         System.out.println("接受到消息 = " + message);
  11.     }
  12. }
复制代码
4、rabbitmq的消息可靠性


  • RabbitMQ消息投递的路径为:
    生产者--->交换机--->队列--->消费者
  • 在RabbitMQ工作的过程中,每个环节消息都大概传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?


      • 确认模式(confirm):可以监听消息是否从生产者成功传递到交换机。


      • 退回模式(return):可以监听消息是否从交换机成功传递到队列。


      • 消费者消息确认(Consumer Ack):可以监听消费者是否成功处理消息。


【一】rabbitmq的消息可靠性——确认模式


  • 确认模式(confirm):可以监听消息是否从生产者成功传递到交换机。
  • 创建一个新的生产者项目,导入mq(上面的第一步操作)依赖进行开发:(也可以在原来的基础上修改信息)

    • 代码组成和上面的生产者项目是一样的,也是三步内容。

第一步:修改配置文件

只是添加了一句代码

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: / # 表示使用默认的virtual-host
  8.     #开启确认模式
  9.     publisher-confirm-type: correlated
  10. #????
  11. logging:
  12.   pattern:
  13.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第二步:在生产者的配置类创建交换机和队列(RabbitMQ的配置类)

  1. @Configuration
  2. public class RabbitmqConfig2Confirm {
  3.     public final String EXCHANGE_NAME = "confirm_exchange";
  4.     public final String QUEUE_NAME = "confirm_queue";
  5.     public final String ROUTING_NAME = "confirm_routing";
  6.    
  7. //    创建交换机
  8.     @Bean(EXCHANGE_NAME)
  9.     public Exchange exchange(){
  10.         return ExchangeBuilder
  11.                 .topicExchange(EXCHANGE_NAME)
  12.                 .durable(true)
  13.                 .build();
  14.     }
  15. //    创建队列
  16.     @Bean(QUEUE_NAME)
  17.     public Queue queue(){
  18.         return QueueBuilder
  19.                 .durable(QUEUE_NAME)
  20.                 .build();
  21.     }
  22. //    创建交换机和队列绑定
  23.     @Bean
  24.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  25.         return BindingBuilder
  26.                 .bind(queue).
  27.                 to(exchange)
  28.                 .with(ROUTING_NAME)
  29.                 .noargs();
  30.     }
  31. }
复制代码
第三步:编写测试类发生消息:生产者界说确认模式的回调方法(springboot的测试类,能够加载到第二步的配置类)

  1. @Test
  2.     void testConfirm() {
  3.         //回调确认
  4.         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  5.             /**
  6.              *
  7.              * @param correlationData 配置信息
  8.              * @param b 是否成功,true 是 ,false 否
  9.              * @param s 失败原因
  10.              */
  11.             @Override
  12.             public void confirm(CorrelationData correlationData, boolean b, String s) {
  13.                 if (b){
  14.                     System.out.println("发送成功");
  15.                 }else{
  16.                     System.out.println("发送失败,原因:"+s);
  17.                 }
  18.             }
  19.         });
  20.         //发送消息
  21.           /**
  22.          * 发送消息
  23.          * 参数1:交换机
  24.          * 参数2:路由key
  25.          * 参数3:要发送的消息
  26.          */
  27.          rabbitTemplate.convertAndSend("confirm_exchange","confirm_routing","send message...confirm");
  28.     }
复制代码
由于rabbitmq的confirm确认模式是确认消息是否从生产者成功传递到交换机的,所以就没必要写消费者进行信息的消费了


  • 当我们实行测试类的时间,先实行rabbitTemplate.convertAndSend(“confirm_exchange”,“confirm_routing”,“send message…confirm”);,无论消息是否成功发送都会调用 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()方法,假如发送成功则实行if语句的代码,假如发送失败则调用else语句的代码。

    • 根据实行的是if或者else的语句,就能判断消息是否成功传递到交换机了。

【二】rabbitmq的消息可靠性——退回模式


  • 退回模式(return):可以监听消息是否从交换机成功传递到队列。
  • 创建一个新的生产者项目,导入mq(上面的第一步操作)依赖进行开发:(也可以在原来的基础上修改信息)

    • 代码组成和上面的生产者项目是一样的,也是三步内容。

第一步:修改配置文件

只是添加了一句

  1. # rabbitmq???
  2. spring:
  3.   rabbitmq:
  4.     host: 192.168.70.130
  5.     port: 5672
  6.     username: admin
  7.     password: admin
  8.     virtual-host: /
  9.     #开启确认模式
  10.     publisher-confirm-type: correlated
  11.     #开始回退模式
  12.     publisher-returns: true
  13. #????
  14. logging:
  15.   pattern:
  16.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第二步:编写配置类(RabbitMQ的配置类)

  1. @Configuration
  2. public class RabbitmqConfig3Return {
  3.     public final String EXCHANGE_NAME = "return_exchange";
  4.     public final String QUEUE_NAME = "return_queue";
  5.     public final String ROUTING_NAME = "return_routing";
  6. //    创建交换机
  7.     @Bean(EXCHANGE_NAME)
  8.     public Exchange exchange(){
  9.         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  10.     }
  11. //    创建队列
  12.     @Bean(QUEUE_NAME)
  13.     public Queue queue(){
  14.         return QueueBuilder.durable(QUEUE_NAME).build();
  15.     }
  16. //    创建交换机和队列绑定
  17.     @Bean
  18.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  19.         return BindingBuilder
  20.                 .bind(queue)
  21.                 .to(exchange)
  22.                 .with(ROUTING_NAME)
  23.                 .noargs();
  24.     }
  25. }
复制代码
第三步:编写测试类发生消息:生产者界说退回模式的回调方法(springboot的测试类,能够加载到第二步的配置类)

  1. @Test
  2.     void testReturnSendMessage(){
  3. //        调用回退模式的回调方法,只有失败才会回调,成功不会回调哦
  4. // 失败后将失败信息封装到参数中
  5.         rabbitTemplate.setReturnsCallback(returned ->{
  6.             Message message = returned.getMessage();
  7.             System.out.println("消息对象:"+message);
  8.             System.out.println("错误码:"+returned.getReplyCode());
  9.             System.out.println("错误信息:"+returned.getReplyText());
  10.             System.out.println("交换机:"+returned.getExchange());
  11.             System.out.println("路由键:"+returned.getRoutingKey());
  12.         });
  13. //        发送消息
  14.            /**
  15.          * 发送消息
  16.          * 参数1:交换机
  17.          * 参数2:路由key
  18.          * 参数3:要发送的消息
  19.          */
  20.          rabbitTemplate.convertAndSend("return_exchange","return_routing","send message...return");
  21.     }
复制代码
由于rabbitmq的return回退模式是确认消息是否从交换机成功传递到队列的,还没有传递到消费者,所以就没必要写消费者进行信息的消费了


  • 当我们实行测试类的时间,先实行rabbitTemplate.convertAndSend(“return_exchange”,“return_routing”,“send message…return”);,假如消息成功发送到队列上则不会调用 rabbitTemplate.setReturnsCallback方法,假如发送步成功则调用回调方法rabbitTemplate.setReturnsCallback,

    • 根据运行结果就可以知道在传递消息到队列上的时间哪里发生错误了


【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)


  • 在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)

    • 类似快递员派送快递也需要我们签收,否则一直存在于快递公司的体系中。

  • 消费者消息确认(Consumer Acknowledge,简称Ack)分为自动确认和手动确认。

    • 自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。
    • 但是在实际开发中,收到消息后大概业务处理出现非常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功后再关照签收消息,假如出现非常,则拒签消息,让消息依然保存在队列当中。

● 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
● 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”

  • 创建一个新的生产者项目和新的消费者项目,导入mq(上面的第一步操作)依赖进行开发:(也可以在原来的基础上修改信息)

    • 代码组成和上面的生产者项目是一样的,也是三步内容。

生产者项目:第一步:修改配置文件

不用修改
  1. # rabbitmq???
  2. spring:
  3.   rabbitmq:
  4.     host: 192.168.70.130
  5.     port: 5672
  6.     username: admin
  7.     password: admin
  8.     virtual-host: /
  9.     #开启确认模式
  10.     publisher-confirm-type: correlated
  11.     #开始回退模式
  12.     publisher-returns: true
  13. #????
  14. logging:
  15.   pattern:
  16.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
生产者项目:第二步:编写配置类(RabbitMQ的配置类)

  1. @Configuration
  2. public class RabbitmqConfig4ACK {
  3.     public final String EXCHANGE_NAME = "ack_exchange";
  4.     public final String QUEUE_NAME = "ack_queue";
  5.     public final String ROUTING_NAME = "ack_routing";
  6. //    创建交换机
  7.     @Bean(EXCHANGE_NAME)
  8.     public Exchange exchange(){
  9.         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  10.     }
  11. //    创建队列
  12.     @Bean(QUEUE_NAME)
  13.     public Queue queue(){
  14.         return QueueBuilder.durable(QUEUE_NAME).build();
  15.     }
  16. //    创建交换机和队列绑定
  17.     @Bean
  18.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  19.         return BindingBuilder
  20.                 .bind(queue)
  21.                 .to(exchange)
  22.                 .with(ROUTING_NAME)
  23.                 .noargs();
  24.     }
  25. }
复制代码
生产者项目:第三步:编写测试类发生消息:(springboot的测试类,能够加载到第二步的配置类)

  1. @Test
  2.     void testAck(){
  3.         //        发送消息
  4.         rabbitTemplate.convertAndSend("ack_exchange","ack_routing","send message...ack");
  5.     }
复制代码
消费者项目(自动确认):第一步:修改配置文件



  • 消费者消息确认——自动确认的配置文件
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: none   # 默认就是自动确认
  12. #????
  13. logging:
  14.   pattern:
  15.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码



  • 自动签收模式就是:消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。当我们拿到消息的时间,业务出现非常了,所以无法精确处理消息,导致消息丢失了。
消费者项目(自动确认):第二步:编写消费者类,监听队列



  • 自动确认的消费者类
  1. @Component
  2. public class AckConsumer {
  3. //    自动签收
  4.     @RabbitListener(queues = "ack_queue")
  5.     public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  6. //        获取消息
  7.         String s = new String(message.getBody(), StandardCharsets.UTF_8);
  8.         System.out.println(s);
  9. //        TODO,处理事务
  10. //        故意出错
  11.         int i= 1/0;
  12.     }
  13. }
复制代码
消费者项目(手动确认):第一步:修改配置文件



  • 消费者消息确认——手动确认的配置文件
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: manual  
  12. #????
  13. logging:
  14.   pattern:
  15.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
消费者项目(手动确认):第二步:编写消费者类,监听队列



  • 手动确认
  1. @Component
  2. public class AckConsumer {
  3.     //    手动签收
  4.     @RabbitListener(queues = "ack_queue")
  5.     public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  6. // 消息投递序号,消息每次投递该值都会+1
  7.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  8.         try {
  9. //            int i = 1/0; //模拟处理消息出现bug
  10.             System.out.println("成功接受到消息:"+message);
  11.             // 签收消息
  12.             /**
  13.              * 参数1:消息投递序号
  14.              * 参数2:是否一次可以签收多条消息
  15.              */
  16.             channel.basicAck(deliveryTag,true);
  17.         }catch (Exception e){
  18.             System.out.println("消息消费失败!");
  19.             Thread.sleep(2000);
  20.             // 拒签消息
  21.             /**
  22.              * 参数1:消息投递序号
  23.              * 参数2:是否一次可以拒签多条消息
  24.              * 参数3:拒签后消息是否重回队列
  25.              */
  26.             channel.basicNack(deliveryTag,true,true);
  27.         }
  28.     }
  29. }
复制代码




  • 手动签收模式就是:假如出现非常,则拒签消息,让消息依然保存在队列当中。方便下次请求能够请求到这次因为非常而没有接收到的消息。
【四】RabbitMQ高级特性——消费端限流




  • 前面说过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速率,达到保护消费端的目的。
  • 利用【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)的项目,消费者利用手动确认模式的代码即可(但是要修改配置文件)
第一步:先在生产者项目中,发送多个消息

  1. @Test
  2.     public void testLimitSendBatch() {
  3.         // 发送十条消息
  4.         for (int i = 0; i < 10; i++) {
  5.             rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "这是第"+i+"条消息");
  6.         }
  7.     }
复制代码
第二步:修改消费者项目的配置文件

最重要就是配置文件的修改:

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: manual  #none是默认的
  12.         prefetch: 5  # 每次消费者从队列拉取的消息数量(限制)
  13. #????
  14. logging:
  15.   pattern:
  16.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第三步:重新编写消费者类

  1. @Component
  2. public class ConsumerLimit {
  3. //    手动签收
  4.     @RabbitListener(queues = "limit_queue")
  5.     public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  6. //        获取消息
  7.         String s = new String(message.getBody(), StandardCharsets.UTF_8);
  8.         System.out.println(s);
  9.         //        模拟业务处理
  10.         Thread.sleep(3000);
  11.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  12. // 手动签收
  13.         channel.basicAck(deliveryTag,true);
  14.     }
  15. }
复制代码


  • 其实就是修改了消费者项目的配置文件,添加一条配置信息,限制消费者消息的拉取速率。

【五】RabbitMQ高级特性——利用限流实现不公平分发


  • 在RabbitMQ中,多个消费者监听同一条队列,则队列默认接纳的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速率非常快,而其他消费者处理速率却很慢。此时假如接纳公平分发,则消费者1有很大一部门时间处于空闲状态。此时可以接纳不公平分发,即谁处理的快,谁处理的消息多。


  • 在【四】RabbitMQ高级特性——消费端限流的基础上,修改一消费者项目的配置文件,然后在消费者类中多写几个监听消息的方法(或者多写几个消费者类)。
第一步:修改消费者项目的配置文件

最重要就是配置文件的修改:

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: manual  #none是默认的
  12.         prefetch: 1  #  消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
  13. #????
  14. logging:
  15.   pattern:
  16.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第二步:修改消费者类,编写多个监听方法

  1. @Component
  2. public class ConsumerUnfair {
  3. //  消费者1
  4.     @RabbitListener(queues = "ack_queue")
  5.     public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  6. //        获取消息
  7.         String s = new String(message.getBody(), StandardCharsets.UTF_8);
  8.         System.out.println("消费者1"+s);
  9.         Thread.sleep(3000);
  10.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  11. // 手动签收
  12.         channel.basicAck(deliveryTag,true);
  13.     }
  14.     //    消费者2
  15.     @RabbitListener(queues = "ack_queue")
  16.     public void listenMessage2(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  17. //        获取消息
  18.         String s = new String(message.getBody(), StandardCharsets.UTF_8);
  19.         System.out.println("消费者2"+s);
  20.         Thread.sleep(1000);
  21.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  22. // 手动签收
  23.         channel.basicAck(deliveryTag,true);
  24.     }
  25. // .......监听方法
  26. }
复制代码


  • 最重要的就是消费者项目的配置文件的修改: 配置消费端最多拉取1条消息消费,如许谁处理的快谁拉取下一条消息,实现了不公平分发。
【六】RabbitMQ高级特性——消息存活时间


  • RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。


  • 利用【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)的项目,消费者利用手动确认模式的代码
第一步:修改生产者项目的配置类


  1. @Configuration
  2. public class RabbitmqConfig7ttl {
  3.     public final String EXCHANGE_NAME = "ack_exchange";
  4.     public final String QUEUE_NAME = "ack_queue";
  5.     public final String ROUTING_NAME = "ack_routing";
  6. //    创建交换机
  7.     @Bean(EXCHANGE_NAME)
  8.     public Exchange exchange(){
  9.         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  10.     }
  11. //    创建队列
  12.     @Bean(QUEUE_NAME)
  13.     public Queue queue(){
  14.         return QueueBuilder
  15.                 .durable(QUEUE_NAME)
  16. //                设置队列的超时的时间,单位是毫秒
  17.                 .ttl(10000)
  18.                 .build();
  19.     }
  20. //    创建交换机和队列绑定
  21.     @Bean
  22.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  23.         return BindingBuilder
  24.                 .bind(queue)
  25.                 .to(exchange)
  26.                 .with(ROUTING_NAME)
  27.                 .noargs();
  28.     }
  29. }
复制代码
第二步:修改生产者项目的测试类

设置单条消息存活时间

  1. @Test
  2.     public void testTtlSendBatch() {
  3.         // 发送十条消息
  4.         for (int i = 0; i < 100; i++) {
  5.             if (i%5 == 0) {
  6.                 //设置消息属性
  7.                 MessageProperties messageProperties = new MessageProperties();
  8.                 //设置存活时间
  9.                 messageProperties.setExpiration("10000");
  10.                 // 创建消息对象(可以配置消息的一些配置)
  11.                 Message message = new Message(("这是第" + i + "条消息").getBytes(StandardCharsets.UTF_8), messageProperties);
  12.                 // 发送消息
  13.                 rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", message);
  14.             }else {
  15.                 rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "这是第" + i + "条消息");
  16.             }
  17.         }
  18.     }
复制代码



    • 假如设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。


    • 消息过期后,并不会立即移除消息,只有消息消费到队列顶端时,才会移除该消息

【七】RabbitMQ高级特性——优先级队列


  • 假设在电商体系中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是体系中分为大型商家和小型商家。比如像苹果,小米如许大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为差别的消息设置差别的优先级,此时我们要利用优先级队列。


  • 利用【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)的项目,消费者利用手动确认模式的代码
第一步:修改生产者项目的配置类


  1. @Configuration
  2. public class RabbitmqConfig8Priority {
  3.     public final String EXCHANGE_NAME = "priority_exchange";
  4.     public final String QUEUE_NAME = "priority_queue";
  5.     public final String ROUTING_NAME = "priority_routing";
  6. //    创建交换机
  7.     @Bean(EXCHANGE_NAME)
  8.     public Exchange exchange(){
  9.         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  10.     }
  11. //    创建队列
  12.     @Bean(QUEUE_NAME)
  13.     public Queue queue(){
  14.         return QueueBuilder
  15.                 .durable(QUEUE_NAME)
  16. //                设置队列的优先级,值越大优先级越高,一般不超过10
  17.                 .maxPriority(10)
  18.                 .build();
  19.     }
  20. //    创建交换机和队列绑定
  21.     @Bean
  22.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  23.         return BindingBuilder
  24.                 .bind(queue)
  25.                 .to(exchange)
  26.                 .with(ROUTING_NAME)
  27.                 .noargs();
  28.     }
  29. }
复制代码
第二步:修改生产者项目的测试

  1. @Test
  2.     public void testPrioritySendBatch() {
  3.         // 发送十条消息
  4.         for (int i = 0; i < 100; i++) {
  5.             if (i%5 == 0) {
  6.                 //设置消息属性
  7.                 MessageProperties messageProperties = new MessageProperties();
  8. //             设置优先级
  9.                 messageProperties.setPriority(9);
  10.                 // 创建消息对象(可以配置消息的一些配置)
  11.                 Message message = new Message(("这是第" + i + "条消息").getBytes(StandardCharsets.UTF_8), messageProperties);
  12.                 // 发送消息
  13.                 rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", message);
  14.             }else {
  15.                 rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", "这是第" + i + "条消息");
  16.             }
  17.         }
  18.     }
复制代码


  • 设置了消息的优先级,那么消费者项目在消费消息的时间就会优先消费等级高的消息。
【八】RabbitMQ高级特性——死信队列


  • 在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,当前队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

  • 消息成为死信的情况:


      • 队列消息长度到达限制。


      • 消费者拒接消费消息,basicNack/basicReject,而且不把消息重新放入原目标队列,requeue=false;


      • 消息到达存活时间未被消费。


生产者项目:第一步:修改配置文件

  1. # rabbitmq???
  2. spring:
  3.   rabbitmq:
  4.     host: 192.168.70.130
  5.     port: 5672
  6.     username: admin
  7.     password: admin
  8.     virtual-host: /
  9.     #开启确认模式
  10.     publisher-confirm-type: correlated
  11.     #开始回退模式
  12.     publisher-returns: true
  13. #????
  14. logging:
  15.   pattern:
  16.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
生产者项目:第二步:编写配置类(RabbitMQ的配置类)

  1. @Configuration
  2. public class RabbitmqConfig9Dead {
  3. //    死信
  4.     private final String DEAD_EXCHANGE = "dead_exchange";
  5.     private final String DEAD_QUEUE = "dead_queue";
  6.     private final String DEAD_ROUTING = "dead_routing";
  7.     // 死信交换机
  8.     @Bean(DEAD_EXCHANGE)
  9.     public Exchange deadExchange(){
  10.         return ExchangeBuilder
  11.                 .topicExchange(DEAD_EXCHANGE)
  12.                 .durable(true)
  13.                 .build();
  14.     }
  15.     // 死信队列
  16.     @Bean(DEAD_QUEUE)
  17.     public Queue deadQueue(){
  18.         return QueueBuilder
  19.                 .durable(DEAD_QUEUE)
  20.                 .build();
  21.     }
  22.     // 死信交换机绑定死信队列
  23.     @Bean
  24.     public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,@Qualifier(DEAD_QUEUE)Queue queue){
  25.         return BindingBuilder
  26.                 .bind(queue)
  27.                 .to(exchange)
  28.                 .with(DEAD_ROUTING)
  29.                 .noargs();
  30.     }
  31.     // 普通
  32.     private final String NORMAL_EXCHANGE = "normal_exchange";
  33.     private final String NORMAL_QUEUE = "normal_queue";
  34.     private final String NORMAL_ROUTING = "normal_routing";
  35.     // 普通交换机
  36.     @Bean(NORMAL_EXCHANGE)
  37.     public Exchange normalExchange(){
  38.         return ExchangeBuilder
  39.                 .topicExchange(NORMAL_EXCHANGE)
  40.                 .durable(true)
  41.                 .build();
  42.     }
  43.     // 普通队列
  44.     @Bean(NORMAL_QUEUE)
  45.     public Queue normalQueue(){
  46.         return QueueBuilder
  47.                 .durable(NORMAL_QUEUE)
  48.                 .deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
  49.                 .deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字
  50.                 .ttl(10000) // 消息存活10s
  51.                 .maxLength(10) // 队列最大长度为10
  52.                 .build();
  53.     }
  54.     // 普通交换机绑定普通队列
  55.     @Bean
  56.     public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){
  57.         return BindingBuilder
  58.                 .bind(queue)
  59.                 .to(exchange)
  60.                 .with(NORMAL_ROUTING)
  61.                 .noargs();
  62.     }
  63. }
复制代码
生产者项目:第三步:编写测试类发生消息:(springboot的测试类,能够加载到第二步的配置类)

  1. @Test
  2. public void testDlx(){
  3.   // 存活时间过期后变成死信
  4.   //     rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
  5.   
  6.   // 超过队列长度后变成死信
  7.   //     for (int i = 0; i < 20; i++) {
  8.   //       rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
  9.   //     }
  10.   
  11.   // 消息拒签但不返回原队列后变成死信
  12.   rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
  13. }
复制代码
消费者项目(手动确认):第一步:修改配置文件



  • 消费者消息确认——手动确认的配置文件
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: manual  
  12. #????
  13. logging:
  14.   pattern:
  15.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
消费者项目(手动确认):第二步:编写消费者类,监听队列



  • 手动确认
  1. @Component
  2. public class ConsumerDead {
  3.     @RabbitListener(queues = "normal_queue")
  4.     public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  5. //        获取消息
  6.         String s = new String(message.getBody(), StandardCharsets.UTF_8);
  7.         System.out.println("消费者1"+s);
  8.         Thread.sleep(500);
  9.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  10. // 拒绝签收
  11.         channel.basicNack(deliveryTag,true,false);
  12.     }
复制代码


  • 死信队列小结


      • 死信交换机和死信队列和普通的没有区别


      • 当消息成为死信后,假如该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列


      • 消息成为死信的三种情况:
           


        • 队列消息长度到达限制;


        • 消费者拒接消费消息,而且不重回队列;


        • 原队列存在消息过期设置,消息到达超时时间未被消费;



【九】RabbitMQ高级特性——延长队列


  • 延长队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

    • 例如:


        • 下单后,30分钟未支付,取消订单,回滚库存。


        • 新用户注册成功7天后,发送短信问候。
              

        • 实现方式:


            • 定时器


            • 延长队列








  • RabbitMQ中并未提供延长队列功能,我们可以利用死信队列实现延长队列的效果。


    • 延长队列 指消息进入队列后,可以被延长肯定时间,再进行消费。


    • RabbitMQ没有提供延长队列功能,但是可以利用 : TTL + DLX 来实现延长队列效果。


第一步:创建springboot项目并添加依赖

  1. <dependency>
  2.   <groupId>org.springframework.boot</groupId>
  3.   <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6.   <groupId>org.springframework.boot</groupId>
  7.   <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
  9. <dependency>
  10.   <groupId>org.projectlombok</groupId>
  11.   <artifactId>lombok</artifactId>
  12. </dependency>
复制代码
第二步:编写配置文件

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: manual
  12. # ????
  13. logging:
  14.   pattern:
  15.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第三步:编写配置类

  1. @Configuration
  2. public class RabbitMQConfig {
  3.     private final String DEAD_EXCHANGE = "order_expire_exchange";
  4.     private final String DEAD_QUEUE = "order_expire_queue";
  5.     private final String DEAD_ROUTING = "order_expire_routing";
  6.     private final String ORDER_EXCHANGE = "order_exchange";
  7.     private final String ORDER_QUEUE = "order_queue";
  8.     private final String ORDER_ROUTING = "order_routing";
  9.     // 死信交换机
  10.     @Bean(DEAD_EXCHANGE)
  11.     public Exchange deadExchange(){
  12.         return ExchangeBuilder
  13.                 .topicExchange(DEAD_EXCHANGE)
  14.                 .durable(true)
  15.                 .build();
  16.     }
  17.     // 死信队列
  18.     @Bean(DEAD_QUEUE)
  19.     public Queue deadQueue(){
  20.         return QueueBuilder
  21.                 .durable(DEAD_QUEUE)
  22.                 .build();
  23.     }
  24.     // 死信交换机绑定死信队列
  25.     @Bean
  26.     public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE)Queue queue){
  27.         return BindingBuilder
  28.                 .bind(queue)
  29.                 .to(exchange)
  30.                 .with(DEAD_ROUTING)
  31.                 .noargs();
  32.     }
  33.     // 普通交换机
  34.     @Bean(ORDER_EXCHANGE)
  35.     public Exchange normalExchange(){
  36.         return ExchangeBuilder
  37.                 .topicExchange(ORDER_EXCHANGE)
  38.                 .durable(true)
  39.                 .build();
  40.     }
  41.     // 普通队列
  42.     @Bean(ORDER_QUEUE)
  43.     public Queue normalQueue(){
  44.         return QueueBuilder
  45.                 .durable(ORDER_QUEUE)
  46.                 .deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
  47.                 .deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字
  48.                 .ttl(10000) // 消息存活10s(模拟30min超时)
  49.                 .build();
  50.     }
  51.     // 普通交换机绑定普通队列
  52.     @Bean
  53.     public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,@Qualifier(ORDER_QUEUE)Queue queue){
  54.         return BindingBuilder
  55.                 .bind(queue)
  56.                 .to(exchange)
  57.                 .with(ORDER_ROUTING)
  58.                 .noargs();
  59.     }
  60. }
复制代码
第四步:创建控制器,完成下单功能

  1. @RestController
  2. public class OrderController {
  3.     //注入MQ
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.     @RequestMapping("/addOrder")
  7.     public String addOrder(){
  8.         //生成订单号
  9.         String orderNumber = "2030061812251234";
  10.         //在service层完成订单逻辑
  11.         //将订单号发送到订单mq,30分钟过期进入死信队列,死信队列消费查询订单支付状态,做对应处理
  12.         rabbitTemplate.convertAndSend("order_exchange","order_routing",orderNumber);
  13.         return "下单成功! 您的订单号为 :"+orderNumber;
  14.     }
  15. }
复制代码
第五步:创建消费者,监听消息

  1. @Component
  2. public class ListenerOrder {
  3.     //监听订单过期队列
  4.     @RabbitListener(queues = "order_expire_queue")
  5.     public void orderListener(String orderId){
  6.         System.out.println("orderId = " + orderId);
  7.         //根据订单id查询订单状态是否支付
  8.         /**
  9.          * 监听死信队列的类,回去30min超时订单号,根据订单号查询订单的支付状态
  10.          * 支付:走下一步流程
  11.          * 未支付:关闭订单,库存回滚
  12.          */
  13.     }
  14. }
复制代码
手动签收模式的结果



  • 在手动签收模式的时间,当我们启动项目,访问订单功能时,立即生成了一个队列消息

  • 然后因为是手动签收模式,所以在消息的存活时间过去了之后,成为了死信消息,所以被消息被拒收了,但是还存在队列中。

自动签收模式的结果

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动自动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: none   # 默认的
  12. # ????
  13. logging:
  14.   pattern:
  15.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码


  • 在自动签收模式的时间,当我们启动项目,访问订单功能时,立即生成了一个队列消息

  • 因为是自动签收的,所以消息过了存活时间之后就没了(自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除)

RabbitMQ一、RabbitMQ的先容与安装(docker)
RabbitMQ二、RabbitMQ的六种模式

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表