1、消耗者组
1.1、利用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
1.2、创建生产者发送消息
- package com.atguigu.spring.kafka.consumer;
- import jakarta.annotation.Resource;
- import org.junit.jupiter.api.Test;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.kafka.core.KafkaTemplate;
- @SpringBootTest
- class SpringKafkaConsumerApplicationTests {
- @Resource
- KafkaTemplate kafkaTemplate;
- @Test
- void contextLoads() {
- for (int i = 0; i < 10; i++) {
- kafkaTemplate.send("my_topic1",i%6,"", "消费者组"+i);
- }
- }
- }
复制代码
1.3、application.yml配置
- server:
- port: 8120
- # v1
- spring:
- Kafka:
- bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
- consumer:
- # read-committed读事务已提交的消息 解决脏读问题
- isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
- # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
- enable-auto-commit: true
- # 消费者提交ack时多长时间批量提交一次
- auto-commit-interval: 1000
- # 消费者第一次消费主题消息时从哪个位置开始
- auto-offset-reset: earliest #指定Offset消费:earliest | latest | none
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
复制代码 1.4、创建消耗者监听器
- package com.atguigu.spring.kafka.consumer.listener;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
- @Component
- public class MyKafkaListener {
- @KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
- public void onMessage1(ConsumerRecord<String, String> record) {
- System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()
- +",partition:"+record.partition()
- +",offset = "+record.offset()
- +",key = "+record.key()
- +",value = "+record.value());
- }
- @KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
- public void onMessage2(ConsumerRecord<String, String> record) {
- System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic()
- +",partition:"+record.partition()
- +",offset = "+record.offset()
- +",key = "+record.key()
- +",value = "+record.value());
- }
- @KafkaListener(topics ={"my_topic1"},groupId = "my_group2")
- public void onMessage3(ConsumerRecord<String, String> record) {
- System.out.println("my_group2消费者获取到消息:topic = "+ record.topic()
- +",partition:"+record.partition()
- +",offset = "+record.offset()
- +",key = "+record.key()
- +",value = "+record.value());
- }
- }
复制代码 1.5、创建SpringBoot启动类
- package com.atguigu.spring.kafka.consumer;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- // Generated by https://start.springboot.io
- // 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
- @SpringBootApplication
- public class SpringKafkaConsumerApplication {
- public static void main(String[] args) {
- SpringApplication.run(SpringKafkaConsumerApplication.class, args);
- }
- }
复制代码 1.6、屏蔽 kafka debug 日记 logback.xml
- <configuration>
- <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
- 屏蔽kafka debug -->
- <logger name="org.apache.kafka.clients" level="debug" />
- </configuration>
复制代码 1.7、引入spring-kafka依靠
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>3.0.5</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <!-- Generated by https://start.springboot.io -->
- <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
- <groupId>com.atguigu</groupId>
- <artifactId>spring-kafka-consumer</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>spring-kafka-consumer</name>
- <description>spring-kafka-consumer</description>
- <properties>
- <java.version>17</java.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
复制代码 1.8、消耗者控制台:
- . ____ _ __ _ _
- /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
- ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
- \\/ ___)| |_)| | | | | || (_| | ) ) ) )
- ' |____| .__|_| |_|_| |_\__, | / / / /
- =========|_|==============|___/=/_/_/_/
- :: Spring Boot :: (v3.0.5)
- my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
- my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
- my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
- my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
- my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
- my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
- my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
- my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
- my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
- my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
- my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
- my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
- my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
- my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
- my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
- my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
- my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
- my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
- my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
- my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |