【RabbitMQ】RabbitMQ 的根本认识

打印 上一主题 下一主题

主题 848|帖子 848|积分 2544



目录
消息中间件根本概念和应用场景
根本概念
应用场景
消息队列优缺点
RabbitMQ 先容
特点
根本结构
生产者与消费者
消息路由
消费者消息确认


消息中间件根本概念和应用场景

根本概念

消息中间件:消息中间件是基于队列与消息通报技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支持性软件系统(引用360百科)
应用场景

1. 异步处置处罚

异步任务处置处罚 



  •  场景:将耗时较长的任务异步处置处罚,提高系统响应速率。
  • 示例:发送邮件、短信、文件上传等。
日志处置处罚



  • 场景:收集各个服务的日志,并异步处置处罚和存储。
  • 示例:日志聚合、日志分析、日志告警等
2. 解耦

服务解耦



  • 场景:差别服务之间通过消息通报,淘汰直接依靠。
  • 示例:订单系统与库存系统、支付系统之间的解耦。
模块解耦



  • 场景:差别模块之间通过消息通报,低落模块间的耦合度。
  • 示例:前端与后端之间的消息通报。
3. 负载均衡

负载均衡



  • 场景:将任务均匀分配给多个处置处罚节点,提高处置处罚服从。
  • 示例:分布式任务调度、批处置处罚任务等。
消息队列



  • 场景:使用消息队列作为中间层,均衡生产者和消费者的负载。
  • 示例:消息队列作为缓存层,防止瞬时高峰流量冲击下游服务。
4. 数据同步

数据同步



  • 场景:在多个系统之间同步数据,包管数据的一致性。
  • 示例:数据库同步、缓存同步、日志同步等。
事件驱动



  • 场景:基于事件驱动模子,实现系统的实时响应。
  • 示例:订单创建事件、用户登录事件等。
5. 流处置处罚

流式数据处置处罚



  • 场景:处置处罚实时数据流,进行实时分析和处置处罚。
  • 示例:实时数据分析、实时监控、实时告警等。
实时盘算



  • 场景:对实时数据进行盘算和处置处罚。
  • 示例:实时交易数据处置处罚、实时用户行为分析等。
6. 微服务架构

微服务通讯



  • 场景:微服务之间通过消息中间件进行通讯,实现松耦合。
  • 示例:微服务之间的事件通知、状态更新等。
服务编排



  • 场景:和谐多个微服务之间的交互流程。
  • 示例:复杂的业务流程编排、事件处置处罚等。
7. 大规模系统 

分布式系统



  • 场景:在大规模分布式系统中,通过消息中间件实现可靠的消息通报。
  • 示例:分布式事件处置处罚、分布式锁等。
容错与重试



  • 场景:在系统出现故障时,通过消息中间件实现容错和重试机制。
  • 示例:消息重试、故障恢复等。
8. 实时通讯

实时消息推送



  • 场景:实现实时消息推送,提高用户体验。
  • 示例:即时通讯、在线聊天室、实时通知等。
实时数据传输



  • 场景:实现实时数据传输,包管数据的实时性。
  • 示例:股票行情数据、物联网数据等。
消息队列优缺点

长处

在上面全部提到的使用场景都属于消息中间件的有点。
缺点

系统可用性低落

引入消息队列后,系统多了一个潜伏的故障点。假如消息队列服务不可用,可能会导致整个系统的通讯制止,影响业务连续性。因此,需要接纳冗余部署、负载均衡、故障转移等策略来提高消息队列的可用性。

系统复杂度提高

消息队列的引入增长了系统的复杂度,需要处置处罚消息的重试、去重、长期化、顺序性包管等问题。此外,还需要考虑消息队列的监控、告警和运维等方面。

一致性问题

在分布式系统中,消息队列可能导致数据不一致的问题。例如,假如多个系统依靠于消息队列进行状态更新,而此中一个系统处置处罚失败,可能会导致数据不一致。为了办理这个问题,可能需要接纳事件性消息、补偿事件等机制来确保数据的一致性。

性能开销

消息队列的引入可能会增长系统的延迟和开销,特别是在高并发场景下。这包括消息的序列化/反序列化、网络传输、消息存储和检索等。因此,需要公道设计消息队列的架构和参数,以均衡性能和开销。

运维成本

消息队列的运维和管理需要肯定的专业知识和技能。这包括设置管理、性能调优、故障排查和恢复等方面。此外,还需要定期备份和恢复消息队列的数据,以确保数据的可靠性和安全性。


总之,消息队列在提供解耦、异步处置处罚和削峰填谷等优势的同时,也带来了系统可用性低落、复杂度提高、一致性问题、性能开销和运维成本等挑衅。因此,在引入消息队列时,需要充实考虑这些优缺点,并接纳相应的技术步调和架构设计来规避和办理这些问题。

RabbitMQ 先容


特点

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通讯方法,消息队列在分布式系统开发中应用非常广泛。


RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。

AMQP的重要特性是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。


AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。


RabbitMQ的可靠性是非常好的,数据能够包管百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。以是说在我们互联网的金融行业。对数据的稳定性和可靠性要求都非常高的环境下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好许多。也可以自己做一些性能的优化。

根本结构




构成部分说明:
Broker:消息队列服务历程,此历程包括两个部分:Exchange和Queue
Exchange:消息队列互换机,按肯定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费者
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
Connection: 连接. 是客户端和RabbitMO服务器之间的一个TCP连接, 这个连接是建立消息通报的基础,它负责传输客户端和服务器之间的全部数据和控制信息.
Channel: 通道, 信道. Channel是在Connection之上的一个抽象层, 在 RabbitMO中, 一个TCP连接可以有多个Channel, 每个Channel都是独立的虚拟连接, 消息的发送和接收都是基于 Channel的.重要作用是将消息的读写操作复用到同一个TCP连接上,如许可以淘汰建立和关闭连接的开销提高性能.
Virtual host: 虚拟主机. 这是一个虚拟概念, 它为消息队列提供了一种逻辑上的隔离机制. 对于RabbitMQ而言, 一个 BrokerServer 上可以存在多个 Virtual Host. 当多个差别的用户使用同一个RabbitMQ Server 提供的服务时,可以虚拟分别出多个vhost,每个用户在自己的 vhost 创建exchange/queue等

生产者与消费者

在RabbitMQ中,生产者(Producer)负责发送消息,通常是应用程序向RabbitMQ服务器发送具有特定路由键的消息;消费者(Consumer)则负责处置处罚接收到的这些消息。在RabbitMQ中,生产者和消费者之间使用互换器(Exchange)和队列(Queue)进行消息路由和存储。生产者将消息发送到互换器,互换器根据消息的路由键将其放入相应的队列中,末了消费者从队列中获取并处置处罚这些消息。
生产者发送消息流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)

消费者接收消息流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复

消息路由

Direct Exchange (直接互换器)




  • 根本用法:直连型互换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的,步调如下:
将一个队列绑定到某个互换机上,同时赋予该绑定一个路由键(routing key)
当一个携带着路由值为R的消息被发送给直连互换机时,互换机会把它路 由给绑定值同样为R的队列。
如上图,X绑定了两个队列,并声明为direct。队列Q1绑定的键为orange,队列Q2绑定的键为black和green。生成者发布消息到Exchange上,绑定键为orange的消息会被路由到Q1中,绑定键为black和green的消息则会发送到Q2队列中。若还有其他类型的消息则被扬弃,没有与之绑定的队列。


  • 示例代码:实现简单的 Direct Exchange 示例。
  1. @Configuration
  2. public class DirectRabbitConfig {
  3.     //队列 起名:TestDirectQueue
  4.     @Bean
  5.     public Queue TestDirectQueue() {
  6.         // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  7.         // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
  8.         // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
  9.         // return new Queue("TestDirectQueue",true,true,false);
  10.         //一般设置一下队列的持久化就好,其余两个就是默认false
  11.         return new Queue("TestDirectQueue",true);
  12.     }
  13.     //Direct交换机 起名:TestDirectExchange
  14.     @Bean
  15.     DirectExchange TestDirectExchange() {
  16.       //  return new DirectExchange("TestDirectExchange",true,true);
  17.         return new DirectExchange("TestDirectExchange",true,false);
  18.     }
  19.     //绑定  
  20.     //将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
  21.     @Bean
  22.     Binding bindingDirect() {
  23.         return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
  24.     }
  25.      
  26.     @Bean
  27.     DirectExchange lonelyDirectExchange() {
  28.         return new DirectExchange("lonelyDirectExchange");
  29.     }
  30. }
复制代码
Fanout Exchange (扇形互换器)




  • 根本用法:广播模式,将消息发送给全部绑定的队列。扇型互换机(funout exchange)将消息路由给绑定到它身上的全部队列。差别于直连互换机,路由键在此类型上不启任务作用。假如N个队列绑定到某个扇型互换机上,当有消息发送给此扇型互换机时,互换机会将消息的发送给这全部的N个队列



  • 示例代码:实现简单的 Fanout Exchange 示例。
  1. @Configuration
  2. public class FanoutRabbitConfig {
  3.     /**
  4.      *  创建三个队列 :fanout.A   fanout.B  fanout.C
  5.      *  将三个队列都绑定在交换机 fanoutExchange 上
  6.      *  因为是扇型交换机, 路由键无需配置,配置也不起作用
  7.      */
  8.     @Bean
  9.     public Queue queueA() {
  10.         return new Queue("fanout.A");
  11.     }
  12.     @Bean
  13.     public Queue queueB() {
  14.         return new Queue("fanout.B");
  15.     }
  16.     @Bean
  17.     public Queue queueC() {
  18.         return new Queue("fanout.C");
  19.     }
  20.     @Bean
  21.     FanoutExchange fanoutExchange() {
  22.         return new FanoutExchange("fanoutExchange");
  23.     }
  24.     @Bean
  25.     Binding bindingExchangeA() {
  26.         return BindingBuilder.bind(queueA()).to(fanoutExchange());
  27.     }
  28.     @Bean
  29.     Binding bindingExchangeB() {
  30.         return BindingBuilder.bind(queueB()).to(fanoutExchange());
  31.     }
  32.     @Bean
  33.     Binding bindingExchangeC() {
  34.         return BindingBuilder.bind(queueC()).to(fanoutExchange());
  35.     }
  36. }
复制代码
Topic Exchange (主题互换器)




  • 根本用法:基于主题模式,使用通配符进行匹配。
* (星号) 用来表现一个单词 (必须出现的)
# (井号) 用来表现恣意数量(零个或多个)单词
如上图,在X绑定了三个主题的消息队列,我们来看看他们之间数据接收环境是怎么样的
quick.orange.rabbit                    被队列 Q1Q2 接收到
lazy.orange.elephant                 被队列 Q1Q2 接收到
quick.orange.fox                        被队列 Q1 接收到
lazy.brown.fox                            被队列 Q2 接收到
lazy.pink.rabbit                           固然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox                         不匹配任何绑定不会被任何队列接收到会被扬弃
quick.orange.male.rabbit           是四个单词不匹配任何绑定会被扬弃
lazy.orange.male.rabbit              是四个单词但匹配 Q2
当队列绑定关系是下列这种环境时需要引起注意
   当一个队列绑定键是#,那么这个队列将接收全部数据,就有点像 fanout了
   假如队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了


  • 示例代码:实现简单的 Topic Exchange 示例。
  1. @Configuration
  2. public class TopicRabbitConfig {
  3.     //绑定键
  4.     public final static String topic1 = "lazy.orange.elephant";
  5.     public final static String topic2 = "quick.orange.rabbit";
  6.     @Bean
  7.     public Queue firstQueue() {
  8.         return new Queue(TopicRabbitConfig.topic1);
  9.     }
  10.     @Bean
  11.     public Queue secondQueue() {
  12.         return new Queue(TopicRabbitConfig.topic2);
  13.     }
  14.     @Bean
  15.     TopicExchange exchange() {
  16.         return new TopicExchange("topicExchange");
  17.     }
  18.     //将firstQueue和topicExchange绑定,而且绑定的键值为lazy.orange.elephant
  19.     //这样只要是消息携带的路由键是lazy.orange.elephant,才会分发到该队列
  20.     @Bean
  21.     Binding bindingExchangeMessage() {
  22.         return BindingBuilder.bind(firstQueue()).to(exchange()).with(topic1);
  23.     }
  24.     //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则#.orange.#
  25.     // 这样只要是消息携带的路由键是包含有orange,都会分发到该队列
  26.     @Bean
  27.     Binding bindingExchangeMessage2() {
  28.         return BindingBuilder.bind(secondQueue()).to(exchange()).with("#.orange.#");
  29.     }
  30. }
复制代码
消费者消息确认



  • 自动确认
这也是默认的消息确认环境。 AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被精确处置处罚,不管消费者端是否成功处置处罚本次投递。
以是这种环境假如消费端消费逻辑抛出非常,也就是消费端没有处置处罚成功这条消息,那么就相称于丢失了消息。
一样寻常这种环境我们都是使用try catch捕捉非常后,打印日志用于追踪数据,如许找出对应数据再做后续处置处罚。



  • 手动确认
默认的消息是自动确认,需要更改设置文件:
  1. spring:
  2.   rabbitmq:
  3.     listener:
  4.       simple:
  5.         acknowledge-mode: manual
复制代码
或在 RabbitListenerContainerFactory 中进行开启手动 ack:

  1. @Bean
  2. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
  3.     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  4.     factory.setConnectionFactory(connectionFactory);
  5.     factory.setMessageConverter(new Jackson2JsonMessageConverter());
  6.     factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //开启手动 ack
  7.     return factory;
  8. }
复制代码

消费者收到消息后,手动调用channel.basicAck/channel.basicNack/channel.basicReject后,RabbitMQ收到这些消息后,才认为本次投递成功。
channel.basicAck用于肯定确认
channel.basicNack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
channel.basicReject用于否定确认,但与channel.basicNack相比有一个限定:一次只能拒绝单条消息
消费者端以上的3个方法都表现消息已经被精确投递,但是channel.basicAck表现消息已经被精确处置处罚。而channel.basicNack,channel.basicReject表现没有被精确处置处罚.

channel.basicAck(deliveryTag,false);
消费消息的手动确认,消息确认成功-basicAck,第一个参数deliveryTag,消息的唯一标识,第二个参数multiple,消息是否支持批量确认,假如是true,代表可以一次性确认标识小于等于当前标识的全部消息,假如是false,只会确认当前消息

channel.basicReject(deliveryTag, true);
拒绝消费当前消息,假如第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,由于一些原因拒绝它,而且服务器把这个消息丢掉就行,下次不想再消费这条消息了。
使用拒绝后重新入列这个确认模式要谨慎,由于一样寻常都是出现非常的时间,catch非常再拒绝入列,选择是否重入列。但是假如使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列如许循环,会导致消息积压。

channel.basicNack(deliveryTag, false, true);
不消费当前消息,第一个参数依然是当前消息到的数据的唯一id;第二个参数是指是否针对多条消息;假如是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
同样使用不确认后重新入列这个确认模式要谨慎,由于这里也可能由于考虑不周出现消息一直被重新丢回去的环境,导致积压。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

悠扬随风

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表