之前已经有两篇文章介绍过 Go如何操作 kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
51.Go操作kafka示例(kafka-go库)
Apache kafka简介
Apache Kafka是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键使命应用步伐。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种本领。
Kafka的用例和本领
- 流数据管道: Kafka提供了一个分布式发布-订阅消息系统,可以在系统或应用步伐之间流式传输数据。它提供了具有数据复制和容错本领的强大队列。
- 实时分析:Kafka允许利用工具如Kafka Streams和KSQL处理实时数据流,用于构建流式分析和数据处理应用步伐。
- 数据集成 :Kafka可以用来通过在差别数据源和格式之间流式传输数据来集成差别的系统。这使它对流式ETL非常有用。
- 事件源 : Kafka提供了可以重放的事件时间日志,用于重构应用步伐状态,适用于事件源和CQRS模式。
- 日志聚合 : Kafka通常用于将差别服务器和应用步伐的日志聚合到一个中心存储库中。这允许同一访问日志数据。
为什么将Golang与Apache Kafka团结利用
将Golang这一高效并发的编程语言与Apache Kafka这一分布式事件流平台团结起来,提供了一个在构建尖端现代应用步伐方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:
- 性能 : Golang和Apache Kafka都提供高性能。Golang快速、高效和轻量级。Kafka为速率而构建,具有高吞吐量和低延迟。它们一起可以处理苛刻的工作负载。
- 可扩展性 : Golang的goroutines和Kafka的分区允许应用步伐程度扩展以处理大量数据。Kafka可以轻松扩展生产者和消费者。
- 并发性 : Golang通过goroutines和channels提供了出色的并发编程本领。Kafka并发通报消息并支持并行性。
- 可用性 : Kafka的分布式架构使其高度可用和容错。Golang应用可以利用这一点来构建弹性系统。
- 互操作性 : Kafka有多种语言的客户端,允许Golang应用与多语言环境互动。Kafka还利用二进制TCP协议以提高服从。
- 现代设计 : Kafka和Golang都接纳现代设计理念,使它们非常恰当云原生和微服务架构。
- 开发人员体验 : Kafka的客户端库团结Goroutines、channels和接口,使其易于利用。
Kafka和Golang将性能、可扩展性和并发与生产力团结在一起 - 使它们成为构建可扩展的服务、管道和流应用步伐的绝佳选择。
开始利用Apache Kafka
在开始利用Golang和Apache Kafka之前,我们必须确保golang和Kafka已经安装并在我们的机器上运行。
安装Kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
Apache Kafka的Golang包
您可以利用go get安装confluent-kafka-go包:
- go get -u github.com/confluentinc/confluent-kafka-go/kafka
复制代码 安装后,您可以在Go代码中导入并利用confluent-kafka-go。
- package main
- import (
- "fmt"
- "github.com/confluentinc/confluent-kafka-go/kafka"
- )
- func main() {
- p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
- if err != nil {
- fmt.Printf("创建生产者失败: %s\n", err)
- return
- }
- // 生产消息到主题,处理交付报告等。
- // 使用后记得关闭生产者
- defer p.Close()
- }
复制代码 构建生产者
Kafka生产者是Apache Kafka生态系统中的一个关键组成部分,作为一个客户端应用步伐,负责向Kafka集群发布(写入)事件。这一部分提供了关于Kafka生产者的全面概述,以及针对调整其行为的配置设置的初步探究。
下面是一个Golang应用步伐的示例,它生产数据并将其发布到Kafka的具体topic。它还阐明了如何在Golang中为Kafka消息序列化数据,并演示了如何处理错误和重试。
- package main
- import (
- "fmt"
- "github.com/confluentinc/confluent-kafka-go/kafka"
- )
- const (
- kafkaBroker = "localhost:9092"
- topic = "test-topic"
- )
- type Message
- struct {
- Key string `json:"key"`
- Value string `json:"value"`
- }
- func main() {
- // 创建一个新的Kafka生产者
- p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})
- if err != nil {
- fmt.Printf("创建生产者失败: %s\n", err)
- return
- }
- defer p.Close()
- // 定义要发送的消息
- message := Message{
- Key: "example_key",
- Value: "Hello, Kafka!",
- }
- // 序列化消息
- serializedMessage, err := serializeMessage(message)
- if err != nil {
- fmt.Printf("消息序列化失败: %s\n", err)
- return
- }
- // 将消息生产到Kafka主题
- err = produceMessage(p, topic, serializedMessage)
- if err != nil {
- fmt.Printf("消息生产失败: %s\n", err)
- return
- }
- fmt.Println("消息成功生产!")
- }
- func serializeMessage(message Message) ([]byte, error) {
- // 将消息结构体序列化为JSON
- serialized, err := json.Marshal(message)
- if err != nil {
- return nil, fmt.Errorf("消息序列化失败: %w", err)
- }
- return serialized, nil
- }
- func produceMessage(p *kafka.Producer, topic string, message []byte) error {
- // 创建一个新的要生产的Kafka消息
- kafkaMessage := &kafka.Message{
- TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
- Value: message,
- }
- // 生产Kafka消息
- deliveryChan := make(chan kafka.Event)
- err := p.Produce(kafkaMessage, deliveryChan)
- if err != nil {
- return fmt.Errorf("消息生产失败: %w", err)
- }
- // 等待交付报告或错误
- e := <-deliveryChan
- m := e.(*kafka.Message)
- // 检查交付错误,即生成者方确保发送到Broker的消息不丢失
- // 但可能重复发送,如实际发成功了,但是Broker的ACK返回给生产者时出现了网络Error
- // 从而重试后,导致消息重复发送,这时候需要下游做好幂等处理
- if m.TopicPartition.Error != nil {
- return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)
- }
- // 关闭交付频道
- close(deliveryChan)
- return nil
- }
复制代码 步骤表明:
- 创建一个Kafka生产者。
- 利用json.Marshal函数将自定义消息布局体(Message)序列化为JSON。
- 利用生产者将序列化的消息生产到Kafka topic。
- 利用交付陈诉和错误查抄处理错误和重试。
确保将localhost:9092更换为您的Kafka署理地址,将test-topic更换为所需的主题名称。别的,您大概需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑。
构建消费者
Kafka消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探究这些消费者的内部工作原理和调整其性能的配置。
下面是一个Golang应用步伐的示例,它从Kafka主题消费消息。如下代码包括了如何处理消费到的消息的阐明,以及对差别消费模式(如单个消费者和消费者组)的讨论。
- package main
- import (
- "fmt"
- "os"
- "os/signal"
- "github.com/confluentinc/confluent-kafka-go/kafka"
- )
- const (
- kafkaBroker = "localhost:9092"
- topic = "test-topic"
- groupID = "test-group"
- )
- func main() {
- // 创建一个新的Kafka消费者
- c, err := kafka.NewConsumer(&kafka.ConfigMap{
- "bootstrap.servers": kafkaBroker,
- "group.id": groupID, // 消费者组标识
- "auto.offset.reset": "earliest", // 从头开始消费
- })
- if err != nil {
- fmt.Printf("创建消费者失败: %s\n", err)
- return
- }
- defer c.Close()
- // 订阅Kafka主题
- err = c.SubscribeTopics([]string{topic}, nil)
- if err != nil {
- fmt.Printf("订阅主题失败: %s\n", err)
- return
- }
- // 设置一个通道来处理操作系统信号,以便优雅地关闭
- sigchan := make(chan os.Signal, 1)
- signal.Notify(sigchan, os.Interrupt)
- // 开始消费消息
- run := true
- for run == true {
- select {
- case sig := <-sigchan:
- fmt.Printf("接收到信号 %v: 正在终止\n", sig)
- run = false
- default:
- // 轮询Kafka消息,1次最多拉取100条消息
- ev := c.Poll(100)
- if ev == nil {
- continue
- }
- switch e := ev.(type) {
- case *kafka.Message:
- // 处理消费的消息
- fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))
- case kafka.Error:
- // 处理Kafka错误
- fmt.Printf("错误: %v\n", e)
- }
- }
- }
- }
复制代码 步骤表明:
- 创建一个Kafka消费者。
- 订阅一个Kafka主题。
- 设置一个通道来处理操作系统信号(如SIGINT)以优雅地关闭。
- 开始从订阅的Topic消费消息。
- 处理消费的消息以及Kafka错误。
差别的消费模式:
- 单个消费者:在这种模式下,单个消费者实例从主题的一个或多个分区读取消息。当您只需要一个消费者应用步伐实例来处理来自Topic的所有消息时,这很有用。
- 消费者组:消费者组允许您通过将消息处理分布到多个消费者实例来扩展消费,以实现扩展。每个消费者组可以有多个消费者,组内的每个消费者从一部分分区读取消息。这使得消息的并行处理成为大概,提供了容错本领和高吞吐量。
在提供的示例中,group.id配置设置用于指定消费者组ID。这允许消费者应用步伐的多个实例在消费者组中一起工作,从Kafka Topic消费消息。
总结
总之,Apache Kafka作为构建实时数据管道和流应用步伐的强大办理方案,得益于其分布式、可扩展和容错的架构。当与Golang团结时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常恰当现代应用步伐。通过利用Kafka的功能和Golang的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用步伐,这些应用步伐可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成差别的系统还是聚合日志,Kafka和Golang提供了一个强势组合,使开发人员能够轻松构建创新和可扩展的办理方案。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |