论坛
潜水/灌水快乐,沉淀知识,认识更多同行。
ToB圈子
加入IT圈,遇到更多同好之人。
朋友圈
看朋友圈动态,了解ToB世界。
ToB门户
了解全球最新的ToB事件
博客
Blog
排行榜
Ranklist
文库
业界最专业的IT文库,上传资料也可以赚钱
下载
分享
Share
导读
Guide
相册
Album
记录
Doing
搜索
本版
文章
帖子
ToB圈子
用户
免费入驻
产品入驻
解决方案入驻
公司入驻
案例入驻
登录
·
注册
只需一步,快速开始
账号登录
立即注册
找回密码
用户名
Email
自动登录
找回密码
密码
登录
立即注册
首页
找靠谱产品
找解决方案
找靠谱公司
找案例
找对的人
专家智库
悬赏任务
圈子
SAAS
IT评测·应用市场-qidao123.com
»
论坛
›
大数据
›
数据仓库与分析
›
为什么要用Kafka?单机服务搭建&三步消息交互 ...
为什么要用Kafka?单机服务搭建&三步消息交互
欢乐狗
金牌会员
|
2025-3-13 07:56:25
|
显示全部楼层
|
阅读模式
楼主
主题
988
|
帖子
988
|
积分
2964
1、为什么要用Kafka?
1.1、Kafka简介
Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发并于2011年开源。它重要用于解决大规模数据的实时流式处理和数据管道标题。
它的特点包括:高吞吐量、长期化、分布式、实时性。常用于日记收集、消息系统、事件溯源、流处理等场景。
焦点概念:
Topic:数据类别或主题。
Partition:Topic 的分区,支持并行处理。
Producer:数据生产者,向 Kafka 发送消息。
Consumer:数据消耗者,从 Kafka 读取消息。
Broker:Kafka 服务器节点。
官网所在:https://kafka.apache.org/
1.2、为什么要用Kafka?
以一个典型的日记聚合应用场景为例
该业务场景要求产品的特点:
(1) 数据吞吐量很大:
需要 能够 快速收集各个 渠道的海量日记。
(2) 集群容错性高:
答应集群中少量节点崩溃。
(3) 功能不需要太复杂:
Kafka的设计目标是高吞吐、低延迟和可扩展,重要关注消息通报而不是消息处理。以是,并没有支持死信队列、次序消息等高级功能。
(4) 答应少量数据丢失:
Kafka本身也在不断优化数据安全标题,目前根本可以以为Kafka可以做到不会丢数据。
2、单机服务搭建
2.1、情况说明&版本选择
运行情况linux , centos 7.x ,内存2G 4核,JDK 17 (官方要求1.8或以上)
我的虚拟机目录说明
所有自定义安装目录:/opt/apps
服务安装目录:/opt/apps/server
在server目录下再分别创建两个目录:zookeeper 和 kafka
Kafka官方下载所在:https://kafka.apache.org/downloads
选择当前最新的3.9.0版本: kafka_2.13-3.9.0.tgz
Zookeeper官方下载所在:https://zookeeper.apache.org/releases.html
Zookeeper的版本并没有逼迫要求,这里我们选择比较新的3.8.4稳定版本。注意下载的是
bin版本
。
kafka的安装程序中自带了Zookeeper,Zookeeper的客户端jar包在kafka的安装包的libs目录下。但通常情况下,为了更好维护应用,我们会单独摆设的Zookeeper,而不使用Kafka自带的Zookeeper。
2.2、安装包上传并解压
把下载好的安装包上传到服务器,并解压。
上传工具可使用Xftp,客户端毗连工具可使用Xshell,安装包在文末获取,官方个人免费许可,仅需邮箱注册即可。
如果需要空白centos 7虚拟机,里面已经配置好jdk17和一些常用命令,在文末获取。
# 解压zookeeper
tar -xzvf apache-zookeeper-3.8.4-bin.tar.gz -C /opt/apps/server/zookeeper
# 解压kafka
tar -xzvf kafka_2.13-3.9.0.tgz -C /opt/apps/server/kafka/
复制代码
2.3、配置情况变量
将摆设目录下的bin目录路径配置到path情况变量中。
# 打开系统级配置(对所有用户生效)
vim /etc/profile
# 配置的内容如下
export ZOOKEEPER_HOME=/opt/apps/server/zookeeper/apache-zookeeper-3.8.4-bin
export PATH=$ZOOKEEPER_HOME/bin:$PATH
export KAFKA_HOME=/opt/apps/server/kafka/kafka_2.13-3.9.0
export PATH=$KAFKA_HOME/bin:$PATH
# 添加内容后,保存退出 【:wq!】
# 生效环境变量
source /etc/profile
# 查看环境变量
echo $PATH
复制代码
2.4、单机服务搭建
启动Kafka之前需要先启动Zookeeper。启动Zookeeper前需要先做一下简单配置。
进入Zookeeper摆设目录,切换到conf目录,复制zoo_sample.cfg,修改配置dataDir值,默认是暂时目录,随时会被删撤除。
# 切换到Zookeeper的配置目录
cd /opt/apps/server/zookeeper/apache-zookeeper-3.8.4-bin/conf/
# 复制出一个配置文件
cp zoo_sample.cfg zoo.cfg
# 修改配置文件
vim zoo.cfg
# 修改内容如下
dataDir=/opt/apps/server/zookeeper/data
复制代码
启动Zookeeper
# 切换到Zookeeper根目录
cd $ZOOKEEPER_HOME
# 启动命令
nohup bin/zkServer.sh start conf/zoo.cfg &
复制代码
确认服务是否启动成功,有三种方式
# 方式一:jps指令,成功可看到一个QuorumPeerMain进程
# 方式二:连接客户端测试,Zookeeper默认端口是2181
bin/zkCli.sh -server localhost:2181
# 方式三:检查端口是否监听
netstat -tuln | grep 2181
复制代码
启动Kafka
# 切换到Kafka根目录
cd $KAFKA_HOME
# 启动命令
nohup bin/kafka-server-start.sh config/server.properties &
复制代码
确认服务是否启动成功,有两种方式
# 方式一:jps指令,成功可看到一个Kafka进程
# 方式二:检查端口是否监听,Kafka默认端口是9092
netstat -tuln | grep 9092
复制代码
3、三步消息交互
Kafka的底子工作机制是消息发送者将消息发送到Kafka上指定的topic,消息消耗者从指定的topic上消耗消息。
底子消息交互三步骤:
第一步:使用Kafka提供的客户端脚本创建Topic。
第二步:启动一个消息发送者端,向指定名称的Topic发送消息。
第三步:启动一个消息消耗端,从指定名称的Topic上吸收消息。
此中,生产和消耗者并不需要同时启动。它们之间可以进行数据交互,但并不依赖于对方。没有生产者,消耗者依然可以正常工作。反过来,没有消耗者,生产者也可以正常工作。表现了生产者和消耗者之间的解耦。
3.1、实验消息交互
第一步:创建Topic名为test
# 创建Topic
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
# 创建成功会提示信息
Created topic test
# 查看topic
bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
复制代码
创建成功时,列出数据示例
第二步:启动一个消息发送者端,向名为test的Topic发送消息。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
复制代码
当命令行出现【 > 】符号后,恣意输入一些字符,按回车键完成消息发送操作。Ctrl+C 退出命令行。
第三步:启动一个消息消耗端,从名为tes的Topic上吸收消息。
# 不加任何参数,是指默认从最新消息(latest) 开始消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
test1
test2
123
^CProcessed a total of 3 messages
复制代码
这样就完成了一个底子的消息交互。
3.2、其他消耗模式
消息消耗时,如果不指定参数,默认从最新消息开始消耗。消息消耗还有另外两种模式:
指定消耗进度
和
分组消耗
。
指定消耗进度
又可以分为重新开始或精确到从哪一条消息之后开始。
# 如果想要消费之前发送的消息,可以加参数--from-beginning指定
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 如果需要更精确的消费消息,甚至可以指定从哪一条消息开始消费。
# 从第0个partition上的第4个消息开始读
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 4
复制代码
分组消耗
对于每个消耗者,可以指定一个消耗者组。
kafka中的同一条消息,只能被同一个消耗者组下的某一个消耗者消耗。不属于同一个消耗者组的其他消耗者,可以消耗到这一条消息。
在kafka-console-consumer.sh脚本中,可以通过参数–consumer-property group.id=【消耗组名】指定所属的消耗者组。
实验时,可以启动两个消耗者组、三个消耗者实例【三个窗口】,验证 分组消耗机制。
# 在窗口1和窗口2,输入消费者组【testGroup】的消费命令
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testGroup
# 在窗口3,输入消费者组【testGroup2】的消费命令
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testGroup2
复制代码
一切预备停当后,在生产者端发送消息,发现【窗口1】没有消耗提示。由此,可证实上面的结论。
查看消耗者组的偏移量
可以使用kafka-consumer-groups.sh观测消耗者的情况,包括它们的消耗进度。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
复制代码
从这里可以看出,固然业务上是通过Topic分发消息的,但是实际上,消息是保存在Partition数据结构上的。
4、我的公众号&资料获取
敬请关注我的公众号:
大象只为你
,连续更新技能知识…
相关资料获取:
如需centos7空白虚拟机,请后台复兴:blankOS。
blankOS登录账号暗码:root / 123456
如需客户端毗连工具Xshell,请后台复兴:Xshell。
如需上传工具Xftp,请后台复兴:Xftp。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
本帖子中包含更多资源
您需要
登录
才可以下载或查看,没有账号?
立即注册
x
回复
使用道具
举报
0 个回复
倒序浏览
返回列表
快速回复
高级模式
B
Color
Image
Link
Quote
Code
Smilies
您需要登录后才可以回帖
登录
or
立即注册
本版积分规则
发表回复
回帖并转播
回帖后跳转到最后一页
发新帖
回复
欢乐狗
金牌会员
这个人很懒什么都没写!
楼主热帖
C# 读写文件从用户态切到内核态,到底 ...
LeetCode刷题100道,让你滚瓜烂熟拿下S ...
我的 Java 学习&面试网站又又又升级了 ...
不到一周我开发出了属于自己的知识共享 ...
SQL server 2008 r2 安装教程
基于梯度优化的混沌PSO算法matlab仿真 ...
x64dbg 配置插件SDK开发环境
Spring Boot 多数据源配置
dfs学习笔记
KubeEdge在边缘计算领域的安全防护及洞 ...
标签云
运维
CIO
存储
服务器
浏览过的版块
容器及微服务
快速回复
返回顶部
返回列表