ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink CDC同步MySQL数据至Doris及遇到的问题踩坑 [打印本页]

作者: 愛在花開的季節    时间: 2024-12-30 00:13
标题: Flink CDC同步MySQL数据至Doris及遇到的问题踩坑
Flink CDC同步MySQL数据至Doris

1、情况准备

  1. Flink vesion: 1.17.2
  2. Doris version: 2.1.7
  3. jars:
  4.         flink-sql-connector-mysql-cdc-3.0.1.jar
  5.         mysql-connector-java-8.0.27.jar
  6.         flink-doris-connector-1.17-24.0.1.jar
复制代码
同步时需要在 $FLINK_HOME/lib 目次下添加对应的 Flink CDC 依赖,如上述。
下载链接: maven 仓库
  1. # 启动flink yarn session
  2. yarn-session -d
复制代码
2、编写CDC同步脚本

  1. cd $FLINK_HOME
  2. vim bin/mysql2doris.sh
  3. bin/flink run \
  4.     -Dexecution.checkpointing.interval=10s \
  5.     -Dparallelism.default=4 \
  6.     -Denv.java.opts="-Dfile.encoding=UTF-8" \
  7. #解决中文乱码问题
  8.     -c org.apache.doris.flink.tools.cdc.CdcTools \
  9.     lib/flink-doris-connector-1.17-24.0.1.jar \
  10.     mysql-sync-database \
  11.     --database doris_database_name\
  12.     --mysql-conf hostname=xxx \
  13.     --mysql-conf port=3306 \
  14.     --mysql-conf username=root\
  15.     --mysql-conf password=123456 \
  16.     --mysql-conf database-name=mysql_database_name \
  17.     --mysql-conf scan.incremental.snapshot.chunk.key-column=database.table1:column1,database.table2:column1 \
  18.     --including-tables "[a-z].*" \ #需要同步的 MySQL 表,可以使用 \| 分隔多个表,并支持正则表达式。比如--including-tables table1
  19.     --sink-conf fenodes=xxx \
  20.     --sink-conf username=root \
  21.     --sink-conf password=123456 \
  22.     --sink-conf jdbc-url=jdbc:mysql://xxx:9030 \
  23.     --sink-conf sink.label-prefix=label_1 \
  24.     --table-conf replication_num=3
  25. # 启动脚本
  26. sh bin/mysql2doris.sh
复制代码
3、验证


查看checkpoint情况

checkpoint完成,数据同步成功,去doris查看数据即可。
4、踩坑记载

cdc任务启动失败

1、 java.lang.NoSuchFieldError: SCAN_INCREMENTAL_CLOSE_IDLE _READER_ENABLED


错误原因:jar包使用错误,使用的是 flink-connector-mysql-cdc-${version}.jar
解决方案:必须使用 flink-sql-connector-mysql- ${version}.jar
2、Caused by: java.lang.ClassNotFoundException: com.ververica.cdc.connectors.mysql.source.MySqlsource



错误原因:由于之前为相识决第一个错误,下了很多个 flink-sql-connector-mysql- ${version}.jar包到lib目次下,逐个反编译各个jar包后发现 MySqlsource 是存在的。大概lib目次下存在多个jar包,存在冲突

解决方案:将多余的cdc jar包移出lib目次,并重启flink集群
3、写入字符范例数据时凌驾varchar长度限制,Doris写入失败



错误原因:查看flink-doris-connector后发现,connector自动建表时,默认将源表中的varchar范例长度*3,所以不存在超出长度限制问题。
又发现写入数据是中文,doris显示是乱码,大概存在中文乱码问题。

解决方案:
1、修改flink设置文件 flink-conf.yml , 添加以下内容
  1. env.java.opts: -Dfile.encoding=UTF-8
复制代码
重启flink yarn session
2、在cdc脚本中添加
  1. -Denv.java.opts="-Dfile.encoding=UTF-8" \
复制代码
两种方案选一即可
4、scan.incremental.snapshot.chunk.key-column’ must be set when the table doesn’t have primary keys

错误原因:整库同步时,遇到部门了无主键的表
解决方案:
1、忽略这些表,不同步
脚本里添加
  1. --excluding-tables "table1|table2" # 不同表之间用 | 分隔
复制代码
2、设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空范例的一个字段。
脚本里添加
  1. --mysql-conf scan.incremental.snapshot.chunk.key-column=database.table1:column1,database.table2:column1
  2. # 不同的库表列之间用 , 隔开。
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4