Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程

打印 上一主题 下一主题

主题 897|帖子 897|积分 2691



本文档参看的视频是:


  • 尚硅谷Kafka教程,2024新版kafka视频,零基础入门到实战
  • 黑马程序员Kafka视频教程,大数据企业级消息队列kafka入门到精通
  • 小朋友也可以懂的Kafka入门教程,还不快来学
本文档参看的文档是:


  • 尚硅谷官方文档,并在基础上修改 完善!非常感谢尚硅谷团队!!!!

1. Kafka入门

1.1 概述

Kafka是一种消息队列,紧张用来处理大量数据状态下的消息队列,一般用来做日志的处理。既然是消息队列,那么Kafka也就拥有消息队列的相应的特性了。
作为消息传输的中间件





1.1.1 初识Kafka






   进程之间的交互题目,耦合性高!!!!
  那么解耦合!!!

也就是消息中间件!!!
Kafka是一个由Scala和Java语言开发的,经典高吞吐量的分布式消息发布和订阅系统,也是大数据技术范畴中用作数据互换的焦点组件之一。以高吞吐,低耽误,高伸缩,高可靠性,高并发,且社区活泼度高等特性,从而备受广大技术组织的喜好。
2010年,Linkedin公司为相识决消息传输过程中由各种缺陷导致的阻塞、服务无法访问等题目,主导开发了一款分布式消息日志传输系统。主导开发的首席架构师Jay Kreps因为喜欢写出《变形记》的西方体现主义文学先驱小说家Jay Kafka,所以给这个消息系统起了一个很酷,却和软件系统特性无关的名称Kafka。
因为备受技术组织的喜好,2011年,Kafka软件被捐献给Apache基金会,并于7月被纳入Apache软件基金会孵化器项目举行孵化。2012年10月,Kafka从孵化器项目中毕业,转成Apache的顶级项目。由独立的消息日志传输系统转型为开源分布式事件流处理平台系统,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。
官网地址:https://kafka.apache.org/


1.1.2 消息队列

   Kafka软件最初的设计就是专门用于数据传输的消息系统,类似功能的软件有RabbitMQ、ActiveMQ、RocketMQ等。这些软件名称中的MQ是英文单词Message Queue的简称,也就是所谓的消息队列的意思。这些软件的焦点功能是传输数据,而Java中假如想要实现数据传输功能,那么这个软件一般需要遵循Java消息服务技术规范JMS(Java Message Service)。前面提到的ActiveMQ软件就完全遵循了JMS技术规范,而RabbitMQ是遵循了类似JMS规范并兼容JMS规范的跨平台的AMQP(Advanced Message Queuing Protocol)规范。除了上面形貌的JMS,AMQP外,另有一种用于物联网小型装备之间传输消息的MQTT通讯协议。
  Kafka拥有作为一个消息系统应该具备的功能,但是却有着独特的设计。可以如许说,Kafka借鉴了JMS规范的思想,但是却并没有完全遵循JMS规范。这也恰恰是软件名称为Kafka,而不是KafkaMQ的原因。

这个消息中间件还是挺多的 JavaEE为了规范如许的接口——Java Message Service
由上可知,无论学习哪一种消息传输系统,JMS规范都是大家应该首先相识的。所以咱们这里就对JMS规范做一个简单的先容:


  • JMS是Java平台的消息中间件通用规范,定义了紧张用于消息中间件的尺度接口。假如不是很理解这个概念,可以简单地将JMS类比为Java和数据库之间的JDBC规范。Java应用程序根据JDBC规范种的接口访问关系型数据库,而每个关系型数据库厂商可以根据JDBC接口来实现具体的访问规则。JMS定义的就是系统和系统之间传输消息的接口。
  • 为了实现系统和系统之间的数据传输,JMS规范中定义许多用于通讯的组件:




  • JMS Provider:JMS消息提供者。实在就是实现JMS接口和规范的消息中间件,也就是我们提供消息服务的软件系统,比如RabbitMQ、ActiveMQ、Kafka。
  • JMS Message:JMS消息。这里的消息指的就是数据。一般接纳Java数据模型举行封装,此中包罗消息头,消息属性和消息主体内容。
  • JMS Producer:JMS消息生产者。所谓的生产者,就是生产数据的客户端应用程序,这些应用通过JMS接口发送JMS消息。
  • JMS Consumer:JMS消息消费者。所谓的消费者,就是从消息提供者(JMS Provider)中获取数据的客户端应用程序,这些应用通过JMS接口接收JMS消息。
JMS支持两种消息发送和接收模型:一种是P2P(Peer-to-Peer)点对点模型,另外一种是发布/订阅(Publish/Subscribe)模型。


  • P2P模型:P2P模型是基于队列的,消息生产者将数据发送到消息队列中,消息消费者从消息队列中接收消息。因为队列的存在,消息的异步传输成为可能。P2P模型的规定就是每一个消息数据,只有一个消费者,当发送者发送消息以后,不管接收者有没有运行都不影响消息发布到队列中。接收者在成功接收消息后会向发送者发送接收成功的消息


  • 发布 / 订阅模型:所谓得发布订阅模型就是事先将传输的数据举行分类,我们管这个数据的分类称之为主题(Topic)。也就是说,生产者发送消息时,会根据主题举行发送。比如咱们的消息中有一个分类是NBA,那么生产者在生产消息时,就可以将NBA篮球消息数据发送到NBA主题中,如许,对NBA消息主题感兴趣的消费者就可以申请订阅NBA主题,然后从该主题中获取消息。如许,也就是说一个消息,是允许被多个消费者同时消费的。这里生产者发送消息,我们称之为发布消息,而消费者从主题中获取消息,我们就称之为订阅消息。Kafka接纳就是这种模型。


1.1.3 生产者-消费者模式

生产者-消费者模式是通过一个容器来解决生产者和消费者的强耦合题目
   生产者和消费者相互之间不直接通讯,而通过阻塞队列来举行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相称于一个消息缓冲区,平衡了生产者和消费者的处理能力。在数据传输过程中,起到了一个削弱峰值的作用,也就是我们常常说到的削峰。

  图形中的缓冲区就是用来给生产者和消费者解耦的。在单点环境中,我们一般会接纳阻塞式队列实现这个缓冲区。而在分布式环境中,一般会接纳第三方软件实现缓冲区,这个第三方软件我们一般称之为中间件。纵观大多数应用场景,解耦合最常用的方式就是增长中间件。
遵循JMS规范的消息传输软件(RabbitMQ、ActiveMQ、Kafka、RocketMQ),我们一般就称之为消息中间件。使用软件的目的本质上也就是为了降低消息生产者和消费者之间的耦合性。提升消息的传输效率。

1.1.4 消息中间件对比

特性ActiveMQRabbitMQRocketMQKafka单机吞吐量万级,比RocketMQ,Kafka低一个数目级万级,比RocketMQ,Kafka低一个数目级10万级,支持高吞吐10万级,支持高吞吐Topic数目对吞吐量的影响Topic可以达到几百/几千量级Topic可以达到几百量级,假如更多的话,吞吐量会大幅度下降时效性ms级微秒级别,耽误最低ms级ms级可用性高,基于主从架构实现高可用高,基于主从架构实现高可用非常高,分布式架构非常高,分布式架构消息可靠性有较低的概率丢失数据根本不丢颠末参数优化配置,可以做到0丢失颠末参数优化配置,可以做到0丢失功能支持MQ范畴的功能极其完备并发能力强,性能极好,延时很低MQ功能较为完善,分布式,扩展性好功能较为简单,支持简单的MQ功能,在大数据范畴被广泛使用其他很早的软件,社区不是很活泼开源,稳定,社区活泼度高阿里开发,社区活泼度不高开源,高吞吐量,社区活泼度极高 通过上面各种消息中间件的对比,大概可以相识,在大数据场景中我们紧张接纳kafka作为消息中间件,而在JaveEE开发中我们紧张接纳ActiveMQ、RabbitMQ、RocketMQ作为消息中间件。假如将JavaEE和大数据在项目中举行融合的话,那么Kafka实在是一个不错的选择。

1.1.5 ZooKeeper

ZooKeeper是一个开放源码的分布式应用程序协调服务软件。在当前的Web软件开发中,多节点分布式的架构设计已经成为必然,那么怎样包管架构中差别的节点所运行的环境,系统配置是相同的,就是一个非常紧张的话题。
一般情况下,我们会接纳独立的第三方软件保存分布式系统中的全局环境信息以及系统配置信息,如许系统中的每一个节点在运行时就可以从第三方软件中获取同等的数据。也就是说通过这个第三方软件来协调分布式各个节点之间的环境以及配置信息。
Kafka软件是一个分布式事件流处理平台系统,底层接纳分布式的架构设计,就是说,也存在多个服务节点,多个节点之间Kafka就是接纳ZooKeeper来实现协调调度的。
ZooKeeper的焦点作用:


  • ZooKeeper的数据存储布局可以简单地理解为一个Tree布局,而Tree布局上的每一个节点可以用于存储数据,所以一般情况下,我们可以将分布式系统的元数据(环境信息以及系统配置信息)保存在ZooKeeper节点中
  • ZooKeeper创建数据节点时,会根据业务场景创建暂时节点或永世(持久)节点。永世节点就是无论客户端是否毗连上ZooKeeper都一直存在的节点,而暂时节点指的是客户端毗连时创建,断开毗连后删除的节点。同时,ZooKeeper也提供了Watch(监控)机制用于监控节点的变化,然后关照对应的客户端举行相应的变化。Kafka软件中就内置了ZooKeeper的客户端,用于举行ZooKeeper的毗连和通讯。
   实在,Kafka作为一个独立的分布式消息传输系统,还需要第三方软件举行节点间的协调调度,不能实现自我管理,无形中就导致Kafka和其他软件之间形成了耦合性,制约了Kafka软件的发展,所以从Kafka 2.8.X版本开始,Kafka就尝试增长了Raft算法实现节点间的协调管理,来代替ZooKeeper。不过Kafka官方不保举此方式应用在生产环境中,计划在Kafka 4.X版本中完全移除ZooKeeper,让我们拭目以待。
  

1.2 快速上手

1.2.1 环境安装

作为开源分布式事件流处理平台,Kafka分布式软件环境的安装相对比力复杂,不利于Kafka软件的入门学习和训练。所以我们这里先搭建相对比力简单的windows单机环境,让初学者快速掌握软件的根本原理和用法,后面的课程中,我们再深入学习Kafka软件在生产环境中的安装和使用。
1.2.1.1 安装Java8(略)

   当前Java软件开发中,主流的版本就是Java 8,而Kafka 3.X官方建议Java版本更新至Java11,但是Java8依然可用。将来Kafka 4.X版本会完全弃用Java8,不过,咱们当前学习的Kafka版本为3.6.1版本,所以使用Java8即可,无需升级。
Kafka的绝大数代码都是Scala语言编写的,而Scala语言本身就是基于Java语言开发的,而且由于Kafka内置了Scala语言包,所以Kafka是可以直接运行在JVM上的,无需安装其他软件。你能看到这个课件,信赖你肯定已经安装Java8了,根本的环境变量也应该配置好了,所以此处安装过程省略。
  1.2.1.2 安装Kafka



  • 下载软件安装包:kafka_2.12-3.6.1.tgz,下载地址:https://kafka.apache.org/downloads


  • 这里的3.6.1,是Kafka软件的版本。停止到2023年12月24日,Kafka最新版本为3.6.1。
  • 2.12是对应的Scala开发语言版本。Scala2.12和Java8是兼容的,所以可以直接使用。
  • tgz是一种linux系统中常见的压缩文件格式,类似与windows系统的zip和rar格式。所以Windows环境中可以直接使用压缩工具举行解压缩

  • 解压文件:kafka_2.12-3.6.1.tgz,解压目录为非系统盘的根目录,比如e:/
为了访问方便,可以将解压后的文件目录改为kafka, 更改后的文件目录布局如下:
binlinux系统下可执行脚本文件bin/windowswindows系统下可执行脚本文件config配置文件libs依赖类库licenses允许信息site-docs文档logs服务日志 1.2.1.3 启动ZooKeeper

当前版本Kafka软件内部依然依赖ZooKeeper举行多节点协调调度,所以启动Kafka软件之前,需要先启动ZooKeeper软件。不过因为Kafka软件本身内置了ZooKeeper软件,所以无需额外安装ZooKeeper软件,直接调用脚本下令启动即可。具体利用步骤如下:


  • 进入Kafka解压缩文件夹的config目录,修改zookeeper.properties配置文件
  1. # the directory where the snapshot is stored.
  2. # 修改dataDir配置,用于设置ZooKeeper数据存储位置,该路径如果不存在会自动创建。
  3. dataDir=E:/kafka_2.12-3.6.1/data/zk
复制代码



  • 打开DOS窗口,进入e:/kafka_2.12-3.6.1/bin/windows目录
  • 因为本章节演示的是Windows环境下Kafka软件的安装和使用,所以启动 ZooKeeper软件的指令为Windows环境下的bat批处理文件。调用启动指令时, 需要传递配置文件的路径
  1. # 因为当前目录为windows,所以需要通过相对路径找到zookeeper的配置文件。
  2. zookeeper-server-start.bat ../../config/zookeeper.properties
复制代码



  • 出现如下界面,ZooKeeper启动成功。

  • 为了利用方便,也可以在kafka解压缩后的目录中,创建脚本文件zk.cmd。
  1. # 调用启动命令,且同时指定配置文件。
  2. call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
复制代码

1.2.1.4 启动Kafka



  • 进入Kafka解压缩文件夹的config目录,修改server.properties配置文件
  1. # Listener name, hostname and port the broker will advertise to clients.
  2. # If not set, it uses the value for "listeners".
  3. # 客户端访问Kafka服务器时,默认连接的服务为本机的端口9092,如果想要改变,可以修改如下配置
  4. # 此处我们不做任何改变,默认即可
  5. #advertised.listeners=PLAINTEXT://your.host.name:9092
  6. # A comma separated list of directories under which to store log files
  7. # 配置Kafka数据的存放位置,如果文件目录不存在,会自动生成。
  8. log.dirs=E:/kafka_2.12-3.6.1/data/kafka
复制代码



  • 打开DOS窗口,进入e:/kafka_2.12-3.6.1/bin/windows目录



  • 调用启动指令,传递配置文件的路径
  1. # 因为当前目录为windows,所以需要通过相对路径找到kafka的配置文件。
  2. kafka-server-start.bat ../../config/server.properties
复制代码



  • 为了利用方便,也可以在kafka解压缩后的目录中,创建脚本文件kfk.cmd。
  1. # 调用启动命令,且同时指定配置文件。
  2. call bin/windows/kafka-server-start.bat config/server.properties
复制代码



  • DOS窗口中,输入jps指令,查看当前启动的软件进程

   这里名称为QuorumPeerMain的就是ZooKeeper软件进程,名称为Kafka的就是Kafka系统进程。此时,说明Kafka已经可以正常使用了。
  

1.2.2 消息主题


   说的就是Topic的事变!!因为 我运行的时候 没有Topic 对于Producer 和 Consumer 都需要用到Topic 所以。。。。
  在消息发布/订阅(Publish/Subscribe)模型中,为了可以让消费者对感兴趣的消息举行消费,而不是对全部的数据举行消费,包罗那些不感兴趣的消息,所以定义了主题(Topic)的概念,也就是说将差别的消息举行分类,分成差别的主题(Topic),然后消息生产者在生成消息时,就会向指定的主题(Topic)中发送。而消息消费者也可以订阅自己感兴趣的主题(Topic)并从中获取消息。
有许多种方式都可以利用Kafka消息中的主题(Topic):下令行、第三方工具、Java API、自动创建。而对于初学者来讲,掌握根本的下令行利用是必要的。所以接下来,我们接纳下令行举行利用。

1.2.2.1 创建主题



  • 启动ZooKeeper,Kafka服务进程(略)
  • 打开DOS窗口,进入e:/kafka_2.12-3.6.1/bin/windows目录
dir 一下



  • DOS窗口输入指令,创建主题
Kafka是通过 kafka-topics.bat 指令文件举行消息主题利用的。此中包罗了对主题的查询,创建,删除等功能。
调用指令创建主题时,需要传递多个参数,而且参数的前缀为两个横线。因为参数比力多
为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会举行解说


  • --bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么举行利用前,就需要毗连服务器,这里的参数就体现服务器的毗连方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开
  • --create : 体现对主题的创建利用,是个利用参数,后面无需增长参数值
  • --topic : 主题的名称,后面接的参数值一般就是见名知意的字符串名称,类似于java中的字符串范例标识符名称,当然也可以使用数字,只不过末了还是当成数字字符串使用。
  1. # 指令
  2. kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test
复制代码



1.2.2.2 查询主题



  • DOS窗口输入指令,查看全部主题
Kafka是通过kafka-topics.bat文件举行消息主题利用的。此中包罗了对主题的查询,创建,删除等功能。
–bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么举行利用前,就需要毗连服务器,这里的参数就体现服务器的毗连方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开
–list : 体现对全部主题的查询利用,是个利用参数,后面无需增长参数值
  1. # 指令
  2. kafka-topics.bat --bootstrap-server localhost:9092 --list
复制代码



  • DOS窗口输入指令,查看指定主题信息
–bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么举行利用前,就需要毗连服务器,这里的参数就体现服务器的毗连方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开
–describe : 查看主题的详细信息
–topic : 查询的主题名称
  1. # 指令
  2. kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test
复制代码


1.2.2.3 修改主题

创建主题后,可能需要对某些参数举行修改,那么就需要使用指令举行利用。


  • DOS窗口输入指令,修改指定主题的参数
Kafka是通过kafka-topics.bat文件举行消息主题利用的。此中包罗了对主题的查询,创建,删除等功能。
–bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么举行利用前,就需要毗连服务器,这里的参数就体现服务器的毗连方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开
–alter : 体现对全部主题的查询利用,是个利用参数,后面无需增长参数值
–topic : 修改的主题名称
–partitions : 修改的配置参数:分区数目
  1. # 指令
  2. kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2
复制代码


1.2.2.4 删除主题

假如主题创建后不需要了,或创建的主题有题目,那么我们可以通过相应的指令删除主题。


  • DOS窗口输入指令,删除指定名称的主题
Kafka是通过kafka-topics.bat文件举行消息主题利用的。此中包罗了对主题的查询,创建,删除等功能。
–bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么举行利用前,就需要毗连服务器,这里的参数就体现服务器的毗连方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开
–delete: 体现对主题的删除利用,是个利用参数,后面无需增长参数值。默认情况下,删除利用是逻辑删除,也就是说数据存储的文件依然存在,但是通过指令查询不出来。假如想要直接删除,需要在server.properties文件中设置参数delete.topic.enable=true
–topic : 删除的主题名称
  1. # 指令
  2. kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete
复制代码

注意:windows系统中由于权限或进程锁定的题目,删除topic会导致kafka服务节点非常关闭。


请在后续的linux系统下演示此利用。

1.2.3 生产数据

消息主题创建好了,就可以通过Kafka客户端向Kafka服务器的主题中发送消息了。Kafka生产者客户端并不是一个独立的软件系统,而是一套API接口,只要通过接口能毗连Kafka并发送数据的组件我们都可以称之为Kafka生产者。下面我们就演示几种差别的方式:
1.2.3.1 下令行利用



  • 打开DOS窗口,进入e:/kafka_2.12-3.6.1/bin/windows目录




  • DOS窗口输入指令,进入生产者控制台
Kafka是通过kafka-console-producer.bat文件举行消息生产者利用的。
调用指令时,需要传递多个参数,而且参数的前缀为两个横线,因为参数比力多。为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会举行解说
–bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么举行利用前,就需要毗连服务器,这里的参数就体现服务器的毗连方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数举行毗连,当前版本已经不保举使用了。
–topic : 主题的名称,后面接的参数值就是之前已经创建好的主题名称。
  1. # 指令
  2. kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
复制代码

  1. # 指令
  2. kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
复制代码



  • 控制台生产数据


   注意:这里的数据需要回车后,才能真正将数据发送到Kafka服务器。
  

1.2.3.2 工具利用

   有的时候,使用下令行举行利用还是有一些麻烦,而且利用起来也不是很直观,所以我们一般会接纳一些小工具举行快速访问。这里我们先容一个kafkatool_64bit.exe工具软件。软件的安装过程比力简单,根据提示默认安装即可,这里就不举行先容了。

  

  • 安装好以后,我们打开工具

  • 点击左上角按钮File -> Add New Connection…建立毗连

  • 点击Test按钮测试

  • 增长毗连


  • 按照下面的步骤,生产数据

  • 增长成功后,点击绿色箭头按钮举行查询,工具会显示当前数据


1.2.3.3 Java API

一般情况下,我们也可以通过Java程序来生产数据,所以接下来,我们就演示一下IDEA中使用Kafka Java API来生产数据:


  • 创建Kafka项目


  • 修改pom.xml文件,增长Maven依赖

  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.apache.kafka</groupId>
  4.         <artifactId>kafka-clients</artifactId>
  5.         <version>3.6.1</version>
  6.     </dependency>
  7. </dependencies>
复制代码


  • 创建 com.atguigu.kafka.test.KafkaProducerTest类

  • 添加main方法,并增长生产者代码


  1. package com.atguigu.kafka.test;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. public class KafkaProducerTest {
  8.     public static void main(String[] args) {
  9.         // TODO 配置属性集合
  10.         Map<String, Object> configMap = new HashMap<>();
  11.         // TODO 配置属性:Kafka服务器集群地址
  12.         configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  13.         // TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
  14.         configMap.put(
  15.                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  16.                 "org.apache.kafka.common.serialization.StringSerializer");
  17.         configMap.put(
  18.                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  19.                 "org.apache.kafka.common.serialization.StringSerializer");
  20.         // TODO 创建Kafka生产者对象,建立Kafka连接
  21.         //      构造对象时,需要传递配置参数
  22.         KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
  23.         // TODO 准备数据,定义泛型
  24.         //      构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
  25.         ProducerRecord<String, String> record = new ProducerRecord<String, String>(
  26.                 "test", "key1", "value1"
  27.         );
  28.         // TODO 生产(发送)数据
  29.         producer.send(record);
  30.         // TODO 关闭生产者连接
  31.         producer.close();
  32.     }
  33. }
复制代码

1.2.4 消费数据

消息已经通过Kafka生产者客户端发送到Kafka服务器中了。那么此时,这个消息就会暂存在Kafka中,我们也就可以通过Kafka消费者客户端对服务器指定主题的消息举行消费了。
1.2.4.1 下令行利用



  • 打开DOS窗口,进入e:/kafka_2.12-3.6.1/bin/windows目录
  • DOS窗口输入指令,进入消费者控制台
Kafka是通过kafka-console-consumer.bat文件举行消息消费者利用的。
调用指令时,需要传递多个参数,而且参数的前缀为两个横线,因为参数比力多。为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会举行解说
-bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么举行利用前,就需要毗连服务器,这里的参数就体现服务器的毗连方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数举行毗连,当前版本已经不保举使用了。
–topic : 主题的名称,后面接的参数值就是之前已经创建好的主题名称。实在这个参数并不是必须传递的参数,因为假如不传递这个参数的话,那么消费者会消费全部主题的消息。假如传递这个参数,那么消费者只能消费到指定主题的消息数据。
–from-beginning : 从第一条数据开始消费,无参数值,是一个标记参数。默认情况下,消费者客户端毗连上服务器后,是不会消费到毗连之前所生产的数据的。也就意味着假如生产者客户端在消费者客户端毗连前已经生产了数据,那么这部门数据消费者是无法正常消费到的。所以在实际环境中,应该是先启动消费者客户端,再启动生产者客户端,包管消费数据的完整性。增长参数后,Kafka就会从第一条数据开始消费,包管消息数据的完整性。
  1. # 指令
  2. kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
  3. --from-beginning
复制代码


1.2.4.2 Java API

一般情况下,我们可以通过Java程序来消费(获取)数据,所以接下来,我们就演示一下IDEA中Kafka Java API怎样消费数据:


  • 创建Maven项目并增长Kafka依赖
  • 创建com.atguigu.kafka.test.KafkaConsumerTest类

  • 添加main方法,并增长消费者代码


  1. package com.atguigu.kafka.test;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import java.time.Duration;
  7. import java.util.Arrays;
  8. import java.util.Collections;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. public class KafkaConsumerTest {
  12.     public static void main(String[] args) {
  13.         // TODO 配置属性集合
  14.         Map<String, Object> configMap = new HashMap<String, Object>();
  15.         // TODO 配置属性:Kafka集群地址
  16.         configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  17.         // TODO 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化
  18.         configMap.put(
  19.                 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  20.                 "org.apache.kafka.common.serialization.StringDeserializer");
  21.         configMap.put(
  22.                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  23.                 "org.apache.kafka.common.serialization.StringDeserializer");
  24.         // TODO 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)
  25.         configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
  26.         // TODO 配置属性: 消费者组
  27.         configMap.put("group.id", "atguigu");
  28.         // TODO 配置属性: 自动提交偏移量
  29.         configMap.put("enable.auto.commit", "true");
  30.         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configMap);
  31.         // TODO 消费者订阅指定主题的数据
  32.         consumer.subscribe(Collections.singletonList("test"));
  33.         while ( true ) {
  34.             // TODO 每隔100毫秒,抓取一次数据
  35.             ConsumerRecords<String, String> records =
  36.                 consumer.poll(Duration.ofMillis(100));
  37.             // TODO 打印抓取的数据
  38.             for (ConsumerRecord<String, String> record : records) {
  39.                 System.out.println("K = " + record.key() + ", V = " + record.value());
  40.             }
  41.         }
  42.     }
  43. }
复制代码

我使用教学的方式有点题目,我使用了Properties
  1. // TODO 创建配置对象
  2.         // TODO 对生产的数据 K,V 进行序列化的操作
  3.         Properties properties = new Properties();
  4.         properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
  5.         properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  6.         properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
复制代码
  1. // TODO 创建配置对象
  2.         // TODO 对数据 K,V 进行序列化的操作
  3.         Properties properties = new Properties();
  4.         properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
  5.         properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  6.         properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
  7.         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"atguigu");
复制代码

1.2.5 源码关联(可选)

将源码压缩包kafka-3.6.1-src.tgz解压缩到指定位置

Kafka3.6.1的源码需要使用JDK17和Scala2.13举行编译才能查看,所以需要举行安装
1.2.5.1 安装Java17

(1) 再资料文件夹中双击安装包jdk-17_windows-x64_bin.exe

(2) 根据安装提示安装即可。
1.2.5.2 安装Scala

(1) 进入Scala官方网站https://www.scala-lang.org/下载Scala压缩包scala-2.13.12.zip。
(2) 在IDEA中安装Scala插件

(3) 项目配置中关联Scala就可以了


1.2.5.3 安装Gradle

(1) 进入Gradle官方网站https://gradle.org/releases/下载Gradle安装包,根据自己需要选择差别版本举行下载。下载后将Gradle文件解压到相应目录
(2) 新增系统环境GRADLE_HOME,指定gradle安装路径,并将%GRADLE_HOME%\bin添加到path中
(3) Gradle安装及环境变量配置完成之后,打开Windows的cmd下令窗口,输入gradle –version

(4) 在解压缩目录中打开下令行,依次执行gradle idea下令

(5) 在下令行中执行gradle build --exclude-task test下令

(6) 使用IDE工具IDEA打开该项目目录


1.2.6 总结

   本章作为Kafka软件的入门章节,先容了一些消息传输系统中的根本概念以及单机版Windows系统中Kafka软件的根本利用。假如仅从利用上,感觉Kafka和数据库的功能还是有点像的。比如:
  

  • 数据库可以创建表保存数据,kafka可以创建主题保存消息。
  • Java客户端程序可以通过JDBC访问数据库:保存数据、修改数据、查询数据,kafka可以通过生产者客户端生产数据,通过消费者客户端消费数据。
    从这几点来看,确实有相像的地方,但实在两者的本质并不一样:
  • 数据库的本质是为了更好的组织和管理数据,所以关注点是怎样设计更好的数据模型用于保存数据,包管焦点的业务数据不丢失,如许才能准确地对数据举行利用。
  • Kafka的本质是为了高效地传输数据。所以软件的侧重点是怎样优化传输的过程,让数据更快,更安全地举行系统间的传输。
   通过以上的先容,你会发现,两者的区别还是很大的,不能等量齐观。接下来的章节我们会给大家详细解说Kafka在分布式环境中是怎样高效地传输数据的。
  


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户国营

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表