Debezium Kafka Connect Architecture
Debezium is most commonly deployed via Apache Kafka Connect. Kafka Connect is a framework and runtime for implementing and operating:
- Source connectors (such as Debezium), which send records into Kafka
- Sink connectors, which stream records from Kafka topics to other systems
The following diagram shows the architecture of a change data capture pipeline based on Debezium:
1. MySQL Database Preparation

```sql
show variables like 'log_%';
```

If log_bin is OFF, enable it via the my.cnf configuration file:

```ini
# [Required] server ID
server-id=1
# Recommended to place under the [datadir] directory; the binlog is enabled as soon as a log_bin path is set
log_bin = /data/mysql/mysql_bin
# Retention in days; the default 0 keeps binlogs forever.
# If the database is archived regularly, set a retention period instead of keeping binlogs
# indefinitely; in theory you only need the logs written since the last archive.
expire_logs_days = 30
# [Deprecated in MySQL 8.0] maximum binlog file size
max_binlog_size = 1024M
# Binlog format: statement, row, or mixed; the default is statement.
# row is recommended.
binlog_format = ROW
# [Important] flush the binlog to disk after every n committed transactions. 0 means no forced
# flush, leaving it to the filesystem; set 1 for online transactions and money-related data,
# 0 is fine for other data.
sync_binlog = 1
```

Then restart MySQL with systemctl restart mysqld or systemctl restart mysql.service.
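Before wiring up Debezium, it is worth verifying the binlog and preparing a dedicated MySQL user. A minimal sketch, assuming a hypothetical debezium user and placeholder password; the grants below are the ones the Debezium MySQL connector documentation lists as required:

```bash
mysql -u root -p <<'SQL'
-- log_bin should report ON after the restart
SHOW VARIABLES LIKE 'log_bin';
-- Dedicated CDC user; name and password are placeholders, adjust to your environment
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz-password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
SQL
```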
2. Debezium Environment Preparation

1) ZooKeeper + Kafka + Kafka UI
```yaml
version: "2"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: debezium-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZK_SERVER_HEAP: "-Xmx256M -Xms256M"
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: debezium-kafka-broker
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://192.168.14.107:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_HEAP_OPTS: "-Xmx256M -Xms256M"
  init-kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - kafka
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:9092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic product-update-fail --replication-factor 1 --partitions 1
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic order-created --replication-factor 1 --partitions 1
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic product-update-successfully --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:9092 --list
      "
  kafka-ui:
    container_name: debezium-kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 9000:8080
    depends_on:
      - zookeeper
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: loan-kafka-ui
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

networks:
  debezium_default:
    driver: bridge
```
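Bring the stack up and do a quick smoke test; the container name matches the compose file above:

```bash
# Start ZooKeeper, Kafka, the topic-init job, and Kafka UI in the background
docker compose up -d

# List topics through the broker to confirm Kafka is reachable
docker exec debezium-kafka-broker kafka-topics --bootstrap-server kafka:9092 --list
```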
2) Debezium

If you have time, you can also try the [Debezium UI](https://debezium.io/documentation/reference/2.7/operations/debezium-ui.html).
```yaml
version: "2"
services:
  connect:
    image: quay.io/debezium/connect:2.7.3.Final
    networks:
      - debezium_default
    ports:
      - 7750:8083
    environment:
      ## Kafka connection address
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=mx.connect_configs
      - OFFSET_STORAGE_TOPIC=mx.connect_offsets
      - STATUS_STORAGE_TOPIC=mx.connect_statuses
      - HEAP_OPTS=-Xms256M -Xmx512M
  # debezium-ui:
  #   image: quay.io/debezium/debezium-ui:2.5
  #   networks:
  #     - debezium_default
  #   ports:
  #     - 7780:8080
  #   environment:
  #     - KAFKA_CONNECT_URIS=http://connect:8083
networks:
  debezium_default:
    # Join the existing network created above!
    external: true
```
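Once the connect container is up, the Kafka Connect REST API (mapped to host port 7750 above) should answer on its root path; a quick check:

```bash
# Returns the Connect worker's version, commit, and Kafka cluster ID when healthy
curl http://192.168.204.134:7750/
```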
3. Creating Debezium Connectors

Once Debezium is up, capturing MySQL changes requires creating a connector through the REST API that Debezium Connect exposes. Send the request below to create a source connector; on success it returns normally. Then request that connector's status: if the result looks healthy, the connector was created successfully.

3.1. Basic Connect REST API Endpoints
```
# List active connectors
GET http://192.168.204.134:7750/connectors/

# Create a new connector
POST http://192.168.204.134:7750/connectors
Content-Type: application/json
body: {...}

# Delete a connector
DELETE http://192.168.204.134:7750/connectors/${connect-name}

# Get a connector's configuration
GET http://192.168.204.134:7750/connectors/${connect-name}/config
```
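One more endpoint worth knowing is the status check used throughout this section; it reports the connector and task states (RUNNING, FAILED, ...) together with any error stack traces (connector-test-db is the source connector created in 3.2 below):

```bash
# Check the state of a connector and its tasks, including error traces
curl http://192.168.204.134:7750/connectors/connector-test-db/status
```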
3.2. Creating the Source Connector
```
POST http://192.168.204.134:7750/connectors
Content-Type: application/json

{
    "name": "connector-test-db",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "192.168.204.134",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "123456",
        "database.server.id": "10001",
        "database.server.name": "debezium-server-test",
        "database.serverTimezone": "UTC",
        "topic.prefix": "debezium-server-test",
        "time.precision.mode": "connect",
        "database.include.list": "spring-debezium-demo",
        "table.include.list": "spring-debezium-demo.sys_user",
        "snapshot.mode": "schema_only",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "debezium-server-test.schemahistory.spring-debezium-demo",
        "database.history.kafka.topic": "debezium-server-test.debezium-server_history_schema",
        "include.schema.changes": "true",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "none",
        "transforms.unwrap.operation.header": "false"
    }
}
```
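For reference, a sketch of submitting this request with curl, assuming the JSON payload above is saved as source-connector.json:

```bash
# Create the source connector from the JSON payload above
curl -i -X POST -H "Content-Type: application/json" \
  --data @source-connector.json \
  http://192.168.204.134:7750/connectors

# The new connector should now appear in the active list
curl http://192.168.204.134:7750/connectors/
```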
Parameter notes:
- name: Unique name for the connector; it must be unique within the Kafka Connect cluster.
- config: Holds the connector's configuration options.
- connector.class: The Debezium MySQL connector class to use.
- database.hostname: Host address of the target MySQL server.
- database.port: Port of the target MySQL server; MySQL defaults to 3306.
- database.user: Username used to connect to MySQL.
- database.password: Password for that user.
- database.server.id: The connector's unique server ID within the MySQL cluster; it must not clash with other MySQL servers or connectors.
- database.server.name: Logical server name, used as the prefix of the generated Kafka topics to identify the data source.
- topic.prefix: Prefix for the Kafka topics; the actual topic name is usually {topic.prefix}.{database}.{table} (see the consumer example after this list). It should match ${database.server.name} above.
- database.include.list: Database(s) to capture; the connector only captures changes in the listed databases.
- table.include.list: Table(s) to capture, in {database}.{table} format; only changes to the listed tables are captured.
- snapshot.mode: Defines the initial snapshot mode:
  - schema_only: Capture only the database schema (structure), not the data. Other options include initial (capture both schema and data), etc.
- database.history.kafka.bootstrap.servers: Kafka broker address used to store the database history.
- schema.history.internal.kafka.bootstrap.servers: Kafka broker address used to store the internal schema history.
- schema.history.internal.kafka.topic: Kafka topic that stores the internal schema history.
- database.history.kafka.topic: Kafka topic that stores the database history.
- include.schema.changes: Whether schema changes (DDL statements) are written to the Kafka topic as events; true means they are written.
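With the source connector running, change events for sys_user land on the topic {topic.prefix}.{database}.{table}; one way to watch them, using the broker container from the compose file above:

```bash
# Tail the change-event topic produced by the source connector
docker exec debezium-kafka-broker kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic debezium-server-test.spring-debezium-demo.sys_user \
  --from-beginning
```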
3.3. Creating the Sink Connector

The Connect container ships with Debezium's official source connectors:
- debezium-connector-db2
- debezium-connector-mysql
- debezium-connector-postgres
- debezium-connector-vitess
- debezium-connector-mongodb
- debezium-connector-oracle
- debezium-connector-sqlserver
For the sink connector you need to download the kafka-connect-jdbc package from the Confluent website:
```bash
# Create a kafka-connect-jdbc directory inside the Docker container
docker exec <container-id> mkdir /kafka/connect/kafka-connect-jdbc
# Download the jar locally
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar
# Copy the jar into the container
docker cp kafka-connect-jdbc-10.7.4.jar <container-id>:/kafka/connect/kafka-connect-jdbc
# Restart the connect container
docker restart <container-id>
# [Gotcha] This package ships without the JDBC driver: also copy the mysql-connector jar
# from the container's debezium-connector-mysql folder into kafka-connect-jdbc
```
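After the restart, confirm that Connect actually picked up the plugin; the /connector-plugins endpoint lists every installed connector class:

```bash
# The output should now include io.confluent.connect.jdbc.JdbcSinkConnector
curl http://192.168.204.134:7750/connector-plugins
```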
Create a sink connector whose target is the downstream MySQL:
```
POST http://192.168.204.134:7750/connectors
Content-Type: application/json

{
    "name": "sink-sys-user-mysql",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "debezium-server-test.spring-debezium-demo.sys_user",
        "connection.url": "jdbc:mysql://192.168.204.134:3306/debezium-sink-db?useSSL=false&serverTimezone=UTC",
        "connection.user": "root",
        "connection.password": "123456",
        "insert.mode": "upsert",
        "auto.create": "true",
        "auto.evolve": "true",
        "pk.fields": "ID",
        "pk.mode": "record_key",
        "table.name.format": "sys_user",
        "fields.whitelist": "ID,NAME,USERNAME,PASSWORD,GENDER,REMARK,STATUS,CREATE_TIME,CREATE_ID,UPDATE_TIME,UPDATE_ID",
        "delete.enabled": "true",
        "batch.size": "500",
        "transforms": "convertTimestamps",
        "transforms.convertTimestamps.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamps.field": "CREATE_TIME,UPDATE_TIME",
        "transforms.convertTimestamps.target.type": "Timestamp",
        "transforms.convertTimestamps.format": "yyyy-MM-dd HH:mm:ss"
    }
}
```

[Gotcha] Pay close attention to the case of the database and table column names.
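As on the source side, check the sink connector's status after creating it; type-mapping problems like the timestamp gotcha below surface here as FAILED tasks:

```bash
# A healthy sink reports RUNNING for the connector and each task
curl http://192.168.204.134:7750/connectors/sink-sys-user-mysql/status
```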
In production you cannot create one connector per table; instead, use a topic regex to match tables dynamically, as shown below (configure table.include.list on the source connector first):
```
# Remove the hard-coded table name:
# "table.name.format": "sys_user"
# Change the subscribed "topics" to "topics.regex"
# and use the following instead:
"topics.regex": "debezium-server-test.spring-debezium-demo.*",
"transforms": "routeTopics",
"transforms.routeTopics.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeTopics.regex": "^(.*)\\.spring-debezium-demo\\.(.*)$",
"transforms.routeTopics.replacement": "$2",
"table.name.format": "${topic}"
```
[Gotcha] MySQL CDC and time values: Incorrect datetime value: '1733827592000' for column 'CREATE_TIME' at row 1

Solutions:

Method 1 (may not work):

Add to the source connector:

```
"time.precision.mode": "connect"
```

Add to the sink connector:

```
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "time"
```

Method 2 (works):

Add to the source connector:

```
"database.serverTimezone": "UTC",
"time.precision.mode": "connect",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "none",
"transforms.unwrap.operation.header": "false"
```

Add to the sink connector:

```
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "create_time,his_create_time,his_update_time"
```
4. Results

5. Summary

For more information, consult the official documentation: https://debezium.io/documentation/reference/3.0/connectors/jdbc.html

Chinese-language documentation is scarce. During CDC I found that auto.create had no effect even when configured: the sink connector did not create the table to sync into. Pay special attention to how data types on the source and sink sides, such as datetime, bigdecimal, and json, map to Debezium's data types.
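Since auto.create did not take effect in this setup, a practical workaround is to create the target table by hand before starting the sink connector. A minimal sketch for the sys_user table used above; the column types are assumptions and should be derived from your actual source schema:

```bash
# Pre-create the sink table; the column types here are illustrative only
mysql -h 192.168.204.134 -u root -p debezium-sink-db <<'SQL'
CREATE TABLE IF NOT EXISTS sys_user (
  ID          BIGINT PRIMARY KEY,
  NAME        VARCHAR(64),
  USERNAME    VARCHAR(64),
  PASSWORD    VARCHAR(128),
  GENDER      TINYINT,
  REMARK      VARCHAR(255),
  STATUS      TINYINT,
  CREATE_TIME DATETIME,
  CREATE_ID   BIGINT,
  UPDATE_TIME DATETIME,
  UPDATE_ID   BIGINT
);
SQL
```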