【kafka03】消息队列与微服务之Kafka 读写数据

[复制链接]
发表于 2025-12-19 15:17:25 | 显示全部楼层 |阅读模式
Kafka 读写数据

参考文档
   Apache Kafka
  常见下令
   kafka-topics.sh            #消息的管理下令 
kafka-console-producer.sh  #生产者的模仿下令 
kafka-console-consumer.sh  #斲丧者的模仿下令   
  
创建 Topic

创建topic名为 chen,partitions(分区)为3,replication(每个分区的副本数/每个分区的分区因子)为 2
  1. #新版命令
  2. [root@node1 bin]#/usr/local/kafka/bin/kafka-topics.sh --create --topic chen --bootstrap-server 10.0.0.187:9092 --partitions 3 --replication-factor 2
  3. #在各节点上观察生成的相关数据
  4. [root@node1 ~]#ls /usr/local/kafka/data/
  5. [root@node2 ~]#ls /usr/local/kafka/data/
  6. [root@node3 ~]#ls /usr/local/kafka/data/
  7. #旧版命令
  8. [root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --chen --zookeeper 10.0.0.187:2181,10.0.0.188:2181,10.0.0.189:2181 --partitions 3 --replication-factor 2 --topic wang
  9. Created topic wang.
复制代码

获取全部 Topic

  1. #新版命令
  2. [root@node1 bin]#/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 10.0.0.187:9092
  3. chen
  4. #旧版命令
  5. [root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
  6. chen
复制代码

验证 Topic 详情

状态阐明:wang 有三个分区分别为0、1、2,分区0的leader是3 (broker.id),分区 0 有2 个副本,而且状态都为 lsr(ln-sync,表现可 以到场推举成为 leader)。
  1. [root@node1 bin]#/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 10.0.0.187:9092  --topic chen
  2. Topic: chen        TopicId: OHzFQnjYTYS_t-PyomxkSQ        PartitionCount: 3        ReplicationFactor: 2        Configs:
  3.         Topic: chen        Partition: 0        Leader: 3        Replicas: 3,1        Isr: 3,1        Elr: N/A        LastKnownElr: N/A
  4.         Topic: chen        Partition: 1        Leader: 1        Replicas: 1,2        Isr: 1,2        Elr: N/A        LastKnownElr: N/A
  5.         Topic: chen        Partition: 2        Leader: 2        Replicas: 2,3        Isr: 2,3        Elr: N/A        LastKnownElr: N/A
  6. [root@node1 bin]#ls -1  /usr/local/kafka/data/
  7. chen-0
  8. chen-1
  9. [root@node2 ~]#ls -1  /usr/local/kafka/data/
  10. chen-1
  11. chen-2
  12. [root@node3 ~]#ls -1  /usr/local/kafka/data/
  13. chen-0
  14. chen-2
复制代码


node3 leadernode1 leadernode2 leadernode1 follwernode2 follwernode3 follwerp0p1p2生产 Topic

kafka-console-producer.sh 格式
  1. #发送消息命令格式:
  2. kafka-console-producer.sh --broker-list <kafkaIP1>:<端口>,<kafkaIP2>:<端口> --topic <topic名称>
  3. #/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.0.0.187:9092,10.0.0.102:9092,10.0.0.103:9092 --topic chen
复制代码
范例:
  1. /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.0.0.187:9092 --topic chen
  2. >message1
  3. >message2
  4. >message3
  5. >
  6. #或者下面方式
  7. [root@node1 ~]#/usr/local/kafka/bin/kafka-console-producer.sh --topic wang --bootstrap-server 10.0.0.101:9092
复制代码
斲丧 Topic

kafka-console-consumer.sh 格式
  1. #接收消息命令格式:
  2. kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称> --from-beginning --consumer
  3. property group.id=<组名称>
复制代码

注意:


  • 消息者老师产消息,斲丧都后续才启动,也能收到之宿世产的消息
  • 同一个消息在同一个group内的斲丧者只有被一个斲丧者斲丧,比如:共100条消息,在一个group内有A,B两个斲丧者,此中A斲丧 50条,B斲丧别的的50条消息。从而实现负载平衡,差别group内的斲丧者则可以同时斲丧同一个消息--from-beginning
  • 表现斲丧发布的消息也能收到,默认只能收到斲丧后发布的新消息
范例:
  1. #交互式持续接收消息,按Ctrl+C退出
  2. /usr/local/kafka/bin/kafka-console-consumer.sh --topic chen --bootstrap-server 10.0.0.187:9092 --from-beginning
  3. #一个消息同时只能被同一个组内一个消费者消费(单播机制),实现负载均衡,而不能组可以同时消费同一个消息(多播机制)
  4. [root@node2 ~]#/usr/local/kafka/bin/kafka-console-consumer.sh --topic chen  --bootstrap-server 10.0.0.102:9092 --from-beginning --consumer-property group.id=group1
  5. [root@node2 ~]#/usr/local/kafka/bin/kafka-console-consumer.sh --topic chen --bootstrap-server 10.0.0.102:9092 --from-beginning --consumer-property group.id=group1
复制代码
删除 Topic

范例:
  1. #注意:需要修改每个节点配置文件server.properties中的delete.topic.enable=true并重启
  2. #新版本
  3. [root@node3 ~]#/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic chen
  4. #旧版本
  5. [root@node3 ~]#/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper
  6. 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 --topic chen
  7. Topic wang is marked for deletion.
  8. Note: This will have no impact if delete.topic.enable is not set to true.
复制代码
范例:删除zk下面 topic test
  1. #无需修改配置文件server.properties,此方法很危险
  2. [root@zookeeper-node1 ~]#zkCli.sh -server 10.0.0.103:2181
  3. [zk: 10.0.0.103:2181(CONNECTED) 0] ls /brokers/topics
  4. [zk: 10.0.0.103:2181(CONNECTED) 0] deleteall /brokers/topics/test
  5. [zk: 10.0.0.103:2181(CONNECTED) 0] ls /brokers/topics
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表