kafka enable.auto.commit和auto.offset.reset使用说明

打印 上一主题 下一主题

主题 536|帖子 536|积分 1608

enable.auto.commit

是否自动提交offset,默认是true。
auto.offset.reset

表示自动重置 offset。
auto.offset.reset 参数定义了当无法获取斲丧分区的位移时从那边开始斲丧。比方:当 Broker 端没有 offset(如第一次斲丧或 offset 凌驾7天过期)时怎样初始化 offset,当收到 OFFSET_OUT_OF_RANGE 错误时怎样重置 Offset。
earliest:自动重置到 partition 的最小 offset。
当各分区下有已提交的offset时,从提交的offset开始斲丧;无提交的offset时,重新开始斲丧。
latest:默以为 latest,表示自动重置到 partition 的最大 offset。
当各分区下有已提交的offset时,从提交的offset开始斲丧;无提交的offset时,斲丧新产生的该分区下的数据。
none:不自动进行 offset 重置,抛出 OffsetOutOfRangeException 异常。
topic各分区都存在已提交的offset时,从offset后开始斲丧;只要有一个分区不存在已提交的offset,则抛出异常。
auto.offset.reset=none 使用说明

使用背景

不盼望发生 offset 自动重置的情况,因为业务不答应发生大规模的重复斲丧。
留意:
此时斲丧组在第一次斲丧的时间就会找不到 offset 而报错,这时就须要在 catch 里手动设置 offset。
使用说明

auto.offset.reset 设置为 None 以后,可以避免 offset 自动重置的问题,但是当增加分区的时间,因为关闭了自动重置机制,客户端不知道新的分区要从哪里开始斲丧,则会产生异常,此时须要人工去设置斲丧分组 offset 并斲丧。
使用方式

