学习本文技能需要已经有如下的基础要求:
- 熟悉Javase基础
- 熟悉Linux常用命令
- 熟悉ldea开辟工具
一、Kafka 概述
1.1 界说
Kafka是由Apache软件基金会开辟的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息体系,它可以处理消费者在网站中的全部动作流数据。 这种动作(网页欣赏,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日记和日记聚合来解决。 对于像Hadoop一样的日记数据和离线分析体系,但又要求实时处理的限定,这是一个可行的解决方案。Kafka的目标是通过Hadoop的并行加载机制来同一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
1.2 消息队列
- 现在企业中比较常见的消息队列产品重要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。
- 在大数据场景重要采用Kafka作为消息队列。在JavaEE开辟中重要采用ActiveMQ、RabbitMQ、RocketMQ 作为消息队列。
1.2.1 消息队列内部实现原理
消息队列(非kafka)内部实现原理
1.2.2 传统消息队列的应用场景
传统的消息队列的重要应用场景包括:缓存/消峰(消去峰值)、解耦和异步通信。
1.2.3 消息队列的两种模式
1.3 Kafka 基础架构
- (1)Producer:消息生产者,就是向Kafka broker发消息的客户端。
- (2)Consumer:消息消费者,向Kafka broker取消息的客户端。
- (3)Consumer Group(CG):消费者组,由多个consumer构成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。全部的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
- (4)Broker:一台Kafka服务器就是一个broker。一个集群由多个broker构成。一个broker可以容纳多个topic。
- (5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
- (6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
- (7)Replica:副本。一个topic的每个分区都有多少个副本,一个Leader和多少个Follower。
- (8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
- (9)Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。
抽象理解 (生产者消费者):
- 生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(体系宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。
- 再比如生产者很强劲(大买卖业务量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致体系超时),消费者拒绝再吃了,”鸡蛋“又丢失了。
- 这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,如许鸡蛋就不会丢失了,都在篮子里。
上面的例子内里:
1.篮子就是kafka,鸡蛋实在就是数据流,体系之间的交互都是通过数据流来传输的(就是tcp、https什么的),也称为报文,大概消息。
2. 消息队列满了,实在就是篮子满了,鸡蛋放不下了,那赶紧多放几个篮子,实在就是kafka的扩容。
- producer:生产者,就是它来生产“鸡蛋”的。
- consumer:消费者,生出的“鸡蛋”它来消费。
- broker:就是篮子了,鸡蛋生产出来后放在篮子里。
- topic:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,有的只吃草鸡蛋,有的吃洋鸡蛋,篮子中分为一个个小盒子,草鸡蛋放一个盒子里,洋鸡蛋放另一个盒子里。如许不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。
各人肯定要学会抽象的去思索,上面只是属于业务的角度,假如从技能角度,topic标签实际就是队列,生产者把全部“鸡蛋(消息)”都放到对应的队列里了,消费者到指定的队列里取。
二、 Kafka 快速入门
2.1 安装前的预备
- 先搭建一台虚拟机,再克隆三台出来
- 步骤在这里(三台上面仅需安装jdk,hadoop(可不装),zookeeper(必须装))
2.2 安装部署
安装前的预备:
资源获取:
通过百度网盘分享的文件获取:
kafka软件大全 提取码:yyds
- 1.VMware的安装:VMware-workstation-full-15.5.0
- 2.镜像的安装:CentOS-7.5-x86_64-DVD-1804.iso
- 3.JDK的安装: jdk-8u212-linux-x64.tar.gz
- 4.Hadoop的安装: hadoop-3.1.3.tar.gz
2.2.1 集群规划
linux-102linux-103linux-104zkzkzkkafkakafkakafka 2.2.2 单节点或集群部署
- 1)本地资源下载获取(上面网盘获取)
- 2)上传压缩包到服务器 /usr/local下面并且进行解压
- tar -zxvf kafka_2.12-3.0.0.tgz
复制代码
3)修改解压后的文件名称到当前目录下
- mv kafka_2.12-3.0.0 kafka
复制代码
4)进入到/usr/local/kafka/config目录,修改设置文件 server.properties 如下③ 处地方,并生存。
①
②
③
创建文件夹 datas
5)分发安装包(针对上述集群设置,注:分发后设置内里的这个需要依次改动 broker.id=0,broker.id不得重复,整个集群中唯一。)
- scp -r /usr/local/kafka root@linux-103:/usr/local/
复制代码
6)设置环境变量
(1)在/etc/profile 文件中增长kafka环境变量设置
增长如下内容:
- #kafka
- export KAFKA_HOME=/usr/local/kafka
- export PATH=$PATH:$KAFKA_HOME/bin
复制代码
(2)革新一下环境变量。
(3)分发环境(只针对集群)变量文件到其他节点,并source。
7)启动集群
(1)先启动Zookeeper集群,然后启动Kafka。
- cd /usr/local/zookeeper/bin/
- ./zkServer.sh start
- ./zkServer.sh status
- cd /usr/local/kafka/bin/
- ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
- jps
复制代码
(2)依次在linux-102、linux-103、linux-104节点上启动Kafka。(针对集群)(犹如上述启动)
留意:设置文件的路径要能够到server.properties。
8)关闭(集群)
2.2.3 集群启停脚本
1)在/home/bin目录下创建文件kf.sh脚本文件(没有bin文件夹则创建文件夹)
脚本如下:
- #! /bin/bash
- case $1 in
- "start"){
- #集群
- #for i in linux-102 linux-103 linux-104
- #单节点
- for i in linux-102
- do
- echo " --------启动 $i Kafka-------"
- ssh $i "/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties"
- done
- };;
- "stop"){
- #集群
- #for i in linux-102 linux-103 linux-104
- #单节点
- for i in linux-102
- do
- echo " --------停止 $i Kafka-------"
- ssh $i "/usr/local/kafka/bin/kafka-server-stop.sh "
- done
- };;
- esac
复制代码
2)添加执行权限
3)启动集群命令
4)停止集群命令
留意:停止Kafka集群时,肯定要等Kafka全部节点历程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相干信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止历程的信息,只能手动杀死Kafka历程了。
2.3 Kafka命令行操作
2.3.1 主题命令行操作
1)查看操作主题命令参数
参数形貌- -bootstrap-server <String: server toconnect to>连接的Kafka Broker主机名称和端口号。- -topic <String: topic>操作的topic名称。- - create创建主题。- - delete删除主题。- - alter修改主题。- - list查看全部主题。- - describe查看主题详细形貌。- - partitions <Integer: # of partitions>设置分区数。- - replication-factor<Integer: replication factor>设置分区副本。- - config <String: name=value>更新体系默认的设置。 2)查看当前服务器中的全部 topic(其中 9092 是kafka的默认端口)
- ./kafka-topics.sh
- --bootstrap-server linux-102:9092 --list#一样平常是生产环境./kafka-topics.sh
- --bootstrap-server linux-102:9092,linux-103:9092 --list
复制代码
3)创建 first topic
- ./kafka-topics.sh
- --bootstrap-server linux-102:9092 --topic first --create --partitions 1 --replication-factor 1./kafka-topics.sh
- --bootstrap-server linux-102:9092 --list
复制代码
选项说明:
- --topic 定义topic名
- --partitions 定义分区数
- --replication-factor 定义副本数
复制代码 4)查看first主题的详情
- ./kafka-topics.sh
- --bootstrap-server linux-102:9092 --describe --topic first
复制代码
5)修改分区数(留意:分区数只能增长,不能淘汰)
6)再次查看first主题的详情
- ./kafka-topics.sh
- --bootstrap-server linux-102:9092 --alter --topic first --partitions 2./kafka-topics.sh
- --bootstrap-server linux-102:9092 --describe --topic first
复制代码
7)删除 topic,删除后再次查看还有没有 topic
- ./kafka-topics.sh
- --bootstrap-server linux-102:9092 --delete --topic first./kafka-topics.sh
- --bootstrap-server linux-102:9092 --list
复制代码
2.3.2 生产者命令行操作
操作:开两个窗口,一个作为生产者,一个作为消费者,当在生产者窗口命令输入的时候,消费者窗口就会主动输出(附截图最后)。
1)查看操作生产者命令参数
- kafka-console-producer.sh
复制代码
参数形貌- -bootstrap-server <String: server toconnect to>连接的Kafka Broker主机名称和端口号。- -topic <String: topic>操作的topic名称。 2)发送消息
2.3.3 消费者命令行操作
1)查看操作消费者命令参数
- ./kafka-console-consumer.sh
复制代码
参数形貌- -bootstrap-server <String: server toconnect to>连接的Kafka Broker主机名称和端口号。- -topic <String: topic>操作的topic名称。- -from-beginning从头开始消费。- -group <String: consumer group id>指定消费者组名称。 2)消费消息
(1)消费first主题中的数据。
- ./kafka-console-consumer.sh
- --bootstrap-server linux-102:9092 --topic first
复制代码
(2)把主题中全部的数据都读取出来(包括汗青数据)。
- ./kafka-console-consumer.sh
- --bootstrap-server linux-102:9092 --from-beginning --topic first
复制代码
2.3.4 生产者生产消费者消费
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |