配景
公司的数据开发平台需要用到DolphinScheduler做任务调理,其中一个场景是:上游任务执行结束后,需要将任务执行效果转达给下游任务。
DolphinScheduler肯定是能实现任务之间的传参的,详细的可以看:DolphinScheduler | 文档中心 (apache.org)。
但是官方案例中介绍的任务之间传参是提前在管理台上设置好的,OK,那么题目来了,怎样实现任务之间的动态传参呢?比如说我们自界说Task,然后在Task执行结束后将执行效果封装,转达给DAG中的下一个Task。
分析
假如DolphinScheduler官方的案例没有演示怎样动态传,我们开发者应该怎样行止理这种需求?
我是这么做的:分析DolphinScheduler内置的Task,总有一个Task是需要转达参数给下游的。我这里盲猜两个,一个是SqlTask,一个是HttpTask。我的观点是:总不能做完SQL查询,或者做完HTTP请求后就不管效果吧?
分析HttpTask源码
分析HttpTask源码,直接找到HttpTask的handle方法,DolphinScheduler中,任何Task的详细执行逻辑都在这个handle方法中。
handle方法分析
- @Override
- public void handle(TaskCallBack taskCallBack) throws TaskException {
- long startTime = System.currentTimeMillis();
- String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
- String statusCode = null;
- String body = null;
- try (
- CloseableHttpClient client = createHttpClient();
- CloseableHttpResponse response = sendRequest(client)) {
- statusCode = String.valueOf(getStatusCode(response));
- body = getResponseBody(response);
- exitStatusCode = validResponse(body, statusCode);
- // 看名字应该就能猜到是处理请求结果的
- addDefaultOutput(body);
- long costTime = System.currentTimeMillis() - startTime;
- log.info(
- "startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {} milliseconds, statusCode : {}, body : {}, log : {}",
- formatTimeStamp, httpParameters.getUrl(),
- httpParameters.getHttpMethod(), costTime, statusCode, body, output);
- } catch (Exception e) {
- appendMessage(e.toString());
- exitStatusCode = -1;
- log.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:" + output, e);
- throw new TaskException("Execute http task failed", e);
- }
- }
复制代码 继承看addDefaultOutput方法
- public void addDefaultOutput(String response) {
- // put response in output
- // 创建Property对象
- Property outputProperty = new Property();
- // 设置Prop,也就是设置Key
- outputProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), "response"));
- // 设置是入参还是出参,这里是出参,因为是将结果给下游任务
- outputProperty.setDirect(Direct.OUT);
- // 设置参数类型,VARCHAR表示就是字符串
- outputProperty.setType(DataType.VARCHAR);
- // 设置Value,就是http请求结果
- outputProperty.setValue(response);
- // 重点:将Property添加到varPool中
- httpParameters.addPropertyToValPool(outputProperty);
- }
复制代码 分析SqlTask源码
handler方法分析
- @Override
- public void handle(TaskCallBack taskCallBack) throws TaskException {
- log.info("Full sql parameters: {}", sqlParameters);
- log.info(
- "sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
- sqlParameters.getType(),
- sqlParameters.getDatasource(),
- sqlParameters.getSql(),
- sqlParameters.getLocalParams(),
- sqlParameters.getUdfs(),
- sqlParameters.getShowType(),
- sqlParameters.getConnParams(),
- sqlParameters.getVarPool(),
- sqlParameters.getLimit());
- try {
- // get datasource
- baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType,
- sqlTaskExecutionContext.getConnectionParams());
- List<String> subSqls = DataSourceProcessorProvider.getDataSourceProcessor(dbType)
- .splitAndRemoveComment(sqlParameters.getSql());
- // ready to execute SQL and parameter entity Map
- List<SqlBinds> mainStatementSqlBinds = subSqls
- .stream()
- .map(this::getSqlAndSqlParamsMap)
- .collect(Collectors.toList());
- List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
- .orElse(new ArrayList<>())
- .stream()
- .map(this::getSqlAndSqlParamsMap)
- .collect(Collectors.toList());
- List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements())
- .orElse(new ArrayList<>())
- .stream()
- .map(this::getSqlAndSqlParamsMap)
- .collect(Collectors.toList());
- List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList());
- // execute sql task
- // 这个方法就是处理sql结果的
- executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
- setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
- } catch (Exception e) {
- setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
- log.error("sql task error", e);
- throw new TaskException("Execute sql task failed", e);
- }
- }
复制代码 所以我们在看下executeFuncAndSql方法内部实现
- public void executeFuncAndSql(List<SqlBinds> mainStatementsBinds,
- List<SqlBinds> preStatementsBinds,
- List<SqlBinds> postStatementsBinds,
- List<String> createFuncs) throws Exception {
- try (
- Connection connection =
- DataSourceClientProvider.getAdHocConnection(DbType.valueOf(sqlParameters.getType()),
- baseConnectionParam)) {
- // create temp function
- if (CollectionUtils.isNotEmpty(createFuncs)) {
- createTempFunction(connection, createFuncs);
- }
- // pre execute
- executeUpdate(connection, preStatementsBinds, "pre");
- // main execute
- String result = null;
- // decide whether to executeQuery or executeUpdate based on sqlType
- if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
- // query statements need to be convert to JsonArray and inserted into Alert to send
- result = executeQuery(connection, mainStatementsBinds.get(0), "main");
- } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
- // non query statement
- String updateResult = executeUpdate(connection, mainStatementsBinds, "main");
- result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
- }
- // deal out params
- // 这个方法就是来处理结果的
- sqlParameters.dealOutParam(result);
- // post execute
- executeUpdate(connection, postStatementsBinds, "post");
- } catch (Exception e) {
- log.error("execute sql error: {}", e.getMessage());
- throw e;
- }
- }
复制代码 通过dealOutParam看详细处理细节
- public void dealOutParam(String result) {
- if (CollectionUtils.isEmpty(localParams)) {
- return;
- }
- List<Property> outProperty = getOutProperty(localParams);
- if (CollectionUtils.isEmpty(outProperty)) {
- return;
- }
- if (StringUtils.isEmpty(result)) {
- varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));
- return;
- }
- List<Map<String, String>> sqlResult = getListMapByString(result);
- if (CollectionUtils.isEmpty(sqlResult)) {
- return;
- }
- // if sql return more than one line
- if (sqlResult.size() > 1) {
- Map<String, List<String>> sqlResultFormat = new HashMap<>();
- // init sqlResultFormat
- Set<String> keySet = sqlResult.get(0).keySet();
- for (String key : keySet) {
- sqlResultFormat.put(key, new ArrayList<>());
- }
- for (Map<String, String> info : sqlResult) {
- for (String key : info.keySet()) {
- sqlResultFormat.get(key).add(String.valueOf(info.get(key)));
- }
- }
- for (Property info : outProperty) {
- if (info.getType() == DataType.LIST) {
- info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
- }
- }
- } else {
- // result only one line
- Map<String, String> firstRow = sqlResult.get(0);
- for (Property info : outProperty) {
- info.setValue(String.valueOf(firstRow.get(info.getProp())));
- }
- }
-
- // 本质还是将sql结果处理后保存在varPool中,varPool才是关键所在
- varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));
- }
复制代码 所以,源代码分析到这,我们就知道了:假如想实现动态传参,那么我们需要将转达的数据封装成org.apache.dolphinscheduler.plugin.task.api.model.Property,然后添加到内置聚集变量org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters#varPool中
详细实现
这里我们不去讨论自界说Task的详细实现步骤,这不是本文的重点。
当我们实现自界说Task后,可以这样编码实现动态传参:
- Property outputProperty = new Property();
- // 添加我们要传递的数据Key
- outputProperty.setProp("xxxxKey"));
- // OUT
- outputProperty.setDirect(Direct.OUT);
- // 这里传递的数据是什么类型就写什么类型,建议通过json字符串处理数据
- outputProperty.setType(DataType.VARCHAR);
- // 添加我们要传递的数据Key
- outputProperty.setValue("xxxxValue");
- // 这里的xxxxParameters是我们自己自定义的,一般情况下,一个Task对应一个Parameters
- xxxxParameters.addPropertyToValPool(outputProperty);
复制代码 DolphinScheduler内部有将List<roperty> varPool转换成Map<String, Property> varParams的逻辑,然后会将varParams与其他的参数归并,最后通过taskExecutionContext.setPrepareParamsMap(propertyMap) 将数据设置给Map<String, Property> prepareParamsMap。
总结
关于DolphinScheduler(海豚调理器)是什么,能做什么,怎么使用等等,这里我就不再赘述,大家感爱好的可以去看看官方文档:DolphinScheduler | 文档中心 (apache.org)
希望通过本篇文章能让各位读者掌握Task之间的动态传参,然后应用在实际工作中。假如本篇文章能给屏幕前的你们或多或少的一些资助,也是我喜闻乐见的。
假如能帮我点个免费的关注,那就是对我个人的最大的肯定。假如觉得写的还行,分享一下也是我生活的小确幸~
欢迎关注我的公众号
Peace Guys,我们下篇文章再见。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |