2.阿里云flink&selectdb-jar作业

打印 上一主题 下一主题

主题 1031|帖子 1031|积分 3093

1.概述

本文继承介绍利用阿里云实时计算flink把数据从自建mysql同步到阿里云selectdb的过程。上一节利用sql作业,不够强大,有如下问题:


  • 不支持自动创建效果表(selectdb表)。同步前需要手动在selectdb创建效果表;
  • 不支持源表(mysql表)的ddl语句。源表增加/修改字段,需要先手动在效果表(selectdb表)执行,然后重启sql作业;
  • 不支持添加新的源表。添加新表源表需要重新从全量同步阶段开始运行(flink cdc作业分为全量同步和增量同步两个阶段);
  • 不支持毗连复用。sql作业内里的每个insert语句都需要一个源表(mysql表)的毗连,当同步的源表比较多时,会占用大量的数据库毗连;
本节利用jar作业,通过写代码的方式,解决sql作业存在的问题;
2.目标

把自建mysql的约100张表准实时同步到云服务selectdb。数据量不大,约5个G左右;
源表flink效果表自建mysql实时计算flink云服务selectdb 3.步调(重点)

对问题和过程没兴趣的同学,可以直接看这里。本章节记录了阿里云flink与selectdb集成时,利用jar作业的实现方式;
3.1.创建作业



  • JAR作业开发需要利用JDK 1.8版本;
  • JAR作业需要线下完成开发,然后打成jar包,上传到在Flink全托管控制台上部署并运行;
  • JAR作业不支持在Main函数中读取当地配置,读取配置文件需要可通过以下方式;

    • 部署作业所添加附加依靠文件将会加载到作业所运行Pod的/flink/usrlib目次下。配置文件以作业附加依靠文件上传,然后通过代码读取;
    • 上传到别的可访问地点,通过网络读取(注意Flink版默认不能访问公网,需要额外操作开通);

  • JAR作业依靠的别的jar包,可通过直接打进JAR作业的方式 ,也可以通过部署作业时添加附加依靠文件的方式;
  1. @Slf4j
  2. public class CdcMysqlToDorisStream {
  3.     public static void main(String[] args) throws Exception {
  4.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.         String database = "test";
  6.         Map<String, String> mysqlConfig = new HashMap<>();
  7.             mysqlConfig.put(MySqlSourceOptions.DATABASE_NAME.key(), "bpms");
  8.             mysqlConfig.put(MySqlSourceOptions.HOSTNAME.key(), "127.0.0.1");
  9.             mysqlConfig.put(MySqlSourceOptions.PORT.key(), "3306");
  10.             mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), "test");
  11.             mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), "test");
  12.             mysqlConfig.put("jdbc.properties.use_ssl", "false");
  13.             mysqlConfig.put("sink.properties.format", "json");
  14.             //**支持在作业运行到增量同步阶段后,动态添加新的源表
  15.             mysqlConfig.put(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
  16.         Configuration config = Configuration.fromMap(mysqlConfig);
  17.         Map<String, String> sinkConfig = new HashMap<>();
  18.             sinkConfig.put(DorisConfigOptions.FENODES.key(), "test.selectdbfe.rds.aliyuncs.com:8080");
  19.             sinkConfig.put(DorisConfigOptions.USERNAME.key(), "test");
  20.             sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "test");
  21.             sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://test.selectdbfe.rds.aliyuncs.com:9030");
  22.             sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());
  23.             sinkConfig.put("sink.enable-delete", "false");
  24.         Configuration sinkConf = Configuration.fromMap(sinkConfig);
  25.         Map<String, String> tableConfig = new HashMap<>();
  26.             tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
  27.             tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, ".*:1");
  28.         String includingTables = getTables();
  29.         String excludingTables = "";
  30.         boolean ignoreDefaultValue = false;
  31.         boolean useNewSchemaChange = true;
  32.         String schemaChangeMode = SchemaChangeMode.DEBEZIUM_STRUCTURE.getName();
  33.         boolean singleSink = false;
  34.         boolean ignoreIncompatible = false;
  35.         DatabaseSync databaseSync = new MysqlDatabaseSync();
  36.         databaseSync.setEnv(env)
  37.                 .setDatabase(database)
  38.                 .setConfig(config)
  39.                 .setIncludingTables(includingTables)
  40.                 .setExcludingTables(excludingTables)
  41.                 .setIgnoreDefaultValue(ignoreDefaultValue)
  42.                 .setSinkConfig(sinkConf)
  43.                 .setTableConfig(new DorisTableConfig(tableConfig))
  44.                 .setCreateTableOnly(false)
  45.                 .setNewSchemaChange(useNewSchemaChange)
  46.                 .setSchemaChangeMode(schemaChangeMode)
  47.                 .setSingleSink(singleSink)
  48.                 .setIgnoreIncompatible(ignoreIncompatible)
  49.                 .create();
  50.         databaseSync.build();
  51.         env.execute(String.format("mysql-doris数据库同步,database=%s", database));
  52.     }
  53.     //**读取配置文件里面,获取需要同步的表
  54.     @SneakyThrows
  55.     private static String getTables() {
  56.         String rst;
  57.         //**Flink JAR作业不支持在Main函数中读取本地配置
  58.         //**在作业运行时,部署作业所添加附加依赖文件将会加载到作业所运行Pod的/flink/usrlib目录下
  59.         try (Stream<String> stream = Files.lines(Paths.get("/flink/usrlib/mysql-to-doris-tables"))) {
  60.             rst = stream.map(String::trim).filter(StringUtils::isNotBlank).collect(joining("|"));
  61.         }
  62.         log.info("读取同步的表成功,tables={}", rst);
  63.         Assert.notBlank(rst, "同步的表不能为空");
  64.         return rst;
  65.     }
  66. }
复制代码
3.2.部署作业


4.碰到的问题

全托管flink怎么获取上传的文件;

我们利用的是阿里云全托管flink,文件都上传到了阿里云管理的oss。查询文档发现地点如下:
  1. oss://flink-fullymanaged-<工作空间ID>/artifacts/namespaces/<项目空间名称>/文件名
复制代码
如果照旧不知道是多少,可以先创建一个jar作业,然后在作业的基础信配置->JAR Uri内里查看;
运行报错java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;



  • 原因: 编译的jdk版本和运行的jdk版本不一致。jdk8和jdk11的此方法不兼容;
  • 解决: 查看打包编译的jdk版本,需利用jdk8;

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

老婆出轨

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表