一、前言
- 本篇主要是围绕着 Spring Boot 3.x 与 RabbitMQ 的动态配置集成,比如动态新增 RabbitMQ 交换机、队列等操作。
复制代码 二、默认RabbitMQ中的exchange、queue动态新增及监听
1、新增RabbitMQ设置
RabbitMQConfig.java
- import org.springframework.amqp.rabbit.annotation.EnableRabbit;
- import org.springframework.amqp.rabbit.core.RabbitAdmin;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /**
- * @className: RabbitConfig
- * @program: chain
- * @description: RabbitMQ 配置类
- * @author: kenny
- * @create: 2024-10-03 21:59
- * @version: 1.0.0
- */
- @Configuration
- @EnableRabbit
- public class RabbitMQConfig {
- /**
- * 创建 RabbitTemplate, 用于发送消息
- *
- * @return RabbitTemplate
- */
- @Bean
- public RabbitTemplate rabbitTemplate() {
- return new RabbitTemplate();
- }
- /**
- * 创建 RabbitAdmin, 用于创建 Exchange 和 Queue
- *
- * @param rabbitTemplate RabbitTemplate
- * @return RabbitAdmin
- */
- @Bean
- public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {
- return new RabbitAdmin(rabbitTemplate);
- }
- }
复制代码 2、新增RabbitMQ动态操作组件
RabbitDynamicConfigService.java
- RabbitDynamicConfigService.java 中包含了不同类型Exchange的创建、删除,Queue的创建和删除、绑定Exchange
复制代码- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.core.RabbitAdmin;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.util.Map;
- /**
- * @className: RabbitDynamicConfigService
- * @program: chain
- * @description: 动态创建队列和交换机
- * @author: kenny
- * @create: 2024-10-03 23:49
- * @version: 1.0.0
- */
- @Slf4j
- @Service
- public class RabbitDynamicConfigService {
- /**
- * 为了解决循环依赖问题
- */
- private final RabbitAdmin rabbitAdmin;
- private final RabbitListenerService rabbitListenerService;
- @Autowired
- public RabbitDynamicConfigService(RabbitAdmin rabbitAdmin,
- RabbitListenerService rabbitListenerService) {
- this.rabbitAdmin = rabbitAdmin;
- this.rabbitListenerService = rabbitListenerService;
- }
- /**
- * 动态创建队列,并持久化
- *
- * @param queueName 队列名称
- */
- public void createQueue(String queueName) {
- // 队列持久化
- Queue queue = new Queue(queueName, true);
- // 创建队列
- rabbitAdmin.declareQueue(queue);
- System.out.println("队列创建成功: " + queueName);
- }
- /**
- * 动态创建队列,并持久化
- *
- * @param queueName 队列名称
- */
- public void createQueue(String queueName, Boolean isListener) {
- // 队列持久化
- Queue queue = new Queue(queueName, true);
- // 创建队列
- rabbitAdmin.declareQueue(queue);
- System.out.println("队列创建成功: " + queueName);
- if (!isListener) {
- return;
- }
- rabbitListenerService.createListener(queueName);
- }
- /**
- * 动态创建交换机,并持久化
- *
- * @param exchangeName 交换机名称
- */
- public void createExchange(String exchangeName) {
- // 交换机持久化
- DirectExchange exchange = new DirectExchange(exchangeName, true, false);
- rabbitAdmin.declareExchange(exchange);
- log.info("交换机创建成功: {}", exchangeName);
- }
- // 动态创建 Fanout 交换机
- public void createDirectExchange(String exchangeName) {
- DirectExchange fanoutExchange = new DirectExchange(exchangeName, true, false); // 持久化
- rabbitAdmin.declareExchange(fanoutExchange);
- log.info("Direct 交换机创建成功: {}", exchangeName);
- }
- // 动态创建 Fanout 交换机
- public void createFanoutExchange(String exchangeName) {
- FanoutExchange fanoutExchange = new FanoutExchange(exchangeName, true, false); // 持久化
- rabbitAdmin.declareExchange(fanoutExchange);
- log.info("Fanout 交换机创建成功: {}", exchangeName);
- }
- // 动态创建 Topic 交换机
- public void createTopicExchange(String exchangeName) {
- TopicExchange topicExchange = new TopicExchange(exchangeName, true, false); // 持久化
- rabbitAdmin.declareExchange(topicExchange);
- log.info("Topic 交换机创建成功: {}", exchangeName);
- }
- // 动态创建 Headers 交换机
- public void createHeadersExchange(String exchangeName) {
- HeadersExchange headersExchange = new HeadersExchange(exchangeName, true, false); // 持久化
- rabbitAdmin.declareExchange(headersExchange);
- log.info("Headers 交换机创建成功: {}", exchangeName);
- }
- /**
- * 动态绑定队列到交换机,并指定路由键
- *
- * @param queueName 队列名称
- * @param exchangeName 交换机名称
- * @param routingKey 路由键
- */
- public void bindQueueToExchange(String queueName, String exchangeName, String routingKey) {
- Queue queue = new Queue(queueName);
- DirectExchange exchange = new DirectExchange(exchangeName);
- Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
- rabbitAdmin.declareBinding(binding);
- log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName + " 使用路由键: {}", routingKey);
- }
- /**
- * 动态绑定队列到交换机,并指定路由键
- *
- * @param queueName 队列名称
- * @param exchangeName 交换机名称
- * @param routingKey 路由键
- */
- public void moreExchangeTypeBindQueueToExchange(String queueName, String exchangeType, String exchangeName, String routingKey, Map<String, Object> headers) {
- switch (exchangeType) {
- case "fanout" -> bindQueueToExchange(queueName, exchangeName, routingKey);
- case "direct" -> bindQueueToDirectExchange(queueName, exchangeName, routingKey);
- case "topic" -> bindQueueToTopicExchange(queueName, exchangeName, routingKey);
- case "headers" -> bindQueueToHeadersExchange(queueName, exchangeName, headers);
- default -> throw new IllegalArgumentException("不支持的交换机类型: " + exchangeType);
- }
- }
- /**
- * 动态绑定队列到交换机,并指定路由键(exchange: direct)
- *
- * @param queueName 队列名称
- * @param exchangeName 交换机名称
- */
- public void bindQueueToFanoutExchange(String queueName, String exchangeName) {
- Queue queue = new Queue(queueName);
- FanoutExchange exchange = new FanoutExchange(exchangeName);
- Binding binding = BindingBuilder.bind(queue).to(exchange);
- rabbitAdmin.declareBinding(binding);
- log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName);
- }
- /**
- * 动态绑定队列到交换机,并指定路由键(exchange: direct)
- *
- * @param queueName 队列名称
- * @param exchangeName 交换机名称
- * @param routingKey 路由键
- */
- public void bindQueueToDirectExchange(String queueName, String exchangeName, String routingKey) {
- Queue queue = new Queue(queueName);
- DirectExchange exchange = new DirectExchange(exchangeName);
- Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
- rabbitAdmin.declareBinding(binding);
- log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName + " 使用路由键: {}", routingKey);
- }
- /**
- * 动态绑定队列到交换机,并指定路由键(exchange: topic)
- *
- * @param queueName 队列名称
- * @param exchangeName 交换机名称
- * @param routingKey 路由键
- */
- public void bindQueueToTopicExchange(String queueName, String exchangeName, String routingKey) {
- Queue queue = new Queue(queueName);
- TopicExchange exchange = new TopicExchange(exchangeName);
- Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
- rabbitAdmin.declareBinding(binding);
- log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName + " 使用路由键: {}", routingKey);
- }
- /**
- * 动态绑定队列到交换机,并指定路由键(exchange: headers)
- *
- * @param queueName 队列名称
- * @param exchangeName 交换机名称
- * @param headers 路由键
- */
- public void bindQueueToHeadersExchange(String queueName, String exchangeName, Map<String, Object> headers) {
- Queue queue = new Queue(queueName);
- HeadersExchange exchange = new HeadersExchange(exchangeName);
- Binding binding = BindingBuilder.bind(queue).to(exchange).whereAll(headers).match();
- rabbitAdmin.declareBinding(binding);
- log.info("队列 {}", queueName + " 已绑定到 Headers 交换机 {}", exchangeName + ",使用头部匹配规则: {}", headers);
- }
- /**
- * 动态删除队列
- *
- * @param queueName 队列名称
- */
- public void deleteQueue(String queueName) {
- rabbitAdmin.deleteQueue(queueName);
- log.info("队列删除成功: {}", queueName);
- }
- /**
- * 动态删除交换机
- *
- * @param exchangeName 交换机名称
- */
- public void deleteExchange(String exchangeName) {
- rabbitAdmin.deleteExchange(exchangeName);
- log.info("交换机删除成功: {}", exchangeName);
- }
- }
复制代码 3、RabbitMQ中队列的动态监听
RabbitListenerService.java
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- /**
- * @className: RabbitListenerService
- * @program: chain
- * @description: RabbitMQ监听器Service组件
- * @author: kenny
- * @create: 2024-10-04 01:40
- * @version: 1.0.0
- */
- @Slf4j
- @Service
- public class RabbitListenerService {
- // 为了解决循环依赖问题
- private final SimpleRabbitListenerContainerFactory listenerContainerFactory;
- private final ConnectionFactory connectionFactory;
- @Autowired
- public RabbitListenerService(
- SimpleRabbitListenerContainerFactory listenerContainerFactory,
- ConnectionFactory connectionFactory) {
- this.listenerContainerFactory = listenerContainerFactory;
- this.connectionFactory = connectionFactory;
- }
- /**
- * 创建监听器容器并启动监听
- *
- * @param queueName 队列名称
- */
- public void createListener(String queueName) {
- // 创建并启动监听器容器
- SimpleMessageListenerContainer container = listenerContainerFactory.createListenerContainer();
- container.setConnectionFactory(connectionFactory);
- container.setQueueNames(queueName);
- // 监听逻辑处理
- container.setMessageListener(new MessageListenerAdapter(new Object() {
- public void handleMessage(String message) {
- System.out.println("收到来自RabbitMQ中队列:" + queueName + " 队列的消息:" + message);
- }
- }));
- // 启动监听器容器
- container.start();
- System.out.println("RabbitMQ队列监听器已启动:" + queueName);
- }
- }
复制代码 4、RabbitMQ中的Exchange、Queue动态操作接口
RabbitDynamicChannelController.java
- import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
- import jakarta.annotation.Resource;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RequestParam;
- import org.springframework.web.bind.annotation.RestController;
- import java.util.HashMap;
- import java.util.Map;
- /**
- * @className: RabbitDynamicController
- * @program: chain
- * @description: RabbitMQ 动态创建队列、交换机,绑定等操作
- * @author: kenny
- * @create: 2024-10-04 00:22
- * @version: 1.0.0
- */
- @RestController
- @RequestMapping("/rabbit/dynamic/channel")
- public class RabbitDynamicChannelController {
- /**
- * 动态创建队列和交换机
- */
- @Resource
- private RabbitDynamicConfigService rabbitDynamicConfigService;
- /**
- * 动态创建队列
- *
- * @param queueName 队列名称
- * @return 处理结果
- */
- @GetMapping("/createQueue")
- public String createQueue(@RequestParam("queueName") String queueName) {
- rabbitDynamicConfigService.createQueue(queueName);
- return "队列已创建: " + queueName;
- }
- /**
- * 动态创建交换机
- *
- * @param exchangeName 交换机名称
- * @return 处理结果
- */
- @GetMapping("/createExchange")
- public String createExchange(@RequestParam("exchangeName") String exchangeName) {
- rabbitDynamicConfigService.createExchange(exchangeName);
- return "交换机已创建: " + exchangeName;
- }
- /**
- * 动态绑定队列和交换机
- *
- * @param queueName 队列名称
- * @param exchangeName 交换机名称
- * @param routingKey 路由键
- * @return 处理结果
- */
- @GetMapping("/bindQueue")
- public String bindQueueToExchange(@RequestParam("queueName") String queueName,
- @RequestParam("exchangeName") String exchangeName,
- @RequestParam("routingKey") String routingKey) {
- rabbitDynamicConfigService.bindQueueToExchange(queueName, exchangeName, routingKey);
- return "队列和交换机已绑定: " + queueName + " -> " + exchangeName;
- }
- /**
- * 动态删除队列
- *
- * @param queueName 队列名称
- * @return 处理结果
- */
- @GetMapping("/deleteQueue")
- public String deleteQueue(@RequestParam("queueName") String queueName) {
- rabbitDynamicConfigService.deleteQueue(queueName);
- return "队列已删除: " + queueName;
- }
- /**
- * 动态删除交换机
- *
- * @param exchangeName 交换机名称
- * @return 处理结果
- */
- @GetMapping("/deleteExchange")
- public String deleteExchange(@RequestParam("exchangeName") String exchangeName) {
- rabbitDynamicConfigService.deleteExchange(exchangeName);
- return "交换机已删除: " + exchangeName;
- }
- // 创建并绑定 Fanout 交换机
- @GetMapping("/createDirectExchange")
- public String createDirectExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam String routingKey) {
- rabbitDynamicConfigService.createDirectExchange(exchangeName);
- rabbitDynamicConfigService.bindQueueToDirectExchange(queueName, exchangeName, routingKey);
- return "Fanout Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with routing key: " + routingKey;
- }
- // 创建并绑定 Fanout 交换机
- @GetMapping("/createFanoutExchange")
- public String createFanoutExchange(@RequestParam String exchangeName, @RequestParam String queueName) {
- rabbitDynamicConfigService.createFanoutExchange(exchangeName);
- rabbitDynamicConfigService.bindQueueToFanoutExchange(queueName, exchangeName);
- return "Fanout Exchange and Queue Binding created: " + exchangeName + " -> " + queueName;
- }
- // 创建并绑定 Topic 交换机
- @GetMapping("/createTopicExchange")
- public String createTopicExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam String routingKey) {
- rabbitDynamicConfigService.createTopicExchange(exchangeName);
- rabbitDynamicConfigService.bindQueueToTopicExchange(queueName, exchangeName, routingKey);
- return "Topic Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with routing key: " + routingKey;
- }
- // 创建并绑定 Headers 交换机
- @GetMapping("/createHeadersExchange")
- public String createHeadersExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam Map<String, String> headersMap) {
- Map<String, Object> headers = new HashMap<>(headersMap);
- rabbitDynamicConfigService.createHeadersExchange(exchangeName);
- rabbitDynamicConfigService.bindQueueToHeadersExchange(queueName, exchangeName, headers);
- return "Headers Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with headers: " + headers;
- }
- }
复制代码 5、RabbitMQ中的Queue消息监听动态操作接口
RabbitChannelListenerController.java
- import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
- import jakarta.annotation.Resource;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RequestParam;
- import org.springframework.web.bind.annotation.RestController;
- /**
- * @className: RabbitListenerController
- * @program: chain
- * @description: RabbitMQ 监听器 Controller 组件
- * @author: kenny
- * @create: 2024-10-04 01:30
- * @version: 1.0.0
- */
- @RestController
- @RequestMapping("/rabbit/channel/listener")
- public class RabbitChannelListenerController {
- @Resource
- private RabbitDynamicConfigService rabbitDynamicConfigService;
- /**
- * 创建监听器,监听指定队列
- *
- * @param queueName 队列名称
- * @return 处理结果
- */
- @GetMapping("/queue")
- public String listenQueue(@RequestParam("queueName") String queueName) {
- rabbitDynamicConfigService.createQueue(queueName, true);
- return "开始监听队列:" + queueName;
- }
- }
复制代码 三、动态exchange、queue的测试
1、测试Exchange、Queue的动态创建和删除
2、测试Exchange和Queue的动态绑定
3、发送、接收消息测试动态创建Exchange、Queue
4、测试Queue的动态监听接口
下一篇:7、Spring Boot 3.x集成RabbitMQ动态实例等操作
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |