6、Spring Boot 3.x集成RabbitMQ动态交换机、队列

盛世宏图  金牌会员 | 2024-10-13 09:15:59 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 885|帖子 885|积分 2655

一、前言

  1. 本篇主要是围绕着 Spring Boot 3.x 与 RabbitMQ 的动态配置集成,比如动态新增 RabbitMQ 交换机、队列等操作。
复制代码
二、默认RabbitMQ中的exchange、queue动态新增及监听

1、新增RabbitMQ设置

RabbitMQConfig.java

  1. import org.springframework.amqp.rabbit.annotation.EnableRabbit;
  2. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @className: RabbitConfig
  8. * @program: chain
  9. * @description: RabbitMQ 配置类
  10. * @author: kenny
  11. * @create: 2024-10-03 21:59
  12. * @version: 1.0.0
  13. */
  14. @Configuration
  15. @EnableRabbit
  16. public class RabbitMQConfig {
  17.     /**
  18.      * 创建 RabbitTemplate, 用于发送消息
  19.      *
  20.      * @return RabbitTemplate
  21.      */
  22.     @Bean
  23.     public RabbitTemplate rabbitTemplate() {
  24.         return new RabbitTemplate();
  25.     }
  26.     /**
  27.      * 创建 RabbitAdmin, 用于创建 Exchange 和 Queue
  28.      *
  29.      * @param rabbitTemplate RabbitTemplate
  30.      * @return RabbitAdmin
  31.      */
  32.     @Bean
  33.     public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {
  34.         return new RabbitAdmin(rabbitTemplate);
  35.     }
  36. }
复制代码
2、新增RabbitMQ动态操作组件

RabbitDynamicConfigService.java

  1. RabbitDynamicConfigService.java 封装了各类型 Exchange 的创建与删除、Queue 的创建与删除,以及 Queue 与 Exchange 的绑定操作。
复制代码
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. import java.util.Map;
  7. /**
  8. * @className: RabbitDynamicConfigService
  9. * @program: chain
  10. * @description: 动态创建队列和交换机
  11. * @author: kenny
  12. * @create: 2024-10-03 23:49
  13. * @version: 1.0.0
  14. */
  15. @Slf4j
  16. @Service
  17. public class RabbitDynamicConfigService {
  18.     /**
  19.      * 为了解决循环依赖问题
  20.      */
  21.     private final RabbitAdmin rabbitAdmin;
  22.     private final RabbitListenerService rabbitListenerService;
  23.     @Autowired
  24.     public RabbitDynamicConfigService(RabbitAdmin rabbitAdmin,
  25.                                       RabbitListenerService rabbitListenerService) {
  26.         this.rabbitAdmin = rabbitAdmin;
  27.         this.rabbitListenerService = rabbitListenerService;
  28.     }
  29.     /**
  30.      * 动态创建队列,并持久化
  31.      *
  32.      * @param queueName 队列名称
  33.      */
  34.     public void createQueue(String queueName) {
  35.         // 队列持久化
  36.         Queue queue = new Queue(queueName, true);
  37.         // 创建队列
  38.         rabbitAdmin.declareQueue(queue);
  39.         System.out.println("队列创建成功: " + queueName);
  40.     }
  41.     /**
  42.      * 动态创建队列,并持久化
  43.      *
  44.      * @param queueName 队列名称
  45.      */
  46.     public void createQueue(String queueName, Boolean isListener) {
  47.         // 队列持久化
  48.         Queue queue = new Queue(queueName, true);
  49.         // 创建队列
  50.         rabbitAdmin.declareQueue(queue);
  51.         System.out.println("队列创建成功: " + queueName);
  52.         if (!isListener) {
  53.             return;
  54.         }
  55.         rabbitListenerService.createListener(queueName);
  56.     }
  57.     /**
  58.      * 动态创建交换机,并持久化
  59.      *
  60.      * @param exchangeName 交换机名称
  61.      */
  62.     public void createExchange(String exchangeName) {
  63.         // 交换机持久化
  64.         DirectExchange exchange = new DirectExchange(exchangeName, true, false);
  65.         rabbitAdmin.declareExchange(exchange);
  66.         log.info("交换机创建成功: {}", exchangeName);
  67.     }
  68.     // 动态创建 Fanout 交换机
  69.     public void createDirectExchange(String exchangeName) {
  70.         DirectExchange fanoutExchange = new DirectExchange(exchangeName, true, false); // 持久化
  71.         rabbitAdmin.declareExchange(fanoutExchange);
  72.         log.info("Direct 交换机创建成功: {}", exchangeName);
  73.     }
  74.     // 动态创建 Fanout 交换机
  75.     public void createFanoutExchange(String exchangeName) {
  76.         FanoutExchange fanoutExchange = new FanoutExchange(exchangeName, true, false); // 持久化
  77.         rabbitAdmin.declareExchange(fanoutExchange);
  78.         log.info("Fanout 交换机创建成功: {}", exchangeName);
  79.     }
  80.     // 动态创建 Topic 交换机
  81.     public void createTopicExchange(String exchangeName) {
  82.         TopicExchange topicExchange = new TopicExchange(exchangeName, true, false); // 持久化
  83.         rabbitAdmin.declareExchange(topicExchange);
  84.         log.info("Topic 交换机创建成功: {}", exchangeName);
  85.     }
  86.     // 动态创建 Headers 交换机
  87.     public void createHeadersExchange(String exchangeName) {
  88.         HeadersExchange headersExchange = new HeadersExchange(exchangeName, true, false); // 持久化
  89.         rabbitAdmin.declareExchange(headersExchange);
  90.         log.info("Headers 交换机创建成功: {}", exchangeName);
  91.     }
  92.     /**
  93.      * 动态绑定队列到交换机,并指定路由键
  94.      *
  95.      * @param queueName    队列名称
  96.      * @param exchangeName 交换机名称
  97.      * @param routingKey   路由键
  98.      */
  99.     public void bindQueueToExchange(String queueName, String exchangeName, String routingKey) {
  100.         Queue queue = new Queue(queueName);
  101.         DirectExchange exchange = new DirectExchange(exchangeName);
  102.         Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
  103.         rabbitAdmin.declareBinding(binding);
  104.         log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName + " 使用路由键: {}", routingKey);
  105.     }
  106.     /**
  107.      * 动态绑定队列到交换机,并指定路由键
  108.      *
  109.      * @param queueName    队列名称
  110.      * @param exchangeName 交换机名称
  111.      * @param routingKey   路由键
  112.      */
  113.     public void moreExchangeTypeBindQueueToExchange(String queueName, String exchangeType, String exchangeName, String routingKey, Map<String, Object> headers) {
  114.         switch (exchangeType) {
  115.             case "fanout" -> bindQueueToExchange(queueName, exchangeName, routingKey);
  116.             case "direct" -> bindQueueToDirectExchange(queueName, exchangeName, routingKey);
  117.             case "topic" -> bindQueueToTopicExchange(queueName, exchangeName, routingKey);
  118.             case "headers" -> bindQueueToHeadersExchange(queueName, exchangeName, headers);
  119.             default -> throw new IllegalArgumentException("不支持的交换机类型: " + exchangeType);
  120.         }
  121.     }
  122.     /**
  123.      * 动态绑定队列到交换机,并指定路由键(exchange: direct)
  124.      *
  125.      * @param queueName    队列名称
  126.      * @param exchangeName 交换机名称
  127.      */
  128.     public void bindQueueToFanoutExchange(String queueName, String exchangeName) {
  129.         Queue queue = new Queue(queueName);
  130.         FanoutExchange exchange = new FanoutExchange(exchangeName);
  131.         Binding binding = BindingBuilder.bind(queue).to(exchange);
  132.         rabbitAdmin.declareBinding(binding);
  133.         log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName);
  134.     }
  135.     /**
  136.      * 动态绑定队列到交换机,并指定路由键(exchange: direct)
  137.      *
  138.      * @param queueName    队列名称
  139.      * @param exchangeName 交换机名称
  140.      * @param routingKey   路由键
  141.      */
  142.     public void bindQueueToDirectExchange(String queueName, String exchangeName, String routingKey) {
  143.         Queue queue = new Queue(queueName);
  144.         DirectExchange exchange = new DirectExchange(exchangeName);
  145.         Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
  146.         rabbitAdmin.declareBinding(binding);
  147.         log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName + " 使用路由键: {}", routingKey);
  148.     }
  149.     /**
  150.      * 动态绑定队列到交换机,并指定路由键(exchange: topic)
  151.      *
  152.      * @param queueName    队列名称
  153.      * @param exchangeName 交换机名称
  154.      * @param routingKey   路由键
  155.      */
  156.     public void bindQueueToTopicExchange(String queueName, String exchangeName, String routingKey) {
  157.         Queue queue = new Queue(queueName);
  158.         TopicExchange exchange = new TopicExchange(exchangeName);
  159.         Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
  160.         rabbitAdmin.declareBinding(binding);
  161.         log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName + " 使用路由键: {}", routingKey);
  162.     }
  163.     /**
  164.      * 动态绑定队列到交换机,并指定路由键(exchange: headers)
  165.      *
  166.      * @param queueName    队列名称
  167.      * @param exchangeName 交换机名称
  168.      * @param headers      路由键
  169.      */
  170.     public void bindQueueToHeadersExchange(String queueName, String exchangeName, Map<String, Object> headers) {
  171.         Queue queue = new Queue(queueName);
  172.         HeadersExchange exchange = new HeadersExchange(exchangeName);
  173.         Binding binding = BindingBuilder.bind(queue).to(exchange).whereAll(headers).match();
  174.         rabbitAdmin.declareBinding(binding);
  175.         log.info("队列 {}", queueName + " 已绑定到 Headers 交换机 {}", exchangeName + ",使用头部匹配规则: {}", headers);
  176.     }
  177.     /**
  178.      * 动态删除队列
  179.      *
  180.      * @param queueName 队列名称
  181.      */
  182.     public void deleteQueue(String queueName) {
  183.         rabbitAdmin.deleteQueue(queueName);
  184.         log.info("队列删除成功: {}", queueName);
  185.     }
  186.     /**
  187.      * 动态删除交换机
  188.      *
  189.      * @param exchangeName 交换机名称
  190.      */
  191.     public void deleteExchange(String exchangeName) {
  192.         rabbitAdmin.deleteExchange(exchangeName);
  193.         log.info("交换机删除成功: {}", exchangeName);
  194.     }
  195. }
复制代码
3、RabbitMQ中队列的动态监听

RabbitListenerService.java

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  5. import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. /**
  9. * @className: RabbitListenerService
  10. * @program: chain
  11. * @description: RabbitMQ监听器Service组件
  12. * @author: kenny
  13. * @create: 2024-10-04 01:40
  14. * @version: 1.0.0
  15. */
  16. @Slf4j
  17. @Service
  18. public class RabbitListenerService {
  19.     // 为了解决循环依赖问题
  20.     private final SimpleRabbitListenerContainerFactory listenerContainerFactory;
  21.     private final ConnectionFactory connectionFactory;
  22.     @Autowired
  23.     public RabbitListenerService(
  24.             SimpleRabbitListenerContainerFactory listenerContainerFactory,
  25.             ConnectionFactory connectionFactory) {
  26.         this.listenerContainerFactory = listenerContainerFactory;
  27.         this.connectionFactory = connectionFactory;
  28.     }
  29.     /**
  30.      * 创建监听器容器并启动监听
  31.      *
  32.      * @param queueName 队列名称
  33.      */
  34.     public void createListener(String queueName) {
  35.         // 创建并启动监听器容器
  36.         SimpleMessageListenerContainer container = listenerContainerFactory.createListenerContainer();
  37.         container.setConnectionFactory(connectionFactory);
  38.         container.setQueueNames(queueName);
  39.         // 监听逻辑处理
  40.         container.setMessageListener(new MessageListenerAdapter(new Object() {
  41.             public void handleMessage(String message) {
  42.                 System.out.println("收到来自RabbitMQ中队列:" + queueName + " 队列的消息:" + message);
  43.             }
  44.         }));
  45.         // 启动监听器容器
  46.         container.start();
  47.         System.out.println("RabbitMQ队列监听器已启动:" + queueName);
  48.     }
  49. }
复制代码
4、RabbitMQ中的Exchange、Queue动态操作接口

RabbitDynamicChannelController.java

  1. import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
  2. import jakarta.annotation.Resource;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RequestParam;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. /**
  10. * @className: RabbitDynamicController
  11. * @program: chain
  12. * @description: RabbitMQ 动态创建队列、交换机,绑定等操作
  13. * @author: kenny
  14. * @create: 2024-10-04 00:22
  15. * @version: 1.0.0
  16. */
  17. @RestController
  18. @RequestMapping("/rabbit/dynamic/channel")
  19. public class RabbitDynamicChannelController {
  20.     /**
  21.      * 动态创建队列和交换机
  22.      */
  23.     @Resource
  24.     private RabbitDynamicConfigService rabbitDynamicConfigService;
  25.     /**
  26.      * 动态创建队列
  27.      *
  28.      * @param queueName 队列名称
  29.      * @return 处理结果
  30.      */
  31.     @GetMapping("/createQueue")
  32.     public String createQueue(@RequestParam("queueName") String queueName) {
  33.         rabbitDynamicConfigService.createQueue(queueName);
  34.         return "队列已创建: " + queueName;
  35.     }
  36.     /**
  37.      * 动态创建交换机
  38.      *
  39.      * @param exchangeName 交换机名称
  40.      * @return 处理结果
  41.      */
  42.     @GetMapping("/createExchange")
  43.     public String createExchange(@RequestParam("exchangeName") String exchangeName) {
  44.         rabbitDynamicConfigService.createExchange(exchangeName);
  45.         return "交换机已创建: " + exchangeName;
  46.     }
  47.     /**
  48.      * 动态绑定队列和交换机
  49.      *
  50.      * @param queueName    队列名称
  51.      * @param exchangeName 交换机名称
  52.      * @param routingKey   路由键
  53.      * @return 处理结果
  54.      */
  55.     @GetMapping("/bindQueue")
  56.     public String bindQueueToExchange(@RequestParam("queueName") String queueName,
  57.                                       @RequestParam("exchangeName") String exchangeName,
  58.                                       @RequestParam("routingKey") String routingKey) {
  59.         rabbitDynamicConfigService.bindQueueToExchange(queueName, exchangeName, routingKey);
  60.         return "队列和交换机已绑定: " + queueName + " -> " + exchangeName;
  61.     }
  62.     /**
  63.      * 动态删除队列
  64.      *
  65.      * @param queueName 队列名称
  66.      * @return 处理结果
  67.      */
  68.     @GetMapping("/deleteQueue")
  69.     public String deleteQueue(@RequestParam("queueName") String queueName) {
  70.         rabbitDynamicConfigService.deleteQueue(queueName);
  71.         return "队列已删除: " + queueName;
  72.     }
  73.     /**
  74.      * 动态删除交换机
  75.      *
  76.      * @param exchangeName 交换机名称
  77.      * @return 处理结果
  78.      */
  79.     @GetMapping("/deleteExchange")
  80.     public String deleteExchange(@RequestParam("exchangeName") String exchangeName) {
  81.         rabbitDynamicConfigService.deleteExchange(exchangeName);
  82.         return "交换机已删除: " + exchangeName;
  83.     }
  84.     // 创建并绑定 Fanout 交换机
  85.     @GetMapping("/createDirectExchange")
  86.     public String createDirectExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam String routingKey) {
  87.         rabbitDynamicConfigService.createDirectExchange(exchangeName);
  88.         rabbitDynamicConfigService.bindQueueToDirectExchange(queueName, exchangeName, routingKey);
  89.         return "Fanout Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with routing key: " + routingKey;
  90.     }
  91.     // 创建并绑定 Fanout 交换机
  92.     @GetMapping("/createFanoutExchange")
  93.     public String createFanoutExchange(@RequestParam String exchangeName, @RequestParam String queueName) {
  94.         rabbitDynamicConfigService.createFanoutExchange(exchangeName);
  95.         rabbitDynamicConfigService.bindQueueToFanoutExchange(queueName, exchangeName);
  96.         return "Fanout Exchange and Queue Binding created: " + exchangeName + " -> " + queueName;
  97.     }
  98.     // 创建并绑定 Topic 交换机
  99.     @GetMapping("/createTopicExchange")
  100.     public String createTopicExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam String routingKey) {
  101.         rabbitDynamicConfigService.createTopicExchange(exchangeName);
  102.         rabbitDynamicConfigService.bindQueueToTopicExchange(queueName, exchangeName, routingKey);
  103.         return "Topic Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with routing key: " + routingKey;
  104.     }
  105.     // 创建并绑定 Headers 交换机
  106.     @GetMapping("/createHeadersExchange")
  107.     public String createHeadersExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam Map<String, String> headersMap) {
  108.         Map<String, Object> headers = new HashMap<>(headersMap);
  109.         rabbitDynamicConfigService.createHeadersExchange(exchangeName);
  110.         rabbitDynamicConfigService.bindQueueToHeadersExchange(queueName, exchangeName, headers);
  111.         return "Headers Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with headers: " + headers;
  112.     }
  113. }
复制代码
5、RabbitMQ中的Queue消息监听动态操作接口

RabbitChannelListenerController.java

  1. import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
  2. import jakarta.annotation.Resource;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RequestParam;
  6. import org.springframework.web.bind.annotation.RestController;
  7. /**
  8. * @className: RabbitListenerController
  9. * @program: chain
  10. * @description: RabbitMQ 监听器 Controller 组件
  11. * @author: kenny
  12. * @create: 2024-10-04 01:30
  13. * @version: 1.0.0
  14. */
  15. @RestController
  16. @RequestMapping("/rabbit/channel/listener")
  17. public class RabbitChannelListenerController {
  18.     @Resource
  19.     private RabbitDynamicConfigService rabbitDynamicConfigService;
  20.     /**
  21.      * 创建监听器,监听指定队列
  22.      *
  23.      * @param queueName 队列名称
  24.      * @return 处理结果
  25.      */
  26.     @GetMapping("/queue")
  27.     public String listenQueue(@RequestParam("queueName") String queueName) {
  28.         rabbitDynamicConfigService.createQueue(queueName, true);
  29.         return "开始监听队列:" + queueName;
  30.     }
  31. }
复制代码
三、动态exchange、queue的测试

1、测试Exchange、Queue的动态创建和删除

2、测试Exchange和Queue的动态绑定

3、发送、接收消息测试动态创建Exchange、Queue

4、测试Queue的动态监听接口

下一篇:7、Spring Boot 3.x集成RabbitMQ动态实例等操作

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

盛世宏图

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表