RabbitMQ 是一个强大的消息队列系统,基于 AMQP(高级消息队列协议),广泛应用于各种分布式系统中。
RabbitMQ 组件
生产者(Producer)
- 生产者是发送消息的应用程序,它将消息发送到 RabbitMQ 的交换机(Exchange)。
- 生产者可以选择指定路由键(routing key),用于决定消息应该被发送到哪个队列。
交换机(Exchange)
- 交换机是消息的路由器,负责将生产者发送的消息根据路由规则分发到相应的队列。
- 交换机类型决定了消息路由的运动,RabbitMQ 提供四种类型的交换机:
- Direct Exchange: 根据路由键将消息精确地路由到绑定该交换机的队列。
- Fanout Exchange: 将消息广播到所有绑定的队列,忽略路由键。
- Topic Exchange: 支持利用通配符举行模式匹配,将消息路由到一个或多个队列,基于路由键。
- Headers Exchange: 根据消息头部信息举行路由,而不是路由键。
队列(Queue)
- 队列是存储消息的容器,消息在队列中等待消耗者处理。
- 每个队列都有一个唯一的名称,生产者和消耗者通过这个名称举行通讯。
- 队列可以设置为持久化,以确保在 RabbitMQ 服务器重启时消息不会丢失。
消耗者(Consumer)
- 消耗者是吸收和处理消息的应用程序,从队列中取出消息并举行处理。
- 消耗者可以选择同步(拉取)或异步(推送)方式来吸收消息。
Broker
- RabbitMQ 服务器自己被称为 Broker,负责处理消息的吸收、存储和转发。
RabbitMQ 消息传递模型
RabbitMQ 的消息传递模型基于以下几个关键概念:
消息
- 消息是通过 RabbitMQ 举行传递的数据单位,可以包罗任意数据(如 JSON、XML、文本等)。
- 消息可以设置属性(如路由键、头信息、逾期时间等)。
路由键(Routing Key)
- 路由键是生产者发送消息时指定的标识符,交换机利用路由键决定消息的去向。
- 在 Direct 和 Topic 交换机中,路由键是重要的路由依据。
绑定(Binding)
- 绑定是将交换机和队列关联起来的关系,允许交换机将消息路由到特定的队列。
- 可以在绑定时指定路由键,以实现更精确的消息路由。
交换机类型
- Direct Exchange:
- Fanout Exchange:
- Topic Exchange:
- 消息根据路由键的模式举行路由,支持通配符(* 表现一个单词,# 表现零个或多个单词)。
- Headers Exchange:
- 通过消息的头部信息举行路由,路由键被忽略,支持复杂的路由逻辑。
工作原理
消息流转过程
- 生产者发送消息:
- 生产者将消息发送到交换机,通常需要指定交换机名称和路由键。
- 交换机路由消息:
- 交换机根据其类型和路由键将消息路由到一个或多个队列。
- 队列存储消息:
- 消耗者获取消息:
- 消息确认:
- 消耗者处理完消息后,发送确认消息(ACK)给 RabbitMQ,表明消息已被成功消耗。
- 消息重新入队:
- 如果消耗者未确认消息,RabbitMQ 会将该消息重新放回队列,确保消息不丢失。
消息持久化
- RabbitMQ 支持消息持久化,即将消息存储到磁盘,防止 RabbitMQ 服务崩溃时丢失消息。
- 为了实现持久化,生产者需要将消息标记为持久化(设置消息的 delivery_mode 为 2),并将队列声明为持久化。
如何包管消息不丢失
发送者消息不丢失
持久化消息: 在发送消息时,生产者可以将消息设置为持久化,通过将 delivery_mode 属性设置为 2,这样即使 RabbitMQ 服务器崩溃,持久化的消息依然会被保存
持久化队列:队列自己也应声明为持久化(durable),确保队列在 RabbitMQ 重启时仍然存在
确认机制(Publisher Confirms)
- 开启消息确认 :RabbitMQ 支持生产者确认机制(Publisher Confirms),这允许生产者在发送消息后吸收确认(ACK),以确保消息已成功到达 RabbitMQ。
- 启用确认机制:在发送消息之前,需要将通道设置为确认模式。RabbitMQ 会在消息成功存储后返回确认。
异常处理与重试机制:在发送消息时,要捕捉可能的异常,并根据需要举行重试。可以实现一个重试机制,在发送失败时重新尝试发送消息。
消耗者消息不丢失
手动消息确认(Manual Acknowledgments):消耗者在处理完消息后手动发送消息确认(ACK),只有在 RabbitMQ 收到确认后,消息才会从队列中移除。如果没有收到确认,RabbitMQ 会将消息重新入队,并将其分发给其他消耗者。
消息重试机制:
- 消息重入队(Requeue):当消耗者无法成功处理消息时,可以选择通过 basic_nack() 或 basic_reject() 方法将消息重新入队,使 RabbitMQ 将消息重新投递给另一个消耗者。
- 消息处理失败后的重试:在消耗失败时,可以设置重试逻辑来尝试重新处理消息。比方,可以将失败的消息延迟一定时间后重新放入队列,再次交由消耗者处理。
利用死信队列(Dead Letter Queue, DLQ):通过设置 TTL(消息存活时间)大概最大重试次数,超时或多次处理失败的消息可以被路由到死信队列。这样可以避免消息丢失,同时为后续手动处理这些“题目消息”提供了保障。
消息重复消耗的缘故原由
RabbitMQ 出现消息重复消耗的缘故原由主要有以下几点:
- 消息确认机制异常:
- RabbitMQ 依赖消耗者手动确认消息(ACK)。如果消耗者没有及时确认,RabbitMQ 会重新将消息投递给其他消耗者。
- 在利用自动确认模式(auto-ack)时,可能在消耗者未完全处理消息时,RabbitMQ 已经认为消耗成功,导致消耗者重启或失败后重新消耗。
- 网络题目:
- 消息确认(ACK)时,网络故障可能导致 RabbitMQ 没有收到确认信号,认为消耗失败,重新将消息投递,导致重复消耗。
- 消耗者宕机或重启:
- 如果消耗者在处理消息时宕机或重启,而消息未被确认,RabbitMQ 会重新发送该消息。
- 消耗失败重试:
- 当消耗失败时,消息可能被重新投递给消耗者处理,导致重复消耗。
办理消息重复消耗的思路
虽然 RabbitMQ 的消息投递模型可能导致消息重复消耗,但通过以下几种方法可以有效办理这一题目:
1. 业务幂等性
幂等性是办理消息重复消耗的核心。无论消息被消耗多少次,业务处理的效果应该是一样的。通过以下计谋实现业务的幂等性:
- 利用全局唯一的业务 ID:
- 每条消息带有一个唯一的业务标识符,消耗者处理消息时先检查该标识符是否已处理过。如果已处理,则跳过该消息。
- 这种方式常通过数据库或缓存(如 Redis)来维护处理记录,避免重复处理。
示例伪代码:
- String messageId = message.getBusinessId();
- if (isProcessed(messageId)) {
- return; // 已处理,跳过
- }
- processMessage(message);
- markAsProcessed(messageId); // 处理完成后记录处理状态
复制代码
- 数据库唯一约束:
- 利用数据库的唯一性约束(如唯一索引)来防止数据重复写入。比方,在插入操纵时对某个字段(如订单号)设置唯一索引,确保数据库层面不会重复插入。
- 乐观锁或版本控制:
- 在更新数据时,可以利用乐观锁或版本号控制,避免由于消息重复消耗而导致数据不一致。
消息堆积的缘故原由
消息堆积的产生通常与以下几种情况有关:
- 消耗者处理能力不敷:
- 当消耗者的处理能力(消耗速度)小于生产者的消息生产速度时,消息会堆积在队列中。常见缘故原由包罗消耗者业务逻辑过于复杂、消耗者的硬件资源不敷等。
- 瞬时消息流量激增:
- 在系统中某个时间点忽然有大量消息产生,短时间内消耗者无法处理完所有消息,导致消息在队列中堆积。这种情况通常发生在高峰期大概大规模批量处理场景中。
- 消耗者异常或宕机:
- 当消耗者出现异常、宕机或克制消耗时,队列中的消息无法被及时处理,消息一连堆积。
- 消耗者消耗失败且重回队列:
- 如果消耗者处理消息失败,并且这些消息被重回队列(requeue),会导致同一批消息反复被投递和消耗,但无法真正被消耗成功,进而导致堆积。
- 队列配置不当:
- RabbitMQ 队列没有设置消息 TTL(逾期时间)或队列的最大长度,导致消息长期堆积,尤其是在没有消耗者的情况下。
- 网络延迟或通讯题目:
- 如果网络题目导致消息无法从队列中传递到消耗者,会导致消息渐渐堆积。
消息堆积的影响:
- 内存占用增长:消息存储在内存或磁盘中,消息量过大可能导致 RabbitMQ 节点内存或磁盘资源耗尽。
- 系统性能下降:大量消息堆积会增长队列操纵的复杂性,导致消息的发布和消耗延迟。
- RabbitMQ 节点崩溃:严峻的消息堆积可能导致 RabbitMQ 节点不可用,影响整个消息系统的稳固性。
如何办理消息堆积
扩展消耗者:
- 增长消耗者的数目或提升消耗者的并发处理能力,以提高消息消耗速度。
优化业务逻辑:
- 优化消耗者的业务处理逻辑,减少单条消息的处理时间,避免复杂的同步操纵。
利用批量消耗:
- 通过批量消耗来减少消耗者处理每条消息的开销。可以利用 basicQos 预取多个消息后举行批量处理。
设置消息逾期机制:
- 设置消息的 TTL 或队列的最大长度,确保消息在系统中不会无穷期堆积。
应用分布式消耗模型:
- 如果消息量非常大,可以思量通过水平扩展 RabbitMQ 节点、分区队列等方式来分散压力。
特性
- 高可用性: RabbitMQ 支持集群和镜像队列(mirrored queues)以确保高可用性。
- 消息确认: 确保消息被消耗且不会丢失。
- 机动的路由: 利用交换机和路由键机动地控制消息的路由。
- 多种协议支持: 除了 AMQP,RabbitMQ 还支持 STOMP、MQTT、HTTP 等协议。
- 管理界面: 提供 Web 管理界面,方便监控和管理消息队列
优缺点
- 长处:
- 强大的路由能力。
- 提供多种消息确认和持久化机制。
- 社区支持和文档丰富。
- 缺点:
- 在高吞吐量场景下性能较低。
- 对于大量消息的积压,可能会导致延迟增长。
RabbitMQ 高可用性
集群
RabbitMQ 支持集群摆设,这样多个 RabbitMQ 节点可以协同工作,提供负载均衡和容错能力。集群模式下,消息在差别的节点间传递,某个节点发生故障时,其他节点仍能继续工作。
- 集群模式特性:
- 集群中的所有节点都共享相同的用户、权限、队列定义和绑定关系。
- 队列默认只存储在声明它的节点上,但可以通过镜像队列机制复制到多个节点。
集群的优势:
- 即使某个节点宕机,整个集群仍然可以正常工作。
- 通过负载均衡分散消息处理压力。
- 通过复制和同步数据来包管数据一致性和可用性。
创建 RabbitMQ 集群:
- 可以通过 rabbitmqctl 工具将多个 RabbitMQ 节点参加到一个集群中。比方,将节点 B 参加到节点 A 的集群中:
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl join_cluster rabbit@A
- rabbitmqctl start_app
复制代码 镜像队列(Mirrored Queues)
镜像队列是一种高可用队列,允许队列在多个 RabbitMQ 节点上举行复制。通过将队列的副本(镜像)分布到差别的节点上,确保在单个节点故障时,队列和消息仍然可用。
- 镜像队列机制:
- 在声明队列时,可以配置该队列为镜像队列,并指定它的镜像副本数目。
- 主队列和镜像队列之间及时同步,当主队列处理消息时,镜像也会同步处理。
- 如果主队列的节点宕机,RabbitMQ 会自动提升一个镜像为新的主队列,确保消息不丢失。
- 配置镜像队列:
- rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
复制代码 这将使所有队列镜像到集群中的所有节点。
- 镜像队列计谋:
- ha-mode=all: 将队列镜像到所有节点。
- ha-mode=exactly: 指定队列镜像到特定数目的节点。
- ha-mode=nodes: 将队列镜像到指定的节点列表。
队列分片(Quorum Queues)
队列分片(Quorum Queues)是一种替换镜像队列的机制,专为高可用性和高负载场景设计。与镜像队列相比,Quorum Queues 在处理大量消息时具有更高的容错性和性能。
- Quorum Queues 特性:
- 基于 Raft 共识算法,确保消息在多个节点间的一致性和持久化。
- Quorum Queues 将消息分布到多个节点,而不是完整地复制消息。
- 更适合长时间运行、处理大量消息的高可用场景。
- Quorum Queues 的优势:
- 更好的弹性扩展性,适合处理高负载。
- 利用分布式算法管理队列的高可用性,比镜像队列更有服从。
- # 声明 Quorum Queues
- rabbitmqctl set_policy quorum-queues "^" '{"queue-type":"quorum"}'
复制代码 网络分区管理(Network Partition Handling)
在 RabbitMQ 集群中,如果网络发生分区(partition),即集群中的节点无法互相通讯,可能会导致一致性题目。为了处理这种情况,RabbitMQ 提供了几种网络分区处理计谋:
- 自动规复(Autoheal):
- 当网络分区规复时,自动选择此中一个分区作为主分区,并将其他分区规复为从节点。未规复的消息可能会丢失,但整个集群会重新回到一致的状态。
- 逼迫分区(Pause_minority):
- 暂停少数分区,继续服务多数节点,防止网络分区期间出现不一致的写操纵。
- 忽略分区(Ignore):
- RabbitMQ 继续处理所有分区,分区规复后手动办理不一致题目。这种计谋风险较大,不推荐用于生产环境。
- # 设置网络分区策略
- rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}' --apply-to queues
复制代码 节点健康监控和自动规复
- 心跳检测和毗连规复:
- RabbitMQ 利用心跳机制检测节点和客户端的毗连状态。如果某个节点或客户端超过心跳时间没有响应,RabbitMQ 会认为它已宕机并关闭毗连。这可以确保节点故障时,消耗者和生产者能够及时感知,并举行规复。
- 自动故障规复(Automatic Failover):
- 在高可用性配置下(如镜像队列或 Quorum Queues),当一个节点故障时,RabbitMQ 会自动将工作转移到其他正常运行的节点。
高可用性客户端配置
- 客户端自动重连:
- RabbitMQ 的客户端库(如 Pika、Java 的 AMQP 客户端)通常支持自动重连机制。如果客户端与 RabbitMQ 服务器的毗连断开,客户端可以尝试自动重连到集群中的其他节点。
- 负载均衡:
- 可以通过负载均衡器(如 Nginx、HAProxy)在 RabbitMQ 集群节点之间分发客户端的毗连哀求,确保客户端始终毗连到可用的 RabbitMQ 节点。
持久化机制
- 队列和消息持久化:
- 确保队列和消息都设置为持久化,这样在 RabbitMQ 节点宕机重启后,消息仍然保留在磁盘中。队列持久化可以确保 RabbitMQ 重启时,消息不会丢失。
- # 声明持久化队列
- channel.queue_declare(queue='your_queue', durable=True)
- # 发送持久化消息
- channel.basic_publish(
- exchange='your_exchange',
- routing_key='your_routing_key',
- body='your_message',
- properties=pika.BasicProperties(
- delivery_mode=2, # 设置消息持久化
- ))
复制代码 总结
RabbitMQ 通过多种机制来确保高可用性:
- 集群模式 提供了横向扩展和容错能力。
- 镜像队列 确保队列及其消息在多个节点上复制,防止单点故障。
- 队列分片(Quorum Queues) 提供了高效的队列管理和更高的容错性。
- 网络分区管理 资助处理集群中网络故障时的分区题目。
- 心跳检测与自动规复 确保节点或客户端故障时的快速检测和规复。
- 负载均衡和自动重连 提供了客户端侧的高可用支持。
- 持久化机制 确保队列和消息在 RabbitMQ 重启时不丢失。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |