【大数据】DolphinScheduler将上游Task执行效果转达给下游Task ...

打印 上一主题 下一主题

主题 868|帖子 868|积分 2604

配景

公司的数据开发平台需要用到DolphinScheduler做任务调理,其中一个场景是:上游任务执行结束后,需要将任务执行效果转达给下游任务。
DolphinScheduler肯定是能实现任务之间的传参的,详细的可以看:DolphinScheduler | 文档中心 (apache.org)。
但是官方案例中介绍的任务之间传参是提前在管理台上设置好的,OK,那么题目来了,怎样实现任务之间的动态传参呢?比如说我们自界说Task,然后在Task执行结束后将执行效果封装,转达给DAG中的下一个Task。
分析

假如DolphinScheduler官方的案例没有演示怎样动态传,我们开发者应该怎样行止理这种需求?
我是这么做的:分析DolphinScheduler内置的Task,总有一个Task是需要转达参数给下游的。我这里盲猜两个,一个是SqlTask,一个是HttpTask。我的观点是:总不能做完SQL查询,或者做完HTTP请求后就不管效果吧?
分析HttpTask源码

分析HttpTask源码,直接找到HttpTask的handle方法,DolphinScheduler中,任何Task的详细执行逻辑都在这个handle方法中。
handle方法分析
  1. @Override
  2. public void handle(TaskCallBack taskCallBack) throws TaskException {
  3.     long startTime = System.currentTimeMillis();
  4.     String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
  5.     String statusCode = null;
  6.     String body = null;
  7.     try (
  8.             CloseableHttpClient client = createHttpClient();
  9.             CloseableHttpResponse response = sendRequest(client)) {
  10.         statusCode = String.valueOf(getStatusCode(response));
  11.         body = getResponseBody(response);
  12.         exitStatusCode = validResponse(body, statusCode);
  13.         // 看名字应该就能猜到是处理请求结果的
  14.         addDefaultOutput(body);
  15.         long costTime = System.currentTimeMillis() - startTime;
  16.         log.info(
  17.                 "startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {} milliseconds, statusCode : {}, body : {}, log : {}",
  18.                 formatTimeStamp, httpParameters.getUrl(),
  19.                 httpParameters.getHttpMethod(), costTime, statusCode, body, output);
  20.     } catch (Exception e) {
  21.         appendMessage(e.toString());
  22.         exitStatusCode = -1;
  23.         log.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:" + output, e);
  24.         throw new TaskException("Execute http task failed", e);
  25.     }
  26. }
复制代码
继承看addDefaultOutput方法
  1. public void addDefaultOutput(String response) {
  2.     // put response in output
  3.     // 创建Property对象
  4.     Property outputProperty = new Property();
  5.     // 设置Prop,也就是设置Key
  6.     outputProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), "response"));
  7.     // 设置是入参还是出参,这里是出参,因为是将结果给下游任务
  8.     outputProperty.setDirect(Direct.OUT);
  9.     // 设置参数类型,VARCHAR表示就是字符串
  10.     outputProperty.setType(DataType.VARCHAR);
  11.     // 设置Value,就是http请求结果
  12.     outputProperty.setValue(response);
  13.     // 重点:将Property添加到varPool中
  14.     httpParameters.addPropertyToValPool(outputProperty);
  15. }
复制代码
分析SqlTask源码

handler方法分析
  1. @Override
  2. public void handle(TaskCallBack taskCallBack) throws TaskException {
  3.     log.info("Full sql parameters: {}", sqlParameters);
  4.     log.info(
  5.             "sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit  {}",
  6.             sqlParameters.getType(),
  7.             sqlParameters.getDatasource(),
  8.             sqlParameters.getSql(),
  9.             sqlParameters.getLocalParams(),
  10.             sqlParameters.getUdfs(),
  11.             sqlParameters.getShowType(),
  12.             sqlParameters.getConnParams(),
  13.             sqlParameters.getVarPool(),
  14.             sqlParameters.getLimit());
  15.     try {
  16.         // get datasource
  17.         baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType,
  18.                 sqlTaskExecutionContext.getConnectionParams());
  19.         List<String> subSqls = DataSourceProcessorProvider.getDataSourceProcessor(dbType)
  20.                 .splitAndRemoveComment(sqlParameters.getSql());
  21.         // ready to execute SQL and parameter entity Map
  22.         List<SqlBinds> mainStatementSqlBinds = subSqls
  23.                 .stream()
  24.                 .map(this::getSqlAndSqlParamsMap)
  25.                 .collect(Collectors.toList());
  26.         List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
  27.                 .orElse(new ArrayList<>())
  28.                 .stream()
  29.                 .map(this::getSqlAndSqlParamsMap)
  30.                 .collect(Collectors.toList());
  31.         List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements())
  32.                 .orElse(new ArrayList<>())
  33.                 .stream()
  34.                 .map(this::getSqlAndSqlParamsMap)
  35.                 .collect(Collectors.toList());
  36.         List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList());
  37.         // execute sql task
  38.         // 这个方法就是处理sql结果的
  39.         executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
  40.         setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
  41.     } catch (Exception e) {
  42.         setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
  43.         log.error("sql task error", e);
  44.         throw new TaskException("Execute sql task failed", e);
  45.     }
  46. }
