ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【RabbitMQ】利用手册 [打印本页]

作者: 王國慶    时间: 2024-8-10 11:24
标题: 【RabbitMQ】利用手册
RabbitMQ

同步调用

长处:时效性强,期待到结果后才返回
缺点:扩展性差、性能下降(调用链越长耗时越久)、级联失败问题(一个调用点卡住,后面的链路都不能实行)
异步调用

异步调用通常是基于消息通知的方式,包含三个脚色:
消息发送者:投递消息的人,就是原来的调用者
消息吸收者:吸收和处置惩罚消息的人,就是原来的服务提供者
消息署理:管理、暂存、转发消息,你可以把它明确成微佩服务器

长处:
耦合度低,拓展性强
异步调用,无需期待,性能好
故障隔离,下游服务故障不影响上游业务
缓存消息,流量削峰填谷
缺点:
不能立即得到调用结果,时效性差
不确定下游业务实行是否乐成
业务安全依靠于Broker(消息署理)的可靠性
MQ技术选型


RabbitMQ安装

常见问题:
RabbitMQ介绍



Work Queues(任务模子)

任务模子简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。同一个消息只会被一个消费者处置惩罚。多个消费者绑定到一个队列,可以加快消息处置惩罚速度。

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处置惩罚完消息,可能出现消息堆积。因此我们需要修改消费者的application.yml,设置preFetch值为1,确保同一时候最多投递给消费者1条消息,消费者处置惩罚完后再投递下一条消息。

Fanout互换机

Fanout Exchange 会将吸收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式。 Fanout互换机会将收到的消息复制成n份,然后将消息发送到n个与其绑定的队列中。

应用场景:用户付出乐成后,生意业务服务更新订单状态,短佩服务通知用户,积分服务为用户增长积分。
实现:生意业务服务的queue、短佩服务的queue、积分服务的queue都绑定到Fanout互换机,用户付出乐成后,付出服务将消息发送到Fanout互换机,这样生意业务服务、短佩服务、积分服务九都能收到这条消息了。
案例演示
实现思路:
代码实现:

发送者:
  1. @SpringBootTest
  2. class SpringAmqpTest {
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     @Test
  6.     public void testSimpleQueue(){
  7.         String exchangeName="hmall.fanout";
  8.         String message="hello everyone";                                                    rabbitTemplate.convertAndSend(exchangeName,null,message);
  9.     }
  10. }
复制代码
消费者:
  1. @Component
  2. @Slf4j
  3. public class SpringRabbitListener {
  4.     @RabbitListener(queues = "fanout.queue1")
  5.     public void listenerWorkQueue1(String message){
  6.         log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
  7.     }
  8.     @RabbitListener(queues = "fanout.queue2")
  9.     public void listenerWorkQueue2(String message){
  10.         log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
  11.     }
  12. }
复制代码
消费者输出:

Direct互换机

Direct Exchange 会将吸收到的消息根据规则路由到指定的Queue,因此称为定向路由。


应用场景:用户取消后,只需要给生意业务服务发送消息,通知生意业务服务更新订单状态,而不需要给短佩服务和积分服务发送消息。
案例演示
实现思路:
代码实现:

消费者
  1. @Component
  2. @Slf4j
  3. public class SpringRabbitListener {
  4.     @RabbitListener(queues = "direct.queue1")
  5.     public void listenerWorkQueue1(String message){
  6.         log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
  7.     }
  8.     @RabbitListener(queues = "direct.queue2")
  9.     public void listenerWorkQueue2(String message){
  10.         log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
  11.     }
  12. }
复制代码
发送者
  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. @Test
  4. public void testSimpleQueue(){
  5.   //交换机名称
  6.   String exchangeName="hmall.direct";
  7.   //消息
  8.   String message_blue="hello blue";
  9.   String message_yellow="hello yellow";
  10.   String message_red="hello red";
  11.   //发送消息
  12. rabbitTemplate.convertAndSend(exchangeName,"blue",message_blue);
  13. rabbitTemplate.convertAndSend(exchangeName,"yellow",message_yellow);
  14. rabbitTemplate.convertAndSend(exchangeName,"red",message_red);
  15. }
复制代码
消费者输出:

Topic互换机

TopicExchange也是基于routingkey做消息路由,但是routingkey通常是多个单词的组合,而且以.分割。
Queue与Exchange指定routingkey时可以利用通配符:


案例演示
实现思路:
代码实现:

消费者:
  1. @Component
  2. @Slf4j
  3. public class SpringRabbitListener {
  4.     @RabbitListener(queues = "topic.queue1")
  5.     public void listenerWorkQueue1(String message){
  6.         log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
  7.     }
  8.     @RabbitListener(queues = "topic.queue2")
  9.     public void listenerWorkQueue2(String message){
  10.         log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
  11.     }
  12. }
复制代码
发送者1:
  1.     @Autowired
  2.     private RabbitTemplate rabbitTemplate;
  3.     @Test
  4.     public void testSimpleQueue(){
  5.         //交换机名称
  6.         String exchangeName="hmall.topic";
  7.         //消息
  8.         String message="中国新闻";
  9.         //发送消息
  10.         rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
  11.     }
复制代码
消费者输出:

发送者2:
  1.     @Autowired
  2.     private RabbitTemplate rabbitTemplate;
  3.     @Test
  4.     public void testSimpleQueue(){
  5.         //交换机名称
  6.         String exchangeName="hmall.topic";
  7.         //消息
  8.         String message="中国天气";
  9.         //发送消息
  10.         rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);
  11.     }
复制代码
消费者输出:

AMQP

Advanced Message Queuing Protocol,是用于在应用步伐之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和吸收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
AmqpTemplate和RabbitTemplate

AmqpTemplate 是一个接口,定义了基本的 AMQP 操纵,如发送消息、吸收消息、转换消息等。它提供了与 AMQP(包括 RabbitMQ)通讯的基本功能的抽象。
RabbitTemplate 是 AmqpTemplate 的默认实现类,专门用于与 RabbitMQ 举行交互。它实现了 AmqpTemplate 接口,并提供了更多与 RabbitMQ 交互的具体功能和配置选项。
RabbitTemplate 比 AmqpTemplate 更加丰富,提供了一些额外的高级特性和配置选项,如事件支持、消息确认机制、消息转换器等。这些功能可以更好地满意与 RabbitMQ 高级交互需求。
综上所述,AmqpTemplate 是一个通用的 AMQP 操纵接口,而 RabbitTemplate 是对其的具体实现,提供了更多与 RabbitMQ 交互的功能和默认配置,使得在 Spring 应用中利用 RabbitMQ 变得更加简单和方便。
RabbitMQ利用

背景可视化界面操纵


代码操纵

创建队列和互换机

SpringAMQP提供了几个类,用来声明队列、互换机及其绑定关系:


如果已经存在互换机、队列、绑定关系,运行代码时则不会举行创建,而且也不会报错。
通常发送者只需要关心消息发送,消费者关心队列、互换机、以及绑定关系,所以创建操纵一样平常写在消费者中。
Sping提供了基于java bean和基于@RabbitListener注解两种方式创建。

  1. package com.itheima.consumer.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class FanoutConfiguration {
  7.   //声明交换机
  8.     @Bean
  9.     public FanoutExchange fanoutExchange(){
  10. //        方式1
  11. //        return new FanoutExchange("hmall.fanout");
  12. //        方式2
  13.         return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
  14.     }
  15.   //声明队列
  16.     @Bean
  17.     public Queue fanoutQueue1(){
  18. //        方式1
  19. //        return new Queue("fanout.queue1",true);
  20. //        方式2
  21.         return QueueBuilder.durable("fanout.queue1").build();
  22.     }
  23.   //将队列和交换机绑定
  24.     @Bean
  25.     public Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){//spring会去找beanName='fanoutQueue1'的bean作为参数传进来
  26.         return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
  27.     }
  28.     @Bean
  29.     public Queue fanoutQueue2(){
  30.         return QueueBuilder.durable("fanout.queue2").build();
  31.     }
  32.     @Bean
  33.     public Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){//spring会去找beanName='fanoutQueue2'的bean作为参数传进来
  34.         return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
  35.     }
  36. }
