搭建Kafka源码环境并测试

打印 上一主题 下一主题

主题 568|帖子 568|积分 1708

一、前言

参考的博客是用的是 Kafka 2.7 版本的,于是我这里也采用 2.7 版本,理由是这个版本还保存着 zookeeper,生产环境比较稳固
二、环境预备



  • JDK:1.8.0_241
  • Scala:2.12.8
  • Gradle:6.6
  • Zookeeper:3.4.14
三、环境搭建

3.1 JDK 环境搭建

直接利用本地环境
3.2 Scala 环境搭建

下载链接:Scala 2.12.8 | The Scala Programming Language (scala-lang.org)
3.2.1 配置 Scala 环境变量

  1. vim ~/.bash_profile
  2. # 安装的路径
  3. export SCALA_HOME=/Users/gabriel/Environment/scala-2.12.8
  4. export PATH=$PATH:$SCALA_HOME/bin
  5. source ~/.bash_profile
复制代码
3.2.2 验证

输入 scala -version 看到提示则成功
3.3 Gradle 环境搭建

下载链接:Gradle Distributions
下载版本为 gradle-6.6-bin
3.3.1 配置 Gradle 环境变量

  1. vim ~/.bash_profile
  2. # 安装的路径
  3. export GRADLE_HOME=/Users/gabriel/Environment/gradle-6.6
  4. export PATH=$PATH:$GRADLE_HOME/bin
  5. source ~/.bash_profile
复制代码
3.3.2 验证

输入 gradle -version 看到提示则成功
3.4 Zookeeper 环境搭建

下载链接:Index of /dist/zookeeper/zookeeper-3.4.14 (apache.org)
3.4.1 配置 Zookeeper 环境变量

  1. vim ~/.bash_profile
  2. # 安装的路径
  3. export ZK_HOME=/Users/gabriel/Environment/zookeeper-3.4.14
  4. export PATH=$PATH:$ZK_HOME/bin
  5. source ~/.bash_profile  
复制代码
创建 data 文件夹用于存放日志数据,并在 cfg 文件中指定
  1. cd zookeeper-3.4.14
  2. mkdir data
  3. cd conf
  4. mv zoo_sample.cfg zoo.cfg
  5. # 修改 zoo.cfg 内的配置
  6. dataDir=/Users/gabriel/Environment/zookeeper-3.4.14/data
复制代码
3.4.2 验证

  1. sh zkServer.sh start
复制代码
输出提示则成功
3.5 Kafka 源码搭建

下载链接:Apache Kafka
这里选择了 2.7.0 版本
3.5.1 导入 Kafka 源码至 IDEA

Setting 里面需要配置 Gradle Home 为本地地点 /Users/gabriel/Environment/gradle-6.6
3.5.2 修改 build.gradle 配置

这里是我第一次利用 gradle 构建项目,根本原理和 maven 差不多,刚好学习一下
需要把镜像文件下载服务器更换为国内的私服,否则会相当慢,直接导致 “time out” 报错。
进入 kafka 源码包,修改 build.gradle 文件,在原来配置上,添加 ali 私服配置。
  1. buildscript {
  2.     repositories {
  3.       maven {
  4.         url 'https://maven.aliyun.com/nexus/content/groups/public/'
  5.       }
  6.       maven {
  7.         url 'https://maven.aliyun.com/nexus/content/repositories/jcenter'
  8.       }
  9.       // 插件下载地址
  10.       maven {
  11.         url 'https://maven.aliyun.com/repository/gradle-plugin'
  12.       }
  13.   }
  14. }
  15. allprojects {
  16.     repositories {
  17.       maven {
  18.         url 'https://maven.aliyun.com/nexus/content/groups/public/'
  19.       }
  20.       maven {
  21.         url 'https://maven.aliyun.com/nexus/content/repositories/jcenter'
  22.       }
  23.       // 插件下载地址
  24.       maven {
  25.         url 'https://maven.aliyun.com/repository/gradle-plugin'
  26.       }
  27.   }
  28. }
复制代码
Bug 1:
疯狂报错如下,好像是 gradle 的插件找不到,从对应的阿里云地点也下载不下来
办理方案:在上面配置 maven 下载地点的地方加上专门的插件下载地点
  1. A problem occurred configuring root project 'kafka-2.7.0-src'.
  2. > Could not resolve all artifacts for configuration ':classpath'.
  3.    > Could not find gradle.plugin.com.github.spotbugs.snom:spotbugs-gradle-plugin:4.4.4.
  4.      Searched in the following locations:
  5.        - https://maven.aliyun.com/nexus/content/groups/public/gradle/plugin/com/github/spotbugs/snom/spotbugs-gradle-plugin/4.4.4/spotbugs-gradle-plugin-4.4.4.pom
  6.        - https://maven.aliyun.com/nexus/content/repositories/jcenter/gradle/plugin/com/github/spotbugs/snom/spotbugs-gradle-plugin/4.4.4/spotbugs-gradle-plugin-4.4.4.pom
  7.        - https://dl.bintray.com/content/netflixoss/external-gradle-plugins/gradle/plugin/com/github/spotbugs/snom/spotbugs-gradle-plugin/4.4.4/spotbugs-gradle-plugin-4.4.4.pom
  8.      Required by:
  9.          project :
  10.    > Could not find org.gradle:test-retry-gradle-plugin:1.1.6.
  11.      Searched in the following locations:
  12.        - https://maven.aliyun.com/nexus/content/groups/public/org/gradle/test-retry-gradle-plugin/1.1.6/test-retry-gradle-plugin-1.1.6.pom
  13.        - https://maven.aliyun.com/nexus/content/repositories/jcenter/org/gradle/test-retry-gradle-plugin/1.1.6/test-retry-gradle-plugin-1.1.6.pom
  14.        - https://dl.bintray.com/content/netflixoss/external-gradle-plugins/org/gradle/test-retry-gradle-plugin/1.1.6/test-retry-gradle-plugin-1.1.6.pom
  15.      Required by:
  16.          project :
  17. Possible solution:
  18. - Declare repository providing the artifact, see the documentation at https://docs.gradle.org/current/userguide/declaring_repositories.html
复制代码
然后直接点右上角的小图标下载依赖
依赖下载完以后需要编译源代码(源代码中可以看到编译的 build 目次),这样子后面摆设 kafka.Kafka 启动类然后利用脚本创建 producer、consumer 的时候,才气成功。手动输入下令如下:
  1. ./gradlew clean build -x test
复制代码
大概直接右侧 gradle build,不过此中需要颠末很多 test 测试,上面手动跳过
四、源码代码结构


重点看下此中几个紧张的模块:


  • bin 目次:保存 Kafka 工具脚本,包罗 kafka-server-start 和 kafka-console-producer 等脚本都存放在这里
  • clients 目次:保存 Kafka 客户端代码,比如生产者和消耗者的代码都在该目次下(对于 Kafka 而言都是客户端)
  • config 目次:保存 Kafka 的配置文件,此中比较紧张的配置文件是 server.properties
  • connect 目次:保存 Connect 组件的源代码。 Kafka Connect 组件是用来实现 Kafka 与外部系统之间的及时数据传输的。
  • core 目次:保存 Broker 端代码。Kafka 服务器端代码全部保存在该目次下(这对于 Kafka 而言是服务端)
  • generator 目次:Kafka 消息类处置惩罚模块,紧张是根据 clients 模块下的 message json 文件生成对应的 java 类,在 build.gradle 文件中,可以看到定义了一个使命 processMessages
  • raft 目次:raft 同等性协议相关。
五、核心 Core 目次代码结构




  • admin 包:实行管理下令的功能;
  • api 包:封装请求和响应 DTO 对象;
  • cluster 包:集群对象,比方 Partition 类代表一个分区,Replica 类代表一个分区副本;
  • common 包:通用类,比如非常类
  • consumer 包:只有一个类 BaseConsumerRecord,后面会丢弃该包,用 clients 包下 org.apache.kafka.clients.consumer.ConsumerRecord 取代;
  • controller 包: 和 kafkaController(kc)相关的类,重点模块,一个kafka集群只有一个leader kafkaController,该kafkaController负责分区管理,副本管理,并保证集群信息在集群中同步;
  • coordinator 包:保存了消耗者端的 GroupCoordinator 代码和用于事件的 TransactionCoordinator 代码。对 coordinator 包进行分析,特别是对消耗者端的 GroupCoordinator 代码进行分析,是 Broker 端协调者组件计划原理的关键;
  • log 包:保存了 Kafka 最核心的日志结构代码,包罗日志、日志段、索引文件等, 别的,该包下还封装了 Log Compaction 的实现机制,是非常紧张的源码包;
  • network 包:封装了 Kafka 服务器端网络层的代码,特别是 SocketServer.scala 这个文件,是 Kafka 实现 Reactor 模式的具体操作类,非常值得一读;
  • server 包:顾名思义,它是 Kafka 的服务器端主代码,里面的类非常多,很多关键的 Kafka 组件都存放在这里,比如状态机、Purgatory 延机会制等;
  • tools 包:工具类。
六、源码摆设验证

最终实在就是在第三章编译完 kafka 全部文件的底子上,要启动 core 目次下和 broker 相关的类,启动类就是 kafka-2.7.0-src/core/src/main/scala/kafka/Kafka.scala
6.1 core.main 下新建日志配置类

在 core/main 下新建 resources 目次,再将 config/log4j.properties 拷贝到该目次下

同时在 build.gradle 中设置

  1. compile group: 'log4j', name: 'log4j', version: '1.2.17'
  2. compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.30'
  3. compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.30'
复制代码
这是为了避免启动报错“SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder””
6.2 修改 server.properties

修改 server.properties 中的 log.dirs 值,变为本地目次,
/Users/gabriel/Desktop/DiveinCode/kafka-2.7.0-src 下新建 kafka-logs 目次,修改配置如下
  1. log.dirs=/Users/gabriel/Desktop/DiveinCode/kafka-2.7.0-src/kafka-logs
复制代码
6.3 配置 Run Configurations


6.4 发送、消耗 message

上面已经搭建好 Kafka Broker 的摆设环境,并且编译成功了全部和 producer、consumer 有关的文件类,现在利用 Kafka 自带的脚本工具测试 producer 和 consumer
团体流程如下:

  • 创建 topic
    进入 kafka-2.7.0-src/bin 通过 kafka-topics.sh 下令来创建一个名为 topicjxz 的 topic(前提代码已经 build 编译完),zookeeper 的地点我是先通过 sh zkServer.sh start
    启动,然后 zkCli 启动客户端找到的:
    1. sh kafka-topics.sh --zookeeper localhost:2181 --create --topic topicjxz --replication-factor 1 --partitions 1
    复制代码

–zookeeper 指定了 zookeeper 的地点,–topic 指定主题名称

  • 通过 kafka-console-consumer.sh 下令启动一个下令行的 consumer 来消耗 topicjxz 这个 topic
  1. sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicjxz
复制代码

–bootstrap-server 指定 broker 地点地点,–topic 指定对应发送消息到的主题

  • 重新启动一个下令行终端,通过 kafka-console-producer.sh 下令启动一个下令行的 producer 向 topicjxz 这个 topic 中生产数据
  1. sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topicjxz
复制代码


至此,我们发现已经将整个 producer、broker、consumer 的链路摆设起来并测试成功了。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

东湖之滨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表