【kafka-03】springboot整合kafka以及核心参数详解

王柳  金牌会员 | 2024-9-27 14:55:28 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 905|帖子 905|积分 2715

Kafka系列整体栏目

内容链接地址【一】afka安装和基本核心概念https://zhenghuisheng.blog.csdn.net/article/details/142213307【二】kafka集群搭建https://zhenghuisheng.blog.csdn.net/article/details/142253288【三】springboot整合kafka以及核心参数详解https://zhenghuisheng.blog.csdn.net/article/details/142346016

  
一,springboot整合kafka以及核心参数详解

前面两篇主要解说了kafka的安装启动,以及kafka的集群的搭建,接下来这篇主要解说springboot怎样整合kafka,以及在kafka中的核心参数须要怎样设置,以及设置的意义是什么
1,springboot整合kafka

起首须要确定kafka版本,这里选择2.8.10的版本
  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     <artifactId>spring-kafka</artifactId>
  4.     <version>2.8.10</version>
  5. </dependency>
复制代码
接下来在yml配置文件中设置对应的参数,首老师产者和消费者都须要定义序列化,然后生产者设置默认组,发送到broker的消息确认机制等
  1. spring:
  2.   kafka:
  3.     #bootstrap-servers: 175.178.75.153:9092,175.178.75.153:9093,175.178.75.153:9094
  4.     bootstrap-servers: 175.178.75.153:9092
  5.     producer: # 生产者
  6.       retries: 3
  7. # 设置大于0的值,则客户端会将发送失败的记录重新发送
  8.       batch-size: 16384
  9.       buffer-memory: 33554432
  10.       acks: 1
  11.       # 指定消息key和消息体的编解码方式
  12.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  13.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  14.     consumer:
  15.       group-id: default-group
  16.       enable-auto-commit: false
  17.       auto-offset-reset: earliest
  18.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  19.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  20.     listener:
  21.       # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
  22.       # RECORD
  23.       # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
  24.       # BATCH
  25.       # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
  26.       # TIME
  27.       # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
  28.       # COUNT
  29.       # TIME | COUNT 有一个条件满足时提交
  30.       # COUNT_TIME
  31.       # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
  32.       # MANUAL
  33.       # 手动调用Acknowledgment.acknowledge()后立即提交
  34.       # MANUAL_IMMEDIATE
  35.       ack-mode: MANUAL_IMMEDIATE
复制代码
接下来就直接编写一个controller,模拟生产者往broker中发送消息,如直接在原先的9092端口建立的 zhstest111 的主题上面发送消息,其代码如下
  1. @RestController
  2. @RequestMapping("/test")
  3. public class KafkaController {
  4.     private static final String TOPIC = "zhstest11";
  5.     @Resource
  6.     private KafkaTemplate<String,Object> kafkaTemplate;
  7.     //发送数据
  8.     @GetMapping("/send")
  9.     public AjaxResult sendMessage() {
  10.         String uuid = UUID.randomUUID().toString();
  11.         kafkaTemplate.send(TOPIC, "hello kafka" + uuid);
  12.         return AjaxResult.success("数据发送成功");
  13.     }
  14. }
复制代码
消费者的消费如下,定义一个component配置类,然后通过 KafkaListener 监听对应的topic,有数据就消费
  1. @Component
  2. @Slf4j
  3. public class KafkaConfig {
  4.     private static final String TOPIC = "zhstest11";
  5.     //接收数据
  6.     @KafkaListener(topics = TOPIC, groupId = "my-group")
  7.     public void consume(String message) {
  8.         log.info("接收到的数据为: " + message);
  9.     }
  10. }
复制代码
2,kafka核心参数的解说

2.1,生产者端的参数核心解说

2.1.1,生产者端的ack机制

在生产者往broker中投递消息时,为了包管消息的可靠送达以及长期化机制,须要通过这个ack机制来吸收到broker的应答
  1. ack:1
复制代码


  • 当ack=0时:性能最高,消息直接异步给完broker就行,不须要broker任何回复,缺点就是容易丢消息
  • 当ack=1时,性能其次,须要leader结点将数据成功写入到当地日志,但是不须要等待集群中的follower写入,如果出现leader挂掉,但是follower未及时同步,那么在follower变成leader之后,就会丢失这部门消息
  • 当ack=-1时,性能最低,但是安全,生产者端须要等待broker集群中的leader和副本都成功写入日志
ack默认设置为1,允许在极度的情况下丢失部门消息。如果是为了记载海量的日志,那么可以将ack设置为0,如果是须要相对安全的,如金融领域不能丢失订单数据等,那么就设置成-1
2.2.2,重试次数retries

  1. retries: 3
复制代码
在生产者往broker投递消息时,当消息投递失败时,那么就可以设置重试,根据设置的值决定重试的次数。当然也有可能由于网络抖动的题目导致消息在相应时比力慢,生产者由于没吸收到相应,但是消息时投递成功到broker的,那么可能就会投递两条,那么就可能会导致重复消费的题目,因此后期须要设置消费的幂等性的题目等。
2.2.3,发送缓冲区buffer-memory

  1. buffer-memory: 33554432
复制代码
消息发送到broker时,为了提拔消息发送的效率,kafka内部将单条发送做了优化,将单条发送改成批量发送,因此设置了发送缓冲区,默认是32M巨细。
2.2.4,批量拉取batch-size

  1. batch-size: 16384
复制代码
上面解说在生产者端会有一个发送缓冲区,数据会先存储到这个发送缓冲区中,当然数据还是须要投放到broker呆板上,因此须要这只这个batch-size批量的将数据从这个缓冲区中拉取到broker上面。当拉取的数据满了16kb之后,立马触发将数据投递到broker的上面。
当然也不是说只有满16kb才能去拉取数据投放到broker中,比如只有1kb数据,后台会默认多少时间去投递一次,如隔断10ms投递一次,从而包管消息投递的高可用
2.2.5,序列化

在网络传输中,须要将数据大概实体序列化成0,1这种二进制文件数据通过网络传输,那么操作系统就可以辨认这种数据,因此在发送端中须要设置好相应的序列化器和反序列化器,如许才能剖析服务端发送的数据
  1. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  2. value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
2.2,消费者端参数核心解说

2.2.1,enable-auto-commit自动提交偏移量

  1. enable-auto-commit: false
复制代码
在前面两篇文章中解说过,不管是单体的消费者还是消费者组,当有消息被消费后都会默认的去增长partition的偏移量。这个参数默认会设置为true,默认时会增长消费者的偏移量的,如果设置成false,那么就不会每次的去修改partition的偏移量,那么消费者每次消费就相称于从头开始消费,有点类似于 –from-beginning命令了
  1. bin/kafka-console-consumer.sh --topic zhstest11 --from-beginning --bootstrap-server localhost:9092
复制代码
2.2.2,auto-commit-interval自动提交偏移量时间隔断

  1. auto-commit-interval:5000
复制代码
上面提到了自动提交偏移量这个参数,当这个参数设置成true时,那么每个消费者的偏移量都得上报到每个topic主题中,类似于kafka内部会做一个记载,记载每一个消费者记载到什么地方,哪一个off_set偏移量。当消费者重启时大概出现故障之后,都可以重正确的地方开始消费。而这个参数,就是为了将每个消费者消费了多少偏移量进行上报的功能,默认情况就是每隔5s上报一次。
自动提交虽然方便,但是假设说这5s的数据还没来的及上报成功,服务器宕机了,那么消费者可能就会丢失这5s的消费记载,在topic中找不到,因此就会导致重复消费的题目。因此在现实开辟中,更加的的倾向于利用手动提交偏移量,因此上面的这个 enable-auto-commit 参数最好还是设置成false,这也是包管高可用的一种方式
2.2.3,ack-mode手动提交偏移量

  1. #指定手动提交确认模式,使用 Acknowledgment 对象来手动确认消费
  2. ack-mode: manual
复制代码
上面说了自动提交对数据可能会有不安全性,因此更加的推荐利用手动提交,因此在消费者参数配置这个ackmode的value值为manual,那么就可以直接利用这个 Acknowledgment 对象来手动确认消费
如下面这段代码,增长了 Acknowledgment 对象,直接通过调用 acknowledge 实现手动的提交偏移量
  1. @KafkaListener(topics = "my-topic", groupId = "my-group")
  2. public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
  3.     try {
  4.         // 消费消息的逻辑
  5.        log.info("Received message: " + record.value());
  6.         // 手动提交偏移量
  7.         acknowledgment.acknowledge();
  8.     } catch (Exception e) {
  9.         // 异常处理逻辑
  10.         log.info("Error processing message: " + e.getMessage());
  11.     }
  12. }
复制代码
2.2.4,max-poll-records 消费者拉取数据

  1. max-poll-records:500
复制代码
消费者在消费broker的数据时,也会设置默认的拉取数量,默认最多是500条。当然可以根据消费者消费的情况做一个适配和调解,消费过快的话可以调大这个参数,消费过慢的话可以调小这个参数
2.2.5,heartbeat-interval心跳维护

  1. heartbeat-interval:1000
复制代码
用于维护kafka和消费者之间的心跳题目,默认是1s,如果在指定时间内消费者没有往kafka发送心跳,那么kafka集群的调和器就会认为这个消费者已经失效。此时partition无消费者消费,那么就会触发一个消费的均衡机制,将该分区分配给其他消费者大概其他消费者组
2.2.6,auto-offset-reset

  1. auto-offset-reset: earliest
复制代码
这个参数比力有意思,和kafka的特性有关,假设有一个group1组,先消费了order订单主题的消息,此时offset的偏移量记载为1000,现在忽然新增了一个group2,那么这个group2默认时不能消费到group1消费到的消息的,即使是两个不同的组,由于在默认情况下,新的组会从主题已有的offset的偏移量继续往下消费,就是说启动后能消费到背面生产者所发送的消息
由于在kafka内部,这个 auto-offset-reset 参数默认设置的是 latest,就是说只消费自己启动之后生产者发送到broker的消息,因此为了让新的group组也消费前面的消息,可以设置这个值为earliest

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王柳

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表