ToB企服应用市场:ToB评测及商务社交产业平台
标题:
深入解析Kafka主题分区与消息发送
[打印本页]
作者:
慢吞云雾缓吐愁
时间:
2025-2-18 17:54
标题:
深入解析Kafka主题分区与消息发送
在分布式消息系统中,Kafka以其高吞吐量、可扩展性和容错性等特点脱颖而出。其中,主题分区(Topic Partitions)是Kafka架构中的一个告急概念。本文将通过实例,具体解析Kafka主题分区的创建、消息的发送以及斲丧者怎样斲丧这些消息。
一、主题分区概述
Kafka为每个主题维护一个分区存储(日志)。每个分区是一个有序的、不可变的纪录序列。分区中的纪录被分配一个顺序编号,称为偏移量(offset),它在分区中唯一标识每条纪录。生产者在发送纪录(消息)到代理(Broker)时,可以指定分区ID。当消息到达Kafka服务器时,会根据主题选择分区,并将纪录放置在序列末端,分配下一个顺序偏移量。通常,斲丧者会随着读取纪录而线性地推进其偏移量。
二、创建主题
(一)使用Admin API创建主题
以下是一个使用Admin API创建主题的Java示例代码:
java复制
package com.logicbig.example;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;
public class TopicCreator {
public static void main(String[] args) throws Exception {
createTopic(“example-topic-1”, 1);
createTopic(“example-topic-2”, 2);
}
private static void createTopic(String topicName, int numPartitions) throws Exception {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ExampleHelper.BROKERS);
AdminClient admin = AdminClient.create(config);
//检查主题是否已存在
boolean alreadyExists = admin.listTopics().names().get().stream()
.anyMatch(existingTopicName -> existingTopicName.equals(topicName));
if (alreadyExists) {
System.out.printf("主题已存在:%s%n", topicName);
} else {
//创建新主题
System.out.printf("创建主题:%s%n", topicName);
NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
admin.createTopics(Collections.singleton(newTopic)).all().get();
}
//描述主题
System.out.println("-- 描述主题 --");
admin.describeTopics(Collections.singleton(topicName)).all().get()
.forEach((topic, desc) -> {
System.out.println("主题:" + topic);
System.out.printf("分区数:%s,分区ID:%s%n", desc.partitions().size(),
desc.partitions()
.stream()
.map(p -> Integer.toString(p.partition()))
.collect(Collectors.joining(",")));
});
}
复制代码
}
(二)示例输出
复制
创建主题:example-topic-1
– 描述主题 –
主题:example-topic-1
分区数:1,分区ID:0
创建主题:example-topic-2
– 描述主题 –
主题:example-topic-2
分区数:2,分区ID:0,1
三、发送消息
(一)向单分区主题发送消息
java复制
public class PartitionExample1 {
private static int PARTITION_COUNT = 1;
private static String TOPIC_NAME = “example-topic-1”;
private static int MSG_COUNT = 4;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(PartitionExample1::startConsumer);
executorService.execute(PartitionExample1::sendMessages);
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.MINUTES);
}
private static void startConsumer() {
KafkaConsumer<String, String> consumer = ExampleHelper.createConsumer(TOPIC_NAME);
int numMsgReceived = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
numMsgReceived++;
System.out.printf("消费:键 = %s,值 = %s,分区ID = %s,偏移量 = %s%n",
record.key(), record.value(), record.partition(), record.offset());
}
consumer.commitSync();
if (numMsgReceived == MSG_COUNT * PARTITION_COUNT) {
break;
}
}
}
private static void sendMessages() {
KafkaProducer producer = ExampleHelper.createProducer();
for (int i = 0; i < MSG_COUNT; i++) {
for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
String value = "消息-" + i;
String key = Integer.toString(i);
System.out.printf("发送消息 主题:%s,键:%s,值:%s,分区ID:%s%n",
TOPIC_NAME, key, value, partitionId);
producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId, key, value));
}
}
}
复制代码
}
(二)示例输出
复制
发送消息 主题:example-topic-1,键:0,值:消息-0,分区ID:0
发送消息 主题:example-topic-1,键:1,值:消息-1,分区ID:0
发送消息 主题:example-topic-1,键:2,值:消息-2,分区ID:0
发送消息 主题:example-topic-1,键:3,值:消息-3,分区ID:0
斲丧:键 = 0,值 = 消息-0,分区ID = 0,偏移量 = 0
斲丧:键 = 1,值 = 消息-1,分区ID = 0,偏移量 = 1
斲丧:键 = 2,值 = 消息-2,分区ID = 0,偏移量 = 2
斲丧:键 = 3,值 = 消息-3,分区ID = 0,偏移量 = 3
再次运行上述类:
复制
发送消息 主题:example-topic-1,键:0,值:消息-0,分区ID:0
发送消息 主题:example-topic-1,键:1,值:消息-1,分区ID:0
发送消息 主题:example-topic-1,键:2,值:消息-2,分区ID:0
发送消息 主题:example-topic-1,键:3,值:消息-3,分区ID:0
斲丧:键 = 0,值 = 消息-0,分区ID = 0,偏移量 = 4
斲丧:键 = 1,值 = 消息-1,分区ID = 0,偏移量 = 5
斲丧:键 = 2,值 = 消息-2,分区ID = 0,偏移量 = 6
斲丧:键 = 3,值 = 消息-3,分区ID = 0,偏移量 = 7
可以看到,斲丧者端的消息偏移量在线性增长。
(三)向多分区主题发送消息
java复制
public class PartitionExample2 {
private static int PARTITION_COUNT = 2;
private static String TOPIC_NAME = “example-topic-2”;
private static int MSG_COUNT = 4;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(PartitionExample2::startConsumer);
executorService.execute(PartitionExample2::sendMessages);
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.MINUTES);
}
private static void startConsumer() {
KafkaConsumer<String, String> consumer = ExampleHelper.createConsumer(TOPIC_NAME);
int numMsgReceived = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
numMsgReceived++;
System.out.printf("消费:键 = %s,值 = %s,分区ID = %s,偏移量 = %s%n",
record.key(), record.value(), record.partition(), record.offset());
}
consumer.commitSync();
if (numMsgReceived == MSG_COUNT * PARTITION_COUNT) {
break;
}
}
}
private static void sendMessages() {
KafkaProducer producer = ExampleHelper.createProducer();
for (int i = 0; i < MSG_COUNT; i++) {
for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
String value = "消息-" + i;
String key = Integer.toString(i);
System.out.printf("发送消息 主题:%s,键:%s,值:%s,分区ID:%s%n",
TOPIC_NAME, key, value, partitionId);
producer.send(new ProducerRecord<>(TOPIC_NAME, partitionId, key, value));
}
}
}
复制代码
}
(四)示例输出
发送消息 主题:example-topic
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4