在进行spring-kafka消费的过程中,大部分人可能都碰到过kafka消息堆积的情况,尤其是大数据处置惩罚的场景,这时候就要想办法提高消费本领。
提高消费者的数量可以吗?我们知道,kafka的消费者个数是与kafka的分区数相干的,一个分区最多只能被一个消费者消费,也就是说,你在客户端开启的消费者个数即使凌驾了分区数,也不能提高消费本领,反而还占资源!
那既然一个分区只能被一个消费者消费,那我增长分区数不就可以多开消费者了?这在理论上确实是可行的,但是在运维层面会涉及到分区、数据的迁徙,而且这期间kafka是不可用的状态,成本太过高昂。
本方案从消费端的代码层面着手,看怎样计划实现一个高性能的kafka消费组件。
一、异步
默认情况下,spring-kafka采用的是同步消费模式,这种情况下消费本领被分区数限定,难以提拔。以是我们起首想到的异步消费:
异步消费,将消息的拉取与处置惩罚交由不同的线程处置惩罚,在硬件答应的前提下,可自由提高消费本领。
异步消费固然提高了消费本领,但是对于offset的提交却带来了挑战:因为默认情况下,offset是自动提交的,自动提交很可能导致异步未处置惩罚完的消息丢失;
二、手动提交
方案:
- consumer将offset加入本地列表进行维护;
- 异步worker回调更新offset状态;
- consumer遍历offset,探求可提交的最大offset;
- 提交最大offset到kafka,并清空本地已提交offset;
三、有序offset列表
从上图可知,为了进行手动提交,其中最关键的部分是手动维护的offset列表,为实现offset的提交,要达到的目标:
- 减少提交次数,每次只提交已完成的最大offset;
- offset列表已提交部分及时开释,保持弹性;
- 只管包管offset更新的有序性,提交时前面offset已完成,避免出现头部真空征象;
【101~109处置惩罚完之后,只有3次offset提交动作,109提交之后,109及之前的offset都会从offset列表中清除。】
但实际情况是:
- 异步worker执行进度无法预料,对offset回调更新无法包管顺序;
- 极有可能出现头部长时间无更新的真空征象;
- 非常情况下,由于头部offset不停不更新导致offset列表不停得不到开释从而导致内存溢出;
【103、105、106固然都处置惩罚完了,但是都还不能提交,因为这时候101和102都还未被处置惩罚。】
非常情况下会出现以下情况:
【随着时间的推移,低位的offset始终未被处置惩罚,这会导致后面全部已处置惩罚的offset都得不到提交,同时由于高位offset不断写入,低位的offset得不到清除,offset列表将急剧膨胀。】
问题:在异步执行的情况下怎样实现offset列表更新的有序性?
四、传统线程池
传统线程池调度策略:
- 当线程数< corePoolSize时,直接创建新线程执利用命;
- 当corePoolSize<线程数<maxPoolSize时,如果队列未满,使命入队等待;
- 当corePoolSize<线程数<maxPoolSize时,如果队列已满,创建新线程执利用命;
- 当线程数=maxPoolSize,且队列已满,则执行拒绝策略;
从传统线程池的调度策略来看,它是惰性的,这对我们当前场景来说有哪些问题?
- 在第3步中,后到的使命可能会比先到的使命先执行完;
- 在第4步中,拒绝策略会导致使命抛弃或由主线程插队;
这两点都会导致无法包管使命的顺序执行!
五、饥渴线程池
目标:
- 确保先到的使命先执行;
- 包管使命等待时的公平性;
- 不能丢弃使命;
方案:
- 改变传统线程池调度策略;
- 利用公平队列包管使命等待的公平性;
- 完全摒除拒绝策略;
- put instead offer;
【对传统线程池关键部分进行改写】
新的调度策略:
- 当线程数< corePoolSize时,直接创建新线程执利用命;
- 当corePoolSize<线程数<maxPoolSize时,强制创建新线程执利用命;
- 当线程数=maxPoolSize,入队列等待;
六、微批处置惩罚
目标:
- 提高末了业务执行服从,最大化提拔性能;
- 低沉本地offset列表内存开销;
- 提高offset提交至kafka的服从;
方案:
- 基于micro-batch微批思想,consumer线程对批量消息进行切分、整合,将微批数据交给不同的worker线程处置惩罚;
- offset列表仅记录micro-batch中最大的offset;
- 仅提交最大offset到kafka;
【从kafka批量拉取200条消息,在内部进行分片、切分、合并,以50一组形成微批,提交给4个worker处置惩罚,同时,本地offset列表只记录每个微批中最大的offset:50、100、150、200,这样在提高末了业务执行服从、offset提交服从的同时,也极大地节流了内存开销。】
微批处置惩罚要求业务必须具有原子性,micro-batch这一批要么全部成功,要么全部失败!
七、last offset处置惩罚
问题:
- consumer线程在提交offset时,秉着‘有多少就提交多少’的思想,并不等待,以是在异步的情况下会出现最后一批消息的若干offset得不到提交的情况;
- 如果后续没有业务消息产生,那么这若干offset将永久得不到提交,如果此时发生rebalance,将发生重复消费;
方案:
基于sideCar模式,提供额外的监督器,动态监测本节点的offset列表,如若发现offset列表仍有未提交的offset,则会主动发送探针消息,驱动consumer进行poll及commit;
八、重写与重排
问题:
- 发生rebalance时,consumer实例可能会发生变化,用已过时的consumer提交offset会失败;
- 发生rebalance时,可能会发生重复消费,本地offset列表会重复添加;
- 发生rebalance时,由于micro-batch机制,拉取的offset可能比已有的全部offset要小,造成offset列表乱序;
方案:
- 加入offset列表时,检查consumer实例,如果发生变化,则对consumer进行重写;
- 滤重检查及offset列表重排序;
九、分区重分配
问题:
- 发生rebalance时,节点被分配的分区也可能发生变化,这时节点之前生存的未提交offset列表就成了脏数据,并常驻内存;
- 脏数据导致探针消息不停地被发送,但是本节点消费不到,引起网络滥发;
问题思索(怎么判断offset列表为脏数据?):
- 判断offset列表大小不变为什么不可以?
——可能存在每次巡检时大小都不变的情况;
- 判断offset列表内容不变为什么不可以?
——可能线程池繁忙,新offset添加不停等待导致列表不变化;
方案:
每次添加offset时记录offset列表的更新时间戳,Offset Monitor定时检测,当经历了一个rebalance周期后,如果时间戳仍未更新,判断offset列表为脏数据,予以清除。
方案思索
- 时间戳长时间不更新意味着什么?
—— 1.分区重分配;2.线程池繁忙,新offset添加不进去;
- 线程池繁忙的场景,为什么offset列表不会被误杀?
—— 把时间线拉长,新offset加入等待一个rebalance周期后,一定会rebalance;
【关键代码】
这个案例给我们的启发:从空间维度转变为时间维度去判断,这种思索维度的转变以及思索尺度的放大,低沉了解决问题的复杂性,提供了更多的可能性。
十、极致性能
问题:
- 传统队列在读写时存在锁争用,在高并发场景下,线程不停地被挂起、恢复,上下文切换过程存在着很大的开销;
- 在x86架构下的CPU,在高并发场景下很容易形成伪共享(多个线程操纵不同的变量,但是变量处于雷同的缓存行,修改变量会使缓存行失效,甚至发生跨槽读取);
【传统队列关键代码】
Disruptor:
Disruptor是一个高性能的队列,通过以下计划解决传统队列的锁争用及伪共享问题:
- 无锁计划:采用CAS无锁方式,包管线程安全,并提高服从;
- 环形数组:可避免频繁的垃圾接纳,同时数组对处置惩罚器的缓存机制更加友爱;
- 元素定位:环形数组长度为2^n,通过位运算,能快速取到元素;
- Cache padding:通过添加额外的无用信息,避免伪共享引发的性能问题。
【cas与cache padding】
【关键代码】
十一、总体架构
十二、结果测试
服务器:3 * 6核8G
消费逻辑:将14万CDC消息盘算更新后写入ES
消费设置:kafka 6分区、批量拉取500、32线程、2048队列
结果对比:
- 不利用组件:81分钟
- 亲缘线程池:6分41秒
- Disruptor:4分51秒
- 饥渴线程池:4分11秒
- 饥渴 + 微批:1分35秒
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |