提示:文章写完后,目次可以主动天生,怎样天生可参考右边的资助文档
前言
安装Kafka之前,我们需要安装JDK、Zookeeper、Scala。
JDK(Java Development Kit)是 Java 开发工具包,它包罗 Java 编译器(javac)用于将代码编译成字节码,Java 捏造机(JVM)负责运行字节码,还有大量的 Java 核心类库。这些类库为 Kafka 提供了诸如网络通讯、文件操纵、内存管理等基础功能的支持。
Zookeeper 是一个分布式的、开源的应用程序协调服务。它主要用于维护配置信息、定名、提供分布式同步和组服务等。它以一种简朴的层次结构数据模型(类似文件系统的目次树结构)来存储数据。
Scala 运行在 Java 捏造机上,可以与 Java 代码无缝集成。它具有很多独特的特性,比如类型推断,这使得代码编写更加简洁。
安装原因与使用:Kafka 是用 Scala 编写的,而 Scala 运行在 Java 捏造机(JVM)上。JDK 提供了 Java 编译器和 Java 捏造机等关键组件,是 Kafka 运行的基础情况。Kafka 的启动脚本、核心代码的实行都依赖于 JVM,没有 JDK,Kafka 无法正常运行。Kafka 使用 Zookeeper 来管理和协调集群。Zookeeper 存储了 Kafka 集群的元数据信息,比如集群中有哪些 broker(Kafka 服务器)、每个 topic(主题)的分区分布情况、消费者的消费偏移量等信息。它在 Kafka 集群的动态扩展、故障恢复等过程中起到关键的协调作用。
JDK安装
1.下载jdk
官网地点安装链接:https://www.oracle.com/java/technologies/downloads
进入官网后
这里下载的是Windows版本的JDK 23,从上到下
x64 Compressed Archive(228.70 MB)是一个压缩文件(.zip),这种类型的文件下载后需要手动解压,然后配置系统情况变量才气使用 JDK。适合那些喜欢手动管理软件安装和配置的用户。
x64 Installer(205.21 MB)是一个可实行的安装程序(.exe),这种类型的文件下载后,用户可以直接双击运行安装程序,它会主动将 JDK 安装到系统中。
x64 MSI Installer(203.96 MB)是一个 Windows Installer 包(.msi)这种类型的文件通常用于企业情况或通过组策略进行软件摆设。它提供了更高级的安装选项。
这里我们直接下载第二种,假如有别的需求,也可以按需下载不同系统的JDK安装文件。
2.安装jdk
下载后在文件夹找到文件如图所示
双击运行这个文件,跟随给出的步骤操纵就可以安装好JDK,在这个过程中可以修改JDK的安装目次,这里注意:安装目次一定不要用中文,而且不能有空格。建议直接用默认的安装路径一样平常为C:\Program Files\Java\jdk-23或者C:\Program\Java\jdk-23,不想修改就记一下这里的路径,后面配置情况变量需要用到。
3.验证是否安装成功
按下window键+R,在运行栏中输入cmd, 输入如下命令:
- java -version --执行工具
- javac -version --编译工具
复制代码 如图:
出现版本号,说明安装成功。
4.配置情况变量
上面我们还没有配置情况变量,在cmd终端就能使用如上的“java -version ” 、“javac -version”,,这是因为如今的版本安装jdk后,会主动给我们创建好情况变量(路径在xx\xx\xx\Java\javapath),以前的版本是必须手工创建的,方法如下:
起首,打开“编辑系统情况变量”
进入系统变量–path
删除主动创建的路径(xx\xx\xx\Java\javapath)
然后再去情况变量里添加路径,起首点击新建,名称为JAVA_HOME,值为上面存放JDK的文件路径(我的JDK文件下载在C:\Program Files\Java\jdk-23)。
然后再到情况变量path中添加JAVA_HOME,起首双击path打开该变量目次点击新建,添加变量%JAVA_HOME%\bin,完成后如下
OK ,以后假如改了jdk的安装路径,只需要更改JAVA_HOME的值即可。
Zookeeper安装
1.下载Zookeeper
安装链接:https://pan.baidu.com/s/1TbgZXmK5_O8W4iZiuFicgw?pwd=yyds
我这里是通过一个博主的网盘链接下载的
参考文章:【Zookeeper】Windows下安装Zookeeper(图文记载详细步骤,手把手包安装成功)
2.安装Zookeeper
这个文件我下载后解压在E:bigdata,文件名就是zookeeper,
进入目次后要新建一个data文件,
然后进入conf目次,
修改zoo_sample.cfg文件,改名为zoo.cfg,我这里直接复制了一份改了一下名字
,再进入zoo.cfg把dataDir=/tmp/zookeeper
修改为data文件路径dataDir=E:\bigdata\zookeeper\data
在这里有一个小题目,因为我的zookeeper文件最开始直接安装在E盘,不是bigdata文件,所以路径是E:\zookeeper\data因此储存信息的zookeeper文件出如今E盘,假如想修改或删除kafka运行的相关文件,不光要修改或删除kafka文件里的储存文件,还要修改或删除这个出如今E盘的文件
以上zookeeper文件安装和配置就完成了
3.配置情况变量
再如同JDK文件一样去情况变量添加zookeeper文件的路径,先在系统变量里添加ZOOKEEPER_HOME,值为zookeeper文件路径
再去path中添加ZOOKEEPER_HOME,
如许,以后zookeeper文件路径有变化时直接修改ZOOKEEPER_HOME对应的值就行了
4.验证是否安装成功
以管理员权限打开命令窗口,输入:
出现下图显示,说明安装成功
Scala安装
1.下载Scala
安装链接:https://pan.baidu.com/s/1Qiy1aEndKn_Xs-zSSLaWIA?pwd=yyds
同上,这个文件也是在网盘下载的
参考文件:【Scala】Windows下安装Scala(以Scala 2.11.12为例)
2.安装Scala
这个文件不需要进行修改,直接安装好就行,我把这个文件也是放在了E:bigdata,名称为Scala(所以他的路径为E:bigdata\Scala)
3.配置情况变量
同上,去情况变量中新增情况变量 SCALA_HOME,值为Scala对应路径,如下图:
而且在情况变量Path添加条目:
4.验证是否安装成功
开启一个新的cmd窗口。输入:Scala
出现上述界面,说明安装成功
安装kafka
1.下载kafka
安装链接:https://pan.baidu.com/s/1Av4ZwQPUaAntwVxz79Ne9w?pwd=yyds
同上,网盘下载
参考文件:【Kafka】Windows下安装Kafka(图文记载详细步骤)
2.安装kafka
文件我下载在了E:bigdata,名称为kafka
接下来需要在Kafka安装目次下新建目次logs,
然后修改配置文件 server.properties
文件路径:…bigdata\kafka\config\server.properties
修改 log.dirs 参数值,修改成上一步新建的logs文件夹。注意文件夹路径中是双左斜杠,
比方我的是log.dirs=E:\bigdata\kafka\logs。
再然后修改 listeners 参数值,还是 server.properties文件,
在35行左右添加listeners=PLAINTEXT://localhost:9092
这个代表了你创建的kafka服务器的端口号,在后续毗连进行操纵时会用到
以上前置需要使用的文件安装完成
Kafka运行
Kafka依赖于Zookeeper,
所以在终端以管理员身份打开命令行起首输入命令zkServer
启动Zookeeper服务,这个在验证zookeeper是否安装成功有提到,就不放图了,
再另开一个命令行,还是以管理员身份打开,起首进入kafka文件所在的目次,比方我这里是在E盘的bigdata文件中所以我的过程如下
输入如下命令启动Kafka服务:
- .\bin\windows\kafka-server-start.bat .\config\server.properties
复制代码 运行结果如下
以管理员权限再新开一个命令提示窗口,进入E:\bigdata\kafka\bin\windows目次,
- 1|E:
- 2|cd bigdata\kafka\bin\windows
复制代码 实行以下命令,创建topics:
- kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
复制代码 kafka - topics.bat:这是 Kafka 用于操纵主题(topic)的脚本文件(在 Windows 情况下)。它提供了一系列与主题相关的操纵功能,如创建、删除、查看主题信息等。
–create:这是一个操纵指令,表现要创建一个新的主题。
–bootstrap - server localhost:9092:指定 Kafka 集群的引导服务器地点和端口。localhost表现本田主机,假如 Kafka 摆设在远程服务器上,则需要替换为相应的服务器域名或 IP 地点。9092是 Kafka 服务器默认的监听端口。客户端(如这个创建主题的操纵)通过毗连到这个引导服务器来获取整个 Kafka 集群的相关信息,包罗其他的服务器节点信息等。
–replication - factor 1:复制因子用于指定主题的每个分区在 Kafka 集群中的副本数量。这里设置为1,意味着每个分区只有一个副本。在生产情况中,为了保证数据的高可用性和容错性,通常会设置大于1的值,如许即使某个节点出现故障,数据仍然可以从其他副本中获取。
–partitions 1:指定主题的分区数量。分区是 Kafka 对数据进行分布式存储和处置惩罚的基本单元。通过将主题分别为多个分区,可以实现并行处置惩罚数据,进步系统的吞吐量。这里创建的主题只有1个分区。
–topic test:指定要创建的主题名称为test。主题是 Kafka 中消息的分类标识,生产者将消息发送到特定的主题,消费者从特定的主题中接收消息。
由上可知,–bootstrap - server localhost:9092设置端口,–topic test设置主题,主要通过修改这两个值来达到区分不同主机不同端口的不同主题的目的
如上图,我这里已经创建了test这个主题,所以报错了,更换成test2,当出现Created topic test2.说明创建成功。
查看topics列表:
- kafka-topics.bat --bootstrap-server localhost:9092 --list
复制代码
打开producer(生产者)
以管理员身份新开一个命令行,输入如下命令:
- kafka-console-producer.bat --broker-list localhost:9092 --topic test
复制代码
打开consumer(消费者)
以管理员身份新开一个命令行,输入如下命令:
- kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
复制代码
如许kafka的服务器就搭建完成,在producer(生产者)输入信息,在consumer(消费者)就可以实时接收。
与python结合实例,制作生产者与消费者
使用pycharm运行python文件往kafka服务器,起首要下载kafka库,在自己的捏造情况运行命令pip install kafka,将这个库下载到情况中。
1.发送信息:
到pycharm中运行以下代码:
- from kafka import KafkaProducer
- import json
- import time
- # 创建KafkaProducer实例
- producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
- # 发送消息到指定Topic
- topic = 'my_topic'
- message_dict = {'name': 'luowei', 'message': 'this is a test message'}
- # 发送消息
- while True:
- producer.send(topic, message_dict)
- time.sleep(5)
复制代码 这段代码作用是向指定的kafka服务器端口里的一个指定主题,以utf-8的形式每隔5秒发送一个字典,是一个非常简朴的生产者
运行这段代码后,在终端打开这个my_topic的生产者可以看到接收到了信息
2.使用代码读取kafka服务器主题中的信息
- from kafka import KafkaConsumer
- import json
- # Kafka服务器地址和要读取的主题名称
- bootstrap_servers = '127.0.0.1:9092'
- topic_name = 'my_topic'
- # 创建KafkaConsumer实例
- consumer = KafkaConsumer(
- topic_name,
- bootstrap_servers=bootstrap_servers,
- auto_offset_reset='earliest', # 从最早的消息开始读取,如果不需要可以修改
- group_id='test_group' # 消费者组ID,可以根据需要修改
- )
- for message in consumer:
- try:
- # 这里假设消息的值是字符串类型,如果是其他类型需要相应处理
- print(f"收到消息: {message.value.decode('utf-8')}")
- # 如果消息是JSON格式,可以这样解析
- # message_value = json.loads(message.value.decode('utf-8'))
- # print(f"解析后的消息: {message_value}")
- except Exception as e:
- print(f"处理消息时出错: {e}")
复制代码 结果如下:
待完善的题目
1,在“kafka运行”这一段中还有一个删除方法,最开始直接运行删除命令会报错,在网上查找资料后发现要修改配置文件,在server.properties文件中加入delete.topic.enable = true,可是加入后发现依旧报错,而且整个kafka系统还会报错退出,错误与表明如下
[2024 - 11 - 07 17:10:52,557] ERROR [Broker id = 0] Ignoring StopReplica request (delete = true) from controller 0 with correlation id 5 epoch 1 for partition test - 0 as the local replica for the partition is in an offline log directory (state.change.logger)
这条日记表明 broker 0 收到了来自控制器 0 的针对分区 test - 0 的停止副本(StopReplica)哀求,且哀求中包含了删除(delete = true)操纵。然而,broker 0 忽略了这个哀求,原因是分区 test - 0 的本地副本所在的日记目次处于离线状态。这意味着 broker 无法正常访问该分区的日记数据,可能是由于之前提到的日记目次权限题目或者其他导致目次不可用的原因。
经过查询资料后得出可能的两个原因:一是kafka版本与zookeeper不是很适配,二是版本与输入命令不搭,在一些较早的版本删除命令有些许变化
2,在使用代码写入与读取时速度较慢,尤其是读取时
3,在使用代码发送时,无法向已发送过的topic再次发送信息,这里可能与生产者的配置文件有关,因时间原因并没有深入研究
4.当生产者发送的信息是用UTF-8编码处置惩罚时,有些没有将配置完善的kafka服务器终端在接收信息时会出现乱码情况
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |