ToB企服应用市场:ToB评测及商务社交产业平台

标题: 142. Go操作Kafka(confluent-kafka-go库) [打印本页]

作者: 兜兜零元    时间: 2024-11-18 08:55
标题: 142. Go操作Kafka(confluent-kafka-go库)
之前已经有两篇文章介绍过 Go如何操作 kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
51.Go操作kafka示例(kafka-go库)
Apache kafka简介


Apache Kafka是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键使命应用步伐。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种本领。
Kafka的用例和本领

为什么将Golang与Apache Kafka团结利用
将Golang这一高效并发的编程语言与Apache Kafka这一分布式事件流平台团结起来,提供了一个在构建尖端现代应用步伐方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:

Kafka和Golang将性能、可扩展性和并发与生产力团结在一起 - 使它们成为构建可扩展的服务、管道和流应用步伐的绝佳选择。
开始利用Apache Kafka

在开始利用Golang和Apache Kafka之前,我们必须确保golang和Kafka已经安装并在我们的机器上运行。
安装Kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
Apache Kafka的Golang包
您可以利用go get安装confluent-kafka-go包:
  1. go get -u github.com/confluentinc/confluent-kafka-go/kafka
复制代码
安装后,您可以在Go代码中导入并利用confluent-kafka-go。
  1. package main
  2. import (
  3.     "fmt"
  4.     "github.com/confluentinc/confluent-kafka-go/kafka"
  5. )
  6. func main() {
  7.     p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
  8.     if err != nil {
  9.         fmt.Printf("创建生产者失败: %s\n", err)
  10.         return
  11.     }
  12.     // 生产消息到主题,处理交付报告等。
  13.     // 使用后记得关闭生产者
  14.     defer p.Close()
  15. }
复制代码
构建生产者

Kafka生产者是Apache Kafka生态系统中的一个关键组成部分,作为一个客户端应用步伐,负责向Kafka集群发布(写入)事件。这一部分提供了关于Kafka生产者的全面概述,以及针对调整其行为的配置设置的初步探究。
下面是一个Golang应用步伐的示例,它生产数据并将其发布到Kafka的具体topic。它还阐明了如何在Golang中为Kafka消息序列化数据,并演示了如何处理错误和重试。
  1. package main
  2. import (
  3.     "fmt"
  4.     "github.com/confluentinc/confluent-kafka-go/kafka"
  5. )
  6. const (
  7.     kafkaBroker = "localhost:9092"
  8.     topic       = "test-topic"
  9. )
  10. type Message
  11. struct {
  12.     Key   string `json:"key"`
  13.     Value string `json:"value"`
  14. }
  15. func main() {
  16.     // 创建一个新的Kafka生产者
  17.     p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})
  18.     if err != nil {
  19.         fmt.Printf("创建生产者失败: %s\n", err)
  20.         return
  21.     }
  22.     defer p.Close()
  23.     // 定义要发送的消息
  24.     message := Message{
  25.         Key:   "example_key",
  26.         Value: "Hello, Kafka!",
  27.     }
  28.     // 序列化消息
  29.     serializedMessage, err := serializeMessage(message)
  30.     if err != nil {
  31.         fmt.Printf("消息序列化失败: %s\n", err)
  32.         return
  33.     }
  34.     // 将消息生产到Kafka主题
  35.     err = produceMessage(p, topic, serializedMessage)
  36.     if err != nil {
  37.         fmt.Printf("消息生产失败: %s\n", err)
  38.         return
  39.     }
  40.     fmt.Println("消息成功生产!")
  41. }
  42. func serializeMessage(message Message) ([]byte, error) {
  43.     // 将消息结构体序列化为JSON
  44.     serialized, err := json.Marshal(message)
  45.     if err != nil {
  46.         return nil, fmt.Errorf("消息序列化失败: %w", err)
  47.     }
  48.     return serialized, nil
  49. }
  50. func produceMessage(p *kafka.Producer, topic string, message []byte) error {
  51.     // 创建一个新的要生产的Kafka消息
  52.     kafkaMessage := &kafka.Message{
  53.         TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  54.         Value:          message,
  55.     }
  56.     // 生产Kafka消息
  57.     deliveryChan := make(chan kafka.Event)
  58.     err := p.Produce(kafkaMessage, deliveryChan)
  59.     if err != nil {
  60.         return fmt.Errorf("消息生产失败: %w", err)
  61.     }
  62.     // 等待交付报告或错误
  63.     e := <-deliveryChan
  64.     m := e.(*kafka.Message)
  65.     // 检查交付错误,即生成者方确保发送到Broker的消息不丢失
  66.     // 但可能重复发送,如实际发成功了,但是Broker的ACK返回给生产者时出现了网络Error
  67.     // 从而重试后,导致消息重复发送,这时候需要下游做好幂等处理
  68.     if m.TopicPartition.Error != nil {
  69.         return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)
  70.     }
  71.     // 关闭交付频道
  72.     close(deliveryChan)
  73.     return nil
  74. }
复制代码
步骤表明:
确保将localhost:9092更换为您的Kafka署理地址,将test-topic更换为所需的主题名称。别的,您大概需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑。
构建消费者

Kafka消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探究这些消费者的内部工作原理和调整其性能的配置。
下面是一个Golang应用步伐的示例,它从Kafka主题消费消息。如下代码包括了如何处理消费到的消息的阐明,以及对差别消费模式(如单个消费者和消费者组)的讨论。
  1. package main
  2. import (
  3.     "fmt"
  4.     "os"
  5.     "os/signal"
  6.     "github.com/confluentinc/confluent-kafka-go/kafka"
  7. )
  8. const (
  9.     kafkaBroker = "localhost:9092"
  10.     topic       = "test-topic"
  11.     groupID     = "test-group"
  12. )
  13. func main() {
  14.     // 创建一个新的Kafka消费者
  15.     c, err := kafka.NewConsumer(&kafka.ConfigMap{
  16.         "bootstrap.servers":  kafkaBroker,
  17.         "group.id":           groupID, // 消费者组标识
  18.         "auto.offset.reset":  "earliest", // 从头开始消费
  19.     })
  20.     if err != nil {
  21.         fmt.Printf("创建消费者失败: %s\n", err)
  22.         return
  23.     }
  24.     defer c.Close()
  25.     // 订阅Kafka主题
  26.     err = c.SubscribeTopics([]string{topic}, nil)
  27.     if err != nil {
  28.         fmt.Printf("订阅主题失败: %s\n", err)
  29.         return
  30.     }
  31.     // 设置一个通道来处理操作系统信号,以便优雅地关闭
  32.     sigchan := make(chan os.Signal, 1)
  33.     signal.Notify(sigchan, os.Interrupt)
  34.     // 开始消费消息
  35.     run := true
  36.     for run == true {
  37.         select {
  38.         case sig := <-sigchan:
  39.             fmt.Printf("接收到信号 %v: 正在终止\n", sig)
  40.             run = false
  41.         default:
  42.             // 轮询Kafka消息,1次最多拉取100条消息
  43.             ev := c.Poll(100)
  44.             if ev == nil {
  45.                 continue
  46.             }
  47.             switch e := ev.(type) {
  48.             case *kafka.Message:
  49.                 // 处理消费的消息
  50.                 fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))
  51.             case kafka.Error:
  52.                 // 处理Kafka错误
  53.                 fmt.Printf("错误: %v\n", e)
  54.             }
  55.         }
  56.     }
  57. }
复制代码
步骤表明
差别的消费模式:

在提供的示例中,group.id配置设置用于指定消费者组ID。这允许消费者应用步伐的多个实例在消费者组中一起工作,从Kafka Topic消费消息。
总结

总之,Apache Kafka作为构建实时数据管道和流应用步伐的强大办理方案,得益于其分布式、可扩展和容错的架构。当与Golang团结时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常恰当现代应用步伐。通过利用Kafka的功能和Golang的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用步伐,这些应用步伐可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成差别的系统还是聚合日志,Kafka和Golang提供了一个强势组合,使开发人员能够轻松构建创新和可扩展的办理方案。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4