深入解析Kafka主题分区与消息发送

打印 上一主题 下一主题

主题 911|帖子 911|积分 2733

在分布式消息系统中,Kafka以其高吞吐量、可扩展性和容错性等特点脱颖而出。其中,主题分区(Topic Partitions)是Kafka架构中的一个告急概念。本文将通过实例,具体解析Kafka主题分区的创建、消息的发送以及斲丧者怎样斲丧这些消息。
一、主题分区概述
Kafka为每个主题维护一个分区存储(日志)。每个分区是一个有序的、不可变的纪录序列。分区中的纪录被分配一个顺序编号,称为偏移量(offset),它在分区中唯一标识每条纪录。生产者在发送纪录(消息)到代理(Broker)时,可以指定分区ID。当消息到达Kafka服务器时,会根据主题选择分区,并将纪录放置在序列末端,分配下一个顺序偏移量。通常,斲丧者会随着读取纪录而线性地推进其偏移量。
二、创建主题
(一)使用Admin API创建主题
以下是一个使用Admin API创建主题的Java示例代码:
java复制
package com.logicbig.example;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;
public class TopicCreator {
public static void main(String[] args) throws Exception {
createTopic(“example-topic-1”, 1);
createTopic(“example-topic-2”, 2);
}
  1. private static void createTopic(String topicName, int numPartitions) throws Exception {
  2.     Properties config = new Properties();
  3.     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleHelper.BROKERS);
  4.     AdminClient admin = AdminClient.create(config);
  5.     //检查主题是否已存在
  6.     boolean alreadyExists = admin.listTopics().names().get().stream()
  7.                                  .anyMatch(existingTopicName -> existingTopicName.equals(topicName));
  8.     if (alreadyExists) {
  9.         System.out.printf("主题已存在:%s%n", topicName);
  10.     } else {
  11.         //创建新主题
  12.         System.out.printf("创建主题:%s%n", topicName);
  13.         NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
  14.         admin.createTopics(Collections.singleton(newTopic)).all().get();
  15.     }
  16.     //描述主题
  17.     System.out.println("-- 描述主题 --");
  18.     admin.describeTopics(Collections.singleton(topicName)).all().get()
  19.          .forEach((topic, desc) -> {
  20.              System.out.println("主题:" + topic);
  21.              System.out.printf("分区数:%s,分区ID:%s%n", desc.partitions().size(),
  22.                      desc.partitions()
  23.                          .stream()
  24.                          .map(p -> Integer.toString(p.partition()))
  25.                          .collect(Collectors.joining(",")));
  26.          });
  27. }
复制代码
}
(二)示例输出
复制
创建主题:example-topic-1
– 描述主题 –
主题:example-topic-1
分区数:1,分区ID:0
创建主题:example-topic-2
– 描述主题 –
主题:example-topic-2
分区数:2,分区ID:0,1
三、发送消息
(一)向单分区主题发送消息
java复制
public class PartitionExample1 {
private static int PARTITION_COUNT = 1;
private static String TOPIC_NAME = “example-topic-1”;
private static int MSG_COUNT = 4;
  1. public static void main(String[] args) throws Exception {
  2.     ExecutorService executorService = Executors.newFixedThreadPool(2);
  3.     executorService.execute(PartitionExample1::startConsumer);
  4.     executorService.execute(PartitionExample1::sendMessages);
  5.     executorService.shutdown();
  6.     executorService.awaitTermination(10, TimeUnit.MINUTES);
  7. }
  8. private static void startConsumer() {
  9.     KafkaConsumer<String, String> consumer = ExampleHelper.createConsumer(TOPIC_NAME);
  10.     int numMsgReceived = 0;
  11.     while (true) {
  12.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
  13.         for (ConsumerRecord<String, String> record : records) {
  14.             numMsgReceived++;
  15.             System.out.printf("消费:键 = %s,值 = %s,分区ID = %s,偏移量 = %s%n",
  16.                     record.key(), record.value(), record.partition(), record.offset());
  17.         }
  18.         consumer.commitSync();
  19.         if (numMsgReceived == MSG_COUNT * PARTITION_COUNT) {
  20.             break;
  21.         }
  22.     }
  23. }
  24. private static void sendMessages() {
  25.     KafkaProducer producer = ExampleHelper.createProducer();
  26.     for (int i = 0; i < MSG_COUNT; i++) {
  27.         for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
  28.             String value = "消息-" + i;
  29.             String key = Integer.toString(i);
  30.             System.out.printf("发送消息 主题:%s,键:%s,值:%s,分区ID:%s%n",
  31.                     TOPIC_NAME, key, value, partitionId);
  32.             producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId, key, value));
  33.         }
  34.     }
  35. }
复制代码
}
(二)示例输出
复制
发送消息 主题:example-topic-1,键:0,值:消息-0,分区ID:0
发送消息 主题:example-topic-1,键:1,值:消息-1,分区ID:0
发送消息 主题:example-topic-1,键:2,值:消息-2,分区ID:0
发送消息 主题:example-topic-1,键:3,值:消息-3,分区ID:0
斲丧:键 = 0,值 = 消息-0,分区ID = 0,偏移量 = 0
斲丧:键 = 1,值 = 消息-1,分区ID = 0,偏移量 = 1
斲丧:键 = 2,值 = 消息-2,分区ID = 0,偏移量 = 2
斲丧:键 = 3,值 = 消息-3,分区ID = 0,偏移量 = 3
再次运行上述类:
复制
发送消息 主题:example-topic-1,键:0,值:消息-0,分区ID:0
发送消息 主题:example-topic-1,键:1,值:消息-1,分区ID:0
发送消息 主题:example-topic-1,键:2,值:消息-2,分区ID:0
发送消息 主题:example-topic-1,键:3,值:消息-3,分区ID:0
斲丧:键 = 0,值 = 消息-0,分区ID = 0,偏移量 = 4
斲丧:键 = 1,值 = 消息-1,分区ID = 0,偏移量 = 5
斲丧:键 = 2,值 = 消息-2,分区ID = 0,偏移量 = 6
斲丧:键 = 3,值 = 消息-3,分区ID = 0,偏移量 = 7
可以看到,斲丧者端的消息偏移量在线性增长。
(三)向多分区主题发送消息
java复制
public class PartitionExample2 {
private static int PARTITION_COUNT = 2;
private static String TOPIC_NAME = “example-topic-2”;
private static int MSG_COUNT = 4;
  1. public static void main(String[] args) throws Exception {
  2.     ExecutorService executorService = Executors.newFixedThreadPool(2);
  3.     executorService.execute(PartitionExample2::startConsumer);
  4.     executorService.execute(PartitionExample2::sendMessages);
  5.     executorService.shutdown();
  6.     executorService.awaitTermination(10, TimeUnit.MINUTES);
  7. }
  8. private static void startConsumer() {
  9.     KafkaConsumer<String, String> consumer = ExampleHelper.createConsumer(TOPIC_NAME);
  10.     int numMsgReceived = 0;
  11.     while (true) {
  12.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
  13.         for (ConsumerRecord<String, String> record : records) {
  14.             numMsgReceived++;
  15.             System.out.printf("消费:键 = %s,值 = %s,分区ID = %s,偏移量 = %s%n",
  16.                     record.key(), record.value(), record.partition(), record.offset());
  17.         }
  18.         consumer.commitSync();
  19.         if (numMsgReceived == MSG_COUNT * PARTITION_COUNT) {
  20.             break;
  21.         }
  22.     }
  23. }
  24. private static void sendMessages() {
  25.     KafkaProducer producer = ExampleHelper.createProducer();
  26.     for (int i = 0; i < MSG_COUNT; i++) {
  27.         for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
  28.             String value = "消息-" + i;
  29.             String key = Integer.toString(i);
  30.             System.out.printf("发送消息 主题:%s,键:%s,值:%s,分区ID:%s%n",
  31.                     TOPIC_NAME, key, value, partitionId);
  32.             producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId, key, value));
  33.         }
  34.     }
  35. }
复制代码
}
(四)示例输出
发送消息 主题:example-topic

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

慢吞云雾缓吐愁

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表