Kafka生产者

打印 上一主题 下一主题

主题 693|帖子 693|积分 2079

生产者创建消息。在其他基于发布与订阅的消息系统中,生产者可能被称为发布者 或 写入者。
一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
生产者发送消息的方式

生产者发送消息主要有 2 种方式:同步发送消息、异步发送消息
同步发送消息

同步发送消息:我们调用 KafkaProducer 的 send() 方法发送消息,send() 方法会返回一个包含 RecordMetadata 的 Future 对象,然后调用 Future 的 get() 方法等待 Kafka 响应,通过 Kafka 的响应,我们就可以知道消息是否发送成功。

  • 如果服务器返回错误,Future 的 get() 方法会抛出异常。
  • 如果没有发生错误,我们会得到一个 RecordMetadata 对象,这个对象包含消息的目标主题、分区信息和消息的偏移量等信息。
我们调用 KafkaProducer 的 send() 方法发送 ProducerRecord 对象,消息先是被放进缓冲区,然后使用单独的线程将消息发送到服务器端。
异常处理
如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。在发送消息之前,生产者也是有可能发生异常的。这些异常有可能是 SerializationException(说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。
KafkaProducer 一般会发生两类错误。

  • 其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。
  • 另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。
  1. public void send(String topic, String key, String val) {
  2.     ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, val);
  3.     try {
  4.         ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);
  5.         SendResult<String, String> sendResult = future.get();
  6.     } catch (Exception e) {
  7.         e.printStackTrace();
  8.     }
  9. }
复制代码
异步发送消息

异步发送消息:我们调用 KafkaProducer 的 send() 方法,并指定一个回调方法,在服务器返回响应时调用该方法。
大多数时候,我们并不需要等待响应。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。
为了使用回调,需要一个实现了 org.apache.kafka.clients.producer.Callback 接口的类,这个接口只有一个 onCompletion() 方法。如果 Kafka 返回一个错误,onCompletion() 方法会抛出一个非空异常。通过 onCompletion() 方法抛出的异常,我们可以对发送失败的消息进行处理。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。
  1. private class DemoProducerCallback implements Callback {
  2.     @Override
  3.     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  4.         if (e != null) {
  5.             e.printStackTrace();
  6.         }
  7.     }
  8. }
  9. ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
  10. producer.send(record, new DemoProducerCallback());
复制代码
分区器

介绍分区

ProducerRecord 对象包含目标主题、消息键和值(消息)。

  • 如果消息键为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器使用粘性分区策略(UniformSticky),会随机选择一个分区,并尽可能一直使用该分区,等到该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(保证和上一次的分区不同)。
  • 如果消息键不为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器会对消息键进行散列(使用 Kafka 自己的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上(散列值 与 主题的分区数进行取余得到 partition 值)。
这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题的所有分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。
只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。一旦主题增加了新的分区,那么键与分区之间的映射关系就改变了。如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。
自定义分区策略

生产者可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
通过分区器实现自定义分区策略的步骤:

  • 定义一个类,该类实现 Partitioner 接口(分区器)
  • 配置生产者(KafkaProducer),让生产者发送消息时使用自定义的分区器:properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
  1. public class MyPartitioner implements Partitioner {
  2.     /**
  3.      * 返回信息对应的分区
  4.      *
  5.      * @param topic      主题
  6.      * @param key        消息的 key
  7.      * @param keyBytes   消息的 key 序列化后的字节数组
  8.      * @param value      消息的 value
  9.      * @param valueBytes 消息的 value 序列化后的字节数组
  10.      * @param cluster    集群元数据可以查看分区信息
  11.      * @return
  12.      */
  13.     @Override
  14.     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  15.         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  16.         if ((keyBytes == null) || (!(key instanceof String))) {
  17.             throw new InvalidRecordException("We expect all messages to have String type as key");
  18.         }
  19.         // 实现自己的分区策略
  20.         // 返回数据写入的分区号
  21.         return 0;
  22.     }
  23.     // 关闭资源
  24.     @Override
  25.     public void close() {
  26.     }
  27.     // 配置方法
  28.     @Override
  29.     public void configure(Map<String, ?> configs) {
  30.     }
  31. }
复制代码
参考资料

《Kafka 权威指南》第 3 章:Kafka 生产者——向 Kafka 写入数据

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

汕尾海湾

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表