基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】 ...

打印 上一主题 下一主题

主题 675|帖子 675|积分 2025

基于chunjun纯钧的增量数据同步

目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程中也遇到了很多问题,本次记录下在SQL模式的情况下怎么支持增量的数据同步
chunjun的官网文档对增量同步已经做出了一定的说明

纯钧官方
根据文档我编写了一个SQL脚本
  1. create table `source` (
  2.         `sfzh` STRING COMMENT '',
  3.         `xm` STRING COMMENT '',
  4.         `xb` STRING COMMENT '',
  5.         `xbdm` STRING COMMENT '',
  6.         `jzdz` STRING COMMENT '',
  7.         `fzrq` DATE COMMENT '',
  8.         `dsc_biz_record_id` STRING COMMENT ''
  9. ) with (
  10.         'connector' = 'mysql-x',
  11.         'url' = 'jdbc:mysql://:/?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true',
  12.         'table-name' = '',
  13.         'username' = '',
  14.         'password' = '',
  15.         'scan.fetch-size' = '1024',
  16.         'scan.increment.column' = 'fzrq',
  17.         --'scan.increment.column-type' = 'date',
  18.         'scan.start-location' = '1659974400000'
  19. );
  20. create table `sink` (
  21.         `sfzh` STRING COMMENT '',
  22.         `xm` STRING COMMENT '',
  23.         `xb` STRING COMMENT '',
  24.         `xbdm` STRING COMMENT '',
  25.         `jzdz` STRING COMMENT '',
  26.         `fzrq` DATE COMMENT '',
  27.         `dsc_biz_record_id` STRING COMMENT '',
  28.         PRIMARY KEY (`dsc_biz_record_id`) NOT ENFORCED
  29. ) with (
  30.         'connector' = 'stream-x'
  31. );
复制代码
然后提交任务的时候发现已经记录了start-locationstart-location的指标信息了,但是并没有上报到Prometheus!
在本地调试源码解决问题的大致过程

在类 com.dtstack.chunjun.source.format.BaseRichInputFormat中有一个成员变量
  1. /** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */
  2. protected transient CustomReporter customReporter;
复制代码
该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法
  1.     @Override
  2.     public void openInputFormat() throws IOException {
  3.         Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
  4.         if (vars != null) {
  5.             jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
  6.             jobId = vars.get(Metrics.JOB_NAME);
  7.             indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
  8.         }
  9.         LOG.info("是否使用自定义报告 {}", useCustomReporter());
  10.         if (useCustomReporter()) {
  11.             customReporter =
  12.                     DataSyncFactoryUtil.discoverMetric(
  13.                             config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
  14.             customReporter.open();
  15.             LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
  16.         }
  17.         startTime = System.currentTimeMillis();
  18.     }
复制代码
通过排查useCustomReporter方法得知 jdbcConf.getInitReporter()是false,而在JdbcConfig类里面这个对象默认是true
  1. /** 使用自定义的指标输出器把增量指标打到普罗米修斯 */
  2.     @Override
  3.     protected boolean useCustomReporter() {
  4.         return jdbcConf.isIncrement() && jdbcConf.getInitReporter();
  5.     }
  6.     /** 增量同步或者间隔轮询时,是否初始化外部存储 */
  7.     protected Boolean initReporter = true;
复制代码
经过查找 initReporter 属性的set方法调用,找到了下面的问题
在类 com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource 中有个地方说暂时不支持SQL的方式

尝试一下将false修改为true,然后在本地进行测试,测试的时候将pushgateway的host和port写到代码里面,执行任务发现pushgateway里面已经有数据了

那么可以开始打包了,由于改了源代码,所以要先格式化代码 mvn spotless:apply 再打包 mvn clean package -DskipTests
后续问题

打包到虚拟机进行测试,我使用的是yarn-per-job模式,提交任务后发现报找不到Prometheus报告类的异常,通过异常信息发现在前面提到的方法里有classloader
  1. public void openInputFormat() throws IOException {
  2.         Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
  3.         if (vars != null) {
  4.             jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
  5.             jobId = vars.get(Metrics.JOB_NAME);
  6.             indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
  7.         }
  8.         LOG.info("是否使用自定义报告 {}", useCustomReporter());
  9.         if (useCustomReporter()) {
  10.             customReporter =
  11.                     DataSyncFactoryUtil.discoverMetric(
  12.                             config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
  13.             customReporter.open();
  14.             LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
  15.         }
  16.         startTime = System.currentTimeMillis();
  17.     }
  18.     public static CustomReporter discoverMetric(
  19.             ChunJunCommonConf commonConf,
  20.             RuntimeContext context,
  21.             boolean makeTaskFailedWhenReportFailed) {
  22.         try {
  23.             String pluginName = commonConf.getMetricPluginName();
  24.             // 这里获取到了类的全限定名 com.dtstack.chunjun.metrics.prometheus.PrometheusReport
  25.             String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.metric);
  26.             MetricParam metricParam =
  27.                     new MetricParam(
  28.                             context, makeTaskFailedWhenReportFailed, commonConf.getMetricProps());
  29.             ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  30.             Class<?> clazz = classLoader.loadClass(pluginClassName);
  31.             Constructor<?> constructor = clazz.getConstructor(MetricParam.class);
  32.             return (CustomReporter) constructor.newInstance(metricParam);
  33.         } catch (Exception e) {
  34.             throw new ChunJunRuntimeException(e);
  35.         }
  36.     }
复制代码
在本地的时候这里加载类的时候是没问题的,但是在线上的时候出现了了找不到类的异常,猜测是相关的jar没有加载到flink jvm进程里面,所以将项目里面的 chunjun-metrics-prometheus.jar 放到了flink的lib目录下,再次启动任务 问题得以解决!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用多少眼泪才能让你相信

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表