ToB企服应用市场:ToB评测及商务社交产业平台

标题: Strimzi Kafka Bridge(桥接)实战之三:自制sdk(golang版本) [打印本页]

作者: 悠扬随风    时间: 2023-10-16 04:42
标题: Strimzi Kafka Bridge(桥接)实战之三:自制sdk(golang版本)
欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
本篇概览

为什么是golang版本

环境信息


下载OpenApi的配置文件

下载swagger工具

用swagger工具生成客户端sdk代码

  1. java -jar swagger-codegen-cli-2.4.9.jar generate \
  2. -i ./openapiv2.json \
  3. -l go \
  4. -o swagger
复制代码
  1. ➜  001 tree swagger
  2. swagger
  3. ├── README.md
  4. ├── api
  5. │   └── swagger.yaml
  6. ├── api_consumers.go
  7. ├── api_default.go
  8. ├── api_producer.go
  9. ├── api_seek.go
  10. ├── api_topics.go
  11. ├── client.go
  12. ├── configuration.go
  13. ├── docs
  14. │   ├── AssignedTopicPartitions.md
  15. │   ├── BridgeInfo.md
  16. │   ├── Consumer.md
  17. │   ├── ConsumerRecord.md
  18. │   ├── ConsumerRecordList.md
  19. │   ├── ConsumersApi.md
  20. │   ├── CreatedConsumer.md
  21. │   ├── DefaultApi.md
  22. │   ├── KafkaHeader.md
  23. │   ├── KafkaHeaderList.md
  24. │   ├── ModelError.md
  25. │   ├── OffsetCommitSeek.md
  26. │   ├── OffsetCommitSeekList.md
  27. │   ├── OffsetRecordSent.md
  28. │   ├── OffsetRecordSentList.md
  29. │   ├── OffsetsSummary.md
  30. │   ├── Partition.md
  31. │   ├── PartitionMetadata.md
  32. │   ├── Partitions.md
  33. │   ├── ProducerApi.md
  34. │   ├── ProducerRecord.md
  35. │   ├── ProducerRecordList.md
  36. │   ├── ProducerRecordToPartition.md
  37. │   ├── ProducerRecordToPartitionList.md
  38. │   ├── Replica.md
  39. │   ├── SeekApi.md
  40. │   ├── SubscribedTopicList.md
  41. │   ├── TopicMetadata.md
  42. │   ├── Topics.md
  43. │   └── TopicsApi.md
  44. ├── git_push.sh
  45. ├── model_assigned_topic_partitions.go
  46. ├── model_bridge_info.go
  47. ├── model_consumer.go
  48. ├── model_consumer_record.go
  49. ├── model_consumer_record_list.go
  50. ├── model_created_consumer.go
  51. ├── model_error.go
  52. ├── model_kafka_header.go
  53. ├── model_kafka_header_list.go
  54. ├── model_offset_commit_seek.go
  55. ├── model_offset_commit_seek_list.go
  56. ├── model_offset_record_sent.go
  57. ├── model_offset_record_sent_list.go
  58. ├── model_offsets_summary.go
  59. ├── model_partition.go
  60. ├── model_partition_metadata.go
  61. ├── model_partitions.go
  62. ├── model_producer_record.go
  63. ├── model_producer_record_list.go
  64. ├── model_producer_record_to_partition.go
  65. ├── model_producer_record_to_partition_list.go
  66. ├── model_replica.go
  67. ├── model_subscribed_topic_list.go
  68. ├── model_topic_metadata.go
  69. ├── model_topics.go
  70. └── response.go
  71. 2 directories, 66 files
复制代码
创建一个golang的demo程序,使用刚刚生成的客户端sdk代码

  1. go mod init sdkdemo
复制代码
  1. go get golang.org/x/oauth2
  2. go get github.com/antihax/optional
复制代码
修复有问题的sdk源码,第一个问题

第二个问题

第三个问题

  1. func (c *APIClient) decode(v interface{}, b []byte, contentType string) (err error) {
  2.         if strings.Contains(contentType, "application/xml") {
  3.                 if err = xml.Unmarshal(b, v); err != nil {
  4.                         return err
  5.                 }
  6.                 return nil
  7.         } else if strings.Contains(contentType, "application/json") {
  8.                 if err = json.Unmarshal(b, v); err != nil {
  9.                         return err
  10.                 }
  11.                 return nil
  12.         }
  13.         return errors.New("undefined response type")
  14. }
复制代码
  1. func (c *APIClient) decode(v interface{}, b []byte, contentType string) (err error) {
  2.         if strings.Contains(contentType, "application/xml") {
  3.                 if err = xml.Unmarshal(b, v); err != nil {
  4.                         return err
  5.                 }
  6.                 return nil
  7.         } else if strings.Contains(contentType, "application/json") ||
  8.                 strings.Contains(contentType, "application/vnd.kafka.v2+json") ||
  9.                 strings.Contains(contentType, "application/vnd.kafka.json.v2+json") {
  10.                 if err = json.Unmarshal(b, v); err != nil {
  11.                         return err
  12.                 }
  13.                 return nil
  14.         }
  15.         return errors.New("undefined response type")
  16. }
复制代码
第四个问题

  1. // Set request body from an interface{}
  2. func setBody(body interface{}, contentType string) (bodyBuf *bytes.Buffer, err error) {
  3.         if bodyBuf == nil {
  4.                 bodyBuf = &bytes.Buffer{}
  5.         }
  6.         if reader, ok := body.(io.Reader); ok {
  7.                 _, err = bodyBuf.ReadFrom(reader)
  8.         } else if b, ok := body.([]byte); ok {
  9.                 _, err = bodyBuf.Write(b)
  10.         } else if s, ok := body.(string); ok {
  11.                 _, err = bodyBuf.WriteString(s)
  12.         } else if s, ok := body.(*string); ok {
  13.                 _, err = bodyBuf.WriteString(*s)
  14.         } else if jsonCheck.MatchString(contentType) {
  15.                 err = json.NewEncoder(bodyBuf).Encode(body)
  16.         } else if xmlCheck.MatchString(contentType) {
  17.                 xml.NewEncoder(bodyBuf).Encode(body)
  18.         }
  19.         if err != nil {
  20.                 return nil, err
  21.         }
  22.         if bodyBuf.Len() == 0 {
  23.                 err = fmt.Errorf("Invalid body type %s\n", contentType)
  24.                 return nil, err
  25.         }
  26.         return bodyBuf, nil
  27. }
复制代码
第五个问题

第六个问题

  1. package swagger
  2. type ConsumerRecordList struct {
  3. }
复制代码
  1. package swagger
  2. type ConsumerRecordList []ConsumerRecord
复制代码
第七个问题

  1. package swagger
  2. type ProducerRecord struct {
  3.         Partition int32 `json:"partition,omitempty"`
  4.         Headers *KafkaHeaderList `json:"headers,omitempty"`
  5. }
复制代码
  1. package swagger
  2. type ProducerRecord struct {
  3.         Partition int32 `json:"partition,omitempty"`
  4.         Value string `json:"value"`
  5.         Key string `json:"key,omitempty"`
  6.         Headers *KafkaHeaderList `json:"headers,omitempty"`
  7. }
复制代码
第八个问题

编写代码验证功能:查看topic列表

  1. // 测试用的topic
  2. const TEST_TOPIC = "bridge-quickstart-topic"
  3. const TEST_GROUP = "client-sdk-group"
  4. const CONSUMER_NAME = "client-sdk-consumer-002"
  5. // strimzi bridge地址
  6. const BASE_PATH = "http://127.0.0.1:31331"
  7. var client *swagger.APIClient
  8. func init() {
  9.         configuration := swagger.NewConfiguration()
  10.         configuration.BasePath = BASE_PATH
  11.         client = swagger.NewAPIClient(configuration)
  12. }
复制代码
  1. func getAllTopics() ([]string, error) {
  2.         array, response, err := client.TopicsApi.ListTopics(context.Background())
  3.         if err != nil {
  4.                 log.Printf("getAllTopics err: %v\n", err)
  5.                 return nil, err
  6.         }
  7.         log.Printf("response: %v", response)
  8.         return array, nil
  9. }
复制代码
  1. func main() {
  2.         topics, err := getAllTopics()
  3.         if err != nil {
  4.                 return
  5.         }
  6.         fmt.Printf("topics: %v\n", topics)
  7. }
复制代码
  1. 2022/12/18 21:26:33 response: &{200 OK 200 HTTP/1.1 1 1 map[Content-Length:[109] Content-Type:[application/vnd.kafka.v2+json]] 0x140000e0300 109 [] false false map[] 0x14000118100 <nil>}
  2. topics: [__strimzi_store_topic bridge-quickstart-topic __strimzi-topic-operator-kstreams-topic-store-changelog]
  3. Process finished with the exit code 0
复制代码
编写代码验证功能:发送消息

  1. // 发送消息(异步模式,不会收到offset返回)
  2. func sendAsync(info string) error {
  3.         log.Print("send [" + info + "]")
  4.         _, response, err := client.ProducerApi.Send(context.Background(),
  5.                 TEST_TOPIC,
  6.                 swagger.ProducerRecordList{
  7.                         Records: []swagger.ProducerRecord{
  8.                                 {Value: "message from go swagger SDK"},
  9.                         },
  10.                 },
  11.                 &swagger.SendOpts{Async: optional.NewBool(true)},
  12.         )
  13.         if err != nil {
  14.                 log.Printf("send err: %v\n", err)
  15.                 return err
  16.         }
  17.         log.Printf("response: %v", response.StatusCode)
  18.         return nil
  19. }
复制代码
  1. func main() {
  2.         for i := 0; i < 10; i++ {
  3.                 sendAsync("message from go client " + strconv.Itoa(i))
  4.         }
  5. }
复制代码
  1. /private/var/folders/5v/p3bj9bzx2nd99y5l21nb1c080000gn/T/GoLand/___go_build_sdkdemo
  2. 2022/12/18 21:35:47 send [message from go client 0]
  3. 2022/12/18 21:35:47 response: 204
  4. 2022/12/18 21:35:47 send [message from go client 1]
  5. 2022/12/18 21:35:47 response: 204
  6. 2022/12/18 21:35:47 send [message from go client 2]
  7. 2022/12/18 21:35:47 response: 204
  8. 2022/12/18 21:35:47 send [message from go client 3]
  9. 2022/12/18 21:35:47 response: 204
  10. 2022/12/18 21:35:47 send [message from go client 4]
  11. 2022/12/18 21:35:47 response: 204
  12. 2022/12/18 21:35:47 send [message from go client 5]
  13. 2022/12/18 21:35:47 response: 204
  14. 2022/12/18 21:35:47 send [message from go client 6]
  15. 2022/12/18 21:35:47 response: 204
  16. 2022/12/18 21:35:47 send [message from go client 7]
  17. 2022/12/18 21:35:47 response: 204
  18. 2022/12/18 21:35:47 send [message from go client 8]
  19. 2022/12/18 21:35:47 response: 204
  20. 2022/12/18 21:35:47 send [message from go client 9]
  21. 2022/12/18 21:35:47 response: 204
  22. Process finished with the exit code 0
复制代码
编写代码验证功能:创建consumer

  1. // 取出swagger特有的error类型,从中提取中有效的错误信息
  2. func getErrorMessage(err error) string {
  3.         e := err.(swagger.GenericSwaggerError)
  4.         return string(e.Body())
  5. }
  6. func getBodyStr(body io.ReadCloser) string {
  7.         buf := new(bytes.Buffer)
  8.         buf.ReadFrom(body)
  9.         return buf.String()
  10. }
复制代码
  1. // 创建consumer
  2. func CreateConsumer(group string, consumerName string) (*swagger.CreatedConsumer, error) {
  3.         consumer, response, err := client.ConsumersApi.CreateConsumer(context.Background(),
  4.                 group,
  5.                 swagger.Consumer{
  6.                         Name:                     consumerName,
  7.                         AutoOffsetReset:          "latest",
  8.                         FetchMinBytes:            16,
  9.                         ConsumerRequestTimeoutMs: 300 * 1000,
  10.                         EnableAutoCommit:         false,
  11.                         Format:                   "json",
  12.                 })
  13.         if err != nil {
  14.                 log.Printf("CreateConsumer error : %v", getErrorMessage(err))
  15.                 return nil, err
  16.         }
  17.         log.Printf("CreateConsumer response : %v, body [%v]", response, getBodyStr(response.Body))
  18.         log.Printf("consumer : %v", consumer)
  19.         return &consumer, nil
  20. }
复制代码
  1. func main() {
  2.         // 创建consumer
  3.         CreateConsumer(TEST_GROUP, CONSUMER_NAME)
  4. }
复制代码
编写代码验证功能:订阅

  1. // 订阅
  2. func Subsciribe(topic string, consumerGroup string, consumerName string) error {
  3.         response, err := client.ConsumersApi.Subscribe(context.Background(),
  4.                 swagger.Topics{Topics: []string{topic}},
  5.                 consumerGroup,
  6.                 consumerName,
  7.         )
  8.         if err != nil {
  9.                 log.Printf("Subscribe error : %v", err)
  10.                 return err
  11.         }
  12.         log.Printf("Subscribe response : %v", response)
  13.         return nil
  14. }
复制代码
  1. func main() {
  2.         err := Subsciribe(TEST_TOPIC, TEST_GROUP, CONSUMER_NAME)
  3.         if err != nil {
  4.                 fmt.Printf("err : %v\n", err)
  5.         }
  6. }
复制代码
编写代码验证功能:拉取消息

  1. // 拉取消息
  2. func Poll(consumerGroup string, consumerName string) error {
  3.         // ctx context.Context, groupid string, name string, localVarOptionals *PollOpts
  4.         recordList, response, err := client.ConsumersApi.Poll(context.Background(), consumerGroup, consumerName, nil)
  5.         if err != nil {
  6.                 log.Printf("Poll error : %v", err)
  7.                 return err
  8.         }
  9.         log.Printf("Poll response : %v", response)
  10.         fmt.Printf("recordList: %v\n", recordList)
  11.         return nil
  12. }
复制代码
  1. func main() {
  2.         Poll(TEST_GROUP, CONSUMER_NAME)
  3. }
复制代码
  1. /private/var/folders/5v/p3bj9bzx2nd99y5l21nb1c080000gn/T/GoLand/___go_build_sdkdemo
  2. 2022/12/18 21:43:16 Poll response : &{200 OK 200 HTTP/1.1 1 1 map[Content-Length:[2301] Content-Type:[application/vnd.kafka.json.v2+json]] 0x140000e0340 2301 [] false false map[] 0x1400011a100 <nil>}
  3. recordList: [{ 163468 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 163469 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 163470 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 163471 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 163472 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 163473 0 bridge-quickstart-topic message from go swagger SDK <nil>} { 162246 2 bridge-quickstart-topic message from go swagger SDK <nil>} { 162247 2 bridge-quickstart-topic message from go swagger SDK <nil>} { 162248 2 bridge-quickstart-topic message from go swagger SDK <nil>} { 162249 2 bridge-quickstart-topic message from go swagger SDK <nil>} { 162250 2 bridge-quickstart-topic message from go swagger SDK <nil>} { 163669 1 bridge-quickstart-topic message from go swagger SDK <nil>} { 163670 1 bridge-quickstart-topic message from go swagger SDK <nil>} { 163671 1 bridge-quickstart-topic message from go swagger SDK <nil>} { 163672 1 bridge-quickstart-topic message from go swagger SDK <nil>} { 163146 3 bridge-quickstart-topic message from go swagger SDK <nil>} { 163147 3 bridge-quickstart-topic message from go swagger SDK <nil>} { 163148 3 bridge-quickstart-topic message from go swagger SDK <nil>} { 163149 3 bridge-quickstart-topic message from go swagger SDK <nil>} { 163150 3 bridge-quickstart-topic message from go swagger SDK <nil>}]
  4. Process finished with the exit code 0
复制代码
编写代码验证功能:提交offset

  1. // 提交offset
  2. func Offset(consumerGroup string, consumerName string) error {
  3.         response, err := client.ConsumersApi.Commit(context.Background(),
  4.                 consumerGroup,
  5.                 consumerName, nil)
  6.         if err != nil {
  7.                 log.Printf("Poll error : %v", err)
  8.                 return err
  9.         }
  10.         log.Printf("Offset response : %v", response)
  11.         return nil
  12. }
复制代码
  1. func main() {
  2.         err := Offset(TEST_GROUP, CONSUMER_NAME)
  3.         if err != nil {
  4.                 print(err)
  5.         }
  6. }
复制代码
  1. /private/var/folders/5v/p3bj9bzx2nd99y5l21nb1c080000gn/T/GoLand/___go_build_sdkdemo
  2. 2022/12/18 22:07:38 Offset response : &{204 No Content 204 HTTP/1.1 1 1 map[] {} 0 [] false false map[] 0x1400011a100 <nil>}
  3. Process finished with the exit code 0
复制代码
java的问题

有收获吗?

欢迎关注博客园:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4