
Title: Debezium Kafka Connect for MySQL CDC

Author: 罪恶克星    Time: 2025-1-6 10:43
Debezium Kafka Connect Architecture

Debezium is most commonly deployed via Apache Kafka Connect. Kafka Connect is a framework and runtime for implementing and operating source connectors (such as Debezium) that send records into Kafka, and sink connectors that propagate records from Kafka topics to other systems.

1. MySQL Database Preparation

  show variables like 'log_%';
If log_bin is OFF, binary logging can be enabled with the following my.cnf configuration.
  # [required] server ID
  server-id = 1
  # recommended to place under the datadir directory; binlog is enabled as soon as log_bin is set to a path
  log_bin = /data/mysql/mysql_bin
  # days to keep binlogs; the default 0 keeps them forever.
  # If the database is archived regularly, set a retention period rather than keeping binlogs
  # indefinitely; in theory only the logs written after the last archive are needed.
  # (deprecated in MySQL 8.0 in favor of binlog_expire_logs_seconds)
  expire_logs_days = 30
  # maximum size of a single binlog file
  max_binlog_size = 1024M
  # binlog has three formats: statement, row, and mixed; the default is statement,
  # but row is recommended (Debezium requires ROW)
  binlog_format = ROW
  # [important] flush the binlog to disk after every n transaction commits; 0 leaves flushing
  # to the filesystem. For online transactions and money-related data set this to 1;
  # for other data 0 is fine.
  sync_binlog = 1
Then restart MySQL with systemctl restart mysqld or systemctl restart mysql.service.
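After the restart, it is worth confirming that the settings took effect; a minimal check from the shell, assuming a local root login:
  # verify that binlog is on and in ROW format (enter the root password when prompted)
  mysql -u root -p -e "show variables like 'log_bin'; show variables like 'binlog_format';"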
2. Debezium Environment Preparation

1) ZooKeeper + Kafka + Kafka UI
  version: "2"
  services:
    zookeeper:
      image: confluentinc/cp-zookeeper:latest
      container_name: debezium-zookeeper
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181
        ZOOKEEPER_TICK_TIME: 2000
        ZK_SERVER_HEAP: "-Xmx256M -Xms256M"
      ports:
        - 22181:2181
    kafka:
      image: confluentinc/cp-kafka:latest
      container_name: debezium-kafka-broker
      depends_on:
        - zookeeper
      ports:
        - 29092:29092
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://192.168.14.107:29092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_HEAP_OPTS: "-Xmx256M -Xms256M"
    init-kafka:
      image: confluentinc/cp-kafka:latest
      depends_on:
        - kafka
      entrypoint: [ '/bin/sh', '-c' ]
      command: |
        "
        # blocks until kafka is reachable
        kafka-topics --bootstrap-server kafka:9092 --list

        echo -e 'Creating kafka topics'
        kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic product-update-fail --replication-factor 1 --partitions 1
        kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic order-created --replication-factor 1 --partitions 1
        kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic product-update-successfully --replication-factor 1 --partitions 1

        echo -e 'Successfully created the following topics:'
        kafka-topics --bootstrap-server kafka:9092 --list
        "
    kafka-ui:
      container_name: debezium-kafka-ui
      image: provectuslabs/kafka-ui:latest
      ports:
        - 9000:8080
      depends_on:
        - zookeeper
        - kafka
      environment:
        KAFKA_CLUSTERS_0_NAME: loan-kafka-ui
        KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
        KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

  networks:
    debezium_default:
      driver: bridge
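A quick way to bring the stack up and confirm the broker is reachable, assuming the file above is saved as docker-compose.yml:
  # start zookeeper, kafka, init-kafka and kafka-ui in the background
  docker compose up -d
  # list topics through the broker container defined above
  docker exec debezium-kafka-broker kafka-topics --bootstrap-server kafka:9092 --list
Kafka UI is then reachable on host port 9000, as mapped in the compose file.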
2) Debezium
When you have time, you can also try Debezium UI (https://debezium.io/documentation/reference/2.7/operations/debezium-ui.html).
  version: "2"
  services:
    connect:
      image: quay.io/debezium/connect:2.7.3.Final
      networks:
        - debezium_default
      ports:
        - 7750:8083
      environment:
        # Kafka bootstrap address
        - BOOTSTRAP_SERVERS=kafka:9092
        - GROUP_ID=1
        - CONFIG_STORAGE_TOPIC=mx.connect_configs
        - OFFSET_STORAGE_TOPIC=mx.connect_offsets
        - STATUS_STORAGE_TOPIC=mx.connect_statuses
        - HEAP_OPTS=-Xms256M -Xmx512M
  #  debezium-ui:
  #    image: quay.io/debezium/debezium-ui:2.5
  #    networks:
  #      - debezium_default
  #    ports:
  #      - 7780:8080
  #    environment:
  #      - KAFKA_CONNECT_URIS=http://connect:8083
  networks:
    debezium_default:
      # join the existing network created above!
      external: true
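Once the container is up, the Connect REST API mapped to host port 7750 can be probed; a minimal smoke test with curl (replace localhost with the docker host IP if running remotely):
  # the root endpoint returns the Connect version
  curl -s http://localhost:7750/
  # the MySqlConnector class should appear among the installed plugins
  curl -s http://localhost:7750/connector-plugins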
3. Create the Debezium Connector

After Debezium is installed successfully, to capture MySQL data changes you need to call the REST API exposed by Debezium Connect to create a connector. Execute the request below to create a source connector; if it succeeds, a normal response is returned. Then execute the request that fetches the status of the specified connector; if the returned status shows the connector and its task as RUNNING, the connector was created successfully.
3.1. Basic API endpoints exposed by Connect

  # list active connectors
  GET http://192.168.204.134:7750/connectors/
  # create a new connector
  POST http://192.168.204.134:7750/connectors
  Content-Type: application/json
  body: { ...omitted }
  # delete a connector
  DELETE http://192.168.204.134:7750/connectors/${connect-name}
  # get connector config
  GET http://192.168.204.134:7750/connectors/${connect-name}/config
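The status request mentioned in the intro is also part of this API; a sketch with curl, reusing the ${connect-name} placeholder from the list above:
  # get the runtime status of a connector and its tasks; a healthy connector reports RUNNING
  curl -s http://192.168.204.134:7750/connectors/${connect-name}/status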
3.2. Create the source connector

  POST http://192.168.204.134:7750/connectors
  Content-Type: application/json
  {
    "name": "connector-test-db",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "192.168.204.134",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "123456",
      "database.server.id": "10001",
      "database.server.name": "debezium-server-test",
      "database.serverTimezone": "UTC",
      "topic.prefix": "debezium-server-test",
      "time.precision.mode": "connect",
      "database.include.list": "spring-debezium-demo",
      "table.include.list": "spring-debezium-demo.sys_user",
      "snapshot.mode": "schema_only",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "debezium-server-test.schemahistory.spring-debezium-demo",
      "database.history.kafka.topic": "debezium-server-test.debezium-server_history_schema",
      "include.schema.changes": "true",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "true",
      "transforms.unwrap.delete.handling.mode": "none",
      "transforms.unwrap.operation.header": "false"
    }
  }
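To submit this request from the shell, the JSON body can be saved to a file (the name source-connector.json here is arbitrary) and posted with curl:
  # create the source connector; a 201 response echoes the stored config back
  curl -s -X POST -H "Content-Type: application/json" \
       --data @source-connector.json \
       http://192.168.204.134:7750/connectors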
Parameter notes:
topic.prefix: namespace for everything this connector produces; change topics are named <prefix>.<database>.<table>.
database.server.id: must be unique among all clients of the MySQL replication group.
table.include.list: restricts capture to the listed tables.
snapshot.mode=schema_only: skips the initial data snapshot, captures only the schema, and streams changes from the current binlog position onward.
transforms=unwrap (ExtractNewRecordState): flattens Debezium's change-event envelope so downstream consumers receive plain row data.

3.3. Create the sink connector

The Connect container only ships with Debezium's official source connectors, so the JDBC sink connector has to be installed manually:

  # create a kafka-connect-jdbc directory inside the docker container
  docker exec <container_id> mkdir /kafka/connect/kafka-connect-jdbc
  # download the jar locally
  wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar
  # copy the jar into the docker container
  docker cp kafka-connect-jdbc-10.7.4.jar <container_id>:/kafka/connect/kafka-connect-jdbc
  # restart the connect container
  docker restart <container_id>
  # [pitfall] this jar does not bundle the MySQL driver; also copy the mysql-connector jar
  # from the container's debezium-connector-mysql folder into kafka-connect-jdbc
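After the restart it is worth checking that Connect actually picked the plugin up; a quick check, assuming the host port mapping from the compose file above:
  # JdbcSinkConnector should now be listed among the installed plugins
  curl -s http://192.168.204.134:7750/connector-plugins | grep -i jdbc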
Create a sink connector whose target is the downstream MySQL database:
  POST http://192.168.204.134:7750/connectors
  Content-Type: application/json
  {
    "name": "sink-sys-user-mysql",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "1",
      "topics": "debezium-server-test.spring-debezium-demo.sys_user",
      "connection.url": "jdbc:mysql://192.168.204.134:3306/debezium-sink-db?useSSL=false&serverTimezone=UTC",
      "connection.user": "root",
      "connection.password": "123456",
      "insert.mode": "upsert",
      "auto.create": "true",
      "auto.evolve": "true",
      "pk.fields": "ID",
      "pk.mode": "record_key",
      "table.name.format": "sys_user",
      "fields.whitelist": "ID,NAME,USERNAME,PASSWORD,GENDER,REMARK,STATUS,CREATE_TIME,CREATE_ID,UPDATE_TIME,UPDATE_ID",
      "delete.enabled": "true",
      "batch.size": "500",
      "transforms": "convertTimestamps",
      "transforms.convertTimestamps.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.convertTimestamps.field": "CREATE_TIME,UPDATE_TIME",
      "transforms.convertTimestamps.target.type": "Timestamp",
      "transforms.convertTimestamps.format": "yyyy-MM-dd HH:mm:ss"
    }
  }
  # [pitfall] pay close attention to the case of database and table field names
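As with the source connector, the body can be saved to a file (sink-connector.json is an arbitrary name) and submitted, then the status endpoint verifies the task is running:
  # create the sink connector and check that its task reaches the RUNNING state
  curl -s -X POST -H "Content-Type: application/json" \
       --data @sink-connector.json \
       http://192.168.204.134:7750/connectors
  curl -s http://192.168.204.134:7750/connectors/sink-sys-user-mysql/status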
In production you will not create one connector per table; use a topic regex to match tables dynamically (make sure table.include.list is configured in the source connector first). A quick regex check follows the snippet below.
  # drop the hard-coded table name
  # "table.name.format": "sys_user"
  # change the "topics" setting to "topics.regex"
  # and use the following instead
  "topics.regex": "debezium-server-test.spring-debezium-demo.*",
  "transforms": "routeTopics",
  "transforms.routeTopics.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeTopics.regex": "^(.*)\\.spring-debezium-demo\\.(.*)$",
  "transforms.routeTopics.replacement": "$2",
  "table.name.format": "${topic}"
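With this routing in place, a record on topic debezium-server-test.spring-debezium-demo.sys_user is re-addressed to topic sys_user, which ${topic} then resolves to as the table name. The capture groups can be sanity-checked from the shell with an equivalent sed expression:
  # the second capture group (the table name) replaces the whole topic name
  echo "debezium-server-test.spring-debezium-demo.sys_user" \
    | sed -E 's/^(.*)\.spring-debezium-demo\.(.*)$/\2/'
  # prints: sys_user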
[Pitfall] MySQL CDC time issue: Incorrect datetime value: '1733827592000' for column 'CREATE_TIME' at row 1. Debezium emits datetime values as epoch milliseconds by default, which MySQL rejects for a DATETIME column.
Solutions:
Method 1 (may not work):
Add to the source connector:
  "time.precision.mode": "connect"
Add to the sink connector:
  "transforms": "TimestampConverter",
  "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
  "transforms.TimestampConverter.target.type": "Timestamp",
  "transforms.TimestampConverter.field": "time"
Method 2 (works):
Add to the source connector:
  "database.serverTimezone": "UTC",
  "time.precision.mode": "connect",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "true",
  "transforms.unwrap.delete.handling.mode": "none",
  "transforms.unwrap.operation.header": "false"
Add to the sink connector:
  "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.TimestampConverter.target.type": "Timestamp",
  "transforms.TimestampConverter.field": "create_time,his_create_time,his_update_time"
4. Results
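One way to observe the result is to tail the table's topic while inserting or updating rows in sys_user; a sketch using the console consumer inside the broker container from the compose file above:
  # watch change events flow through the topic produced by the source connector
  docker exec debezium-kafka-broker kafka-console-consumer \
    --bootstrap-server kafka:9092 \
    --topic debezium-server-test.spring-debezium-demo.sys_user \
    --from-beginning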




5. Summary

For more information, see the official documentation: https://debezium.io/documentation/reference/3.0/connectors/jdbc.html
There is little Chinese-language documentation. During CDC I found that even with auto.create configured it had no effect: the sink connector did not create the table to be synchronized. Pay special attention to how data types on the source and sink side, such as datetime, bigdecimal and json, map to Debezium's data types.