复制代码
所以我们在看下executeFuncAndSql方法内部实现
  1. public void executeFuncAndSql(List<SqlBinds> mainStatementsBinds,
  2.                               List<SqlBinds> preStatementsBinds,
  3.                               List<SqlBinds> postStatementsBinds,
  4.                               List<String> createFuncs) throws Exception {
  5.     try (
  6.             Connection connection =
  7.                     DataSourceClientProvider.getAdHocConnection(DbType.valueOf(sqlParameters.getType()),
  8.                             baseConnectionParam)) {
  9.         // create temp function
  10.         if (CollectionUtils.isNotEmpty(createFuncs)) {
  11.             createTempFunction(connection, createFuncs);
  12.         }
  13.         // pre execute
  14.         executeUpdate(connection, preStatementsBinds, "pre");
  15.         // main execute
  16.         String result = null;
  17.         // decide whether to executeQuery or executeUpdate based on sqlType
  18.         if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
  19.             // query statements need to be convert to JsonArray and inserted into Alert to send
  20.             result = executeQuery(connection, mainStatementsBinds.get(0), "main");
  21.         } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
  22.             // non query statement
  23.             String updateResult = executeUpdate(connection, mainStatementsBinds, "main");
  24.             result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
  25.         }
  26.         // deal out params
  27.         // 这个方法就是来处理结果的
  28.         sqlParameters.dealOutParam(result);
  29.         // post execute
  30.         executeUpdate(connection, postStatementsBinds, "post");
  31.     } catch (Exception e) {
  32.         log.error("execute sql error: {}", e.getMessage());
  33.         throw e;
  34.     }
  35. }
复制代码
通过dealOutParam看详细处理细节
  1. public void dealOutParam(String result) {
  2.     if (CollectionUtils.isEmpty(localParams)) {
  3.         return;
  4.     }
  5.     List<Property> outProperty = getOutProperty(localParams);
  6.     if (CollectionUtils.isEmpty(outProperty)) {
  7.         return;
  8.     }
  9.     if (StringUtils.isEmpty(result)) {
  10.         varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));
  11.         return;
  12.     }
  13.     List<Map<String, String>> sqlResult = getListMapByString(result);
  14.     if (CollectionUtils.isEmpty(sqlResult)) {
  15.         return;
  16.     }
  17.     // if sql return more than one line
  18.     if (sqlResult.size() > 1) {
  19.         Map<String, List<String>> sqlResultFormat = new HashMap<>();
  20.         // init sqlResultFormat
  21.         Set<String> keySet = sqlResult.get(0).keySet();
  22.         for (String key : keySet) {
  23.             sqlResultFormat.put(key, new ArrayList<>());
  24.         }
  25.         for (Map<String, String> info : sqlResult) {
  26.             for (String key : info.keySet()) {
  27.                 sqlResultFormat.get(key).add(String.valueOf(info.get(key)));
  28.             }
  29.         }
  30.         for (Property info : outProperty) {
  31.             if (info.getType() == DataType.LIST) {
  32.                 info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
  33.             }
  34.         }
  35.     } else {
  36.         // result only one line
  37.         Map<String, String> firstRow = sqlResult.get(0);
  38.         for (Property info : outProperty) {
  39.             info.setValue(String.valueOf(firstRow.get(info.getProp())));
  40.         }
  41.     }
  42.    
  43.     // 本质还是将sql结果处理后保存在varPool中,varPool才是关键所在
  44.     varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));
  45. }
复制代码
所以,源代码分析到这,我们就知道了:假如想实现动态传参,那么我们需要将转达的数据封装成org.apache.dolphinscheduler.plugin.task.api.model.Property,然后添加到内置聚集变量org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters#varPool中
详细实现

这里我们不去讨论自界说Task的详细实现步骤,这不是本文的重点。
当我们实现自界说Task后,可以这样编码实现动态传参:
  1. Property outputProperty = new Property();
  2. // 添加我们要传递的数据Key
  3. outputProperty.setProp("xxxxKey"));
  4. // OUT
  5. outputProperty.setDirect(Direct.OUT);
  6. // 这里传递的数据是什么类型就写什么类型,建议通过json字符串处理数据
  7. outputProperty.setType(DataType.VARCHAR);
  8. // 添加我们要传递的数据Key
  9. outputProperty.setValue("xxxxValue");
  10. // 这里的xxxxParameters是我们自己自定义的,一般情况下,一个Task对应一个Parameters
  11. xxxxParameters.addPropertyToValPool(outputProperty);
复制代码
DolphinScheduler内部有将List<roperty> varPool转换成Map<String, Property> varParams的逻辑,然后会将varParams与其他的参数归并,最后通过taskExecutionContext.setPrepareParamsMap(propertyMap) 将数据设置给Map<String, Property> prepareParamsMap。
总结

关于DolphinScheduler(海豚调理器)是什么,能做什么,怎么使用等等,这里我就不再赘述,大家感爱好的可以去看看官方文档:DolphinScheduler | 文档中心 (apache.org)
希望通过本篇文章能让各位读者掌握Task之间的动态传参,然后应用在实际工作中。假如本篇文章能给屏幕前的你们或多或少的一些资助,也是我喜闻乐见的。
假如能帮我点个免费的关注,那就是对我个人的最大的肯定。假如觉得写的还行,分享一下也是我生活的小确幸~
欢迎关注我的公众号

Peace Guys,我们下篇文章再见。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我可以不吃啊

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表