kafka的几个常见概念
1、springboot和kafka对应版本(重要)
https://spring.io/projects/spring-kafka
2、创建springboot项目,引入kafka依靠
2.1、生产者EventProducer
- package com.power.producer;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- @Component
- public class EventProducer {
- @Resource
- private KafkaTemplate<String,String> kafkaTemplate;
- public void sendEvent(){
- kafkaTemplate.send("hello-topic","hello kafka");
- }
- }
复制代码 2.2、消耗者EventConsumer
- package com.power.consumer;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
- @Component
- public class EventConsumer {
- //采用监听的方式接收事件(消息,数据)
- @KafkaListener(topics = {"hello-topic"},groupId="hello-group")
- public void onEvent(String event){
- System.out.println("读取到的事件:"+event);
- }
- }
复制代码 2.3、启动生产者的方法SpringBoot01KafkaBaseApplication
实行一次该方法,会调用一次生产者发送一次消息。
即每实行一次,会调用EventProducer类下的sendEvent方法一次。
- package com.power;
- import com.power.producer.EventProducer;
- import org.junit.jupiter.api.Test;
- import org.springframework.boot.test.context.SpringBootTest;
- import javax.annotation.Resource;
- @SpringBootTest
- public class SpringBoot01KafkaBaseApplication {
- @Resource
- private EventProducer eventProducer;
- @Test
- void test01(){
- eventProducer.sendEvent();
- }
- }
复制代码 2.4、application.yml
- spring:
- application:
- #应用名称
- name: spring-boot-01-kafka-base
- #kafka连接地址(ip+port)
- kafka:
- bootstrap-servers: <你的服务器ip>:9092
- #配置生产者(有24个配置)
- #producer:
- #配置消费者(有24个配置)
- #consumer:
复制代码 2.5、pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.7.2</version>
- <relativePath />
- </parent>
- <groupId>org.powernode</groupId>
- <artifactId>spring-boot-01-kafka-base</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>kafkaSpringBootProject</name>
- <description>kafka project for Spring Boot</description>
- <properties>
- <java.version>8</java.version>
- </properties>
- <repositories>
- <repository>
- <id>central</id>
- <name>aliyun maven</name>
- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
- <layout>default</layout>
- <!-- 是否开启发布版构件下载 -->
- <releases>
- <enabled>true</enabled>
- </releases>
- <!-- 是否开启快照版构件下载 -->
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>2.8.0</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka-test</artifactId>
- <version>2.8.0</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </project>
复制代码 2.6、启动springboot项目标启动类(Application)报错
项目启动类
- package com.power;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- @SpringBootApplication
- public class Application {
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- System.out.println("启动成功--------------------------");
- }
- }
复制代码 启动服务后发现报错:
修改server.properties设置文件:
修改前:
修改后:
3、springboot集成kafka读取最早的消息
已经被消耗者读取/消耗的消息,无法被新启动的消耗组消息的,那么新启动的消耗组该如何读取最早的消息呢,可以通过设置消耗者auto-offset-reset: earliest去实现。
3.1、如何设置消耗者auto-offset-reset: earliest
1、修改application.yml
3.2、设置消耗者auto-offset-reset: earliest后存在的题目
3.2.1、修改消耗组ID
原消耗组ID
修改后的消耗组ID
4、新的消耗组ID乐成读取到之前的消息
3.2.2、手动重置偏移量
3.2.2.1、手动将偏移量设置为最早
- #示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-earliest --execute
复制代码 来到kafka安装目录下:
实行如下下令:
- ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-earliest --execute
复制代码 实行后报错
需要先停掉服务,在去手动重置偏移量,此时重置偏移量乐成,偏移量为0
3.2.2.2、手动将偏移量设置为最新
- #示例:./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-latest --execute
复制代码- ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group --topic hello-topic --reset-offsets --to-latest --execute
复制代码 设置乐成,此时偏移量已为最新:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|