windows当地搭建zookeeper和kafka环境

打印 上一主题 下一主题

主题 908|帖子 908|积分 2724

zookeeper

1.1 下载zookeeper

下载地址
任意进一个站点,默认是新版本,旧版本点击archives进入,选择符合的版本下载,本文使用的是3.7.2
下载时候选择apache-zookeeper-3.7.2-bin.tar.gz 格式的,编译后的,解压后直接使用


1.2 zk启动

1.2.1 单机启动

解压后进入 /zk/conf/ 复制一份zoo_sample.cfg文件到同级目录,然后更改名称为zoo.cfg。编辑zoo.cfg,确认端口和数据目录
  1. clientPort=2181
  2. dataDir=D:\\xxx\\apache-zookeeper-3.7.2-bin\\data
复制代码
然后进入zk/bin/目录下,运行zkServer.cmd则启动zk服务器完成
1.2.2 伪集群启动

解压后拷贝3份,同理修改配置zoo.cfg,多了server.x的配置。区别在于端口要配置不一样,因为是一台电脑
dataDir指向本身的目录的data,每个data下面创建myid文件,myid就是下面配置的1,2,3 ,对应conf文件内里的server.x
举例:
D:\env\apache-zookeeper-3.7.2-bin-A\data\myid的内容就是 1
D:\env\apache-zookeeper-3.7.2-bin-B\data\myid的内容就是 2
D:\env\apache-zookeeper-3.7.2-bin-C\data\myid的内容就是 3


  1. dataDir=D:\\env\\apache-zookeeper-3.7.2-bin-A\\data
  2. clientPort=2181
  3. server.1=localhost:2887:3887
  4. server.2=localhost:2888:3888
  5. server.3=localhost:2889:3889
复制代码
  1. dataDir=D:\\env\\apache-zookeeper-3.7.2-bin-B\\data
  2. clientPort=2182
  3. server.1=localhost:2887:3887
  4. server.2=localhost:2888:3888
  5. server.3=localhost:2889:3889
复制代码
  1. dataDir=D:\\env\\apache-zookeeper-3.7.2-bin-C\\data
  2. clientPort=2183
  3. server.1=localhost:2887:3887
  4. server.2=localhost:2888:3888
  5. server.3=localhost:2889:3889
复制代码
启动同理,到3个zk的bin目录下启动zkserver,也可以写个bat一次性启动
zk-cluster.bat
  1. @echo off
  2. start cmd /c "D:\\env\\apache-zookeeper-3.7.2-bin-A\\bin\\zkServer.cmd"
  3. start cmd /c "D:\\env\\apache-zookeeper-3.7.2-bin-B\\bin\\zkServer.cmd"
  4. start cmd /c "D:\\env\\apache-zookeeper-3.7.2-bin-C\\bin\\zkServer.cmd"
复制代码
kafka

下载

下载地址
选择二进制文件下载,我下载的是2.13-3.71

下载后解压,进入config目录,修改server.properties文件
对接单zk ,这边要和上面zk的端口一样
  1. zookeeper.connect=localhost:2181
  2. log.dirs=D:\\env\\kafka-zk-single\\logs
复制代码
对接zk集群,这边要和上面zkA、B、C的端口一样
  1. zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
  2. log.dirs=D:\\env\\kafka-zk-cluster\\logs
复制代码
启动kafka

进入kakfa/bin/windows/目录,启动kafka-server-start.bat 同时指定刚才的配置文件
也可以写个bat,放在外面

kafka-single.bat
  1. "D:\\env\\kafka\\bin\\windows\\kafka-server-start.bat" "D:\\env\\kafka\\config\\server.properties"
复制代码
这里提醒一下,不要一个kafka一会对接zk单机,一会对接zk集群,集群id是唯一的,混着用会报错
可以再复制一个kafka出来
类似
kafka-zk-single 用来对接单zk
kafka-zk-cluster 用来对接zk集群
kafka-ui

kafka可视化监控,类似的软件很多,这里用 UI for Apache Kafka
主要是方便,一个jar包,直接启动就好了,不过要求java版本高于17? 用idea下载一个,直接指定位置启动就好了,不影响日常jdk8使用


下载好后同目录创建一个配置文件 application.yml。
  1. server:
  2.   port: 9988
  3. kafka:
  4.   clusters:
  5.     -
  6.       name: local
  7.       bootstrapServers: localhost:9092
复制代码
简单解释下,详细配置看文档
一个是修改下端口,默认是8080,可能会冲突
另一个就是配置kafka的地址
然后启动!
  1. C:\Users\Admin\.jdks\graalvm-jdk-17.0.11\bin\java.exe -jar kafka-ui-api-v0.7.2.jar -Dspring.config.additional-location=application.yml
复制代码
http://localhost:9988/

springboot对接kafka

加依赖、加配置、写代码
  1.         <dependency>
  2.             <groupId>org.springframework.kafka</groupId>
  3.             <artifactId>spring-kafka</artifactId>
  4.             <version>2.8.11</version>
  5.         </dependency>
复制代码
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: localhost:9092
  4.     consumer:
  5.       auto-offset-reset: latest
  6.       enable-auto-commit: true
  7.       auto-commit-interval: 3000
  8.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  9.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  10.     producer:
  11.       retries: 0
  12.       acks: 1
  13.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  14.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
3种监听方式
  1.     @KafkaListener(topics = "test", groupId = "test_group")
  2.     public void receiveMessage(String message) {
  3.         System.out.println("消费消息:============> " + message);
  4.     }
  5.     @KafkaListener(topics = "test", groupId = "test_group1", properties = {"auto.commit.interval.ms=4000"})
  6.     public void receiveMessage2(ConsumerRecord<String, String> record) {
  7.         System.out.println("消费record:============> " + record);
  8.     }
  9.     @KafkaListener(topics = "test", groupId = "test_group2")
  10.     public void receiveMessage3(List<ConsumerRecord<String, String>> records) {
  11.         System.out.println("消费records:============> " + records);
  12.     }
复制代码
启动程序。。
测试

进入web页面test的topic内里



代码控制台打印:
  1. 消费record:============> ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1724315931298, serialized key size = 9, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = 测试key, value = 测试值)
  2. 消费消息:============> 测试值
  3. 消费records:============> [测试值]
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大号在练葵花宝典

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表