ToB企服应用市场:ToB评测及商务社交产业平台
标题:
SpringBoot-Learning系列之Kafka整合
[打印本页]
作者:
莱莱
时间:
2023-9-12 17:22
标题:
SpringBoot-Learning系列之Kafka整合
SpringBoot-Learning系列之Kafka整合
本系列是一个独立的SpringBoot学习系列,本着 What Why How 的思想去整合Java开发领域各种组件。
<ul>消息系统
主要应用场景
流量消峰(秒杀 抢购)、应用解耦(核心业务与非核心业务之间的解耦)
异步处理、顺序处理
实时数据传输管道
异构语言架构系统之间的通信
如 C语言的CS客户端的HIS系统与java语言开发的互联网在线诊疗系统的交互
Kafka是什么
kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。是java领域常用的消息队列。
核心概念:
生产者(Producer) 生产者应用向主题队列中投送消息数据
消费者 (Consumer) 消费者应用从订阅的Kafka的主题队列中获取数据、处理数据等后续操作
主题 (Topic) 可以理解为生产者与消费者交互的桥梁
分区 (Partition) 默认一个主题有一个分区,用户可以设置多个分区。每个分区可以有多个副本(Replica)。分区的作用是,将数据划分为多个小块,提高并发性和可扩展性。每个分区都有一个唯一的标识符,称为分区号。消息按照键(key)来进行分区,相同键的消息会被分配到同一个分区中。分区可以有不同的消费者同时消费。副本的作用是提供数据的冗余和故障恢复。每个分区可以有多个副本,其中一个被称为领导者(leader),其他副本被称为追随者(follower)。领导者负责处理读写请求,而追随者只负责复制领导者的数据。如果领导者宕机或不可用,某个追随者会被选举为新的领导者,保证数据的可用性。
windows 安装kafka
本地环境DockerDeskTop+WSL2,基于Docker方式安装Kafka
2.8.0后不需要依赖zk了
拉取镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
复制代码
创建网络
docker network create kafka-net --driver bridge
复制代码
安装zk
docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
复制代码
安装kafka
docker run -d --name kafka --publish 9092:9092 \
--link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \
--env KAFKA_ADVERTISED_PORT=9092 \
--volume /etc/localtime:/etc/localtime \
wurstmeister/kafka:latest
复制代码
测试
telnet localhost:9092
复制代码
SpringBoot集成
SpringBoot3.1.0+jdk17
<ul>pom依赖
```
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
<relativePath/>
</parent>
<groupId>io.github.vino42</groupId>
<artifactId>springboot-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>3.1.0</version>
</plugin>
</plugins>
</build>
</project>
```
复制代码
配置
spring:
kafka:
bootstrap-servers: 172.31.192.1:9092
producer:
retries: 0
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
missing-topics-fatal: false
# MANUAL poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交
# MANUAL_IMMEDIATE 每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
# RECORD 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# BATCH 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# TIME 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# COUNT 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT_TIME TIME或COUNT满足其中一个时提交
ack-mode: manual_immediate
consumer:
group-id: test
# 是否自动提交
enable-auto-commit: false
max-poll-records: 100
# 用于指定消费者在启动时、重置消费偏移量时的行为。
# earliest:消费者会将消费偏移量重置为最早的可用偏移量,也就是从最早的消息开始消费。
# latest:消费者会将消费偏移量重置为最新的可用偏移量,也就是只消费最新发送的消息。
# none:如果找不到已保存的消费偏移量,消费者会抛出一个异常
auto-offset-reset: earliest
auto-commit-interval: 100
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
max.poll.interval.ms: 3600000
server:
port: 8888spring:
kafka:
bootstrap-servers: 172.31.192.1:9092
producer:
retries: 0
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
missing-topics-fatal: false
ack-mode: manual_immediate
consumer:
group-id: test
enable-auto-commit: false
max-poll-records: 100
auto-offset-reset: earliest
auto-commit-interval: 100
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
max.poll.interval.ms: 3600000
复制代码
生产者代码示例
package io.github.vino42.publiser;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* =====================================================================================
*
* @Created : 2023/8/30 21:29
* @Compiler : jdk 17
* @Author : VINO
* @Copyright : VINO
* @Decription : kafak 消息生产者
* =====================================================================================
*/
@Component
public class KafkaPublishService {
@Autowired
KafkaTemplate kafkaTemplate;
/**
* 这里为了简单 直接发送json字符串
*
* @param json
*/
public void send(String topic, String json) {
kafkaTemplate.send(topic, json);
}
}
复制代码
@RequestMapping("/send")
public String send() {
IntStream.range(0, 10000).forEach(d -> {
kafkaPublishService.send("test", RandomUtil.randomString(16));
});
return "ok";
}
复制代码
消费者
[code]@Component@Slf4jpublic class CustomKafkaListener { @org.springframework.kafka.annotation.KafkaListener(topics = "test") public void listenUser(ConsumerRecord
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4