1、生产者监听器
1.1、创建生产者监听器
- package com.atguigu.kafka.listener;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import org.springframework.kafka.support.ProducerListener;
- @Component
- public class MyKafkaProducerListener implements ProducerListener<String,String> {
- //生产者 ack 配置为 0 只要发送即成功
- //ack为 1 leader落盘 broker ack之后 才成功
- //ack为 -1 分区所有副本全部落盘 broker ack之后 才成功
- @Override
- public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
- //ProducerListener.super.onSuccess(producerRecord, recordMetadata);
- System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()
- +",partition = "+producerRecord.partition()
- +",key = "+producerRecord.key()
- +",value = "+producerRecord.value()
- +",offset = "+recordMetadata.offset());
- }
- //消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息
- @Override
- public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
- System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()
- +",partition = "+producerRecord.partition()
- +",key = "+producerRecord.key()
- +",value = "+producerRecord.value()
- +",offset = "+recordMetadata.offset());
- System.out.println("异常信息:" + exception.getMessage());
- }
- }
复制代码 1.2、创建生产者拦截器
- package com.atguigu.kafka.interceptor;
- import org.apache.kafka.clients.producer.ProducerInterceptor;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import org.springframework.stereotype.Component;
- import java.util.Map;
- //拦截器必须手动注册给kafka生产者(KafkaTemplate)
- @Component
- public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {
- //kafka生产者发送消息前执行:拦截发送的消息预处理
- @Override
- public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
- System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()
- +",partition:"+producerRecord.partition()
- +",key = "+producerRecord.key()
- +",value = "+producerRecord.value());
- return null;
- }
- //kafka broker 给出应答后执行
- @Override
- public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
- //exception为空表示消息发送成功
- if(e == null){
- System.out.println("消息发送成功:topic = "+ recordMetadata.topic()
- +",partition:"+recordMetadata.partition()
- +",offset="+recordMetadata.offset()
- +",timestamp="+recordMetadata.timestamp());
- }
- }
- @Override
- public void close() {
- }
- @Override
- public void configure(Map<String, ?> map) {
- }
- }
复制代码 1.3、发送消息测试
- package com.atguigu.kafka.producer;
- import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
- import jakarta.annotation.PostConstruct;
- import jakarta.annotation.Resource;
- import org.junit.jupiter.api.Test;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.kafka.core.KafkaTemplate;
- import java.io.IOException;
- @SpringBootTest
- class KafkaProducerApplicationTests {
- //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中
- @Resource
- KafkaTemplate kafkaTemplate;
- @Resource
- MyKafkaInterceptor myKafkaInterceptor;
- @PostConstruct
- public void init() {
- kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
- }
- @Test
- void contextLoads() throws IOException {
- kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");
- //回调是等kafka,ack以后才执行,需要阻塞
- System.in.read();
- }
- }
复制代码 1.4、利用Java代码创建主题分区副本
- package com.atguigu.kafka.config;
- import org.apache.kafka.clients.admin.NewTopic;
- import org.springframework.context.annotation.Bean;
- import org.springframework.kafka.config.TopicBuilder;
- import org.springframework.stereotype.Component;
- @Component
- public class KafkaTopicConfig {
- @Bean
- public NewTopic myTopic1() {
- //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)
- return TopicBuilder.name("my_topic1")//主题名称
- .partitions(3)//主题分区
- .replicas(3)//主题分区副本数
- .build();//创建
- }
- }
复制代码 1.5、application.yml配置----v1版
- server:
- port: 8110
- # v1
- spring:
- kafka:
- bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
- producer: # producer 生产者
- retries: 0 # 重试次数 0表示不重试
- acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)
- batch-size: 16384 # 批次大小 单位byte
- buffer-memory: 33554432 # 生产者缓冲区大小 单位byte
- key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
- value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
复制代码 1.6、屏蔽 kafka debug 日志 logback.xml
- <configuration>
- <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
- 屏蔽kafka debug -->
- <logger name="org.apache.kafka.clients" level="debug" />
- </configuration>
复制代码 1.7、引入spring-kafka依靠
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>3.0.5</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <!-- Generated by https://start.springboot.io -->
- <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
- <groupId>com.atguigu.kafka</groupId>
- <artifactId>kafka-producer</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>kafka-producer</name>
- <description>kafka-producer</description>
- <properties>
- <java.version>17</java.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
复制代码 1.8、控制台日志
- 生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者监听器
- 消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717573749549
- MyKafkaProducerListener消息发送成功:topic=my_topic1,partition = null,key = null,value = spring-kafka-生产者监听器,offset = 0
复制代码
- [
- [
- {
- "partition": 0,
- "offset": 0,
- "msg": "spring-kafka-生产者监听器",
- "timespan": 1717573749549,
- "date": "2024-06-05 07:49:09"
- }
- ]
- ]
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |