ToB企服应用市场:ToB评测及商务社交产业平台

标题: Worker模块源码实战:万字长文解析DolphinScheduler如何实现亿级任务调度 [打印本页]

作者: 祗疼妳一个    时间: 昨天 17:27
标题: Worker模块源码实战:万字长文解析DolphinScheduler如何实现亿级任务调度

Apache DolphinScheduler的Worker模块是其分布式调度系统的焦点组件之一,负责任务执行、资源管理及集群动态调度。本文将通过源码剖析,揭示其设计头脑与实现细节.
1、Worker接收Master RPC哀求架构图


Worker服务的Netty提供和Master JDK动态代理接口调用,请参考Dolphinscheduler告警模块解说,不再重复地说。
简说 :
org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator
  1. @RpcService
  2. public interface ITaskInstanceOperator {
  3.     @RpcMethod
  4.     TaskInstanceDispatchResponse dispatchTask(TaskInstanceDispatchRequest taskInstanceDispatchRequest);
  5.     @RpcMethod
  6.     TaskInstanceKillResponse killTask(TaskInstanceKillRequest taskInstanceKillRequest);
  7.     @RpcMethod
  8.     TaskInstancePauseResponse pauseTask(TaskInstancePauseRequest taskPauseRequest);
  9.     @RpcMethod
  10.     UpdateWorkflowHostResponse updateWorkflowInstanceHost(UpdateWorkflowHostRequest updateWorkflowHostRequest);
  11. }
复制代码
对实现了@RpcService的接口和@RpcMethod的方法,进行Worker的Netty handler注入和Master动态代理实现。
2、分发任务

(TaskInstanceDispatchOperationFunction)

2.1、WorkerConfig

WorkerConfig : 实在就是从Worker模块下 application.yaml 下读取 worker 开头的设置
2.2、WorkerTaskExecutorFactoryBuilder

WorkerTaskExecutorFactoryBuilder : 是任务执行器工厂的构造器,里面封装了 DefaultWorkerTaskExecutorFactory(默认Worker任务执行器工厂) ,DefaultWorkerTaskExecutorFactory工厂又封装了 DefaultWorkerTaskExecutor 的创建。DefaultWorkerTaskExecutor 的父类是WorkerTaskExecutor,WorkerTaskExecutor又是一个线程。好玩不?
2.3、WorkerTaskExecutorThreadPool

WorkerTaskExecutorThreadPool : 实在就是Fixed线程池的封装而已
2.4、从operator开始说
  1. public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) {
  2.     log.info("Receive TaskInstanceDispatchRequest: {}", taskInstanceDispatchRequest);
  3.     // TODO 任务执行上下文
  4.     TaskExecutionContext taskExecutionContext = taskInstanceDispatchRequest.getTaskExecutionContext();
  5.     try {
  6.         // TODO 设置worker地址
  7.         taskExecutionContext.setHost(workerConfig.getWorkerAddress());
  8.         // TODO 设置task日志存放路径
  9.         taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));
  10.         // TODO MDC中设置流程实例id和任务实例id,好像只是put,没有get使用
  11.         LogUtils.setWorkflowAndTaskInstanceIDMDC(
  12.                 taskExecutionContext.getProcessInstanceId(),
  13.                 taskExecutionContext.getTaskInstanceId());
  14.         // check server status, if server is not running, return failed to reject this task
  15.         if (!ServerLifeCycleManager.isRunning()) {
  16.             log.error("server is not running. reject task: {}", taskExecutionContext.getProcessInstanceId());
  17.             return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
  18.                     "server is not running");
  19.         }
  20.         TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
  21.         // TODO 通过WorkerTaskExecutorFactoryBuilder创建了一个WorkerTaskExecutor
  22.         WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
  23.                 .createWorkerTaskExecutorFactory(taskExecutionContext)
  24.                 .createWorkerTaskExecutor();
  25.         // todo: hold the workerTaskExecutor
  26.         // TODO 直接进行任务的提交
  27.         if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
  28.             log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
  29.             return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
  30.                     "WorkerManagerThread is full");
  31.         } else {
  32.             log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
  33.             return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
  34.         }
  35.     } finally {
  36.         LogUtils.removeWorkflowAndTaskInstanceIdMDC();
  37.     }
  38. }
复制代码
LogUtils.getTaskInstanceLogFullPath(taskExecutionContext) 解析
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogFullPath : 获取任务日记的全路径
  1. /**
  2.      * Get task instance log full path.
  3.      *
  4.      * @param taskExecutionContext task execution context.
  5.      * @return task instance log full path.
  6.      */
  7.     public static String getTaskInstanceLogFullPath(TaskExecutionContext taskExecutionContext) {
  8.         return getTaskInstanceLogFullPath(
  9.                 DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
  10.                 taskExecutionContext.getProcessDefineCode(),
  11.                 taskExecutionContext.getProcessDefineVersion(),
  12.                 taskExecutionContext.getProcessInstanceId(),
  13.                 taskExecutionContext.getTaskInstanceId());
  14.     }
复制代码
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogFullPath : 拼接出任务日记的全路径
  1. /**
  2.      * todo: Remove the submitTime parameter?
  3.      * The task instance log full path, the path is like:{log.base}/{taskSubmitTime}/{workflowDefinitionCode}/{workflowDefinitionVersion}/{}workflowInstance}/{taskInstance}.log
  4.      *
  5.      * @param taskFirstSubmitTime       task first submit time
  6.      * @param workflowDefinitionCode    workflow definition code
  7.      * @param workflowDefinitionVersion workflow definition version
  8.      * @param workflowInstanceId        workflow instance id
  9.      * @param taskInstanceId            task instance id.
  10.      * @return task instance log full path.
  11.      */
  12.     public static String getTaskInstanceLogFullPath(Date taskFirstSubmitTime,
  13.                                                     Long workflowDefinitionCode,
  14.                                                     int workflowDefinitionVersion,
  15.                                                     int workflowInstanceId,
  16.                                                     int taskInstanceId) {
  17.         if (TASK_INSTANCE_LOG_BASE_PATH == null) {
  18.             throw new IllegalArgumentException(
  19.                     "Cannot find the task instance log base path, please check your logback.xml file");
  20.         }
  21.         final String taskLogFileName = Paths.get(
  22.                 String.valueOf(workflowDefinitionCode),
  23.                 String.valueOf(workflowDefinitionVersion),
  24.                 String.valueOf(workflowInstanceId),
  25.                 String.format("%s.log", taskInstanceId)).toString();
  26.         return TASK_INSTANCE_LOG_BASE_PATH
  27.                 .resolve(DateUtils.format(taskFirstSubmitTime, DateConstants.YYYYMMDD, null))
  28.                 .resolve(taskLogFileName)
  29.                 .toString();
  30.     }
复制代码
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogBasePath : 读取logback-spring.xml中的设置,获取任务实例日记的基础路径,实在就是获取根目次下/logs为基础路径
  1. /**
  2.      * Get task instance log base absolute path, this is defined in logback.xml
  3.      *
  4.      * @return
  5.      */
  6.     public static Path getTaskInstanceLogBasePath() {
  7.         return Optional.of(LoggerFactory.getILoggerFactory())
  8.                 .map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
  9.                 .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE")))
  10.                 .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator())))
  11.                 .map(TaskLogDiscriminator::getLogBase)
  12.                 .map(e -> Paths.get(e).toAbsolutePath())
  13.                 .orElse(null);
  14.     }
复制代码
worker的 logback-spring.xml :
  1. <configuration scan="true" scanPeriod="120 seconds">
  2.   <property name="log.base" value="logs"/>
  3.   ...
  4.   <appender name="TASKLOGFILE" >
  5.           <filter />
  6.           <Discriminator >
  7.               <key>taskInstanceLogFullPath</key>
  8.               <logBase>${log.base}</logBase>
  9.           </Discriminator>
  10.           <sift>
  11.               <appender name="FILE-${taskInstanceLogFullPath}" >
  12.                   <file>${taskInstanceLogFullPath}</file>
  13.                   <encoder>
  14.                       <pattern>
  15.                           [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n
  16.                       </pattern>
  17.                       <charset>UTF-8</charset>
  18.                   </encoder>
  19.                   <append>true</append>
  20.               </appender>
  21.           </sift>
  22.       </appender>
  23.   ...
  24.   <root level="INFO">
  25.       <appender-ref ref="STDOUT"/>
  26.       <appender-ref ref="TASKLOGFILE"/>
  27.   </root>
  28. </configuration>
复制代码
最终地址是:
  1. /opt/dolphinscheduler/worker-server/logs/20240615/13929490938784/1/1815/1202.log
复制代码
2.5、DefaultWorkerTaskExecutor解说

org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceDispatchOperationFunction#operate
  1. // TODO 通过WorkerTaskExecutorFactoryBuilder创建了一个WorkerTaskExecutor
  2.             WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
  3.                     .createWorkerTaskExecutorFactory(taskExecutionContext)
  4.                     .createWorkerTaskExecutor();
  5.             // todo: hold the workerTaskExecutor
  6.             // TODO 直接进行任务的提交
  7.             if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
  8.                 log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
  9.                 return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
  10.                         "WorkerManagerThread is full");
  11.             } else {
  12.                 log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
  13.                 return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
  14.             }
复制代码
直接使用 workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)进行任务的提交
WorkerTaskExecutor 是一个线程,既然是线程,是不是要看一下run :
  1. public void run() {
  2.         try {
  3.             // TODO MDC中设置流程实例和任务实例,其实就相当于是ThreadLocal使用一样
  4.             LogUtils.setWorkflowAndTaskInstanceIDMDC(
  5.                     taskExecutionContext.getProcessInstanceId(),
  6.                     taskExecutionContext.getTaskInstanceId());
  7.             // TODO MDC中设置任务的日志路径
  8.             LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
  9.             // TODO 打印任务的头部
  10.             TaskInstanceLogHeader.printInitializeTaskContextHeader();
  11.             // TODO 进行任务的初始化,其实就是做了任务的开始时间和taskAppId(流程实例id + 任务实例id)
  12.             initializeTask();
  13.             // TODO DRY_RUN其实就是空跑,其实就是直接设置状态为成功
  14.             if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
  15.                 taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
  16.                 taskExecutionContext.setEndTime(System.currentTimeMillis());
  17.                 WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
  18.                 // TODO 通过worker消息发送器将结果信息发送过去
  19.                 workerMessageSender.sendMessageWithRetry(taskExecutionContext,
  20.                         ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
  21.                 log.info(
  22.                         "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
  23.                 return;
  24.             }
  25.             // TODO 打印任务插件的头部
  26.             TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();
  27.             // TODO 执行之前
  28.             beforeExecute();
  29.             // TODO 回调函数
  30.             TaskCallBack taskCallBack = TaskCallbackImpl.builder()
  31.                     .workerMessageSender(workerMessageSender)
  32.                     .taskExecutionContext(taskExecutionContext)
  33.                     .build();
  34.             TaskInstanceLogHeader.printExecuteTaskHeader();
  35.             // TODO 执行
  36.             executeTask(taskCallBack);
  37.             TaskInstanceLogHeader.printFinalizeTaskHeader();
  38.             // TODO 执行之后
  39.             afterExecute();
  40.             closeLogAppender();
  41.         } catch (Throwable ex) {
  42.             log.error("Task execute failed, due to meet an exception", ex);
  43.             afterThrowing(ex);
  44.             closeLogAppender();
  45.         } finally {
  46.             LogUtils.removeWorkflowAndTaskInstanceIdMDC();
  47.             LogUtils.removeTaskInstanceLogFullPathMDC();
  48.         }
  49.     }
复制代码
重点分析:
  1. // TODO DRY_RUN其实就是空跑,其实就是直接设置状态为成功
  2.             if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
  3.                 taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
  4.                 taskExecutionContext.setEndTime(System.currentTimeMillis());
  5.                 WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
  6.                 // TODO 通过worker消息发送器将结果信息发送过去
  7.                 workerMessageSender.sendMessageWithRetry(taskExecutionContext,
  8.                         ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
  9.                 log.info(
  10.                         "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
  11.                 return;
  12.             }
复制代码
执行之前的准备工作,比如说给Master汇报说本身正在运行、创建租户(linux上用户)、创建工作路径、下载所需资源文件、任务初始化**
  1. protected void beforeExecute() {
  2.         // TODO 先设置为RUNNING状态
  3.         taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
  4.         // TODO 向Master发送消息,告诉Master这个任务正在运行
  5.         workerMessageSender.sendMessageWithRetry(taskExecutionContext,
  6.                 ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING);
  7.         log.info("Send task status {} master: {}", TaskExecutionStatus.RUNNING_EXECUTION.name(),
  8.                 taskExecutionContext.getWorkflowInstanceHost());
  9.         // In most of case the origin tenant is the same as the current tenant
  10.         // Except `default` tenant. The originTenant is used to download the resources
  11.         // TODO 租户信息
  12.         String originTenant = taskExecutionContext.getTenantCode();
  13.         String tenant = TaskExecutionContextUtils.getOrCreateTenant(workerConfig, taskExecutionContext);
  14.         taskExecutionContext.setTenantCode(tenant);
  15.         log.info("TenantCode: {} check successfully", taskExecutionContext.getTenantCode());
  16.         // TODO 创建工作路径
  17.         TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);
  18.         log.info("WorkflowInstanceExecDir: {} check successfully", taskExecutionContext.getExecutePath());
  19.         TaskChannel taskChannel =
  20.                 Optional.ofNullable(taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()))
  21.                         .orElseThrow(() -> new TaskPluginException(taskExecutionContext.getTaskType()
  22.                                 + " task plugin not found, please check the task type is correct."));
  23.         log.info("Create TaskChannel: {} successfully", taskChannel.getClass().getName());
  24.         // TODO 下载资源
  25.         ResourceContext resourceContext = TaskExecutionContextUtils.downloadResourcesIfNeeded(originTenant, taskChannel,
  26.                 storageOperate, taskExecutionContext);
  27.         taskExecutionContext.setResourceContext(resourceContext);
  28.         log.info("Download resources successfully: \n{}", taskExecutionContext.getResourceContext());
  29.         TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate);
  30.         log.info("Download upstream files: {} successfully",
  31.                 TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.IN));
  32.         // TODO 创建任务
  33.         task = taskChannel.createTask(taskExecutionContext);
  34.         log.info("Task plugin instance: {} create successfully", taskExecutionContext.getTaskType());
  35.         // todo: remove the init method, this should initialize in constructor method
  36.         // TODO 任务进行初始化
  37.         task.init();
  38.         log.info("Success initialized task plugin instance successfully");
  39.         task.getParameters().setVarPool(taskExecutionContext.getVarPool());
  40.         log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());
  41.     }
复制代码
1、日记打印
  1. log.info("Send task status {} master: {}", TaskExecutionStatus.RUNNING_EXECUTION.name(),
  2.                 taskExecutionContext.getWorkflowInstanceHost());
  3. 这里需要打印的是 taskExecutionContext.getWorkflowInstanceHost(),不应该是taskExecutionContext.getHost()。就是说你给Master汇报信息的呢,打印自己Worker节点的host干啥(自己肯定知道啊),有用的是当前Worker节点是给哪个Master节点汇报自己的任务状态的
复制代码
2、创建租户
org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils#getOrCreateTenant
  1. public static String getOrCreateTenant(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
  2.         try {
  3.             TenantConfig tenantConfig = workerConfig.getTenantConfig();
  4.             String tenantCode = taskExecutionContext.getTenantCode();
  5.             if (TenantConstants.DEFAULT_TENANT_CODE.equals(tenantCode) && tenantConfig.isDefaultTenantEnabled()) {
  6.                 log.info("Current tenant is default tenant, will use bootstrap user: {} to execute the task",
  7.                         TenantConstants.BOOTSTRAPT_SYSTEM_USER);
  8.                 return TenantConstants.BOOTSTRAPT_SYSTEM_USER;
  9.             }
  10.             boolean osUserExistFlag;
  11.             // if Using distributed is true and Currently supported systems are linux,Should not let it
  12.             // automatically
  13.             // create tenants,so TenantAutoCreate has no effect
  14.             if (tenantConfig.isDistributedTenantEnabled() && SystemUtils.IS_OS_LINUX) {
  15.                 // use the id command to judge in linux
  16.                 osUserExistFlag = OSUtils.existTenantCodeInLinux(tenantCode);
  17.             } else if (OSUtils.isSudoEnable() && tenantConfig.isAutoCreateTenantEnabled()) {
  18.                 // if not exists this user, then create
  19.                 // TODO 默认走的是这里的分支,直接通过 sudo useradd -g %s %s 进行创建
  20.                 OSUtils.createUserIfAbsent(tenantCode);
  21.                 osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
  22.             } else {
  23.                 osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
  24.             }
  25.             if (!osUserExistFlag) {
  26.                 throw new TaskException(String.format("TenantCode: %s doesn't exist", tenantCode));
  27.             }
  28.             return tenantCode;
  29.         } catch (TaskException ex) {
  30.             throw ex;
  31.         } catch (Exception ex) {
  32.             throw new TaskException(
  33.                     String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode()));
  34.         }
  35.     }
