记录一次Kafka重复消耗的题目
不讲原理,先抛题目日志发现Kafka同一个消耗者在一段时间内对同一条消息多次消耗。
https://i-blog.csdnimg.cn/direct/f88dcda2d648461d81e9ca364c861c93.png
https://i-blog.csdnimg.cn/direct/c489dcec34bf46a7a39271c58a005df5.png
https://i-blog.csdnimg.cn/direct/da7af224b3c34b8faeffc844502d6102.png
原理不讲,先上设置
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
spring.kafka.consumer.auto-offset-reset=latest
# Rebalance 超时
spring.kafka.consumer.properties.max.poll.interval.ms=300000# 5 分钟
三言两语,背景简介
Kafka一个生产者,一个消耗者,消耗同一个Topic,但是其中的某些消息处理耗时凌驾5分钟。
捕获题目,深度剖析
Kafka自动提交offset后因默认max.poll.interval.ms设置5分钟没有调用poll()从而发生Reblance重复消耗的题目。
解决方案,横向对比
[*]offset自动提交改为手动提交
spring.kafka.consumer.enable-auto-commit=false
@KafkaListener(topics = "generateYyVoucher-topic", groupId = "defaultConsumerGroup")
public void generateYyVoucher(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
...
// 提交 Offset
ack.acknowledge();
} catch (Exception e) {
log.error("Message processing failed: ", e);
// 如果处理失败,Offset 不会被提交
}
}
2.增加max.poll.interval.ms延迟
spring.kafka.consumer.properties.max.poll.interval.ms=900000 # 设置为15分钟
总结:但是上述两种方案均不能从根本上解决消耗者重复消耗的题目!根本题目是在于Reblance消耗重组缘故原由导致!
第1种只能解决offset偏移量不会重发消耗当前消息,但可能会消耗上一个消息;
第2种增加延迟,当业务逻辑凌驾设置时间时仍然会重复消耗。
重复消耗,最佳解决方案
幂等消耗标识(唯一标识)
// 幂等消费标识(唯一标识),以解决Kafka自动提交offset后因默认max.poll.interval.ms设置5分钟没有调用poll()从而发生Reblance重复消费的问题
String redisKey = "voucher:processing:" + ledgerId;
// 检查 Redis 中是否已存在该幂等标识
if (stringRedisTemplate.hasKey(redisKey)) {
return;
}
// 设置 Redis 中的标识为正在处理中(可以设置一个有效期,比如 30 分钟)
stringRedisTemplate.opsForValue().set(redisKey, "processing", 30, TimeUnit.MINUTES);
氪肝提示,不是温馨
面试八股文准备再多的中间件题目,也不如真正项目上实境碰到的题目。中间件也不是每个都要用,只关注项目上利用的就行。只有真正履历解决过一两个实际题目,才能了解中间件。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]