Flink CDC同步MySQL数据至Doris及遇到的问题踩坑
Flink CDC同步MySQL数据至Doris1、情况准备
Flink vesion: 1.17.2
Doris version: 2.1.7
jars:
flink-sql-connector-mysql-cdc-3.0.1.jar
mysql-connector-java-8.0.27.jar
flink-doris-connector-1.17-24.0.1.jar
同步时需要在 $FLINK_HOME/lib 目次下添加对应的 Flink CDC 依赖,如上述。
下载链接: maven 仓库
# 启动flink yarn session
yarn-session -d
2、编写CDC同步脚本
cd $FLINK_HOME
vim bin/mysql2doris.sh
bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=4 \
-Denv.java.opts="-Dfile.encoding=UTF-8" \
#解决中文乱码问题
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-24.0.1.jar \
mysql-sync-database \
--database doris_database_name\
--mysql-conf hostname=xxx \
--mysql-conf port=3306 \
--mysql-conf username=root\
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_database_name \
--mysql-conf scan.incremental.snapshot.chunk.key-column=database.table1:column1,database.table2:column1 \
--including-tables ".*" \ #需要同步的 MySQL 表,可以使用 \| 分隔多个表,并支持正则表达式。比如--including-tables table1
--sink-conf fenodes=xxx \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://xxx:9030 \
--sink-conf sink.label-prefix=label_1 \
--table-conf replication_num=3
# 启动脚本
sh bin/mysql2doris.sh
3、验证
https://i-blog.csdnimg.cn/direct/468b1f6d405640da8fa1c7bdaa86b046.png
查看checkpoint情况
https://i-blog.csdnimg.cn/direct/788ffa8f782d4ac5afcbefb5b828da4a.png
checkpoint完成,数据同步成功,去doris查看数据即可。
4、踩坑记载
cdc任务启动失败
1、 java.lang.NoSuchFieldError: SCAN_INCREMENTAL_CLOSE_IDLE _READER_ENABLEDhttps://i-blog.csdnimg.cn/direct/f2a8ef31653c4287ac847a82c4abfe22.png
错误原因:jar包使用错误,使用的是 flink-connector-mysql-cdc-${version}.jar
解决方案:必须使用 flink-sql-connector-mysql- ${version}.jar
2、Caused by: java.lang.ClassNotFoundException: com.ververica.cdc.connectors.mysql.source.MySqlsource
https://i-blog.csdnimg.cn/direct/3626f6b6f97a4575bd8afb7886a5feee.png
https://i-blog.csdnimg.cn/direct/d86bcda79e354b6e99b23be7870a3249.png
错误原因:由于之前为相识决第一个错误,下了很多个 flink-sql-connector-mysql- ${version}.jar包到lib目次下,逐个反编译各个jar包后发现 MySqlsource 是存在的。大概lib目次下存在多个jar包,存在冲突
https://i-blog.csdnimg.cn/direct/e39ee543ee4c4792bff5193ccabb1caf.png
解决方案:将多余的cdc jar包移出lib目次,并重启flink集群
3、写入字符范例数据时凌驾varchar长度限制,Doris写入失败
https://i-blog.csdnimg.cn/direct/470c566947eb4f548ee1e41101c31f8f.png
https://i-blog.csdnimg.cn/direct/0cecc0305e1e4214ba8854e15ea2435a.png
错误原因:查看flink-doris-connector后发现,connector自动建表时,默认将源表中的varchar范例长度*3,所以不存在超出长度限制问题。
又发现写入数据是中文,doris显示是乱码,大概存在中文乱码问题。
https://i-blog.csdnimg.cn/direct/39b67a5a18be4b429c55829bba5336d3.jpeg
解决方案:
1、修改flink设置文件 flink-conf.yml , 添加以下内容
env.java.opts: -Dfile.encoding=UTF-8
重启flink yarn session
2、在cdc脚本中添加
-Denv.java.opts="-Dfile.encoding=UTF-8" \
两种方案选一即可
4、scan.incremental.snapshot.chunk.key-column’ must be set when the table doesn’t have primary keys
错误原因:整库同步时,遇到部门了无主键的表
解决方案:
1、忽略这些表,不同步
脚本里添加
--excluding-tables "table1|table2" # 不同表之间用 | 分隔
2、设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空范例的一个字段。
脚本里添加
--mysql-conf scan.incremental.snapshot.chunk.key-column=database.table1:column1,database.table2:column1
# 不同的库表列之间用 , 隔开。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]