ToB企服应用市场:ToB评测及商务社交产业平台

标题: Strimzi Kafka Bridge(桥接)实战之二:生产和发送消息 [打印本页]

作者: 缠丝猫    时间: 2023-10-8 07:11
标题: Strimzi Kafka Bridge(桥接)实战之二:生产和发送消息
欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
本篇概览


准备工作:创建topic

  1. kubectl -n aabbcc \
  2. run kafka-producer \
  3. -ti \
  4. --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
  5. --rm=true \
  6. --restart=Never \
  7. -- bin/kafka-topics.sh \
  8. --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  9. --create \
  10. --topic bridge-quickstart-topic \
  11. --partitions 4 \
  12. --replication-factor 1
复制代码
  1. kubectl -n aabbcc \
  2. run kafka-producer \
  3. -ti \
  4. --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
  5. --rm=true \
  6. --restart=Never \
  7. -- bin/kafka-topics.sh \
  8. --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  9. --describe \
  10. --topic bridge-quickstart-topic
复制代码
查看指定topic的详情

  1. curl -X GET \
  2.   http://192.168.0.1:31331/topics/bridge-quickstart-topic
复制代码
  1. {
  2.         "name": "bridge-quickstart-topic",
  3.         "configs": {
  4.                 "compression.type": "producer",
  5.                 "leader.replication.throttled.replicas": "",
  6.                 "message.downconversion.enable": "true",
  7.                 "min.insync.replicas": "1",
  8.                 "segment.jitter.ms": "0",
  9.                 "cleanup.policy": "delete",
  10.                 "flush.ms": "9223372036854775807",
  11.                 "follower.replication.throttled.replicas": "",
  12.                 "segment.bytes": "1073741824",
  13.                 "retention.ms": "604800000",
  14.                 "flush.messages": "9223372036854775807",
  15.                 "message.format.version": "3.0-IV1",
  16.                 "max.compaction.lag.ms": "9223372036854775807",
  17.                 "file.delete.delay.ms": "60000",
  18.                 "max.message.bytes": "1048588",
  19.                 "min.compaction.lag.ms": "0",
  20.                 "message.timestamp.type": "CreateTime",
  21.                 "preallocate": "false",
  22.                 "min.cleanable.dirty.ratio": "0.5",
  23.                 "index.interval.bytes": "4096",
  24.                 "unclean.leader.election.enable": "false",
  25.                 "retention.bytes": "-1",
  26.                 "delete.retention.ms": "86400000",
  27.                 "segment.ms": "604800000",
  28.                 "message.timestamp.difference.max.ms": "9223372036854775807",
  29.                 "segment.index.bytes": "10485760"
  30.         },
  31.         "partitions": [
  32.                 {
  33.                         "partition": 0,
  34.                         "leader": 0,
  35.                         "replicas": [
  36.                                 {
  37.                                         "broker": 0,
  38.                                         "leader": true,
  39.                                         "in_sync": true
  40.                                 }
  41.                         ]
  42.                 },
  43.                 {
  44.                         "partition": 1,
  45.                         "leader": 0,
  46.                         "replicas": [
  47.                                 {
  48.                                         "broker": 0,
  49.                                         "leader": true,
  50.                                         "in_sync": true
  51.                                 }
  52.                         ]
  53.                 },
  54.                 {
  55.                         "partition": 2,
  56.                         "leader": 0,
  57.                         "replicas": [
  58.                                 {
  59.                                         "broker": 0,
  60.                                         "leader": true,
  61.                                         "in_sync": true
  62.                                 }
  63.                         ]
  64.                 },
  65.                 {
  66.                         "partition": 3,
  67.                         "leader": 0,
  68.                         "replicas": [
  69.                                 {
  70.                                         "broker": 0,
  71.                                         "leader": true,
  72.                                         "in_sync": true
  73.                                 }
  74.                         ]
  75.                 }
  76.         ]
  77. }
复制代码
批量生产消息(同步)

  1. curl -X POST \
  2.   http://42.193.162.141:31331/topics/bridge-quickstart-topic \
  3.   -H 'content-type: application/vnd.kafka.json.v2+json' \
  4.   -d '{
  5.     "records": [
  6.         {
  7.             "key": "my-key",
  8.             "value": "sales-lead-0001"
  9.         },
  10.         {
  11.             "value": "sales-lead-0002",
  12.             "partition": 2
  13.         },
  14.         {
  15.             "value": "sales-lead-0003"
  16.         }
  17.     ]
  18. }'
复制代码
  1. {
  2.         "offsets": [{
  3.                 "partition": 0,
  4.                 "offset": 0
  5.         }, {
  6.                 "partition": 2,
  7.                 "offset": 0
  8.         }, {
  9.                 "partition": 3,
  10.                 "offset": 0
  11.         }]
  12. }
复制代码
批量生产消息(异步)

  1. curl -X POST \
  2.   http://42.193.162.141:31331/topics/bridge-quickstart-topic?async=true \
  3.   -H 'content-type: application/vnd.kafka.json.v2+json' \
  4.   -d '{
  5.     "records": [
  6.         {
  7.             "key": "my-key",
  8.             "value": "sales-lead-0001"
  9.         },
  10.         {
  11.             "value": "sales-lead-0002",
  12.             "partition": 2
  13.         },
  14.         {
  15.             "value": "sales-lead-0003"
  16.         }
  17.     ]
  18. }'
复制代码
查看partition

  1. curl -X GET \
  2.   http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions
复制代码
  1. [{
  2.         "partition": 0,
  3.         "leader": 0,
  4.         "replicas": [{
  5.                 "broker": 0,
  6.                 "leader": true,
  7.                 "in_sync": true
  8.         }]
  9. }, {
  10.         "partition": 1,
  11.         "leader": 0,
  12.         "replicas": [{
  13.                 "broker": 0,
  14.                 "leader": true,
  15.                 "in_sync": true
  16.         }]
  17. }, {
  18.         "partition": 2,
  19.         "leader": 0,
  20.         "replicas": [{
  21.                 "broker": 0,
  22.                 "leader": true,
  23.                 "in_sync": true
  24.         }]
  25. }, {
  26.         "partition": 3,
  27.         "leader": 0,
  28.         "replicas": [{
  29.                 "broker": 0,
  30.                 "leader": true,
  31.                 "in_sync": true
  32.         }]
  33. }]
复制代码
  1. curl -X GET \
  2.   http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions/0
复制代码
  1. {
  2.         "partition": 0,
  3.         "leader": 0,
  4.         "replicas": [{
  5.                 "broker": 0,
  6.                 "leader": true,
  7.                 "in_sync": true
  8.         }]
  9. }
复制代码
  1. curl -X GET \
  2.   http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions/0/offsets
复制代码
  1. {
  2.         "beginning_offset": 0,
  3.         "end_offset": 5
  4. }
复制代码
创建bridge consumer

  1. curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group \
  2.   -H 'content-type: application/vnd.kafka.v2+json' \
  3.   -d '{
  4.     "name": "bridge-quickstart-consumer",
  5.     "auto.offset.reset": "earliest",
  6.     "format": "json",
  7.     "enable.auto.commit": false,
  8.     "fetch.min.bytes": 16,
  9.     "consumer.request.timeout.ms": 300000
  10.   }'
复制代码

  1. {
  2.         "instance_id": "bridge-quickstart-consumer",
  3.         "base_uri": "http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
  4. }
复制代码
如何删除bridge consumer

  1. curl -X DELETE http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
复制代码
订阅指定topic的消息

  1. curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
  2.   -H 'content-type: application/vnd.kafka.v2+json' \
  3.   -d '{
  4.     "topics": [
  5.         "bridge-quickstart-topic"
  6.     ]
  7. }'
复制代码
拉取消息

  1. curl -X GET http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
  2.   -H 'accept: application/vnd.kafka.json.v2+json'
复制代码
  1. [
  2.         {
  3.                 "topic": "bridge-quickstart-topic",
  4.                 "key": "my-key",
  5.                 "value": "sales-lead-0001",
  6.                 "partition": 0,
  7.                 "offset": 0
  8.         }, {
  9.                 "topic": "bridge-quickstart-topic",
  10.                 "key": "my-key",
  11.                 "value": "sales-lead-0001",
  12.                 "partition": 0,
  13.                 "offset": 1
  14.         }
  15. ]
复制代码
提交offset

  1. curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
复制代码
设定offset

  1. curl -X POST \
  2.   http://42.193.162.141:31331/topics/bridge-quickstart-topic \
  3.   -H 'content-type: application/vnd.kafka.json.v2+json' \
  4.   -d '{
  5.     "records": [
  6.         {
  7.             "value": "sales-lead-a002-01234567890123456789",
  8.             "partition": 2
  9.         }
  10.     ]
  11. }'
复制代码
  1. curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
  2.   -H 'content-type: application/vnd.kafka.v2+json' \
  3.   -d '{
  4.     "offsets": [
  5.         {
  6.             "topic": "bridge-quickstart-topic",
  7.             "partition": 2,
  8.             "offset": 74
  9.         }
  10.     ]
  11. }'
复制代码
欢迎关注博客园:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4