Kafka中的fetch-min-size、fetch-max-wait和request.timeout.ms设置

打印 上一主题 下一主题

主题 620|帖子 620|积分 1860

当前kafka的版本为2.8.11,Spring Boot的版本为2.7.6,在pom.xml中引入下述依赖: 
  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     <artifactId>spring-kafka</artifactId>
  4.     <version>2.8.11</version>
  5. </dependency>
复制代码
然后在yml设置文件举行如下设置:
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: 127.0.0.1:9092
  4.     consumer:
  5.       group-id: 0
  6.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  7.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  8.       fetch-max-wait: 25000ms
  9.       fetch-min-size: 10
  10.     producer:
  11.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  12.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
1、fetch-min-size

fetch-min-size表现消费者一次拉取请求中,服务器应该返回的最小数据量,单元是字节,默认值为1个字节。如果服务器没有充足的数据返回,请求会等候。
在Spring Boot与Kafka整合中,yml文件中的fetch-min-size参数对应着kafka的fetch.min.bytes参数。
   UTF-8编码下一个字符占用一个字节
  2、fetch-max-wait

fetch-max-wait表现消费者的 fetch(拉取)请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比力长。这个设置就是来设置消费者最多等候response多久。
若是不满足fetch.min.bytes时,该参数就会生效,其值表现等候消费端请求的最长等候时间,默认是500ms。
在项目中创建一个生产者用于往主题 topic0 中投递消息,如下所示:
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.kafka.support.SendResult;
  5. import org.springframework.util.concurrent.ListenableFuture;
  6. import org.springframework.util.concurrent.ListenableFutureCallback;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RequestParam;
  9. import org.springframework.web.bind.annotation.RestController;
  10. @Slf4j
  11. @RestController
  12. @RequestMapping("/kafka")
  13. public class KafkaProducer {
  14.     // 自定义的主题名称
  15.     public static final String TOPIC_NAME="topic0";
  16.     @Autowired
  17.     private KafkaTemplate<String, String> kafkaTemplate;
  18.     @RequestMapping("/send")
  19.     public String send(@RequestParam("msg")String msg) {
  20.         log.info("准备发送消息为:{}",msg);
  21.         // 1.发送消息
  22.         ListenableFuture<SendResult<String,String>> future=kafkaTemplate.send(TOPIC_NAME,msg);
  23.         future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
  24.             @Override
  25.             public void onFailure(Throwable throwable) {
  26.                 // 2.发送失败的处理
  27.                 log.error("生产者 发送消息失败:"+throwable.getMessage());
  28.             }
  29.             @Override
  30.             public void onSuccess(SendResult<String, String> stringObjectSendResult) {
  31.                 // 3.发送成功的处理
  32.                 log.info("生产者 发送消息成功:"+stringObjectSendResult.toString());
  33.             }
  34.         });
  35.         return "接口调用成功";
  36.     }
  37. }
复制代码
接着再在项目中创建一个消费者用于消息主题 topic0 中的消息,如下所示:
  1. import java.util.Optional;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.kafka.support.KafkaHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.stereotype.Component;
  8. @Slf4j
  9. @Component
  10. public class KafkaConsumer {
  11.     // 自定义主题名称,这里要注意的是主题名称中不能包含特殊符号:“.”、“_”
  12.     public static final String TOPIC_NAME="topic0";
  13.     @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
  14.     public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
  15.         Optional message = Optional.ofNullable(record.value());
  16.         if (message.isPresent()) {
  17.             Object msg = message.get();
  18.             log.info("消费者组One消费了消息:Topic:" + topic + ",Record:" + record + ",Message:" + msg);
  19.         }
  20.     }
  21. }
复制代码
项目启动以后,这时控制台中会打印下述信息:
  1. ConsumerConfig values:
  2. auto.commit.interval.ms = 5000
  3. auto.offset.reset = latest
  4. bootstrap.servers = [127.0.0.1:9092]
  5. client.id = consumer-ONE-1
  6. enable.auto.commit = false
  7. fetch.max.wait.ms = 25000
  8. fetch.min.bytes = 10
  9. group.id = ONE
  10. key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
  11. value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
复制代码
接着使用 /kafka/send?msg=xxx 接口连续往主题 topic0 中生产 9 条消息,我们会发现 9 条消息发送完了以后消费者并不会立马就可以消费到这些数据,大约在等候 25 秒时长以上才能消费到这些数据。然后后面我们再调用/kafka/send?msg=xxx 接口连续往主题 topic0 中生产多条消息,不管你怎么发送数据,这时消费者都可以立马消费到这些数据。
重启项目使用生产者往主题 topic0 中投递 10 条消息,我们会发现 10 条消息发送完了以后消费者并不会立马就可以消费到这些数据,大约在 25 秒时长以内就能消费到这些数据。接着我们再调用/kafka/send?msg=xxx 接口连续往主题 topic0 中生产多条消息,不管你怎么发送数据,这时消费者也是都可以立马消费到这些数据。
   通过上述示例我们可以看出fetch-min-size和fetch-max-wait这两个参数的设置项确实生效了,fetch-min-size值的设置仅仅是在项目启动以后Kafka客户端第一次发起Fetch请求时(不管消费者有没有获取到数据)才会生效,仅生效一次。一旦完成一次Fetch请求后续对于消息的消费都是即时消费。而fetch-max-wait的值是一直会生效的,kefka客户端每发起一次Fetch请求,在服务端没有充足数据返回时,该Fetch请求会处于等候状态,直到到达最长等候时间。
    fetch-min-size这是一个调优相关的参数,默以为1字节,表现如果请求的数据量小于1字节,broker就是攒一攒,等充足1字节了再一起返回给Consumer,这个值建议可以适当调大一点,以进步服务的吞吐量。 
    如果在fetch.max.wait.ms指定的时间内,数据量依然没有到达fetch.min.bytes所设置的值,那broker也不会再等了,将直接返回数据给Consumer,此值默以为500ms。 
  3、request.timeout.ms

这个是生产者和消费者的通用参数设置,该设置控制客户端等候请求相应的最长时间。如果在超时时间过去之后客户端仍然未收到相应,则客户端将在必要时重新发送请求,大概在重试次数用尽时使请求失败,默认值为30秒。
比方将上述yml设置文件修改为如下所示,fetch-max-wait 的值为31秒(超过了30秒):
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: 127.0.0.1:9092
  4.     consumer:
  5.       group-id: 0
  6.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  7.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  8.       fetch-max-wait: 31000ms
  9.       fetch-min-size: 10
  10.     producer:
  11.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  12.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
那么在项目启动以后,不管客户端有没有获取到消息,控制台中每隔一段时间就会报下述提示信息:
  1. [Consumer clientId=consumer-ONE-1, groupId=ONE] Disconnecting from node 1001 due to request timeout.
  2. [Consumer clientId=consumer-ONE-1, groupId=ONE] Cancelled in-flight FETCH request with correlation id 21 due to node 1001 being disconnected (elapsed time since creation: 30015ms, elapsed time since send: 30011ms, request timeout: 30000ms)
  3. [Consumer clientId=consumer-ONE-1, groupId=ONE] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001:
复制代码
意思就是说由于客户端和服务端的毗连被断开了,取消客户端的在线fetch请求,以是一般环境下,request.timeout.ms 的默认值未改变时,建议 fetch-max-wait 这个值不要超过30秒。 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

徐锦洪

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表