一:技术配景
假如我们有一个支付服务,支付服务的业务逻辑是:起首支付扣减余额,更新支付单状态,更新订单状态,发短信,给这个用户增长积分。在这个场景下,如果我们利用同步调用通信,那么调用支付这个接口是不是得比及给这个用户增长积分的业务执行完后,这个调用链条依次返回到支付接口时,支付接口的业务才算完成,可见这个情况显着很不公道。起首支付接口等待时间太长了,我支付业务的核心逻辑做完后,还要分身着非核心的业务,那如果有一个非核心业务down掉了,或者因为网络题目或者硬件题目或者代码题目使这个接口不能响应或者响应时间很长,那么下面的业务也会做不了,这条线程就出了大题目。堆积在tomcat内,长期占用tomcat的资源,再有用户下单,照旧一样的占用tomcat的资源,现在一个服务失败了,业务中远程调用的接口及其的多,依次类推,就会造成服务级联失败,雪崩现象产生。假如我们来做限流,或者熔断,那肯定不行啊,这可是支付业务,非常告急,如果熔断,那么业务不可以,就可能造成数据不同等性等题目,这是非常严肃的。
以是同步通信在有些场景下使不适用的,我们应该把业务的核心逻辑来做同步通信,(也就是,我们必须要拿到业务处置惩罚完成返回的响应,从而必须做下一步的操纵的这种场景下),在同步通信下,可做限流熔断等服务的掩护操纵;而非核心业务我们就用异步通信来做。非常常见的秒杀场景,它是一种流量激增的业务,激增的这段时间,qps及其高,如果采用同步调用,结果可想而知,轻则响应时间非常慢,用户体验差,重则服务器直接瓦解。异步调用就相当于把一个很长的链条拆开,留其告急的部分做同步调用,不太告急的做异步调用,就类似于把山峰削开,用来填山谷,让它变得平整。
二:安装部署
官网:https://www.rabbitmq.com/tutorials/tutorial-one-java
基于docker安装(条件你已经在官网下载好了rabbitmq)
2-1:加载rabbitmq.tar
2-2: 运行rabbitmq
- docker run \
- -e RABBITMQ_DEFAULT_USER=rabbitmq \
- -e RABBITMQ_DEFAULT_PASS=rabbitmq \
- -v mq-plugins:/plugins \
- --name mq \
- --hostname mq \
- -p 15672:15672 \
- -p 5672:5672 \
- --network zzb-network\
- -d \
- rabbitmq:3.8-management
复制代码 三:初识RabbltMQ
整体架构:
- virtual-host:假造主机,起到数据隔离的作用
- publisher:消息发送者
- consumer:消息的消费者
- queue:队列,存储消息
- exchange:互换机,负责路由消息
注意:互换机只能路由消息,无法存储消息;互换机只会路由消息给其绑定的队列,因此队列必须与互换机绑定
2-3 Java简单示例 (基于AMQP协议)
引入amqp-starter依靠
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
复制代码 设置MQ地址,在发送者服务和吸收者服务的application.yml中添加设置(最好设置在nacos中,方便管理):
- spring:
- rabbitmq:
- host: 192.168.203.130 # 你的虚拟机IP
- port: 5672 # 端口
- virtual-host: /hellomq # 虚拟主机
- username: zzb # 用户名
- password: 123456 # 密码
复制代码 发送者:
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void send(){
- //队列名称
- String queue = "simple.queue";
- //消息
- String message = "hello RabbitMQ";
- //发送消息
- rabbitTemplate.convertAndSend(queue, message);
- }
复制代码 接受者
- // 把接受消息的类的对象放进ioc容器
- @Component
- @Slf4j
- public class Listener {
-
- @RabbitListener(queues = "simple.queue")
- public void receive(String message) {
- log.info("接收到消息:{}", message);
- }
- }
复制代码 三:SpringAMQP
3-1:Work queues,任务模型:
Work queues,就是多个消费者监听同一个队列。
在实际开发中,并不是在一个服务里编写多个消费者监听同一个队列,而是部署多个实例,形成集群,监听那个队列。
1:Work queues的特点
队列中的一条消息只会被一个消费者监听执行,如果有其他的消费者,是拿不到这个消息的;基于这个特点,如果有其他消费者,其他消费者就会监听执行这个队列后面的消息,以是对于一个队列中全部的消息,多消费者是可以更快的处置惩罚这些消息,以是加消费者是在高并发场景下一个不错的处置惩罚方案。
我们要知道,部署多个实例,每个呆板的性能可能有好有坏,如果有台呆板性能比较差一些,那它处置惩罚消息的速率肯定比不外其他的消费者,那队列怎样分配消息给消费者呢?答案默认是轮询,每个消费者处置惩罚一个消息。显然这个方案和实际情况背道而驰,以是我们必要设置,让每一个消费者只能处置惩罚一条消息,等当前消息处置惩罚完毕后,才能获取下一条消息。这样完美解决了消息积蓄题目。在application.yaml中设置:
- spring:
- rabbitmq:
- listener:
- simple:
- prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
复制代码 3-2:互换机
互换机的作用重要是接受发送者发送的消息,并将消息路由到预期绑定的队列,常见的互换机类型有三种:
3-2-1:Fauout:广播rabbitTemplate.convertAndSend(x,x,x)三个参数
每个监听不同队列的消费者都可以接受到来自同一个互换机转发来的消息。
3-2-2:Direct:定向
Direct Exchange会将接受到的消息根据规则路由到指定的queue,因此称为定向路由
- 每一个queue都会erexchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey同等的队列
3-2-3:Topic:话题
TopicExchange也是基于RoutingKey做消息路由,然是routingKey通常是多个单词的组合,并且以 . 分割,比如 user.# item.# #.cn
BindingKey可以利用通配符:#代表0个或者多个单词,*代表一个单词
3-3 声明队列互换机
1. 基于Bean声明队列互换机
SpringAMQP提供了几个类,用来声明队列、互换机及其绑定关系:
- Queue:用于声明队别,可以用工厂类QueueBuilder构建
- Exchange:用于声明互换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和互换机的绑定关系,可以用工厂类BindingBuilder构建
fanout示例:
- package com.itheima.consumer.config;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class FanoutConfig {
- /**
- * 声明交换机
- * @return Fanout类型交换机
- */
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("a.fanout");
- }
- /**
- * 第1个队列
- */
- @Bean
- public Queue fanoutQueue1(){
- return new Queue("fanout.queue1");
- }
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
- return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
- }
- /**
- * 第2个队列
- */
- @Bean
- public Queue fanoutQueue2(){
- return new Queue("fanout.queue2");
- }
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
- return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
- }
- }
复制代码 direct示例:
- package com.itheima.consumer.config;
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class DirectConfig {
- /**
- * 声明交换机
- * @return Direct类型交换机
- */
- @Bean
- public DirectExchange directExchange(){
- return ExchangeBuilder.directExchange("a.direct").build();
- }
- /**
- * 第1个队列
- */
- @Bean
- public Queue directQueue1(){
- return new Queue("direct.queue1");
- }
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
- return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
- }
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
- return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
- }
- /**
- * 第2个队列
- */
- @Bean
- public Queue directQueue2(){
- return new Queue("direct.queue2");
- }
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
- return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
- }
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
- return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
- }
- }
复制代码 可以看到,声明direct互换机与队列绑定关系的代码显得非常臃肿,以是一般我们利用注解来声明
- //在监听消息时,指定绑定关系
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1"),
- exchange = @Exchange(name = "a.direct", type = ExchangeTypes.DIRECT),
- key = {"red", "blue"}
- ))
- public void listenDirectQueue1(String msg){
- System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2"),
- exchange = @Exchange(name = "a.direct", type = ExchangeTypes.DIRECT),
- key = {"red", "yellow"}
- ))
- public void listenDirectQueue2(String msg){
- System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
- }
复制代码 3-4 消息转换器
发送消息时,默认的把消息转换是jdk来做的,把对象序列化为字节举行传输,这种方式不推荐
发起采用别的消息转换器,如json序列化
步骤一:在publisher和consumer两个服务中都引入依靠:
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- <version>2.9.10</version>
- </dependency>
复制代码 注意,如果项目中引入了spring-boot-starter-web依靠,则无需再次引入Jackson依靠。
步骤二:设置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可:
- @Bean
- public MessageConverter messageConverter(){
- // 1.定义消息转换器
- Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
- // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
- jackson2JsonMessageConverter.setCreateMessageIds(true);
- return jackson2JsonMessageConverter;
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |