从零开始学Spring Boot系列-集成Kafka

打印 上一主题 下一主题

主题 908|帖子 908|积分 2724

Kafka简介

Apache Kafka是一个开源的分布式流处理平台,由LinkedIn公司开发和维护,后来捐赠给了Apache软件基金会。Kafka主要用于构建实时数据管道和流应用。它类似于一个分布式、高吞吐量的发布-订阅消息系统,可以处理消费者网站的全部动作流数据。这种动作流数据包括页面欣赏、搜索和其他用户的行动。通过这些数据,Kafka能够实时地将数据流传输到系统和应用上。
Kafka的主要特性包括:

  • 高吞吐量:Kafka以高吞吐量处理数据,纵然黑白常大量的数据也能轻松应对。
  • 分布式:Kafka是分布式的,可以在多个节点上运行,从而实现高可用性和容错性。
  • 持久性:Kafka将数据持久化到磁盘,因此纵然系统崩溃,数据也不会丢失。
  • 实时性:Kafka可以实时处理数据,为实时分析、监控和报警等应用提供了强盛的支持。
Ubuntu安装Kafka

本文是在wsl2上的Ubuntu 22.04上安装Kafka。你必要先安装Java环境,因为Kafka是用Java编写的。然后,你可以从Apache Kafka的官方网站下载并安装Kafka。以下是安装步骤:

  • 安装Java环境:你可以使用apt-get命令安装OpenJDK。
    1. sudo apt-get update  
    2. sudo apt-get install openjdk-17-jdk
    复制代码
  • 下载Kafka:从Apache Kafka的官方网站下载得当你操作系统的版本。下载完成后,解压到指定目录。
    1.    wget https://mirrors.aliyun.com/apache/kafka/3.7.0/kafka_2.13-3.7.0.tgz
    2.    tar -xzf kafka_2.13-3.7.0.tgz  
    3.    mv kafka_2.13-3.7.0 kafka
    4.    cd kafka
    复制代码
3.启动Kafka:Kafka依赖于ZooKeeper,所以你必要先启动ZooKeeper,然后再启动Kafka。
  1. # 启动ZooKeeper  
  2. bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  3. # 启动Kafka  
  4. nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
复制代码
4.使用kafka客户端连接
Spring Boot集成Kafka


  • 添加依赖:在你的Spring Boot项目的build.gradle 文件中添加Kafka的依赖。
    1. dependencies {
    2. implementation 'org.springframework.boot:spring-boot-starter-web'
    3. compileOnly 'org.projectlombok:lombok'
    4. annotationProcessor 'org.projectlombok:lombok'
    5. implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    6. runtimeOnly 'mysql:mysql-connector-java:8.0.17'
    7. implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:3.0.3'
    8. implementation 'org.springframework.boot:spring-boot-starter-data-redis'
    9. implementation 'org.apache.commons:commons-pool2'
    10. implementation 'org.springframework.kafka:spring-kafka'
    11. }
    复制代码
  • 配置Kafka:在application.properties或application.yml文件中配置Kafka的相关属性,如broker地址、端口、topic等。
    1. spring.kafka.bootstrap-servers=localhost:9092  
    2. spring.kafka.consumer.group-id=my-group  
    3. spring.kafka.consumer.auto-offset-reset=earliest  
    4. spring.kafka.template.default-topic=my-topic
    复制代码
  • 创建生产者:使用KafkaTemplate发送消息到Kafka。
    1. package cn.daimajiangxin.springboot.learning.kafka;
    2. import jakarta.annotation.Resource;
    3. import org.springframework.kafka.core.KafkaTemplate;
    4. import org.springframework.stereotype.Service;
    5. @Service
    6. public class KafkaProducer {
    7.      @Resource
    8.      private KafkaTemplate<String, String> kafkaTemplate;
    9.      public void sendMessage(String message) {
    10.          kafkaTemplate.send("my-topic", message);
    11.      }
    12. }
    复制代码
  • 创建消费者:使用@KafkaListener注解监听Kafka中的消息。
    1. package cn.daimajiangxin.springboot.learning.kafka;
    2. import org.springframework.kafka.annotation.KafkaListener;
    3. import org.springframework.stereotype.Service;
    4. @Service
    5. public class KafkaConsumer {
    6.     @KafkaListener(topics = "my-topic", groupId = "my-group")
    7.     public void consume(String message) {
    8.         System.out.println("Received message: " + message);
    9.     }
    10. }
    复制代码
  • 创建控制器:KafkaController
    1. package cn.daimajiangxin.springboot.learning.controller;
    2. import cn.daimajiangxin.springboot.learning.kafka.KafkaProducer;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.web.bind.annotation.GetMapping;
    5. import org.springframework.web.bind.annotation.RestController;
    6. @RestController
    7.      public class KafkaController {
    8.      private final  KafkaProducer kafkaProducer;
    9.          @Autowired
    10.          public KafkaController(KafkaProducer kafkaProducer) {
    11.              this.kafkaProducer = kafkaProducer;
    12.          }
    13.          @GetMapping("/kafka")
    14.          public void kafka() {
    15.              kafkaProducer.sendMessage("Hello World");
    16.          }
    17.      }
    复制代码
现在,你的Spring Boot应用已经集成了Kafka,你可以通过生产者发送消息,并通过消费者接收并处理这些消息了。


总结

以上就是关于从零开始学Spring Boot系列文章——集成Kafka的简介。Kafka作为一个强盛的分布式流处理平台,与Spring Boot的集成可以极大地简化实时数据处理应用的开发。希望这篇文章能帮助你更好地明白Kafka及其在Spring Boot项目中的应用。
我是代码匠心,和我一起学习更多精彩知识!!!扫描二维码!关注我,实时获取推送。

源文来自:https://daimajiangxin.cn

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

北冰洋以北

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表