Spring Boot教程之五十七:在 Apache Kafka 上发布 JSON 消息
Spring Boot | 如何在 Apache Kafka 上发布 JSON 消息Apache Kafka是一个发布-订阅消息系统。消息队列允许您在进程、应用程序和服务器之间发送消息。在本文中,我们将相识如何在 Spring Boot 应用程序中向 Apache Kafka 发送 JSON 消息。
为了相识如何创建 Spring Boot 项目,请参阅本文。
JSON 的全称是 JavaScript Object Notation。JSON 是一种用于数据互换的轻量级数据格式,人类可以轻松读取和编写,机器也可以轻松剖析和生成。虽然它源自 JavaScript 的一个子集,但它与语言无关。它是一种完全独立于语言的文本格式。可以按照以下步骤将 JSON 消息发布到 Apache Kafka:
[*]转到spring initializr并创建具有以下依靠项的启动项目:
[*]Spring Web
[*]Spring for Apache Kafka
[*]在 IDE 中打开项目并同步依靠项。在本文中,我们将创建一个门生模型,我们将在此中发布门生详细信息。因此,创建一个模型类Student。添加数据成员并创建构造函数并创建 getter 和 setter。以下是门生类的实现:
// Java program to implement a
// student class
// Creating a student class
public class Student {
// Data members of the
// student class
int id;
String firstName;
String lastName;
// Constructor of the student
// class
public Student(int id, String firstName,
String lastName)
{
this.id = id;
this.firstName = firstName;
this.lastName = lastName;
}
// Implementing the getters
// and setters
public int getId()
{
return id;
}
public void setId(int id)
{
this.id = id;
}
public String getFirstName()
{
return firstName;
}
public void setFirstName(String firstName)
{
this.firstName = firstName;
}
public String getLastName()
{
return lastName;
}
public void setLastName(String lastName)
{
this.lastName = lastName;
}
}
3. 现在,创建一个带有解释@RestController的新类Controller。创建一个GET API并使用参数作为字符串和模型类对象初始化KafkaTemplate。以下是控制器的实现:
// Java program to implement a
// controller
@RestController
@RequestMapping("gfg")
public class UserResource {
@Autowired
private KafkaTemplate<String, Student>
kafkaTemplate;
private static final String TOPIC
= "StudentExample";
@GetMapping("/publish/{id}/"
+ "{firstName}/{lastName}")
public String post(
@PathVariable("id") final int id,
@PathVariable("firstName") final
String firstName,
@PathVariable("lastName") final
String lastName)
{
kafkaTemplate.send(
TOPIC,
new Student(
id, firstName,
lastName));
return "Published successfully";
}
}
4. 创建一个带有解释@Configuration的StudentConfig类。在这个类中,我们将序列化模型类的对象。
// Java program to serialize the
// object of the model class
@Configuration
public class StudentConfig {
@Bean
public ProducerFactory<String, Student>
producerFactory()
{
// Create a map of a string
// and object
Map<String, Object> config
= new HashMap<>();
config.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"127.0.0.1:9092");
config.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
config.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Student>
kafkaTemplate()
{
return new KafkaTemplate<>(
producerFactory());
}
}
5. 现在,启动 zookeeper 和 Kafka 服务器。我们需要创建一个名为StudentExample 的新主题。为此,打开一个新的下令提示符窗口并将目录更改为 Kafka 文件夹。
6. 现在,使用以下下令创建一个新主题:
对于 Mac 和 Linux:bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name
对于 Windows: .\bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name
7.现在要实时查看 Kafka 服务器上的消息,请使用以下下令:
对于 Mac 和 Linux:bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic topic_name –from-beginning
对于 Windows: .\bin\windows\kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic topic_name –from-beginning
8.运行应用程序并调用 API 如下:
localhost:8080/gfg/publish/{id}/{first name}/{last name}
注意:如果使用了差别的端口,则将端口替换为 8080。
输出:
[*]调用 API:https://i-blog.csdnimg.cn/direct/62aff622c370485781109f6b0f120e7c.png
[*]实时查看消息:https://i-blog.csdnimg.cn/direct/c526b74450114af3b8ed857722e001b5.png
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]