MQ概览及Kafka详解

打印 上一主题 下一主题

主题 553|帖子 553|积分 1659

概览

MQ 即 messagequeue 消息队列,是分布式体系的重要组件,主要办理异步消息,应用解耦,消峰等问题。从而实现高可用,高性能,可伸缩和终极一致性的架构。利用较多的MQ有:activeMQ,rabbitMQ,kafka,metaMQ。
MQ长处



  • 异步消息处理:可以将一些非核心流程,如日记,短信,邮件等,通过MQ的方式异步去处理。如许做的好处是缩短主流程的响应时间,提升用户体验;
  • 应用解耦合:商品服务和订单服务之间。用户下单后,订单服务会通知商品服务。不利用MQ的环境是订单服务调用商品服务的接口,如许订单服务和商品服务之间是耦合的;利用MQ,订单服务完成持久化处理,将消息写入MQ消息队列中,返回用户订单下单成功,商品服务来订阅这个下单的消息,采取拉或推的方式得到下单信息,商品服务根据商品下单信息进行商品库存信息修改,如许当下单时商品服务不可用时,也不影响正常下单,这就完成了订单服务和商品服务之间的解耦;
  • 流量消峰:秒杀运动流量过大,导致流量暴增,终极可能导致应用挂掉。一般会在应用前端加入消息队列来控制运动人数,如果消息队列超过最大数量,应该直接扬弃用户请求大概跳转到错误页面。秒杀业务根据消息队列中的请求信息在做后续的业务处理。比如在抢购时,可能一下子过来了10万个请求,但MQ只继承前100个用户的请求,超过100个不接收了。如许就成功限制了用户请求;
MQ缺点



  • 体系可用性降低: 体系引入的外部依赖越多,越容易挂掉;
  • 体系复杂度提高: 加个 MQ 进来,你怎么包管消息没有重复消费?怎么处理消息丢失的环境?怎么包管消息传递的顺序性?头大头大,问题一大堆,痛苦不已;
  • 一致性问题: A 体系处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个体系那里,BD 两个体系写库成功了,结果 C 体系写库失败了,你这数据就不一致了;
常见MQ对比

特性ActiveMQRabbitMQRocketMQKafka单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支持高吞吐10 万级,高吞吐,一般共同大数据类的体系来进行实时数据计算、日记采集等场景topic 数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等呆板下,可以支持大量的 topictopic 从几十到几百个时间,吞吐量会大幅度下降,在同等呆板下,Kafka 只管包管 topic 数量不要过多,如果要支持大规模的 topic,需要增长更多的呆板资源时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数呆板宕机,不会丢失数据,不会导致不可用消息可靠性有较低的概率丢失数据根本不丢颠末参数优化设置,可以做到 0 丢失同 RocketMQ功能支持MQ 领域的功能极其完备基于 erlang 开发,并发本事很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日记采集被大规模利用 一般的业务体系要引入 MQ,最早各人都用 ActiveMQ,但是如今确实各人用的不多了,没颠末大规模吞吐量场景的验证,社区也不是很活跃,所以各人还是算了吧,我个人不推荐用这个了。
厥后各人开始用 RabbitMQ,但是确实 erlang 语言制止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳固的支持,活跃度也高。
不过如今确实越来越多的公司会去用 RocketMQ,确实很不错,究竟是阿里出品,但社区可能有突然黄掉的风险,目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度实在不算高,对本身公司技能实力有绝对自大的,推荐用 RocketMQ,否则归去老诚实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
所以中小型公司,技能实力较为一般,技能挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
如果是大数据领域的实时计算、日记采集等场景,用 Kafka 是业内尺度的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全天下这个领域的事实性规范。
JMS消息模子

JMS即JavaMessageService,Java消息服务应用步伐接口,是一个Java平台中关于面向消息中间件的API,用于在两个应用步伐之间,或分布式体系中发送消息,进行异步通讯。
JMS是基于JVM的消息署理规范,ActiveMQ、HornetMQ等是JMS的实现。
Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。我们可以简单的理解:两个应用步伐之间需要进行通讯,我们利用一个JMS服务,进行中间的转发,通过JMS的利用,我们可以解除两个步伐之间的耦合。
点对点模式

消息发送者发送消息,消息署理将其放入消息队列中,消息继承者从队列中获取消息,消息读取后被移除消息队列。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。

固然可能有多个客户端在队列中侦听消息,但只有一个可以读取到消息,之后消息将不存在,其他消费者将无法读取。也就是说消息队列只有唯逐一个发送者和继承者,但是并不能说只有一个接收者
特点:


  • 每个消息只有一个消费者,即消息一旦被消费,消息就不在消息队列中;
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列;
  • 接收者在成功接收消息之后需向队列应答成功;
发布订阅模式

发布者将消息发送到主题Topic中,多个订阅者订阅这个主题,订阅者不停的去轮询监听消息队列中的消息,那么就会在消息到达的同时接收消息。

特点:


  • 每个消息可以有多个消费者,消费完消息之后消息不会清除;
  • 发布者和订阅者之间有时间上的依赖性:针对某个主题(Topic)的订阅者,它必须创建一个订阅之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。当然,为了和缓这种严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。如许纵然订阅者没有运行,它也能接收到发布者的消息;
kafka

kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
基础架构




  • Producer:消息生产者,向kafka推送消息;
  • Consumer:消息消费者,向kafka中broker获取消息的客户端;
  • Topic:主题,可以理解为一个队列,生产者和消费者面向的都是一个topic;
  • Broker:经纪人,一台kafka服务器就是一个broker。一个集群由多个broker构成。一个broker可以容纳多个topic;
  • Partition:分区,为了实现扩展性,一个非常大的topic可以分布到多个服务器上,一个主题可以分为多个分区,每个分区是一个有序的队列;
  • ConsumerGroup:消费者组,由多个消费者构成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者组内的一个消费者消费;消费者组之间互不影响。全部的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者;
  • Replica:副本,为包管集群中的某个节点发生故障时,该节点上的分区数据不丢失,且kafka仍旧能够继续工作,kafka提供了副本机制,一个topic的每个分区都有多少个副本,一个leader和多少个follower;
  • Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。leader是针对topic的,而不是broker的,即不同的kafka服务区中会出现同一个leader;
  • Follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个Follower会成为新的leader;
  • Controller:Kafka 集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,全部topic的分区副天职配和leader选举等工作,Controller的管理工作都是依赖于Zookeeper的;
消费者组的作用为了提高消费本事,即提高并发。
解耦合是消息队列作用之一,当消费者宕机后,再次启动的时间会继续消费消息,而不是从头消费消息。由于这个特性所以消费者会保存一些消费的进度信息,被称为offset,保存的位置在kafka0.9之前保存在zookpeer当中,在此之后保存在kafka本地。即终极kafka会将消息保存在本地磁盘中,默认保留168个小时,即7天。
kafka中消息是以topic进行分类的,producer生产消息,consumer消费消息,都是面向topic的。topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。
Producer生产的数据会被不停追加到该log文件末端,且每条数据都有本身的offset。consumer组中的每个consumer,都会实时记载本身消费到了哪个offset,以便堕落恢复时,从前次的位置继续消费。
发布订阅工作流程



  • 生产者定期向主题发送消息;
  • kafka署理将全部消息存储在为特定主题设置的分区中。它确保消息在分区之间均匀共享。如果生产者发送了两个消息,并且有两个分区,kafka将在第一个分区中存储一个消息,在第二个分区中存储第二个消息;
  • 消费者订阅主题后,kafka将向消费者提供该主题的当前偏移量,并将偏移量保存在Zookeeper集合中;
  • 消费者将定期向kafka请求新消息,一旦kafka从生产者那里收到消息,它将把这些消息转发给消费者,消费者将收到消息并进行处理;
  • 消息处理后,消费者将向kafka代剃头送确认;
  • 一旦kafka收到确认,它将偏移量更改为新值并在Zookeeper中对其进行更新。由于在Zookeeper中保留了偏移量,因此纵然在服务器出现故障时,利用者也可以精确读取下一条消息;
  • 4、5、6步调流程将重复进行,直到消费者停止请求为止;
消费者可以选择随时倒退或跳至所需的主题偏移量并阅读全部后续消息。
生产者

生产者文件存储

参考文章:


  • https://www.jianshu.com/p/3e54a5a39683

由于生产者生产的消息会不停追加到log文件末尾,为防止log文件过大导致数据定位效率低下,kafka采取了分片和索引机制,将每个partition分为多个segment。
每个segment对应两个文件:“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的定名规则为:topic名称+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为 first-0,first-1,first-2。其中,每个segment中的日记数据文件大小均相称。
   该日记数据文件的大小可以通过在kafka Broker的config/server.properties设置文件的中的“log.segment.bytes”进行设置,默认为1G大小(1073741824字节),在顺序写入消息时如果超出该设定的阈值,将会创建一组新的日记数据和索引文件。
  “.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地点。其中文件的定名是以第一个数据的偏移量来定名的。
kafka如何通过index文件快速找到log文件中的数据?
根据指定的偏移量,利用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日记数据文件。然后通过二分查找法,继续查找出小于即是指定偏移量的最大偏移量,同时也得出了对应的position即实际物理位置。
根据该物理位置在分段的日记数据文件中顺序扫描查找偏移量与指定偏移量相称的消息。由于index文件中的每条对应log文件中存储内容大小都相同,所以想要找到指定的消息,只需要用index文件中的该条的大小加上该条的偏移量即可得出log文件中指定消息的位置。

生产者分区策略

分区的缘故原由:


  • 方便在集群中扩展,每个Partition可以通过调整以适应它所在的呆板,而一个topic又可以有多个Partition构成,因此整个集群就可以适应得当的数据了;
  • 可以提高并发,由于可以以Partition为单位读写了,每个partition在不同的服务区上;



  • 在指明partition的环境下,直接将指明的值直接作为partiton值;
  • 在没有指明partition值但有key的环境下,将key的hash 值与topic的partition数进行取余得到partition值;
  • 既没有partition值又没有key值的环境下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法;
生产者数据可靠性包管

为包管 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

方案长处缺点半数以上完成同步,就发 送 ack延迟低选举新的 leader 时,容忍 n 台 节点的故障,需要 2n+1 个副本全部完成同步,才发送ack选举新的 leader 时,容忍 n 台 节点的故障,需要 n+1 个副 本延迟高   理解 2n+1:
半数以上完成同步才可以发ACK,如果挂了n台有副本的服务器,那么就需要有别的n台正常发送(如许正常发送的刚好是总数(挂的和没挂的)的一半(n(挂的)+n(正常的)=2n)),由于是半数以上所以2n+1.(所以总数2n+1的时间最多只能容忍n台有故障)
  即,如果挂了n台有副本的服务器,那么存在副本的服务器的总和为 2n+1
  kafka选择了第二种方案,缘故原由如下:
1.同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1 个副本,而 kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余;
2.固然第二种方案的网络延迟会比较高,但网络延迟对 kafka 的影响较小;
采取第二种方案之后,假想以下情景: leader 收到数据,全部 follower 都开始同步数据,但有一个 follower,由于某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack。这个问题怎么办理呢?
Leader 维护了一个动态的 in-sync replica set 即ISR。
当和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,就会给 leader 发送 ack。如果 follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。 Leader 发生故障之后,就会从 ISR 中选举新的 leader。
   kafka是通过消息条数差值(replica.lag.time.max.messages) 加 通讯时间好坏(同步时间replica.lag.time.max.ms) 两个条件来选副本进ISR,在高版本中不再关注副本的消息条数最大条件。
  为何会去掉消息条数差值参数?
由于kafka一般是按batch批量发数据到leader, 如果批量条数12条,replica.lag.time.max.messages参数设置是10条,那么当一个批次消息发到kafka leader,此时,ISR中就要踢掉全部的follower,很快follower同步完全部数据后,follower又要被加入到ISR,而且要加入到zookeeper中频繁操纵,所以去撤除该条件。
生产者数据一致性包管




  • LEO:(Log End offset)每个副本的最后一个offset;
  • HW:(High Watermark)高水位,指的是消费者能见到的最大的 offset, ISR 队列中最小的 LEO;
  • follower 故障:follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后, follower 会读取本地磁盘记载的前次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于即是该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
  • leader 故障:leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为包管多个副本之间的数据一致性, 其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据;如果少于 leader 中的数据则会从 leader 中进行同步。
生产者ack机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没须要等 ISR 中的 follower 全部接收获功。kafka为生产者提供了三种ack可靠性级别设置:


  • 0:producer 不期待 broker 的 ack,这一操纵提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
  • 1: producer 期待 broker 的 ack, partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;
  • -1/all:producer 期待 broker 的 ack, ISR队列中 partition 的 leader 和 ISR 的follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前, leader 发生故障,那么会造成数据重复;
ExactlyOnce

将服务器的 ACK 级别设置为-1,可以包管 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义,至少发送一次;相对的,将服务器 ACK 级别设置为 0,可以包管生产者每条消息只会被发送一次,即 At Most Once 语义,至多发送一次。
至少发送一次可以包管数据不丢失,但是不能包管数据不重复;相对的,至多发送一次可以包管数据不重复,但是不能包管数据不丢失。 但是,对于一些非常重要的信息,比如说买卖业务数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义,准确发送一次。
在0.11 版本的 Kafka,引入了一项庞大特性:幂等性。所谓的幂等性就是指生产者不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。幂等性联合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。
要启用幂等性,只需要将 Producer 的参数中 enable.idempotence 设置为 true 即可。
Kafka的幂等性实现实在就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时间会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<ID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时, Broker 只会持久化一条。但是 PID 重启就会变革,同时不同的 Partition 也具有不同主键,所以幂等性无法包管跨分区跨会话的 Exactly Once。
生产者发送消息流程

kafka的生产者发送消息采取的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。

在生产者发送消息时,main线程将消息发送给 RecordAccumulator,当数据积聚到 batch.size 之后,sender线程才会不停从 RecordAccumulator 中拉取消息发送到 kafka的broker;如果数据迟迟未达到 batch.size,sender 线程期待 linger.time 之后就会发送数据。
消费者

消费者分区分配策略

一个消费者组中有多个消费者,一个主题有多个分区,所以必然会涉及到分区的分配问题,即确定哪个个分区由哪个消费者来消费。
消费分配策略:


  • round-robin:会采取轮询的方式将当前全部的分区依次分配给全部的消费者;
  • range:首先会计算每个消费者可以消费的分区个数,然后按照顺序将指定个数范围的分区分配给各个消费者;
  • sticky:这种分区策略是最新版本中新增的一种策略,将现有的分区尽可能均衡的分配给各个消费者,存在此目的的缘故原由在于Round-Robin和Range分配策略实际上都会导致某几个消费者承载过多的分区,从而导致消费压力不均衡;
轮询就不必说了,就是把分区按照hash排序,然后分配。range按范围分配,先将全部的分区放到一起然后排序,按照均匀分配的方式计算每个消费者会得到多少个分区,如果没有除尽,则会将多出来的分区依次计算到前面几个消费者。
比如这里是三个分区和两个消费者,那么每个消费者至少会得到1个分区,而3除以2后还余1,那么就会将多余的部分依次算到前面几个消费者,也就是这里的1会分配给第一个消费者,
如果按照Range分区方式进行分配,其本质上是依次遍历每个topic,然后将这些topic的分区按照其所订阅的消费者数量进行均匀的范围分配。这种方式从计算原理上就会导致排序在前面的消费者分配到更多的分区,从而导致各个消费者的压力不均衡。
消费者消费数据问题

消费者在消费的时间,需要维护一个offset,用于记载消费的位置,当offset提交时会有两个问题:重复消费和漏消费:


  • 当提交的offset小于当前步伐处理的最后一条消息的offset,会造成重复消费。情景:先消费,后提交offset,如果消费成功、提交失败,消费者下次获取的offset还是以前的,所以会造成重复消费。
  • 当提交的offset大于当前步伐处理的最后一条消息的offset,会造成漏消费。情景:先提交offset,后消费,如果提交成功、消费失败,消费者下次获取的offset已经是新的,所以会造成漏消费。
这里就需要注意offset的提交方式,offset默认是自动提交,当然这会造成消费的不准确。offset提交方式:


  • 异步提交当前offset;
  • 同步提交当前offset;
  • 异步提交当前offset;
  • 同步和异步组合提交;
  • 提交指定的offset;
建议将offset保存在数据库中,使当前业务与offset提交绑定起来,如许可以一定程度制止重复消费问题,重复消费的问题,一方面需要消息中间件来进行包管。另一方面需要本身的处理逻辑来包管消息的幂等性。
kafka事件

kafka从0.11版本开始引入了事件支持。事件可以包管kafka在Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。


  • 生产者:为了实现跨分区跨会话的事件,需要引入一个全局唯一的事件ID,并将生产者得到的PID和事件ID绑定。如许当生产者重启后就可以通过正在进行的事件ID得到原来的PID,进而包管精准发送一次;
   为了管理 Transaction, Kafka 引入了一个新的组件 Transaction Coordinator。 Producer 就是通过和 Transaction Coordinator 交互得到 Transaction ID 对应的任务状态。 Transaction Coordinator 还负责将事件全部写入 Kafka 的一个内部 Topic,如许纵然整个服务重启,由于事件状态得到保存,进行中的事件状态可以得到恢复,从而继续进行。
  

  • 消费者:对于消费者而言,事件的包管就会相对较弱,尤其时无法包管提交的信息被准确消费。这是由于消费者可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事件的消息可能会出现重启后被删除的环境;
消息积存

如果线上遇到大量消息积存,那就是线上故障了.最可能是消费者出现故障
一般这个时间,只能临时告急扩容了:


  • 先修复消费者的问题,确保其恢复消费速率,然后将现有消费者都停掉
  • 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量;
  • 然后写一个临时的分发数据的 consumer 步伐,这个步伐部署上去消费积存的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue;
  • 接着临时征用 10 倍的呆板来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相称于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速率来消费数据;
  • 等快速消费完积存数据之后,得恢复原先部署的架构,重新用原先的 consumer 呆板来消费消息;
如果没有消费者没有出现问题,而出现了消费积存的环境可以参考以下思路:


  • 提高消费并行度
  • 批量方式消费
  • 跳过非重要方式消费
  • 优化消息消费业务处理过程,简化过程

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用多少眼泪才能让你相信

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表