Spring Boot教程之五十七:在 Apache Kafka 上发布 JSON 消息

打印 上一主题 下一主题

主题 795|帖子 795|积分 2385

Spring Boot | 如何在 Apache Kafka 上发布 JSON 消息


Apache Kafka是一个发布-订阅消息系统。消息队列允许您在进程、应用程序和服务器之间发送消息。在本文中,我们将相识如何在 Spring Boot 应用程序中向 Apache Kafka 发送 JSON 消息。

为了相识如何创建 Spring Boot 项目,请参阅本文。

JSON 的全称是 JavaScript Object Notation。JSON 是一种用于数据互换的轻量级数据格式,人类可以轻松读取和编写,机器也可以轻松剖析和生成。虽然它源自 JavaScript 的一个子集,但它与语言无关。它是一种完全独立于语言的文本格式。可以按照以下步骤将 JSON 消息发布到 Apache Kafka:


  • 转到spring initializr并创建具有以下依靠项的启动项目:   

    • Spring Web
    • Spring for Apache Kafka

  • 在 IDE 中打开项目并同步依靠项。在本文中,我们将创建一个门生模型,我们将在此中发布门生详细信息。因此,创建一个模型类Student。添加数据成员并创建构造函数并创建 getter 和 setter。以下是门生类的实现:
// Java program to implement a

// student class

  

// Creating a student class

public class Student {

  

    // Data members of the

    // student class

    int id;

    String firstName;

    String lastName;

  

    // Constructor of the student

    // class

    public Student(int id, String firstName,

                   String lastName)

    {

        this.id = id;

        this.firstName = firstName;

        this.lastName = lastName;

    }

  

    // Implementing the getters

    // and setters

    public int getId()

    {

        return id;

    }

  

    public void setId(int id)

    {

        this.id = id;

    }

  

    public String getFirstName()

    {

        return firstName;

    }

  

    public void setFirstName(String firstName)

    {

        this.firstName = firstName;

    }

  

    public String getLastName()

    {

        return lastName;

    }

  

    public void setLastName(String lastName)

    {

        this.lastName = lastName;

    }

}


 3. 现在,创建一个带有解释@RestController的新类Controller。创建一个GET API并使用参数作为字符串和模型类对象初始化KafkaTemplate。以下是控制器的实现:

// Java program to implement a

// controller

  

@RestController

@RequestMapping("gfg")

public class UserResource {

  

    @Autowired

    private KafkaTemplate<String, Student>

        kafkaTemplate;

  

    private static final String TOPIC

        = "StudentExample";

  

    @GetMapping("/publish/{id}/"

                + "{firstName}/{lastName}")

  

    public String post(

        @PathVariable("id") final int id,

        @PathVariable("firstName") final

            String firstName,

        @PathVariable("lastName") final

            String lastName)

    {

  

        kafkaTemplate.send(

            TOPIC,

            new Student(

                id, firstName,

                lastName));

  

        return "ublished successfully";

    }

}


4. 创建一个带有解释@Configuration的StudentConfig类。在这个类中,我们将序列化模型类的对象。

// Java program to serialize the

// object of the model class

  

@Configuration

public class StudentConfig {

  

    @Bean

    public ProducerFactory<String, Student>

    producerFactory()

    {

        // Create a map of a string

        // and object

        Map<String, Object> config

            = new HashMap<>();

  

        config.put(

            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,

            "127.0.0.1:9092");

  

        config.put(

            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

            StringSerializer.class);

  

        config.put(

            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

            JsonSerializer.class);

  

        return new DefaultKafkaProducerFactory<>(config);

    }

  

    @Bean

    public KafkaTemplate<String, Student>

    kafkaTemplate()

    {

        return new KafkaTemplate<>(

            producerFactory());

    }

}


5. 现在,启动 zookeeper 和 Kafka 服务器。我们需要创建一个名为StudentExample 的新主题。为此,打开一个新的下令提示符窗口并将目录更改为 Kafka 文件夹。

6. 现在,使用以下下令创建一个新主题:

对于 Mac 和 Linux:bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name

对于 Windows: .\bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name

7.现在要实时查看 Kafka 服务器上的消息,请使用以下下令:

对于 Mac 和 Linux:bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic topic_name –from-beginning

对于 Windows: .\bin\windows\kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic topic_name –from-beginning

8.运行应用程序并调用 API 如下:

localhost:8080/gfg/publish/{id}/{first name}/{last name}

注意:如果使用了差别的端口,则将端口替换为 8080。

输出:



  • 调用 API:

  • 实时查看消息:





免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

忿忿的泥巴坨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表