摘要
Kafka是当下非常流行的消息中心件,据官网透露,已有成千上万的公司在使用它。最近实践了一波Kafka,确实很好很强大。今天我们来从三个方面学习下Kafka:Kafaka在Linux下的安装,Kafka的可视化工具,Kafka和SpringBoot结合使用。希望大家看完后能快速入门Kafka,掌握这个流行的消息中心件!
Kafka简介
Kafka是由LinkedIn公司开辟的一款开源分布式消息流平台,由Scala和Java编写。重要作用是为处理及时数据提供一个统一、高吞吐、低延迟的平台,其本质是基于发布订阅模式的消息引擎系统。
Kafka具有以下特性:
- 高吞吐、低延迟:Kafka收发消息非常快,使用集群处理消息延迟可低至2ms。
- 高扩展性:Kafka可以弹性地扩展和收缩,可以扩展到上千个broker,数十万个partition,每天处理数万亿条消息。
- 永久存储:Kafka可以将数据安全地存储在分布式的,长期的,容错的群集中。
- 高可用性:Kafka在可用区上可以有效地扩展群集,某个节点宕机,集群照样能够正常工作。
Kafka安装
我们将接纳Linux下的安装方式,安装环境为CentOS 7.6。此处没有接纳Docker来安装摆设,个人感觉直接安装更简单(重要是官方没提供Docker镜像)!
- 首先我们必要下载Kafka的安装包,下载地点:mirrors.bfsu.edu.cn/apache/kafk…
- bash复制代码cd /mydata/kafka/
- tar -xzf kafka_2.13-2.8.0.tgz
复制代码
- bash
- 复制代码cd kafka_2.13-2.8.0
复制代码
- 固然有消息称Kafka即将移除Zookeeper,但是在Kafka最新版本中尚未移除,以是启动Kafka前照旧必要先启动Zookeeper;
- 启动Zookeeper服务,服务将运行在2181端口;
- bash复制代码# 后台运行服务,并把日志输出到当前文件夹下的zookeeper-out.file文件中
- nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-out.file 2>&1 &
复制代码
- 由于目前Kafka是摆设在Linux服务器上的,外网如果想要访问,必要修改Kafka的设置文件config/server.properties,修改下Kafka的监听地点,否则会无法连接;
- properties复制代码############################# Socket Server Settings #############################
- # The address the socket server listens on. It will get the value returned from
- # java.net.InetAddress.getCanonicalHostName() if not configured.
- # FORMAT:
- # listeners = listener_name://host_name:port
- # EXAMPLE:
- # listeners = PLAINTEXT://your.host.name:9092
- listeners=PLAINTEXT://192.168.5.78:9092
复制代码
- 末了启动Kafka服务,服务将运行在9092端口。
- bash复制代码# 后台运行服务,并把日志输出到当前文件夹下的kafka-out.file文件中
- nohup bin/kafka-server-start.sh config/server.properties > kafka-out.file 2>&1 &
复制代码 Kafka命令行操纵
接下来我们使用命令行来操纵下Kafka,认识下Kafka的使用。
- 首先创建一个叫consoleTopic的Topic;
- bash
- 复制代码bin/kafka-topics.sh --create --topic consoleTopic --bootstrap-server 192.168.5.78:9092
复制代码
- bash
- 复制代码bin/kafka-topics.sh --describe --topic consoleTopic --bootstrap-server 192.168.5.78:9092
复制代码
- bash复制代码Topic: consoleTopic TopicId: tJmxUQ8QRJGlhCSf2ojuGw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
- Topic: consoleTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
复制代码
- bash
- 复制代码bin/kafka-console-producer.sh --topic consoleTopic --bootstrap-server 192.168.5.78:9092
复制代码
- 重新打开一个窗口,通过如下命令可以从Topic中获取消息:
- bash
- 复制代码bin/kafka-console-consumer.sh --topic consoleTopic --from-beginning --bootstrap-server 192.168.5.78:9092
复制代码
Kafka可视化
使用命令行操纵Kafka确实有点麻烦,接下来我们试试可视化工具kafka-eagle。
安装JDK
如果你使用的是CentOS的话,默认没有安装完备版的JDK,必要自行安装!
- 下载JDK 8,下载地点:mirrors.tuna.tsinghua.edu.cn/AdoptOpenJD…
- bash复制代码cd /mydata/java
- tar -zxvf OpenJDK8U-jdk_x64_linux_xxx.tar.gz
- mv OpenJDK8U-jdk_x64_linux_xxx.tar.gz jdk1.8
复制代码
- 在/etc/profile文件中添加环境变量JAVA_HOME。
- bash复制代码vi /etc/profile
- # 在profile文件中添加
- export JAVA_HOME=/mydata/java/jdk1.8
- export PATH=$PATH:$JAVA_HOME/bin
- # 使修改后的profile文件生效
- . /etc/profile
复制代码 安装kafka-eagle
- 下载kafka-eagle的安装包,下载地点:github.com/smartloli/k…
- 下载完成后将kafka-eagle解压到指定目次;
- bash复制代码cd /mydata/kafka/
- tar -zxvf kafka-eagle-web-2.0.5-bin.tar.gz
复制代码
- 在/etc/profile文件中添加环境变量KE_HOME;
- bash复制代码vi /etc/profile
- # 在profile文件中添加
- export KE_HOME=/mydata/kafka/kafka-eagle-web-2.0.5
- export PATH=$PATH:$KE_HOME/bin
- # 使修改后的profile文件生效
- . /etc/profile
复制代码
- 安装MySQL并添加数据库ke,kafka-eagle之后会用到它;
- 修改设置文件$KE_HOME/conf/system-config.properties,重要是修改Zookeeper的设置和数据库设置,注释掉sqlite设置,改为使用MySQL;
- properties复制代码######################################
- # multi zookeeper & kafka cluster list
- ######################################
- kafka.eagle.zk.cluster.alias=cluster1
- cluster1.zk.list=localhost:2181
- ######################################
- # kafka eagle webui port
- ######################################
- kafka.eagle.webui.port=8048
- ######################################
- # kafka sqlite jdbc driver address
- ######################################
- # kafka.eagle.driver=org.sqlite.JDBC
- # kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
- # kafka.eagle.username=root
- # kafka.eagle.password=www.kafka-eagle.org
- ######################################
- # kafka mysql jdbc driver address
- ######################################
- kafka.eagle.driver=com.mysql.cj.jdbc.Driver
- kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
- kafka.eagle.username=root
- kafka.eagle.password=root
复制代码
- bash
- 复制代码$KE_HOME/bin/ke.sh start
复制代码
- 命令实行完成后会显示如下信息,但并不代表服务已经启动乐成,还必要等待一会;
- bash复制代码# 停止服务
- $KE_HOME/bin/ke.sh stop
- # 重启服务
- $KE_HOME/bin/ke.sh restart
- # 查看服务运行状态
- $KE_HOME/bin/ke.sh status
- # 查看服务状态
- $KE_HOME/bin/ke.sh stats
- # 动态查看服务输出日志
- tail -f $KE_HOME/logs/ke_console.out
复制代码
- 启动乐成可以直接访问,输入账号密码admin:123456,访问地点:http://192.168.5.78:8048/
- 登录乐成后可以访问到Dashboard,界面照旧很棒的!
可视化工具使用
- 之前我们使用命令行创建了Topic,这里可以直接通过界面来创建;
- 我们还可以直接通过kafka-eagle来发送消息;
- bash
- 复制代码bin/kafka-console-consumer.sh --topic testTopic --from-beginning --bootstrap-server 192.168.5.78:9092
复制代码
- 还有一个很有意思的功能叫KSQL,可以通过SQL语句来查询Topic中的消息;
- 可视化工具自然少不了监控,如果你想开启kafka-eagle对Kafka的监控功能的话,必要修改Kafka的启动脚本,袒露JMX的端口;
- bash复制代码vi kafka-server-start.sh
- # 暴露JMX端口
- if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
- export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
- export JMX_PORT="9999"
- fi
复制代码
- 还有Zookeeper的命令行功能,总之功能很全,很强大!
SpringBoot整合Kafka
在SpringBoot中操纵Kafka也是非常简单的,比如Kafka的消息模式很简单,没有队列,只有Topic。
- 首先在应用的pom.xml中添加Spring Kafka依靠;
- xml复制代码<!--Spring整合Kafka-->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>2.7.1</version>
- </dependency>
复制代码
- 修改应用设置文件application.yml,设置Kafka服务地点及consumer的group-id;
- yaml复制代码server:
- port: 8088
- spring:
- kafka:
- bootstrap-servers: '192.168.5.78:9092'
- consumer:
- group-id: "bootGroup"
复制代码
- 创建一个生产者,用于向Kafka的Topic中发送消息;
- java复制代码/**
- * Kafka消息生产者
- * Created by macro on 2021/5/19.
- */
- @Component
- public class KafkaProducer {
- @Autowired
- private KafkaTemplate kafkaTemplate;
- public void send(String message){
- kafkaTemplate.send("bootTopic",message);
- }
- }
复制代码
- 创建一个消耗者,用于从Kafka中获取消息并消耗;
- java复制代码/**
- * Kafka消息消费者
- * Created by macro on 2021/5/19.
- */
- @Slf4j
- @Component
- public class KafkaConsumer {
- @KafkaListener(topics = "bootTopic")
- public void processMessage(String content) {
- log.info("consumer processMessage : {}",content);
- }
- }
复制代码
- java复制代码/**
- * Kafka功能测试
- * Created by macro on 2021/5/19.
- */
- @Api(tags = "KafkaController", description = "Kafka功能测试")
- @Controller
- @RequestMapping("/kafka")
- public class KafkaController {
- @Autowired
- private KafkaProducer kafkaProducer;
- @ApiOperation("发送消息")
- @RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
- @ResponseBody
- public CommonResult sendMessage(@RequestParam String message) {
- kafkaProducer.send(message);
- return CommonResult.success(null);
- }
- }
复制代码
- 项目控制台会输出如下信息,表明消息已经被接收并消耗掉了。
- bash
- 复制代码2021-05-19 16:59:21.016 INFO 2344 --- [ntainer#0-0-C-1] c.m.mall.tiny.component.KafkaConsumer : consumer processMessage : Spring Boot message!
复制代码 总结
通过本文的一波实践,大家根本就能入门Kafka了。安装、可视化工具、结合SpringBoot,这些根本都是和开辟者相关的操纵,也是学习Kafka的必经之路。
参考资料
- Kafka官方文档:kafka.apache.org/quickstart
- kafka-eagle官方文档:www.kafka-eagle.org/articles/do…
- Kafka相关概念:juejin.cn/post/684490…
项目源码地点
github.com/macrozheng/…
本文 GitHub github.com/macrozheng/… 已经收录,欢迎大家Star!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |