Kafka Connect

打印 上一主题 下一主题

主题 525|帖子 525|积分 1575

confluent官网:https://debezium.io/documentation/reference/stable/connectors/sqlserver.html
Debezium官网:https://debezium.io/documentation/reference/stable/connectors/sqlserver.html
一、什么是 Kafka Connect?

Kafka Connect 是 Apache Kafka® 的一个免费开源组件,可作为集中式数据中心,用于在数据库、键值存储、搜索索引和文件体系之间进行简单的数据集成。
您可以利用 Kafka Connect 在 Apache Kafka 和其他数据体系之间流式传输数据,并快速创建用于将大型数据集移入和移出 Kafka® 的连接器。
二、Kafka Connect 下载(sql server下载举例)

confluent官网下载地址:
https://www.confluent.io/hub/debezium/debezium-connector-sqlserver



三、Kafka Connect 启动

1、修改设置文件(文件在kafka的安装目次有)

vim /opt/kafka/connect-config/connect-distributed.properties
  1. bootstrap.servers=192.168.26.25:9092
  2. group.id=connect-cluster
  3. key.converter=org.apache.kafka.connect.json.JsonConverter
  4. value.converter=org.apache.kafka.connect.json.JsonConverter
  5. key.converter.schemas.enable=true
  6. value.converter.schemas.enable=true
  7. offset.storage.topic=connect-offsets
  8. offset.storage.replication.factor=1
  9. config.storage.topic=connect-configs
  10. config.storage.replication.factor=1
  11. status.storage.topic=connect-status
  12. status.storage.replication.factor=1
  13. offset.flush.interval.ms=10000
  14. plugin.path=/data/qys/infra/kafka_2.12-3.6.1/plugins
  15. topic.creation.default.partitions=1
  16. topic.creation.default.replication.factor=1
复制代码
重要修改内容:


  • 指定好bootstrap server地址 bootstrap.servers
  • 默认分区与分片
    topic.creation.default.partitions=1
    topic.creation.default.replication.factor=1
  • 插件地址
    plugin.path=
2、下载插件

参照 二、Kafka Connect 下载
3、启动kafka-connect历程(前提kafka已经启动)

  1. /opt/kafka/bin/connect-distributed.sh -daemon /opt/kafka/config/connect-distributed.properties
复制代码
postman查看


四、docker-compose启动kafka-connect示例

1、修改设置文件(文件在kafka的安装目次有)

vim ./kafka-connect/conf/connect-distributed.properties(参照上文三的1)
2、下载sql server

参照 二、Kafka Connect 下载
3、docker-compose.yaml

  1. version: '2.4'
  2. services:
  3.   zookeeper:
  4.     image: wurstmeister/zookeeper
  5.     container_name: zk
  6.     ports:
  7.       - "2181:2181"
  8.     restart: always
  9.     volumes:
  10.       - ./zookeeper_data:/opt/zookeeper-3.4.13/data
  11.   kafka1:
  12.     image: wurstmeister/kafka:2.12-2.3.0
  13.     container_name: kafka1
  14.     ports:
  15.       - "32771:9092"
  16.     environment:
  17.       TZ: Asia/Shanghai  # 设置为所需的时区
  18.       KAFKA_ADVERTISED_HOST_NAME: 192.168.180.46
  19.       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  20.       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  21.     volumes:
  22.       - /var/run/docker.sock:/var/run/docker.sock
  23.       - ./kafka_data/kafka1_data:/kafka
  24.     restart: always
  25.     depends_on:
  26.       - zookeeper
  27.   kafka2:
  28.     image: wurstmeister/kafka:2.12-2.3.0
  29.     container_name: kafka2
  30.     ports:
  31.       - "32772:9092"
  32.     environment:
  33.       TZ: Asia/Shanghai  # 设置为所需的时区
  34.       KAFKA_ADVERTISED_HOST_NAME: 192.168.180.46
  35.       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  36.       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  37.     volumes:
  38.       - /var/run/docker.sock:/var/run/docker.sock
  39.       - ./kafka_data/kafka2_data:/kafka
  40.     restart: always
  41.     depends_on:
  42.       - zookeeper
  43.   kafka3:
  44.     image: wurstmeister/kafka:2.12-2.3.0
  45.     container_name: kafka3
  46.     ports:
  47.       - "32773:9092"
  48.     environment:
  49.       TZ: Asia/Shanghai  # 设置为所需的时区
  50.       KAFKA_ADVERTISED_HOST_NAME: 192.168.180.46
  51.       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  52.       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  53.     volumes:
  54.       - /var/run/docker.sock:/var/run/docker.sock
  55.       - ./kafka_data/kafka3_data:/kafka
  56.     restart: always
  57.     depends_on:
  58.       - zookeeper
  59.   kafka-connect:
  60.     image: wurstmeister/kafka:2.12-2.3.0
  61.     container_name: connect
  62.     ports:
  63.       - "38083:8083"
  64.     entrypoint:
  65.       - /opt/kafka/bin/connect-distributed.sh
  66.       - /opt/kafka/connect-config/connect-distributed.properties
  67.     volumes:
  68.       - /etc/localtime:/usr/share/zoneinfo/Asia/Shanghai
  69.       - /var/run/docker.sock:/var/run/docker.sock
  70.       - ./kafka-connect/conf:/opt/kafka/connect-config
  71.       - ./kafka-connect/plugins:/opt/bitnami/kafka/plugin
  72.     restart: always
  73.     depends_on:
  74.       - zookeeper
  75.   kafka-client:
  76.     image: wurstmeister/kafka:2.12-2.3.0
  77.     entrypoint:
  78.       - tail
  79.       - -f
  80.       - /etc/hosts
  81.     volumes:
  82.       - /var/run/docker.sock:/var/run/docker.sock
  83.       - ./kafka-connect/conf:/opt/kafka/connect-config
  84.       - ./kafka-connect/plugins:/opt/kafka/plugins
  85.     restart: always
  86.     depends_on:
  87.       - zookeeper
  88.   kafdrop:
  89.     image: obsidiandynamics/kafdrop
  90.     container_name: kafdrop
  91.     restart: "no"
  92.     ports:
  93.       - "9000:9000"
  94.     environment:
  95.       KAFKA_BROKERCONNECT: "kafka1:9092,kafka2:9092,kafka3:9092"
  96.       TZ: Asia/Shanghai  # 设置为所需的时区
  97.     depends_on:
  98.       - zookeeper
  99.       - kafka1
  100.       - kafka2
  101.       - kafka3
复制代码
五、postman创建SQLserver的connec

1、sqlserver数据库开启cdc

  1. -- 开启该库cdc
  2. USE 库名
  3. EXEC sys.sp_cdc_enable_db
  4. GO
  5. -- 开启表CDC
  6. use 库名;
  7. EXEC sys.sp_cdc_enable_table
  8. @source_schema = N'schema名',
  9. @source_name = N'表名',
  10. @role_name = NULL,
  11. @supports_net_changes = 1
  12. GO
  13. -- 关闭数据库CDC
  14. USE 库名;
  15. GO
  16. EXEC sys.sp_cdc_disable_db
  17. -- 关闭表的CDC功能
  18. USE  库名;
  19. GO
  20.     EXEC sys.sp_cdc_disable_table
  21.     @source_schema = N'schema名',
  22.     @source_name   = N'表名',
  23.     @capture_instance = N'schema名_表名'
  24. GO
  25. -- 查看数据库开启cdc检查
  26. SELECT name ,is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1
  27. -- 查看表cdc开启情况
  28. use Libby;
  29. SELECT name ,is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1
复制代码
2、创建sqlserver的connect



  • postman创建connect

  1. {
  2.     "name": "cdc_mdata_test_3",
  3.     "config": {
  4.         "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  5.         "database.hostname": "192.168.180.44",
  6.         "database.port": "1433",
  7.         "database.user": "sdp",
  8.         "database.password": "shared@123",
  9.         "database.dbname": "Libby",
  10.         "table.whitelist": "MappingData.test",
  11.         "database.server.name": "cdc_mdata_test_3",
  12.         "database.history.kafka.bootstrap.servers": "192.168.180.46:32771",
  13.         "database.history.kafka.topic": "cdc_mdata_test_3",
  14.         "transforms": "Reroute",
  15.         "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
  16.         "transforms.Reroute.topic.regex": "cdc_mdata_(.*)",
  17.         "transforms.Reroute.topic.replacement": "cdc_md_combine",
  18.         "errors.tolerance": "all"
  19.     }
  20. }
复制代码


  • postman查看创建的connect

六、connect的restapi

1、参考网址:

https://docs.confluent.io/platform/current/connect/references/restapi.html
2、常用restapi举例

infomethodurlconnect Clustergethttp://192.168.180.46:38083/connectorsgethttp://192.168.180.46:38083/connectorsexpand=status&expand=infogethttp://192.168.180.46:38083/connectors?expand=status&expand=infocreate-connectorposthttp://192.168.180.46:38083/connectorsdelete-connectordeletehttp://192.168.180.46:38083/connectors/cdc_mdata_test_3(connector名称)connector-configgethttp://192.168.180.46:38083/connectors/cdc_mdata_test_3(connector名称)/configconnector-statusgethttp://192.168.180.46:38083/connectors/cdc_mdata_test_3(connector名称)/statusconnector-pauseputhttp://192.168.180.46:38083/connectors/cdc_mdata_test_3/pauseconnector-resumeputhttp://192.168.180.46:38083/connectors/cdc_mdata_test_3/resumeconnector-stopputhttp://192.168.180.46:38083/connectors/mysql-connector-test46/stoptask-restartposthttp://192.168.180.46:38083/connectors/cdc_mdata_test_3/tasks/0/restarttask-statusgethttp://192.168.180.46:38083/connectors/cdc_mdata_test_3/tasks/0/status
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

玛卡巴卡的卡巴卡玛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表