斲丧者在斲丧时,当 consumer 设置 auto.offset.reset=none, 捕捉到 NoOffsetForPartitionException 异常,在 catch 里本身设置 offset。您可以根据自身业务情况选择以下方式中的其中一种。
指定 offset,这里须要本身维护 offset,方便重试。
指定重新开始斲丧。
指定 offset 为最近可用的 offset。
根据时间戳获取 offset,设置 offset。
总结:
  1. package com.tencent.tcb.operation.ckafka.plain;
  2. import com.google.common.collect.Lists;
  3. import com.tencent.tcb.operation.ckafka.JavaKafkaConfigurer;
  4. import java.time.Instant;
  5. import java.time.temporal.ChronoUnit;
  6. import java.util.ArrayList;
  7. import java.util.Collection;
  8. import java.util.HashMap;
  9. import java.util.List;
  10. import java.util.Map;
  11. import java.util.Map.Entry;
  12. import java.util.Properties;
  13. import org.apache.kafka.clients.CommonClientConfigs;
  14. import org.apache.kafka.clients.consumer.ConsumerConfig;
  15. import org.apache.kafka.clients.consumer.ConsumerRecord;
  16. import org.apache.kafka.clients.consumer.ConsumerRecords;
  17. import org.apache.kafka.clients.consumer.KafkaConsumer;
  18. import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
  19. import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
  20. import org.apache.kafka.clients.producer.ProducerConfig;
  21. import org.apache.kafka.common.PartitionInfo;
  22. import org.apache.kafka.common.TopicPartition;
  23. import org.apache.kafka.common.config.SaslConfigs;
  24. public class KafkaPlainConsumerDemo {
  25.     public static void main(String args[]) {
  26.         //设置JAAS配置文件的路径。
  27.         JavaKafkaConfigurer.configureSaslPlain();
  28.         //加载kafka.properties。
  29.         Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
  30.         Properties props = new Properties();
  31.         //设置接入点,请通过控制台获取对应Topic的接入点。
  32.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
  33.         //接入协议。
  34.         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
  35.         //Plain方式。
  36.         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
  37.         //两次Poll之间的最大允许间隔。
  38.         //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。
  39.         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
  40.         //每次Poll的最大数量。
  41.         //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
  42.         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
  43.         //消息的反序列化方式。
  44.         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  45.                 "org.apache.kafka.common.serialization.StringDeserializer");
  46.         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  47.                 "org.apache.kafka.common.serialization.StringDeserializer");
  48.         //当前消费实例所属的消费组,请在控制台申请之后填写。
  49.         //属于同一个组的消费实例,会负载消费消息。
  50.         props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
  51.         //消费offset的位置。注意!如果auto.offset.reset=none这样设置,消费组在第一次消费的时候 就会报错找不到offset,第一次这时候就需要在catch里手动设置offset。
  52.         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
  53.         //构造消费对象,也即生成一个消费实例。
  54.         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
  55.         //设置消费组订阅的Topic,可以订阅多个。
  56.         //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样。
  57.         List<String> subscribedTopics = new ArrayList<String>();
  58.         //如果需要订阅多个Topic,则在这里添加进去即可。
  59.         //每个Topic需要先在控制台进行创建。
  60.         String topicStr = kafkaProperties.getProperty("topic");
  61.         String[] topics = topicStr.split(",");
  62.         for (String topic : topics) {
  63.             subscribedTopics.add(topic.trim());
  64.         }
  65.         consumer.subscribe(subscribedTopics);
  66.         //循环消费消息。
  67.         while (true) {
  68.             try {
  69.                 ConsumerRecords<String, String> records = consumer.poll(1000);
  70.                 //必须在下次Poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。 建议开一个单独的线程池来消费消息,然后异步返回结果。
  71.                 for (ConsumerRecord<String, String> record : records) {
  72.                     System.out.println(
  73.                             String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
  74.                 }
  75.             } catch (NoOffsetForPartitionException e) {
  76.                 System.out.println(e.getMessage());
  77.                 //当auto.offset.reset设置为 none时,需要捕获异常 自己设置offset。您可以根据自身业务情况选择以下方式中的其中一种。
  78.                 //e.g 1 :指定offset, 这里需要自己维护offset,方便重试。
  79.                 Map<Integer, Long> partitionBeginOffsetMap = getPartitionOffset(consumer, topicStr, true);
  80.                 Map<Integer, Long> partitionEndOffsetMap = getPartitionOffset(consumer, topicStr, false);
  81.                 consumer.seek(new TopicPartition(topicStr, 0), 0);
  82.                 //e.g 2:从头开始消费
  83.                 consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(topicStr, 0)));
  84.                 //e.g 3:指定offset为最近可用的offset。
  85.                 consumer.seekToEnd(Lists.newArrayList(new TopicPartition(topicStr, 0)));
  86.                 //e.g 4: 根据时间戳获取offset,就是根据时间戳去设置offset。例如重置到10分钟前的offset
  87.                 Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
  88.                 Long value = Instant.now().minus(300, ChronoUnit.SECONDS).toEpochMilli();
  89.                 timestampsToSearch.put(new TopicPartition(topicStr, 0), value);
  90.                 Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer
  91.                         .offsetsForTimes(timestampsToSearch);
  92.                 for (Entry<TopicPartition, OffsetAndTimestamp> entry : topicPartitionOffsetAndTimestampMap
  93.                         .entrySet()) {
  94.                     TopicPartition topicPartition = entry.getKey();
  95.                     OffsetAndTimestamp entryValue = entry.getValue();
  96.                     consumer.seek(topicPartition, entryValue.offset()); // 指定offset, 这里需要自己维护offset,方便重试。
  97.                 }
  98.             }
  99.         }
  100.     }
  101.     /**
  102.      * 获取topic的最早、最近的offset
  103.      * @param consumer
  104.      * @param topicStr
  105.      * @param beginOrEnd true begin; false end
  106.      * @return
  107.      */
  108.     private static Map<Integer, Long> getPartitionOffset(KafkaConsumer<String, String> consumer, String topicStr,
  109.             boolean beginOrEnd) {
  110.         Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topicStr);
  111.         List<TopicPartition> tp = new ArrayList<>();
  112.         Map<Integer, Long> map = new HashMap<>();
  113.         partitionInfos.forEach(str -> tp.add(new TopicPartition(topicStr, str.partition())));
  114.         Map<TopicPartition, Long> topicPartitionLongMap;
  115.         if (beginOrEnd) {
  116.             topicPartitionLongMap = consumer.beginningOffsets(tp);
  117.         } else {
  118.             topicPartitionLongMap = consumer.endOffsets(tp);
  119.         }
  120.         topicPartitionLongMap.forEach((key, beginOffset) -> {
  121.             int partition = key.partition();
  122.             map.put(partition, beginOffset);
  123.         });
  124.         return map;
  125.     }
  126. }
复制代码
springboot项目下

  1. /**
  2.      * enable-auto-commit: false 由spring提交
  3.      * enable-auto-commit: true  由kafka提交
  4.      */
  5.     /**
  6.      * enable-auto-commit: true  相同组下  (换组 会重置数据)
  7.      * 如果这个topic某个分区有已经提交的offset,那么无论是把auto.offset.reset=earliest还是latest,都将失效,消费者会从已经提交的offset开始消费.
  8.      */
  9.     /**
  10.      * enable-auto-commit: fasle 相同组下 (换组 会重置数据)
  11.      * 如果这个topic某个分区没有提交的offset,那么把auto.offset.reset=latest,将没消费的设置为提交消费,然后从最后开始消费
  12.      * 如果这个topic某个分区没有提交的offset,那么把auto.offset.reset=earliest,从没开始消费的offset开始消费
  13.      */
复制代码
非springboot项目下

  1. enable.auto.commit false
  2.     auto.offset.reset earliest 第一次消费, 重启后消费  都会从第一条开始重新消费全部数据
  3. enable.auto.commit true
  4.     auto.offset.reset earliest 第一次消费全部数据,重启后从提交处开始消费
  5. enable.auto.commit false
  6.     auto.offset.reset latest  第一次,重启后会从最后一条开始消费,但没有提交,换成earliest 重新消费全部数据
  7. enable.auto.commit true
  8.    auto.offset.reset latest   第一次从最后一条开始消费,重启后从提交处开始消费
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

盛世宏图

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表