java发送数据到kafka

打印 上一主题 下一主题

主题 624|帖子 624|积分 1872

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.apache.kafka.clients.producer.Callback;
  6. import org.apache.kafka.clients.producer.KafkaException;
  7. import org.apache.kafka.common.serialization.StringSerializer;
  8. import java.util.Properties;
  9. public class KafkaTransactionalProducer {
  10.     public static void main(String[] args) {
  11.         Properties props = new Properties();
  12.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.1.1:9000,4.4.4.4:8888");
  13.         props.put("security.protocol", "SASL_PLAINTEXT");
  14.         props.put("sasl.mechanism", "SCRAM-SHA-256");
  15.         props.put("sasl.kerberos.service.name", "target");
  16.         props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username="ee" password="eres";");
  17.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  18.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  19.         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  20.         props.put(ProducerConfig.ACKS_CONFIG, "all");
  21.         props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-unique-transactional-id"); // 自定义唯一事务ID
  22.         KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  23.         try {
  24.             producer.initTransactions(); // 初始化事务
  25.             producer.beginTransaction(); // 开始事务
  26.             String topic = "your_topic";
  27.             String message = "{"key": "value"}"; // 要发送的消息
  28.             ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
  29.             
  30.             producer.send(record, new Callback() {
  31.                 public void onCompletion(RecordMetadata metadata, Exception exception) {
  32.                     if (exception != null) {
  33.                         System.err.println("Error sending message: " + exception.getMessage());
  34.                     } else {
  35.                         System.out.println("Message sent successfully to topic " + metadata.topic() +
  36.                                            " partition " + metadata.partition() +
  37.                                            " with offset " + metadata.offset());
  38.                     }
  39.                 }
  40.             });
  41.             producer.commitTransaction(); // 提交事务
  42.         } catch (KafkaException e) {
  43.             producer.abortTransaction(); // 事务失败,回滚
  44.             e.printStackTrace();
  45.         } finally {
  46.             producer.close();
  47.         }
  48.     }
  49. }
  50. 把这些代码改成一个独立的应用服务有自己的端口通过post请求传递json报文,然后给kafka发送消息
复制代码
要将 Kafka 生产者代码改成一个独立的应用服务,可以或许通过 POST 请求吸收 JSON 报文并将其发送到 Kafka,你可以使用 Spring Boot 框架来实现这个功能。Spring Boot 提供了便捷的 REST API 开发和设置管理功能。
以下是一个基于 Spring Boot 的示例项目,展示了怎样实现这个需求:
1. 创建 Spring Boot 项目

首先,你必要创建一个 Spring Boot 项目。如果你使用 Spring Initializr,可以选择以下依赖:


  • Spring Web
  • Spring Kafka
2. 添加 Maven 依赖

在 pom.xml 文件中,确保包含以下依赖
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.springframework.boot</groupId>
  4.         <artifactId>spring-boot-starter-web</artifactId>
  5.     </dependency>
  6.     <dependency>
  7.         <groupId>org.springframework.kafka</groupId>
  8.         <artifactId>spring-kafka</artifactId>
  9.     </dependency>
  10. </dependencies>
复制代码
3. 设置 Kafka 生产者

在 application.properties 文件中设置 Kafka 生产者属性:
  1. spring.kafka.producer.bootstrap-servers=10.1.1.1:9000,4.4.4.4:8888
  2. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  3. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  4. spring.kafka.producer.enable-idempotence=true
  5. spring.kafka.producer.acks=all
  6. spring.kafka.producer.transaction-id-prefix=my-unique-transactional-id
  7. spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
  8. spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
  9. spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ee" password="eres";
  10. spring.kafka.producer.properties.sasl.kerberos.service.name=target
复制代码
4. 实现 Kafka 生产者服务

创建一个 KafkaProducerService 类来处理惩罚消息发送:
  1. package com.example.kafkaproducer.service;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.apache.kafka.clients.producer.Callback;
  6. import org.apache.kafka.clients.producer.KafkaException;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.kafka.core.ProducerFactory;
  9. import org.springframework.kafka.core.KafkaTemplate;
  10. import org.springframework.kafka.core.ProducerFactory;
  11. import org.springframework.stereotype.Service;
  12. @Service
  13. public class KafkaProducerService {
  14.     private final KafkaTemplate<String, String> kafkaTemplate;
  15.     @Autowired
  16.     public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
  17.         this.kafkaTemplate = kafkaTemplate;
  18.     }
  19.     public void sendMessage(String topic, String message) {
  20.         kafkaTemplate.executeInTransaction(kafkaTemplate -> {
  21.             kafkaTemplate.send(topic, message).addCallback(new org.springframework.util.concurrent.ListenableFutureCallback<>() {
  22.                 @Override
  23.                 public void onSuccess(RecordMetadata result) {
  24.                     System.out.println("Message sent successfully to topic " + result.topic() +
  25.                                        " partition " + result.partition() +
  26.                                        " with offset " + result.offset());
  27.                 }
  28.                 @Override
  29.                 public void onFailure(Throwable ex) {
  30.                     System.err.println("Error sending message: " + ex.getMessage());
  31.                 }
  32.             });
  33.             return null;
  34.         });
  35.     }
  36. }
复制代码
5. 创建 REST Controller

实现一个 REST 控制器来处理惩罚 POST 请求:
  1. package com.example.kafkaproducer.controller;
  2. import com.example.kafkaproducer.service.KafkaProducerService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.*;
  5. @RestController
  6. @RequestMapping("/api")
  7. public class KafkaController {
  8.     private final KafkaProducerService kafkaProducerService;
  9.     @Autowired
  10.     public KafkaController(KafkaProducerService kafkaProducerService) {
  11.         this.kafkaProducerService = kafkaProducerService;
  12.     }
  13.     @PostMapping("/send")
  14.     public String sendMessage(@RequestBody String message) {
  15.         String topic = "your_topic";
  16.         kafkaProducerService.sendMessage(topic, message);
  17.         return "Message sent to Kafka";
  18.     }
  19. }
复制代码
6. 启动 Spring Boot 应用

在你的 Application 类中启动 Spring Boot 应用:
  1. package com.example.kafkaproducer;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class KafkaProducerApplication {
  6.     public static void main(String[] args) {
  7.         SpringApplication.run(KafkaProducerApplication.class, args);
  8.     }
  9. }
复制代码
7. 运行应用并测试

启动 Spring Boot 应用后,你可以通过 POST 请求将 JSON 消息发送到 http://localhost:8080/api/send,好比使用 curl:
  1. curl -X POST http://localhost:8080/api/send -H "Content-Type: application/json" -d '{"key": "value"}'
复制代码
这样,Kafka 生产者服务就可以通过 REST API 吸收消息并将其发送到 Kafka 了。
上面的用java的1.8实现
如果你使用 Java 1.8,你可以按照以下步骤来实现一个独立的应用服务,该服务通过 POST 请求吸收 JSON 报文并将其发送到 Kafka。我们将不使用 Spring Boot,而是使用更底子的 Java Servlet 技能来完成这个使命。
1. 创建 Maven 项目

确保你的 pom.xml 包含必要的依赖:
  1. <dependencies>
  2.     <!-- Spring Boot Starter Web -->
  3.     <dependency>
  4.         <groupId>org.springframework.boot</groupId>
  5.         <artifactId>spring-boot-starter-web</artifactId>
  6.     </dependency>
  7.    
  8.     <!-- Spring Boot Starter Kafka -->
  9.     <dependency>
  10.         <groupId>org.springframework.kafka</groupId>
  11.         <artifactId>spring-kafka</artifactId>
  12.     </dependency>
  13.     <!-- JSON处理 -->
  14.     <dependency>
  15.         <groupId>com.google.code.gson</groupId>
  16.         <artifactId>gson</artifactId>
  17.         <version>2.10.1</version>
  18.     </dependency>
  19. </dependencies>
复制代码
2. 设置 Kafka 生产者

创建 KafkaProducerConfig 类来设置 Kafka 生产者:
  1. import org.apache.kafka.clients.producer.ProducerConfig;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import java.util.Properties;
  4. public class KafkaProducerConfig {
  5.     public static Properties getProducerProperties() {
  6.         Properties props = new Properties();
  7.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.1.1:9000,4.4.4.4:8888");
  8.         props.put("security.protocol", "SASL_PLAINTEXT");
  9.         props.put("sasl.mechanism", "SCRAM-SHA-256");
  10.         props.put("sasl.kerberos.service.name", "target");
  11.         props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username="ee" password="eres";");
  12.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  13.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14.         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  15.         props.put(ProducerConfig.ACKS_CONFIG, "all");
  16.         props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-unique-transactional-id");
  17.         return props;
  18.     }
  19. }
复制代码
3. 实现 Kafka 生产者 Servlet

创建 KafkaProducerServlet 类来处理惩罚 POST 请求并将消息发送到 Kafka:
  1. import com.google.gson.Gson;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.apache.kafka.clients.producer.Callback;
  6. import org.apache.kafka.clients.producer.KafkaException;
  7. import javax.servlet.ServletException;
  8. import javax.servlet.annotation.WebServlet;
  9. import javax.servlet.http.HttpServlet;
  10. import javax.servlet.http.HttpServletRequest;
  11. import javax.servlet.http.HttpServletResponse;
  12. import java.io.IOException;
  13. @WebServlet("/api/send")
  14. public class KafkaProducerServlet extends HttpServlet {
  15.     private KafkaProducer<String, String> producer;
  16.     @Override
  17.     public void init() throws ServletException {
  18.         super.init();
  19.         producer = new KafkaProducer<>(KafkaProducerConfig.getProducerProperties());
  20.     }
  21.     @Override
  22.     protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
  23.         String topic = "your_topic";
  24.         StringBuilder stringBuilder = new StringBuilder();
  25.         String line;
  26.         while ((line = req.getReader().readLine()) != null) {
  27.             stringBuilder.append(line);
  28.         }
  29.         String message = stringBuilder.toString();
  30.         ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
  31.         producer.send(record, new Callback() {
  32.             @Override
  33.             public void onCompletion(RecordMetadata metadata, Exception exception) {
  34.                 if (exception != null) {
  35.                     System.err.println("Error sending message: " + exception.getMessage());
  36.                 } else {
  37.                     System.out.println("Message sent successfully to topic " + metadata.topic() +
  38.                                        " partition " + metadata.partition() +
  39.                                        " with offset " + metadata.offset());
  40.                 }
  41.             }
  42.         });
  43.         resp.setContentType("text/plain");
  44.         resp.getWriter().write("Message sent to Kafka");
  45.     }
  46.     @Override
  47.     public void destroy() {
  48.         super.destroy();
  49.         if (producer != null) {
  50.             producer.close();
  51.         }
  52.     }
  53. }
复制代码
4. 设置 web.xml

如果你不使用注解,可以在 web.xml 中设置 Servlet:
  1. <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
  2.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
  4.                              http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
  5.          version="4.0">
  6.     <servlet>
  7.         <servlet-name>KafkaProducerServlet</servlet-name>
  8.         <servlet-class>com.example.KafkaProducerServlet</servlet-class>
  9.     </servlet>
  10.     <servlet-mapping>
  11.         <servlet-name>KafkaProducerServlet</servlet-name>
  12.         <url-pattern>/api/send</url-pattern>
  13.     </servlet-mapping>
  14. </web-app>
复制代码
5. 部署和运行

将你的应用打包成一个 WAR 文件并部署到支持 Servlet 的服务器(如 Apache Tomcat)。然后,你可以通过 POST 请求将 JSON 消息发送到 http://localhost:8080/your-app/api/send。
例如,使用 curl 来测试:
  1. bashcurl -X POST http://localhost:8080/your-app/api/send -H "Content-Type: application/json" -d '{"key": "value"}'
复制代码
这样,你就完成了将 Kafka 生产者代码改成一个独立的应用服务,通过 POST 请求吸收 JSON 报文并发送到 Kafka 的使命。

上面的用java的1.8springboot实现
要使用 Java 1.8 和 Spring Boot 实现一个独立的应用服务,通过 POST 请求吸收 JSON 报文并将其发送到 Kafka,你可以按照以下步骤举行。
1. 创建 Spring Boot 项目

可以使用 Spring Initializr 来创建一个新的 Spring Boot 项目。选择以下依赖:


  • Spring Web
  • Spring Kafka
2. 添加 Maven 依赖

在 pom.xml 文件中添加 Kafka 和 Spring Boot 相干的依赖:
  1. <dependencies>
  2.     <!-- Spring Boot Starter Web -->
  3.     <dependency>
  4.         <groupId>org.springframework.boot</groupId>
  5.         <artifactId>spring-boot-starter-web</artifactId>
  6.     </dependency>
  7.    
  8.     <!-- Spring Boot Starter Kafka -->
  9.     <dependency>
  10.         <groupId>org.springframework.kafka</groupId>
  11.         <artifactId>spring-kafka</artifactId>
  12.     </dependency>
  13.     <!-- JSON处理 -->
  14.     <dependency>
  15.         <groupId>com.google.code.gson</groupId>
  16.         <artifactId>gson</artifactId>
  17.         <version>2.10.1</version>
  18.     </dependency>
  19. </dependencies>
复制代码
3. 设置 Kafka 生产者

在 application.properties 文件中设置 Kafka 生产者属性:
  1. spring.kafka.producer.bootstrap-servers=10.1.1.1:9000,4.4.4.4:8888
  2. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  3. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  4. spring.kafka.producer.enable-idempotence=true
  5. spring.kafka.producer.acks=all
  6. spring.kafka.producer.transaction-id-prefix=my-unique-transactional-id
  7. spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
  8. spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
  9. spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ee" password="eres";
  10. spring.kafka.producer.properties.sasl.kerberos.service.name=target
复制代码
4. 创建 Kafka 生产者服务

创建一个 KafkaProducerService 类来处理惩罚消息发送:
  1. package com.example.kafkaproducer.service;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.kafka.core.ProducerFactory;
  4. import org.springframework.kafka.core.KafkaTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.kafka.annotation.KafkaListener;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8. import org.springframework.stereotype.Service;
  9. @Service
  10. public class KafkaProducerService {
  11.     private final KafkaTemplate<String, String> kafkaTemplate;
  12.     @Autowired
  13.     public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
  14.         this.kafkaTemplate = kafkaTemplate;
  15.     }
  16.     public void sendMessage(String topic, String message) {
  17.         kafkaTemplate.executeInTransaction(kafkaTemplate -> {
  18.             kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<>() {
  19.                 @Override
  20.                 public void onSuccess(RecordMetadata result) {
  21.                     System.out.println("Message sent successfully to topic " + result.topic() +
  22.                                        " partition " + result.partition() +
  23.                                        " with offset " + result.offset());
  24.                 }
  25.                 @Override
  26.                 public void onFailure(Throwable ex) {
  27.                     System.err.println("Error sending message: " + ex.getMessage());
  28.                 }
  29.             });
  30.             return null;
  31.         });
  32.     }
  33. }
复制代码
5. 创建 REST Controller

创建一个 REST 控制器来处理惩罚 POST 请求:
  1. package com.example.kafkaproducer.controller;
  2. import com.example.kafkaproducer.service.KafkaProducerService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.*;
  5. @RestController
  6. @RequestMapping("/api")
  7. public class KafkaController {
  8.     private final KafkaProducerService kafkaProducerService;
  9.     @Autowired
  10.     public KafkaController(KafkaProducerService kafkaProducerService) {
  11.         this.kafkaProducerService = kafkaProducerService;
  12.     }
  13.     @PostMapping("/send")
  14.     public String sendMessage(@RequestBody String message) {
  15.         String topic = "your_topic";
  16.         kafkaProducerService.sendMessage(topic, message);
  17.         return "Message sent to Kafka";
  18.     }
  19. }
复制代码
6. 启动 Spring Boot 应用

在你的 Application 类中启动 Spring Boot 应用:
  1. package com.example.kafkaproducer;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class KafkaProducerApplication {
  6.     public static void main(String[] args) {
  7.         SpringApplication.run(KafkaProducerApplication.class, args);
  8.     }
  9. }
复制代码
7. 运行应用并测试

启动 Spring Boot 应用后,你可以通过 POST 请求将 JSON 消息发送到 http://localhost:8080/api/send,例如使用 curl:
  1. package com.example.kafkaproducer;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class KafkaProducerApplication {
  6.     public static void main(String[] args) {
  7.         SpringApplication.run(KafkaProducerApplication.class, args);
  8.     }
  9. }
复制代码
额外提示


  • 非常处理惩罚: 在实际应用中,你大概必要添加更完善的非常处理惩罚和日志记录。
  • 安全性: 确保你的应用具有恰当的安全设置,尤其是对外暴露的 API。
这样,你就完成了使用 Java 1.8 和 Spring Boot 实现 Kafka 生产者服务的使命,通过 POST 请求吸收 JSON 报文并将其发送到 Kafka。

public void onSuccess(RecordMetadata result) { 我这个继承的参数不是RecordMetadata result

如果你的 onSuccess 方法的参数不是 RecordMetadata result,那么你大概在使用 ListenableFutureCallback 接口的实现时遇到了标题。在 Kafka 的生产者 API 中,ListenableFutureCallback 接口通常用于处理惩罚 send 方法的回调。
请确保你正确地实现了 ListenableFutureCallback 接口。以下是一个完整的示例,展示了怎样实现 Kafka 生产者的回调处理惩罚,包括 onSuccess 和 onFailure 方法。
完整示例:KafkaProducerService 类

在这个示例中,我们将使用 ListenableFutureCallback 处理惩罚 Kafka 生产的结果。
  1. package com.example.kafkaproducer.service;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.apache.kafka.clients.producer.Callback;
  6. import org.apache.kafka.clients.producer.KafkaException;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.kafka.core.KafkaTemplate;
  9. import org.springframework.kafka.core.ProducerFactory;
  10. import org.springframework.kafka.core.KafkaTemplate;
  11. import org.springframework.stereotype.Service;
  12. import org.springframework.util.concurrent.ListenableFutureCallback;
  13. @Service
  14. public class KafkaProducerService {
  15.     private final KafkaTemplate<String, String> kafkaTemplate;
  16.     @Autowired
  17.     public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
  18.         this.kafkaTemplate = kafkaTemplate;
  19.     }
  20.     public void sendMessage(String topic, String message) {
  21.         kafkaTemplate.executeInTransaction(kafkaTemplate -> {
  22.             kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<>() {
  23.                 @Override
  24.                 public void onSuccess(RecordMetadata result) {
  25.                     System.out.println("Message sent successfully to topic " + result.topic() +
  26.                                        " partition " + result.partition() +
  27.                                        " with offset " + result.offset());
  28.                 }
  29.                 @Override
  30.                 public void onFailure(Throwable ex) {
  31.                     System.err.println("Error sending message: " + ex.getMessage());
  32.                 }
  33.             });
  34.             return null;
  35.         });
  36.     }
  37. }
复制代码
关键点


  • KafkaTemplate 的回调: 使用 kafkaTemplate.send().addCallback 方法可以处理惩罚消息发送的结果。addCallback 方法接受一个 ListenableFutureCallback 对象来处理惩罚乐成和失败的回调。
  • 实现 ListenableFutureCallback: ListenableFutureCallback 是 Spring 提供的接口,用于处理惩罚异步操作的乐成和失败结果。你必要实现 onSuccess 和 onFailure 方法。
  • ProducerRecord 和 RecordMetadata: ProducerRecord 用于构建消息记录,RecordMetadata 用于吸收发送消息后的元数据。
使用 Callback 接口的示例

如果你直接使用 Kafka 的 Callback 接口而不是 Spring 的 ListenableFutureCallback,示例如下:
  1. import org.apache.kafka.clients.producer.Callback;
  2. import org.apache.kafka.clients.producer.RecordMetadata;
  3. public class KafkaProducerCallback implements Callback {
  4.     @Override
  5.     public void onCompletion(RecordMetadata metadata, Exception exception) {
  6.         if (exception != null) {
  7.             System.err.println("Error sending message: " + exception.getMessage());
  8.         } else {
  9.             System.out.println("Message sent successfully to topic " + metadata.topic() +
  10.                                " partition " + metadata.partition() +
  11.                                " with offset " + metadata.offset());
  12.         }
  13.     }
  14. }
复制代码
然后在 KafkaProducerService 中使用这个 Callback 类:
  1. public void sendMessage(String topic, String message) {
  2.     kafkaTemplate.executeInTransaction(kafkaTemplate -> {
  3.         kafkaTemplate.send(topic, message).addCallback(new KafkaProducerCallback());
  4.         return null;
  5.     });
  6. }
复制代码
这样,你就可以确保 Kafka 消息发送的结果被正确地处理惩罚,无论是乐成还是失败。

public void onSuccess(RecordMetadata result) { 我这个继承的参数shi Object o怎么回事
如果你在实现 Kafka 生产者的回调时遇到了 onSuccess 方法的参数类型为 Object o 的标题,这大概是由于使用了差别的回调接口或设置。
在 Kafka 中,send 方法的回调接口通常是 Callback,其 onCompletion 方法的签名如下:
void onCompletion(RecordMetadata metadata, Exception exception);
如果你的实现中 onSuccess 方法的参数类型是 Object,大概你在使用的是与 Kafka 原生 API 不匹配的回调接口或在自定义实现时出现了标题。
确保使用正确的回调接口

以下是怎样正确实现 Kafka 的 Callback 接口,并处理惩罚 onSuccess 和 onFailure 方法:
1. Kafka Callback 接口的正确实现

  1. import org.apache.kafka.clients.producer.Callback;
  2. import org.apache.kafka.clients.producer.RecordMetadata;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. public class KafkaProducerCallback implements Callback {
  5.     @Override
  6.     public void onCompletion(RecordMetadata metadata, Exception exception) {
  7.         if (exception != null) {
  8.             System.err.println("Error sending message: " + exception.getMessage());
  9.         } else {
  10.             System.out.println("Message sent successfully to topic " + metadata.topic() +
  11.                                " partition " + metadata.partition() +
  12.                                " with offset " + metadata.offset());
  13.         }
  14.     }
  15. }
复制代码
2. 使用 Kafka Callback 的 send 方法

在 Kafka 生产者服务中使用上述回调:
  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. public class KafkaProducerService {
  9.     private final KafkaTemplate<String, String> kafkaTemplate;
  10.     @Autowired
  11.     public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
  12.         this.kafkaTemplate = kafkaTemplate;
  13.     }
  14.     public void sendMessage(String topic, String message) {
  15.         kafkaTemplate.executeInTransaction(kafkaTemplate -> {
  16.             kafkaTemplate.send(topic, message).addCallback(new KafkaProducerCallback());
  17.             return null;
  18.         });
  19.     }
  20. }
复制代码
自定义实现中的 Object o 标题

如果你看到 onSuccess(Object o),这大概是你在自定义回调或在其他上下文中使用了差别的回调接口。请查抄以下几点:

  • 查抄回调接口类型:确保你的回调接口实现的是 Kafka 的 Callback 接口,而不是其他的接口或自定义接口。
  • 回调方法的签名:确保 onSuccess 方法的签名与 Callback 接口的要求匹配。如果你的代码中 onSuccess 方法接受 Object 类型的参数,大概必要更改为 RecordMetadata 类型。
  • 代码示例:确保在代码中,send 方法的回调处理惩罚与 Kafka 的回调接口划一。
结论

确保你使用的回调接口正确且实现了 Kafka 的 Callback 接口,以便 onCompletion 方法可以正确处理惩罚消息的发送结果。如果你遇到差别类型的回调接口,大概必要查抄代码的其他部分或依赖项以确保划一

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

灌篮少年

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表