SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、 ...

打印 上一主题 下一主题

主题 892|帖子 892|积分 2676

媒介

在现代分布式系统中,消息队列是实现服务解耦和异步处理的关键组件。Spring框架提供了强盛的支持,使得与消息队列(如RabbitMQ、Kafka等)的集成变得更加便捷和灵活。本文将深入探讨如何利用Spring的注解驱动方式来设置和管理队列、交换机、消息转换器等组件,从而实现一个高效且可扩展的消息处理架构。
在本博客中,我们将重点先容:
如何使用Spring的注解方式设置RabbitMQ的队列和交换机。
如何设置消息转换器(如Jackson2JsonMessageConverter)来处理不同格式的消息。
如何根据业务需求对现有代码进行改造,将消息队列引入到系统中,从而实现消息的异步处理与解耦。
通过这篇文章,您将了解如何使用Spring框架的注解设置简化消息队列的管理,同时提升系统的可扩展性和维护性。

基于注解的声明队列交换机

利用SpringAMQP声明DirectExchange并与队列绑定
需求如下:

  • 在consumer服务中,声明队列direct.queue1和direct.queue2
  • 在consumer服务中,声明交换机hmall.direct,将两个队列与其绑定
  • 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

基于Bean声明队列和交换机代码如下:
  1. package com.itheima.consumer.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class DirectConfiguration {
  7.     @Bean
  8.     public DirectExchange directExchange(){
  9.         return new DirectExchange("hmall.direct")
  10.     }
  11.     @Bean
  12.     public Queue directQueue1(){
  13.         return new Queue("direct.queuue1");
  14.     }
  15.     @Bean
  16.     public Binding directQueue1bindingRed( Queue directQueue1, DirectExchange directExchange ){
  17.         return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
  18.     }
  19.     @Bean
  20.     public Binding directQueue1bindingBlue( Queue directQueue1, DirectExchange directExchange ){
  21.         return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
  22.     }
  23.     @Bean
  24.     public Queue directQueue2(){
  25.         return new Queue("direct.queuue2");
  26.     }
  27.     @Bean
  28.     public Binding directQueue2bindingRed( Queue directQueue2, DirectExchange directExchange){
  29.         return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
  30.     }
  31.     @Bean
  32.     public Binding directQueue2bindingYellow( Queue directQueue2, DirectExchange directExchange){
  33.         return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
  34.     }
  35. }
复制代码
SpringAMOP还提供了基于@RabbitListener注解来声明队列和交换机的方式
  1. @RabbitListener(bindings =@QueueBinding(
  2.         value = @Queue(name =direct.queue1),
  3.         exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
  4.         key = {"red","blue"}
  5. ))
  6. public void listenDirectQueuel(string msg){
  7.         System.out.println("消费者1接收到Direct消息:【+msg+"】");
  8. }
复制代码
吸收者代码如下:
  1.         @RabbitListener(bindings = @QueueBinding(
  2.             value = @Queue(name = "direct.queue1",durable = "true"),
  3.             exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
  4.             key = {"red","blue"}
  5.     ))
  6.     public void listenDirectQueue1(String message)throws Exception {
  7.         log.info("消费者1监听到direct.queue2的消息,["+message+"]");
  8.     }
  9.     @RabbitListener(bindings = @QueueBinding(
  10.             value = @Queue(name = "direct.queue2",durable = "true"),
  11.             exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
  12.             key = {"red","yellow"}
  13.     ))
复制代码
消息转换器

消息转换器
需求:测试利用SpringAMQP发送对象类型的消息


  • 声明一个队列,名为object.queue
  • 编写单元测试,向队列中直接发送一条消息,消息类型为Map
  • 在控制台查看消息,总结你能发现的问题
  1. // 准备消息
  2. Map<String,0bject>msg = new HashMap<>();
  3. msg.put("name","Jack");
  4. msg.put("age",21);
复制代码
创建队列object.queue

测试代码如下:
  1.         @Test
  2.     public void TestSendObject(){
  3.         Map<String, Object> msg = new HashMap<>();
  4.         msg.put("name", "Jack");
  5.         msg.put("age", 18);
  6.         //3.发送消息 参数分别是:交换机名称、RoutingKey(暂时为空)、消息
  7.         rabbitTemplate.convertAndSend("object.queue",msg);
  8.     }
复制代码
在控制台上找到object.queue中得到消息

Spring的对消息对象的处理是由org.springframework.amgp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于IDK的ObjectOutputStream完成序列化。存在下列问题:


  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差
发起采用JSON序列化代替默认的JDK序列化,要做两件事情:
在publisher和consumer中都要引入jackson依赖:
  1. <dependency>
  2.         <groupId>com.fasterxml.jackson.core</groupId>
  3.         <artifactId>jackson-databind</artifactId>
  4. </dependency>
复制代码
在publisher和consumer中都要设置Messageconverter:
  1. @Bean
  2. public MessageConverter messageConverter(){
  3.         return new Jackson2JsonMessageConverter();
  4. }
复制代码


消费者代码:
  1.         @RabbitListener(queues = "object.queue")
  2.     public void listenObjectQueue(Map<String,Object> msg)throws Exception {
  3.         log.info("消费者监听到pbject.queue的消息,["+msg+"]");
  4.     }
复制代码

运行效果如下:

业务改造

需求:改造余额支付功能,不再同步调用交易服务的0penFeign接口,而是采用异步MO通知交易服务更新订单状态。

在trade-service微服务消费者设置和pay-service微服务发送者都设置MQ依赖
  1.         <!--消息发送-->
  2.   <dependency>
  3.       <groupId>org.springframework.boot</groupId>
  4.       <artifactId>spring-boot-starter-amqp</artifactId>
  5.   </dependency>
复制代码
在trade-service微服务和pay-service微服务添加上RabbitMQ设置信息
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.244.136
  4.     port: 5672
  5.     virtual-host: /hmall
  6.     username: hmall
  7.     password: 1234
复制代码
因为消费者和发送者都需要消息转换器,故直接将代码写到hm-common服务中,在config包中创建MqConfig类
  1. package com.hmall.common.config;
  2. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  3. import org.springframework.amqp.support.converter.MessageConverter;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class MqConfig {
  8.     @Bean
  9.     public MessageConverter messageConverter() {
  10.         return new Jackson2JsonMessageConverter();
  11.     }
  12. }
复制代码
同时trade-service微服务和pay-service微服务是无法主动扫描到该类,采用SpringBoot主动装配的原理,在resource文件夹下的META-INF文件夹下的spring.factories文件中添加类路径:

在吸收者trade-service微服务中创建PayStatusListener
  1. package com.hmall.trade.listener;
  2. import com.hmall.trade.service.IOrderService;
  3. import lombok.RequiredArgsConstructor;
  4. import org.springframework.amqp.rabbit.annotation.Exchange;
  5. import org.springframework.amqp.rabbit.annotation.Queue;
  6. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Component;
  9. @Component
  10. @RequiredArgsConstructor
  11. public class PayStatusListener {
  12.     private final IOrderService orderService;
  13.     @RabbitListener(bindings = @QueueBinding(
  14.             value = @Queue("trade.pay.success.queue"),
  15.             exchange = @Exchange(value = "pay.direct"),
  16.             key = "pay.success"
  17.     ))
  18.     public void ListenPaySuccess(Long orderId) {
  19.         orderService.markOrderPaySuccess(orderId);
  20.     }
  21. }
复制代码
修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:
  1. @Service
  2. @RequiredArgsConstructor
  3. @Slf4j
  4. public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> implements IPayOrderService {
  5.         private final RabbitTemplate rabbitTemplate;
  6.         ...
  7.         @Override
  8.         @Transactional
  9.         public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {
  10.             // 1.查询支付单
  11.                    PayOrder po = getById(payOrderDTO.getId());
  12.                    // 2.判断状态
  13.             if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){
  14.                 // 订单不是未支付,状态异常
  15.                 throw new BizIllegalException("交易已支付或关闭!");
  16.             }
  17.             // 3.尝试扣减余额
  18.             userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());
  19.             // 4.修改支付单状态
  20.             boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());
  21.             if (!success) {
  22.                 throw new BizIllegalException("交易已支付或关闭!");
  23.             }
  24.             // 5.修改订单状态
  25.             // tradeClient.markOrderPaySuccess(po.getBizOrderNo());
  26.             try {
  27.                 rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
  28.             } catch (Exception e) {
  29.                 log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);
  30.             }
  31.         }
  32. }
复制代码



总结

本文先容了基于Spring框架的注解方式来设置消息队列、交换机以及消息转换器的实现方法。通过注解设置,开发者可以更轻松地创建和管理RabbitMQ等消息队列的组件,而无需过多的 XML 设置或繁琐的手动设置。具体来说,我们探讨了如何:
使用 @RabbitListener 和 @EnableRabbit 注解设置消息监听器和消息队列。
设置消息转换器,特别是如何通过 Jackson2JsonMessageConverter 将消息转换为JSON格式,从而实现数据的序列化与反序列化。
结合业务需求,解说如何对现有系统进行改造,集成消息队列,实现异步处理和服务解耦。
通过这些设置和改造,系统的消息处理本领得到了增强,性能和可扩展性也得到了明显提升。消息队列的使用不但可以或许淘汰服务之间的紧耦合,还可以或许通过异步方式提高系统的相应速率和吞吐量。
希望本博客可以或许资助您明确Spring在消息队列方面的强盛功能,并为您的业务应用提供参考。随着系统复杂度的增长,公道的使用消息队列将成为构建高可用、高性能系统的关键之一。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

圆咕噜咕噜

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表