复制代码
3、TaskChannel
TaskPluginManager Master启动的时候通 google的 @AutoService来完成SPI注册。
Master启动时候TaskPluginManager初始化
org.apache.dolphinscheduler.server.master.MasterServer#run
  1. @PostConstruct
  2.     public void run() throws SchedulerException {
  3.         ......
  4.         // install task plugin
  5.         // TODO 是通过 google的 @AutoService来进行SPI注册的
  6.         this.taskPluginManager.loadPlugin();
  7.        ......
  8.     }
复制代码
org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager#loadPlugin
  1. public void loadPlugin() {
  2.         if (!loadedFlag.compareAndSet(false, true)) {
  3.             log.warn("The task plugin has already been loaded");
  4.             return;
  5.         }
  6.         // TODO 实例化的时候是通过SPI进行加载的
  7.         PrioritySPIFactory<TaskChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(TaskChannelFactory.class);
  8.         for (Map.Entry<String, TaskChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
  9.             String factoryName = entry.getKey();
  10.             TaskChannelFactory factory = entry.getValue();
  11.             log.info("Registering task plugin: {} - {}", factoryName, factory.getClass().getSimpleName());
  12.             taskChannelFactoryMap.put(factoryName, factory);
  13.             taskChannelMap.put(factoryName, factory.create());
  14.             log.info("Registered task plugin: {} - {}", factoryName, factory.getClass().getSimpleName());
  15.         }
  16.     }
复制代码
焦点逻辑实在就是
TaskChannelFactory 接口 :
  1. public interface TaskChannelFactory extends UiChannelFactory, PrioritySPI {
  2.     TaskChannel create();
  3.     default SPIIdentify getIdentify() {
  4.         return SPIIdentify.builder().name(getName()).build();
  5.     }
  6. }
复制代码
Task插件都实现了TaskChannelFactory接口并使用@AutoService注解 :
以ShellTaskChannelFactory为例 :
  1. @AutoService(TaskChannelFactory.class)
  2. public class ShellTaskChannelFactory implements TaskChannelFactory {
  3.     @Override
  4.     public TaskChannel create() {
  5.         return new ShellTaskChannel();
  6.     }
  7.     @Override
  8.     public String getName() {
  9.         return "SHELL";
  10.     }
  11.     @Override
  12.     public List<PluginParams> getParams() {
  13.         List<PluginParams> paramsList = new ArrayList<>();
  14.         InputParam nodeName = InputParam.newBuilder("name", "$t('Node name')")
  15.                 .addValidate(Validate.newBuilder()
  16.                         .setRequired(true)
  17.                         .build())
  18.                 .build();
  19.         RadioParam runFlag = RadioParam.newBuilder("runFlag", "RUN_FLAG")
  20.                 .addParamsOptions(new ParamsOptions("NORMAL", "NORMAL", false))
  21.                 .addParamsOptions(new ParamsOptions("FORBIDDEN", "FORBIDDEN", false))
  22.                 .build();
  23.         paramsList.add(nodeName);
  24.         paramsList.add(runFlag);
  25.         return paramsList;
  26.     }
  27. }
复制代码
在这里创建了 ShellTaskChannel,也就是TaskChannel
4、下载所需资源
org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils#downloadResourcesIfNeeded
  1. public static ResourceContext downloadResourcesIfNeeded(String tenant,
  2.                                                             TaskChannel taskChannel,
  3.                                                             StorageOperate storageOperate,
  4.                                                             TaskExecutionContext taskExecutionContext) {
  5.         AbstractParameters abstractParameters = taskChannel.parseParameters(
  6.                 ParametersNode.builder()
  7.                         .taskType(taskExecutionContext.getTaskType())
  8.                         .taskParams(taskExecutionContext.getTaskParams())
  9.                         .build());
  10.         // TODO 其实这里如果要是Sql,这里直接 ArrayList<>()了,下面就不走了
  11.         List<ResourceInfo> resourceFilesList = abstractParameters.getResourceFilesList();
  12.         if (CollectionUtils.isEmpty(resourceFilesList)) {
  13.             log.debug("There is no resource file need to download");
  14.             return new ResourceContext();
  15.         }
  16.         ResourceContext resourceContext = new ResourceContext();
  17.         String taskWorkingDirectory = taskExecutionContext.getExecutePath();
  18.         for (ResourceInfo resourceInfo : resourceFilesList) {
  19.             // TODO 在存储中的路径,比如说hdfs上的文件路径
  20.             String resourceAbsolutePathInStorage = resourceInfo.getResourceName();
  21.             // TODO 文件名称
  22.             String resourceRelativePath = storageOperate.getResourceFileName(tenant, resourceAbsolutePathInStorage);
  23.             // TODO 本地的绝对路径
  24.             String resourceAbsolutePathInLocal = Paths.get(taskWorkingDirectory, resourceRelativePath).toString();
  25.             File file = new File(resourceAbsolutePathInLocal);
  26.             if (!file.exists()) {
  27.                 try {
  28.                     long resourceDownloadStartTime = System.currentTimeMillis();
  29.                     // TODO 资源进行下载
  30.                     storageOperate.download(resourceAbsolutePathInStorage, resourceAbsolutePathInLocal, true);
  31.                     log.debug("Download resource file {} under: {} successfully", resourceAbsolutePathInStorage,
  32.                             resourceAbsolutePathInLocal);
  33.                     FileUtils.setFileTo755(file);
  34.                     WorkerServerMetrics
  35.                             .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
  36.                     WorkerServerMetrics
  37.                             .recordWorkerResourceDownloadSize(Files.size(Paths.get(resourceAbsolutePathInLocal)));
  38.                     WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
  39.                 } catch (Exception ex) {
  40.                     WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
  41.                     throw new TaskException(
  42.                             String.format("Download resource file: %s error", resourceAbsolutePathInStorage), ex);
  43.                 }
  44.             }
  45.             // TODO 封装resourceContext
  46.             ResourceContext.ResourceItem resourceItem = ResourceContext.ResourceItem.builder()
  47.                     .resourceAbsolutePathInStorage(resourceAbsolutePathInStorage)
  48.                     .resourceRelativePath(resourceRelativePath)
  49.                     .resourceAbsolutePathInLocal(resourceAbsolutePathInLocal)
  50.                     .build();
  51.             resourceContext.addResourceItem(resourceItem);
  52.         }
  53.         return resourceContext;
  54.     }
复制代码
5、下载上游文件(上鄙俚文件的传递)
示例如下 :

upTask :

downTask :

焦点逻辑 : 上鄙俚文件传递实在也很简朴,就是针对本节点来说就是在本地生成对应的文件,然后上传到比如说HDFS范例的资源中心,然后鄙俚节点会跟进上游taskName.输出变量进行指定资源中心文件的下载
downTask中的downloadUpstreamFiles逻辑:
org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils#downloadUpstreamFiles
  1. public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
  2.         // TODO 上游传递过来的变量池
  3.         List<Property> varPools = getVarPools(taskExecutionContext);
  4.         // get map of varPools for quick search
  5.         Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
  6.         // get "IN FILE" parameters
  7.         // TODO 其实就是看localParams的参数有没有为IN的FILE的本地参数
  8.         List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
  9.         // TODO 一般情况下,就是这里就结束了
  10.         if (localParamsProperty.isEmpty()) {
  11.             return;
  12.         }
  13.         String executePath = taskExecutionContext.getExecutePath();
  14.         // data path to download packaged data
  15.         // TODO 下载的临时目录
  16.         String downloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);
  17.         log.info("Download upstream files...");
  18.         for (Property property : localParamsProperty) {
  19.             // TODO 这里其实就是获取
  20.             /**
  21.              * varPoolsMap 如下 :
  22.              * {"prop":"upTask.file-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240624/13978983404960/2_1893/upTask_1320_text.txt"}
  23.              * {"prop":"upTask.dir-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240624/13978983404960/2_1893/upTask_1320_data_ds_pack.zip"}
  24.              *
  25.              * localParamsProperty 如下 :
  26.              * {"prop":"input_dir","direct":"IN","type":"FILE","value":"upTask.dir-data"}
  27.              */
  28.             // TODO 所以这里是不为null的
  29.             Property inVarPool = varPoolsMap.get(property.getValue());
  30.             if (inVarPool == null) {
  31.                 log.error("{} not in  {}", property.getValue(), varPoolsMap.keySet());
  32.                 throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
  33.                         property.getValue()));
  34.             }
  35.             String resourcePath = inVarPool.getValue();
  36.             // TODO 其实就是在封装本地的路径
  37.             // TODO 这里注意啊,比如说脚本中 cat input_dir/test1/text.txt,input_dir这个东西是下载路径拼接上的
  38.             String targetPath = String.format("%s/%s", executePath, property.getProp());
  39.             String downloadPath;
  40.             // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
  41.             // targetPath
  42.             // TODO 判断是否是zip压缩
  43.             boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
  44.             if (isPack) {
  45.                 downloadPath = String.format("%s/%s", downloadTmpPath, new File(resourcePath).getName());
  46.             } else {
  47.                 downloadPath = targetPath;
  48.             }
  49.             try {
  50.                 // TODO 资源中心路径
  51.                 String resourceWholePath =
  52.                         storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
  53.                 log.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
  54.                 // TODO 系在到本地
  55.                 storageOperate.download(resourceWholePath, downloadPath, true);
  56.             } catch (IOException ex) {
  57.                 throw new TaskException("Download file from storage error", ex);
  58.             }
  59.             // unpack if the data is packaged
  60.             if (isPack) {
  61.                 File downloadFile = new File(downloadPath);
  62.                 log.info("Unpack {} to {}", downloadPath, targetPath);
  63.                 // TODO 如果是zip就是将本地临时目录下的压缩文件解压到目标路径下
  64.                 ZipUtil.unpack(downloadFile, new File(targetPath));
  65.             }
  66.         }
  67.         // delete DownloadTmp Folder if DownloadTmpPath exists
  68.         try {
  69.             // TODO 临时目录下文件删除掉
  70.             org.apache.commons.io.FileUtils.deleteDirectory(new File(downloadTmpPath));
  71.         } catch (IOException e) {
  72.             log.error("Delete DownloadTmpPath {} failed, this will not affect the task status", downloadTmpPath, e);
  73.         }
  74.     }
复制代码
6、创建任务并初始化
实在就是步骤3中,创建完毕TaskChannel,然后调用createTask,返回AbstractTask,然后调用init方法
  1. ......
  2. // TODO 创建任务
  3. task = taskChannel.createTask(taskExecutionContext);
  4. log.info("Task plugin instance: {} create successfully", taskExecutionContext.getTaskType());
  5. // todo: remove the init method, this should initialize in constructor method
  6. // TODO 任务进行初始化
  7. task.init();
  8. log.info("Success initialized task plugin instance successfully");
  9. ......
复制代码
7、给AbstractParameters设置变量池
  1. // TODO 给任务设置变量池
  2. // TODO 一般情况下 taskExecutionContext.getVarPool()这里就为null
  3. task.getParameters().setVarPool(taskExecutionContext.getVarPool());
  4. log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());
复制代码
注意: 默认环境下,这个taskExecutionContext.getVarPool()是空的,除非上游有OUT变量
  1. // TODO 回调函数,这个还是很关键的把workerMessageSender、taskExecutionContext以构造函数放到了TaskCallBack中
  2. // TODO 所以taskExecutionContext里面是有之前的内容的
  3. TaskCallBack taskCallBack = TaskCallbackImpl.builder()
  4.         .workerMessageSender(workerMessageSender)
  5.         .taskExecutionContext(taskExecutionContext)
  6.         .build();
  7. .......
  8. // TODO 执行
  9. executeTask(taskCallBack);
复制代码
executeTask(taskCallBack):是焦点代码,封装了Worker任务的真正的执行逻辑,参数传递的TaskCallBack,用于任务状态的回报(向Master)
下面就来细说executeTask(taskCallBack)的逻辑 :
  1. public void executeTask(TaskCallBack taskCallBack) throws TaskException {
  2.     if (task == null) {
  3.         throw new IllegalArgumentException("The task plugin instance is not initialized");
  4.     }
  5.     // TODO 这里会进行真正的任务处理
  6.     task.handle(taskCallBack);
  7. }
复制代码
其中的task实在就是AbstractTask,在beforeExecute中 taskChannel.createTask。是Task抽象父类(以ShellTask为例展开说明,其他任务范例类似)
org.apache.dolphinscheduler.plugin.task.shell.ShellTask#handle
  1. public void handle(TaskCallBack taskCallBack) throws TaskException {
  2.     try {
  3.         IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
  4.                 .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap())) // TODO 这里就是要进行变量的替换
  5.                 .appendScript(shellParameters.getRawScript());
  6.         // TODO shell执行
  7.         TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
  8.         // TODO 执行结果,退出状态码
  9.         setExitStatusCode(commandExecuteResult.getExitStatusCode());
  10.         // TODO 设置进程ID
  11.         setProcessId(commandExecuteResult.getProcessId());
  12.         // TODO shellCommandExecutor.getTaskOutputParams()这返回的是 output -> 123
  13.         shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
  14.     } catch (InterruptedException e) {
  15.         Thread.currentThread().interrupt();
  16.         log.error("The current Shell task has been interrupted", e);
  17.         setExitStatusCode(EXIT_CODE_FAILURE);
  18.         throw new TaskException("The current Shell task has been interrupted", e);
  19.     } catch (Exception e) {
  20.         log.error("shell task error", e);
  21.         setExitStatusCode(EXIT_CODE_FAILURE);
  22.         throw new TaskException("Execute shell task error", e);
  23.     }
  24. }
复制代码
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory
默认走的是 BashShellInterceptorBuilder
  1. public class ShellInterceptorBuilderFactory {
  2.     private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");
  3.     @SuppressWarnings("unchecked")
  4.     public static IShellInterceptorBuilder newBuilder() {
  5.         // TODO 默认的走的是这个逻辑
  6.         if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
  7.             return new BashShellInterceptorBuilder();
  8.         }
  9.         if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
  10.             return new ShShellInterceptorBuilder();
  11.         }
  12.         if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
  13.             return new CmdShellInterceptorBuilder();
  14.         }
  15.         throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
  16.     }
  17. }
复制代码
  1. .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))  
复制代码
是向BaseShellInterceptorBuilder的propertyMap中进行taskExecutionContext.getPrepareParamsMap()参数的设置(注意 : taskExecutionContext.getPrepareParamsMap()是在Master中进行的封装。
  1. .appendScript(shellParameters.getRawScript())
复制代码
是向BaseShellInterceptorBuilder的scripts进行设置值。
org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#run
  1. public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
  2.                             TaskCallBack taskCallBack) throws Exception {
  3.     TaskResponse result = new TaskResponse();
  4.     // todo: we need to use state like JDK Thread to make sure the killed task should not be executed
  5.     iShellInterceptorBuilder = iShellInterceptorBuilder
  6.             // TODO 设置执行路径
  7.             .shellDirectory(taskRequest.getExecutePath())
  8.             // TODO 这里设置shell 名字
  9.             .shellName(taskRequest.getTaskAppId());
  10.             
  11.     // Set system env
  12.     // TODO 在这里是设置默认的,比如说可以设置为 /etc/profile
  13.     if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
  14.         // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表
  15.         ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
  16.     }
  17.     // Set custom env
  18.     // TODO 设置自定义的env
  19.     if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
  20.         // TODO 向 customEnvScripts 中加入
  21.         iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
  22.     }
  23.     // Set k8s config (This is only work in Linux)
  24.     if (taskRequest.getK8sTaskExecutionContext() != null) {
  25.         iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
  26.     }
  27.     // Set sudo (This is only work in Linux)
  28.     // TODO 设置sudo为true的模式
  29.     iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
  30.     // Set tenant (This is only work in Linux)
  31.     // TODO 设置租户
  32.     iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
  33.     // Set CPU Quota (This is only work in Linux)
  34.     if (taskRequest.getCpuQuota() != null) {
  35.         iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
  36.     }
  37.     // Set memory Quota (This is only work in Linux)
  38.     if (taskRequest.getMemoryMax() != null) {
  39.         iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
  40.     }
  41.     // TODO 这个是重点
  42.     IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
  43.     // TODO 使用ProcessBuilder进行执行,支持sudo模式,和bash模式
  44.     process = iShellInterceptor.execute();
  45.     // parse process output
  46.     // TODO 这里解析到进程的输出
  47.     parseProcessOutput(this.process);
  48.     // collect pod log
  49.     collectPodLogIfNeeded();
  50.     int processId = getProcessId(this.process);
  51.     result.setProcessId(processId);
  52.     // cache processId
  53.     taskRequest.setProcessId(processId);
  54.     // print process id
  55.     log.info("process start, process id is: {}", processId);
  56.     // if timeout occurs, exit directly
  57.     long remainTime = getRemainTime();
  58.     // update pid before waiting for the run to finish
  59.     if (null != taskCallBack) {
  60.         // TODO 更新任务实例信息
  61.         taskCallBack.updateTaskInstanceInfo(processId);
  62.     }
  63.     // waiting for the run to finish
  64.     boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
  65.     TaskExecutionStatus kubernetesStatus =
  66.             ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
  67.     if (taskOutputFuture != null) {
  68.         try {
  69.             // Wait the task log process finished.
  70.             taskOutputFuture.get();
  71.         } catch (ExecutionException e) {
  72.             log.error("Handle task log error", e);
  73.         }
  74.     }
  75.     if (podLogOutputFuture != null) {
  76.         try {
  77.             // Wait kubernetes pod log collection finished
  78.             podLogOutputFuture.get();
  79.             // delete pod after successful execution and log collection
  80.             ProcessUtils.cancelApplication(taskRequest);
  81.         } catch (ExecutionException e) {
  82.             log.error("Handle pod log error", e);
  83.         }
  84.     }
  85.     // if SHELL task exit
  86.     if (status && kubernetesStatus.isSuccess()) {
  87.         // SHELL task state
  88.         result.setExitStatusCode(this.process.exitValue());
  89.     } else {
  90.         log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
  91.                 taskRequest.getTaskTimeout());
  92.         result.setExitStatusCode(EXIT_CODE_FAILURE);
  93.         cancelApplication();
  94.     }
  95.     int exitCode = this.process.exitValue();
  96.     String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
  97.     log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
  98.             exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
  99.     return result;
  100.     }
复制代码
设置默认的环境变量:
  1. // Set system env
  2. // TODO 在这里是设置默认的,比如说可以设置为 /etc/profile
  3. if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
  4.     // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表 ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
  5. }
复制代码
org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils
  1. public List<String> ENV_SOURCE_LIST = Arrays.stream(
  2.             Optional.ofNullable(PropertyUtils.getString("shell.env_source_list")).map(s -> s.split(","))
  3.                     .orElse(new String[0]))
  4.             .map(String::trim)
  5.             .filter(StringUtils::isNotBlank)
  6.             .collect(Collectors.toList());
复制代码
读取的是 common.properties,这里可以设置默认的环境变量
  1. # The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile
  2. # 默认是空,比如说可以是
  3. shell.env_source_list=/etc/profile
复制代码
  1. // TODO 这个是重点
  2. IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
复制代码
org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder#build
  1. public BashShellInterceptor build() throws FileOperateException, IOException {
  2.     // TODO 这里是生成shell脚本的核心点,写到指定目录下
  3.     generateShellScript();
  4.     // TODO 封装命令
  5.     List<String> bootstrapCommand = generateBootstrapCommand();
  6.     // TODO 实例化BashShellInterceptor
  7.     return new BashShellInterceptor(bootstrapCommand, shellDirectory);
  8. }
复制代码
org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#bootstrapCommandInSudoMode
注意 : 这个方法里面有两层含义,如果是资源限制走的是bootstrapCommandInResourceLimitMode,实在这里还蕴藏着一个大大的BUG(我只修改了ShellTask),针对其他范例的Shell封装的任务,比如说MR、Spark、Flink等等,如果走资源限制,这里就有问题,由于这些任务在页面上不能设置CPU和内存的Quota),否则走的是sudo -u 租户 -i /opt/xx.sh。
  1. private List<String> bootstrapCommandInSudoMode() {
  2.     // TODO 如果task.resource.limit.state为false,这里的逻辑不会走,也不会走CPU和内存的限制
  3.     if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) {
  4.         return bootstrapCommandInResourceLimitMode();
  5.     }
  6.     List<String> bootstrapCommand = new ArrayList<>();
  7.     bootstrapCommand.add("sudo");
  8.     if (StringUtils.isNotBlank(runUser)) {
  9.         bootstrapCommand.add("-u");
  10.         bootstrapCommand.add(runUser);
  11.     }
  12.     bootstrapCommand.add("-i");
  13.     bootstrapCommand.add(shellAbsolutePath().toString());
  14.     return bootstrapCommand;
  15. }
复制代码
  1. // TODO 使用ProcessBuilder进行执行,支持sudo模式,和bash模式
  2. process = iShellInterceptor.execute();
复制代码
org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor#execute
  1. public Process execute() throws IOException {
  2.     // init process builder
  3.     ProcessBuilder processBuilder = new ProcessBuilder();
  4.     // setting up a working directory
  5.     // TODO 设置工作路径,目的其实就是在执行脚本的时候,可以在该目录的位置来加载比如说jar包什么的
  6.     processBuilder.directory(new File(workingDirectory));
  7.     // merge error information to standard output stream
  8.     processBuilder.redirectErrorStream(true);
  9.     processBuilder.command(executeCommands);
  10.     log.info("Executing shell command : {}", String.join(" ", executeCommands));
  11.     return processBuilder.start();
  12. }
复制代码
实在就是使用 ProcessBuilder 进行任务的提交。
org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#parseProcessOutput
  1. // TODO 解析输出
  2. private void parseProcessOutput(Process process) {
  3.     // todo: remove this this thread pool.
  4.     ExecutorService getOutputLogService = ThreadUtils
  5.             .newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName());
  6.     getOutputLogService.execute(() -> {
  7.         TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
  8.         // TODO 这里正好的读取process.getInputStream()的输入
  9.         try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
  10.             // TODO 这里设置了任务的日志路径
  11.             LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
  12.             String line;
  13.             while ((line = inReader.readLine()) != null) {
  14.                 // TODO 日志缓冲区
  15.                 logBuffer.add(line);
  16.                 // TODO 这里解析taskOutputParams,解析比如说 echo '${setValue(output=1)}'。其实就是字符串 ${setValue(output=1)}
  17.                 taskOutputParameterParser.appendParseLog(line);
  18.             }
  19.             processLogOutputIsSuccess = true;
  20.         } catch (Exception e) {
  21.             log.error("Parse var pool error", e);
  22.             processLogOutputIsSuccess = true;
  23.         } finally {
  24.             // TODO 在这里的时候就将 taskInstanceLogFullPath 删除了
  25.             LogUtils.removeTaskInstanceLogFullPathMDC();
  26.         }
  27.         taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
  28.     });
  29.     getOutputLogService.shutdown();
  30.     ExecutorService parseProcessOutputExecutorService = ThreadUtils
  31.             .newSingleDaemonScheduledExecutorService("TaskInstanceLogOutput-thread-" + taskRequest.getTaskName());
  32.     taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
  33.         try {
  34.             LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
  35.             // TODO 对于非pod(k8s)的任务,其实就是processLogOutputIsSuccess这个标识,这个标识是在上面,就是任务运行完毕了
  36.             while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
  37.                 if (logBuffer.size() > 1) {
  38.                     logHandler.accept(logBuffer);
  39.                     logBuffer.clear();
  40.                     logBuffer.add(EMPTY_STRING);
  41.                 } else {
  42.                     // TODO 如果没有日志输出,默认等待1s
  43.                     Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
  44.                 }
  45.             }
  46.         } catch (Exception e) {
  47.             log.error("Output task log error", e);
  48.         } finally {
  49.             LogUtils.removeTaskInstanceLogFullPathMDC();
  50.         }
  51.     });
  52.     parseProcessOutputExecutorService.shutdown();
  53. }
复制代码
解说里面焦点的两个逻辑
  1. protected LinkedBlockingQueue<String> logBuffer;
  2. public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
  3.                                    TaskExecutionContext taskRequest) {
  4.         this.logHandler = logHandler;
  5.         this.taskRequest = taskRequest;
  6.         this.logBuffer = new LinkedBlockingQueue<>();
  7.         this.logBuffer.add(EMPTY_STRING);
  8.         if (this.taskRequest != null) {
  9.             // set logBufferEnable=true if the task uses logHandler and logBuffer to buffer log messages
  10.             this.taskRequest.setLogBufferEnable(true);
  11.         }
  12.     }
复制代码
通过 logBuffer 暂时存放日记,供parseProcessOutputExecutorService现成消费
  1. 日志的生产端 :
  2. while ((line = inReader.readLine()) != null) {
  3.     // TODO 日志缓冲区
  4.     logBuffer.add(line);
  5.     // TODO 这里解析taskOutputParams,解析比如说 echo '${setValue(output=1)}'。其实就是字符串 ${setValue(output=1)}
  6.     taskOutputParameterParser.appendParseLog(line);
  7. }
  8. 日志的消费端 :
  9. this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
  10. public void logHandle(LinkedBlockingQueue<String> logs) {
  11.     StringJoiner joiner = new StringJoiner("\n\t");
  12.     while (!logs.isEmpty()) {
  13.         joiner.add(logs.poll());
  14.     }
  15.     log.info(" -> {}", joiner);
  16. }
  17. while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
  18.     if (logBuffer.size() > 1) {
  19.         logHandler.accept(logBuffer);
  20.         logBuffer.clear();
  21.         logBuffer.add(EMPTY_STRING);
  22.     } else {
  23.         // TODO 如果没有日志输出,默认等待1s
  24.         Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
  25.     }
  26. }
复制代码
以是查看如果是Shell输出日记都是 -> 开头的,比如说
  1. [INFO] 2024-06-24 09:35:44.678 +0800 -  ->
  2.     .
  3.     ├── 1893_1321.sh
  4.     └── input_dir
  5.         ├── test1
  6.         │   └── text.txt
  7.         └── test2
  8.             └── text.txt
  9.    
  10.     3 directories, 3 files
  11.     test1 message
  12.     test2 message
复制代码
  1. while ((line = inReader.readLine()) != null) {
  2.     // TODO 日志缓冲区
  3.     logBuffer.add(line);
  4.     // TODO 这里解析taskOutputParams,解析比如说 echo '${setValue(output=1)}'。其实就是字符串 ${setValue(output=1)}
  5.     taskOutputParameterParser.appendParseLog(line);
  6. }
复制代码
org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser#appendParseLog
  1. public void appendParseLog(String logLine) {
  2.     if (logLine == null) {
  3.         return;
  4.     }
  5.     // TODO 刚开始进来,是不会走这里的
  6.     if (currentTaskOutputParam != null) {
  7.         if (currentTaskOutputParam.size() > maxOneParameterRows
  8.                 || currentTaskOutputParamLength > maxOneParameterLength) {
  9.             log.warn(
  10.                     "The output param expression '{}' is too long, the max rows is {}, max length is {}, will skip this param",
  11.                     String.join("\n", currentTaskOutputParam), maxOneParameterLength, maxOneParameterRows);
  12.             currentTaskOutputParam = null;
  13.             currentTaskOutputParamLength = 0;
  14.             return;
  15.         }
  16.         // continue to parse the rest of line
  17.         int i = logLine.indexOf(")}");
  18.         if (i == -1) {
  19.             // the end of var pool not found
  20.             currentTaskOutputParam.add(logLine);
  21.             currentTaskOutputParamLength += logLine.length();
  22.         } else {
  23.             // the end of var pool found
  24.             currentTaskOutputParam.add(logLine.substring(0, i + 2));
  25.             Pair<String, String> keyValue = parseOutputParam(String.join("\n", currentTaskOutputParam));
  26.             if (keyValue.getKey() != null && keyValue.getValue() != null) {
  27.                 // TODO 解析完毕就放入到taskOutputParams中
  28.                 taskOutputParams.put(keyValue.getKey(), keyValue.getValue());
  29.             }
  30.             currentTaskOutputParam = null;
  31.             currentTaskOutputParamLength = 0;
  32.             // continue to parse the rest of line
  33.             if (i + 2 != logLine.length()) {
  34.                 appendParseLog(logLine.substring(i + 2));
  35.             }
  36.         }
  37.         return;
  38.     }
  39.     int indexOfVarPoolBegin = logLine.indexOf("${setValue(");
  40.     if (indexOfVarPoolBegin == -1) {
  41.         indexOfVarPoolBegin = logLine.indexOf("#{setValue(");
  42.     }
  43.     if (indexOfVarPoolBegin == -1) {
  44.         return;
  45.     }
  46.     currentTaskOutputParam = new ArrayList<>();
  47.     appendParseLog(logLine.substring(indexOfVarPoolBegin));
  48. }
复制代码
解析完毕就放入到taskOutputParams中
更新Pid(向Master汇报)
  1. // update pid before waiting for the run to finish
  2. if (null != taskCallBack) {
  3.     // TODO 更新任务实例信息
  4.     taskCallBack.updateTaskInstanceInfo(processId);
  5. }
复制代码
超时判断
  1. long remainTime = getRemainTime();
  2. private long getRemainTime() {
  3.     long usedTime = (System.currentTimeMillis() - taskRequest.getStartTime()) / 1000;
  4.     long remainTime = taskRequest.getTaskTimeout() - usedTime;
  5.     if (remainTime < 0) {
  6.         throw new RuntimeException("task execution time out");
  7.     }
  8.     return remainTime;
  9. }
  10. ......
  11. // waiting for the run to finish
  12. // TODO 这里其实就是一个超时等待,其实就是说如果不设置超时等待时间,无限等待
  13. boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
复制代码
  1. // TODO 设置退出码
  2. // if SHELL task exit
  3. if (status && kubernetesStatus.isSuccess()) {
  4.     // SHELL task state
  5.     result.setExitStatusCode(this.process.exitValue());
  6. } else {
  7.     log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
  8.             taskRequest.getTaskTimeout());
  9.     result.setExitStatusCode(EXIT_CODE_FAILURE);
  10.     cancelApplication();
  11. }
  12. int exitCode = this.process.exitValue();
  13. String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
  14. log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
  15.         exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
复制代码
  1. // TODO 执行结果,退出状态码
  2. setExitStatusCode(commandExecuteResult.getExitStatusCode());
  3. // TODO 设置进程ID
  4. setProcessId(commandExecuteResult.getProcessId());
  5. // TODO shellCommandExecutor.getTaskOutputParams()这返回的是比如说 output -> 123
  6. shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
复制代码
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters#dealOutParam
  1. public void dealOutParam(Map<String, String> taskOutputParams) {
  2.     // TODO 其实就是说如果localParams不存在,就算设置了输出也不管用
  3.     if (CollectionUtils.isEmpty(localParams)) {
  4.         return;
  5.     }
  6.     // TODO 这里其实就是过滤出来localParams为OUT的参数
  7.     List<Property> outProperty = getOutProperty(localParams);
  8.     if (CollectionUtils.isEmpty(outProperty)) {
  9.         return;
  10.     }
  11.     // TODO 如果taskOutputParams为空,输出参数会放入到varPool中
  12.     if (MapUtils.isEmpty(taskOutputParams)) {
  13.         outProperty.forEach(this::addPropertyToValPool);
  14.         return;
  15.     }
  16.     // TODO 这里其实就是想说,找到outProperty和taskOutputParams相同的key,然后把对应的value换成taskOutputParams中的value
  17.     // TODO 最终放到变量池中
  18.     for (Property info : outProperty) {
  19.         String propValue = taskOutputParams.get(info.getProp());
  20.         if (StringUtils.isNotEmpty(propValue)) {
  21.             info.setValue(propValue);
  22.             addPropertyToValPool(info);
  23.         } else {
  24.             log.warn("Cannot find the output parameter {} in the task output parameters", info.getProp());
  25.         }
  26.     }
  27. }
复制代码
这里实在就是想说,找到outProperty和taskOutputParams相同的key,然后把对应的value换成taskOutputParams中的value,期待向Master汇报存在TaskInstance的变量池中。
  1. protected void afterExecute() throws TaskException {
  2.     if (task == null) {
  3.         throw new TaskException("The current task instance is null");
  4.     }
  5.     // TODO 是否要发送告警,使用JDK动态代理 RPC通信调用alert模块AlertBootstrapService
  6.     sendAlertIfNeeded();
  7.     // TODO 发送结果
  8.     sendTaskResult();
  9.     WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
  10.     // TODO common.properties development.state=false,默认是false。如果设置true
  11.     // TODO 就会开发模式,意味着Dolpinscheduler封装的脚本、jar包不清理
  12.     log.info("Remove the current task execute context from worker cache");
  13.     clearTaskExecPathIfNeeded();
  14. }
复制代码
发送效果
  1. protected void sendTaskResult() {
  2.     taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
  3.     taskExecutionContext.setProcessId(task.getProcessId());
  4.     taskExecutionContext.setAppIds(task.getAppIds());
  5.     // TODO 其实就是发送变量池,这里是变量池
  6.     taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
  7.     taskExecutionContext.setEndTime(System.currentTimeMillis());
  8.     // upload out files and modify the "OUT FILE" property in VarPool
  9.     // TODO 上传输出文件并修改输出文件到变量池中
  10.     TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate);
  11.     log.info("Upload output files: {} successfully",
  12.             TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT));
  13.     // TODO 发送任务的结果
  14.     workerMessageSender.sendMessageWithRetry(taskExecutionContext,
  15.             ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
  16.     log.info("Send task execute status: {} to master : {}", taskExecutionContext.getCurrentExecutionStatus().name(),
  17.             taskExecutionContext.getWorkflowInstanceHost());
  18. }
复制代码
Shell状态码小插曲
  1. [root@node opt]# vim test.sh
  2. [root@node opt]# sh test.sh
  3. me is journey
  4. [root@node opt]# echo $?
  5. 0
  6. [root@node opt]# vim test.sh
  7. [root@node opt]# sh test.sh
  8. test.sh: line 2: echo1: command not found
  9. [root@node opt]# echo $?
  10. 127
  11. [root@node opt]# vim test.sh
  12. [root@node opt]# sh test.sh
  13. me is 10.253.26.85
  14. Killed
  15. [root@node opt]# echo $?
  16. 137
复制代码
总结 : 实在就是想说SHELL任务正常的退出码为0,被kill掉的状态码为137。其他为非常。
任务状态码判断逻辑:
  1. taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
  2. org.apache.dolphinscheduler.plugin.task.api.AbstractTask#getExitStatus
  3. // 其实就是说如果状态码返回为0,任务为成功;状态码为137为KILL。其他状态为失败。而task.getExitStatus()状态是由executeTask中设置完成的
  4. public TaskExecutionStatus getExitStatus() {
  5.     switch (getExitStatusCode()) {
  6.         case TaskConstants.EXIT_CODE_SUCCESS:
  7.             return TaskExecutionStatus.SUCCESS;
  8.         case TaskConstants.EXIT_CODE_KILL:
  9.             return TaskExecutionStatus.KILL;
  10.         default:
  11.             return TaskExecutionStatus.FAILURE;
  12.     }
  13. }
复制代码
上传输出文件到资源中心:
org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils#uploadOutputFiles
  1. public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
  2.                                          StorageOperate storageOperate) throws TaskException {
  3.     List<Property> varPools = getVarPools(taskExecutionContext);
  4.     // get map of varPools for quick search
  5.     Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
  6.     // get OUTPUT FILE parameters
  7.     List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
  8.     if (localParamsProperty.isEmpty()) {
  9.         return;
  10.     }
  11.     log.info("Upload output files ...");
  12.     for (Property property : localParamsProperty) {
  13.         // get local file path
  14.         String path = String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue());
  15.         // TODO packIfDir 和 crc其实就是想说,如果是目录,就对目录进行打zip包,然后生成crc。如果是文件就对文件生成crc
  16.         String srcPath = packIfDir(path);
  17.         // get crc file path
  18.         String srcCRCPath = srcPath + CRC_SUFFIX;
  19.         try {
  20.             FileUtils.writeContent2File(FileUtils.getFileChecksum(path), srcCRCPath);
  21.         } catch (IOException ex) {
  22.             throw new TaskException(ex.getMessage(), ex);
  23.         }
  24.         // get remote file path
  25.         // TODO DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
  26.         String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
  27.         String resourceCRCPath = resourcePath + CRC_SUFFIX;
  28.         try {
  29.             // upload file to storage
  30.             // TODO 以hdfs来说
  31.             // TODO hdfs跟路径/tenantCode/resources/DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
  32.             String resourceWholePath =
  33.                     storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
  34.             String resourceCRCWholePath =
  35.                     storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourceCRCPath);
  36.             log.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
  37.             storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
  38.             log.info("{} --- Local:{} to Remote:{}", "CRC file", srcCRCPath, resourceCRCWholePath);
  39.             storageOperate.upload(taskExecutionContext.getTenantCode(), srcCRCPath, resourceCRCWholePath, false,
  40.                     true);
  41.         } catch (IOException ex) {
  42.             throw new TaskException("Upload file to storage error", ex);
  43.         }
  44.         // update varPool
  45.         Property oriProperty;
  46.         // if the property is not in varPool, add it
  47.         if (varPoolsMap.containsKey(property.getProp())) { // 理论上不会走到这个分支
  48.             oriProperty = varPoolsMap.get(property.getProp());
  49.         } else {
  50.             oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
  51.             // TODO 添加到变量池中
  52.             varPools.add(oriProperty);
  53.         }
  54.         // TODO 这里就设置了任务名称.property name
  55.         oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
  56.         // TODO 这里很关键,其实就是把资源的相对路径放入到了变量池对应的value中
  57.         oriProperty.setValue(resourcePath);
  58.     }
  59.     // TODO 这里是设置FILE的变量池
  60.     taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
  61. }
复制代码
发送任务的效果 :
  1. workerMessageSender.sendMessageWithRetry(taskExecutionContext,
  2.                 ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
复制代码
3、WorkerMessageSender组件作用


4、Kill任务逻辑

org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#operate
  1. public TaskInstanceKillResponse operate(TaskInstanceKillRequest taskInstanceKillRequest) {
  2.     log.info("Receive TaskInstanceKillRequest: {}", taskInstanceKillRequest);
  3.     // TODO 任务实例
  4.     int taskInstanceId = taskInstanceKillRequest.getTaskInstanceId();
  5.     try {
  6.         LogUtils.setTaskInstanceIdMDC(taskInstanceId);
  7.         // TODO Worker任务执行器
  8.         WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
  9.         if (workerTaskExecutor == null) {
  10.             log.error("Cannot find WorkerTaskExecutor for taskInstance: {}", taskInstanceId);
  11.             return TaskInstanceKillResponse.fail("Cannot find WorkerTaskExecutor");
  12.         }
  13.         // TODO 任务执行上下文
  14.         TaskExecutionContext taskExecutionContext = workerTaskExecutor.getTaskExecutionContext();
  15.         LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
  16.         // TODO 这里会进行kill
  17.         boolean result = doKill(taskExecutionContext);
  18.         // TODO 使用 Process.destroy() 是 Java 中 Process 类的一个方法,用于销毁与该 Process 对象关联的子进程
  19.         this.cancelApplication(workerTaskExecutor);
  20.         int processId = taskExecutionContext.getProcessId();
  21.         // TODO 这里其实想说的是,如果processId为0,直接把该任务的状态设置为KILL,然后在Worker上报信息的时候就会把KILL状态上报上去
  22.         // TODO 一定要注意,当前情况不一定是真正的kill掉,只是让DS里面的状态是对的
  23.         if (processId == 0) {
  24.             workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
  25.             taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
  26.             // todo: the task might be executed, but the processId is 0
  27.             WorkerTaskExecutorHolder.remove(taskInstanceId);
  28.             log.info("The task has not been executed and has been cancelled, task id:{}", taskInstanceId);
  29.             return TaskInstanceKillResponse.success(taskExecutionContext);
  30.         }
  31.         // TODO 这个其实就是说明,我kill掉了。成功了。然后这个时候Worker其实会感知到任务被kill掉,在他的sendResult FINISH的时候上报
  32.         // TODO 上去就可以了
  33.         taskExecutionContext
  34.                 .setCurrentExecutionStatus(result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
  35.         WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
  36.         // TODO 删除重试消息
  37.         messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
  38.         return TaskInstanceKillResponse.success(taskExecutionContext);
  39.     } finally {
  40.         LogUtils.removeTaskInstanceIdMDC();
  41.         LogUtils.removeTaskInstanceLogFullPathMDC();
  42.     }
  43. }
复制代码
杀历程和yarn上的任务 :
  1. // TODO 这里会进行kill
  2. boolean result = doKill(taskExecutionContext);
复制代码
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#doKill
  1. private boolean doKill(TaskExecutionContext taskExecutionContext) {
  2.     // kill system process
  3.     // TODO 杀死Shell关联的进程
  4.     boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());
  5.     // TODO kill yarn or k8s application
  6.     try {
  7.         ProcessUtils.cancelApplication(taskExecutionContext);
  8.     } catch (TaskException e) {
  9.         return false;
  10.     }
  11.     return processFlag;
  12. }
复制代码
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#killProcess
杀历程和子历程: 注意,这里和官网有修改,如果有非常打印warn就好,由于有时候不能有权限杀死所有历程。
  1. protected boolean killProcess(String tenantCode, Integer processId) {
  2.     // todo: directly interrupt the process
  3.     if (processId == null || processId.equals(0)) {
  4.         return true;
  5.     }
  6.     try {
  7.         String pidsStr = ProcessUtils.getPidsStr(processId);
  8.         if (!Strings.isNullOrEmpty(pidsStr)) {
  9.             String cmd = String.format("kill -9 %s", pidsStr);
  10.             cmd = OSUtils.getSudoCmd(tenantCode, cmd);
  11.             log.info("process id:{}, cmd:{}", processId, cmd);
  12.             OSUtils.exeCmd(cmd);
  13.         }
  14.     } catch (Exception e) {
  15.         log.warn("kill task error", e);
  16.     }
  17.     return true;
  18. }
复制代码
杀死yarn上的任务
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils#cancelApplication
  1. public static void cancelApplication(TaskExecutionContext taskExecutionContext) {
  2.     try {
  3.         // TODO k8s
  4.         if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext())) {
  5.             if (!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
  6.                 // Set empty container name for Spark on K8S task
  7.                 applicationManagerMap.get(ResourceManagerType.KUBERNETES)
  8.                         .killApplication(new KubernetesApplicationManagerContext(
  9.                                 taskExecutionContext.getK8sTaskExecutionContext(),
  10.                                 taskExecutionContext.getTaskAppId(), ""));
  11.             }
  12.         } else {
  13.             // TODO YARN
  14.             String host = taskExecutionContext.getHost();
  15.             String executePath = taskExecutionContext.getExecutePath();
  16.             String tenantCode = taskExecutionContext.getTenantCode();
  17.             List<String> appIds;
  18.             // TODO 容错的走这个逻辑
  19.             if (StringUtils.isNotEmpty(taskExecutionContext.getAppIds())) {
  20.                 // is failover
  21.                 appIds = Arrays.asList(taskExecutionContext.getAppIds().split(COMMA));
  22.             } else {
  23.                 String logPath = taskExecutionContext.getLogPath();
  24.                 String appInfoPath = taskExecutionContext.getAppInfoPath();
  25.                 if (logPath == null || appInfoPath == null || executePath == null || tenantCode == null) {
  26.                     log.error(
  27.                             "Kill yarn job error, the input params is illegal, host: {}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode: {}",
  28.                             host, logPath, appInfoPath, executePath, tenantCode);
  29.                     throw new TaskException("Cancel application failed!");
  30.                 }
  31.                 log.info("Get appIds from worker {}, taskLogPath: {}", host, logPath);
  32.                 // TODO 这里就是正则解析log获取appIds
  33.                 appIds = LogUtils.getAppIds(logPath, appInfoPath,
  34.                         PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
  35.                 taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
  36.             }
  37.             // TODO 如果这里说明appIds是不存在的
  38.             if (CollectionUtils.isEmpty(appIds)) {
  39.                 log.info("The appId is empty");
  40.                 return;
  41.             }
  42.             ApplicationManager applicationManager = applicationManagerMap.get(ResourceManagerType.YARN);
  43.             applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));
  44.         }
  45.     } catch (Exception e) {
  46.         log.error("Cancel application failed.", e);
  47.     }
  48. }
复制代码
task日记中使用正则表达式来解析appIds,这里默认走log,不走aop。
  1. appIds = LogUtils.getAppIds(logPath, appInfoPath,
  2.                             PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
  3. public List<String> getAppIds(String logPath, String appInfoPath, String fetchWay) {
  4.     if (!StringUtils.isEmpty(fetchWay) && fetchWay.equals("aop")) {
  5.         log.info("Start finding appId in {}, fetch way: {} ", appInfoPath, fetchWay);
  6.         // TODO 如果走aop拦截的写的日志文件中读取
  7.         return getAppIdsFromAppInfoFile(appInfoPath);
  8.     } else {
  9.         log.info("Start finding appId in {}, fetch way: {} ", logPath, fetchWay);
  10.         // TODO 从日志中进行正则匹配
  11.         return getAppIdsFromLogFile(logPath);
  12.     }
  13. }
复制代码
真正地来杀yarn上的任务
  1. applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));
  2. org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager#killApplication
  3. public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException {
  4.     YarnApplicationManagerContext yarnApplicationManagerContext =
  5.             (YarnApplicationManagerContext) applicationManagerContext;
  6.     String executePath = yarnApplicationManagerContext.getExecutePath();
  7.     String tenantCode = yarnApplicationManagerContext.getTenantCode();
  8.     List<String> appIds = yarnApplicationManagerContext.getAppIds();
  9.     try {
  10.         String commandFile = String.format("%s/%s.kill", executePath, String.join(Constants.UNDERLINE, appIds));
  11.         String cmd = getKerberosInitCommand() + "yarn application -kill " + String.join(Constants.SPACE, appIds);
  12.         execYarnKillCommand(tenantCode, commandFile, cmd);
  13.     } catch (Exception e) {
  14.          log.warn("Kill yarn application {} failed", appIds, e);
  15.     }
  16.     return true;
  17. }
  18. execYarnKillCommand需要注意,因为使用 yarn application -kill。yarn命令可能没有。增加ENV_SOURCE_LIST
  19. private void execYarnKillCommand(String tenantCode, String commandFile,
  20.                                      String cmd) throws Exception {
  21.     StringBuilder sb = new StringBuilder();
  22.     sb.append("#!/bin/sh\n");
  23.     sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
  24.     sb.append("cd $BASEDIR\n");
  25.     // TODO 在这里是设置默认的,比如说可以设置为 /etc/profile
  26.     if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
  27.         // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表
  28.         ShellUtils.ENV_SOURCE_LIST.forEach(env -> sb.append("source " + env + "\n"));
  29.     }
  30.     sb.append("\n\n");
  31.     sb.append(cmd);
  32.     File f = new File(commandFile);
  33.     if (!f.exists()) {
  34.         org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
  35.                 StandardCharsets.UTF_8);
  36.     }
  37.     String runCmd = String.format("%s %s", Constants.SH, commandFile);
  38.     runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
  39.     log.info("kill cmd:{}", runCmd);
  40.     org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
  41. }
复制代码
总结 : 如果成功把任务kill掉了,WorkerTaskExecutor会感知到的,进而进行KILL状态的FINISH汇报。如果任务已经完成,如果PID=0,将任务上下文状态设置为KILL,线程池中移除该WorkerTaskExecutor线程,WorkerTaskExecutorHolder移除该缓存
5、停息
  1. public class TaskInstancePauseOperationFunction
  2.         implements
  3.             ITaskInstanceOperationFunction<TaskInstancePauseRequest, TaskInstancePauseResponse> {
  4.     @Override
  5.     public TaskInstancePauseResponse operate(TaskInstancePauseRequest taskInstancePauseRequest) {
  6.         try {
  7.             LogUtils.setTaskInstanceIdMDC(taskInstancePauseRequest.getTaskInstanceId());
  8.             log.info("Receive TaskInstancePauseRequest: {}", taskInstancePauseRequest);
  9.             log.warn("TaskInstancePauseOperationFunction is not support for worker task yet!");
  10.             return TaskInstancePauseResponse.success();
  11.         } finally {
  12.             LogUtils.removeTaskInstanceIdMDC();
  13.         }
  14.     }
  15. }
复制代码
划重点 :
实在停息来说对于Worker来说,什么也不做。也做不了,你想想真的都能让任务停息么?除非是引擎程序中有所控制,像MR、SPARK、FLINK这种是不能停息,停息的焦点逻辑是给流程实例发送一个通知,告诉流程实例我要进行流程的停息,让正在运行任务的下一个任务进行停息,当然比如说只有一个任务,任务停息不了,末了只能成功。还有一种环境就是比如说是末了一个任务,也停息不了。还有就是执行的很快,你停息的时候,正好程序要往下执行,而鄙俚已没有任务的环境。这种都是停息不了的。

6、更新流程实例host

这个属于容错,容错章节再详细说。
转载自Journey
原文链接:https://segmentfault.com/a/1190000044966573
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4