ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【RabbitMQ】RabbitMQ 的根本认识 [打印本页]

作者: 悠扬随风    时间: 2024-11-3 06:33
标题: 【RabbitMQ】RabbitMQ 的根本认识


目录
消息中间件根本概念和应用场景
根本概念
应用场景
消息队列优缺点
RabbitMQ 先容
特点
根本结构
生产者与消费者
消息路由
消费者消息确认


消息中间件根本概念和应用场景

根本概念

消息中间件:消息中间件是基于队列与消息通报技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支持性软件系统(引用360百科)
应用场景

1. 异步处置处罚

异步任务处置处罚 


日志处置处罚


2. 解耦

服务解耦


模块解耦


3. 负载均衡

负载均衡


消息队列


4. 数据同步

数据同步


事件驱动


5. 流处置处罚

流式数据处置处罚


实时盘算


6. 微服务架构

微服务通讯


服务编排


7. 大规模系统 

分布式系统


容错与重试


8. 实时通讯

实时消息推送


实时数据传输


消息队列优缺点

长处

在上面全部提到的使用场景都属于消息中间件的有点。
缺点

系统可用性低落

引入消息队列后,系统多了一个潜伏的故障点。假如消息队列服务不可用,可能会导致整个系统的通讯制止,影响业务连续性。因此,需要接纳冗余部署、负载均衡、故障转移等策略来提高消息队列的可用性。

系统复杂度提高

消息队列的引入增长了系统的复杂度,需要处置处罚消息的重试、去重、长期化、顺序性包管等问题。此外,还需要考虑消息队列的监控、告警和运维等方面。

一致性问题

在分布式系统中,消息队列可能导致数据不一致的问题。例如,假如多个系统依靠于消息队列进行状态更新,而此中一个系统处置处罚失败,可能会导致数据不一致。为了办理这个问题,可能需要接纳事件性消息、补偿事件等机制来确保数据的一致性。

性能开销

消息队列的引入可能会增长系统的延迟和开销,特别是在高并发场景下。这包括消息的序列化/反序列化、网络传输、消息存储和检索等。因此,需要公道设计消息队列的架构和参数,以均衡性能和开销。

运维成本

消息队列的运维和管理需要肯定的专业知识和技能。这包括设置管理、性能调优、故障排查和恢复等方面。此外,还需要定期备份和恢复消息队列的数据,以确保数据的可靠性和安全性。


总之,消息队列在提供解耦、异步处置处罚和削峰填谷等优势的同时,也带来了系统可用性低落、复杂度提高、一致性问题、性能开销和运维成本等挑衅。因此,在引入消息队列时,需要充实考虑这些优缺点,并接纳相应的技术步调和架构设计来规避和办理这些问题。

RabbitMQ 先容


特点

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通讯方法,消息队列在分布式系统开发中应用非常广泛。


RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。

AMQP的重要特性是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。


AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。


RabbitMQ的可靠性是非常好的,数据能够包管百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。以是说在我们互联网的金融行业。对数据的稳定性和可靠性要求都非常高的环境下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好许多。也可以自己做一些性能的优化。

根本结构




构成部分说明:
Broker:消息队列服务历程,此历程包括两个部分:Exchange和Queue
Exchange:消息队列互换机,按肯定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费者
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
Connection: 连接. 是客户端和RabbitMO服务器之间的一个TCP连接, 这个连接是建立消息通报的基础,它负责传输客户端和服务器之间的全部数据和控制信息.
Channel: 通道, 信道. Channel是在Connection之上的一个抽象层, 在 RabbitMO中, 一个TCP连接可以有多个Channel, 每个Channel都是独立的虚拟连接, 消息的发送和接收都是基于 Channel的.重要作用是将消息的读写操作复用到同一个TCP连接上,如许可以淘汰建立和关闭连接的开销提高性能.
Virtual host: 虚拟主机. 这是一个虚拟概念, 它为消息队列提供了一种逻辑上的隔离机制. 对于RabbitMQ而言, 一个 BrokerServer 上可以存在多个 Virtual Host. 当多个差别的用户使用同一个RabbitMQ Server 提供的服务时,可以虚拟分别出多个vhost,每个用户在自己的 vhost 创建exchange/queue等

生产者与消费者

在RabbitMQ中,生产者(Producer)负责发送消息,通常是应用程序向RabbitMQ服务器发送具有特定路由键的消息;消费者(Consumer)则负责处置处罚接收到的这些消息。在RabbitMQ中,生产者和消费者之间使用互换器(Exchange)和队列(Queue)进行消息路由和存储。生产者将消息发送到互换器,互换器根据消息的路由键将其放入相应的队列中,末了消费者从队列中获取并处置处罚这些消息。
生产者发送消息流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)

消费者接收消息流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复

消息路由

Direct Exchange (直接互换器)



将一个队列绑定到某个互换机上,同时赋予该绑定一个路由键(routing key)
当一个携带着路由值为R的消息被发送给直连互换机时,互换机会把它路 由给绑定值同样为R的队列。
如上图,X绑定了两个队列,并声明为direct。队列Q1绑定的键为orange,队列Q2绑定的键为black和green。生成者发布消息到Exchange上,绑定键为orange的消息会被路由到Q1中,绑定键为black和green的消息则会发送到Q2队列中。若还有其他类型的消息则被扬弃,没有与之绑定的队列。

  1. @Configuration
  2. public class DirectRabbitConfig {
  3.     //队列 起名:TestDirectQueue
  4.     @Bean
  5.     public Queue TestDirectQueue() {
  6.         // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  7.         // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
  8.         // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
  9.         // return new Queue("TestDirectQueue",true,true,false);
  10.         //一般设置一下队列的持久化就好,其余两个就是默认false
  11.         return new Queue("TestDirectQueue",true);
  12.     }
  13.     //Direct交换机 起名:TestDirectExchange
  14.     @Bean
  15.     DirectExchange TestDirectExchange() {
  16.       //  return new DirectExchange("TestDirectExchange",true,true);
  17.         return new DirectExchange("TestDirectExchange",true,false);
  18.     }
  19.     //绑定  
  20.     //将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
  21.     @Bean
  22.     Binding bindingDirect() {
  23.         return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
  24.     }
  25.      
  26.     @Bean
  27.     DirectExchange lonelyDirectExchange() {
  28.         return new DirectExchange("lonelyDirectExchange");
  29.     }
  30. }
复制代码
Fanout Exchange (扇形互换器)





  1. @Configuration
  2. public class FanoutRabbitConfig {
  3.     /**
  4.      *  创建三个队列 :fanout.A   fanout.B  fanout.C
  5.      *  将三个队列都绑定在交换机 fanoutExchange 上
  6.      *  因为是扇型交换机, 路由键无需配置,配置也不起作用
  7.      */
  8.     @Bean
  9.     public Queue queueA() {
  10.         return new Queue("fanout.A");
  11.     }
  12.     @Bean
  13.     public Queue queueB() {
  14.         return new Queue("fanout.B");
  15.     }
  16.     @Bean
  17.     public Queue queueC() {
  18.         return new Queue("fanout.C");
  19.     }
  20.     @Bean
  21.     FanoutExchange fanoutExchange() {
  22.         return new FanoutExchange("fanoutExchange");
  23.     }
  24.     @Bean
  25.     Binding bindingExchangeA() {
  26.         return BindingBuilder.bind(queueA()).to(fanoutExchange());
  27.     }
  28.     @Bean
  29.     Binding bindingExchangeB() {
  30.         return BindingBuilder.bind(queueB()).to(fanoutExchange());
  31.     }
  32.     @Bean
  33.     Binding bindingExchangeC() {
  34.         return BindingBuilder.bind(queueC()).to(fanoutExchange());
  35.     }
  36. }
复制代码
Topic Exchange (主题互换器)



* (星号) 用来表现一个单词 (必须出现的)
# (井号) 用来表现恣意数量(零个或多个)单词
如上图,在X绑定了三个主题的消息队列,我们来看看他们之间数据接收环境是怎么样的
quick.orange.rabbit                    被队列 Q1Q2 接收到
lazy.orange.elephant                 被队列 Q1Q2 接收到
quick.orange.fox                        被队列 Q1 接收到
lazy.brown.fox                            被队列 Q2 接收到
lazy.pink.rabbit                           固然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox                         不匹配任何绑定不会被任何队列接收到会被扬弃
quick.orange.male.rabbit           是四个单词不匹配任何绑定会被扬弃
lazy.orange.male.rabbit              是四个单词但匹配 Q2
当队列绑定关系是下列这种环境时需要引起注意
   当一个队列绑定键是#,那么这个队列将接收全部数据,就有点像 fanout了
   假如队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

  1. @Configuration
  2. public class TopicRabbitConfig {
  3.     //绑定键
  4.     public final static String topic1 = "lazy.orange.elephant";
  5.     public final static String topic2 = "quick.orange.rabbit";
  6.     @Bean
  7.     public Queue firstQueue() {
  8.         return new Queue(TopicRabbitConfig.topic1);
  9.     }
  10.     @Bean
  11.     public Queue secondQueue() {
  12.         return new Queue(TopicRabbitConfig.topic2);
  13.     }
  14.     @Bean
  15.     TopicExchange exchange() {
  16.         return new TopicExchange("topicExchange");
  17.     }
  18.     //将firstQueue和topicExchange绑定,而且绑定的键值为lazy.orange.elephant
  19.     //这样只要是消息携带的路由键是lazy.orange.elephant,才会分发到该队列
  20.     @Bean
  21.     Binding bindingExchangeMessage() {
  22.         return BindingBuilder.bind(firstQueue()).to(exchange()).with(topic1);
  23.     }
  24.     //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则#.orange.#
  25.     // 这样只要是消息携带的路由键是包含有orange,都会分发到该队列
  26.     @Bean
  27.     Binding bindingExchangeMessage2() {
  28.         return BindingBuilder.bind(secondQueue()).to(exchange()).with("#.orange.#");
  29.     }
  30. }
复制代码
消费者消息确认


这也是默认的消息确认环境。 AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被精确处置处罚,不管消费者端是否成功处置处罚本次投递。
以是这种环境假如消费端消费逻辑抛出非常,也就是消费端没有处置处罚成功这条消息,那么就相称于丢失了消息。
一样寻常这种环境我们都是使用try catch捕捉非常后,打印日志用于追踪数据,如许找出对应数据再做后续处置处罚。


默认的消息是自动确认,需要更改设置文件:
  1. spring:
  2.   rabbitmq:
  3.     listener:
  4.       simple:
  5.         acknowledge-mode: manual
复制代码
或在 RabbitListenerContainerFactory 中进行开启手动 ack:

  1. @Bean
  2. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
  3.     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  4.     factory.setConnectionFactory(connectionFactory);
  5.     factory.setMessageConverter(new Jackson2JsonMessageConverter());
  6.     factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //开启手动 ack
  7.     return factory;
  8. }
复制代码

消费者收到消息后,手动调用channel.basicAck/channel.basicNack/channel.basicReject后,RabbitMQ收到这些消息后,才认为本次投递成功。
channel.basicAck用于肯定确认
channel.basicNack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
channel.basicReject用于否定确认,但与channel.basicNack相比有一个限定:一次只能拒绝单条消息
消费者端以上的3个方法都表现消息已经被精确投递,但是channel.basicAck表现消息已经被精确处置处罚。而channel.basicNack,channel.basicReject表现没有被精确处置处罚.

channel.basicAck(deliveryTag,false);
消费消息的手动确认,消息确认成功-basicAck,第一个参数deliveryTag,消息的唯一标识,第二个参数multiple,消息是否支持批量确认,假如是true,代表可以一次性确认标识小于等于当前标识的全部消息,假如是false,只会确认当前消息

channel.basicReject(deliveryTag, true);
拒绝消费当前消息,假如第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,由于一些原因拒绝它,而且服务器把这个消息丢掉就行,下次不想再消费这条消息了。
使用拒绝后重新入列这个确认模式要谨慎,由于一样寻常都是出现非常的时间,catch非常再拒绝入列,选择是否重入列。但是假如使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列如许循环,会导致消息积压。

channel.basicNack(deliveryTag, false, true);
不消费当前消息,第一个参数依然是当前消息到的数据的唯一id;第二个参数是指是否针对多条消息;假如是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
同样使用不确认后重新入列这个确认模式要谨慎,由于这里也可能由于考虑不周出现消息一直被重新丢回去的环境,导致积压。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4