消息队列已经逐渐成为企业IT体系内部通信的核心本事。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的紧张本事之一。 当今市面上有许多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开辟的Notify、MetaQ、RocketMQ等。 本文不会一一先容这些消息队列的全部特性,而是探究一下自主开辟计划一个消息队列时,你必要思考和计划的紧张方面。过程中我们会参考这些成熟消息队列的许多紧张头脑。 本文首先会论述什么时候你必要一个消息队列,然后以Push模子为主,从零开始分析计划一个消息队列时必要考虑到的问题,如RPC、高可用、顺序和重复消息、可靠投递、消耗关系解析等。 也会分析以Kafka为代表的pull模子所具备的长处。末了是一些高级主题,如用批量/异步提高性能、pull模子的体系计划理念、存储子体系的计划、流量控制的计划、公平调度的实现等。其中末了四个方面会放在下篇讲解。
当你必要使用消息队列时,首先必要考虑它的必要性。可以使用mq的场景有许多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。反之,假如必要强一致性,关注业务逻辑的处理结果,则RPC显得更为符合。
解耦
解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事件,只关心核心的流程。而必要依赖其他体系但不那么紧张的事情,有通知即可,无需等候结果。换句话说,基于消息的模子,关心的是“通知”,而非“处理”。 比如在美团旅游,我们有一个产品中央,产品中央上游对接的是主站、移动配景、旅游供应链等各个数据源;下游对接的是筛选体系、API体系等展示体系。当上游的数据发生变更的时候,假如不使用消息体系,势必要调用我们的接口来更新数据,就特别依赖产品中央接口的稳固性和处理能力。但其实,作为旅游的产品中央,也许只有对于旅游自建供应链,产品中央更新成功才是他们关心的事情。而对于团购等外部体系,产品中央更新成功也好、失败也罢,并不是他们的职责所在。他们只必要保证在信息变更的时候通知到我们就好了。 而我们的下游,大概有更新索引、刷新缓存等一系列需求。对于产品中央来说,这也不是我们的职责所在。说白了,假如他们定时来拉取数据,也能保证数据的更新,只是实时性没有那么强。但使用接口方式去更新他们的数据,显然对于产品中央来说太过于“重量级”了,只必要发布一个产品ID变更的通知,由下游体系来处理,大概更为公道。 再举一个例子,对于我们的订单体系,订单最终付出成功之后大概必要给用户发送短信积分什么的,但其实这已经不是我们体系的核心流程了。假如外部体系速率偏慢(比如短信网关速率欠好),那么主流程的时间会加长许多,用户肯定不希望点击付出过好几分钟才看到结果。那么我们只必要通知短信体系“我们付出成功了”,不肯定非要等候它处理完成。
最终一致性
最终一致性指的是两个体系的状态保持一致,要么都成功,要么都失败。当然有个时间限定,理论上越快越好,但实际上在各种非常的情况下,大概会有肯定耽误到达最终一致状态,但末了两个体系的状态是一样的。 业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其计划初衷,就是为了生意业务体系中的高可靠通知。 以一个银行的转账过程来明白最终一致性,转账的需求很简单,假如A体系扣钱成功,则B体系加钱肯定成功。反之则一起回滚,像什么都没发生一样。 然而,这个过程中存在许多大概的意外: 1. A扣钱成功,调用B加钱接口失败。 2. A扣钱成功,调用B加钱接口虽然成功,但获取最闭幕果时网络非常引起超时。 3. A扣钱成功,B加钱失败,A想回滚扣的钱,但A呆板down机。
可见,想把这件看似简单的事真正做成,真的不那么轻易。全部跨VM的一致性问题,从技术的角度讲通用的解决方案是:
- 强一致性,分布式事件,但落地太难且成本太高,后文会详细提到。
- 最终一致性,紧张是用“记录”和“补偿”的方式。在做全部的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果大概是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把全部失败的事情重新搞一遍,直到成功为止。 回到刚才的例子,体系在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B体系加钱和扣钱成功这两件事维护在一个当地事件里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。 整个这个模子依然可以基于RPC来做,但可以抽象成一个同一的模子,基于消息队列来做一个“企业总线”。 详细来说,当地事件维护业务变化和通知消息,一起落地(失败则一起回滚),然后RPC到达broker,在broker成功落地后,RPC返回成功,当地消息可以删除。否则当地消息不停靠定时任务轮询不断重发,如许就保证了消息可靠落地broker。 broker往consumer发送消息的过程雷同,不停发送消息,直到consumer发送消耗成功确认。 我们先不剖析重复消息的问题,通过两次消息落地加补偿,下游是肯定可以收到消息的。然后依赖状态机版本号等方式做判重,更新自己的业务,就实现了最终一致性。
最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。另外,全部不保证100%不丢消息的消息队列,理论上无法实现最终一致性。好吧,应该说理论上的100%,清除体系严重故障和bug。 像Kafka一类的计划,在计划层面上就有丢消息的大概(比如定时刷盘,假如掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的本事来保证结果正确。
广播
消息队列的根本功能之一是举行广播。假如没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只必要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开辟和联调的工作量。 比如本文开始提到的产品中央发布产品变更的消息,以及景点库许多去重更新的消息,大概“关心”方有许多个,但产品中央和景点库只必要发布变更消息即可,谁关心谁接入。
错峰与流控
试想上下游对于事情的处理能力是不同的。比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只必要加多一点呆板,再搭建一些LVS负载平衡装备和Nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的呆板数目追上前端。 这种问题同样存在于体系和体系之间,如短信体系大概由于短板效应,速率卡在网关上(每秒几百次请求),跟前端的并发量不是一个数目级。但用户晚上个半分钟左右收到短信,一样平常是不会有太大问题的。假如没有消息队列,两个体系之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但体系复杂性指数级增长,势必在上游大概下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都必要单独开辟一套逻辑来维护这套逻辑。以是,使用中间体系转储两个体系的通信内容,并在下游体系有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。
总而言之,消息队列不是万能的。对于必要强事件保证而且耽误敏感的,RPC是优于消息队列的。 对于一些无关痛痒,大概对于别人非常紧张但是对于自己不是那么关心的事情,可以使用消息队列去做。 支持最终一致性的消息队列,可以或许用来处理耽误不那么敏感的“分布式事件”场景,而且相对于笨重的分布式事件,大概是更优的处理方式。 当上下游体系处理能力存在差距的时候,使用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再举行分发。 假如下游有许多体系关心你的体系发出的通知的时候,果断地使用消息队列吧。
综述
我们如今明确了消息队列的使用场景,下一步就是怎样计划实现一个消息队列了。
基于消息的体系模子,不肯定必要broker(消息队列服务端)。市面上的的Akka(actor模子)、ZeroMQ等,其实都是基于消息的体系计划范式,但是没有broker。 我们之以是要计划一个消息队列,并且配备broker,无外乎要做两件事情:
- 消息的转储,在更符合的时间点投递,大概通过一系列本事辅助消息最终能送达消耗机。
- 规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。 掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次RPC做成两次RPC。发送者把消息投递到服务端(以下简称broker),服务端再将消息转发一手到吸取端,就是这么简单。
一样平常来讲,计划消息队列的整体思绪是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer复兴消耗确认,broker删除/备份消息等。 使用RPC将数据流串起来。然后考虑RPC的高可用性,尽量做到无状态,方便水平扩展。 之后考虑怎样承载消息堆积,然后在符合的机遇投递消息,而处理堆积的最佳方式,就是存储,存储的选型必要综合考虑性能/可靠性和开辟维护成本等诸多因素。 为了实现广播功能,我们必须要维护消耗关系,可以使用zk/config server等保存消耗关系。 在完成了上述几个功能后,消息队列根本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事件特性,性能优化等。 下面我们会以计划消息队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,来详细分析计划实现一个消息队列时的方方面面。
实现队列根本功能
RPC通信协议
刚才讲到,所谓消息队列,无外乎两次RPC加一次转储,当然必要消耗端最终做消耗确认的情况是三次RPC。既然是RPC,就必然牵扯出一系列话题,什么负载平衡啊、服务发现啊、通信协议啊、序列化协议啊,等等。在这一块,我的强烈建议是不要重复造轮子。使用公司现有的RPC框架:Thrift也好,Dubbo也好,大概是其他自定义的框架也好。因为消息队列的RPC,和平凡的RPC没有本质区别。当然了,自主使用Memchached大概Redis协议重新写一套RPC框架并非不可(如MetaQ使用了自己封装的Gecko NIO框架,卡夫卡也用了雷同的协议)。但实现成本和难度无疑倍增。清除对效率的极端要求,都可以使用现成的RPC框架。 简单来讲,服务端提供两个RPC服务,一个用来吸取消息,一个用来确认消息收到。并且做到不管哪个server收到消息和确认消息,结果一致即可。当然这中间大概还涉及跨IDC的服务的问题。这里和RPC的原则是一致的,尽量优先选择本机房投递。你大概会问,假如producer和consumer本身就在两个机房了,怎么办?首先,broker必须保证感知的到全部consumer的存在。其次,producer尽量选择就近的机房就好了。
高可用
其实全部的高可用,是依赖于RPC和存储的高可用来做的。先来看RPC的高可用,美团的基于MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服务自动发现,负载平衡等功能。而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,并且consumer的几台呆板处理消息是幂等的,如许就把消息队列的可用性,转交给RPC框架来处理了。 那么怎么保证幂等呢?最简单的方式莫过于共享存储。broker多呆板共享一个DB大概一个分布式文件/kv体系,则处理消息自然是幂等的。就算有单点故障,其他节点可以立即顶上。另外failover可以依赖定时任务的补偿,这是消息队列本身天然就可以支持的功能。存储体系本身的可用性我们不必要操太多心,放心大胆的交给DBA们吧! 对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。必要保证每一个分区内的高可用性,也就是每一个分区至少要有一个主备且必要做数据的同步,关于这块HA的细节,可以参考下篇pull模子消息体系计划。
服务端承载消息堆积的能力
消息到达服务端假如不颠末任何处理就到吸取者了,broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择机遇投递就显得是顺理成章的了。 只是这个存储可以做成许多方式。比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,紧张有长期化和非长期化两种。 长期化的情势能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。 但并不是每种消息都必要长期化存储。许多消息对于投递性能的要求大于可靠性的要求,且数目极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。 市面上的消息队列广泛两种情势都支持。当然详细的场景还要详细联合公司的业务来看。
存储子体系的选择
我们来看看假如必要数据落地的情况下各种存储子体系的选择。理论上,从速率来看,文件体系>分布式KV(长期化)>分布式文件体系>数据库,而可靠性却截然相反。还是要从支持的业务场景出发作出最公道的选择,假如你们的消息队列是用来支持付出/生意业务等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储体系的研究,DB是最好的选择。 但是DB受制于IOPS,假如要求单broker 5位数以上的QPS性能,基于文件的存储是比力好的解决方案。整体上可以采用数据文件+索引文件的方式处理,详细这块的计划比力复杂,可以参考下篇的存储子体系计划。 分布式KV(如MongoDB,HBase)等,大概长期化的Redis,由于其编程接口较友好,性能也比力可观,假如在可靠性要求不是那么高的场景,也不失为一个不错的选择。
消耗关系解析
如今我们的消息队列开端具备了转储消息的能力。下面一个紧张的事情就是解析发送吸取关系,举行正确的消息投递了。 市面上的消息队列定义了一堆让人晕头转向的名词,如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。抛开现象看本质,无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。当然,对于互联网的大部分应用来说,组间广播、组内单播是最常见的情形。 消息必要通知到多个业务集群,而一个业务集群内有许多台呆板,只要一台呆板消耗这个消息就可以了。 当然这不是绝对的,许多时候组内的广播也是有实用场景的,如当地缓存的更新等等。另外,消耗关系除了组内组间,大概会有多级树状关系。这种情况太过于复杂,一样平常不列入考虑范围。以是,一样平常比力通用的计划是支持组间广播,不同的组注册不同的订阅。组内的不同呆板,假如注册一个相同的ID,则单播;假如注册不同的ID(如IP地址+端口),则广播。 至于广播关系的维护,一样平常由于消息队列本身都是集群,以是都维护在公共存储上,如config server、zookeeper等。维护广播关系所要做的事情根本是一致的:
队列高级特性计划
上面都是些消息队列根本功能的实现,下面来看一些关于消息队列特性相关的内容,不管可靠投递/消息丢失与重复以及事件乃至于性能,不是每个消息队列都会照顾到,以是要依照业务的需求,来仔细衡量各种特性实现的成本,利弊,最终做出最为公道的计划。
可靠投递(最终一致性)
这是个激动人心的话题,完全不丢消息,究竟可不大概?答案是,完全大概,条件是消息大概会重复,并且,在非常情况下,要接受消息的耽误。 方案说简单也简单,就是每当要发生不可靠的事情(RPC等)之前,先将消息落地,然后发送。当失败大概不知道成功失败(比如超时)时,消息状态是待发送,定时任务不停轮询全部待发送消息,最终肯定可以送达。 详细来说:
- producer往broker发送消息之前,必要做一次落地。
- 请求到server后,server确保数据落地后再告诉客户端发送成功。
- 支持广播的消息队列必要对每个待发送的endpoint,长期化一个发送状态,直到全部endpoint状态都OK才可删除消息。
对于各种不确定(超时、down机、消息没有送达、送达后数据清除地、数据落地了复兴没收到),其实对于发送方来说,都是一件事情,就是消息没有送达。 重推消息所面对的问题就是消息重复。重复和丢失就像两个噩梦,你必须要面对一个。好在消息重复尚有处理的机会,消息丢失再想找回就难了。 Anyway,作为一个成熟的消息队列,应该尽量在各个环节减少重复投递的大概性,不能因为重复有解决方案就放纵的乱投递。 末了说一句,不是全部的体系都要求最终一致性大概可靠投递,比如一个论坛体系、一个雇用体系。一个重复的简历或话题被发布,大概比丢失了一个发布显得更让用户无法接受。不断重复一句话,任何底子组件要服务于业务场景。
消耗确认
当broker把消息投递给消耗者后,消耗者可以立即响应我收到了这个消息。但收到了这个消息只是第一步,我能不能处理这个消息却不肯定。或许因为消耗能力的问题,体系的负荷已经不能处理这个消息;大概是刚才状态机里面提到的消息不是我想要吸取的消息,自动要求重发。 把消息的送达和消息的处理分开,如许才真正的实现了消息队列的本质-解耦。以是,允许消耗者自动举行消耗确认是必要的。当然,对于没有特别逻辑的消息,默认Auto Ack也是可以的,但肯定要允许消耗方自动ack。 对于正确消耗ack的,没什么特别的。但是对于reject和error,必要特别说明。reject这件事情,往往业务方是无法感知到的,体系的流量和健康状况的评估,以及处理能力的评估是一件非常复杂的事情。举个极端的例子,收到一个消息开始build索引,大概这个消息要处理半个小时,但消息量却是非常的小。以是reject这块建议做成滑动窗口/线程池雷同的模子来控制, 消耗能力不匹配的时候,直接拒绝,过一段时间重发,减少业务的负担。 但业务出错这件事情是只有业务方自己知道的,就像上文提到的状态机等等。这时应该允许业务方自动ack error,并可以与broker约定下次投递的时间。
重复消息温顺序消息
上文谈到重复消息是不大概100%避免的,除非可以允许丢失,那么,顺序消息可否100%满足呢? 答案是可以,但条件更为苛刻:
- 允许消息丢失。
- 从发送方到服务方到接受者都是单点单线程。
以是绝对的顺序消息根本上是不能实现的,当然在METAQ/Kafka等pull模子的消息队列中,单线程生产/消耗,清除消息丢失,也是一种顺序消息的解决方案。 一样平常来讲,一个主流消息队列的计划范式里,应该是不丢消息的条件下,尽量减少重复消息,不保证消息的投递顺序。 谈到重复消息,紧张是两个话题:
- 怎样鉴别消息重复,并幂等的处理重复消息。
- 一个消息队列怎样尽量减少重复消息的投递。
先来看看第一个话题,每一个消息应该有它的唯一身份。不管是业务方自定义的,还是根据IP/PID/时间戳生成的MessageId,假如有地方记录这个MessageId,消息到来是可以或许举行比对就 能完成重复的判定。数据库的唯一键/bloom filter/分布式KV中的key,都是不错的选择。由于消息不能被永久存储,以是理论上都存在消息从长期化存储移除的瞬间上游还在投递的大概(上游因种种缘故起因投递失败,不停重试,都到了下游清理消息的时间)。这种事情都是非常情况下才会发生的,究竟是小众情况。两分钟消息都还没送达,多送一次又能怎样呢?幂等的处理消息是一门艺术,因为种种缘故起因重复消息大概错乱的消息还是来到了,说两种通用的解决方案: 1. 版本号。 2. 状态机。
版本号
举个简单的例子,一个产品的状态有上线/下线状态。假如消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2之后,假如不做重复性判断,显然最终状态是错误的。 但是,假如每个消息自带一个版本号。上游发送的时候,标志消息1版本号是1,消息2版本号是2。假如再发送下线消息,则版本号标志为3。下游对于每次消息的处理,同时维护一个版本号。 每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,因为版本号>1.可以吸取,同时更新版本号为2.当另一条下线消息到来时,假如版本号是3.则是真实的下线消息。假如是1,则是重复投递的消息。 假如业务方只关心消息重复不重复,那么问题就已经解决了。但许多时候另一个头疼的问题来了,就是消息顺序假如和想象的顺序不一致。比如应该的顺序是12,到来的顺序是21。则末了会发生状态错误。 参考TCP/IP协议,假如想让乱序的消息末了可以或许正确的被构造,那么就应该只吸取比当前版本号大一的消息。并且在一个session周期内要不停保存各个消息的版本号。 假如到来的顺序是21,则先把2存起来,待1到来后,先处理1,再处理2,如许重复性温顺序性要求就都到达了。
状态机
基于版本号来处理重复温顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:
- 对发送方必须要求消息带业务版本号。
- 下游必须存储消息的版本号,对于要严酷保证顺序的。
还不能只存储最新的版本号的消息,要把乱序到来的消息都存储起来。而且必须要对此做出处理。试想一个永不逾期的”session”,比如一个物品的状态,会不停流转于上下线。那么中间环节的全部存储 就必须保留,直到在某个版本号之前的版本一个不丢的到来,成本太高。 就刚才的场景看,假如消息没有版本号,该怎么解决呢?业务方只必要自己维护一个状态机,定义各种状态的流转关系。例如,”下线”状态只允许吸取”上线”消息,“上线”状态只能吸取“下线消息”,假如上线收到上线消息,大概下线收到下线消息,在消息不丢失和上游业务正确的条件下。要么是消息发重了,要么是顺序到达反了。这时消耗者只必要把“我不能处理这个消息”告诉投递者,要求投递者过一段时间重发即可。而且重发肯定要有次数限定,比如5次,避免死循环,就解决了。 举例子说明,假设产品本身状态是下线,1是上线消息,2是下线消息,3是上线消息,正常情况下,消息应该的到来顺序是123,但实际情况下收到的消息状态酿成了3123。 那么下游收到3消息的时候,判断状态机流转是下线->上线,可以吸取消息。然后收到消息1,发现是上线->上线,拒绝吸取,要求重发。然后收到消息2,状态是上线->下线,于是吸取这个消息。 此时无论重发的消息1大概3到来,还是可以吸取。另外的重发,在肯定次数拒绝后停止重发,业务正确。
中间件对于重复消息的处理
回归到消息队列的话题来讲。上述通用的版本号/状态机/ID判重解决方案里,哪些是消息队列该做的、哪些是消息队列不应做业务方处理的呢?其实这里没有一个完全严酷的定义,但回到我们的出发点,我们保证不丢失消息的情况下尽量少重复消息,消耗顺序不保证。那么重复消息下和乱序消息下业务的正确,应该是由消耗方保证的,我们要做的是减少消息发送的重复。 我们无法定义业务方的业务版本号/状态机,假如API里强制必要指定版本号,则显得过于绑架客户了。况且,在消耗方维护这么多状态,就涉及到一个消耗方的消息落地/多机间的同步消耗状态问题,复杂度指数级上升,而且只能解决部分问题。 减少重复消息的关键步调:
- broker记录MessageId,直到投递成功后扫除,重复的ID到来不做处理,如许只要发送者在扫除周期内可以或许感知到消息投递成功,就根本不会在server端产生重复消息。
- 对于server投递到consumer的消息,由于不确定对端是在处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前扣问这个IP,消息处理成功了吗?假如扣问无果,再重发。
事件
长期性是事件的一个特性,然而只满足长期性却不肯定能满足事件的特性。还是拿扣钱/加钱的例子讲。满足事件的一致性特征,则必须要么都不举行,要么都能成功。 解决方案从大方向上有两种:
- 两阶段提交,分布式事件。
- 当地事件,当地落地,补偿发送。
分布式事件存在的最大问题是成本太高,两阶段提交协议,对于仲裁down机大概单点故障,险些是一个无解的黑洞。对于生意业务密集型大概I/O密集型的应用,没有办法承受这么高的网络耽误,体系复杂性。 并且成熟的分布式事件肯定构建与比力靠谱的商用DB和商用中间件上,成本也太高。 那怎样使用当地事件解决分布式事件的问题呢?以当地和业务在一个数据库实例中建表为例子,与扣钱的业务操纵同一个事件里,将消息插入当地数据库。假如消息入库失败,则业务回滚;假如消息入库成功,事件提交。 然后发送消息(注意这里可以实时发送,不必要等定时任务检出,以提高消息实时性)。以后的问题就是前文的最终一致性问题所提到的了,只要消息没有发送成功,就不停靠定时任务重试。 这里有一个关键的点,当地事件做的,是业务落地和消息落地的事件,而不是业务落地和RPC成功的事件。这里许多人轻易混淆,假如是后者,无疑是事件嵌套RPC,是大忌,会有长事件死锁等各种风险。 而消息只要成功落地,很大程度上就没有丢失的风险(磁盘物理破坏除外)。而消息只要投递到服务端确认后当地才做删除,就完成了producer->broker的可靠投递,并且当消息存储非常时,业务也是可以回滚的。 当地事件存在两个最大的使用障碍:
- 配置较为复杂,“绑架”业务方,必须当地数据库实例提供一个库表。
- 对于消息耽误高敏感的业务不实用。
话说回来,不是每个业务都必要强事件的。扣钱和加钱必要事件保证,但下单和生成短信却不必要事件,不能因为要求发短信的消息存储投递失败而要求下单业务回滚。以是,一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事件型消息,当地非长期型消息,以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。另外事件的使用应该尽量低成本、透明化,可以依托于现有的成熟框架,如Spring的声明式事件做扩展。业务方只必要使用@Transactional标签即可。
性能相关
异步/同步
首先澄清一个概念,异步,同步和oneway是三件事。异步,归根结底你还是必要关心结果的,但大概不是当时的时间点关心,可以用轮询大概回调等方式处理结果;同步是必要当时关心 的结果的;而oneway是发出去就不管死活的方式,这种对于某些完全对可靠性没有要求的场景还是实用的,但不是我们重点讨论的范畴。 回归来看,任何的RPC都是存在客户端异步与服务端异步的,而且是可以任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步。 对于客户端来说,同步与异步紧张是拿到一个Result,还是Future(Listenable)的区别。实现方式可以是线程池,NIO大概其他事件机制,这里先不睁开讲。 服务端异步大概稍微难懂白一点,这个是必要RPC协议支持的。参考servlet 3.0规范,服务端可以吐一个future给客户端,并且在future done的时候通知客户端。 整个过程可以参考下面的代码:
客户端同步服务端异步。
- Future<Result> future = request(server);//server立刻返回future
- synchronized(future){
- while(!future.isDone()){
- future.wait();//server处理结束后会notify这个future,并修改isdone标志
- }
- }
- return future.get();
复制代码 客户端同步服务端同步。
- Result result = request(server);
复制代码 客户端异步服务端同步(这里用线程池的方式)。
- Future<Result> future = executor.submit(new Callable(){public void call<Result>(){
- result = request(server);
- }})
- return future;
复制代码 客户端异步服务端异步。
- Future<Result> future = request(server);//server立刻返回future
- return future
复制代码 上面说了这么多,其实是想让各人脱离两个误区:
- RPC只有客户端能做异步,服务端不能。
- 异步只能通过线程池。
那么,服务端使用异步最大的好处是什么呢?说到底,是解放了线程和I/O。试想服务端有一堆I/O等候处理,假如每个请求都必要同步响应,每条消息都必要结果立即返回,那么就险些没法做I/O合并 (当然接口可以计划成batch的,但大概batch发过来的仍然数目较少)。而假如用异步的方式返回给客户端future,就可以有机会举行I/O的合并,把几个批次发过来的消息一起落地(这种合并对于MySQL等允许batch insert的数据库结果尤其显着),并且彻底开释了线程。不至于说来多少请求开多少线程,可以或许支持的并发量直线提高。 来看第二个误区,返回future的方式不肯定只有线程池。换句话说,可以在线程池里面举行同步操纵,也可以举行异步操纵,也可以不使用线程池使用异步操纵(NIO、事件)。 回到消息队列的议题上,我们当然不希望消息的发送阻塞主流程(前面提到了,server端假如使用异步模子,则大概因消息合并带来肯定程度上的消息耽误),以是可以先使用线程池提交一个发送请求,主流程继承往下走。 但是线程池中的请求关心结果吗?Of course,必须等候服务端消息成功落地,才算是消息发送成功。以是这里的模子,正确地说事客户端半同步半异步(使用线程池不阻塞主流程,但线程池中的任务必要等候server端的返回),server端是纯异步。客户端的线程池wait在server端吐回的future上,直到server端处理完毕,才解除阻塞继承举行。 总结一句,同步可以或许保证结果,异步可以或许保证效率,要公道的联合才气做到最好的效率。
批量
谈到批量就不得不提生产者消耗者模子。但生产者消耗者模子中最大的痛点是:消耗者到底应该何时举行消耗。大处着眼来看,消耗动作都是事件驱动的。紧张事件包括:
- 攒够了肯定数目。
- 到达了肯定时间。
- 队列里有新的数据到来。
对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的全部数据刷出,否则将自己挂起,等候新数据的到来。 在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。伪代码如下:
- Executor executor = Executors.newFixedThreadPool(4);
- final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
- private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的
- public void run(){
- List<Message> messages = new ArrayList<>(20);
- queue.drainTo(messages,20);
- doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息
- }
- });
- public void send(Message message){
- queue.offer(message);
- executor.submit(task)
- }
复制代码 这种方式是消息耽误和批量的一个比力好的平衡,但优先响应低耽误。耽误的最高程度由上一次发送的等候时间决定。但大概造成的问题是发送过快的话批量的大小不够满足性能的极致。
- Executor executor = Executors.newFixedThreadPool(4);
- final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
- volatile long last = System.currentMills();
- Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){
- flush();
- },500,500,TimeUnits.MILLS);
- private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,顾做成全局的。
- public void run(){
- List<Message> messages = new ArrayList<>(20);
- queue.drainTo(messages,20);
- doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。
- }
- });
- public void send(Message message){
- last = System.currentMills();
- queue.offer(message);
- flush();
- }
- private void flush(){
- if(queue.size>200||System.currentMills()-last>200){
- executor.submit(task)
- }
- }
复制代码 相反对于可以用适量的耽误来调换高性能的场景来说,用定时/定量二选一的方式大概会更为理想,既到达肯定数目才发送,但假如数目不停达不到,也不能干等,有一个时间上限。 详细说来,在上文的submit之前,多判断一个时间和数目,并且Runnable内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。对于server端的数据落地,使用这种方式就非常方便。
末了啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?紧张缘故起因有两个:
- 减少无谓的请求头,假如你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
- 减少复兴的ack包个数。把请求合并后,ack包数目必然减少,确认和重发的成本就会低落。
push还是pull
上文提到的消息队列,大多是针对push模子的计划。如今市面上有许多经典的也比力成熟的pull模子的消息队列,如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别,可谓另辟蹊径。 我们简要分析下push和pull模子各自存在的利弊。
慢消耗
慢消耗无疑是push模子最大的致命伤,穿成流水线来看,假如消耗者的速率比发送者的速率慢许多,势必造成消息在broker的堆积。假设这些消息都是有用的无法抛弃的,消息就要不停在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后往返踢皮球。 反观pull模式,consumer可以按需消耗,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只必要维护全部消息的队列和偏移量就可以了。以是对于创建索引等慢消耗,消息量有限且到来的速率不匀称的情况,pull模式比力符合。
消息耽误与忙等
这是pull模式最大的短板。由于自动权在消耗方,消耗方无法正确地决定何时去拉取最新的消息。假如一次pull取到消息了还可以继承去pull,假如没有pull取到则必要等候一段时间重新pull。 但等候多久就很难判定了。你大概会说,我可以有xx动态pull取时间调解算法,但问题的本质在于,有没有消息到来这件事情决定权不在消耗方。也许1分钟内一连来了1000条消息,然后半个小时没有新消息产生, 大概你的算法算出下次最有大概到来的时间点是31分钟之后,大概60分钟之后,结果下条消息10分钟后到了,是不是很让人沮丧? 当然也不是说耽误就没有解决方案了,业界较成熟的做法是从短时间开始(不会对broker有太大负担),然后指数级增长等候。比如开始等5ms,然后10ms,然后20ms,然后40ms……直到有消息到来,然后再回到5ms。 即使如许,依然存在耽误问题:假设40ms到80ms之间的50ms消息到来,消息就耽误了30ms,而且对于半个小时来一次的消息,这些开销就是白白浪费的。 在阿里的RocketMq里,有一种优化的做法-长轮询,来平衡推拉模子各自的缺点。根本思绪是:消耗者假如尝试拉取失败,不是直接return,而是把毗连挂在那边wait,服务端假如有新的消息到来,把毗连notify起来,这也是不错的思绪。但海量的长毗连block对体系的开销还是不容小觑的,还是要公道的评估时间间隔,给wait加一个时间上限比力好~
顺序消息
假如push模式的消息队列,支持分区,单分区只支持一个消耗者消耗,并且消耗者只有确认一个消息消耗后才气push送另外一个消息,还要发送者保证全局顺序唯一,听起来也能做顺序消息,但成本太高了,尤其是必须每个消息消耗确认后才气发下一条消息,这对于本身堆积能力和慢消耗就是瓶颈的push模式的消息队列,简直是一场劫难。 反观pull模式,假如想做到全局顺序消息,就相对轻易许多:
- producer对应partition,并且单线程。
- consumer对应partition,消耗确认(或批量确认),继承消耗即可。
以是对于日志push送这种最好全局有序,但允许出现小误差的场景,pull模式非常符合。假如你不想看到通篇乱套的日志~~ Anyway,必要顺序消息的场景还是比力有限的而且成本太高,请慎重考虑。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |