Title: Implementing MySQL CDC with Debezium Kafka Connect
Author: 罪恶克星
Posted: 2025-1-6 10:43
Debezium Kafka Connect Architecture
Debezium is most commonly deployed via Apache Kafka Connect. Kafka Connect is a framework and runtime for implementing and operating:
source connectors, which send change records into Kafka
sink connectors, which propagate records from Kafka topics into other systems
The architecture of a Debezium-based change data capture pipeline:
[Architecture diagram: Debezium CDC pipeline, as shown in the Debezium documentation]
1. MySQL Database Preparation
The MySQL binlog must be enabled! Check with:
show variables like 'log_%';
If the log_bin variable shows OFF, enable it with the following settings in my.cnf.
# [required] server ID
server-id=1
# Recommended to place under the datadir; setting the log_bin path is by itself enough to enable the binlog
log_bin = /data/mysql/mysql_bin
# Retention in days; default 0 = keep forever.
# If the database is archived regularly, set a retention period rather than keeping
# binlogs indefinitely; in theory you only need the logs written since the last archive.
# (Deprecated in MySQL 8.0 in favor of binlog_expire_logs_seconds.)
expire_logs_days = 30
# Maximum size of a single binlog file
max_binlog_size = 1024M
# binlog format: one of statement, row, mixed; ROW is recommended
# (and has been the default since MySQL 5.7.7)
binlog_format = ROW
# [important] fsync the binlog after every n commits; 0 leaves flushing to the
# filesystem. For online transactions and money-related data set this to 1;
# for other data 0 is usually fine.
sync_binlog = 1
Then restart MySQL: systemctl restart mysqld (or systemctl restart mysql.service).
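A quick sanity check after the restart, plus an optional dedicated CDC account with the privileges the Debezium MySQL connector documentation calls for (the user name and password below are placeholders; the connector examples later in this post simply use root):
# Confirm the binlog is on and in ROW format
mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"

# Optional: a dedicated Debezium user instead of root
mysql -uroot -p <<'SQL'
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz-pass';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
SQL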
2. Debezium Environment Preparation
1) zookeeper + kafka + kafka-ui
version: "2"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: debezium-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZK_SERVER_HEAP: "-Xmx256M -Xms256M"
    ports:
      - 22181:2181
    networks:
      - debezium_default
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: debezium-kafka-broker
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # PLAINTEXT_HOST is the host-reachable listener; replace the IP with your host's address
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://192.168.14.107:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_HEAP_OPTS: "-Xmx256M -Xms256M"
    networks:
      - debezium_default
  init-kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - kafka
    networks:
      - debezium_default
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:9092 --list
      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic product-update-fail --replication-factor 1 --partitions 1
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic order-created --replication-factor 1 --partitions 1
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic product-update-successfully --replication-factor 1 --partitions 1
      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:9092 --list
      "
  kafka-ui:
    container_name: debezium-kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 9000:8080
    depends_on:
      - zookeeper
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: loan-kafka-ui
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    networks:
      - debezium_default
networks:
  # shared with the Debezium Connect compose file below
  debezium_default:
    driver: bridge
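Assuming the file is saved as docker-compose.yml, bringing the stack up is just (docker compose v2 syntax; use docker-compose with older installs):
# Start ZooKeeper, Kafka, the topic-init job and Kafka UI
docker compose up -d

# All four services should show up; the UI is then at http://localhost:9000
docker compose ps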
2) Debezium
If you have time, it is also worth trying the Debezium UI: https://debezium.io/documentation/reference/2.7/operations/debezium-ui.html
version: "2"
services:
  connect:
    image: quay.io/debezium/connect:2.7.3.Final
    networks:
      - debezium_default
    ports:
      - 7750:8083
    environment:
      # Kafka bootstrap address
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=mx.connect_configs
      - OFFSET_STORAGE_TOPIC=mx.connect_offsets
      - STATUS_STORAGE_TOPIC=mx.connect_statuses
      - HEAP_OPTS=-Xms256M -Xmx512M
#  debezium-ui:
#    image: quay.io/debezium/debezium-ui:2.5
#    networks:
#      - debezium_default
#    ports:
#      - 7780:8080
#    environment:
#      - KAFKA_CONNECT_URIS=http://connect:8083
networks:
  debezium_default:
    # join the existing network created above!
    external: true
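One caveat about the network: external: true makes Compose look for a network literally named debezium_default, while the first compose file may have created it under a project-prefixed name. A quick way to check, and to pre-create the network if needed (plain Docker commands, nothing Debezium-specific):
# See what the network from the first compose file is actually called
docker network ls | grep debezium

# If nothing is literally named "debezium_default", create it up front and
# let both compose files reference it as external
docker network create debezium_default

# Once connect is up, its REST API answers on host port 7750
curl -s http://localhost:7750/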
3. Creating Debezium Connectors
Once Debezium is installed, capturing MySQL changes requires calling the REST API that Debezium Connect exposes to create a connector. Send the source-connector creation request below; on success it returns normally. Then call the status endpoint for that connector; if the status shows the connector running, it was created successfully.
3.1. Basic REST endpoints exposed by Connect
# List active connectors
GET http://192.168.204.134:7750/connectors/

# Create a new connector
POST http://192.168.204.134:7750/connectors
Content-Type: application/json
body: { ... }

# Delete a connector
DELETE http://192.168.204.134:7750/connectors/${connect-name}

# Get a connector's configuration
GET http://192.168.204.134:7750/connectors/${connect-name}/config
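The same endpoints can be exercised with curl; the /status endpoint (also part of the standard Kafka Connect REST API) is the one used below to confirm connectors come up:
# List active connectors
curl -s http://192.168.204.134:7750/connectors/

# Fetch a connector's configuration
curl -s http://192.168.204.134:7750/connectors/${connect-name}/config

# Fetch a connector's runtime status; "state" should be RUNNING
curl -s http://192.168.204.134:7750/connectors/${connect-name}/status

# Delete a connector
curl -s -X DELETE http://192.168.204.134:7750/connectors/${connect-name}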
3.2. Creating the source connector
POST http://192.168.204.134:7750/connectors
Content-Type: application/json

{
  "name": "connector-test-db",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "192.168.204.134",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "10001",
    "database.server.name": "debezium-server-test",
    "database.serverTimezone": "UTC",
    "topic.prefix": "debezium-server-test",
    "time.precision.mode": "connect",
    "database.include.list": "spring-debezium-demo",
    "table.include.list": "spring-debezium-demo.sys_user",
    "snapshot.mode": "schema_only",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "debezium-server-test.schemahistory.spring-debezium-demo",
    "database.history.kafka.topic": "debezium-server-test.debezium-server_history_schema",
    "include.schema.changes": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "none",
    "transforms.unwrap.operation.header": "false"
  }
}
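To submit this with curl, save the JSON body to a file (source-connector.json is just an example name) and then confirm the connector reports RUNNING:
# Create the source connector
curl -s -X POST http://192.168.204.134:7750/connectors \
  -H 'Content-Type: application/json' \
  -d @source-connector.json

# Both "connector" and "tasks" in the response should show RUNNING
curl -s http://192.168.204.134:7750/connectors/connector-test-db/status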
Parameter notes:
name: unique name of the connector; must be unique within the Kafka Connect cluster.
config: the connector's configuration options.
connector.class: use Debezium's MySQL connector class.
database.hostname: host address of the target MySQL server.
database.port: port of the target MySQL server; MySQL's default is 3306.
database.user: user name for connecting to MySQL.
database.password: password for that user.
database.server.id: the connector's unique server ID within the MySQL replication topology; avoid clashes with other MySQL servers or connectors.
database.server.name: logical server name, used as the prefix of the generated Kafka topics to identify the data source. (In Debezium 2.x this legacy option has been superseded by topic.prefix.)
topic.prefix: prefix of the Kafka topics; the actual topic name is usually {topic.prefix}.{database}.{table}. It should match ${database.server.name} above.
database.include.list: databases to capture; the connector only captures changes in these databases.
table.include.list: tables to capture, in {database}.{table} form; only changes to these tables are captured.
snapshot.mode: initial snapshot mode:
schema_only captures only the database schema (structure), no data; other options include initial (schema plus data).
database.history.kafka.bootstrap.servers: Kafka brokers used to store the database history.
schema.history.internal.kafka.bootstrap.servers: Kafka brokers used to store the internal schema history.
schema.history.internal.kafka.topic: Kafka topic storing the internal schema history.
database.history.kafka.topic: Kafka topic storing the database history.
include.schema.changes: whether schema changes (DDL statements) are written to Kafka as events; true means they are.
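With the source connector running, any change to the watched table should appear on the topic {topic.prefix}.{database}.{table}. A minimal smoke test, using the console consumer inside the broker container (the INSERT values are made up; adjust the columns to your actual sys_user schema):
# Make a change in the source table
mysql -h192.168.204.134 -uroot -p -e \
  'INSERT INTO `spring-debezium-demo`.sys_user (ID, NAME) VALUES (1, "test");'

# Watch the change event arrive
docker exec debezium-kafka-broker kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic debezium-server-test.spring-debezium-demo.sys_user \
  --from-beginning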
3.3. Creating the sink connector
The Connect container already ships with Debezium's official source connectors:
debezium-connector-db2
debezium-connector-mysql
debezium-connector-postgres
debezium-connector-vitess
debezium-connector-mongodb
debezium-connector-oracle
debezium-connector-sqlserver
For the sink side, you need to download the kafka-connect-jdbc package from the Confluent website:
# Create a kafka-connect-jdbc directory inside the docker container
docker exec <container-id> mkdir /kafka/connect/kafka-connect-jdbc
# Download the jar locally
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar
# Copy the jar into the docker container
docker cp kafka-connect-jdbc-10.7.4.jar <container-id>:/kafka/connect/kafka-connect-jdbc
# Restart the connect container
docker restart <container-id>
# [Pitfall] This package ships without a JDBC driver: copy the mysql-connector jar
# from the debezium-connector-mysql folder inside the container into kafka-connect-jdbc
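After the restart, Kafka Connect's standard /connector-plugins endpoint lists every connector class it can load, which is a quick way to confirm the JDBC sink was picked up:
# JdbcSinkConnector should now appear in the plugin list
curl -s http://192.168.204.134:7750/connector-plugins | grep -i jdbc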
Now create a sink connector whose target is the downstream MySQL database.
POST http://192.168.204.134:7750/connectors
Content-Type: application/json

{
  "name": "sink-sys-user-mysql",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "debezium-server-test.spring-debezium-demo.sys_user",
    "connection.url": "jdbc:mysql://192.168.204.134:3306/debezium-sink-db?useSSL=false&serverTimezone=UTC",
    "connection.user": "root",
    "connection.password": "123456",
    "insert.mode": "upsert",
    "auto.create": "true",
    "auto.evolve": "true",
    "pk.fields": "ID",
    "pk.mode": "record_key",
    "table.name.format": "sys_user",
    "fields.whitelist": "ID,NAME,USERNAME,PASSWORD,GENDER,REMARK,STATUS,CREATE_TIME,CREATE_ID,UPDATE_TIME,UPDATE_ID",
    "delete.enabled": "true",
    "batch.size": "500",
    "transforms": "convertTimestamps",
    "transforms.convertTimestamps.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convertTimestamps.field": "CREATE_TIME,UPDATE_TIME",
    "transforms.convertTimestamps.target.type": "Timestamp",
    "transforms.convertTimestamps.format": "yyyy-MM-dd HH:mm:ss"
  }
}
# [Pitfall] Pay close attention to the case of database and table field names
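After the sink is created, the same status check applies, and the captured row should land in the target database (a sketch; the names follow the configuration above):
# The sink connector should report RUNNING
curl -s http://192.168.204.134:7750/connectors/sink-sys-user-mysql/status

# The row should now exist downstream
mysql -h192.168.204.134 -uroot -p -e \
  'SELECT ID, NAME FROM `debezium-sink-db`.sys_user;'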
In production you will not run one connector per table; use a topic regex to match tables dynamically (make sure table.include.list is configured on the source connector first). With the routing below, a record on topic debezium-server-test.spring-debezium-demo.sys_user is rewritten by RegexRouter to sys_user, so "table.name.format": "${topic}" resolves to the bare table name.
# Drop the hard-coded table name
# "table.name.format": "sys_user"
# Change "topics" to "topics.regex"
# and use the following instead:
"topics.regex": "debezium-server-test.spring-debezium-demo.*",
"transforms": "routeTopics",
"transforms.routeTopics.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeTopics.regex": "^(.*)\\.spring-debezium-demo\\.(.*)$",
"transforms.routeTopics.replacement": "$2",
"table.name.format": "${topic}"
[Pitfall] MySQL CDC timestamp issue: Incorrect datetime value: '1733827592000' for column 'CREATE_TIME' at row 1. The root cause is that Debezium emits DATETIME/TIMESTAMP columns as epoch-millisecond longs, which the downstream MySQL rejects when they are written into a datetime column; the TimestampConverter SMT turns them back into real timestamps.
Solutions:
Method 1 (may not work):
Add on the source side:
"time.precision.mode": "connect"
Add on the sink side:
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "time"
Method 2 (works):
Add on the source side:
"database.serverTimezone": "UTC",
"time.precision.mode": "connect",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "none",
"transforms.unwrap.operation.header": "false"
Add on the sink side:
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "create_time,his_create_time,his_update_time"
4. Running Results
5. Summary
More details are available on the official site: https://debezium.io/documentation/reference/3.0/connectors/jdbc.html
Chinese-language material on this is scarce. During the CDC work I found that even with auto.create configured it had no effect: the sink connector did not create the synchronized table. Pay particular attention to how data types on the source and sink sides, such as datetime, bigdecimal, and json, map onto Debezium's data types.