Spring Boot 整合 RabbitMQ 详解

打印 上一主题 下一主题

主题 828|帖子 828|积分 2484

前言:
在消息中间件领域中 RabbitMQ 也是一种非经常见的消息中间件了,本篇简单分享一下 Spring Boot 项目集成 RabbitMQ 的过程。
RabbitMQ 系列文章传送门
RabbitMQ 的先容及焦点概念解说
@RabbitListener 注解详解
Spring Boot 集成 RabbitMQ 可以分为三大步,如下:


  • 在 proerties 大概 yml 文件中添加 RabbitMQ 设置。
  • 项目 pom.xml 文件中引入 spring-boot-starter-amqp 依靠。
  • 注入 RabbitTemplate 开始利用 RabbitMQ ,其实这步以及算是利用了,不能算作集成了,但是集成了总归是要利用的,我把这里也算作一步了。
在 proerties 大概 yml 文件中添加 RabbitMQ 设置如下:
  1. spring.rabbitmq.host= xxx.xxx.xxx
  2. spring.rabbitmq.port= 5672
  3. spring.rabbitmq.username= admin
  4. spring.rabbitmq.password= admin
  5. spring.rabbitmq.virtual-host = /study
复制代码
项目 pom.xml 文件中引入 spring-boot-starter-amqp 依靠如下:
  1. <dependency>
  2.         <groupId>org.springframework.boot</groupId>
  3.         <artifactId>spring-boot-starter-amqp</artifactId>
  4.         <version>2.4.5</version>
  5. </dependency>
复制代码
RabbitMQ 利用

前文我们在分享 RabbitMQ 焦点概念的时间,我们知道了 RabbitMQ 有六种互换机类型,下面我们就针对六种互换机来分享 RabbitMQ 的利用。
Direct Exchange(直连互换机)
直连互换机是 RoutingKey 完全匹配模式,也就是我们常说的点对点模式,消息会传送给 RoutingKey 完全匹配的队列。
直连互换机 Direct Exchange 和队列 Queue 的路由设置代码如下:
  1. package com.user.service.rabbitmq;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class RabbitDirectConfig {
  10.     //注入队列
  11.     @Bean
  12.     public Queue queue() {
  13.         return new Queue("direct-buget-queue");
  14.     }
  15.     //注入交换机
  16.     @Bean
  17.     public DirectExchange directExchange() {
  18.         //durable:重启后是否有效 autodelete: 长期未使用是否删除掉
  19.         return new DirectExchange("direct-buget-exchange", true, true);
  20.     }
  21.     //绑定队列和交换机
  22.     @Bean
  23.     public Binding binding() {
  24.         return BindingBuilder.bind(queue()).to(directExchange()).with("direct-buget-exchange-routing-key");
  25.     }
  26. }
复制代码
直连互换机 Direct Exchange 消息生产代码如下:
  1. //direct 模式消息发送
  2. public void sendDirectMessage(String message) {
  3.         rabbitTemplate.convertAndSend("direct-buget-exchange", "direct-buget-exchange-routing-key", message);
  4. }
复制代码
直连互换机 Direct Exchange 消息消费代码如下:
  1. //direct 直连模式消费端
  2. @RabbitListener(queues = "direct-buget-queue")
  3. public void directConsumer(String message) {
  4.         System.out.println("direct 消息消费成功,message内容为:" + message);
  5. }
复制代码
直连互换机 Direct Exchange 消息测试(触发消息生产及消费)代码代码如下:
  1. @GetMapping("/send-direct-buget-message")
  2. private String sendDirectBugetMessage(@RequestParam String message) {
  3.         myRabbitProducer.sendDirectMessage(message);
  4.         return "OK";
  5. }
复制代码
直连互换机 Direct Exchange 消息测试(触发消息生产及消费)效果如下:
  1. direct 消息消费成功,message内容为:hello message
复制代码
直连互换机,一对一模式,效果符合预期。
Topic Exchange(主题互换机)
主题互换机支持路由模糊匹配,可以利用星号和井号(#)作为通配符进行匹配,其中 ”*“ 可以代替一个单词,(#) 可以代替恣意个单词。
主题互换机 Topic Exchange 和队列 Queue 的路由设置代码如下:
  1. package com.user.service.rabbitmq;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitTopicConfig {
  7.     //注入队列
  8.     @Bean("topicQueue")
  9.     public Queue queue1() {
  10.         return new Queue("topic-buget-queue");
  11.     }
  12.     //注入队列
  13.     @Bean("topicQueue2")
  14.     public Queue queue2() {
  15.         return new Queue("topic-buget-queue2");
  16.     }
  17.     //注入交换机
  18.     @Bean
  19.     public TopicExchange topicExchange() {
  20.         //durable:重启后是否有效 autodelete: 长期未使用是否删除掉
  21.         return new TopicExchange("topic-buget-exchange", true, true);
  22.     }
  23.     //绑定队列和交换机
  24.     @Bean("topicBinding")
  25.     public Binding binding() {
  26.         return BindingBuilder.bind(queue1()).to(topicExchange()).with("topic.buget.exchange.routing.key.*");
  27.     }
  28.     //绑定队列和交换机
  29.     @Bean("topicBinding2")
  30.     public Binding binding2() {
  31.         return BindingBuilder.bind(queue2()).to(topicExchange()).with("topic.buget.exchange.routing.key.#");
  32.     }
  33. }
复制代码
主题互换机 Topic Exchange 消息生产代码如下:
  1. //topic 模式消息发送
  2. public void sendTopicMessage(String message) {
  3.         rabbitTemplate.convertAndSend("topic-buget-exchange", "topic.buget.exchange.routing.key.1", message);
  4. }
  5. //topic 模式消息发送
  6. public void sendTopicMessage2(String message) {
  7.         rabbitTemplate.convertAndSend("topic-buget-exchange", "topic.buget.exchange.routing.key.1.1", message);
  8. }
复制代码
主题互换机 Topic Exchange 消息消费代码如下:
  1. //topic 模式消费端
  2. @RabbitListener(queues = "topic-buget-queue")
  3. public void topicConsumer(String message) {
  4.         System.out.println("topic topic-buget-queue 消息消费成功,message内容为:" + message);
  5. }
  6. //topic 模式消费端
  7. @RabbitListener(queues = "topic-buget-queue2")
  8. public void topicConsumer2(String message) {
  9.         System.out.println("topic topic-buget-queue2 消息消费成功,message内容为:" + message);
  10. }
复制代码
主题互换机 Topic Exchange 消息测试(触发消息生产及消费)代码代码如下:
  1. @GetMapping("/send-topic-buget-message")
  2. private String sendTopicBugetMessage(@RequestParam String message) {
  3.         myRabbitProducer.sendTopicMessage(message);
  4.         myRabbitProducer.sendTopicMessage2(message);
  5.         return "OK";
  6. }
复制代码
主题互换机 Topic Exchange 消息测试(触发消息生产及消费)效果如下:
  1. topic topic-buget-queue 消息消费成功,message内容为:hello topic
  2. topic topic-buget-queue2 消息消费成功,message内容为:hello topic
  3. topic topic-buget-queue2 消息消费成功,message内容为:hello topic
复制代码
主题互换机 Topic Exchange 我们声明了两个 topic.buget.exchange.routing.key.* 和 topic.buget.exchange.routing.key.#,其中星号可以代替一个单词,(#) 可以代替恣意个单词,因此 topic-buget-queue 只能匹配到 topic.buget.exchange.routing.key.1 的消息,topic topic-buget-queue2 则可以匹配到两个路由的消息,效果符合预期。
Fanout Exchange(扇形互换机)
扇形互换机,一个互换器可以绑定多个队列,只要互换机接收到消息就会发送给所有和它绑定的队列,不再进行 RoutingKey 判定,也就是我们常说的发布订阅模式。
扇形互换机 Fanout Exchange 和队列 Queue 的路由设置代码如下:
  1. package com.user.service.rabbitmq;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitFanoutConfig {
  7.     //注入队列
  8.     @Bean("fanoutQueue")
  9.     public Queue queue1() {
  10.         return new Queue("fanout-buget-queue");
  11.     }
  12.     //注入队列
  13.     @Bean("fanoutQueue2")
  14.     public Queue queue2() {
  15.         return new Queue("fanout-buget-queue2");
  16.     }
  17.     //注入交换机
  18.     @Bean
  19.     public FanoutExchange fanoutExchange() {
  20.         //durable:重启后是否有效 autodelete: 长期未使用是否删除掉
  21.         return new FanoutExchange("fanout-buget-exchange", true, true);
  22.     }
  23.     //绑定队列和交换机
  24.     @Bean("fanoutBinding1")
  25.     public Binding binding() {
  26.         return BindingBuilder.bind(queue1()).to(fanoutExchange());
  27.     }
  28.     //绑定队列和交换机
  29.     @Bean("fanoutBinding2")
  30.     public Binding binding2() {
  31.         return BindingBuilder.bind(queue2()).to(fanoutExchange());
  32.     }
  33. }
复制代码
扇形互换机 Fanout Exchange 消息生产代码如下:
  1. //Fanout 模式消息发送
  2. public void sendFanoutMessage(String message) {
  3.         rabbitTemplate.convertAndSend("fanout-buget-exchange", "", message);
  4. }
复制代码
扇形互换机 Fanout Exchange 消息消费代码如下:
  1. //Fanout 模式消费端
  2. @RabbitListener(queues = "fanout-buget-queue")
  3. public void fanoutConsumer1(String message) {
  4.         System.out.println("fanout fanout-buget-queue 消息消费成功,message内容为:" + message);
  5. }
  6. //Fanout 模式消费端
  7. @RabbitListener(queues = "fanout-buget-queue2")
  8. public void fanoutConsumer2(String message) {
  9.         System.out.println("fanout fanout-buget-queue2 消息消费成功,message内容为:" + message);
  10. }
复制代码
扇形互换机 Fanout Exchange 消息测试(触发消息生产及消费)代码代码如下:
  1. @GetMapping("/send-fanout-buget-message")
  2. private String sendFanoutBugetMessage(@RequestParam String message) {
  3.         myRabbitProducer.sendFanoutMessage(message);
  4.         return "OK";
  5. }
复制代码
扇形互换机 Fanout Exchange 消息测试(触发消息生产及消费)效果如下:
  1. fanout fanout-buget-queue2 消息消费成功,message内容为:hello fanout
  2. fanout fanout-buget-queue 消息消费成功,message内容为:hello fanout
复制代码
扇形互换机 Fanout Exchange 队列 Queue 只要与其绑定,就可以把消息路由到对应的 Queue 上并完成消费,效果符合预期。
Headers Exchange(头互换机)
Header Exchange 不依靠 RoutingKey 的判定,而是根据发送的消息内容中的 headers 属性进行匹配,当消息投递到首部互换器时,RabbitMQ 会获取到该消息的 headers,对比其中的键值对是否完全匹配队列和互换器绑定时指定的键值对,假如完全匹配,则消息会路由到该队列,否则不会路由到该队列。
头互换机 Headers Exchange 和队列 Queue 的路由设置代码如下:
  1. package com.user.service.rabbitmq;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. @Configuration
  8. public class RabbitHeadersConfig {
  9.     //注入队列
  10.     @Bean("headersQueue")
  11.     public Queue queue1() {
  12.         return new Queue("headers-buget-queue");
  13.     }
  14.     //注入队列
  15.     @Bean("headersQueue2")
  16.     public Queue queue2() {
  17.         return new Queue("headers-buget-queue2");
  18.     }
  19.     //注入交换机
  20.     @Bean
  21.     public HeadersExchange headersExchange() {
  22.         //durable:重启后是否有效 autodelete: 长期未使用是否删除掉
  23.         return new HeadersExchange("headers-buget-exchange", true, true);
  24.     }
  25.     //绑定队列和交换机 绑定 Header中 header-key1 = a 的队列。
  26.     @Bean("headersBinding1")
  27.     public Binding binding() {
  28.         return BindingBuilder.bind(queue1()).to(headersExchange()).where("header-key1").matches("a");
  29.     }
  30.     //绑定队列和交换机 绑定 Header中  header-key2 = b 的队列。
  31.     @Bean("headersBinding2")
  32.     public Binding binding2() {
  33.         Map<String, Object> map = new HashMap<>();
  34.         map.put("header-key2", "b");
  35.         return BindingBuilder.bind(queue2()).to(headersExchange()).whereAny(map).match();
  36.     }
  37. }
复制代码
头互换机 Headers Exchange 消息生产代码如下:
  1. //Headers 模式消息发送
  2. public void sendHeadersMessage(String message) {
  3.         MessageProperties messageProperties = new MessageProperties();
  4.         messageProperties.setHeader("header-key1", "a");
  5.         rabbitTemplate.convertAndSend("headers-buget-exchange", "", new Message(message.getBytes(StandardCharsets.UTF_8), messageProperties));
  6.         MessageProperties messageProperties2 = new MessageProperties();
  7.         messageProperties2.setHeader("header-key2", "b");
  8.         rabbitTemplate.convertAndSend("headers-buget-exchange", "", new Message(message.getBytes(StandardCharsets.UTF_8), messageProperties2));
  9. }
复制代码
头互换机 Headers Exchange 消息消费代码如下:
  1. //headers 模式消费端
  2. @RabbitListener(queues = "headers-buget-queue")
  3. public void headersConsumer(String message) {
  4.         System.out.println("headers headers-buget-queue 消息消费成功,message内容为:" + message);
  5. }
  6. //headers 模式消费端
  7. @RabbitListener(queues = "headers-buget-queue2")
  8. public void headersConsumer2(String message) {
  9.         System.out.println("headers headers-buget-queue2 消息消费成功,message内容为:" + message);
  10. }
复制代码
头互换机 Headers Exchange 消息测试(触发消息生产及消费)代码代码如下:
  1. @GetMapping("/send-headers-buget-message")
  2. private String sendHeadersBugetMessage(@RequestParam String message) {
  3.         myRabbitProducer.sendHeadersMessage(message);
  4.         return "OK";
  5. }
复制代码
头互换机 Headers Exchange 消息测试(触发消息生产及消费)效果如下:
  1. headers headers-buget-queue2 消息消费成功,message内容为:hello headers
  2. headers headers-buget-queue 消息消费成功,message内容为:hello headers
  3. headers headers-buget-queue2 消息消费成功,message内容为:hello headers
复制代码
效果符合预期。
Backup Exchange(备份互换机)
RabbitMQ 本身是不存在备份互换机类型的,备份互换机是抽象出来的一个概念,通过设置互换机的 alternate-exchange 的屬性,属性值是互换机的名称,设置当前互换机的备份互换机,当消息路由无法在当前互换机匹配到合适的队列投递时,将会把消息转到备份互换机,分发到其绑定的备份队列中,备份互换机一般利用扇形互换机,因为其不需要进行路由匹配。
备份互换机 Backup Exchange 和队列 Queue 的路由设置代码如下:
  1. package com.user.service.rabbitmq;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. @Configuration
  8. public class RabbitBackupConfig {
  9.     //注入队列 durable:重启后是否有效 exclusive : 是否独自的 autodelete: 长期未使用是否删除掉
  10.     @Bean("backupQueue")
  11.     public Queue backupQueue() {
  12.         return new Queue("backup-buget-queue", true, false, false);
  13.     }
  14.     //注入队列
  15.     @Bean("noBbackupQueue")
  16.     public Queue noBbackupQueue() {
  17.         return new Queue("nobackup-buget-queue", true, false, false);
  18.     }
  19.     //注入 direct 交换机
  20.     @Bean("nonBackupExchange")
  21.     public DirectExchange nonBackupExchange() {
  22.         Map<String, Object> map = new HashMap<>();
  23.         //当消息路由无法在当前交换机匹配到合适的队列投递时 将消息转到备份交换机 backup-buget-exchange 分发到其绑定的备份队列中
  24.         map.put("alternate-exchange", "backup-buget-exchange");
  25.         return new DirectExchange("nobackup-buget-exchange", true, false, map);
  26.     }
  27.     //注入 Fanout 交换机
  28.     @Bean("backupExchange")
  29.     public FanoutExchange backupExchange(){
  30.         return new FanoutExchange("backup-buget-exchange",true,false);
  31.     }
  32.     //绑定非备份队列 direct交换机
  33.     @Bean
  34.     public Binding noBindBackupQueue() {
  35.         return BindingBuilder.bind(noBbackupQueue()).to(nonBackupExchange()).with("noback-buget-exchange-routing-key");
  36.     }
  37.     //扇形交换机的特性决定了它适合做备份交换机 (只要扇形交换机收到消息后就会被转发到与之绑定的队列中 不进行路由判断)
  38.     @Bean
  39.     public Binding bindBackupQueue(){
  40.         return BindingBuilder.bind(backupQueue()).to(backupExchange());
  41.     }
  42. }
复制代码
备份互换机 Backup Exchange 消息生产代码如下:
  1. //备份模式 消息发送
  2. public void sendBackupMessage() {
  3.         //路由正确匹配 消息投递到非备份队列中
  4.         rabbitTemplate.convertAndSend("nobackup-buget-exchange", "noback-buget-exchange-routing-key", "hello noBackup");
  5.         //路由无法匹配 消息投递到备份队列中 nobackup-buget-exchange 交换机中没有路由 noback-buget-exchange-routing-key-1
  6.         rabbitTemplate.convertAndSend("nobackup-buget-exchange", "noback-buget-exchange-routing-key-1", "hello backup");
  7. }
复制代码
备份互换机 Backup Exchange 消息消费代码如下:
  1. //backup 模式消费端(能够正确匹配到的队列)
  2. @RabbitListener(queues = "nobackup-buget-queue")
  3. public void noBackupConsumer(String message) {
  4.         System.out.println("nobackup-buget-queue 消息消费成功,message内容为:" + message);
  5. }
  6. //backup 模式消费端 (没有匹配到全部来到 backup-buget-queue 队列) 也就是备份队列
  7. @RabbitListener(queues = "backup-buget-queue")
  8. public void nackupConsumer(String message) {
  9.         System.out.println("backup-buget-queue 消息消费成功,message内容为:" + message);
  10. }
复制代码
备份互换机 Backup Exchange 消息测试(触发消息生产及消费)代码代码如下:
  1. @GetMapping("/send-backup-buget-message")
  2. private String sendBackupBugetMessage() {
  3.         myRabbitProducer.sendBackupMessage();
  4.         return "OK";
  5. }
复制代码
备份互换机 Backup Exchange 消息测试(触发消息生产及消费)效果如下:
  1. nobackup-buget-queue 消息消费成功,message内容为:hello noBackup
  2. backup-buget-queue 消息消费成功,message内容为:hello backup
复制代码
我们通过 alternate-exchange 属性给 nobackup-buget-exchange 互换机设置了备份互换机 backup-buget-exchange,我们分别发了两个路由信息 noback-buget-exchange-routing-key 和 noback-buget-exchange-routing-key-1(找不到的路由),最终两条消息被消费了,效果符合预期。
Dead Exchange(死信互换机)
同备份互换机一样 RabbitMQ 本身是不存在死信互换机类型的,死信互换机可以明白成一个拥有特殊意义的直连互换机,通过设置队列中的 x-dead-letter-exchange 和 x-dead-letter-routing-key 属性来设置绑定死信互换机,当消费者拒绝消费、消息积存队列到达最大长度大概消息过期时,消息从正常队列转到死信队列,死信在转移到死信队列时,它的路由是会保存下来,但是假如设置了 x-dead-letter-routing-key 参数的话,路由就会被更换为设置的这个值。
死信互换机 Dead Exchange 和队列 Queue 的路由设置代码如下:
  1. package com.user.service.rabbitmq;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. @Configuration
  8. public class RabbitDeadConfig {
  9.     //注入 死信队列 durable:重启后是否有效 exclusive : 是否独自的 autodelete: 长期未使用是否删除掉
  10.     @Bean("deadQueue")
  11.     public Queue deadQueue() {
  12.         return new Queue("dead-buget-queue", true, false, false);
  13.     }
  14.     //注入正常队列队列
  15.     @Bean("normalQueue")
  16.     public Queue normalQueue() {
  17.         Map<String, Object> map = new HashMap<>();
  18.         //当前队列中 message 过期时间
  19.         map.put("x-message-ttl", 5000);
  20.         //给当前队列绑定死信交换机
  21.         map.put("x-dead-letter-exchange", "dead-buget-exchange");
  22.         //绑定 Routing key
  23.         map.put("x-dead-letter-routing-key", "dead-buget-exchange-routing-key");
  24.         return new Queue("normal-buget-queue", true, false, false, map);
  25.     }
  26.     //注入 normalExchange 交换机
  27.     @Bean("normalExchange")
  28.     public DirectExchange normalExchange() {
  29.         return new DirectExchange("normal-buget-exchange", true, false);
  30.     }
  31.     //注入 死信交换机
  32.     @Bean("deadExchange")
  33.     public DirectExchange deadExchange() {
  34.         return new DirectExchange("dead-buget-exchange", true, false);
  35.     }
  36.     //死信队列和交换器绑定
  37.     @Bean
  38.     public Binding bindDeadQueue() {
  39.         return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead-buget-exchange-routing-key");
  40.     }
  41.     //正常队列的交换器和队列绑定
  42.     @Bean
  43.     public Binding bindNormalQueue() {
  44.         return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal-buget-exchange-routing-key");
  45.     }
  46. }
复制代码
死信互换机 Dead Exchange 消息生产代码如下:
  1. //死信 模式消息发送
  2. public void sendDeadMessage(String message) {
  3.         System.out.println("normal-buget-queue 开始发送,当前时间:"+System.currentTimeMillis());
  4.         rabbitTemplate.convertAndSend("normal-buget-exchange", "normal-buget-exchange-routing-key", message);
  5. }
复制代码
死信互换机 Dead Exchange 消息消费代码如下:
  1. //死信 模式消费端
  2. @RabbitListener(queues = "dead-buget-queue")
  3. public void deadConsumer(String message) {
  4.         System.out.println("dead-buget-queue 开始消费,当前时间:"+System.currentTimeMillis());
  5.         System.out.println("dead-buget-queue 消息消费成功,message内容为:" + message);
  6. }
复制代码
死信互换机 Dead Exchange 消息测试(触发消息生产及消费)代码代码如下:
  1. @GetMapping("/send-dead-buget-message")
  2. private String sendBackupBugetMessage(@RequestParam String message) {
  3.         myRabbitProducer.sendDeadMessage(message);
  4.         return "OK";
  5. }
复制代码
死信互换机 Dead Exchange 消息测试(触发消息生产及消费)效果如下:
  1. normal-buget-queue 开始发送,当前时间:1725281520323
  2. dead-buget-queue 开始消费,当前时间:1725281525327
  3. dead-buget-queue 消息消费成功,message内容为:dead
复制代码
我们对 normalQueue 设置了 TTL 为 5秒钟,并为之设置了 死信互换机 dead-buget-exchange 和路由 dead-buget-exchange-routing-key,然后在 normalQueue 上发送了消息,但是没有对 normalQueue 进行监听消费,我们发现过了(1725281525327-1725281520323=5004)5秒后,dead-buget-queue 完成了消息消费,效果符合预期。
消息生产完备代码如下:
  1. package com.user.service.rabbitmq;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.core.MessageProperties;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import java.nio.charset.StandardCharsets;
  8. @Component
  9. public class MyRabbitProducer {
  10.     @Autowired
  11.     private RabbitTemplate rabbitTemplate;
  12.     //direct 模式消息发送
  13.     public void sendDirectMessage(String message) {
  14.         rabbitTemplate.convertAndSend("direct-buget-exchange", "direct-buget-exchange-routing-key", message);
  15.     }
  16.     //topic 模式消息发送
  17.     public void sendTopicMessage(String message) {
  18.         rabbitTemplate.convertAndSend("topic-buget-exchange", "topic.buget.exchange.routing.key.1", message);
  19.     }
  20.     //topic 模式消息发送
  21.     public void sendTopicMessage2(String message) {
  22.         rabbitTemplate.convertAndSend("topic-buget-exchange", "topic.buget.exchange.routing.key.1.1", message);
  23.     }
  24.     //Fanout 模式消息发送
  25.     public void sendFanoutMessage(String message) {
  26.         rabbitTemplate.convertAndSend("fanout-buget-exchange", "", message);
  27.     }
  28.     //Headers 模式消息发送
  29.     public void sendHeadersMessage(String message) {
  30.         MessageProperties messageProperties = new MessageProperties();
  31.         messageProperties.setHeader("header-key1", "a");
  32.         rabbitTemplate.convertAndSend("headers-buget-exchange", "", new Message(message.getBytes(StandardCharsets.UTF_8), messageProperties));
  33.         MessageProperties messageProperties2 = new MessageProperties();
  34.         messageProperties2.setHeader("header-key2", "b");
  35.         rabbitTemplate.convertAndSend("headers-buget-exchange", "", new Message(message.getBytes(StandardCharsets.UTF_8), messageProperties2));
  36.     }
  37.     //备份模式 消息发送
  38.     public void sendBackupMessage() {
  39.         //路由正确匹配 消息投递到非备份队列中
  40.         rabbitTemplate.convertAndSend("nobackup-buget-exchange", "noback-buget-exchange-routing-key", "hello noBackup");
  41.         //路由无法匹配 消息投递到备份队列中 nobackup-buget-exchange 交换机中没有路由 noback-buget-exchange-routing-key-1
  42.         rabbitTemplate.convertAndSend("nobackup-buget-exchange", "noback-buget-exchange-routing-key-1", "hello backup");
  43.     }
  44.     //死信 模式消息发送
  45.     public void sendDeadMessage(String message) {
  46.         System.out.println("normal-buget-queue 开始发送,当前时间:"+System.currentTimeMillis());
  47.         rabbitTemplate.convertAndSend("normal-buget-exchange", "normal-buget-exchange-routing-key", message);
  48.     }
  49. }
复制代码
消息消费完备代码如下:
  1. package com.user.service.rabbitmq;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class MyRabbitConsumer {
  6.     //direct 直连模式消费端
  7.     @RabbitListener(queues = "direct-buget-queue")
  8.     public void directConsumer(String message) {
  9.         System.out.println("direct 消息消费成功,message内容为:" + message);
  10.     }
  11.     //topic 模式消费端
  12.     @RabbitListener(queues = "topic-buget-queue")
  13.     public void topicConsumer(String message) {
  14.         System.out.println("topic topic-buget-queue 消息消费成功,message内容为:" + message);
  15.     }
  16.     //topic 模式消费端
  17.     @RabbitListener(queues = "topic-buget-queue2")
  18.     public void topicConsumer2(String message) {
  19.         System.out.println("topic topic-buget-queue2 消息消费成功,message内容为:" + message);
  20.     }
  21.     //Fanout 模式消费端
  22.     @RabbitListener(queues = "fanout-buget-queue")
  23.     public void fanoutConsumer1(String message) {
  24.         System.out.println("fanout fanout-buget-queue 消息消费成功,message内容为:" + message);
  25.     }
  26.     //Fanout 模式消费端
  27.     @RabbitListener(queues = "fanout-buget-queue2")
  28.     public void fanoutConsumer2(String message) {
  29.         System.out.println("fanout fanout-buget-queue2 消息消费成功,message内容为:" + message);
  30.     }
  31.     //headers 模式消费端
  32.     @RabbitListener(queues = "headers-buget-queue")
  33.     public void headersConsumer(String message) {
  34.         System.out.println("headers headers-buget-queue 消息消费成功,message内容为:" + message);
  35.     }
  36.     //headers 模式消费端
  37.     @RabbitListener(queues = "headers-buget-queue2")
  38.     public void headersConsumer2(String message) {
  39.         System.out.println("headers headers-buget-queue2 消息消费成功,message内容为:" + message);
  40.     }
  41.     //backup 模式消费端(能够正确匹配到的队列)
  42.     @RabbitListener(queues = "nobackup-buget-queue")
  43.     public void noBackupConsumer(String message) {
  44.         System.out.println("nobackup-buget-queue 消息消费成功,message内容为:" + message);
  45.     }
  46.     //backup 模式消费端 (没有匹配到全部来到 backup-buget-queue 队列) 也就是备份队列
  47.     @RabbitListener(queues = "backup-buget-queue")
  48.     public void nackupConsumer(String message) {
  49.         System.out.println("backup-buget-queue 消息消费成功,message内容为:" + message);
  50.     }
  51.     //死信 模式消费端
  52.     @RabbitListener(queues = "dead-buget-queue")
  53.     public void deadConsumer(String message) {
  54.         System.out.println("dead-buget-queue 开始消费,当前时间:"+System.currentTimeMillis());
  55.         System.out.println("dead-buget-queue 消息消费成功,message内容为:" + message);
  56.     }
  57. }
复制代码
消息测试(触发消息生产及消费)代码代码如下:
  1. package com.user.service.controller;
  2. import com.user.service.rabbitmq.MyRabbitProducer;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RequestParam;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. public class RabbitController {
  9.     @Autowired
  10.     private MyRabbitProducer myRabbitProducer;
  11.     @GetMapping("/send-direct-buget-message")
  12.     private String sendDirectBugetMessage(@RequestParam String message) {
  13.         myRabbitProducer.sendDirectMessage(message);
  14.         return "OK";
  15.     }
  16.     @GetMapping("/send-topic-buget-message")
  17.     private String sendTopicBugetMessage(@RequestParam String message) {
  18.         myRabbitProducer.sendTopicMessage(message);
  19.         myRabbitProducer.sendTopicMessage2(message);
  20.         return "OK";
  21.     }
  22.     @GetMapping("/send-fanout-buget-message")
  23.     private String sendFanoutBugetMessage(@RequestParam String message) {
  24.         myRabbitProducer.sendFanoutMessage(message);
  25.         return "OK";
  26.     }
  27.     @GetMapping("/send-headers-buget-message")
  28.     private String sendHeadersBugetMessage(@RequestParam String message) {
  29.         myRabbitProducer.sendHeadersMessage(message);
  30.         return "OK";
  31.     }
  32.     @GetMapping("/send-backup-buget-message")
  33.     private String sendBackupBugetMessage() {
  34.         myRabbitProducer.sendBackupMessage();
  35.         return "OK";
  36.     }
  37.     @GetMapping("/send-dead-buget-message")
  38.     private String sendBackupBugetMessage(@RequestParam String message) {
  39.         myRabbitProducer.sendDeadMessage(message);
  40.         return "OK";
  41.     }
  42. }
复制代码
尚硅谷 RabbitMQ 学习视频
总结:本篇简单分享了 RabbitMQ 六种互换机的消息生产和消费的利用,只是 Demo 案例,在实际项目中更换成业务代码即可,盼望可以资助到有需要的朋侪。
如有不准确的地方请各位指出纠正。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

民工心事

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表