深入解析RabbitMQ:消息队列的可靠性

打印 上一主题 下一主题

主题 1034|帖子 1034|积分 3102

目录
概要
解决思绪
 一.消耗者的可靠性
1.发送者重连
2.发送者确认
二.MQ的可靠性
1.数据持久化
2.Lazy Queue
三.消耗者的可靠性
1.消耗者确认机制:
 2.失败者重试机制
小结:




概要

        在分布式体系中,消息队列是一种用于实现组件间异步通信的技术。它通过提供一个临时存储消息的缓冲区,允许发送方和接收方在不同的速度下运行,从而实现解耦和缓冲。然而,随着业务需求的不停增长和体系规模的扩大,消息队列的可靠性成为了我们必要关注的紧张题目。消息丢失、延迟和序次杂乱等题目都可能影响体系的稳固性和性能。因此,怎样提高消息队列的可靠性成为了开辟者面临的紧张挑战
解决思绪

        一.消耗者的可靠性

1.发送者重连

        偶然候由于网络波动,可能会出现发送者毗连MQ失败的环境。通过配置我们可以开启失败后重连机制(如下图)
        

        注意:当网络不稳固的时间,使用重试机制可以有用提高消息发送的成功率。不过SpringAMOP提供的重试机制是阻寨式的重试,也就是说多次重试等候的过程中,当前线程是被阻塞的,会影响业务性能。
        假如对于业务性能有要求,发起禁用重试机制。假如一定要使用,请公道配置等候时长和重试次数,固然也可以考虑使用异步线程来执行发送消息的代
2.发送者确认

        SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,当发送者发送消息给MQ后,MQ会返回确认效果给发送者。返回的效果有以下几种环境:
        (1)消息投递到了MO,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
        (2)临时消息投递到了MO,并且入队成功,返回ACK,告知投递成功
        (3)持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
        (4)别的环境都会返回NACK,告知投递失败

        此时,我们就必要再项目中进行配置
       1. 在Application.yml中进行配置:

配置阐明:
        publisher-confirm-type有三种模式可选:
                -- none:关闭confirm机制
                -- simple:同步阻塞等候MQ的回执消息
                -- correlated:MO异步回调方式返回回执消息
2.每个RabbitTemplate只能配置一个ReturnCallback,因此必要在项目启动过程中配置:

       3. 发送消息,指定消息ID、消息ConfirmCallback

二.MQ的可靠性

        1.数据持久化

RabbitMQ的交换机,队列,消息都支持持久化
        队列持久化:我们可以在创造队列时选择:

        交换机持久化同理
        消息持久化:我们可以在发送消息时设置(默认是持久化):


2.Lazy Queue

        从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是性队列。
惰性队列的特征如下:
        (1)接收到消息后直接存入磁盘,不再存储到内存
        (2)消耗者要消耗消息时才会从磁盘中读取并加载到内存(可以提前缓存部门消息到内存,最多2048条
        在3.12版本后,全部队列都是LazyQueue模式,无法更改
三.消耗者的可靠性

        1.消耗者确认机制:

        消耗者确认机制(Consumer Acknowledgement)是为了确认消耗者是否成功处理消息。当消耗者处理消息结束后应该向RabbitMQ发送一个回执,告知RabbitMQ本身消息处理状态:
        ack:成功处理消息,RabbitMO从队列中删除该消息
        nack:消息处理失败,RabbitMO必要再次投递消息
        reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

        SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
        none:不处理。即消息投递给消耗者后立刻ack,消息会立刻从MO删除。非常不安全,不发起使用    
        manual:手动模式。必要本身在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
        auto:自动模式。SpringAMOP使用AOP对我们的消息处理逻辑做了围绕加强,当业务正常执行时则自动返回ack(
        当业务出现异常时,根据异常判断返回不同效果:
        假如是业务异常,会自动返回nack
        关闭弹幕
        假如是消息处理或校验异常,自动返回reject)

        2.失败者重试机制

        SpringAMQP提供了消耗者失败重试机制,在消耗者出现异常时使用本地重试,而不是无限的requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:
        

        在开启重试模式后,重试次数耗尽,假如消息依然失败,则必要有MessageRecoverer接口来处理,它包含三种不同的实现:
        RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
        ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
        RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

3.业务幂等性
        幂等是一个数学概念,用函数表达式来描述是如许的:f(x)=f(f(x))。在程序开辟中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
        实现方法:
1.唯一消息id
        是给每个消息都设置一个唯一id,使用id区分是否是重复消息:

每一条消息都生成一个唯一的id,与消息一起投递给消耗者。

消耗者接收到消息后处理本身的业务,业务处理成功后将消息ID生存到数据库

假如下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理


小结

        以上方法也只是保证提高消息队列的可靠性,而不能完全保证完美的可靠

若有差错,欢迎指正,谢谢!
        





免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

北冰洋以北

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表