SpingBoot集成kafka-发送读取消息示例

打印 上一主题 下一主题

主题 517|帖子 517|积分 1551

kafka的几个常见概念


1、springboot和kafka对应版本(重要)

https://spring.io/projects/spring-kafka


2、创建springboot项目,引入kafka依靠


2.1、生产者EventProducer

  1. package com.power.producer;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.stereotype.Component;
  4. import javax.annotation.Resource;
  5. @Component
  6. public class EventProducer {
  7.     @Resource
  8.     private KafkaTemplate<String,String> kafkaTemplate;
  9.     public void sendEvent(){
  10.         kafkaTemplate.send("hello-topic","hello kafka");
  11.     }
  12. }
复制代码
2.2、消耗者EventConsumer

  1. package com.power.consumer;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class EventConsumer {
  6.     //采用监听的方式接收事件(消息,数据)
  7.     @KafkaListener(topics = {"hello-topic"},groupId="hello-group")
  8.     public void onEvent(String event){
  9.         System.out.println("读取到的事件:"+event);
  10.     }
  11. }
复制代码
2.3、启动生产者的方法SpringBoot01KafkaBaseApplication

实行一次该方法,会调用一次生产者发送一次消息。
即每实行一次,会调用EventProducer类下的sendEvent方法一次。
  1. package com.power;
  2. import com.power.producer.EventProducer;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. import javax.annotation.Resource;
  6. @SpringBootTest
  7. public class SpringBoot01KafkaBaseApplication {
  8.     @Resource
  9.     private EventProducer eventProducer;
  10.     @Test
  11.     void test01(){
  12.         eventProducer.sendEvent();
  13.     }
  14. }
复制代码
2.4、application.yml

  1. spring:
  2.   application:
  3.     #应用名称
  4.     name: spring-boot-01-kafka-base
  5.   #kafka连接地址(ip+port)
  6.   kafka:
  7.     bootstrap-servers: <你的服务器ip>:9092
  8.     #配置生产者(有24个配置)
  9.     #producer:
  10.     #配置消费者(有24个配置)
  11.     #consumer:
复制代码
2.5、pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <parent>
  7.         <groupId>org.springframework.boot</groupId>
  8.         <artifactId>spring-boot-starter-parent</artifactId>
  9.         <version>2.7.2</version>
  10.         <relativePath />
  11.     </parent>
  12.     <groupId>org.powernode</groupId>
  13.     <artifactId>spring-boot-01-kafka-base</artifactId>
  14.     <version>0.0.1-SNAPSHOT</version>
  15.     <name>kafkaSpringBootProject</name>
  16.     <description>kafka project for Spring Boot</description>
  17.     <properties>
  18.         <java.version>8</java.version>
  19.     </properties>
  20.     <repositories>
  21.         <repository>
  22.             <id>central</id>
  23.             <name>aliyun maven</name>
  24.             <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  25.             <layout>default</layout>
  26.             <!-- 是否开启发布版构件下载 -->
  27.             <releases>
  28.                 <enabled>true</enabled>
  29.             </releases>
  30.             <!-- 是否开启快照版构件下载 -->
  31.             <snapshots>
  32.                 <enabled>false</enabled>
  33.             </snapshots>
  34.         </repository>
  35.     </repositories>
  36.     <dependencies>
  37.         <dependency>
  38.             <groupId>org.springframework.boot</groupId>
  39.             <artifactId>spring-boot-starter</artifactId>
  40.         </dependency>
  41.         <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
  42.         <dependency>
  43.             <groupId>org.springframework.kafka</groupId>
  44.             <artifactId>spring-kafka</artifactId>
  45.             <version>2.8.0</version>
  46.         </dependency>
  47.         <dependency>
  48.             <groupId>org.springframework.boot</groupId>
  49.             <artifactId>spring-boot-devtools</artifactId>
  50.             <scope>runtime</scope>
  51.             <optional>true</optional>
  52.         </dependency>
  53.         <dependency>
  54.             <groupId>org.springframework.boot</groupId>
  55.             <artifactId>spring-boot-starter-test</artifactId>
  56.             <scope>test</scope>
  57.         </dependency>
  58.         <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
  59.         <dependency>
  60.             <groupId>org.springframework.kafka</groupId>
  61.             <artifactId>spring-kafka-test</artifactId>
  62.             <version>2.8.0</version>
  63.             <scope>test</scope>
  64.         </dependency>
  65.     </dependencies>
  66.     <build>
  67.         <plugins>
  68.             <plugin>
  69.                 <groupId>org.springframework.boot</groupId>
  70.                 <artifactId>spring-boot-maven-plugin</artifactId>
  71.                 <configuration>
  72.                     <excludes>
  73.                         <exclude>
  74.                             <groupId>org.projectlombok</groupId>
  75.                             <artifactId>lombok</artifactId>
  76.                         </exclude>
  77.                     </excludes>
  78.                 </configuration>
  79.             </plugin>
  80.         </plugins>
  81.     </build>
  82. </project>
复制代码
2.6、启动springboot项目标启动类(Application)报错

项目启动类
  1. package com.power;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class Application {
  6.         public static void main(String[] args) {
  7.                 SpringApplication.run(Application.class, args);
  8.                 System.out.println("启动成功--------------------------");
  9.         }
  10. }
复制代码
启动服务后发现报错:

修改server.properties设置文件:
修改前:

修改后:

3、springboot集成kafka读取最早的消息

已经被消耗者读取/消耗的消息,无法被新启动的消耗组消息的,那么新启动的消耗组该如何读取最早的消息呢,可以通过设置消耗者auto-offset-reset: earliest去实现。

3.1、如何设置消耗者auto-offset-reset: earliest


1、修改application.yml

3.2、设置消耗者auto-offset-reset: earliest后存在的题目


3.2.1、修改消耗组ID

原消耗组ID

修改后的消耗组ID
4、新的消耗组ID乐成读取到之前的消息

3.2.2、手动重置偏移量

3.2.2.1、手动将偏移量设置为最早

  1. #示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-earliest --execute
复制代码
来到kafka安装目录下:
实行如下下令:
  1. ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-earliest --execute
复制代码
实行后报错

需要先停掉服务,在去手动重置偏移量,此时重置偏移量乐成,偏移量为0

3.2.2.2、手动将偏移量设置为最新

  1. #示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-latest --execute
复制代码
  1. ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-latest --execute
复制代码
设置乐成,此时偏移量已为最新:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

南飓风

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表