王國慶 发表于 2024-7-26 13:48:30

剖析 Kafka 消息丢失的原因

前言

Kafka消息丢失的原因通常涉及多个方面,包罗生产者、斲丧者和Kafka服务端(Broker)的设置和行为。下面将围绕这三个关键点,详细探究Kafka消息丢失的常见原因,并提供相应的办理方案和最佳实践。具体分析如下:
一、生产者导致消息丢失的场景

场景1:消息体太大

消息巨细超过Broker的message.max.bytes的值。此时Broker会直接返回错误。
办理方案 :

1、减少生产者发送消息体体积

可以通过压缩消息体、去除不须要的字段等方式减小消息巨细。
2、调解参数max.request.size

max.request.size,表现生产者发送的单个消息的最大值,也可以指单个哀求中全部消息的总和巨细。默认值为1048576B,1MB。这个参数的值值必须小于Broker的message.max.bytes。
场景2:异步发送机制

Kafka生产者默认接纳异步发送消息,假如未精确处理发送结果,可能导致消息丢失。
办理方案 :

1、使用带回调函数的发送方法

不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。带有回调通知的 send 方法可以针对发送失败的消息举行重试处理。
场景3:网络题目和设置不当

​ 生产者在发送消息时可能遇到网络抖动或完全中断,导致消息未能到达Broker。假如生产者的设置没有思量这种情况,比方未设置恰当的重试机制(retries参数)和确认机制(acks参数),消息就可能在网络不稳定时丢失。
办理方案 :

1、设置acks参数设置为"all"

acks参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的,这个参数对于消息是否丢失起着重要作用,该参数的设置具体如下:


[*]all/-1 : 表现kafka isr列表中全部的副本同步数据成功,才返回消息给客户端
[*]0 :表现客户端只管发送数据,不管服务端接收数据的任何情况
[*]1 :表现客户端发送数据后,需要在服务端 leader 副本写入数据成功后,返反响应
使用同步发送方式或确保acks参数设置为"all",以确保全部副本接收到消息。
2、设置重试参数

重试参数主要有retries和retry.backoff.ms两个参数。
(1)参数 retries是指生产者重试次数,该参数默认值为0。
消息在从生产者从发出到成功写入broker之前可能发生一些临时性异常,好比网络抖动、leader副本选举等,这些异常发生时客户端会举行重试,而重试的次数由retries参数指定。假如重试达到设定次数,生产者才会放弃重试并抛出异常。但是并不是全部的异常都可以通过重试来办理,好比消息过大,超过max.request.size参数设置的数值(默认值为1048576B,1MB)。假如设置retries大于0而没有设置参数max.in.flight.requests.per.connection(限制每个连接,也就是客户端与Node之间的连接最多缓存哀求数)大于0则意味着放弃发送消息的顺序性。
使用retries的默认值交给使用方自己去控制,结果往往是不处理。所以通用设置发起设置如下:
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
该参数的设置已经在kafka 2.4版本中默认设置为Integer.MAX_VALUE;同时增加了delivery.timeout.ms的参数设置。
(2)参数retry.backoff.ms 用于设定两次重试之间的时间间隔,默认值为100。
避免无效的频繁重试。在设置retries和retry.backoff.ms之前,最好先估算一下可能的异常规复时间,这样可以设定总的重试时间要大于异常规复时间,避免生产者过早的放弃重试。
3、设置 min.insync.replicas参数

参数min.insync.replicas, 该参数控制的是消息至少被写入到多少个副本才算是 “真正写入”,该值默认值为 1,不发起使用默认值 1, 发起设置min.insync.replicas至少为2。 由于假如同步副本的数目低于该设置值,则生产者会收到错误响应,从而确保消息不丢失。
二、Broker服务端导致消息丢失的场景

场景1:Broker 宕机

为了提拔性能,Kafka 使用 Page Cache,先将消息写入 Page Cache,接纳了异步刷盘机制去把消息保存到磁盘。假如刷盘之前,Broker Leader 节点宕机了,并且没有 Follower 节点可以切换成 Leader,则 Leader 重启后这部分未刷盘的消息就会丢失。
假如Broker的副本因子(replication.factor)设置过低,或者同步副本的数目(min.insync.replicas)设置不当,一旦Leader Broker宕机,选举出的新的Leader可能不包含全部消息,导致消息丢失。
办理方案 :

1、增加副本数目

这种场景下多设置副本数是一个好的选择,通常的做法是设置 replication.factor >= 3,这样每个 Partition 就会有 3个以上 Broker 副本来保存消息,同时宕机的概率很低。
同时配合设置上文提到的参数 min.insync.replicas至少为2(不发起使用默认值 1),表现消息至少要被成功写入到 2 个 Broker 副本才算是发送成功。
场景2:leader挂掉,follower未同步

假如 leader 副本地点的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但 leader 的数据还有一些没有被 follower 副本同步的话,就会造成消息丢失。
办理方案 :

1、leader竞选资格

参数unclean.leader.election.enable 的值分析如下:


[*]true:允许 ISR 列表之外的节点参与竞选 Leader;
[*]false:不允许 ISR 列表之外的节点参与竞选 Leader。
该参数默认值为false。但假如为true的话,意味着非ISR集合中的副本也可以到场选举成为leader,由于不同步副本的消息较为滞后,此时成为leader的话可能出现消息不划一的情况。所以unclean.leader.election.enable 这个参数值要设置为 false。
2、增加副本数目

同上文。
场景3:持久化错误

为了提高性能,减少刷盘次数, Kafka的Broker数据持久化时,会先存储到页缓存(Page cache)中,
按照一定的消息量和时间间隔举行举行批量刷盘的做法。数据在page cache时,假如系统挂掉,消息未能及时写入磁盘,数据就会丢失。Kafka没有提供同步刷盘的方式,所以只能通过增加副本或者修改刷盘参数提高刷盘频率来来减少这一情况。
办理方案 :

1、调解刷盘参数

kafka提供设置刷盘机制的参数如下:
log.flush.interval.messages
多少条消息刷盘1次,默认Long.MaxValue
log.flush.interval.ms
隔多长时间刷盘1次 默认null
log.flush.scheduler.interval.ms
周期性的刷盘。默认Long.MaxValue
官方不发起通过上述的刷盘3个参数来强制写盘。其认为数据的可靠性通过replica来包管,而强制flush数据到磁盘会对整体性能产生影响。
2、增加副本数目

同上文。
三、斲丧者导致消息丢失的场景

场景1:提交偏移量后消息处理失败

参数 enable.auto.commit 于设定是否自动提交offset,默认是true。代表消息会自动提交偏移量。但是提交偏移量后,消息处理失败了,则该消息丢失。
办理方案 :

可以把 enable.auto.commit 设置为 false,这样相当于每次斲丧完后手动更新 Offset。不过这又会带来提交偏移量失败时,该消息复斲丧题目,因此斲丧端需要做好幂等处理。
场景2:并发斲丧

假如斲丧端接纳多线程并发斲丧,很容易由于并发更新 Offset 导致斲丧失败。
办理方案 :

假如对消息丢失很敏感,最好使用单线程来举行斲丧。假如需要接纳多线程,可以把 enable.auto.commit 设置为 false,这样相当于每次斲丧完后手动更新 Offset。
场景3:消息堆积

斲丧者假如处理消息的速率跟不上消息产生的速率,可能会导致消息堆积,进而触发斲丧者客户端的流控机制,从而遗失部分消息。
办理方案 :

一般题目都出在斲丧端,只管提高客户端的斲丧速率,斲丧逻辑另起线程举行处理。
场景4:斲丧者组rebalance

斲丧者组 rebalance导致导致消息丢失的场景有两种:
1、某个客户端心跳超时,触发 Rebalance被踢出斲丧组。假如只有这一个客户端,那消息就不会被斲丧了。
2、Rebalance时没有及时提交偏移量,斲丧者组内的斲丧者需要重新举行分区分配,在这个过程中无法斲丧消息。所以假如在 斲丧者在 rebalance 之前没有精确处理完正在处理的消息,那么这些消息就可能会丢失或者重复斲丧。
办理方案 :

1、只管提高客户端的斲丧速率

提高单条消息的处理速率,比方对消息处理中比 较耗时的步骤可通过异步的方式举行处理、利用多线程处理等。
2、调解参数避免不 须要的rebalance

某些参数设置不当会导致重均衡频繁 ,严重影响斲丧速率,此时可以通过调解参数避免不须要的重均衡。 kafka rebalance所涉及的参数如下:
session.timeout.ms
该参数是 Coordinator 检测斲丧者失败的时间,即在这段时间内客户端是否跟 Coordinator 保持心跳,假如该参数设置数值小,可以更早发现斲丧者崩溃的信息,从而更快地开启重均衡,避免斲丧滞后,但是这也会导致频繁重均衡,这要根据现实业务来衡量。
max.poll.interval.ms
于设定consumer两次poll的最大时间间隔(默认5分钟),假如超过了该间隔consumer client会主动向coordinator发起LeaveGroup哀求,触发rebalance。根据现实场景可将max.poll.interval.ms值设置大一点,避免不 须要的rebalance。
heartbeat.interval.ms
该参数跟 session.timeout.ms 精密关联,前面也说过,只要在 session.timeout.ms 时间内与 Coordinator 保持心跳,就不会被 Coordinator 剔除,那么心跳间隔的时间就是session.timeout.ms,因此,该参数值必须小于 session.timeout.ms,以保持 session.timeout.ms 时间内有心跳。
max.poll.records
于设定每次调用poll()时取到的records的最大数,默认值是500,可根 据现实消息速率适当调小。这种思绪可办理因斲丧时间过长导致的重复斲丧题目, 对代码改动较小,但无法绝对避免重复斲丧题目。
依然会丢消息的场景

纵然把参数都设置的很美满也会丢失消息的两种场景
场景 1:

当把数据写到充足多的PageCache的时候就会告知生产者如今数据已经写入成功,但假如还没有把PageCache的数据写到硬盘上,这时候PageCache地点的操作系统都挂了,此时就会丢失数据。
场景 2:

副本地点的服务器硬盘都坏了,也会丢数据。
总结

总的来说,Kafka消息丢失是一个涉及多个环节的题目,需要从生产者、Broker和斲丧者三个层面综合思量。通过公道的设置和计谋,结合监控和及时的应对步伐,可以大幅低落消息丢失的风险,确保数据在分布式系统中的可靠传递。
下图是本文内容总结的脑图:
https://img-blog.csdnimg.cn/direct/9f8e5479e28f42e5b91667912928dd5b.png#pic_center
末了
https://img-blog.csdnimg.cn/ecc7bcef7b5c48c3a78aaba2c1b75a42.jpeg

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 剖析 Kafka 消息丢失的原因