最全Kafka知识宝典之消耗端深度剖析

打印 上一主题 下一主题

主题 576|帖子 576|积分 1728

一、Kafka消耗者基本特性

消耗者与消耗者组的关系

消耗者用一个消耗者组名标记自己
一个发布在Topic上消息被分发给此消耗者组中的一个消耗者


  • 假如所有的消耗者都在一个组中,那么这就变成了队列模子,即这些消耗者只有一个消耗者会收到消息
  • 假如所有的消耗者都在不同的组中,那么就完全变成了发布-订阅模子。每个消耗者都会收到消息
上述特性总结为:Kafka的消息,只允许同一个消耗者组里一个消耗者消耗。但是不同的消耗者组之间是隔离的,互不影响的
消耗者组是什么?

它是一个组,所以内部可以有多个消耗者,这些消耗者共用一个ID(叫做Group ID),一个组内的所有消耗者共同协作,完成对订阅的topic的所有分区进行消耗,此中一个topic中的一个分区只能由一个消耗者消耗
消耗者组的特性



  • 一个消耗者组可以有多个消耗者。
  • Group ID是一个字符串,在一个kafka集群中,它标识唯一的一个消耗者组。
  • 每个消耗者组订阅的所有主题中,每个主题的每个分区只能由一个消耗者消耗,消耗者组之间不影响。
消耗者分配

Kafka是怎样包管一条消息在同一个组内只会被一个消耗者消耗的?
Kafka是基于分区来分配给组内的消耗者的,也就是说,基于分区到消耗者有一个映射,这个分区内的消息都会被这个消耗者收到,而其他消耗者没有映射关系就不会被收到了。
这是组里只有一个消耗者的情况,那么所有分区的消息都会与这个消耗者创建映射关系

这是组里有多个消耗者的情况,但是消耗者数量会小于分区数量,那么一个消耗者会吸收来自多个分区的消息

这是组里有多个消耗者的情况,但是消耗者数量大于分区数量,那么会有消耗空闲,收不到分区的消息

所以最好消耗者数量小于便是分区的数量,不然会导致有些消耗者永远收不到消息
分区消息分配策略

一个consumer group中有多个consumer,一个topic有多个partition,所以一定会涉及到partition的分配题目,即确定哪个partition由哪个consumer来消耗,Kafka提供了3种消耗者分区分配策略:RangeAssignor、RoundRobinAssignor、StickyAssignor
RangeAssignor

对于每一个Topic,起首对分区按照分区ID进行排序,然后订阅这个Topic的消耗组的消耗者再进行排序,之后尽量平衡的将分区分配给消耗者,这里只能是尽量平衡,由于分区数可能无法被消耗者数量整除,那么有一些消耗者就会多分配到一些分区。可以理解为均匀分

盘算规则 
好比上述7个分区,3个comsumer,则7/3=2,余1,这个表明假如3个消耗线程均分7个分区还会多出1个分区,那么这个多出的额外分区就会给前面的消耗线程处理,所以它会把第一个分区先给到consumer-1消耗线程消耗
配置方式
  1. prop.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RangeAssignor");
复制代码
RoundRobinAssignor

RoundRobinAssignor 采取轮询的方式分配分区。假如consuemrs订阅Topics都是类似的,那么partitions将会被匀称分配给每个consumer,最理想的状态是partitions数是consumers数的整数倍,如许每个consumer都有类似数量的partitions数。

盘算规则 
类似于斗田主发牌,第一个分区给了第一个消耗者,第二个分区就给第二个消耗者,一次进行下去
配置方式
  1. prop.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
复制代码
StickyAssignor

StickyAssignor 是 Kafka 2.4.0 版本引入的一种新的分区分配策略。它的目标是在重新分配时尽可能保持现有的分配稳定,以减少重新分配带来的影响。

盘算规则


  • StickyAssignor 会在重新分配时尽量保持现有的分区分配稳定。
  • 假如需要重新分配,它会尽量将分区分配给已经在消耗该分区的消耗者,或者分配给负载较轻的消耗者。
  • 分配时,尽量使每个消耗者的分区数量大致相等。
这里再举个例子

配置方式 
  1. prop.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
复制代码
二、Kafka的消耗安全题目

消耗者线程安全题目

起首,kafka 的Java consumer是单线程的筹划,正确来说是双线程,kafka新版本中KafkaConsumer变成了用户主线程和心跳线程的双线程筹划
所谓用户主线程,就是你启动Consumer应用步伐的main方法的那个线程,而心跳线程只负责定期发送心跳给对应的Broker,以标识消耗者应用的存活性,引入心跳线程的目的还有一个:解耦真实的消息处理逻辑与消耗者组成员存活性管理。
尽管多了一个心跳线程,但是实际的消息处理还是由主线程完成,所以我们还是可以认为KafkaConsumer是单线程筹划的。
那为什么要采取单线程筹划的思路呢?


  • 新版本Consumer筹划了单线程+轮询的机制,这种筹划能够较好的实现非壅闭式的消息获取。(由于一旦是多线程,一定会发送壅闭等候,所以如许读取消息确保是非壅闭的)
  • 单线程的筹划能够简化Consumer端的筹划,将处理消息的逻辑是否使用多线程的选择,由你来决定。(这里说的多线程是指,单线程依然是获取消息,这个消息要存下来,真正处理这个消息的handler可以提交给线程池行止理)
  • 不论使用那种编程语言,单线程的筹划都比较容易实现,而且,单线程筹划的Consumer更容易移植到其它语言上。
死信队列和重试队列

重试队列

        与此对应的还有一个“回退队列"的概念,试想假如消耗者在消耗时发生了异常,那么就不会对这一次消耗进行确认(Ack),进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为相识决这个题目,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障,实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。
        重试队列实在可以当作是一种回退队列,具体指消耗端消耗消息失败时,为防止消息无故丢失而重新将消息回滚到Broker与回退队列不同的是重试队列一般分成多个重试品级,每个重试品级一般也会设置重新投递延时,重试次数越多投递延时就越大。
死信队列

        当一条消息初次消耗失败,消息队列 MQ 会主动进行消息重试,达到最大重试次数后,若消耗依然失败,则表明消耗者在正常情况下无法正确地消耗该消息,此时,消息队列 MQ不会立刻将消息抛弃,而是将其发送到该消耗者对应的特殊队列中,这种正常情况下无法被消耗的消息称为死信消息,存储死信消息的特殊队列称为死信队列。
消息丢失和消息重复

在消耗者端,消耗了消息要提交一个东西叫做offset,就是消息偏移量(也叫位移),代表我现在已经消耗到哪个位置的消息了。假如我们对于位移提交控制欠好可能出现消息丢失以及消息消息重复的情况
重复消耗

这种情况发生在消耗了数据但没有及时提交offset

好比开启了主动offset提交,consumer默认5s提交一次offset,提交offset 2s之后consumer挂了,此时已经消耗了2s的消息,但是由于没有触发5s时间间隔没有告诉kafka已经消耗信息,此时再启动consumer broker还是记载的5s主动提交之前的offset 此时会造成消息的重复消耗
消息丢失

这种情况发生消息还没真正消耗完,就提交offset了

假如将offset设置为手动提交,当offset被提交时,数据还在内存中未处理,刚好消耗者宕机,offset已经提交,数据未处理,此时就算再启动consumer也消耗不到之前的数据了,导致了数据漏消耗
假如想要consumer精准一次消耗,需要kafka消息的消耗过程和提交offset变成原子操作,此时需要我们将kafka的offset长期化到其他支持事件的中间件(好比MySOL)
消息堆积

1、假如是kafka消耗本领不敷,考虑增加topic分区数,而且同时增加消耗者组的消耗者数量,由于一个partition只能被CG(消耗者组)中的一个consumer消耗,所以partition和consumer必须同时增加
2、假如是下游数据处理不及时,可以提高每次拉取的数量。由于批次拉取数据过少,会使得处理数据小于生产的数据
配置
fetch.max.bytes消耗者获取服务器端一批消息最大的字节数,默认50M
max.poll.records一次poll拉取数据返回消息的最大条数,默认是500条
三、消耗者offset(位移)管理

消耗者在消耗的过程中需要记载自己消耗了多少数据,即消耗位置信息,在kafka中,这个位置信息有个专门的术语:位移(offset)
位移类型

有两种位移
1、分区位移
生产者向分区写入消息,每条消息在分区中的位置信息由一个叫offset的数据来表示,假设一个生产者向一个空分区写入了10 条消息,那么这 10 条消息的位移依次是 0、1、…、9;
2、消耗位移
留意,这和上面所说的消息在分区上的位移完全不是一个概念,上面的“位移“表示的是分区内的消息位置,它是稳定的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了,而消耗者位移则不同,它可能是随时变化的,毕竟它是消耗者消耗进度的指示器。
假设一个分区中有 10条消息,位移分别是0到9,某个 Consumer 应用已消耗了5条消息,这就说明该 Consumer 消耗了位移为0到4的5条消息,此时 Consumer 的位移是5,指向了下一条消息的位移。
至于为什么要有消耗位移,很好理解,当Consumer 发生故障重启之后,就能够从 Kafka 中读取之条件交的位移值,然后从相应的位移处继承消耗,从而制止整个消耗过程重来一遍,就似乎书签一样,需要书签你才可以快速找到你前次读书的位置。
位移信息存放在哪? 
Kafka0.9之后kafka将offset维护在了体系topic __consumer_offsets 中,该主题有50个partition,采取K-V方式存储数据,key=groupId+topic+partition号,value即使当前的ofset值,每隔一段时间,kafka内部会对这个topic进行压缩compact操作,保留最新的offset。
位移提交方式

主动提交

Kafka 消耗者在后台定期主动提交偏移量。所以有两个配置


  • enable.auto.commit:设置为 true 以启用主动提交,默认值为 true。
  • auto.commit.interval.ms:指定主动提交的间隔时间,默认值为 5000 毫秒(5 秒)。
  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "my-group");
  4. props.put("enable.auto.commit", "true");  // 启用自动提交
  5. props.put("auto.commit.interval.ms", "5000");  // 设置自动提交间隔时间为 5 秒
  6. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  7. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  8. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  9. consumer.subscribe(Arrays.asList("my-topic"));
  10. while (true) {
  11.     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  12.     for (ConsumerRecord<String, String> record : records) {
  13.         // 处理消息
  14.         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  15.     }
  16. }
复制代码
手动提交

消耗者在处理完消息后显式地提交偏移量。需要enable.auto.commit设置为 false 以禁用主动提交。
消耗者在处理完消息后,显式调用 commitSync() 或 commitAsync() 方法来提交偏移量。
commitSync() 是同步提交,会壅闭直到提交成功或抛出异常。
commitAsync() 是异步提交,不会壅闭,可以提供回调函数来处理提交结果。
  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "my-group");
  4. props.put("enable.auto.commit", "false");  // 禁用自动提交
  5. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  6. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  7. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  8. consumer.subscribe(Arrays.asList("my-topic"));
  9. try {
  10.     while (true) {
  11.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  12.         for (ConsumerRecord<String, String> record : records) {
  13.             // 处理消息
  14.             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  15.         }
  16.         // 同步提交偏移量
  17.         consumer.commitSync();
  18.     }
  19. } catch (CommitFailedException e) {
  20.     // 处理提交失败的情况
  21.     e.printStackTrace();
  22. } finally {
  23.     consumer.close();
  24. }
复制代码
这个代码示例中,commitSync()是在consumer.poll()得到了大量的records之后,只要进行了xommit,那就是对这poll下来的所有records进行提交位移
考虑一种情况,假如在循环实行处理单条record中,发生了死循环或者出现了异常,但是之前的record又被处理过了,就会导致前面的这些record没被提交位移
所以有没有对单条record的commit呢?固然有,这种做法称为逐条提交
  1. consumer.commitSync(
  2.     Collections.singletonMap(
  3.         new TopicPartition(record.topic(), record.partition()),
  4.         new OffsetAndMetadata(record.offset() + 1)//当前record已处理,所以提交的offset应该是下一条record,需要+1
  5.     )
  6. );
复制代码
四、分区再平衡

什么是分区再平衡?

        一个新的消耗者加入群组时,它读取的是原本由其他消耗者读取的消息,当一个消耗者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消耗者来读取,在主题发生变化时,好比管理员添加了新的分区,会发生分区重分配。
        分区的所有权从一个消耗者转移到另一个消耗者,如许的行为被称为再平衡,再平衡非常重要,它为消耗者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消耗者),不外在正常情况下,我们并不盼望发生如许的行为,在再平衡期间,消耗者无法读取消息,造成整个群组一小段时间的不可用,另外,当分区被重新分配给另一个消耗者时,消耗者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用步伐。
再平衡的过程
        只要消耗者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息,消耗者会在轮询消息(为了获取消息)或提交偏移量时发送心跳,假如消耗者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再平衡。
总结:什么时候会触发分区再分配


  • Topic中添加一个新的分区,消耗者将重新分配
  • 消耗者关闭或者崩溃,消耗者读取的分区将会分配给其他消耗者
  • 消耗者群组中添加新的消耗者,将分区重新分配
五、Kafka存储结构


Kafka存储数据,是以分区为单元的,每个分区都有自己的log文件夹,下面的文件会分段(segment)存储
为什么分区太多的时候,Kafka性能会下降?
Kafka是在硬盘上顺序存取数据的,但是分区太多,造成写数据会东一个分区西一个分区的找,演变成随机存取了,所以导致kafka性能下降


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦应逍遥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表