kafka-消耗者组(SpringBoot整合Kafka)

海哥  金牌会员 | 2024-6-25 03:45:00 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 836|帖子 836|积分 2508

1、消耗者组

1.1、利用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本


1.2、创建生产者发送消息

  1. package com.atguigu.spring.kafka.consumer;
  2. import jakarta.annotation.Resource;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. @SpringBootTest
  7. class SpringKafkaConsumerApplicationTests {
  8.     @Resource
  9.     KafkaTemplate kafkaTemplate;
  10.     @Test
  11.     void contextLoads() {
  12.         for (int i = 0; i < 10; i++) {
  13.             kafkaTemplate.send("my_topic1",i%6,"", "消费者组"+i);
  14.         }
  15.     }
  16. }
复制代码


1.3、application.yml配置

  1. server:
  2.   port: 8120
  3. # v1
  4. spring:
  5.   Kafka:
  6.     bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
  7.     consumer:
  8.       # read-committed读事务已提交的消息 解决脏读问题
  9.       isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
  10.       # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
  11.       enable-auto-commit: true
  12.       # 消费者提交ack时多长时间批量提交一次
  13.       auto-commit-interval: 1000
  14.       # 消费者第一次消费主题消息时从哪个位置开始
  15.       auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
  16.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  17.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
复制代码
1.4、创建消耗者监听器

  1. package com.atguigu.spring.kafka.consumer.listener;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class MyKafkaListener {
  7.     @KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
  8.     public void onMessage1(ConsumerRecord<String, String> record) {
  9.         System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()
  10.                 +",partition:"+record.partition()
  11.                 +",offset = "+record.offset()
  12.                 +",key = "+record.key()
  13.                 +",value = "+record.value());
  14.     }
  15.     @KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
  16.     public void onMessage2(ConsumerRecord<String, String> record) {
  17.         System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic()
  18.                 +",partition:"+record.partition()
  19.                 +",offset = "+record.offset()
  20.                 +",key = "+record.key()
  21.                 +",value = "+record.value());
  22.     }
  23.     @KafkaListener(topics ={"my_topic1"},groupId = "my_group2")
  24.     public void onMessage3(ConsumerRecord<String, String> record) {
  25.         System.out.println("my_group2消费者获取到消息:topic = "+ record.topic()
  26.                 +",partition:"+record.partition()
  27.                 +",offset = "+record.offset()
  28.                 +",key = "+record.key()
  29.                 +",value = "+record.value());
  30.     }
  31. }
复制代码
1.5、创建SpringBoot启动类

  1. package com.atguigu.spring.kafka.consumer;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. // Generated by https://start.springboot.io
  5. // 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
  6. @SpringBootApplication
  7. public class SpringKafkaConsumerApplication {
  8.     public static void main(String[] args) {
  9.         SpringApplication.run(SpringKafkaConsumerApplication.class, args);
  10.     }
  11. }
复制代码
1.6、屏蔽 kafka debug 日记 logback.xml

  1. <configuration>      
  2.     <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
  3. 屏蔽kafka debug -->
  4.     <logger name="org.apache.kafka.clients" level="debug" />
  5. </configuration>
复制代码
1.7、引入spring-kafka依靠

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <modelVersion>4.0.0</modelVersion>
  5.     <parent>
  6.         <groupId>org.springframework.boot</groupId>
  7.         <artifactId>spring-boot-starter-parent</artifactId>
  8.         <version>3.0.5</version>
  9.         <relativePath/> <!-- lookup parent from repository -->
  10.     </parent>
  11.     <!-- Generated by https://start.springboot.io -->
  12.     <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
  13.     <groupId>com.atguigu</groupId>
  14.     <artifactId>spring-kafka-consumer</artifactId>
  15.     <version>0.0.1-SNAPSHOT</version>
  16.     <name>spring-kafka-consumer</name>
  17.     <description>spring-kafka-consumer</description>
  18.     <properties>
  19.         <java.version>17</java.version>
  20.     </properties>
  21.     <dependencies>
  22.         <dependency>
  23.             <groupId>org.springframework.boot</groupId>
  24.             <artifactId>spring-boot-starter</artifactId>
  25.         </dependency>
  26.         <dependency>
  27.             <groupId>org.springframework.boot</groupId>
  28.             <artifactId>spring-boot-starter-test</artifactId>
  29.             <scope>test</scope>
  30.         </dependency>
  31.         <dependency>
  32.             <groupId>org.springframework.boot</groupId>
  33.             <artifactId>spring-boot-starter-web</artifactId>
  34.         </dependency>
  35.         <dependency>
  36.             <groupId>org.springframework.kafka</groupId>
  37.             <artifactId>spring-kafka</artifactId>
  38.         </dependency>
  39.     </dependencies>
  40.     <build>
  41.         <plugins>
  42.             <plugin>
  43.                 <groupId>org.springframework.boot</groupId>
  44.                 <artifactId>spring-boot-maven-plugin</artifactId>
  45.             </plugin>
  46.         </plugins>
  47.     </build>
  48. </project>
复制代码
1.8、消耗者控制台:

  1.   .   ____          _            __ _ _
  2. /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
  3. ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
  4. \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  5.   '  |____| .__|_| |_|_| |_\__, | / / / /
  6. =========|_|==============|___/=/_/_/_/
  7. :: Spring Boot ::                (v3.0.5)
  8. my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
  9. my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
  10. my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
  11. my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
  12. my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
  13. my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
  14. my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
  15. my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
  16. my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
  17. my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
  18. my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
  19. my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
  20. my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
  21. my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
  22. my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
  23. my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
  24. my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
  25. my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
  26. my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
  27. my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

海哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表