怎样实现Dolphinscheduler YARN Task状态跟踪?

打印 上一主题 下一主题

主题 1612|帖子 1612|积分 4836

背景

Dolphinscheduler针对YARN使命,好比说MR、Spark、Flink,甚至是Shell使命,最初都是会判定假如有YARN使命,剖析到applicationId。这样就会不但单以判定客户端进程为单一判定依据,还要根据YARN状态举行最终的Dolphinscheduler使命状态判定。后期,社区对此举行了重构(确实是好的向往,如今已经是半成品),但是导致了一些问题,好比说针对Flink Stream Application模式,这种客户端分离模式会让客户端Shell直接退出,所以如今Dolphinscheduler里面的使命就直接乐成了。YARN上的使命还在运行呢,但Dolphinscheduler已经不能追踪到YARN上使命的状态了。
那么,想要实现对于YARN上使命的状态跟踪,可以怎么做呢?
注:以3.2.1版本为例。
Worker Task关系图

起首,让我们来看下DolphinScheduler中Worker Task的关系原理。


  • AbstractTask: 主要界说了Task的根本生命周期接口,好比说init、handle和cancel
  • AbstractRemoteTask: 主要对handle方法做了实现,表现了模版方法设计模式,提取了submitApplication、trackApplicationStatus以及cancelApplication三个核心接口方法
  • AbstractYarnTask: 好比说YARN使命,就抽象了AbstractYarnTask,此中submitApplication、trackApplicationStatus以及cancelApplication可以直接是对YARN API的访问
AbstractYarnTask实现YARN状态跟踪

AbstractYarnTask可以实现YARN状态跟踪,参考org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask,完整代码如下 :
  1. public abstract class AbstractYarnTask extends AbstractRemoteTask {
  2.     private static final int MAX_RETRY_ATTEMPTS = 3;
  3.     private ShellCommandExecutor shellCommandExecutor;
  4.     public AbstractYarnTask(TaskExecutionContext taskRequest) {
  5.         super(taskRequest);
  6.         this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest);
  7.     }
  8.     @Override
  9.     public void submitApplication() throws TaskException {
  10.         try {
  11.             IShellInterceptorBuilder shellActuatorBuilder =
  12.                     ShellInterceptorBuilderFactory.newBuilder()
  13.                             .properties(getProperties())
  14.                             // todo: do we need to move the replace to subclass?
  15.                             .appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator()));
  16.             // SHELL task exit code
  17.             TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, null);
  18.             setExitStatusCode(response.getExitStatusCode());
  19.             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
  20.             setProcessId(response.getProcessId());
  21.         } catch (InterruptedException ex) {
  22.             Thread.currentThread().interrupt();
  23.             log.info("The current yarn task has been interrupted", ex);
  24.             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
  25.             throw new TaskException("The current yarn task has been interrupted", ex);
  26.         } catch (Exception e) {
  27.             log.error("yarn process failure", e);
  28.             exitStatusCode = -1;
  29.             throw new TaskException("Execute task failed", e);
  30.         }
  31.     }
  32.     @Override
  33.     public void trackApplicationStatus() throws TaskException {
  34.         if (StringUtils.isEmpty(appIds)) {
  35.             return;
  36.         }
  37.         List<String> appIdList = Arrays.asList(appIds.split(","));
  38.         boolean continueTracking = true;
  39.         while (continueTracking) {
  40.             Map<String, YarnState> yarnStateMap = new HashMap<>();
  41.             for (String appId : appIdList) {
  42.                 if (StringUtils.isEmpty(appId)) {
  43.                     continue;
  44.                 }
  45.                 boolean hadoopSecurityAuthStartupState =
  46.                         PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
  47.                 String yarnStateJson = fetchYarnStateJsonWithRetry(appId, hadoopSecurityAuthStartupState);
  48.                 if (StringUtils.isNotEmpty(yarnStateJson)) {
  49.                     String appJson = JSONUtils.getNodeString(yarnStateJson, "app");
  50.                     YarnTask yarnTask = JSONUtils.parseObject(appJson, YarnTask.class);
  51.                     log.info("yarnTask : {}", yarnTask);
  52.                     yarnStateMap.put(yarnTask.getId(), YarnState.of(yarnTask.getState()));
  53.                 }
  54.             }
  55.             YarnState yarnTaskOverallStatus = YarnTaskStatusChecker.getYarnTaskOverallStatus(yarnStateMap);
  56.             if (yarnTaskOverallStatus.isFinalState()) {
  57.                 handleFinalState(yarnTaskOverallStatus);
  58.                 continueTracking = false;
  59.             } else {
  60.                 try {
  61.                     TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MILLIS * 10);
  62.                 } catch (InterruptedException e) {
  63.                     Thread.currentThread().interrupt();
  64.                     throw new RuntimeException(e);
  65.                 }
  66.             }
  67.         }
  68.     }
  69.     private String fetchYarnStateJsonWithRetry(String appId,
  70.                                                boolean hadoopSecurityAuthStartupState) throws TaskException {
  71.         int retryCount = 0;
  72.         while (retryCount < MAX_RETRY_ATTEMPTS) {
  73.             try {
  74.                 return fetchYarnStateJson(appId, hadoopSecurityAuthStartupState);
  75.             } catch (Exception e) {
  76.                 retryCount++;
  77.                 log.error("Failed to fetch or parse Yarn state for appId: {}. Attempt: {}/{}",
  78.                         appId, retryCount, MAX_RETRY_ATTEMPTS, e);
  79.                 if (retryCount >= MAX_RETRY_ATTEMPTS) {
  80.                     throw new TaskException("Failed to fetch Yarn state after "
  81.                             + MAX_RETRY_ATTEMPTS + " attempts for appId: " + appId, e);
  82.                 }
  83.                 try {
  84.                     TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MILLIS);
  85.                 } catch (InterruptedException ie) {
  86.                     Thread.currentThread().interrupt();
  87.                     throw new RuntimeException(ie);
  88.                 }
  89.             }
  90.         }
  91.         return null;
  92.     }
  93.     private void handleFinalState(YarnState yarnState) {
  94.         switch (yarnState) {
  95.             case FINISHED:
  96.                 setExitStatusCode(EXIT_CODE_SUCCESS);
  97.                 break;
  98.             case KILLED:
  99.                 setExitStatusCode(EXIT_CODE_KILL);
  100.                 break;
  101.             default:
  102.                 setExitStatusCode(EXIT_CODE_FAILURE);
  103.                 break;
  104.         }
  105.     }
  106.     private String fetchYarnStateJson(String appId, boolean hadoopSecurityAuthStartupState) throws Exception {
  107.         return hadoopSecurityAuthStartupState
  108.                 ? KerberosHttpClient.get(getApplicationUrl(appId))
  109.                 : HttpUtils.get(getApplicationUrl(appId));
  110.     }
  111.     static class YarnTaskStatusChecker {
  112.         public static YarnState getYarnTaskOverallStatus(Map<String, YarnState> yarnTaskMap) {
  113.             // 检查是否有任何任务处于 FAILED 或 KILLED 状态
  114.             boolean hasKilled = yarnTaskMap.values().stream()
  115.                     .anyMatch(state -> state == YarnState.KILLED);
  116.             if (hasKilled) {
  117.                 return YarnState.KILLED;
  118.             }
  119.             // 检查是否有任何任务处于 FAILED 或 KILLED 状态
  120.             boolean hasFailed = yarnTaskMap.values().stream()
  121.                     .anyMatch(state -> state == YarnState.FAILED);
  122.             if (hasFailed) {
  123.                 return YarnState.FAILED;
  124.             }
  125.             // 检查是否所有任务都处于 FINISHED 状态
  126.             boolean allFINISHED = yarnTaskMap.values().stream()
  127.                     .allMatch(state -> state == YarnState.FINISHED);
  128.             if (allFINISHED) {
  129.                 return YarnState.FINISHED;
  130.             }
  131.             // 检查是否有任何任务处于 RUNNING 状态
  132.             boolean hasRunning = yarnTaskMap.values().stream()
  133.                     .anyMatch(state -> state == YarnState.RUNNING);
  134.             if (hasRunning) {
  135.                 return YarnState.RUNNING;
  136.             }
  137.             // 检查是否有任何任务处于提交中状态
  138.             boolean hasSubmitting = yarnTaskMap.values().stream()
  139.                     .anyMatch(state -> state == YarnState.NEW || state == YarnState.NEW_SAVING
  140.                             || state == YarnState.SUBMITTED || state == YarnState.ACCEPTED);
  141.             if (hasSubmitting) {
  142.                 return YarnState.SUBMITTING;
  143.             }
  144.             // 如果都不匹配,返回未知状态
  145.             return YarnState.UNKNOWN;
  146.         }
  147.     }
  148.     /**
  149.      * cancel application
  150.      *
  151.      * @throws TaskException exception
  152.      */
  153.     @Override
  154.     public void cancelApplication() throws TaskException {
  155.         // cancel process
  156.         try {
  157.             shellCommandExecutor.cancelApplication();
  158.         } catch (Exception e) {
  159.             throw new TaskException("cancel application error", e);
  160.         }
  161.     }
  162.     /**
  163.      * get application ids
  164.      *
  165.      * @return
  166.      * @throws TaskException
  167.      */
  168.     @Override
  169.     public List<String> getApplicationIds() throws TaskException {
  170.         // TODO 这里看common.properties中是否配置 appId.collect了,如果配置了走aop,否则走log
  171.         return LogUtils.getAppIds(
  172.                 taskRequest.getLogPath(),
  173.                 taskRequest.getAppInfoPath(),
  174.                 PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
  175.     }
  176.     /** Get the script used to bootstrap the task */
  177.     protected abstract String getScript();
  178.     /** Get the properties of the task used to replace the placeholders in the script. */
  179.     protected abstract Map<String, String> getProperties();
  180.     @Data
  181.     static class YarnTask {
  182.         private String id;
  183.         private String state;
  184.     }
  185.     private String getApplicationUrl(String applicationId) throws BaseException {
  186.         String yarnResourceRmIds = PropertyUtils.getString(YARN_RESOURCEMANAGER_HA_RM_IDS);
  187.         String yarnAppStatusAddress = PropertyUtils.getString(YARN_APPLICATION_STATUS_ADDRESS);
  188.         String hadoopResourceManagerHttpAddressPort =
  189.                 PropertyUtils.getString(HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT);
  190.         String appUrl = StringUtils.isEmpty(yarnResourceRmIds) ?
  191.                 yarnAppStatusAddress :
  192.                 getAppAddress(yarnAppStatusAddress, yarnResourceRmIds);
  193.         if (StringUtils.isBlank(appUrl)) {
  194.             throw new BaseException("yarn application url generation failed");
  195.         }
  196.         log.info("yarn application url:{}", String.format(appUrl, hadoopResourceManagerHttpAddressPort, applicationId));
  197.         return String.format(appUrl, hadoopResourceManagerHttpAddressPort, applicationId);
  198.     }
  199.     private static String getAppAddress(String appAddress, String rmHa) {
  200.         String[] appAddressArr = appAddress.split(Constants.DOUBLE_SLASH);
  201.         if (appAddressArr.length != 2) {
  202.             return null;
  203.         }
  204.         String protocol = appAddressArr[0] + Constants.DOUBLE_SLASH;
  205.         String[] pathSegments = appAddressArr[1].split(Constants.COLON);
  206.         if (pathSegments.length != 2) {
  207.             return null;
  208.         }
  209.         String end = Constants.COLON + pathSegments[1];
  210.         // get active ResourceManager
  211.         String activeRM = YarnHAAdminUtils.getActiveRMName(protocol, rmHa);
  212.         if (StringUtils.isEmpty(activeRM)) {
  213.             return null;
  214.         }
  215.         return protocol + activeRM + end;
  216.     }
  217.     /** yarn ha admin utils */
  218.     private static final class YarnHAAdminUtils {
  219.         /**
  220.          * get active resourcemanager node
  221.          *
  222.          * @param protocol http protocol
  223.          * @param rmIds yarn ha ids
  224.          * @return yarn active node
  225.          */
  226.         public static String getActiveRMName(String protocol, String rmIds) {
  227.             String hadoopResourceManagerHttpAddressPort =
  228.                     PropertyUtils.getString(HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT);
  229.             String[] rmIdArr = rmIds.split(Constants.COMMA);
  230.             String yarnUrl = protocol
  231.                     + "%s:"
  232.                     + hadoopResourceManagerHttpAddressPort
  233.                     + "/ws/v1/cluster/info";
  234.             try {
  235.                 /** send http get request to rm */
  236.                 for (String rmId : rmIdArr) {
  237.                     String state = getRMState(String.format(yarnUrl, rmId));
  238.                     if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
  239.                         return rmId;
  240.                     }
  241.                 }
  242.             } catch (Exception e) {
  243.                 log.error("get yarn ha application url failed", e);
  244.             }
  245.             return null;
  246.         }
  247.         /** get ResourceManager state */
  248.         public static String getRMState(String url) {
  249.             boolean hadoopSecurityAuthStartupState =
  250.                     PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
  251.             String retStr = Boolean.TRUE.equals(hadoopSecurityAuthStartupState)
  252.                     ? KerberosHttpClient.get(url)
  253.                     : HttpUtils.get(url);
  254.             if (StringUtils.isEmpty(retStr)) {
  255.                 return null;
  256.             }
  257.             // to json
  258.             ObjectNode jsonObject = JSONUtils.parseObject(retStr);
  259.             // get ResourceManager state
  260.             if (!jsonObject.has("clusterInfo")) {
  261.                 return null;
  262.             }
  263.             return jsonObject.get("clusterInfo").path("haState").asText();
  264.         }
  265.     }
  266.     public enum YarnState {
  267.         NEW,
  268.         NEW_SAVING,
  269.         SUBMITTED,
  270.         ACCEPTED,
  271.         RUNNING,
  272.         FINISHED,
  273.         FAILED,
  274.         KILLED,
  275.         SUBMITTING,
  276.         UNKNOWN,
  277.         ;
  278.         // 将字符串转换为枚举
  279.         public static YarnState of(String state) {
  280.             try {
  281.                 return YarnState.valueOf(state);
  282.             } catch (IllegalArgumentException | NullPointerException e) {
  283.                 // 如果字符串无效,则返回 null
  284.                 return null;
  285.             }
  286.         }
  287.         /**
  288.          * 任务结束
  289.          * @return
  290.          */
  291.         public boolean isFinalState() {
  292.             return this == FINISHED || this == FAILED || this == KILLED;
  293.         }
  294.     }
  295. }
复制代码
可以看到,这里的核心逻辑其实就是去掉之前直接把handle接口重写了,而如今针对YARN使命,只需要实现submitApplication、trackApplicationStatus两个核心接口,cancelApplication这个其实原则上应该代理YarnApplicationManager才好(当前没有整合,不过不影响)。
流式使命前端applicationId显示

dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts

后端封装applicationId为YARN URL

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java 修改

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java 修改

dolphinscheduler-common/src/main/resources/common.properties修改

dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java修改

dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageProperties.java修改

页面效果如下 :

注意 : URL粘贴是需要自己写的,上面的代码不包罗
问题追踪

这里其实是有问题,对于state状态来说,是有FINISHED、FAILED、KILLED三种状态,但是FINISHED状态里面还是有FinalStatus,完成不一定是乐成,FINISHED下面其实也有SUCCEEDED、FAILED和KILLED。其实就是FINISHED不能作为DolphinScheduler的终态,需要继承判定而已。
org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask#handleFinalState
  1. private void handleFinalState(YarnState yarnState) {
  2.     switch (yarnState) {
  3.         case FINISHED:
  4.             setExitStatusCode(EXIT_CODE_SUCCESS);
  5.             break;
  6.         case KILLED:
  7.             setExitStatusCode(EXIT_CODE_KILL);
  8.             break;
  9.         default:
  10.             setExitStatusCode(EXIT_CODE_FAILURE);
  11.             break;
  12.     }
  13. }
复制代码
使用HTTP对使命举行kill
  1. curl -X PUT -d '{"state":"KILLED"}' \
  2. >     -H "Content-Type: application/json" \
  3. >     http://xx.xx.xx.xx:8088/ws/v1/cluster/apps/application_1694766249884_1098/state?user.name=hdfs
复制代码
注意 : 一定要指定user.name,否则不一定能kill掉。
原文链接:https://segmentfault.com/a/1190000045058893
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

农妇山泉一亩田

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表