SpringBoot-Learning系列之Kafka整合

莱莱  金牌会员 | 2023-9-12 17:22:17 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 903|帖子 903|积分 2709

SpringBoot-Learning系列之Kafka整合

本系列是一个独立的SpringBoot学习系列,本着 What Why How 的思想去整合Java开发领域各种组件。

<ul>消息系统

  • 主要应用场景

    • 流量消峰(秒杀 抢购)、应用解耦(核心业务与非核心业务之间的解耦)
    • 异步处理、顺序处理
    • 实时数据传输管道
    • 异构语言架构系统之间的通信

      • 如 C语言的CS客户端的HIS系统与java语言开发的互联网在线诊疗系统的交互


Kafka是什么
kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。是java领域常用的消息队列。
核心概念:

  • 生产者(Producer) 生产者应用向主题队列中投送消息数据
  • 消费者 (Consumer)  消费者应用从订阅的Kafka的主题队列中获取数据、处理数据等后续操作
  • 主题 (Topic)  可以理解为生产者与消费者交互的桥梁
  • 分区 (Partition) 默认一个主题有一个分区,用户可以设置多个分区。每个分区可以有多个副本(Replica)。分区的作用是,将数据划分为多个小块,提高并发性和可扩展性。每个分区都有一个唯一的标识符,称为分区号。消息按照键(key)来进行分区,相同键的消息会被分配到同一个分区中。分区可以有不同的消费者同时消费。副本的作用是提供数据的冗余和故障恢复。每个分区可以有多个副本,其中一个被称为领导者(leader),其他副本被称为追随者(follower)。领导者负责处理读写请求,而追随者只负责复制领导者的数据。如果领导者宕机或不可用,某个追随者会被选举为新的领导者,保证数据的可用性。
windows 安装kafka
本地环境DockerDeskTop+WSL2,基于Docker方式安装Kafka
2.8.0后不需要依赖zk了

  • 拉取镜像
    1. docker pull wurstmeister/zookeeper
    2. docker pull wurstmeister/kafka
    复制代码
  • 创建网络
    1. docker network create kafka-net --driver bridge
    复制代码
  • 安装zk
    1. docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
    复制代码
  • 安装kafka
    1. docker run -d --name kafka --publish 9092:9092 \
    2. --link zookeeper \
    3. --env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \
    4. --env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \
    5. --env KAFKA_ADVERTISED_PORT=9092  \
    6. --volume /etc/localtime:/etc/localtime \
    7. wurstmeister/kafka:latest
    复制代码
  • 测试
    1. telnet localhost:9092
    复制代码
SpringBoot集成
SpringBoot3.1.0+jdk17
<ul>pom依赖
  1.                                                                 ```
  2.                                                                                 <?xml version="1.0" encoding="UTF-8"?>
  3.                                                                                 <project xmlns="http://maven.apache.org/POM/4.0.0"
  4.                                                                                                                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5.                                                                                                                  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  6.                                                                                                 <modelVersion>4.0.0</modelVersion>
  7.                                                                                                 <parent>
  8.                                                                                                                 <groupId>org.springframework.boot</groupId>
  9.                                                                                                                 <artifactId>spring-boot-starter-parent</artifactId>
  10.                                                                                                                 <version>3.1.0</version>
  11.                                                                                                                 <relativePath/>
  12.                                                                                                 </parent>
  13.                                                                                                 <groupId>io.github.vino42</groupId>
  14.                                                                                                 <artifactId>springboot-kafka</artifactId>
  15.                                                                                                 <version>1.0-SNAPSHOT</version>
  16.                                                                                                 <properties>
  17.                                                                                                                 <java.version>17</java.version>
  18.                                                                                                                 <maven.compiler.source>17</maven.compiler.source>
  19.                                                                                                                 <maven.compiler.target>17</maven.compiler.target>
  20.                                                                                                                 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  21.                                                                                                 </properties>
  22.                                                                                                 <dependencies>
  23.                                                                                                                 <dependency>
  24.                                                                                                                                 <groupId>org.projectlombok</groupId>
  25.                                                                                                                                 <artifactId>lombok</artifactId>
  26.                                                                                                                                 <optional>true</optional>
  27.                                                                                                                 </dependency>
  28.                                                                                                                 <dependency>
  29.                                                                                                                                 <groupId>org.springframework.boot</groupId>
  30.                                                                                                                                 <artifactId>spring-boot-starter-test</artifactId>
  31.                                                                                                                                 <scope>test</scope>
  32.                                                                                                                                 <exclusions>
  33.                                                                                                                                                 <exclusion>
  34.                                                                                                                                                                 <groupId>org.springframework.boot</groupId>
  35.                                                                                                                                                                 <artifactId>spring-boot-starter-logging</artifactId>
  36.                                                                                                                                                 </exclusion>
  37.                                                                                                                                 </exclusions>
  38.                                                                                                                 </dependency>
  39.                                                                                                                 <dependency>
  40.                                                                                                                                 <groupId>org.springframework.boot</groupId>
  41.                                                                                                                                 <artifactId>spring-boot-starter-web</artifactId>
  42.                                                                                                                                 <exclusions>
  43.                                                                                                                                                 <exclusion>
  44.                                                                                                                                                                 <groupId>org.springframework.boot</groupId>
  45.                                                                                                                                                                 <artifactId>spring-boot-starter-logging</artifactId>
  46.                                                                                                                                                 </exclusion>
  47.                                                                                                                                 </exclusions>
  48.                                                                                                                 </dependency>
  49.                                                                                                                 <dependency>
  50.                                                                                                                                 <groupId>org.springframework.boot</groupId>
  51.                                                                                                                                 <artifactId>spring-boot-starter-log4j2</artifactId>
  52.                                                                                                                 </dependency>
  53.                                                                                                                
  54.                                                                                                                 <dependency>
  55.                                                                                                                                 <groupId>org.springframework.kafka</groupId>
  56.                                                                                                                                 <artifactId>spring-kafka</artifactId>
  57.                                                                                                                                 <exclusions>
  58.                                                                                                                                                
  59.                                                                                                                                                 <exclusion>
  60.                                                                                                                                                                 <groupId>org.apache.kafka</groupId>
  61.                                                                                                                                                                 <artifactId>kafka-clients</artifactId>
  62.                                                                                                                                                 </exclusion>
  63.                                                                                                                                 </exclusions>
  64.                                                                                                                 </dependency>
  65.                                                                                                                 <dependency>
  66.                                                                                                                                 <groupId>org.apache.kafka</groupId>
  67.                                                                                                                                 <artifactId>kafka-clients</artifactId>
  68.                                                                                                                                 <version>3.5.1</version>
  69.                                                                                                                 </dependency>
  70.                                                                                                                 <dependency>
  71.                                                                                                                                 <groupId>com.google.code.gson</groupId>
  72.                                                                                                                                 <artifactId>gson</artifactId>
  73.                                                                                                                                 <version>2.10.1</version>
  74.                                                                                                                 </dependency>
  75.                                                                                                                 <dependency>
  76.                                                                                                                                 <groupId>cn.hutool</groupId>
  77.                                                                                                                                 <artifactId>hutool-all</artifactId>
  78.                                                                                                                                 <version>5.8.21</version>
  79.                                                                                                                 </dependency>
  80.                                                                                                 </dependencies>
  81.                                                                                                 <build>
  82.                                                                                                                 <plugins>
  83.                                                                                                                                 <plugin>
  84.                                                                                                                                                 <groupId>org.springframework.boot</groupId>
  85.                                                                                                                                                 <artifactId>spring-boot-maven-plugin</artifactId>
  86.                                                                                                                                                 <version>3.1.0</version>
  87.                                                                                                                                 </plugin>
  88.                                                                                                                 </plugins>
  89.                                                                                                 </build>
  90.                                                                                 </project>
  91.                                                 ```
复制代码
配置
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: 172.31.192.1:9092
  4.     producer:
  5.       retries: 0
  6.       # 每次批量发送消息的数量
  7.       batch-size: 16384
  8.       buffer-memory: 33554432
  9.       # 指定消息key和消息体的编解码方式
  10.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12.     listener:
  13.       missing-topics-fatal: false
  14. #      MANUAL        poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交
  15.       #      MANUAL_IMMEDIATE        每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
  16.       #      RECORD        当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
  17.       #      BATCH        当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
  18.       #      TIME        当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
  19.       #      COUNT        当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
  20.       #      COUNT_TIME        TIME或COUNT满足其中一个时提交
  21.       ack-mode: manual_immediate
  22.     consumer:
  23.       group-id: test
  24.       # 是否自动提交
  25.       enable-auto-commit: false
  26.       max-poll-records: 100
  27.       #      用于指定消费者在启动时、重置消费偏移量时的行为。
  28.       #      earliest:消费者会将消费偏移量重置为最早的可用偏移量,也就是从最早的消息开始消费。
  29.       #      latest:消费者会将消费偏移量重置为最新的可用偏移量,也就是只消费最新发送的消息。
  30.       #      none:如果找不到已保存的消费偏移量,消费者会抛出一个异常
  31.       auto-offset-reset: earliest
  32.       auto-commit-interval: 100
  33.       # 指定消息key和消息体的编解码方式
  34.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  35.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  36.       properties:
  37.         max.poll.interval.ms: 3600000
  38. server:
  39.   port: 8888spring:
  40.   kafka:
  41.     bootstrap-servers: 172.31.192.1:9092
  42.     producer:
  43.       retries: 0
  44.       # 每次批量发送消息的数量
  45.       batch-size: 16384
  46.       buffer-memory: 33554432
  47.       # 指定消息key和消息体的编解码方式
  48.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  49.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  50.     listener:
  51.       missing-topics-fatal: false
  52.       ack-mode: manual_immediate
  53.     consumer:
  54.       group-id: test
  55.       enable-auto-commit: false
  56.       max-poll-records: 100
  57.       auto-offset-reset: earliest
  58.       auto-commit-interval: 100
  59.       # 指定消息key和消息体的编解码方式
  60.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  61.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  62.       properties:
  63.         max.poll.interval.ms: 3600000
复制代码
生产者代码示例
  1. package io.github.vino42.publiser;
  2. import com.google.gson.Gson;
  3. import com.google.gson.GsonBuilder;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * =====================================================================================
  9. *
  10. * @Created :   2023/8/30 21:29
  11. * @Compiler :  jdk 17
  12. * @Author :    VINO
  13. * @Copyright : VINO
  14. * @Decription : kafak 消息生产者
  15. * =====================================================================================
  16. */
  17. @Component
  18. public class KafkaPublishService {
  19.     @Autowired
  20.     KafkaTemplate kafkaTemplate;
  21.     /**
  22.      * 这里为了简单 直接发送json字符串
  23.      *
  24.      * @param json
  25.      */
  26.     public void send(String topic, String json) {
  27.         kafkaTemplate.send(topic, json);
  28.     }
  29. }
复制代码
  1.     @RequestMapping("/send")
  2.     public String send() {
  3.         IntStream.range(0, 10000).forEach(d -> {
  4.             kafkaPublishService.send("test", RandomUtil.randomString(16));
  5.         });
  6.         return "ok";
  7.     }
复制代码
消费者
[code]@Component@Slf4jpublic class CustomKafkaListener {    @org.springframework.kafka.annotation.KafkaListener(topics = "test")    public void listenUser(ConsumerRecord

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

莱莱

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表