kafka之protobuf

打印 上一主题 下一主题

主题 865|帖子 865|积分 2595

Protobuf 的 .proto 文件是一种形貌消息结构的定义文件,使用这种文件可以定义数据结构(消息),然后生成对应语言的类或代码用于序列化和反序列化数据。生成 .proto 文件涉及到编写 .proto 文件定义,然后通过 protoc 编译器生成目标语言的代码(如 Java、Python、Go 等)。
生成 .proto 文件的步调

1. 编写 .proto 文件

首先,手动编写 .proto 文件来定义消息的结构。每个 .proto 文件定义了消息类型、字段以及字段的类型和编号。
例如,下面的 .proto 文件定义了一个图片消息的结构,包罗文件名、格式和二进制数据:
  1. // image.proto
  2. syntax = "proto3";
  3. message ImageRecord {
  4.   // 文件名
  5.   string filename = 1;
  6.   // 文件格式
  7.   string format = 2;
  8.   // 二进制数据
  9.   bytes imageData = 3;
  10. }
复制代码
2. 使用 protoc 编译 .proto 文件

protoc 是 Google 的 Protocol Buffers 编译器,负责将 .proto 文件编译成对应编程语言的类文件。这些类文件用于序列化和反序列化数据。
2.1. 安装 protoc



  • 下载并安装 protoc:

    • Linux/macOS:使用包管理器安装

  1. # macOS
  2. brew install protobuf
  3. # Ubuntu
  4. sudo apt-get install -y protobuf-compiler
复制代码
Windows:从 官方下载页面 获取并安装。

win 解压上述zip包:

执行的文件就在这里:

为了方便使用可以把这个bin目录设置在系统情况变量里,也可以直接进入到这个文件夹里
如果设置情况变量的话,安装完毕之后验证:
验证安装:
  1. protoc --version
复制代码
2.2. 编译 .proto 文件

通过 protoc 命令来编译 .proto 文件为目标语言代码。下面是几种常见语言的生成方式。
2.2.1. 生成 Java 代码

  1. # 将 image.proto 编译为 Java 类,生成到指定目录
  2. protoc --java_out=./output image.proto
复制代码
上述的指令直接在win的cmd命令行里即可完成,记得提前建好 output目录


执行完毕之后就会生成一个 .java文件

编译后,会在 ./output 目录下生成相应的 Java 类(如 ImageRecord.java),你可以直接使用这些类举行 Protobuf 的序列化和反序列化。
3. 使用生成的类

编译生成的类会包罗以下功能:


  • 序列化:将定义的消息对象转换为二进制格式,得当传输或存储。
  • 反序列化:将二进制格式的数据解析回消息对象。
例如,使用生成的 Java 类序列化和反序列化 ImageRecord:
  1. import com.example.proto.ImageRecord;  // 假设包名为 com.example.proto
  2. import java.nio.file.Files;
  3. import java.io.File;
  4. public class ProtobufExample {
  5.     public static void main(String[] args) throws Exception {
  6.         // 构建 ImageRecord 消息对象
  7.         ImageRecord image = ImageRecord.newBuilder()
  8.             .setFilename("example.jpg")
  9.             .setFormat("jpg")
  10.             .setImageData(ByteString.copyFrom(Files.readAllBytes(new File("example.jpg").toPath())))
  11.             .build();
  12.         // 序列化为二进制数据
  13.         byte[] serializedData = image.toByteArray();
  14.         // 反序列化为 ImageRecord 对象
  15.         ImageRecord deserializedImage = ImageRecord.parseFrom(serializedData);
  16.         System.out.println("Filename: " + deserializedImage.getFilename());
  17.     }
  18. }
复制代码
4. 定义 .proto 文件的规则

以下是 .proto 文件的常见语法:


  • syntax:定义 Protobuf 版本,保举使用 proto3,较为简洁并且是最新的标准。
  1. syntax = "proto3";
复制代码



  • 消息(Message)定义:使用 message 关键字定义数据结构。
  1. message ImageRecord {
  2.     string filename = 1;  // string 类型字段,字段编号为 1
  3.     string format = 2;    // string 类型字段,字段编号为 2
  4.     bytes imageData = 3;  // 二进制数据,字段编号为 3
  5. }
复制代码
字段类型:常见的 Protobuf 字段类型包罗:


  • int32, int64: 整数
  • float, double: 浮点数
  • bool: 布尔值
  • string: 字符串
  • bytes: 二进制数据(如文件、图片、视频)
字段编号:每个字段必须有唯一的编号,编号用于序列化和反序列化。编号必须是正整数,1 到 15 的编号用于最常用的字段,因为它们序列化时占用更少的空间
嵌套消息:可以在消息中定义嵌套的消息类型
  1. message User {
  2.   string username = 1;
  3.   Profile profile = 2; // 嵌套消息类型
  4.   message Profile {
  5.       string email = 1;
  6.       int32 age = 2;
  7.   }
  8. }
复制代码
总结


  • 编写 .proto 文件:定义消息结构,包罗字段类型、名称和编号。
  • 使用 protoc 编译:将 .proto 文件编译为目标语言代码,如 Java、Python、Go 等。
  • 使用生成的类:使用生成的类举行消息的序列化(转换为二进制格式)和反序列化(解析二进制数据)。
kafka和protobuf集成例子:

要将 Protobuf 与 Kafka 集成,我们可以使用 Protobuf 定义的数据结构作为 Kafka 消息体,并通过 Kafka Producer 将序列化的 Protobuf 消息发送到 Kafka。在消耗者端,通过 Kafka Consumer 接收消息并反序列化为原始的 Protobuf 对象。
步调:


  • 编写 .proto 文件:定义消息的结构。
  • 使用 protoc 编译生成类:使用 Protobuf 编译器将 .proto 文件编译为 Java/Python 等语言的类。
  • Kafka Producer 发送 Protobuf 消息:使用生成的类,构造 Protobuf 消息并通过 Kafka Producer 发送。
  • Kafka Consumer 接收并反序列化 Protobuf 消息:在 Kafka Consumer 中接收消息,并反序列化为 Protobuf 对象。
1. 编写 Protobuf .proto 文件

例如,定义一个包罗图片信息的 ImageRecord.proto 文件:
  1. syntax = "proto3";
  2. message ImageRecord {  string filename = 1;  string format = 2;  bytes imageData = 3;}
复制代码
2. 使用 protoc 编译生成 Java 类

假设使用 Java,将 .proto 文件编译为 Java 类:
  1. protoc --java_out=./output ImageRecord.proto
复制代码
3. Kafka Producer 发送 Protobuf 消息

通过 Kafka Producer 发送 Protobuf 格式的消息:
  1. import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
  2. import java.util.Properties
  3. import java.nio.file.Files
  4. import java.io.File
  5. import com.example.proto.ImageRecord
  6. object ProtobufKafkaProducer {
  7.     def main(args: Array[String]): Unit = {
  8.         // Kafka Producer 配置
  9.         val props = new Properties()
  10.         props.put("bootstrap.servers", "localhost:9092")
  11.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  12.         props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  13.         val producer = new KafkaProducer[String, Array[Byte]](props)
  14.         // 构建 Protobuf 消息
  15.         val imageBytes = Files.readAllBytes(new File("/path/to/image.jpg").toPath)
  16.         val imageRecord = ImageRecord.newBuilder()
  17.             .setFilename("image.jpg")
  18.             .setFormat("jpg")
  19.             .setImageData(com.google.protobuf.ByteString.copyFrom(imageBytes))
  20.             .build()
  21.         // 序列化并发送 Protobuf 消息到 Kafka
  22.         val record = new ProducerRecord[String, Array[Byte]]("image_topic", "image_key", imageRecord.toByteArray)
  23.         producer.send(record)
  24.         producer.close()
  25.     }
  26. }
复制代码
4. Kafka Consumer 接收并反序列化 Protobuf 消息

通过 Kafka Consumer 接收 Protobuf 消息并反序列化:
  1. import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerRecords}
  2. import java.util.Properties
  3. import com.example.proto.ImageRecord
  4. object ProtobufKafkaConsumer {
  5.     def main(args: Array[String]): Unit = {
  6.         // Kafka Consumer 配置
  7.         val props = new Properties()
  8.         props.put("bootstrap.servers", "localhost:9092")
  9.         props.put("group.id", "test")
  10.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  11.         props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  12.         props.put("auto.offset.reset", "earliest")
  13.         val consumer = new KafkaConsumer[String, Array[Byte]](props)
  14.         consumer.subscribe(java.util.Arrays.asList("image_topic"))
  15.         // 消费并反序列化消息
  16.         while (true) {
  17.             val records: ConsumerRecords[String, Array[Byte]] = consumer.poll(100)
  18.             records.forEach { record =>
  19.                 // 反序列化 Protobuf 消息
  20.                 val imageRecord = ImageRecord.parseFrom(record.value())
  21.                 println(s"Filename: ${imageRecord.getFilename}, Format: ${imageRecord.getFormat}")
  22.             }
  23.         }
  24.     }
  25. }
复制代码
Kafka 与 Protobuf 集成的优势:


  • 高效序列化:Protobuf 生成的二进制格式非常紧凑,得当大数据量和高吞吐场景。
  • 跨语言支持:Protobuf 支持多种语言,因此 Kafka 与 Protobuf 的集成能轻松跨多语言系统工作。
  • Schema 支持:通过 Protobuf,数据结构的厘革可以通过 .proto 文件的模式演进举行管理。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

自由的羽毛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表