MQ:kafka-消费者的三种语义

打印 上一主题 下一主题

主题 783|帖子 783|积分 2349

前言

本文重要是以kafka 09的client为例子,详解kafka client的使用,包括kafka消费者的三种消费语义at-most-once, at-least-once, 和 exactly-once message ,生产者的使用等。
(一) 创建topic

  1. bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1
复制代码
(二) 生产者

  1. public class ProducerExample {
  2.     public static void main(String[] str) throws InterruptedException, IOException {
  3.             System.out.println("Starting ProducerExample ...");
  4.             sendMessages();
  5.     }
  6.     private static void sendMessages() throws InterruptedException, IOException {
  7.             Producer<String, String> producer = createProducer();
  8.             sendMessages(producer);
  9.             // Allow the producer to complete sending of the messages before program exit.
  10.             Thread.sleep(20);
  11.     }
  12.     private static Producer<String, String> createProducer() {
  13.         Properties props = new Properties();
  14.         props.put("bootstrap.servers", "localhost:9092");
  15.         props.put("acks", "all");
  16.         props.put("retries", 0);
  17.         // Controls how much bytes sender would wait to batch up before publishing to Kafka.
  18.         props.put("batch.size", 10);
  19.         props.put("linger.ms", 1);
  20.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  21.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  22.         return new KafkaProducer(props);
  23.     }
  24.     private static void sendMessages(Producer<String, String> producer) {
  25.         String topic = "normal-topic";
  26.         int partition = 0;
  27.         long record = 1;
  28.         for (int i = 1; i <= 10; i++) {
  29.             producer.send(
  30.                 new ProducerRecord<String, String>(topic, partition,                                 Long.toString(record),Long.toString(record++)));
  31.         }
  32.     }
  33. }
复制代码
(三)消费者

消费者注册到kafka有多种方式:
subscribe:这种方式在新增topic或者partition或者消费者增加或者消费者减少的时候,会举行消费者组内消费者的再平衡。
assign:这种方式注册的消费者不会举行rebalance。
上面两种方式都是可以实现,三种消费语义的。具体API的使用请看下文。
1. At-most-once Kafka Consumer

做多一次消费语义是kafka消费者的默认实现。配置这种消费者最简单的方式是
1). enable.auto.commit设置为true。
2). auto.commit.interval.ms设置为一个较低的时间范围。
3). consumer.commitSync()不要调用该方法。
由于上面的配置,就可以使得kafka有线程负责按照指定隔断提交offset。
但是这种方式会使得kafka消费者有两种消费语义:
a.最多一次语义->at-most-once
消费者的offset已经提交,但是消息还在处理,这个时候挂了,再重启的时候会从上次提交的offset处消费,导致上次在处理的消息部分丢失。
b. 最少一次消费语义->at-least-once
消费者已经处理完了,但是offset还没提交,那么这个时候消费者挂了,就会导致消费者重复消费消息处理。但是由于auto.commit.interval.ms设置为一个较低的时间范围,会降低这种情况出现的概率。
代码如下:
  1. public class AtMostOnceConsumer {
  2.         public static void main(String[] str) throws InterruptedException {
  3.             System.out.println("Starting  AtMostOnceConsumer ...");
  4.             execute();
  5.         }
  6.         private static void execute() throws InterruptedException {
  7.                 KafkaConsumer<String, String> consumer = createConsumer();
  8.                 // Subscribe to all partition in that topic. 'assign' could be used here
  9.                 // instead of 'subscribe' to subscribe to specific partition.
  10.                 consumer.subscribe(Arrays.asList("normal-topic"));
  11.                 processRecords(consumer);
  12.         }
  13.         private static KafkaConsumer<String, String> createConsumer() {
  14.                 Properties props = new Properties();
  15.                 props.put("bootstrap.servers", "localhost:9092");
  16.                 String consumeGroup = "cg1";
  17.                 props.put("group.id", consumeGroup);
  18.                 // Set this property, if auto commit should happen.
  19.                 props.put("enable.auto.commit", "true");
  20.                 // Auto commit interval, kafka would commit offset at this interval.
  21.                 props.put("auto.commit.interval.ms", "101");
  22.                 // This is how to control number of records being read in each poll
  23.                 props.put("max.partition.fetch.bytes", "135");
  24.                 // Set this if you want to always read from beginning.
  25.                 // props.put("auto.offset.reset", "earliest");
  26.                 props.put("heartbeat.interval.ms", "3000");
  27.                 props.put("session.timeout.ms", "6001");
  28.                 props.put("key.deserializer",
  29.                         "org.apache.kafka.common.serialization.StringDeserializer");
  30.                 props.put("value.deserializer",
  31.                         "org.apache.kafka.common.serialization.StringDeserializer");
  32.                 return new KafkaConsumer<String, String>(props);
  33.         }
  34.         private static void processRecords(KafkaConsumer<String, String> consumer)  {
  35.                 while (true) {
  36.                         ConsumerRecords<String, String> records = consumer.poll(100);
  37.                         long lastOffset = 0;
  38.                         for (ConsumerRecord<String, String> record : records) {
  39.                                 System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(),                                             record.key(), record.value());
  40.                                 lastOffset = record.offset();
  41.                          }
  42.                 System.out.println("lastOffset read: " + lastOffset);
  43.                 process();
  44.                 }
  45.         }
  46.         private static void process() throws InterruptedException {
  47.                 // create some delay to simulate processing of the message.
  48.                 Thread.sleep(20);
  49.         }
  50. }
复制代码
2. At-least-once kafka consumer

实现最少一次消费语义的消费者也很简单。
1). 设置enable.auto.commit为false
2). 消息处理完之后手动调用consumer.commitSync()
这种方式就是要手动在处理完该次poll得到消息之后,调用offset异步提交函数consumer.commitSync()。建议是消费者内部实现密等,来避免消费者重复处理消息进而得到重复结果。最多一次发生的场景是消费者的消息处理完并输出到结果库(也可能是部分处理完),但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息。
代码如下:
  1. public class AtLeastOnceConsumer {
  2.     public static void main(String[] str) throws InterruptedException {
  3.             System.out.println("Starting AutoOffsetGuranteedAtLeastOnceConsumer ...");
  4.             execute();
  5.      }
  6.     private static void execute() throws InterruptedException {
  7.             KafkaConsumer<String, String> consumer = createConsumer();
  8.             // Subscribe to all partition in that topic. 'assign' could be used here
  9.             // instead of 'subscribe' to subscribe to specific partition.
  10.             consumer.subscribe(Arrays.asList("normal-topic"));
  11.             processRecords(consumer);
  12.      }
  13.      private static KafkaConsumer<String, String> createConsumer() {
  14.             Properties props = new Properties();
  15.             props.put("bootstrap.servers", "localhost:9092");
  16.             String consumeGroup = "cg1";
  17.             props.put("group.id", consumeGroup);
  18.             // Set this property, if auto commit should happen.
  19.             props.put("enable.auto.commit", "true");
  20.             // Make Auto commit interval to a big number so that auto commit does not happen,
  21.             // we are going to control the offset commit via consumer.commitSync(); after processing             // message.
  22.             props.put("auto.commit.interval.ms", "999999999999");
  23.             // This is how to control number of messages being read in each poll
  24.             props.put("max.partition.fetch.bytes", "135");
  25.             props.put("heartbeat.interval.ms", "3000");
  26.             props.put("session.timeout.ms", "6001");
  27.             props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  28.             props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  29.             return new KafkaConsumer<String, String>(props);
  30.     }
  31.      private static void processRecords(KafkaConsumer<String, String> consumer) throws {
  32.             while (true) {
  33.                     ConsumerRecords<String, String> records = consumer.poll(100);
  34.                     long lastOffset = 0;
  35.                     for (ConsumerRecord<String, String> record : records) {
  36.                         System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(),                                         record.key(), record.value());
  37.                         lastOffset = record.offset();
  38.                     }
  39.                     System.out.println("lastOffset read: " + lastOffset);
  40.                     process();
  41.                     // Below call is important to control the offset commit. Do this call after you
  42.                     // finish processing the business process.
  43.                     consumer.commitSync();
  44.             }
  45.     }
  46.     private static void process() throws InterruptedException {
  47.         // create some delay to simulate processing of the record.
  48.         Thread.sleep(20);
  49.     }
  50. }
复制代码
3. 使用subscribe实现Exactly-once

使用subscribe实现Exactly-once 很简单,具体思路如下:
1). 将enable.auto.commit设置为false。
2). 不调用consumer.commitSync()。
3). 使用subcribe定于topic。
4). 实现一个ConsumerRebalanceListener,在该listener内部实行
consumer.seek(topicPartition,offset),从指定的topic/partition的offset处启动。
5). 在处理消息的时候,要同时控制保存住每个消息的offset。以原子事件的方式保存offset和处理的消息结果。传统数据库实现原子事件比较简单。但对于非传统数据库,好比hdfs或者nosql,为了实现这个目标,只能将offset与消息保存在同一行。
6). 实现密等,作为保护层。
代码如下:
  1. public class ExactlyOnceDynamicConsumer {
  2.        private static OffsetManager offsetManager = new OffsetManager("storage2");
  3.         public static void main(String[] str) throws InterruptedException {
  4.                 System.out.println("Starting ExactlyOnceDynamicConsumer ...");
  5.                 readMessages();
  6.         }
  7.         private static void readMessages() throws InterruptedException {
  8.                 KafkaConsumer<String, String> consumer = createConsumer();
  9.                 // Manually controlling offset but register consumer to topics to get dynamically
  10.                 //  assigned partitions. Inside MyConsumerRebalancerListener use
  11.                 // consumer.seek(topicPartition,offset) to control offset which messages to be read.
  12.                 consumer.subscribe(Arrays.asList("normal-topic"),
  13.                                 new MyConsumerRebalancerListener(consumer));
  14.                 processRecords(consumer);
  15.         }
  16.         private static KafkaConsumer<String, String> createConsumer() {
  17.                 Properties props = new Properties();
  18.                 props.put("bootstrap.servers", "localhost:9092");
  19.                 String consumeGroup = "cg3";
  20.                 props.put("group.id", consumeGroup);
  21.                 // Below is a key setting to turn off the auto commit.
  22.                 props.put("enable.auto.commit", "false");
  23.                 props.put("heartbeat.interval.ms", "2000");
  24.                 props.put("session.timeout.ms", "6001");
  25.                 // Control maximum data on each poll, make sure this value is bigger than the maximum                   // single message size
  26.                 props.put("max.partition.fetch.bytes", "140");
  27.                 props.put("key.deserializer",                                 "org.apache.kafka.common.serialization.StringDeserializer");
  28.                 props.put("value.deserializer",                         "org.apache.kafka.common.serialization.StringDeserializer");
  29.                 return new KafkaConsumer<String, String>(props);
  30.         }
  31.         private static void processRecords(KafkaConsumer<String, String> consumer) {
  32.             while (true) {
  33.                     ConsumerRecords<String, String> records = consumer.poll(100);
  34.                     for (ConsumerRecord<String, String> record : records) {
  35.                             System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(),                                     record.key(), record.value());
  36.                             // Save processed offset in external storage.
  37.                             offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),                                             record.offset());
  38.                     }
  39.                }
  40.         }
  41. }
  42. public class MyConsumerRebalancerListener implements                                 org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
  43.         private OffsetManager offsetManager = new OffsetManager("storage2");
  44.         private Consumer<String, String> consumer;
  45.         public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
  46.                 this.consumer = consumer;
  47.         }
  48.         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  49.                 for (TopicPartition partition : partitions) {
  50.                     offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(),                     consumer.position(partition));
  51.                 }
  52.         }
  53.         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  54.                 for (TopicPartition partition : partitions) {
  55.                         consumer.seek(partition,                             offsetManager.readOffsetFromExternalStore(partition.topic(),                             partition.partition()));
  56.                 }
  57.         }
  58. }
  59. /**
  60. * The partition offset are stored in an external storage. In this case in a local file system where
  61. * program runs.
  62. */
  63. public class OffsetManager {
  64.         private String storagePrefix;
  65.         public OffsetManager(String storagePrefix) {
  66.                 this.storagePrefix = storagePrefix;
  67.         }
  68.     /**
  69.         * Overwrite the offset for the topic in an external storage.
  70.         *
  71.         * @param topic - Topic name.
  72.         * @param partition - Partition of the topic.
  73.         * @param offset - offset to be stored.
  74.         */
  75.         void saveOffsetInExternalStore(String topic, int partition, long offset) {
  76.             try {
  77.                 FileWriter writer = new FileWriter(storageName(topic, partition), false);
  78.                 BufferedWriter bufferedWriter = new BufferedWriter(writer);
  79.                 bufferedWriter.write(offset + "");
  80.                 bufferedWriter.flush();
  81.                 bufferedWriter.close();
  82.             } catch (Exception e) {
  83.                     e.printStackTrace();
  84.                     throw new RuntimeException(e);
  85.             }
  86.         }
  87.         /**
  88.             * @return he last offset + 1 for the provided topic and partition.
  89.         */
  90.         long readOffsetFromExternalStore(String topic, int partition) {
  91.                 try {
  92.                         Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
  93.                         return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
  94.                 } catch (Exception e) {
  95.                     e.printStackTrace();
  96.                 }
  97.                 return 0;
  98.         }
  99.         private String storageName(String topic, int partition) {
  100.             return storagePrefix + "-" + topic + "-" + partition;
  101.         }
  102. }
复制代码
4. 使用assign实现Exactly-once

使用assign实现Exactly-once 也很简单,具体思路如下:
1). 将enable.auto.commit设置为false。
2). 不调用consumer.commitSync()。
3). 调用assign注册kafka消费者到kafka
4). 初次启动的时候,调用consumer.seek(topicPartition,offset)来指定offset。
5). 在处理消息的时候,要同时控制保存住每个消息的offset。以原子事件的方式保存offset和处理的消息结果。传统数据库实现原子事件比较简单。但对于非传统数据库,好比hdfs或者nosql,为了实现这个目标,只能将offset与消息保存在同一行。
6). 实现密等,作为保护层。
代码如下:
  1. public class ExactlyOnceStaticConsumer {
  2.         private static OffsetManager offsetManager = new OffsetManager("storage1");
  3.         public static void main(String[] str) throws InterruptedException, IOException {
  4.                 System.out.println("Starting ExactlyOnceStaticConsumer ...");
  5.                 readMessages();
  6.         }
  7.         private static void readMessages() throws InterruptedException, IOException {
  8.                 KafkaConsumer<String, String> consumer = createConsumer();
  9.                 String topic = "normal-topic";
  10.                 int partition = 1;
  11.                 TopicPartition topicPartition =
  12.                                 registerConsumerToSpecificPartition(consumer, topic, partition);
  13.                 // Read the offset for the topic and partition from external storage.
  14.                 long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
  15.                 // Use seek and go to exact offset for that topic and partition.
  16.                 consumer.seek(topicPartition, offset);
  17.                 processRecords(consumer);
  18.         }
  19.         private static KafkaConsumer<String, String> createConsumer() {
  20.                 Properties props = new Properties();
  21.                 props.put("bootstrap.servers", "localhost:9092");
  22.                 String consumeGroup = "cg2";
  23.                 props.put("group.id", consumeGroup);
  24.                 // Below is a key setting to turn off the auto commit.
  25.                 props.put("enable.auto.commit", "false");
  26.                 props.put("heartbeat.interval.ms", "2000");
  27.                 props.put("session.timeout.ms", "6001");
  28.                 // control maximum data on each poll, make sure this value is bigger than the maximum                 // single message size
  29.                 props.put("max.partition.fetch.bytes", "140");
  30.                 props.put("key.deserializer",                                     "org.apache.kafka.common.serialization.StringDeserializer");
  31.                 props.put("value.deserializer",                                     "org.apache.kafka.common.serialization.StringDeserializer");
  32.                 return new KafkaConsumer<String, String>(props);
  33.         }
  34.         /**
  35.             * Manually listens for specific topic partition. But, if you are looking for example of how to                * dynamically listens to partition and want to manually control offset then see
  36.             * ExactlyOnceDynamicConsumer.java
  37.             */
  38.          private static TopicPartition registerConsumerToSpecificPartition(
  39.                     KafkaConsumer<String, String> consumer, String topic, int partition) {
  40.                     TopicPartition topicPartition = new TopicPartition(topic, partition);
  41.                     List<TopicPartition> partitions = Arrays.asList(topicPartition);
  42.                     consumer.assign(partitions);
  43.                     return topicPartition;
  44.           }
  45.             /**
  46.                 * Process data and store offset in external store. Best practice is to do these operations
  47.                 * atomically.
  48.                 */
  49.             private static void processRecords(KafkaConsumer<String, String> consumer) throws {
  50.                     while (true) {
  51.                            ConsumerRecords<String, String> records = consumer.poll(100);
  52.                             for (ConsumerRecord<String, String> record : records) {
  53.                                     System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(),                                                 record.key(), record.value());
  54.                                     offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),                                                 record.offset());
  55.                             }
  56.                     }
  57.             }
  58. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

祗疼妳一个

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表