复制代码

  1. @Component
  2. @Slf4j
  3. public class SpringRabbitListener {
  4.     @RabbitListener(bindings = @QueueBinding( //将交换机和队列绑定
  5.             value = @Queue(name="direct.queue1",durable = "true"), //如果没有队列direct.queue1则创建队列,并监听队列direct.queue1
  6.             exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT), //如果没有交换机hmall.direct则创建交换机
  7.             key = {"blue","red"} //routingKey
  8.             ))
  9.     public void listenerWorkQueue1(String message){
  10.         log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
  11.     }
  12.     @RabbitListener(bindings = @QueueBinding(
  13.             value = @Queue(name="direct.queue2",durable = "true"),
  14.             exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
  15.             key = {"yellow","red"}
  16.     ))
  17.     public void listenerWorkQueue2(String message){
  18.         log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
  19.     }
  20. }
复制代码
发送消息


吸收消息

  1. @Component
  2. @Slf4j
  3. public class SpringRabbitListener {
  4.     @RabbitListener(queues = "队列名")
  5.     public void listenerSimpleQueue(String message){
  6.         log.info("消费者收到消息:{}",message);
  7.     }
  8. }
复制代码
配置消息转换器

convertAndSend方法会先将消息举行序列化,然后再发送。
Spring的对消息对象的处置惩罚是由org.springframework.amap.support.converter.Messageconverter来处置惩罚的。而
默认实现是SimpleMessageConverter,如果消息实现了Serializable接口,则会利用serialize方法举行序列化,而serialize方法是基于JDK的Objectoutputstream完成序列化的。存在下列问题:


建议采用JSON序列化取代默认的JDK序列化,要做两件事情:
测试:

消息可靠性

消息丢失三种情况:

发送者的可靠性

发送者重连

有的时候由于网络颠簸,可能会出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制,默认重连机制是关闭的。
  1. spring:
  2.   rabbitmq:
  3.     connection-timeout: 1s #设置MQ的连接超时时间,超过1秒钟还没有连上MQ则表示连接超时
  4.     template:
  5.       retry:
  6.         enabled: true # 开启超时重试机制
  7.         initial-interval: 1000ms # 失败后的初始等待时间
  8.         multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 initial-interval * multiplier
  9.         max-attempts: 3 # 最大重试次数
复制代码
案例演示:
注意:当网络不稳定的时候,利用重试机制可以有效进步消息发送的乐成率。不过SpringAMQP提供的重试机制是壅闭式的重试,也就是说多次重试期待的过程中,当前线程是被壅闭的,会影响业务性能,如果对于业务性能有要求,建议禁用重试机制。如果一定要利用,请合理配置期待时长和重试次数,固然也可以考虑利用异步线程来实行发送消息的代码。
发送者确认

SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确人机制后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。返回的结果有以下几种情况:


开开导送者确认机制:
案例演示:
注意:发送者确认机制需要发送者和MQ举行确认,会大大影响消息发送的服从,通常情况下不建议开开导送者确认机制。
MQ的可靠性

在默认情况下,RabbitMQ会将吸收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

数据长期化

RabbitMQ实现数据长期化包括3个方面,设置为长期化后,重启MQ,互换机、队列、消息也不会丢失。

案例演示:
MQ吸收非长期化消息
发送者发送1百万条非长期化消息

发送耗时:

MQ收到了一百万条非长期化消息
注意:本测试利用的MQ是3.13.3,默认利用的是Lazy Queue模式:所有的消息直接存入磁盘,不再存储到内存,所以In memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,Persistent表现存入磁盘且长期化的消息的数量)

重启MQ后,一百万条非长期化消息全部丢失

MQ吸收长期化消息
发送者发送1百万条长期化消息

发送耗时:

MQ收到了一百万条长期化消息
注意:本测试利用的MQ是3.13.3,默认利用的是Lazy Queue模式:所有的消息直接存入磁盘,不再存储到内存,所以In memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,Persistent表现存入磁盘且长期化的消息的数量)

重启MQ后,一百万条长期化消息不会丢失

结论
在吸收非长期化消息时,MQ收到消息后会先将消息存到内存中的队列中,队列满了之后会把先收到的消息存到磁盘中(这个行为称为paged out,paged out会导致MQ壅闭),然后再继承吸收消息,把消息存进内存中的队列中,队列满了之后再把队列中的消息存入磁盘中,以此类推。
在吸收长期化消息时,MQ会直接将消息存到磁盘中,不会等内存中的队列满了之后再将消息保存到磁盘中。
发送一万万条非长期化消息耗时:

发送一万万条长期化消息耗时:

从上面发送者发送一百万条消息的耗时来看,发送长期化消息比发送非长期化消息耗时更少(不需要paged out),而且长期化消息在MQ重启后不会丢失,所以建议发送长期化消息。
Lazy Queue

从RabbitMQ的3.6.0版本开始,就增长了Lazy Queue的概念,也就是惰性队列。
惰性队列的特性如下:

在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
3.12版本之前的MQ设置Lazy Queue模式有三种方式:

非Lazy Queue模式+长期化消息和Lazy Queue模式+长期化消息MQ吸收消息速度对比:

消费者的可靠性

消费者确认机制

消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否乐成处置惩罚消息。MQ将一条消息发送给消费者后,MQ上的这条消息处置惩罚待确认状态,当消费者处置惩罚消息结束后,应该向RabbitMO发送一个回执,告知RabbitMQ自己消息处置惩罚状态:


SpringAMQP已经实现了消息确认功能。并答应我们通过配置文件选择ACK处置惩罚方式,有三种方式:

案例演示-自动模式
案例演示-手动模式
失败重试机制

SpringAMQP提供了消费者失败重试机制,在消费者出现非常时利用当地重试,而不是无穷的requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

案例演示
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.1.2 # RabbitMQ地址
  4.     port: 5672 # 端口
  5.     virtual-host: /hmall # 虚拟主机
  6.     username: jack # 用户名
  7.     password: jack # 密码
  8.     listener:      simple:        prefetch: 1        acknowledge-mode: auto        retry:          enabled: true          initial-interval: 1000ms          multiplier: 1          max-attempts: 3          stateless: true
复制代码
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处置惩罚,它包含三种不同的实现:

将失败处置惩罚策略改为RepublishMessageRecoverer:
案例演示

业务幂等性

幂等是一个数学概念,用函数表达式来形貌是这样的:f(x)=f(f(x)),例如求绝对值的函数。在步伐开发中,则是指同一个业务,实行一次或多次对业务状态的影响是一致的。

消除非幂等性的手段

案例演示:


延迟消息

延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才实行的任务

死信互换机

当一个队列中的消息满意下列情况之一时,就会成为死信 (dead letter)

如果队列通过dead-letter-exchange属性指定了一个互换机,那么该队列中的死信就会投递到这个互换机中。这个互换机称为死信互换机(Dead Letter Exchange,简称DLX)

案例演示
延迟消息插件

利用死信队列可以实现延迟消息,但这种方法过于繁琐。为了简化这一过程,RabbitMQ的官方推出了一款插件,该插件原生支持延迟消息功能。该插件的运作原理是计划了一种特别的互换机,当消息投递到这种互换机时,它能够暂存一段时间,直到到达设定的延迟时间后再将消息投递到相应的队列。这种计划大大简化了延迟消息的处置惩罚过程,进步了系统的服从和可靠性。
下载
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
选择下载的版本,插件的版本要和RabbitMQ的版本保持一致

安装

利用
本文参考文档

https://b11et3un53m.feishu.cn/wiki/A9SawKUxsikJ6dk3icacVWb4n3g
https://blog.csdn.net/karry_zzj/article/details/119513541
https://blog.csdn.net/weixin_42050545/article/details/121487823

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4