忿忿的泥巴坨 发表于 前天 01:02

Spring Boot教程之五十七:在 Apache Kafka 上发布 JSON 消息

Spring Boot | 如何在 Apache Kafka 上发布 JSON 消息


Apache Kafka是一个发布-订阅消息系统。消息队列允许您在进程、应用程序和服务器之间发送消息。在本文中,我们将相识如何在 Spring Boot 应用程序中向 Apache Kafka 发送 JSON 消息。
为了相识如何创建 Spring Boot 项目,请参阅本文。
JSON 的全称是 JavaScript Object Notation。JSON 是一种用于数据互换的轻量级数据格式,人类可以轻松读取和编写,机器也可以轻松剖析和生成。虽然它源自 JavaScript 的一个子集,但它与语言无关。它是一种完全独立于语言的文本格式。可以按照以下步骤将 JSON 消息发布到 Apache Kafka:

[*]转到spring initializr并创建具有以下依靠项的启动项目:   

[*]Spring Web
[*]Spring for Apache Kafka

[*]在 IDE 中打开项目并同步依靠项。在本文中,我们将创建一个门生模型,我们将在此中发布门生详细信息。因此,创建一个模型类Student。添加数据成员并创建构造函数并创建 getter 和 setter。以下是门生类的实现:
// Java program to implement a
// student class
  
// Creating a student class
public class Student {
  
    // Data members of the
    // student class
    int id;
    String firstName;
    String lastName;
  
    // Constructor of the student
    // class
    public Student(int id, String firstName,
                   String lastName)
    {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }
  
    // Implementing the getters
    // and setters
    public int getId()
    {
        return id;
    }
  
    public void setId(int id)
    {
        this.id = id;
    }
  
    public String getFirstName()
    {
        return firstName;
    }
  
    public void setFirstName(String firstName)
    {
        this.firstName = firstName;
    }
  
    public String getLastName()
    {
        return lastName;
    }
  
    public void setLastName(String lastName)
    {
        this.lastName = lastName;
    }
}

 3. 现在,创建一个带有解释@RestController的新类Controller。创建一个GET API并使用参数作为字符串和模型类对象初始化KafkaTemplate。以下是控制器的实现:
// Java program to implement a
// controller
  
@RestController
@RequestMapping("gfg")
public class UserResource {
  
    @Autowired
    private KafkaTemplate<String, Student>
        kafkaTemplate;
  
    private static final String TOPIC
        = "StudentExample";
  
    @GetMapping("/publish/{id}/"
                + "{firstName}/{lastName}")
  
    public String post(
        @PathVariable("id") final int id,
        @PathVariable("firstName") final
            String firstName,
        @PathVariable("lastName") final
            String lastName)
    {
  
        kafkaTemplate.send(
            TOPIC,
            new Student(
                id, firstName,
                lastName));
  
        return "Published successfully";
    }
}

4. 创建一个带有解释@Configuration的StudentConfig类。在这个类中,我们将序列化模型类的对象。
// Java program to serialize the
// object of the model class
  
@Configuration
public class StudentConfig {
  
    @Bean
    public ProducerFactory<String, Student>
    producerFactory()
    {
        // Create a map of a string
        // and object
        Map<String, Object> config
            = new HashMap<>();
  
        config.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "127.0.0.1:9092");
  
        config.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
  
        config.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);
  
        return new DefaultKafkaProducerFactory<>(config);
    }
  
    @Bean
    public KafkaTemplate<String, Student>
    kafkaTemplate()
    {
        return new KafkaTemplate<>(
            producerFactory());
    }
}

5. 现在,启动 zookeeper 和 Kafka 服务器。我们需要创建一个名为StudentExample 的新主题。为此,打开一个新的下令提示符窗口并将目录更改为 Kafka 文件夹。
6. 现在,使用以下下令创建一个新主题:
对于 Mac 和 Linux:bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name
对于 Windows: .\bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name
7.现在要实时查看 Kafka 服务器上的消息,请使用以下下令:
对于 Mac 和 Linux:bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic topic_name –from-beginning
对于 Windows: .\bin\windows\kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic topic_name –from-beginning
8.运行应用程序并调用 API 如下:
localhost:8080/gfg/publish/{id}/{first name}/{last name}
注意:如果使用了差别的端口,则将端口替换为 8080。
输出:


[*]调用 API:https://i-blog.csdnimg.cn/direct/62aff622c370485781109f6b0f120e7c.png
[*]实时查看消息:https://i-blog.csdnimg.cn/direct/c526b74450114af3b8ed857722e001b5.png




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Spring Boot教程之五十七:在 Apache Kafka 上发布 JSON 消息