为什么要用Kafka?单机服务搭建&三步消息交互

打印 上一主题 下一主题

主题 988|帖子 988|积分 2964

1、为什么要用Kafka?

1.1、Kafka简介

Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发并于2011年开源。它重要用于解决大规模数据的实时流式处理和数据管道标题。
它的特点包括:高吞吐量、长期化、分布式、实时性。常用于日记收集、消息系统、事件溯源、流处理等场景。
焦点概念:


  • Topic:数据类别或主题。
  • Partition:Topic 的分区,支持并行处理。
  • Producer:数据生产者,向 Kafka 发送消息。
  • Consumer:数据消耗者,从 Kafka 读取消息。
  • Broker:Kafka 服务器节点。
   官网所在:https://kafka.apache.org/
  1.2、为什么要用Kafka?

以一个典型的日记聚合应用场景为例

该业务场景要求产品的特点:
(1) 数据吞吐量很大: 需要 能够 快速收集各个 渠道的海量日记。
(2) 集群容错性高: 答应集群中少量节点崩溃。
(3) 功能不需要太复杂: Kafka的设计目标是高吞吐、低延迟和可扩展,重要关注消息通报而不是消息处理。以是,并没有支持死信队列、次序消息等高级功能。
(4) 答应少量数据丢失: Kafka本身也在不断优化数据安全标题,目前根本可以以为Kafka可以做到不会丢数据。
2、单机服务搭建

2.1、情况说明&版本选择

运行情况linux , centos 7.x ,内存2G 4核,JDK 17 (官方要求1.8或以上)
   我的虚拟机目录说明
  所有自定义安装目录:/opt/apps
  服务安装目录:/opt/apps/server
  在server目录下再分别创建两个目录:zookeeper 和 kafka
  Kafka官方下载所在:https://kafka.apache.org/downloads
选择当前最新的3.9.0版本: kafka_2.13-3.9.0.tgz
Zookeeper官方下载所在:https://zookeeper.apache.org/releases.html
Zookeeper的版本并没有逼迫要求,这里我们选择比较新的3.8.4稳定版本。注意下载的是bin版本
   kafka的安装程序中自带了Zookeeper,Zookeeper的客户端jar包在kafka的安装包的libs目录下。但通常情况下,为了更好维护应用,我们会单独摆设的Zookeeper,而不使用Kafka自带的Zookeeper。
  2.2、安装包上传并解压

把下载好的安装包上传到服务器,并解压。
   上传工具可使用Xftp,客户端毗连工具可使用Xshell,安装包在文末获取,官方个人免费许可,仅需邮箱注册即可。
  如果需要空白centos 7虚拟机,里面已经配置好jdk17和一些常用命令,在文末获取。
  1. # 解压zookeeper
  2. tar -xzvf apache-zookeeper-3.8.4-bin.tar.gz -C /opt/apps/server/zookeeper
  3. # 解压kafka
  4. tar -xzvf kafka_2.13-3.9.0.tgz -C /opt/apps/server/kafka/
复制代码
2.3、配置情况变量

将摆设目录下的bin目录路径配置到path情况变量中。
  1. # 打开系统级配置(对所有用户生效)
  2. vim /etc/profile
  3. # 配置的内容如下
  4. export ZOOKEEPER_HOME=/opt/apps/server/zookeeper/apache-zookeeper-3.8.4-bin
  5. export PATH=$ZOOKEEPER_HOME/bin:$PATH
  6. export KAFKA_HOME=/opt/apps/server/kafka/kafka_2.13-3.9.0
  7. export PATH=$KAFKA_HOME/bin:$PATH
  8. # 添加内容后,保存退出 【:wq!】
  9. # 生效环境变量
  10. source /etc/profile
  11. # 查看环境变量
  12. echo $PATH
复制代码
2.4、单机服务搭建

启动Kafka之前需要先启动Zookeeper。启动Zookeeper前需要先做一下简单配置。
进入Zookeeper摆设目录,切换到conf目录,复制zoo_sample.cfg,修改配置dataDir值,默认是暂时目录,随时会被删撤除。
  1. # 切换到Zookeeper的配置目录
  2. cd /opt/apps/server/zookeeper/apache-zookeeper-3.8.4-bin/conf/
  3. # 复制出一个配置文件
  4. cp zoo_sample.cfg zoo.cfg
  5. # 修改配置文件
  6. vim zoo.cfg
  7. # 修改内容如下
  8. dataDir=/opt/apps/server/zookeeper/data
复制代码
启动Zookeeper
  1. # 切换到Zookeeper根目录
  2. cd $ZOOKEEPER_HOME
  3. # 启动命令
  4. nohup bin/zkServer.sh start conf/zoo.cfg &
复制代码
确认服务是否启动成功,有三种方式
  1. # 方式一:jps指令,成功可看到一个QuorumPeerMain进程
  2. # 方式二:连接客户端测试,Zookeeper默认端口是2181
  3. bin/zkCli.sh -server localhost:2181
  4. # 方式三:检查端口是否监听
  5. netstat -tuln | grep 2181
复制代码
启动Kafka
  1. # 切换到Kafka根目录
  2. cd $KAFKA_HOME
  3. # 启动命令
  4. nohup bin/kafka-server-start.sh config/server.properties &
复制代码
确认服务是否启动成功,有两种方式
  1. # 方式一:jps指令,成功可看到一个Kafka进程
  2. # 方式二:检查端口是否监听,Kafka默认端口是9092
  3. netstat -tuln | grep 9092
复制代码
3、三步消息交互

Kafka的底子工作机制是消息发送者将消息发送到Kafka上指定的topic,消息消耗者从指定的topic上消耗消息。

底子消息交互三步骤:
第一步:使用Kafka提供的客户端脚本创建Topic。
第二步:启动一个消息发送者端,向指定名称的Topic发送消息。
第三步:启动一个消息消耗端,从指定名称的Topic上吸收消息。
此中,生产和消耗者并不需要同时启动。它们之间可以进行数据交互,但并不依赖于对方。没有生产者,消耗者依然可以正常工作。反过来,没有消耗者,生产者也可以正常工作。表现了生产者和消耗者之间的解耦。
3.1、实验消息交互

第一步:创建Topic名为test
  1. # 创建Topic
  2. bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
  3. # 创建成功会提示信息
  4. Created topic test
  5. # 查看topic
  6. bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
复制代码
创建成功时,列出数据示例

第二步:启动一个消息发送者端,向名为test的Topic发送消息。
  1. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
复制代码
当命令行出现【 > 】符号后,恣意输入一些字符,按回车键完成消息发送操作。Ctrl+C 退出命令行。
第三步:启动一个消息消耗端,从名为tes的Topic上吸收消息。
  1. # 不加任何参数,是指默认从最新消息(latest) 开始消费
  2. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
  3. test1
  4. test2
  5. 123
  6. ^CProcessed a total of 3 messages
复制代码
这样就完成了一个底子的消息交互。
3.2、其他消耗模式

消息消耗时,如果不指定参数,默认从最新消息开始消耗。消息消耗还有另外两种模式:指定消耗进度分组消耗
指定消耗进度
又可以分为重新开始或精确到从哪一条消息之后开始。
  1. # 如果想要消费之前发送的消息,可以加参数--from-beginning指定
  2. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  3. # 如果需要更精确的消费消息,甚至可以指定从哪一条消息开始消费。
  4. # 从第0个partition上的第4个消息开始读
  5. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 4
复制代码
分组消耗
对于每个消耗者,可以指定一个消耗者组。
kafka中的同一条消息,只能被同一个消耗者组下的某一个消耗者消耗。不属于同一个消耗者组的其他消耗者,可以消耗到这一条消息。
在kafka-console-consumer.sh脚本中,可以通过参数–consumer-property group.id=【消耗组名】指定所属的消耗者组。
实验时,可以启动两个消耗者组、三个消耗者实例【三个窗口】,验证 分组消耗机制。
  1. # 在窗口1和窗口2,输入消费者组【testGroup】的消费命令
  2. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testGroup
  3. # 在窗口3,输入消费者组【testGroup2】的消费命令
  4. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testGroup2
复制代码
一切预备停当后,在生产者端发送消息,发现【窗口1】没有消耗提示。由此,可证实上面的结论。

查看消耗者组的偏移量
可以使用kafka-consumer-groups.sh观测消耗者的情况,包括它们的消耗进度。
  1. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
复制代码

从这里可以看出,固然业务上是通过Topic分发消息的,但是实际上,消息是保存在Partition数据结构上的。
4、我的公众号&资料获取

敬请关注我的公众号:大象只为你,连续更新技能知识…
相关资料获取:
如需centos7空白虚拟机,请后台复兴:blankOS。
blankOS登录账号暗码:root / 123456
如需客户端毗连工具Xshell,请后台复兴:Xshell。
如需上传工具Xftp,请后台复兴:Xftp。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

欢乐狗

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表