Flink CDC同步MySQL数据至Doris
1、情况准备
- Flink vesion: 1.17.2
- Doris version: 2.1.7
- jars:
- flink-sql-connector-mysql-cdc-3.0.1.jar
- mysql-connector-java-8.0.27.jar
- flink-doris-connector-1.17-24.0.1.jar
复制代码 同步时需要在 $FLINK_HOME/lib 目次下添加对应的 Flink CDC 依赖,如上述。
下载链接: maven 仓库
- # 启动flink yarn session
- yarn-session -d
复制代码 2、编写CDC同步脚本
- cd $FLINK_HOME
- vim bin/mysql2doris.sh
- bin/flink run \
- -Dexecution.checkpointing.interval=10s \
- -Dparallelism.default=4 \
- -Denv.java.opts="-Dfile.encoding=UTF-8" \
- #解决中文乱码问题
- -c org.apache.doris.flink.tools.cdc.CdcTools \
- lib/flink-doris-connector-1.17-24.0.1.jar \
- mysql-sync-database \
- --database doris_database_name\
- --mysql-conf hostname=xxx \
- --mysql-conf port=3306 \
- --mysql-conf username=root\
- --mysql-conf password=123456 \
- --mysql-conf database-name=mysql_database_name \
- --mysql-conf scan.incremental.snapshot.chunk.key-column=database.table1:column1,database.table2:column1 \
- --including-tables "[a-z].*" \ #需要同步的 MySQL 表,可以使用 \| 分隔多个表,并支持正则表达式。比如--including-tables table1
- --sink-conf fenodes=xxx \
- --sink-conf username=root \
- --sink-conf password=123456 \
- --sink-conf jdbc-url=jdbc:mysql://xxx:9030 \
- --sink-conf sink.label-prefix=label_1 \
- --table-conf replication_num=3
- # 启动脚本
- sh bin/mysql2doris.sh
复制代码 3、验证
查看checkpoint情况
checkpoint完成,数据同步成功,去doris查看数据即可。
4、踩坑记载
cdc任务启动失败
1、 java.lang.NoSuchFieldError: SCAN_INCREMENTAL_CLOSE_IDLE _READER_ENABLED
错误原因:jar包使用错误,使用的是 flink-connector-mysql-cdc-${version}.jar
解决方案:必须使用 flink-sql-connector-mysql- ${version}.jar
2、Caused by: java.lang.ClassNotFoundException: com.ververica.cdc.connectors.mysql.source.MySqlsource
错误原因:由于之前为相识决第一个错误,下了很多个 flink-sql-connector-mysql- ${version}.jar包到lib目次下,逐个反编译各个jar包后发现 MySqlsource 是存在的。大概lib目次下存在多个jar包,存在冲突
解决方案:将多余的cdc jar包移出lib目次,并重启flink集群
3、写入字符范例数据时凌驾varchar长度限制,Doris写入失败
错误原因:查看flink-doris-connector后发现,connector自动建表时,默认将源表中的varchar范例长度*3,所以不存在超出长度限制问题。
又发现写入数据是中文,doris显示是乱码,大概存在中文乱码问题。
解决方案:
1、修改flink设置文件 flink-conf.yml , 添加以下内容
- env.java.opts: -Dfile.encoding=UTF-8
复制代码 重启flink yarn session
2、在cdc脚本中添加
- -Denv.java.opts="-Dfile.encoding=UTF-8" \
复制代码 两种方案选一即可
4、scan.incremental.snapshot.chunk.key-column’ must be set when the table doesn’t have primary keys
错误原因:整库同步时,遇到部门了无主键的表
解决方案:
1、忽略这些表,不同步
脚本里添加
- --excluding-tables "table1|table2" # 不同表之间用 | 分隔
复制代码 2、设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空范例的一个字段。
脚本里添加
- --mysql-conf scan.incremental.snapshot.chunk.key-column=database.table1:column1,database.table2:column1
- # 不同的库表列之间用 , 隔开。
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |