Kafka_深入探秘者(4):kafka 主题 topic
一、kafka 主题管理
1、kafka 创建主题 topic 命令
1)命令:
- # 切换到 kafka 安装目录
- cd /usr/local/kafka/kafka_2.12-2.8.0/
- # 创建一个名为 heima 的 主题
- bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1
- # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
- bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --create --topic heima --partitions 2 --replication-factor 1
复制代码 2)命令 参数说明:
–zookeeper :指定了 kafka 所毗连的 zookeeper 服务地点。zookeeper必传参数,多个zookeeper用’,"分开。
–topic : 指定了所要创建主题的名称
–partitions : 指定了分区个数,每个线程处理一个分区数据。
–replication-factor : 指定了副本因子,用于设置主题副本数,每个副天职布在不通节点,不能凌驾总结点数。如你只有一个节点,但是创建时指定副本数为2,就会报错
–create : 创建主题的动作指令。
2、查看 topic 元数据信细的方法:
topic 元数据信息保存在 Zookeeper 节点中,
- # 切换到 zookeeper 安装目录
- cd /usr/local/zookeeper/apache-zookeeper-3.6.3-bin
- # 连接 zookeeper 查看元数据
- bin/zkCli.sh -server localhost:2181
- # 查看 节点 信息:
- get /brokers/topics/heima
复制代码
3、展示出当前全部主题
- # 切换目录
- cd /usr/local/kafka/kafka_2.12-2.8.0/
- # 展示出当前所有主题
- bin/kafka-topics.sh --list --zookeeper localhost:2181
- # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
- bin/kafka-topics.sh --list --zookeeper 172.18.30.110:2181
复制代码 4、 查看主题详情:
- # 切换目录
- cd /usr/local/kafka/kafka_2.12-2.8.0/
- # 查看主题详情
- bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic heima
- # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
- bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --describe --topic heima
复制代码 5、 修改 主题 topic 设置(增长设置)
- # 切换到 kafka 安装目录
- cd /usr/local/kafka/kafka_2.12-2.8.0/
- # 修改 主题 topic 配置(增加配置)
- bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic heima --config flush.messages=1
- # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
- bin/kafka-topics.sh --alter --zookeeper 172.18.30.110:2181 --topic heima --config flush.messages=1
复制代码 6、 删除 主题 topic 设置 flush.messages=1
- # 切换到 kafka 安装目录
- cd /usr/local/kafka/kafka_2.12-2.8.0/
- # 删除 主题 topic 配置 flush.messages=1
- bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic heima --delete-config flush.messages
- # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
- bin/kafka-topics.sh --alter --zookeeper 172.18.30.110:2181 --topic heima --delete-config flush.messages
复制代码 7、 删除 topic 主题
若 delete.topic.enable=true 直接彻底删除该 Topic 主题。
若 delete.topic.enable=false
假如当前 Topic 没有使用过,即没有传输过信息:可以彻底删除。
假如当前 Topic 有使用过,即有过传输过信息:并没有真正删除 Topic,只是把这个 Topic 标志为删除(marked fordeletion),重启 Kafka Server 后删除。
- # 切换到 kafka 安装目录
- cd /usr/local/kafka/kafka_2.12-2.8.0/
- # 删除 topic 主题名为 heima 的主题
- bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic heima
- # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
- bin/kafka-topics.sh --delete --zookeeper 172.18.30.110:2181 --topic heima
复制代码
二、kafka 分区
1、kafka 增长分区
- # 切换到 kafka 安装目录
- cd /usr/local/kafka/kafka_2.12-2.8.0/
- # 增加 topic 主题名为 heima 的主题的 分区为 3 个
- bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic heima --partitions 3
- # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
- bin/kafka-topics.sh --alter --zookeeper 172.18.30.110:2181 --topic heima --partitions 3
- # 查看 heima 主题详情:
- bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic heima
- # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
- bin/kafka-topics.sh --describe --zookeeper 172.18.30.110:2181 --topic heima
复制代码
2、其他主题参数设置
见官方文档: http://kafka.apache.org/documentation/#topicconfigs
- bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
- --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
-
-
- bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
- --alter --add-config max.message.bytes=128000
-
-
- bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
- bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
- --alter --delete-config max.message.bytes
复制代码 三、kafka 主题 topic 总结
1、KafkaAdminClient 应用
我们都习惯使用 Kafka 中 bin 目次下的脚本工具来管理査看 Kafka,但是有些时候需要将某些管理查看的功能集成到系统 (比如Kafka Manager)中,那么就需要调用一些 API 来直接利用 Kafka 了。
2、在 kafka_learn 工程中,创建 KafkaAdminConfigOperation.java 类, KafkaAdminClient 应用
- /**
- * D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\KafkaAdminConfigOperation.java
- *
- * 2024-6-22 创建 KafkaAdminConfigOperation.java 类, KafkaAdminClient 应用
- */
- package djh.it.kafka.learn.chapter3;
- import kafka.controller.NewPartition;
- import org.apache.kafka.clients.admin.*;
- import org.apache.kafka.common.config.ConfigResource;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
- public class KafkaAdminConfigOperation {
- public static void main( String[] args ) throws ExecutionException, InterruptedException {
- describeTopicConfig(); //获取主题详细信息
- // alterTopicConfig(); //添加主题
- // addTopicPartitions(); //添加分区
- }
- private static void addTopicPartitions() throws ExecutionException, InterruptedException {
- String brokerList = "172.18.30.110:9092";
- String topic = "heima";
- Properties props = new Properties();
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
- AdminClient client = AdminClient.create(props);
- NewPartitions newPartitions = NewPartitions.increaseTo(4);
- Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
- newPartitionsMap.put(topic,newPartitions);
- CreatePartitionsResult result = client.createPartitions(newPartitionsMap);
- result.all().get();
- client.close();
- }
- private static void describeTopicConfig() throws ExecutionException, InterruptedException {
- String brokerList = "172.18.30.110:9092";
- String topic = "heima";
- Properties props = new Properties();
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
- AdminClient client = AdminClient.create(props);
- ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
- DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
- Config config = result.all().get().get(resource);
- System.out.println(config);
- client.close();
- }
- }
复制代码
上一节关联链接请点击
# Kafka_深入探秘者(3):kafka 消费者
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |