# Kafka_深入探秘者(4):kafka 主题 topic

打印 上一主题 下一主题

主题 692|帖子 692|积分 2076

Kafka_深入探秘者(4):kafka 主题 topic

一、kafka 主题管理

1、kafka 创建主题 topic 命令

1)命令:
  1. # 切换到 kafka 安装目录
  2. cd /usr/local/kafka/kafka_2.12-2.8.0/
  3. # 创建一个名为 heima 的 主题
  4. bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1
  5. # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
  6. bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --create --topic heima --partitions 2 --replication-factor 1
复制代码
2)命令 参数说明:
–zookeeper :指定了 kafka 所毗连的 zookeeper 服务地点。zookeeper必传参数,多个zookeeper用’,"分开。
–topic : 指定了所要创建主题的名称
–partitions : 指定了分区个数,每个线程处理一个分区数据。
–replication-factor : 指定了副本因子,用于设置主题副本数,每个副天职布在不通节点,不能凌驾总结点数。如你只有一个节点,但是创建时指定副本数为2,就会报错
–create : 创建主题的动作指令。
2、查看 topic 元数据信细的方法:

topic 元数据信息保存在 Zookeeper 节点中,
  1. # 切换到 zookeeper 安装目录
  2. cd /usr/local/zookeeper/apache-zookeeper-3.6.3-bin
  3. # 连接 zookeeper 查看元数据
  4. bin/zkCli.sh -server localhost:2181
  5. # 查看 节点 信息:
  6. get /brokers/topics/heima
复制代码

3、展示出当前全部主题

  1. # 切换目录
  2. cd /usr/local/kafka/kafka_2.12-2.8.0/
  3. # 展示出当前所有主题
  4. bin/kafka-topics.sh --list --zookeeper localhost:2181
  5. # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
  6. bin/kafka-topics.sh --list --zookeeper 172.18.30.110:2181
复制代码
4、 查看主题详情:

  1. # 切换目录
  2. cd /usr/local/kafka/kafka_2.12-2.8.0/
  3. # 查看主题详情
  4. bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic heima
  5. # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
  6. bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --describe --topic heima
复制代码
5、 修改 主题 topic 设置(增长设置)

  1. # 切换到 kafka 安装目录
  2. cd /usr/local/kafka/kafka_2.12-2.8.0/
  3. # 修改 主题 topic 配置(增加配置)
  4. bin/kafka-topics.sh  --alter --zookeeper localhost:2181 --topic heima --config flush.messages=1
  5. # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
  6. bin/kafka-topics.sh  --alter --zookeeper 172.18.30.110:2181 --topic heima --config flush.messages=1
复制代码
6、 删除 主题 topic 设置 flush.messages=1

  1. # 切换到 kafka 安装目录
  2. cd /usr/local/kafka/kafka_2.12-2.8.0/
  3. # 删除 主题 topic 配置 flush.messages=1
  4. bin/kafka-topics.sh  --alter --zookeeper localhost:2181 --topic heima --delete-config flush.messages
  5. # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
  6. bin/kafka-topics.sh  --alter --zookeeper 172.18.30.110:2181 --topic heima --delete-config flush.messages
复制代码
7、 删除 topic 主题

若 delete.topic.enable=true 直接彻底删除该 Topic 主题。
若 delete.topic.enable=false
假如当前 Topic 没有使用过,即没有传输过信息:可以彻底删除。
假如当前 Topic 有使用过,即有过传输过信息:并没有真正删除 Topic,只是把这个 Topic 标志为删除(marked fordeletion),重启 Kafka Server 后删除。
  1. # 切换到 kafka 安装目录
  2. cd /usr/local/kafka/kafka_2.12-2.8.0/
  3. # 删除 topic 主题名为 heima 的主题
  4. bin/kafka-topics.sh  --delete --zookeeper localhost:2181 --topic heima
  5. # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
  6. bin/kafka-topics.sh  --delete --zookeeper 172.18.30.110:2181 --topic heima
复制代码

二、kafka 分区

1、kafka 增长分区

  1. # 切换到 kafka 安装目录
  2. cd /usr/local/kafka/kafka_2.12-2.8.0/
  3. # 增加 topic 主题名为 heima 的主题的 分区为 3 个
  4. bin/kafka-topics.sh  --alter --zookeeper localhost:2181 --topic heima --partitions 3
  5. # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
  6. bin/kafka-topics.sh  --alter --zookeeper 172.18.30.110:2181 --topic heima --partitions 3
  7. # 查看 heima 主题详情:
  8. bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic heima
  9. # 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
  10. bin/kafka-topics.sh --describe --zookeeper 172.18.30.110:2181 --topic heima
复制代码

2、其他主题参数设置

见官方文档: http://kafka.apache.org/documentation/#topicconfigs
  1. bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
  2.   --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
  3.   
  4.   
  5. bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
  6.   --alter --add-config max.message.bytes=128000
  7.   
  8.   
  9. bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe  
  10. bin/kafka-configs.sh --bootstrap-server localhost:9092  --entity-type topics --entity-name my-topic
  11.   --alter --delete-config max.message.bytes
复制代码
三、kafka 主题 topic 总结

1、KafkaAdminClient 应用

我们都习惯使用 Kafka 中 bin 目次下的脚本工具来管理査看 Kafka,但是有些时候需要将某些管理查看的功能集成到系统 (比如Kafka Manager)中,那么就需要调用一些 API 来直接利用 Kafka 了。
2、在 kafka_learn 工程中,创建 KafkaAdminConfigOperation.java 类, KafkaAdminClient 应用

  1. /**
  2. *  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\KafkaAdminConfigOperation.java
  3. *
  4. *  2024-6-22 创建 KafkaAdminConfigOperation.java 类, KafkaAdminClient 应用
  5. */
  6. package djh.it.kafka.learn.chapter3;
  7. import kafka.controller.NewPartition;
  8. import org.apache.kafka.clients.admin.*;
  9. import org.apache.kafka.common.config.ConfigResource;
  10. import java.util.Collections;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. import java.util.Properties;
  14. import java.util.concurrent.ExecutionException;
  15. public class KafkaAdminConfigOperation {
  16.     public static void main( String[] args ) throws ExecutionException, InterruptedException {
  17.         describeTopicConfig();   //获取主题详细信息
  18. //        alterTopicConfig();    //添加主题
  19. //        addTopicPartitions();  //添加分区
  20.     }
  21.     private static void addTopicPartitions() throws ExecutionException, InterruptedException {
  22.         String brokerList = "172.18.30.110:9092";
  23.         String topic = "heima";
  24.         Properties props = new Properties();
  25.         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  26.         props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
  27.         AdminClient client = AdminClient.create(props);
  28.         NewPartitions newPartitions = NewPartitions.increaseTo(4);
  29.         Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
  30.         newPartitionsMap.put(topic,newPartitions);
  31.         CreatePartitionsResult result = client.createPartitions(newPartitionsMap);
  32.         result.all().get();
  33.         client.close();
  34.     }
  35.     private static void describeTopicConfig() throws ExecutionException, InterruptedException {
  36.         String brokerList = "172.18.30.110:9092";
  37.         String topic = "heima";
  38.         Properties props = new Properties();
  39.         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  40.         props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
  41.         AdminClient client = AdminClient.create(props);
  42.         ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
  43.         DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
  44.         Config config = result.all().get().get(resource);
  45.         System.out.println(config);
  46.         client.close();
  47.     }
  48. }
复制代码

上一节关联链接请点击
# Kafka_深入探秘者(3):kafka 消费者

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

麻花痒

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表