ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性) [打印本页]

作者: 怀念夏天    时间: 2024-6-20 13:54
标题: RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性)
一、springboot整合RabbitMQ(jdk17)(创建两个项目,一个生产者项目,一个消费者项目)

生产者项目


第一步:创建springboot工程,然后引入rabbitmq的依赖

  1. <!-- RabbitMQ起步依赖 -->
  2. <dependency>
  3.   <groupId>org.springframework.boot</groupId>
  4.   <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
复制代码
第二步:编写配置文件

  1. spring:
  2.   rabbitmq:
  3.    host: 192.168.70.130  # 虚拟机的地址
  4.    port: 5672
  5.    username: admin
  6.    password: admin
  7.    virtual-host: /
  8. #日志格式
  9. logging:
  10.   pattern:
  11.    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第三步:编写RabbitMQ的配置类

  1. @Configuration
  2. public class RabbitmqConfig1 {
  3.     private final String EXCHANGE_NAME = "boot_exchange";
  4.     private final String QUEUE_NAME = "boot_queue";
  5.     private final String ROUTE_NAME = "boot_route";
  6.     //创建交换机
  7.     @Bean(EXCHANGE_NAME)
  8.     public Exchange getExchange(){
  9.         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  10.     }
  11.     //创建队列
  12.     @Bean(QUEUE_NAME)
  13.     public Queue getQueue(){
  14.         return new Queue(QUEUE_NAME);
  15.     }
  16.     //交换机和队列绑定
  17.     @Bean
  18.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  19.         return BindingBuilder.bind(queue).to(exchange).with(ROUTE_NAME).noargs();
  20.     }
  21. }
复制代码
第四步:编写发送消息测试类

  1. //编写发送消息测试类
  2. @SpringBootTest
  3. public class RabbitmqTest {
  4.     // 注入RabbitTemplate工具类
  5.     @Autowired
  6.     private RabbitTemplate rabbitTemplate;
  7.     @Test
  8.     public void testSendMessage(){
  9.         /**
  10.          * 发送消息
  11.          * 参数1:交换机
  12.          * 参数2:路由key
  13.          * 参数3:要发送的消息
  14.          */
  15.         rabbitTemplate.convertAndSend("boot_exchange","boot_route","你好我有一个毛衫");
  16.         System.out.println("发送消息成功");
  17.     }
  18. }
复制代码
消费者项目


第一步:创建springboot工程,然后引入rabbitmq的依赖

  1. <!-- RabbitMQ起步依赖 -->
  2. <dependency>
  3.   <groupId>org.springframework.boot</groupId>
  4.   <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
复制代码
第二步:编写配置文件

  1. spring:
  2.   rabbitmq:
  3.    host: 192.168.70.130  # 虚拟机的地址
  4.    port: 5672
  5.    username: admin
  6.    password: admin
  7.    virtual-host: /
  8. #日志格式
  9. logging:
  10.   pattern:
  11.    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第三步:编写消费者,监听队列

  1. @Component
  2. public class Consumer1 {
  3.     /**
  4.      * 监听队列
  5.      * @param message
  6.      * queues表示监听的队列的名称
  7.      */
  8.     @RabbitListener(queues = "boot_queue")
  9.     public void listener(String message){
  10.         System.out.println("接受到消息 = " + message);
  11.     }
  12. }
复制代码
4、rabbitmq的消息可靠性

【一】rabbitmq的消息可靠性——确认模式

第一步:修改配置文件

只是添加了一句代码

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: / # 表示使用默认的virtual-host
  8.     #开启确认模式
  9.     publisher-confirm-type: correlated
  10. #????
  11. logging:
  12.   pattern:
  13.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第二步:在生产者的配置类创建交换机和队列(RabbitMQ的配置类)

  1. @Configuration
  2. public class RabbitmqConfig2Confirm {
  3.     public final String EXCHANGE_NAME = "confirm_exchange";
  4.     public final String QUEUE_NAME = "confirm_queue";
  5.     public final String ROUTING_NAME = "confirm_routing";
  6.    
  7. //    创建交换机
  8.     @Bean(EXCHANGE_NAME)
  9.     public Exchange exchange(){
  10.         return ExchangeBuilder
  11.                 .topicExchange(EXCHANGE_NAME)
  12.                 .durable(true)
  13.                 .build();
  14.     }
  15. //    创建队列
  16.     @Bean(QUEUE_NAME)
  17.     public Queue queue(){
  18.         return QueueBuilder
  19.                 .durable(QUEUE_NAME)
  20.                 .build();
  21.     }
  22. //    创建交换机和队列绑定
  23.     @Bean
  24.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  25.         return BindingBuilder
  26.                 .bind(queue).
  27.                 to(exchange)
  28.                 .with(ROUTING_NAME)
  29.                 .noargs();
  30.     }
  31. }
复制代码
第三步:编写测试类发生消息:生产者界说确认模式的回调方法(springboot的测试类,能够加载到第二步的配置类)

  1. @Test
  2.     void testConfirm() {
  3.         //回调确认
  4.         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  5.             /**
  6.              *
  7.              * @param correlationData 配置信息
  8.              * @param b 是否成功,true 是 ,false 否
  9.              * @param s 失败原因
  10.              */
  11.             @Override
  12.             public void confirm(CorrelationData correlationData, boolean b, String s) {
  13.                 if (b){
  14.                     System.out.println("发送成功");
  15.                 }else{
  16.                     System.out.println("发送失败,原因:"+s);
  17.                 }
  18.             }
  19.         });
  20.         //发送消息
  21.           /**
  22.          * 发送消息
  23.          * 参数1:交换机
  24.          * 参数2:路由key
  25.          * 参数3:要发送的消息
  26.          */
  27.          rabbitTemplate.convertAndSend("confirm_exchange","confirm_routing","send message...confirm");
  28.     }
复制代码
由于rabbitmq的confirm确认模式是确认消息是否从生产者成功传递到交换机的,所以就没必要写消费者进行信息的消费了

【二】rabbitmq的消息可靠性——退回模式

第一步:修改配置文件

只是添加了一句

  1. # rabbitmq???
  2. spring:
  3.   rabbitmq:
  4.     host: 192.168.70.130
  5.     port: 5672
  6.     username: admin
  7.     password: admin
  8.     virtual-host: /
  9.     #开启确认模式
  10.     publisher-confirm-type: correlated
  11.     #开始回退模式
  12.     publisher-returns: true
  13. #????
  14. logging:
  15.   pattern:
  16.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第二步:编写配置类(RabbitMQ的配置类)

  1. @Configuration
  2. public class RabbitmqConfig3Return {
  3.     public final String EXCHANGE_NAME = "return_exchange";
  4.     public final String QUEUE_NAME = "return_queue";
  5.     public final String ROUTING_NAME = "return_routing";
  6. //    创建交换机
  7.     @Bean(EXCHANGE_NAME)
  8.     public Exchange exchange(){
  9.         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  10.     }
  11. //    创建队列
  12.     @Bean(QUEUE_NAME)
  13.     public Queue queue(){
  14.         return QueueBuilder.durable(QUEUE_NAME).build();
  15.     }
  16. //    创建交换机和队列绑定
  17.     @Bean
  18.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  19.         return BindingBuilder
  20.                 .bind(queue)
  21.                 .to(exchange)
  22.                 .with(ROUTING_NAME)
  23.                 .noargs();
  24.     }
  25. }
复制代码
第三步:编写测试类发生消息:生产者界说退回模式的回调方法(springboot的测试类,能够加载到第二步的配置类)

  1. @Test
  2.     void testReturnSendMessage(){
  3. //        调用回退模式的回调方法,只有失败才会回调,成功不会回调哦
  4. // 失败后将失败信息封装到参数中
  5.         rabbitTemplate.setReturnsCallback(returned ->{
  6.             Message message = returned.getMessage();
  7.             System.out.println("消息对象:"+message);
  8.             System.out.println("错误码:"+returned.getReplyCode());
  9.             System.out.println("错误信息:"+returned.getReplyText());
  10.             System.out.println("交换机:"+returned.getExchange());
  11.             System.out.println("路由键:"+returned.getRoutingKey());
  12.         });
  13. //        发送消息
  14.            /**
  15.          * 发送消息
  16.          * 参数1:交换机
  17.          * 参数2:路由key
  18.          * 参数3:要发送的消息
  19.          */
  20.          rabbitTemplate.convertAndSend("return_exchange","return_routing","send message...return");
  21.     }
复制代码
由于rabbitmq的return回退模式是确认消息是否从交换机成功传递到队列的,还没有传递到消费者,所以就没必要写消费者进行信息的消费了


【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)

● 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
● 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”
生产者项目:第一步:修改配置文件

不用修改
  1. # rabbitmq???
  2. spring:
  3.   rabbitmq:
  4.     host: 192.168.70.130
  5.     port: 5672
  6.     username: admin
  7.     password: admin
  8.     virtual-host: /
  9.     #开启确认模式
  10.     publisher-confirm-type: correlated
  11.     #开始回退模式
  12.     publisher-returns: true
  13. #????
  14. logging:
  15.   pattern:
  16.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
生产者项目:第二步:编写配置类(RabbitMQ的配置类)

  1. @Configuration
  2. public class RabbitmqConfig4ACK {
  3.     public final String EXCHANGE_NAME = "ack_exchange";
  4.     public final String QUEUE_NAME = "ack_queue";
  5.     public final String ROUTING_NAME = "ack_routing";
  6. //    创建交换机
  7.     @Bean(EXCHANGE_NAME)
  8.     public Exchange exchange(){
  9.         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  10.     }
  11. //    创建队列
  12.     @Bean(QUEUE_NAME)
  13.     public Queue queue(){
  14.         return QueueBuilder.durable(QUEUE_NAME).build();
  15.     }
  16. //    创建交换机和队列绑定
  17.     @Bean
  18.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  19.         return BindingBuilder
  20.                 .bind(queue)
  21.                 .to(exchange)
  22.                 .with(ROUTING_NAME)
  23.                 .noargs();
  24.     }
  25. }
复制代码
生产者项目:第三步:编写测试类发生消息:(springboot的测试类,能够加载到第二步的配置类)

  1. @Test
  2.     void testAck(){
  3.         //        发送消息
  4.         rabbitTemplate.convertAndSend("ack_exchange","ack_routing","send message...ack");
  5.     }
复制代码
消费者项目(自动确认):第一步:修改配置文件


  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: none   # 默认就是自动确认
  12. #????
  13. logging:
  14.   pattern:
  15.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码


消费者项目(自动确认):第二步:编写消费者类,监听队列


  1. @Component
  2. public class AckConsumer {
  3. //    自动签收
  4.     @RabbitListener(queues = "ack_queue")
  5.     public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  6. //        获取消息
  7.         String s = new String(message.getBody(), StandardCharsets.UTF_8);
  8.         System.out.println(s);
  9. //        TODO,处理事务
  10. //        故意出错
  11.         int i= 1/0;
  12.     }
  13. }
复制代码
消费者项目(手动确认):第一步:修改配置文件


  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: manual  
  12. #????
  13. logging:
  14.   pattern:
  15.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
消费者项目(手动确认):第二步:编写消费者类,监听队列


  1. @Component
  2. public class AckConsumer {
  3.     //    手动签收
  4.     @RabbitListener(queues = "ack_queue")
  5.     public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  6. // 消息投递序号,消息每次投递该值都会+1
  7.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  8.         try {
  9. //            int i = 1/0; //模拟处理消息出现bug
  10.             System.out.println("成功接受到消息:"+message);
  11.             // 签收消息
  12.             /**
  13.              * 参数1:消息投递序号
  14.              * 参数2:是否一次可以签收多条消息
  15.              */
  16.             channel.basicAck(deliveryTag,true);
  17.         }catch (Exception e){
  18.             System.out.println("消息消费失败!");
  19.             Thread.sleep(2000);
  20.             // 拒签消息
  21.             /**
  22.              * 参数1:消息投递序号
  23.              * 参数2:是否一次可以拒签多条消息
  24.              * 参数3:拒签后消息是否重回队列
  25.              */
  26.             channel.basicNack(deliveryTag,true,true);
  27.         }
  28.     }
  29. }
复制代码



【四】RabbitMQ高级特性——消费端限流



第一步:先在生产者项目中,发送多个消息

  1. @Test
  2.     public void testLimitSendBatch() {
  3.         // 发送十条消息
  4.         for (int i = 0; i < 10; i++) {
  5.             rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "这是第"+i+"条消息");
  6.         }
  7.     }
复制代码
第二步:修改消费者项目的配置文件

最重要就是配置文件的修改:

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: manual  #none是默认的
  12.         prefetch: 5  # 每次消费者从队列拉取的消息数量(限制)
  13. #????
  14. logging:
  15.   pattern:
  16.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第三步:重新编写消费者类

  1. @Component
  2. public class ConsumerLimit {
  3. //    手动签收
  4.     @RabbitListener(queues = "limit_queue")
  5.     public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  6. //        获取消息
  7.         String s = new String(message.getBody(), StandardCharsets.UTF_8);
  8.         System.out.println(s);
  9.         //        模拟业务处理
  10.         Thread.sleep(3000);
  11.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  12. // 手动签收
  13.         channel.basicAck(deliveryTag,true);
  14.     }
  15. }
复制代码

【五】RabbitMQ高级特性——利用限流实现不公平分发


第一步:修改消费者项目的配置文件

最重要就是配置文件的修改:

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: manual  #none是默认的
  12.         prefetch: 1  #  消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
  13. #????
  14. logging:
  15.   pattern:
  16.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第二步:修改消费者类,编写多个监听方法

  1. @Component
  2. public class ConsumerUnfair {
  3. //  消费者1
  4.     @RabbitListener(queues = "ack_queue")
  5.     public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  6. //        获取消息
  7.         String s = new String(message.getBody(), StandardCharsets.UTF_8);
  8.         System.out.println("消费者1"+s);
  9.         Thread.sleep(3000);
  10.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  11. // 手动签收
  12.         channel.basicAck(deliveryTag,true);
  13.     }
  14.     //    消费者2
  15.     @RabbitListener(queues = "ack_queue")
  16.     public void listenMessage2(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  17. //        获取消息
  18.         String s = new String(message.getBody(), StandardCharsets.UTF_8);
  19.         System.out.println("消费者2"+s);
  20.         Thread.sleep(1000);
  21.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  22. // 手动签收
  23.         channel.basicAck(deliveryTag,true);
  24.     }
  25. // .......监听方法
  26. }
复制代码

【六】RabbitMQ高级特性——消息存活时间


第一步:修改生产者项目的配置类


  1. @Configuration
  2. public class RabbitmqConfig7ttl {
  3.     public final String EXCHANGE_NAME = "ack_exchange";
  4.     public final String QUEUE_NAME = "ack_queue";
  5.     public final String ROUTING_NAME = "ack_routing";
  6. //    创建交换机
  7.     @Bean(EXCHANGE_NAME)
  8.     public Exchange exchange(){
  9.         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  10.     }
  11. //    创建队列
  12.     @Bean(QUEUE_NAME)
  13.     public Queue queue(){
  14.         return QueueBuilder
  15.                 .durable(QUEUE_NAME)
  16. //                设置队列的超时的时间,单位是毫秒
  17.                 .ttl(10000)
  18.                 .build();
  19.     }
  20. //    创建交换机和队列绑定
  21.     @Bean
  22.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  23.         return BindingBuilder
  24.                 .bind(queue)
  25.                 .to(exchange)
  26.                 .with(ROUTING_NAME)
  27.                 .noargs();
  28.     }
  29. }
复制代码
第二步:修改生产者项目的测试类

设置单条消息存活时间

  1. @Test
  2.     public void testTtlSendBatch() {
  3.         // 发送十条消息
  4.         for (int i = 0; i < 100; i++) {
  5.             if (i%5 == 0) {
  6.                 //设置消息属性
  7.                 MessageProperties messageProperties = new MessageProperties();
  8.                 //设置存活时间
  9.                 messageProperties.setExpiration("10000");
  10.                 // 创建消息对象(可以配置消息的一些配置)
  11.                 Message message = new Message(("这是第" + i + "条消息").getBytes(StandardCharsets.UTF_8), messageProperties);
  12.                 // 发送消息
  13.                 rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", message);
  14.             }else {
  15.                 rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "这是第" + i + "条消息");
  16.             }
  17.         }
  18.     }
复制代码

【七】RabbitMQ高级特性——优先级队列


第一步:修改生产者项目的配置类


  1. @Configuration
  2. public class RabbitmqConfig8Priority {
  3.     public final String EXCHANGE_NAME = "priority_exchange";
  4.     public final String QUEUE_NAME = "priority_queue";
  5.     public final String ROUTING_NAME = "priority_routing";
  6. //    创建交换机
  7.     @Bean(EXCHANGE_NAME)
  8.     public Exchange exchange(){
  9.         return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  10.     }
  11. //    创建队列
  12.     @Bean(QUEUE_NAME)
  13.     public Queue queue(){
  14.         return QueueBuilder
  15.                 .durable(QUEUE_NAME)
  16. //                设置队列的优先级,值越大优先级越高,一般不超过10
  17.                 .maxPriority(10)
  18.                 .build();
  19.     }
  20. //    创建交换机和队列绑定
  21.     @Bean
  22.     public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
  23.         return BindingBuilder
  24.                 .bind(queue)
  25.                 .to(exchange)
  26.                 .with(ROUTING_NAME)
  27.                 .noargs();
  28.     }
  29. }
复制代码
第二步:修改生产者项目的测试

  1. @Test
  2.     public void testPrioritySendBatch() {
  3.         // 发送十条消息
  4.         for (int i = 0; i < 100; i++) {
  5.             if (i%5 == 0) {
  6.                 //设置消息属性
  7.                 MessageProperties messageProperties = new MessageProperties();
  8. //             设置优先级
  9.                 messageProperties.setPriority(9);
  10.                 // 创建消息对象(可以配置消息的一些配置)
  11.                 Message message = new Message(("这是第" + i + "条消息").getBytes(StandardCharsets.UTF_8), messageProperties);
  12.                 // 发送消息
  13.                 rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", message);
  14.             }else {
  15.                 rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", "这是第" + i + "条消息");
  16.             }
  17.         }
  18.     }
复制代码

【八】RabbitMQ高级特性——死信队列

生产者项目:第一步:修改配置文件

  1. # rabbitmq???
  2. spring:
  3.   rabbitmq:
  4.     host: 192.168.70.130
  5.     port: 5672
  6.     username: admin
  7.     password: admin
  8.     virtual-host: /
  9.     #开启确认模式
  10.     publisher-confirm-type: correlated
  11.     #开始回退模式
  12.     publisher-returns: true
  13. #????
  14. logging:
  15.   pattern:
  16.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
生产者项目:第二步:编写配置类(RabbitMQ的配置类)

  1. @Configuration
  2. public class RabbitmqConfig9Dead {
  3. //    死信
  4.     private final String DEAD_EXCHANGE = "dead_exchange";
  5.     private final String DEAD_QUEUE = "dead_queue";
  6.     private final String DEAD_ROUTING = "dead_routing";
  7.     // 死信交换机
  8.     @Bean(DEAD_EXCHANGE)
  9.     public Exchange deadExchange(){
  10.         return ExchangeBuilder
  11.                 .topicExchange(DEAD_EXCHANGE)
  12.                 .durable(true)
  13.                 .build();
  14.     }
  15.     // 死信队列
  16.     @Bean(DEAD_QUEUE)
  17.     public Queue deadQueue(){
  18.         return QueueBuilder
  19.                 .durable(DEAD_QUEUE)
  20.                 .build();
  21.     }
  22.     // 死信交换机绑定死信队列
  23.     @Bean
  24.     public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,@Qualifier(DEAD_QUEUE)Queue queue){
  25.         return BindingBuilder
  26.                 .bind(queue)
  27.                 .to(exchange)
  28.                 .with(DEAD_ROUTING)
  29.                 .noargs();
  30.     }
  31.     // 普通
  32.     private final String NORMAL_EXCHANGE = "normal_exchange";
  33.     private final String NORMAL_QUEUE = "normal_queue";
  34.     private final String NORMAL_ROUTING = "normal_routing";
  35.     // 普通交换机
  36.     @Bean(NORMAL_EXCHANGE)
  37.     public Exchange normalExchange(){
  38.         return ExchangeBuilder
  39.                 .topicExchange(NORMAL_EXCHANGE)
  40.                 .durable(true)
  41.                 .build();
  42.     }
  43.     // 普通队列
  44.     @Bean(NORMAL_QUEUE)
  45.     public Queue normalQueue(){
  46.         return QueueBuilder
  47.                 .durable(NORMAL_QUEUE)
  48.                 .deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
  49.                 .deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字
  50.                 .ttl(10000) // 消息存活10s
  51.                 .maxLength(10) // 队列最大长度为10
  52.                 .build();
  53.     }
  54.     // 普通交换机绑定普通队列
  55.     @Bean
  56.     public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){
  57.         return BindingBuilder
  58.                 .bind(queue)
  59.                 .to(exchange)
  60.                 .with(NORMAL_ROUTING)
  61.                 .noargs();
  62.     }
  63. }
复制代码
生产者项目:第三步:编写测试类发生消息:(springboot的测试类,能够加载到第二步的配置类)

  1. @Test
  2. public void testDlx(){
  3.   // 存活时间过期后变成死信
  4.   //     rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
  5.   
  6.   // 超过队列长度后变成死信
  7.   //     for (int i = 0; i < 20; i++) {
  8.   //       rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
  9.   //     }
  10.   
  11.   // 消息拒签但不返回原队列后变成死信
  12.   rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
  13. }
复制代码
消费者项目(手动确认):第一步:修改配置文件


  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: manual  
  12. #????
  13. logging:
  14.   pattern:
  15.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
消费者项目(手动确认):第二步:编写消费者类,监听队列


  1. @Component
  2. public class ConsumerDead {
  3.     @RabbitListener(queues = "normal_queue")
  4.     public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
  5. //        获取消息
  6.         String s = new String(message.getBody(), StandardCharsets.UTF_8);
  7.         System.out.println("消费者1"+s);
  8.         Thread.sleep(500);
  9.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  10. // 拒绝签收
  11.         channel.basicNack(deliveryTag,true,false);
  12.     }
复制代码

【九】RabbitMQ高级特性——延长队列


第一步:创建springboot项目并添加依赖

  1. <dependency>
  2.   <groupId>org.springframework.boot</groupId>
  3.   <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6.   <groupId>org.springframework.boot</groupId>
  7.   <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
  9. <dependency>
  10.   <groupId>org.projectlombok</groupId>
  11.   <artifactId>lombok</artifactId>
  12. </dependency>
复制代码
第二步:编写配置文件

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动手动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: manual
  12. # ????
  13. logging:
  14.   pattern:
  15.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码
第三步:编写配置类

  1. @Configuration
  2. public class RabbitMQConfig {
  3.     private final String DEAD_EXCHANGE = "order_expire_exchange";
  4.     private final String DEAD_QUEUE = "order_expire_queue";
  5.     private final String DEAD_ROUTING = "order_expire_routing";
  6.     private final String ORDER_EXCHANGE = "order_exchange";
  7.     private final String ORDER_QUEUE = "order_queue";
  8.     private final String ORDER_ROUTING = "order_routing";
  9.     // 死信交换机
  10.     @Bean(DEAD_EXCHANGE)
  11.     public Exchange deadExchange(){
  12.         return ExchangeBuilder
  13.                 .topicExchange(DEAD_EXCHANGE)
  14.                 .durable(true)
  15.                 .build();
  16.     }
  17.     // 死信队列
  18.     @Bean(DEAD_QUEUE)
  19.     public Queue deadQueue(){
  20.         return QueueBuilder
  21.                 .durable(DEAD_QUEUE)
  22.                 .build();
  23.     }
  24.     // 死信交换机绑定死信队列
  25.     @Bean
  26.     public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE)Queue queue){
  27.         return BindingBuilder
  28.                 .bind(queue)
  29.                 .to(exchange)
  30.                 .with(DEAD_ROUTING)
  31.                 .noargs();
  32.     }
  33.     // 普通交换机
  34.     @Bean(ORDER_EXCHANGE)
  35.     public Exchange normalExchange(){
  36.         return ExchangeBuilder
  37.                 .topicExchange(ORDER_EXCHANGE)
  38.                 .durable(true)
  39.                 .build();
  40.     }
  41.     // 普通队列
  42.     @Bean(ORDER_QUEUE)
  43.     public Queue normalQueue(){
  44.         return QueueBuilder
  45.                 .durable(ORDER_QUEUE)
  46.                 .deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
  47.                 .deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字
  48.                 .ttl(10000) // 消息存活10s(模拟30min超时)
  49.                 .build();
  50.     }
  51.     // 普通交换机绑定普通队列
  52.     @Bean
  53.     public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,@Qualifier(ORDER_QUEUE)Queue queue){
  54.         return BindingBuilder
  55.                 .bind(queue)
  56.                 .to(exchange)
  57.                 .with(ORDER_ROUTING)
  58.                 .noargs();
  59.     }
  60. }
复制代码
第四步:创建控制器,完成下单功能

  1. @RestController
  2. public class OrderController {
  3.     //注入MQ
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.     @RequestMapping("/addOrder")
  7.     public String addOrder(){
  8.         //生成订单号
  9.         String orderNumber = "2030061812251234";
  10.         //在service层完成订单逻辑
  11.         //将订单号发送到订单mq,30分钟过期进入死信队列,死信队列消费查询订单支付状态,做对应处理
  12.         rabbitTemplate.convertAndSend("order_exchange","order_routing",orderNumber);
  13.         return "下单成功! 您的订单号为 :"+orderNumber;
  14.     }
  15. }
复制代码
第五步:创建消费者,监听消息

  1. @Component
  2. public class ListenerOrder {
  3.     //监听订单过期队列
  4.     @RabbitListener(queues = "order_expire_queue")
  5.     public void orderListener(String orderId){
  6.         System.out.println("orderId = " + orderId);
  7.         //根据订单id查询订单状态是否支付
  8.         /**
  9.          * 监听死信队列的类,回去30min超时订单号,根据订单号查询订单的支付状态
  10.          * 支付:走下一步流程
  11.          * 未支付:关闭订单,库存回滚
  12.          */
  13.     }
  14. }
复制代码
手动签收模式的结果


自动签收模式的结果

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.70.130
  4.     port: 5672
  5.     username: admin
  6.     password: admin
  7.     virtual-host: /
  8.     #开动自动签收
  9.     listener:
  10.       simple:
  11.         acknowledge-mode: none   # 默认的
  12. # ????
  13. logging:
  14.   pattern:
  15.     console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
复制代码

RabbitMQ一、RabbitMQ的先容与安装(docker)
RabbitMQ二、RabbitMQ的六种模式

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4