Kafka 发送消息的分区计谋有哪些?思维导图 代码示例(java 架构) ...

锦通  金牌会员 | 2025-1-22 03:41:19 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 877|帖子 877|积分 2631

在 Apache Kafka 中,生产者发送消息时可以选择不同的分区计谋来决定消息应该被发送到主题的哪个分区。公道的分区计谋对于确保数据分布均匀、进步吞吐量和实现特定的业务逻辑至关重要。Kafka 提供了默认的分区计谋,并答应开发者自界说分区器以满足特定需求。
Kafka 分区计谋思维导图

  1. Kafka Partitioning Strategies
  2. ├── 默认分区策略 (Default Partitioning Strategy)
  3. │   ├── 如果指定了分区,则使用指定的分区
  4. │   ├── 如果指定了键(Key),则使用哈希函数计算分区
  5. │   └── 如果既没有指定分区也没有键,则采用轮询或随机分配
  6. ├── 自定义分区策略 (Custom Partitioning Strategy)
  7. │   ├── 实现 `org.apache.kafka.clients.producer.Partitioner` 接口
  8. │   ├── 重写 `partition()` 方法定义分区逻辑
  9. │   └── 在配置中指定自定义分区器类名
  10. ├── 常见分区策略示例
  11. │   ├── 按键哈希分区 (Hash-based Partitioning)
  12. │   │   └── 使用键的哈希值确定分区
  13. │   ├── 轮询分区 (Round-robin Partitioning)
  14. │   │   └── 循环选择每个分区
  15. │   ├── 随机分区 (Random Partitioning)
  16. │   │   └── 随机选择一个分区
  17. │   ├── 时间戳分区 (Timestamp-based Partitioning)
  18. │   │   └── 根据消息的时间戳分配分区
  19. │   └── 地理位置分区 (Geolocation-based Partitioning)
  20. │       └── 根据消息中的地理位置信息分配分区
  21. └── 配置与优化
  22.     ├── partitioner.class - 设置自定义分区器类
  23.     ├── num.partitions - 主题创建时的分区数
  24.     └── default.topic.config - 设置主题级别的默认配置
复制代码
Java代码示例:自界说分区计谋

设置依赖(Maven)

首先,在pom.xml中添加Kafka客户端库依赖:
  1. <dependencies>
  2.     <!-- Kafka Client -->
  3.     <dependency>
  4.         <groupId>org.apache.kafka</groupId>
  5.         <artifactId>kafka-clients</artifactId>
  6.         <version>3.0.0</version>
  7.     </dependency>
  8. </dependencies>
复制代码
创建自界说分区器

  1. import org.apache.kafka.clients.producer.Partitioner;
  2. import org.apache.kafka.common.Cluster;
  3. import java.util.Map;
  4. public class CustomPartitioner implements Partitioner {
  5.     @Override
  6.     public void configure(Map<String, ?> configs) {
  7.         // 可选配置初始化
  8.     }
  9.     @Override
  10.     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  11.         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  12.         int numPartitions = partitions.size();
  13.         // Example: Simple hash-based partitioning with a custom logic
  14.         if (keyBytes == null) {
  15.             return ThreadLocalRandom.current().nextInt(numPartitions); // Random partition if no key is provided
  16.         } else {
  17.             // Use the key's hash code to determine the partition
  18.             return Math.abs(key.hashCode()) % numPartitions;
  19.         }
  20.     }
  21.     @Override
  22.     public void close() {
  23.         // Cleanup resources if necessary
  24.     }
  25. }
复制代码
使用自界说分区器发送消息

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import java.util.Properties;
  6. public class ProducerWithCustomPartitioner {
  7.     private static final String TOPIC = "test-topic";
  8.     public static void main(String[] args) {
  9.         // Producer configuration settings
  10.         Properties props = new Properties();
  11.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  12.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  13.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14.         props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName()); // Specify custom partitioner
  15.         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
  16.             // Send messages using the custom partitioner
  17.             for (int i = 0; i < 10; i++) {
  18.                 producer.send(new ProducerRecord<>(TOPIC, "key-" + i, "message-" + i));
  19.             }
  20.         }
  21.     }
  22. }
复制代码
分区计谋详解



  • 默认分区计谋 (Default Partitioning Strategy):

    • 假如消息明确指定了要发送的分区,则直接使用该分区。
    • 假如提供了键(Key),则通过键的哈希值来决定分区。如许可以保证相同键的消息总是被发送到同一个分区,有助于保持消息次序。
    • 假如既没有指定分区也没有提供键,则会采用轮询(round-robin)或者随机分配的方式将消息分发给各个分区。

  • 自界说分区计谋 (Custom Partitioning Strategy):

    • 开发者可以通过实现 Partitioner 接口来自界说分区逻辑。这使得可以根据应用程序的详细需求机动地控制消息的分区方式。
    • partition() 方法是核心方法,它接收消息的主题名称、键、值以及当前集群状态作为参数,并返回目的分区编号。
    • 在 Kafka 生产者的配置中,通过设置 partitioner.class 属性为自界说分区器的全限定类名来启用自界说分区逻辑。

  • 常见分区计谋示例

    • 按键哈希分区 (Hash-based Partitioning):这是最常用的分区计谋之一,使用键的哈希值来确保相同键的消息进入同一分区。
    • 轮询分区 (Round-robin Partitioning):依次轮流选择每个分区,实用于不需要基于键进行分区的情况。
    • 随机分区 (Random Partitioning):随机选择一个分区,通常用于测试或不关心消息次序的场景。
    • 时间戳分区 (Timestamp-based Partitioning):根据消息的时间戳来分配分区,可用于按时间段构造数据。
    • 地理位置分区 (Geolocation-based Partitioning):依据消息中的地理位置信息进行分区,适当地理分布的应用。

  • 配置与优化

    • partitioner.class:用于指定自界说分区器的类名。
    • num.partitions:界说主题创建时的分区数量,影响数据分布和并行度。
    • default.topic.config:可以在创建主题时设置一些默认配置选项,如压缩范例等。

通过上述机制,Kafka 答应用户根据实际需求选择符合的分区计谋,从而优化系统的性能和可靠性。在 Apache Kafka 中,生产者发送消息时可以选择不同的分区计谋来决定消息应该被发送到主题的哪个分区。公道的分区计谋对于确保数据分布均匀、进步吞吐量和实现特定的业务逻辑至关重要。Kafka 提供了默认的分区计谋,并答应开发者自界说分区器以满足特定需求。
Kafka 分区计谋思维导图

  1. Kafka Partitioning Strategies
  2. ├── 默认分区策略 (Default Partitioning Strategy)
  3. │   ├── 如果指定了分区,则使用指定的分区
  4. │   ├── 如果指定了键(Key),则使用哈希函数计算分区
  5. │   └── 如果既没有指定分区也没有键,则采用轮询或随机分配
  6. ├── 自定义分区策略 (Custom Partitioning Strategy)
  7. │   ├── 实现 `org.apache.kafka.clients.producer.Partitioner` 接口
  8. │   ├── 重写 `partition()` 方法定义分区逻辑
  9. │   └── 在配置中指定自定义分区器类名
  10. ├── 常见分区策略示例
  11. │   ├── 按键哈希分区 (Hash-based Partitioning)
  12. │   │   └── 使用键的哈希值确定分区
  13. │   ├── 轮询分区 (Round-robin Partitioning)
  14. │   │   └── 循环选择每个分区
  15. │   ├── 随机分区 (Random Partitioning)
  16. │   │   └── 随机选择一个分区
  17. │   ├── 时间戳分区 (Timestamp-based Partitioning)
  18. │   │   └── 根据消息的时间戳分配分区
  19. │   └── 地理位置分区 (Geolocation-based Partitioning)
  20. │       └── 根据消息中的地理位置信息分配分区
  21. └── 配置与优化
  22.     ├── partitioner.class - 设置自定义分区器类
  23.     ├── num.partitions - 主题创建时的分区数
  24.     └── default.topic.config - 设置主题级别的默认配置
复制代码
Java代码示例:自界说分区计谋

设置依赖(Maven)

首先,在pom.xml中添加Kafka客户端库依赖:
  1. <dependencies>
  2.     <!-- Kafka Client -->
  3.     <dependency>
  4.         <groupId>org.apache.kafka</groupId>
  5.         <artifactId>kafka-clients</artifactId>
  6.         <version>3.0.0</version>
  7.     </dependency>
  8. </dependencies>
复制代码
创建自界说分区器

  1. import org.apache.kafka.clients.producer.Partitioner;
  2. import org.apache.kafka.common.Cluster;
  3. import java.util.Map;
  4. public class CustomPartitioner implements Partitioner {
  5.     @Override
  6.     public void configure(Map<String, ?> configs) {
  7.         // 可选配置初始化
  8.     }
  9.     @Override
  10.     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  11.         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  12.         int numPartitions = partitions.size();
  13.         // Example: Simple hash-based partitioning with a custom logic
  14.         if (keyBytes == null) {
  15.             return ThreadLocalRandom.current().nextInt(numPartitions); // Random partition if no key is provided
  16.         } else {
  17.             // Use the key's hash code to determine the partition
  18.             return Math.abs(key.hashCode()) % numPartitions;
  19.         }
  20.     }
  21.     @Override
  22.     public void close() {
  23.         // Cleanup resources if necessary
  24.     }
  25. }
复制代码
使用自界说分区器发送消息

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import java.util.Properties;
  6. public class ProducerWithCustomPartitioner {
  7.     private static final String TOPIC = "test-topic";
  8.     public static void main(String[] args) {
  9.         // Producer configuration settings
  10.         Properties props = new Properties();
  11.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  12.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  13.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14.         props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName()); // Specify custom partitioner
  15.         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
  16.             // Send messages using the custom partitioner
  17.             for (int i = 0; i < 10; i++) {
  18.                 producer.send(new ProducerRecord<>(TOPIC, "key-" + i, "message-" + i));
  19.             }
  20.         }
  21.     }
  22. }
复制代码
分区计谋详解



  • 默认分区计谋 (Default Partitioning Strategy):

    • 假如消息明确指定了要发送的分区,则直接使用该分区。
    • 假如提供了键(Key),则通过键的哈希值来决定分区。如许可以保证相同键的消息总是被发送到同一个分区,有助于保持消息次序。
    • 假如既没有指定分区也没有提供键,则会采用轮询(round-robin)或者随机分配的方式将消息分发给各个分区。

  • 自界说分区计谋 (Custom Partitioning Strategy):

    • 开发者可以通过实现 Partitioner 接口来自界说分区逻辑。这使得可以根据应用程序的详细需求机动地控制消息的分区方式。
    • partition() 方法是核心方法,它接收消息的主题名称、键、值以及当前集群状态作为参数,并返回目的分区编号。
    • 在 Kafka 生产者的配置中,通过设置 partitioner.class 属性为自界说分区器的全限定类名来启用自界说分区逻辑。

  • 常见分区计谋示例

    • 按键哈希分区 (Hash-based Partitioning):这是最常用的分区计谋之一,使用键的哈希值来确保相同键的消息进入同一分区。
    • 轮询分区 (Round-robin Partitioning):依次轮流选择每个分区,实用于不需要基于键进行分区的情况。
    • 随机分区 (Random Partitioning):随机选择一个分区,通常用于测试或不关心消息次序的场景。
    • 时间戳分区 (Timestamp-based Partitioning):根据消息的时间戳来分配分区,可用于按时间段构造数据。
    • 地理位置分区 (Geolocation-based Partitioning):依据消息中的地理位置信息进行分区,适当地理分布的应用。

  • 配置与优化

    • partitioner.class:用于指定自界说分区器的类名。
    • num.partitions:界说主题创建时的分区数量,影响数据分布和并行度。
    • default.topic.config:可以在创建主题时设置一些默认配置选项,如压缩范例等。

通过上述机制,Kafka 答应用户根据实际需求选择符合的分区计谋,从而优化系统的性能和可靠性。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

锦通

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表