ToB企服应用市场:ToB评测及商务社交产业平台

标题: SpringBoot-Learning系列之Kafka整合 [打印本页]

作者: 莱莱    时间: 2023-9-12 17:22
标题: SpringBoot-Learning系列之Kafka整合
SpringBoot-Learning系列之Kafka整合

本系列是一个独立的SpringBoot学习系列,本着 What Why How 的思想去整合Java开发领域各种组件。

<ul>消息系统
Kafka是什么
kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。是java领域常用的消息队列。
核心概念:
windows 安装kafka
本地环境DockerDeskTop+WSL2,基于Docker方式安装Kafka
2.8.0后不需要依赖zk了
SpringBoot集成
SpringBoot3.1.0+jdk17
<ul>pom依赖
  1.                                                                 ```
  2.                                                                                 <?xml version="1.0" encoding="UTF-8"?>
  3.                                                                                 <project xmlns="http://maven.apache.org/POM/4.0.0"
  4.                                                                                                                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5.                                                                                                                  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  6.                                                                                                 <modelVersion>4.0.0</modelVersion>
  7.                                                                                                 <parent>
  8.                                                                                                                 <groupId>org.springframework.boot</groupId>
  9.                                                                                                                 <artifactId>spring-boot-starter-parent</artifactId>
  10.                                                                                                                 <version>3.1.0</version>
  11.                                                                                                                 <relativePath/>
  12.                                                                                                 </parent>
  13.                                                                                                 <groupId>io.github.vino42</groupId>
  14.                                                                                                 <artifactId>springboot-kafka</artifactId>
  15.                                                                                                 <version>1.0-SNAPSHOT</version>
  16.                                                                                                 <properties>
  17.                                                                                                                 <java.version>17</java.version>
  18.                                                                                                                 <maven.compiler.source>17</maven.compiler.source>
  19.                                                                                                                 <maven.compiler.target>17</maven.compiler.target>
  20.                                                                                                                 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  21.                                                                                                 </properties>
  22.                                                                                                 <dependencies>
  23.                                                                                                                 <dependency>
  24.                                                                                                                                 <groupId>org.projectlombok</groupId>
  25.                                                                                                                                 <artifactId>lombok</artifactId>
  26.                                                                                                                                 <optional>true</optional>
  27.                                                                                                                 </dependency>
  28.                                                                                                                 <dependency>
  29.                                                                                                                                 <groupId>org.springframework.boot</groupId>
  30.                                                                                                                                 <artifactId>spring-boot-starter-test</artifactId>
  31.                                                                                                                                 <scope>test</scope>
  32.                                                                                                                                 <exclusions>
  33.                                                                                                                                                 <exclusion>
  34.                                                                                                                                                                 <groupId>org.springframework.boot</groupId>
  35.                                                                                                                                                                 <artifactId>spring-boot-starter-logging</artifactId>
  36.                                                                                                                                                 </exclusion>
  37.                                                                                                                                 </exclusions>
  38.                                                                                                                 </dependency>
  39.                                                                                                                 <dependency>
  40.                                                                                                                                 <groupId>org.springframework.boot</groupId>
  41.                                                                                                                                 <artifactId>spring-boot-starter-web</artifactId>
  42.                                                                                                                                 <exclusions>
  43.                                                                                                                                                 <exclusion>
  44.                                                                                                                                                                 <groupId>org.springframework.boot</groupId>
  45.                                                                                                                                                                 <artifactId>spring-boot-starter-logging</artifactId>
  46.                                                                                                                                                 </exclusion>
  47.                                                                                                                                 </exclusions>
  48.                                                                                                                 </dependency>
  49.                                                                                                                 <dependency>
  50.                                                                                                                                 <groupId>org.springframework.boot</groupId>
  51.                                                                                                                                 <artifactId>spring-boot-starter-log4j2</artifactId>
  52.                                                                                                                 </dependency>
  53.                                                                                                                
  54.                                                                                                                 <dependency>
  55.                                                                                                                                 <groupId>org.springframework.kafka</groupId>
  56.                                                                                                                                 <artifactId>spring-kafka</artifactId>
  57.                                                                                                                                 <exclusions>
  58.                                                                                                                                                
  59.                                                                                                                                                 <exclusion>
  60.                                                                                                                                                                 <groupId>org.apache.kafka</groupId>
  61.                                                                                                                                                                 <artifactId>kafka-clients</artifactId>
  62.                                                                                                                                                 </exclusion>
  63.                                                                                                                                 </exclusions>
  64.                                                                                                                 </dependency>
  65.                                                                                                                 <dependency>
  66.                                                                                                                                 <groupId>org.apache.kafka</groupId>
  67.                                                                                                                                 <artifactId>kafka-clients</artifactId>
  68.                                                                                                                                 <version>3.5.1</version>
  69.                                                                                                                 </dependency>
  70.                                                                                                                 <dependency>
  71.                                                                                                                                 <groupId>com.google.code.gson</groupId>
  72.                                                                                                                                 <artifactId>gson</artifactId>
  73.                                                                                                                                 <version>2.10.1</version>
  74.                                                                                                                 </dependency>
  75.                                                                                                                 <dependency>
  76.                                                                                                                                 <groupId>cn.hutool</groupId>
  77.                                                                                                                                 <artifactId>hutool-all</artifactId>
  78.                                                                                                                                 <version>5.8.21</version>
  79.                                                                                                                 </dependency>
  80.                                                                                                 </dependencies>
  81.                                                                                                 <build>
  82.                                                                                                                 <plugins>
  83.                                                                                                                                 <plugin>
  84.                                                                                                                                                 <groupId>org.springframework.boot</groupId>
  85.                                                                                                                                                 <artifactId>spring-boot-maven-plugin</artifactId>
  86.                                                                                                                                                 <version>3.1.0</version>
  87.                                                                                                                                 </plugin>
  88.                                                                                                                 </plugins>
  89.                                                                                                 </build>
  90.                                                                                 </project>
  91.                                                 ```
复制代码
配置
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: 172.31.192.1:9092
  4.     producer:
  5.       retries: 0
  6.       # 每次批量发送消息的数量
  7.       batch-size: 16384
  8.       buffer-memory: 33554432
  9.       # 指定消息key和消息体的编解码方式
  10.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12.     listener:
  13.       missing-topics-fatal: false
  14. #      MANUAL        poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交
  15.       #      MANUAL_IMMEDIATE        每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
  16.       #      RECORD        当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
  17.       #      BATCH        当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
  18.       #      TIME        当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
  19.       #      COUNT        当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
  20.       #      COUNT_TIME        TIME或COUNT满足其中一个时提交
  21.       ack-mode: manual_immediate
  22.     consumer:
  23.       group-id: test
  24.       # 是否自动提交
  25.       enable-auto-commit: false
  26.       max-poll-records: 100
  27.       #      用于指定消费者在启动时、重置消费偏移量时的行为。
  28.       #      earliest:消费者会将消费偏移量重置为最早的可用偏移量,也就是从最早的消息开始消费。
  29.       #      latest:消费者会将消费偏移量重置为最新的可用偏移量,也就是只消费最新发送的消息。
  30.       #      none:如果找不到已保存的消费偏移量,消费者会抛出一个异常
  31.       auto-offset-reset: earliest
  32.       auto-commit-interval: 100
  33.       # 指定消息key和消息体的编解码方式
  34.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  35.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  36.       properties:
  37.         max.poll.interval.ms: 3600000
  38. server:
  39.   port: 8888spring:
  40.   kafka:
  41.     bootstrap-servers: 172.31.192.1:9092
  42.     producer:
  43.       retries: 0
  44.       # 每次批量发送消息的数量
  45.       batch-size: 16384
  46.       buffer-memory: 33554432
  47.       # 指定消息key和消息体的编解码方式
  48.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  49.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  50.     listener:
  51.       missing-topics-fatal: false
  52.       ack-mode: manual_immediate
  53.     consumer:
  54.       group-id: test
  55.       enable-auto-commit: false
  56.       max-poll-records: 100
  57.       auto-offset-reset: earliest
  58.       auto-commit-interval: 100
  59.       # 指定消息key和消息体的编解码方式
  60.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  61.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  62.       properties:
  63.         max.poll.interval.ms: 3600000
复制代码
生产者代码示例
  1. package io.github.vino42.publiser;
  2. import com.google.gson.Gson;
  3. import com.google.gson.GsonBuilder;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * =====================================================================================
  9. *
  10. * @Created :   2023/8/30 21:29
  11. * @Compiler :  jdk 17
  12. * @Author :    VINO
  13. * @Copyright : VINO
  14. * @Decription : kafak 消息生产者
  15. * =====================================================================================
  16. */
  17. @Component
  18. public class KafkaPublishService {
  19.     @Autowired
  20.     KafkaTemplate kafkaTemplate;
  21.     /**
  22.      * 这里为了简单 直接发送json字符串
  23.      *
  24.      * @param json
  25.      */
  26.     public void send(String topic, String json) {
  27.         kafkaTemplate.send(topic, json);
  28.     }
  29. }
复制代码
  1.     @RequestMapping("/send")
  2.     public String send() {
  3.         IntStream.range(0, 10000).forEach(d -> {
  4.             kafkaPublishService.send("test", RandomUtil.randomString(16));
  5.         });
  6.         return "ok";
  7.     }
复制代码
消费者
[code]@Component@Slf4jpublic class CustomKafkaListener {    @org.springframework.kafka.annotation.KafkaListener(topics = "test")    public void listenUser(ConsumerRecord




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4