IT评测·应用市场-qidao123.com
标题:
为什么要用Kafka?单机服务搭建&三步消息交互
[打印本页]
作者:
欢乐狗
时间:
2025-3-13 07:56
标题:
为什么要用Kafka?单机服务搭建&三步消息交互
1、为什么要用Kafka?
1.1、Kafka简介
Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发并于2011年开源。它重要用于解决大规模数据的实时流式处理和数据管道标题。
它的特点包括:高吞吐量、长期化、分布式、实时性。常用于日记收集、消息系统、事件溯源、流处理等场景。
焦点概念:
Topic:数据类别或主题。
Partition:Topic 的分区,支持并行处理。
Producer:数据生产者,向 Kafka 发送消息。
Consumer:数据消耗者,从 Kafka 读取消息。
Broker:Kafka 服务器节点。
官网所在:https://kafka.apache.org/
1.2、为什么要用Kafka?
以一个典型的日记聚合应用场景为例
该业务场景要求产品的特点:
(1) 数据吞吐量很大:
需要 能够 快速收集各个 渠道的海量日记。
(2) 集群容错性高:
答应集群中少量节点崩溃。
(3) 功能不需要太复杂:
Kafka的设计目标是高吞吐、低延迟和可扩展,重要关注消息通报而不是消息处理。以是,并没有支持死信队列、次序消息等高级功能。
(4) 答应少量数据丢失:
Kafka本身也在不断优化数据安全标题,目前根本可以以为Kafka可以做到不会丢数据。
2、单机服务搭建
2.1、情况说明&版本选择
运行情况linux , centos 7.x ,内存2G 4核,JDK 17 (官方要求1.8或以上)
我的虚拟机目录说明
所有自定义安装目录:/opt/apps
服务安装目录:/opt/apps/server
在server目录下再分别创建两个目录:zookeeper 和 kafka
Kafka官方下载所在:https://kafka.apache.org/downloads
选择当前最新的3.9.0版本: kafka_2.13-3.9.0.tgz
Zookeeper官方下载所在:https://zookeeper.apache.org/releases.html
Zookeeper的版本并没有逼迫要求,这里我们选择比较新的3.8.4稳定版本。注意下载的是
bin版本
。
kafka的安装程序中自带了Zookeeper,Zookeeper的客户端jar包在kafka的安装包的libs目录下。但通常情况下,为了更好维护应用,我们会单独摆设的Zookeeper,而不使用Kafka自带的Zookeeper。
2.2、安装包上传并解压
把下载好的安装包上传到服务器,并解压。
上传工具可使用Xftp,客户端毗连工具可使用Xshell,安装包在文末获取,官方个人免费许可,仅需邮箱注册即可。
如果需要空白centos 7虚拟机,里面已经配置好jdk17和一些常用命令,在文末获取。
# 解压zookeeper
tar -xzvf apache-zookeeper-3.8.4-bin.tar.gz -C /opt/apps/server/zookeeper
# 解压kafka
tar -xzvf kafka_2.13-3.9.0.tgz -C /opt/apps/server/kafka/
复制代码
2.3、配置情况变量
将摆设目录下的bin目录路径配置到path情况变量中。
# 打开系统级配置(对所有用户生效)
vim /etc/profile
# 配置的内容如下
export ZOOKEEPER_HOME=/opt/apps/server/zookeeper/apache-zookeeper-3.8.4-bin
export PATH=$ZOOKEEPER_HOME/bin:$PATH
export KAFKA_HOME=/opt/apps/server/kafka/kafka_2.13-3.9.0
export PATH=$KAFKA_HOME/bin:$PATH
# 添加内容后,保存退出 【:wq!】
# 生效环境变量
source /etc/profile
# 查看环境变量
echo $PATH
复制代码
2.4、单机服务搭建
启动Kafka之前需要先启动Zookeeper。启动Zookeeper前需要先做一下简单配置。
进入Zookeeper摆设目录,切换到conf目录,复制zoo_sample.cfg,修改配置dataDir值,默认是暂时目录,随时会被删撤除。
# 切换到Zookeeper的配置目录
cd /opt/apps/server/zookeeper/apache-zookeeper-3.8.4-bin/conf/
# 复制出一个配置文件
cp zoo_sample.cfg zoo.cfg
# 修改配置文件
vim zoo.cfg
# 修改内容如下
dataDir=/opt/apps/server/zookeeper/data
复制代码
启动Zookeeper
# 切换到Zookeeper根目录
cd $ZOOKEEPER_HOME
# 启动命令
nohup bin/zkServer.sh start conf/zoo.cfg &
复制代码
确认服务是否启动成功,有三种方式
# 方式一:jps指令,成功可看到一个QuorumPeerMain进程
# 方式二:连接客户端测试,Zookeeper默认端口是2181
bin/zkCli.sh -server localhost:2181
# 方式三:检查端口是否监听
netstat -tuln | grep 2181
复制代码
启动Kafka
# 切换到Kafka根目录
cd $KAFKA_HOME
# 启动命令
nohup bin/kafka-server-start.sh config/server.properties &
复制代码
确认服务是否启动成功,有两种方式
# 方式一:jps指令,成功可看到一个Kafka进程
# 方式二:检查端口是否监听,Kafka默认端口是9092
netstat -tuln | grep 9092
复制代码
3、三步消息交互
Kafka的底子工作机制是消息发送者将消息发送到Kafka上指定的topic,消息消耗者从指定的topic上消耗消息。
底子消息交互三步骤:
第一步:使用Kafka提供的客户端脚本创建Topic。
第二步:启动一个消息发送者端,向指定名称的Topic发送消息。
第三步:启动一个消息消耗端,从指定名称的Topic上吸收消息。
此中,生产和消耗者并不需要同时启动。它们之间可以进行数据交互,但并不依赖于对方。没有生产者,消耗者依然可以正常工作。反过来,没有消耗者,生产者也可以正常工作。表现了生产者和消耗者之间的解耦。
3.1、实验消息交互
第一步:创建Topic名为test
# 创建Topic
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
# 创建成功会提示信息
Created topic test
# 查看topic
bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
复制代码
创建成功时,列出数据示例
第二步:启动一个消息发送者端,向名为test的Topic发送消息。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
复制代码
当命令行出现【 > 】符号后,恣意输入一些字符,按回车键完成消息发送操作。Ctrl+C 退出命令行。
第三步:启动一个消息消耗端,从名为tes的Topic上吸收消息。
# 不加任何参数,是指默认从最新消息(latest) 开始消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
test1
test2
123
^CProcessed a total of 3 messages
复制代码
这样就完成了一个底子的消息交互。
3.2、其他消耗模式
消息消耗时,如果不指定参数,默认从最新消息开始消耗。消息消耗还有另外两种模式:
指定消耗进度
和
分组消耗
。
指定消耗进度
又可以分为重新开始或精确到从哪一条消息之后开始。
# 如果想要消费之前发送的消息,可以加参数--from-beginning指定
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 如果需要更精确的消费消息,甚至可以指定从哪一条消息开始消费。
# 从第0个partition上的第4个消息开始读
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 4
复制代码
分组消耗
对于每个消耗者,可以指定一个消耗者组。
kafka中的同一条消息,只能被同一个消耗者组下的某一个消耗者消耗。不属于同一个消耗者组的其他消耗者,可以消耗到这一条消息。
在kafka-console-consumer.sh脚本中,可以通过参数–consumer-property group.id=【消耗组名】指定所属的消耗者组。
实验时,可以启动两个消耗者组、三个消耗者实例【三个窗口】,验证 分组消耗机制。
# 在窗口1和窗口2,输入消费者组【testGroup】的消费命令
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testGroup
# 在窗口3,输入消费者组【testGroup2】的消费命令
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testGroup2
复制代码
一切预备停当后,在生产者端发送消息,发现【窗口1】没有消耗提示。由此,可证实上面的结论。
查看消耗者组的偏移量
可以使用kafka-consumer-groups.sh观测消耗者的情况,包括它们的消耗进度。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
复制代码
从这里可以看出,固然业务上是通过Topic分发消息的,但是实际上,消息是保存在Partition数据结构上的。
4、我的公众号&资料获取
敬请关注我的公众号:
大象只为你
,连续更新技能知识…
相关资料获取:
如需centos7空白虚拟机,请后台复兴:blankOS。
blankOS登录账号暗码:root / 123456
如需客户端毗连工具Xshell,请后台复兴:Xshell。
如需上传工具Xftp,请后台复兴:Xftp。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/)
Powered by Discuz! X3.4