利用SpringBoot对接Kafka

打印 上一主题 下一主题

主题 857|帖子 857|积分 2571

Kafka是什么,以及怎样利用SpringBoot对接Kafka




一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,如今是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等长处,使其成为了流处理和实时数据管道的首选解决方案
介绍实在是比较清楚的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流举行实时处理、盘算和分析的过程。
   假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流举行实时处理,比方盘算每个玩家的分数、监控游戏地区内的非常情况、统计玩家在线时长等等。如许,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才举行操纵。
雷同的,流处理还可以应用在其他实时性要求比较高的场景中,比方金融生意业务、物联网、实时监测等。通过对数据流举行实时处理,我们可以更加精准地把握数据变化的情况,并实时做出反应和调解,
  二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以利用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

此时我们可以直接选择利用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中利用Kafka,那么你需要在pom.xml文件中添加以下依赖:
  1. <dependency>
  2.       <groupId>org.springframework.kafka</groupId>
  3.       <artifactId>spring-kafka</artifactId>
  4.       <version>2.8.11</version>
  5. </dependency>
复制代码
3. 配置Kafka

在application.properties文件中添加以下配置:
  1. spring.kafka.bootstrap-servers=localhost:9092
  2. spring.kafka.consumer.group-id=test_group
复制代码
这里我们指定了Kafka服务器的地址和端口,并配置了消耗者组的ID,关于消耗者组的概念,实在就是某一些消耗者具备雷同的功能,因此会把他们设为同一个消耗者组,如许他们就不会重复消耗同一条消息了。更详细地原理,我们会在之后地篇章中介绍。
4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以利用KafkaTemplate类来创建Kafka生产者
  1. package com.zhanfu.kafkademo.service;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. public class KafkaService {
  7.    
  8.     @Autowired
  9.     private KafkaTemplate<String, String> kafkaTemplate;
  10.     public void sendMessage(String message) {
  11.         kafkaTemplate.send("test_topic", message);
  12.     }
  13. }
复制代码
这里我们利用@Autowired注解来自动注入KafkaTemplate,并利用send方法将消息发送到名为“test_topic”的Kafka主题中。
5. 创建Kafka消耗者

在Kafka中,消耗者是吸取并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以利用@KafkaListener注解来创建Kafka消耗者。
  1. package com.zhanfu.kafkademo.listener;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class KafkaLis {
  6.     @KafkaListener(topics = "test_topic", groupId = "test_group")
  7.     public void receiveMessage(String message) {
  8.         System.out.println("Received message: " + message);
  9.     }
  10. }
复制代码
6. 应用程序入口

如今我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消耗它,以测试我们的应用程序是否正确地与Kafka集成。
  1. package com.zhanfu.kafkademo.controller;
  2. import com.zhanfu.kafkademo.service.KafkaService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.PathVariable;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. public class MessageController {
  9.     @Autowired
  10.     private KafkaService kafkaService;
  11.     @GetMapping("/send/{message}")
  12.     public String sendMessage(@PathVariable String message) {
  13.         kafkaService.sendMessage(message);
  14.         return "Message sent successfully";
  15.     }
  16. }
复制代码
在这个例子中,我们利用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。终极项目整体框架如图:

三、启动与验证

起首自然是启动 Kafka ,然后是启动我们的Spring Boot项目

然后在欣赏器中输入
  1. http://127.0.0.1:8080/send/hello
复制代码

末了查抄我们的项目日记:

可以看到,整个发送和吸取的流程都走通了
四、KafkaTemplate 介绍

不难看出,在Springboot中,利用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。
它有三个主要属性:
        producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
        defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则利用该默认主题。
        messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord
它的主要方法:
        send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包罗了主题名称、分区编号、Key 和 Value 等信息。
        send(String topic, V data):向指定的 Kafka 主题发送一条消息。
        send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
        execute(ProducerCallback<K,V> callback):利用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
        inTransaction():启用事件,多个 send 方法调用将被包装在一个事件中,保证 Kafka 事件的原子性。
除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要举行选择和利用。
需要注意的是,在利用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等题目,以保证消息的可靠性和正确性。
固然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?实在这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:


如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClass ConditionalOnMissingBean 应该就明确了。实在Spring Boot 早就知心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内
总结
        本日我们通过一个Demo讲解了在SpringBoot中怎样对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开辟者要做的配置内容实在并不多,利用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在背面的过程中,我们将继承学习其利用,并且会着重讲解 Kafka 的原理与结构

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

冬雨财经

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表