kafka-生产者监听器(SpringBoot整合Kafka)

嚴華  金牌会员 | 2024-6-25 02:50:53 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 854|帖子 854|积分 2562

1、生产者监听器

1.1、创建生产者监听器

  1. package com.atguigu.kafka.listener;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import org.apache.kafka.clients.producer.RecordMetadata;
  4. import org.springframework.kafka.support.ProducerListener;
  5. @Component
  6. public class MyKafkaProducerListener implements ProducerListener<String,String> {
  7.     //生产者 ack 配置为 0 只要发送即成功
  8.     //ack为 1  leader落盘  broker ack之后 才成功
  9.     //ack为 -1 分区所有副本全部落盘  broker ack之后 才成功
  10.     @Override
  11.     public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
  12.         //ProducerListener.super.onSuccess(producerRecord, recordMetadata);
  13.         System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()
  14.         +",partition = "+producerRecord.partition()
  15.         +",key = "+producerRecord.key()
  16.         +",value = "+producerRecord.value()
  17.         +",offset = "+recordMetadata.offset());
  18.     }
  19.     //消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息
  20.     @Override
  21.     public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
  22.         System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()
  23.                 +",partition = "+producerRecord.partition()
  24.                 +",key = "+producerRecord.key()
  25.                 +",value = "+producerRecord.value()
  26.                 +",offset = "+recordMetadata.offset());
  27.         System.out.println("异常信息:" + exception.getMessage());
  28.     }
  29. }
复制代码
1.2、创建生产者拦截器

  1. package com.atguigu.kafka.interceptor;
  2. import org.apache.kafka.clients.producer.ProducerInterceptor;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.springframework.stereotype.Component;
  6. import java.util.Map;
  7. //拦截器必须手动注册给kafka生产者(KafkaTemplate)
  8. @Component
  9. public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {
  10.     //kafka生产者发送消息前执行:拦截发送的消息预处理
  11.     @Override
  12.     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
  13.         System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()
  14.         +",partition:"+producerRecord.partition()
  15.         +",key = "+producerRecord.key()
  16.         +",value = "+producerRecord.value());
  17.         return null;
  18.     }
  19.     //kafka broker 给出应答后执行
  20.     @Override
  21.     public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
  22.         //exception为空表示消息发送成功
  23.         if(e == null){
  24.             System.out.println("消息发送成功:topic = "+ recordMetadata.topic()
  25.                     +",partition:"+recordMetadata.partition()
  26.                     +",offset="+recordMetadata.offset()
  27.             +",timestamp="+recordMetadata.timestamp());
  28.         }
  29.     }
  30.     @Override
  31.     public void close() {
  32.     }
  33.     @Override
  34.     public void configure(Map<String, ?> map) {
  35.     }
  36. }
复制代码
1.3、发送消息测试

  1. package com.atguigu.kafka.producer;
  2. import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
  3. import jakarta.annotation.PostConstruct;
  4. import jakarta.annotation.Resource;
  5. import org.junit.jupiter.api.Test;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8. import java.io.IOException;
  9. @SpringBootTest
  10. class KafkaProducerApplicationTests {
  11.     //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中
  12.     @Resource
  13.     KafkaTemplate kafkaTemplate;
  14.     @Resource
  15.     MyKafkaInterceptor myKafkaInterceptor;
  16.     @PostConstruct
  17.     public void init() {
  18.         kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
  19.     }
  20.     @Test
  21.     void contextLoads() throws IOException {
  22.         kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");
  23.         //回调是等kafka,ack以后才执行,需要阻塞
  24.         System.in.read();
  25.     }
  26. }
复制代码
1.4、利用Java代码创建主题分区副本

  1. package com.atguigu.kafka.config;
  2. import org.apache.kafka.clients.admin.NewTopic;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.kafka.config.TopicBuilder;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class KafkaTopicConfig {
  8.     @Bean
  9.     public NewTopic myTopic1() {
  10.         //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)
  11.         return TopicBuilder.name("my_topic1")//主题名称
  12.                 .partitions(3)//主题分区
  13.                 .replicas(3)//主题分区副本数
  14.                 .build();//创建
  15.     }
  16. }
复制代码
1.5、application.yml配置----v1版

  1. server:
  2.   port: 8110
  3. # v1
  4. spring:
  5.   kafka:
  6.     bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
  7.     producer: # producer 生产者
  8.       retries: 0 # 重试次数 0表示不重试
  9.       acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)
  10.       batch-size: 16384 # 批次大小 单位byte
  11.       buffer-memory: 33554432 # 生产者缓冲区大小 单位byte
  12.       key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
  13.       value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
复制代码
1.6、屏蔽 kafka debug 日志 logback.xml

  1. <configuration>      
  2.     <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
  3. 屏蔽kafka debug -->
  4.     <logger name="org.apache.kafka.clients" level="debug" />
  5. </configuration>
复制代码
1.7、引入spring-kafka依靠

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <modelVersion>4.0.0</modelVersion>
  5.     <parent>
  6.         <groupId>org.springframework.boot</groupId>
  7.         <artifactId>spring-boot-starter-parent</artifactId>
  8.         <version>3.0.5</version>
  9.         <relativePath/> <!-- lookup parent from repository -->
  10.     </parent>
  11.     <!-- Generated by https://start.springboot.io -->
  12.     <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
  13.     <groupId>com.atguigu.kafka</groupId>
  14.     <artifactId>kafka-producer</artifactId>
  15.     <version>0.0.1-SNAPSHOT</version>
  16.     <name>kafka-producer</name>
  17.     <description>kafka-producer</description>
  18.     <properties>
  19.         <java.version>17</java.version>
  20.     </properties>
  21.     <dependencies>
  22.         <dependency>
  23.             <groupId>org.springframework.boot</groupId>
  24.             <artifactId>spring-boot-starter</artifactId>
  25.         </dependency>
  26.         <dependency>
  27.             <groupId>org.springframework.boot</groupId>
  28.             <artifactId>spring-boot-starter-test</artifactId>
  29.             <scope>test</scope>
  30.         </dependency>
  31.         <dependency>
  32.             <groupId>org.springframework.kafka</groupId>
  33.             <artifactId>spring-kafka</artifactId>
  34.         </dependency>
  35.     </dependencies>
  36.     <build>
  37.         <plugins>
  38.             <plugin>
  39.                 <groupId>org.springframework.boot</groupId>
  40.                 <artifactId>spring-boot-maven-plugin</artifactId>
  41.             </plugin>
  42.         </plugins>
  43.     </build>
  44. </project>
复制代码
1.8、控制台日志

  1. 生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者监听器
  2. 消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717573749549
  3. MyKafkaProducerListener消息发送成功:topic=my_topic1,partition = null,key = null,value = spring-kafka-生产者监听器,offset = 0
复制代码


  1. [
  2.   [
  3.     {
  4.       "partition": 0,
  5.       "offset": 0,
  6.       "msg": "spring-kafka-生产者监听器",
  7.       "timespan": 1717573749549,
  8.       "date": "2024-06-05 07:49:09"
  9.     }
  10.   ]
  11. ]
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

嚴華

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表