RabbitMQ实现消息发送接收——实战篇(路由模式)

打印 上一主题 下一主题

主题 949|帖子 949|积分 2857

本篇博文将带领大家一起学习rabbitMQ如何进行消息发送接收,我也是在写项目的时间边学边写,有不足的地方希望在批评区留下你的发起,我们一起讨论学习呀~
需求配景

先说一下我的项目需求配景,社区之间可以进行物资借用,当有社区提交物资借用申请时,必要通过RabbitMQ将这条消息发送到被借用物资的社区,同时在界面进行提示。
先把依赖引入一下
  1. <dependency>
  2.             <groupId>org.springframework.boot</groupId>
  3.             <artifactId>spring-boot-starter-amqp</artifactId>
  4.         </dependency>
复制代码
application.yml做好设置:
  1. spring:
  2.   rabbitmq:
  3.     host: localhost
  4.     port: 5672
  5.     username: guest
  6.     password: guest
复制代码
工具类实现

先选择以何种方式进行消息发送,这里根据需求我选择使用RabbitMQ的路由模式进行消息发送,先来设置一下相应工具类:
先设置RabbitMQ的设置类
  1. /**
  2. * @Title: RabbitMQConfig
  3. * @Author yinan
  4. * @Package com.yinan.config.RabbitConfig
  5. * @Date 2024/12/13 13:58
  6. * @description: RabbitMQ配置类
  7. */
  8. @Configuration
  9. public class RabbitMQConfig {
  10.     @Bean
  11.     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  12.         return new RabbitAdmin(connectionFactory);
  13.     }
  14. //    声明一个交换机
  15.     @Bean
  16.     public DirectExchange borrowMaterialExchange(){
  17.         return new DirectExchange("borrow_material_exchange");
  18.     }
  19. //    动态绑定队列时使用的方法(具体绑定逻辑在下面的监听器中实现)
  20. //    @Bean
  21. //    public Queue communityQueue(){
  22. //        return new Queue("communityQueue");
  23. //    }
  24.     @Bean
  25.     public Jackson2JsonMessageConverter messageConverter() {
  26.         return new Jackson2JsonMessageConverter();
  27.     }
  28.     @Bean
  29.     public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
  30.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  31.         rabbitTemplate.setMessageConverter(messageConverter());
  32.         return rabbitTemplate;
  33.     }
  34. }
复制代码
  为了确保消息发送和接收时都以 JSON 格式处理,可以在 Spring 设置中添加 Jackson2JsonMessageConverter。这样,发送端会将 MaterialBorrowing 对象序列化为 JSON,接收端会主动将 JSON 反序列化回 MaterialBorrowing 对象。 
  绑定交换机和对应队列
  1. /**
  2. * @Title: RabbitMQBindRoutingConfig
  3. * @Author yinan
  4. * @Package com.yinan.config.RabbitConfig
  5. * @Date 2024/12/13 14:21
  6. * @description:  动态绑定路由配置
  7. */
  8. @Component
  9. @Slf4j
  10. public class RabbitMQBindRoutingConfig {
  11.     @Autowired
  12.     private DirectExchange borrowMaterialExchange;
  13.     @Autowired
  14.     private RabbitAdmin rabbitAdmin;
  15.     /**
  16.      * 以社区ID为路由键,为指定社区动态创建队列并绑定到交换机
  17.      * @param communityId 社区ID
  18.      */
  19.     public void bindRouting(String communityId){
  20. //        创建队列
  21.         Queue queue = new Queue("queue_" + communityId);
  22. //        动态绑定交换机和指定队列
  23.         Binding binding = BindingBuilder.bind(queue)
  24.                 .to(borrowMaterialExchange)
  25.                 .with(communityId);
  26.         rabbitAdmin.declareExchange(borrowMaterialExchange);
  27.         rabbitAdmin.declareBinding(binding);
  28.         log.info("队列绑定成功,社区ID----》" + communityId + ",队列名称----》" + queue.getName() + ",交换机名称----》" + borrowMaterialExchange.getName());
  29.     }
  30. }
复制代码
动态声明队列
  1. @Configuration
  2. @Slf4j
  3. public class QueueDeclareConfig {
  4.     @Autowired
  5.     private RabbitAdmin rabbitAdmin;
  6.     public void dynamicDeclareQueue(String communityId){
  7.         String queueName = String.format("queue_%s",communityId);
  8.         Queue queue = new Queue(queueName,true);
  9.         rabbitAdmin.declareQueue(queue);
  10.         log.info("队列声明成功");
  11.     }
  12. }
复制代码
  在创建声明队列的时间,我们希望的是根据我们的规则在调用接口的时间去创建指定名称的队列,所以可以使用动态声明对列而不是直接在平台上进行设置。
  消息发送
  1. @Component
  2. @Slf4j
  3. public class MessageSendConfig {
  4.     @Autowired
  5.     private AmqpTemplate amqpTemplate;
  6.     public void sendMessage(Object message,String communityId){
  7.         System.out.println("发送消息:" + message);
  8.         amqpTemplate.convertAndSend("borrow_material_exchange",communityId, message);
  9.         log.info("发送消息成功------->"+message);
  10.     }
  11. }
复制代码
消息接收(动态声明与监听联合),这里你可以先思索一下为什么要用这种方式实现消息接收,而不是使用@RabbitListener去动态获取某个队列接收消息。
  1. /**
  2. * @Title: MessageRecieveConfig
  3. * @Author yinan
  4. * @Package com.yinan.config.RabbitConfig
  5. * @Date 2024/12/13 12:53
  6. * @description: 动态监听接收消息
  7. */
  8. @Service
  9. @Slf4j
  10. public class MessageRecieveConfig<T> {
  11.     private final ConnectionFactory connectionFactory;
  12.     public MessageRecieveConfig(ConnectionFactory connection) {
  13.         this.connectionFactory = connection;
  14.     }
  15.     public void recieveMessage(String communityId,Class<T> objectType){
  16.         String queueName = String.format("queue_%s",communityId);
  17. //        创建监听容器
  18.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  19.         container.setQueueNames(queueName);
  20. //        处理消息消费逻辑
  21.         container.setMessageListener(message -> {
  22.             try {
  23.                 // 将字节数组转换为字符串
  24.                 String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
  25.                 System.out.println("接收到的消息:" + messageBody);
  26.                 // 如果需要将消息解析为对象(例如 MaterialBorrowing)
  27.                 ObjectMapper objectMapper = new ObjectMapper();
  28.                 T result = objectMapper.readValue(messageBody, objectType);
  29.                 System.out.println("反序列化后的消息:" + result);
  30.             } catch (Exception e) {
  31.                 log.error("处理消息时发生错误:", e);
  32.             }
  33.         });
  34.         // 确保自动确认
  35.         container.setAcknowledgeMode(AcknowledgeMode.AUTO);
  36.         container.start();
  37.         log.info("动态监听已启动,监听队列------->"+queueName);
  38.     }
  39. }
复制代码
在你的项目中分别调用就行了,必要注意的是你必须确保在消息发送的时间你的队列已经创建完成且和对应交换机进行了绑定,不然大概会导致消息发送失败。

ok,我们启动项目

你会发现你的项目根本启动不起来,原因是由于对于 Spring AMQP 的监听器来说,必须确保监听的队列已经存在于 RabbitMQ 中,否则会抛出雷同 DeclarationException 的错误。
所以我们思量可以通过动态声明队列,在程序运行时确保 RabbitMQ 上创建好所需的队列。
动态声明队列的含义
动态声明队列是指程序在运行时,通过代码检查或创建 RabbitMQ 中尚不存在的队列,而不是手动预先设置好全队伍列。这种方式可以主动帮你在 RabbitMQ 中创建所需的队列,而无需手动操纵。
这里说一下为什么必要动态绑定队列而不直接使用@RabbitListener?
为什么使用 SimpleMessageListenerContainer 动态绑定队列



  • SimpleMessageListenerContainer 不必要在项目启动时绑定队列。你可以在用户调用接口时动态创建队列,并动态监听它。
  • 特点

    • 队列在用户调用接口时才会被动态创建(通过 RabbitAdmin 或其他机制)。
    • 动态创建队列和监听时,项目启动时不会实验绑定不存在的队列,因此不会报错。

  • 适用场景:非常恰当动态队列需求,比如队列名依赖用户输入或业务逻辑,且不想在项目启动时绑定固定的队列。
   使用 @RabbitListener 的情况

  @RabbitListener 会在项目启动时绑定到指定的队列。
  

  • 要求:如果绑定的队列在 RabbitMQ 中不存在,项目启动时就会抛出异常,雷同 DeclarationException,这也就是上面为什么会报错的原因。
  • 解决办法

    • 提前创建队列:在 RabbitMQ 中手动创建队列,或通过 RabbitAdmin 在项目启动时主动创建队列。
    • 动态队列名:如果队列名是动态的,可以联合 SpEL 表达式,但队列仍然必要在项目启动时确保存在。

  SpEL 表达式

  如果你的需求中已经确定队列已经创建好的,但是必要动态去获取队列,可以使用如下情势:
  @RabbitListener(queues = "#{T(java.lang.String).format('queue_%s', 'borrowedCommunityId')}")
  这个表达式 是 Spring AMQP 中用于动态指定队列名称的 SpEL 表达式(Spring Expression Language),它的作用就是会动态天生一个队列名称,基于你传入的参数构造队列名。
  详解

  1. 关键部门剖析

  

  • T(java.lang.String)

    • T 是 SpEL 用于引用 Java 类 的方式。
    • java.lang.String 是目的 Java 类,表明你可以调用 String 类的静态方法。

  • .format()

    • String.format() 是 Java 中的静态方法,用于格式化字符串。
    • 格式化字符串的格式是 'queue_%s',%s 是占位符,用于拼接动态内容。

  • 'queue_%s'

    • 这是格式化字符串的模板。%s 表示字符串占位符。

  • 动态参数(例如 borrowedCommunityId)

    • 它会替换 %s,天生队列名。例如,当 borrowedCommunityId 的值是 123 时,结果是:queue_123。

  2. 具体实例

  假设 borrowedCommunityId = "123":
  1. String result = String.format("queue_%s", "123");
  2. System.out.println(result); // 输出:queue_123
复制代码
在 SpEL 中,这等同于:
  1. queues = "#{T(java.lang.String).format('queue_%s', '123')}"
复制代码
这会动态天生队列名称为 queue_123。
  
  为什么用 SpEL?

  Spring AMQP 的 @RabbitListener 注解中,queues 参数支持 SpEL 表达式。这使得我们可以动态决定要监听的队列,而不是写死某个固定的队列名称。
  
  实际应用场景

  就比如在我的代码中,大概有多个社区队列,例如:
  

  • queue_123(社区 ID 为 123 的队列)
  • queue_456(社区 ID 为 456 的队列)
  使用 queues = "#{T(java.lang.String).format('queue_%s', borrowedCommunityId)}",可以动态天生差异社区的队列名称,从而实现按社区路由的功能。
  启动项目之后,调用接口就可以发送消息了

但是你会发现消息消费的逻辑并没有在控制台中打印出来,这个时间你就要思量是不是以下几个问题了:
交换机和队列是否已经绑定乐成(可以在平台上进行查察)

是否绑定到了对应的交换机:amqpTemplate.convertAndSend("borrow_material_exchange",communityId, message);赤色部门指定交换机名称,如果不指定,那么就会使用默认的交换机,所以肯定也是接收不到值的。
固然,另有其他大概,如果你的项目中遇到了,可以在批评区留言,我们一起学习~
末了,重新修改代码调用接口,就可以接收到消息了

对于在界面进行消息提示的功能,这里先不写出来了,我会在后面的博客中进行更新~
【都看到这了,点赞加关注,收藏不迷路呀~】
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宁睿

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表