Apache DolphinScheduler的Worker模块是其分布式调度系统的焦点组件之一,负责任务执行、资源管理及集群动态调度。本文将通过源码剖析,揭示其设计头脑与实现细节.
1、Worker接收Master RPC哀求架构图
Worker服务的Netty提供和Master JDK动态代理接口调用,请参考Dolphinscheduler告警模块解说,不再重复地说。
简说 :
org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator- @RpcService
- public interface ITaskInstanceOperator {
- @RpcMethod
- TaskInstanceDispatchResponse dispatchTask(TaskInstanceDispatchRequest taskInstanceDispatchRequest);
- @RpcMethod
- TaskInstanceKillResponse killTask(TaskInstanceKillRequest taskInstanceKillRequest);
- @RpcMethod
- TaskInstancePauseResponse pauseTask(TaskInstancePauseRequest taskPauseRequest);
- @RpcMethod
- UpdateWorkflowHostResponse updateWorkflowInstanceHost(UpdateWorkflowHostRequest updateWorkflowHostRequest);
- }
复制代码 对实现了@RpcService的接口和@RpcMethod的方法,进行Worker的Netty handler注入和Master动态代理实现。
2、分发任务
(TaskInstanceDispatchOperationFunction)
2.1、WorkerConfig
WorkerConfig : 实在就是从Worker模块下 application.yaml 下读取 worker 开头的设置
2.2、WorkerTaskExecutorFactoryBuilder
WorkerTaskExecutorFactoryBuilder : 是任务执行器工厂的构造器,里面封装了 DefaultWorkerTaskExecutorFactory(默认Worker任务执行器工厂) ,DefaultWorkerTaskExecutorFactory工厂又封装了 DefaultWorkerTaskExecutor 的创建。DefaultWorkerTaskExecutor 的父类是WorkerTaskExecutor,WorkerTaskExecutor又是一个线程。好玩不?
2.3、WorkerTaskExecutorThreadPool
WorkerTaskExecutorThreadPool : 实在就是Fixed线程池的封装而已
2.4、从operator开始说
- public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) {
- log.info("Receive TaskInstanceDispatchRequest: {}", taskInstanceDispatchRequest);
- // TODO 任务执行上下文
- TaskExecutionContext taskExecutionContext = taskInstanceDispatchRequest.getTaskExecutionContext();
- try {
- // TODO 设置worker地址
- taskExecutionContext.setHost(workerConfig.getWorkerAddress());
- // TODO 设置task日志存放路径
- taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));
- // TODO MDC中设置流程实例id和任务实例id,好像只是put,没有get使用
- LogUtils.setWorkflowAndTaskInstanceIDMDC(
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId());
- // check server status, if server is not running, return failed to reject this task
- if (!ServerLifeCycleManager.isRunning()) {
- log.error("server is not running. reject task: {}", taskExecutionContext.getProcessInstanceId());
- return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
- "server is not running");
- }
- TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
- // TODO 通过WorkerTaskExecutorFactoryBuilder创建了一个WorkerTaskExecutor
- WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
- .createWorkerTaskExecutorFactory(taskExecutionContext)
- .createWorkerTaskExecutor();
- // todo: hold the workerTaskExecutor
- // TODO 直接进行任务的提交
- if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
- log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
- return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
- "WorkerManagerThread is full");
- } else {
- log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
- return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
- }
- } finally {
- LogUtils.removeWorkflowAndTaskInstanceIdMDC();
- }
- }
复制代码 LogUtils.getTaskInstanceLogFullPath(taskExecutionContext) 解析
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogFullPath : 获取任务日记的全路径- /**
- * Get task instance log full path.
- *
- * @param taskExecutionContext task execution context.
- * @return task instance log full path.
- */
- public static String getTaskInstanceLogFullPath(TaskExecutionContext taskExecutionContext) {
- return getTaskInstanceLogFullPath(
- DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
- taskExecutionContext.getProcessDefineCode(),
- taskExecutionContext.getProcessDefineVersion(),
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId());
- }
复制代码 org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogFullPath : 拼接出任务日记的全路径- /**
- * todo: Remove the submitTime parameter?
- * The task instance log full path, the path is like:{log.base}/{taskSubmitTime}/{workflowDefinitionCode}/{workflowDefinitionVersion}/{}workflowInstance}/{taskInstance}.log
- *
- * @param taskFirstSubmitTime task first submit time
- * @param workflowDefinitionCode workflow definition code
- * @param workflowDefinitionVersion workflow definition version
- * @param workflowInstanceId workflow instance id
- * @param taskInstanceId task instance id.
- * @return task instance log full path.
- */
- public static String getTaskInstanceLogFullPath(Date taskFirstSubmitTime,
- Long workflowDefinitionCode,
- int workflowDefinitionVersion,
- int workflowInstanceId,
- int taskInstanceId) {
- if (TASK_INSTANCE_LOG_BASE_PATH == null) {
- throw new IllegalArgumentException(
- "Cannot find the task instance log base path, please check your logback.xml file");
- }
- final String taskLogFileName = Paths.get(
- String.valueOf(workflowDefinitionCode),
- String.valueOf(workflowDefinitionVersion),
- String.valueOf(workflowInstanceId),
- String.format("%s.log", taskInstanceId)).toString();
- return TASK_INSTANCE_LOG_BASE_PATH
- .resolve(DateUtils.format(taskFirstSubmitTime, DateConstants.YYYYMMDD, null))
- .resolve(taskLogFileName)
- .toString();
- }
复制代码 org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogBasePath : 读取logback-spring.xml中的设置,获取任务实例日记的基础路径,实在就是获取根目次下/logs为基础路径- /**
- * Get task instance log base absolute path, this is defined in logback.xml
- *
- * @return
- */
- public static Path getTaskInstanceLogBasePath() {
- return Optional.of(LoggerFactory.getILoggerFactory())
- .map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
- .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE")))
- .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator())))
- .map(TaskLogDiscriminator::getLogBase)
- .map(e -> Paths.get(e).toAbsolutePath())
- .orElse(null);
- }
复制代码 worker的 logback-spring.xml :- <configuration scan="true" scanPeriod="120 seconds">
- <property name="log.base" value="logs"/>
- ...
- <appender name="TASKLOGFILE" >
- <filter />
- <Discriminator >
- <key>taskInstanceLogFullPath</key>
- <logBase>${log.base}</logBase>
- </Discriminator>
- <sift>
- <appender name="FILE-${taskInstanceLogFullPath}" >
- <file>${taskInstanceLogFullPath}</file>
- <encoder>
- <pattern>
- [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n
- </pattern>
- <charset>UTF-8</charset>
- </encoder>
- <append>true</append>
- </appender>
- </sift>
- </appender>
- ...
- <root level="INFO">
- <appender-ref ref="STDOUT"/>
- <appender-ref ref="TASKLOGFILE"/>
- </root>
- </configuration>
复制代码 最终地址是:- /opt/dolphinscheduler/worker-server/logs/20240615/13929490938784/1/1815/1202.log
复制代码 2.5、DefaultWorkerTaskExecutor解说
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceDispatchOperationFunction#operate- // TODO 通过WorkerTaskExecutorFactoryBuilder创建了一个WorkerTaskExecutor
- WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
- .createWorkerTaskExecutorFactory(taskExecutionContext)
- .createWorkerTaskExecutor();
- // todo: hold the workerTaskExecutor
- // TODO 直接进行任务的提交
- if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
- log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
- return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
- "WorkerManagerThread is full");
- } else {
- log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
- return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
- }
复制代码 直接使用 workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)进行任务的提交
WorkerTaskExecutor 是一个线程,既然是线程,是不是要看一下run :- public void run() {
- try {
- // TODO MDC中设置流程实例和任务实例,其实就相当于是ThreadLocal使用一样
- LogUtils.setWorkflowAndTaskInstanceIDMDC(
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId());
- // TODO MDC中设置任务的日志路径
- LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
- // TODO 打印任务的头部
- TaskInstanceLogHeader.printInitializeTaskContextHeader();
- // TODO 进行任务的初始化,其实就是做了任务的开始时间和taskAppId(流程实例id + 任务实例id)
- initializeTask();
- // TODO DRY_RUN其实就是空跑,其实就是直接设置状态为成功
- if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
- taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
- taskExecutionContext.setEndTime(System.currentTimeMillis());
- WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
- // TODO 通过worker消息发送器将结果信息发送过去
- workerMessageSender.sendMessageWithRetry(taskExecutionContext,
- ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
- log.info(
- "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
- return;
- }
- // TODO 打印任务插件的头部
- TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();
- // TODO 执行之前
- beforeExecute();
- // TODO 回调函数
- TaskCallBack taskCallBack = TaskCallbackImpl.builder()
- .workerMessageSender(workerMessageSender)
- .taskExecutionContext(taskExecutionContext)
- .build();
- TaskInstanceLogHeader.printExecuteTaskHeader();
- // TODO 执行
- executeTask(taskCallBack);
- TaskInstanceLogHeader.printFinalizeTaskHeader();
- // TODO 执行之后
- afterExecute();
- closeLogAppender();
- } catch (Throwable ex) {
- log.error("Task execute failed, due to meet an exception", ex);
- afterThrowing(ex);
- closeLogAppender();
- } finally {
- LogUtils.removeWorkflowAndTaskInstanceIdMDC();
- LogUtils.removeTaskInstanceLogFullPathMDC();
- }
- }
复制代码 重点分析:
- 2.5.1、空跑
如果是空跑,任务直接成功,不执行
- // TODO DRY_RUN其实就是空跑,其实就是直接设置状态为成功
- if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
- taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
- taskExecutionContext.setEndTime(System.currentTimeMillis());
- WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
- // TODO 通过worker消息发送器将结果信息发送过去
- workerMessageSender.sendMessageWithRetry(taskExecutionContext,
- ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
- log.info(
- "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
- return;
- }
复制代码 执行之前的准备工作,比如说给Master汇报说本身正在运行、创建租户(linux上用户)、创建工作路径、下载所需资源文件、任务初始化**- protected void beforeExecute() {
- // TODO 先设置为RUNNING状态
- taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
- // TODO 向Master发送消息,告诉Master这个任务正在运行
- workerMessageSender.sendMessageWithRetry(taskExecutionContext,
- ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING);
- log.info("Send task status {} master: {}", TaskExecutionStatus.RUNNING_EXECUTION.name(),
- taskExecutionContext.getWorkflowInstanceHost());
- // In most of case the origin tenant is the same as the current tenant
- // Except `default` tenant. The originTenant is used to download the resources
- // TODO 租户信息
- String originTenant = taskExecutionContext.getTenantCode();
- String tenant = TaskExecutionContextUtils.getOrCreateTenant(workerConfig, taskExecutionContext);
- taskExecutionContext.setTenantCode(tenant);
- log.info("TenantCode: {} check successfully", taskExecutionContext.getTenantCode());
- // TODO 创建工作路径
- TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);
- log.info("WorkflowInstanceExecDir: {} check successfully", taskExecutionContext.getExecutePath());
- TaskChannel taskChannel =
- Optional.ofNullable(taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()))
- .orElseThrow(() -> new TaskPluginException(taskExecutionContext.getTaskType()
- + " task plugin not found, please check the task type is correct."));
- log.info("Create TaskChannel: {} successfully", taskChannel.getClass().getName());
- // TODO 下载资源
- ResourceContext resourceContext = TaskExecutionContextUtils.downloadResourcesIfNeeded(originTenant, taskChannel,
- storageOperate, taskExecutionContext);
- taskExecutionContext.setResourceContext(resourceContext);
- log.info("Download resources successfully: \n{}", taskExecutionContext.getResourceContext());
- TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate);
- log.info("Download upstream files: {} successfully",
- TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.IN));
- // TODO 创建任务
- task = taskChannel.createTask(taskExecutionContext);
- log.info("Task plugin instance: {} create successfully", taskExecutionContext.getTaskType());
- // todo: remove the init method, this should initialize in constructor method
- // TODO 任务进行初始化
- task.init();
- log.info("Success initialized task plugin instance successfully");
- task.getParameters().setVarPool(taskExecutionContext.getVarPool());
- log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());
- }
复制代码 1、日记打印- log.info("Send task status {} master: {}", TaskExecutionStatus.RUNNING_EXECUTION.name(),
- taskExecutionContext.getWorkflowInstanceHost());
- 这里需要打印的是 taskExecutionContext.getWorkflowInstanceHost(),不应该是taskExecutionContext.getHost()。就是说你给Master汇报信息的呢,打印自己Worker节点的host干啥(自己肯定知道啊),有用的是当前Worker节点是给哪个Master节点汇报自己的任务状态的
复制代码 2、创建租户
org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils#getOrCreateTenant- public static String getOrCreateTenant(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
- try {
- TenantConfig tenantConfig = workerConfig.getTenantConfig();
- String tenantCode = taskExecutionContext.getTenantCode();
- if (TenantConstants.DEFAULT_TENANT_CODE.equals(tenantCode) && tenantConfig.isDefaultTenantEnabled()) {
- log.info("Current tenant is default tenant, will use bootstrap user: {} to execute the task",
- TenantConstants.BOOTSTRAPT_SYSTEM_USER);
- return TenantConstants.BOOTSTRAPT_SYSTEM_USER;
- }
- boolean osUserExistFlag;
- // if Using distributed is true and Currently supported systems are linux,Should not let it
- // automatically
- // create tenants,so TenantAutoCreate has no effect
- if (tenantConfig.isDistributedTenantEnabled() && SystemUtils.IS_OS_LINUX) {
- // use the id command to judge in linux
- osUserExistFlag = OSUtils.existTenantCodeInLinux(tenantCode);
- } else if (OSUtils.isSudoEnable() && tenantConfig.isAutoCreateTenantEnabled()) {
- // if not exists this user, then create
- // TODO 默认走的是这里的分支,直接通过 sudo useradd -g %s %s 进行创建
- OSUtils.createUserIfAbsent(tenantCode);
- osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
- } else {
- osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
- }
- if (!osUserExistFlag) {
- throw new TaskException(String.format("TenantCode: %s doesn't exist", tenantCode));
- }
- return tenantCode;
- } catch (TaskException ex) {
- throw ex;
- } catch (Exception ex) {
- throw new TaskException(
- String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode()));
- }
- }
复制代码 3、TaskChannel
TaskPluginManager Master启动的时候通 google的 @AutoService来完成SPI注册。
Master启动时候TaskPluginManager初始化
org.apache.dolphinscheduler.server.master.MasterServer#run- @PostConstruct
- public void run() throws SchedulerException {
- ......
- // install task plugin
- // TODO 是通过 google的 @AutoService来进行SPI注册的
- this.taskPluginManager.loadPlugin();
- ......
- }
复制代码 org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager#loadPlugin- public void loadPlugin() {
- if (!loadedFlag.compareAndSet(false, true)) {
- log.warn("The task plugin has already been loaded");
- return;
- }
- // TODO 实例化的时候是通过SPI进行加载的
- PrioritySPIFactory<TaskChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(TaskChannelFactory.class);
- for (Map.Entry<String, TaskChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
- String factoryName = entry.getKey();
- TaskChannelFactory factory = entry.getValue();
- log.info("Registering task plugin: {} - {}", factoryName, factory.getClass().getSimpleName());
- taskChannelFactoryMap.put(factoryName, factory);
- taskChannelMap.put(factoryName, factory.create());
- log.info("Registered task plugin: {} - {}", factoryName, factory.getClass().getSimpleName());
- }
- }
复制代码 焦点逻辑实在就是
TaskChannelFactory 接口 :- public interface TaskChannelFactory extends UiChannelFactory, PrioritySPI {
- TaskChannel create();
- default SPIIdentify getIdentify() {
- return SPIIdentify.builder().name(getName()).build();
- }
- }
复制代码 Task插件都实现了TaskChannelFactory接口并使用@AutoService注解 :
以ShellTaskChannelFactory为例 :- @AutoService(TaskChannelFactory.class)
- public class ShellTaskChannelFactory implements TaskChannelFactory {
- @Override
- public TaskChannel create() {
- return new ShellTaskChannel();
- }
- @Override
- public String getName() {
- return "SHELL";
- }
- @Override
- public List<PluginParams> getParams() {
- List<PluginParams> paramsList = new ArrayList<>();
- InputParam nodeName = InputParam.newBuilder("name", "$t('Node name')")
- .addValidate(Validate.newBuilder()
- .setRequired(true)
- .build())
- .build();
- RadioParam runFlag = RadioParam.newBuilder("runFlag", "RUN_FLAG")
- .addParamsOptions(new ParamsOptions("NORMAL", "NORMAL", false))
- .addParamsOptions(new ParamsOptions("FORBIDDEN", "FORBIDDEN", false))
- .build();
- paramsList.add(nodeName);
- paramsList.add(runFlag);
- return paramsList;
- }
- }
复制代码 在这里创建了 ShellTaskChannel,也就是TaskChannel
4、下载所需资源
org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils#downloadResourcesIfNeeded- public static ResourceContext downloadResourcesIfNeeded(String tenant,
- TaskChannel taskChannel,
- StorageOperate storageOperate,
- TaskExecutionContext taskExecutionContext) {
- AbstractParameters abstractParameters = taskChannel.parseParameters(
- ParametersNode.builder()
- .taskType(taskExecutionContext.getTaskType())
- .taskParams(taskExecutionContext.getTaskParams())
- .build());
- // TODO 其实这里如果要是Sql,这里直接 ArrayList<>()了,下面就不走了
- List<ResourceInfo> resourceFilesList = abstractParameters.getResourceFilesList();
- if (CollectionUtils.isEmpty(resourceFilesList)) {
- log.debug("There is no resource file need to download");
- return new ResourceContext();
- }
- ResourceContext resourceContext = new ResourceContext();
- String taskWorkingDirectory = taskExecutionContext.getExecutePath();
- for (ResourceInfo resourceInfo : resourceFilesList) {
- // TODO 在存储中的路径,比如说hdfs上的文件路径
- String resourceAbsolutePathInStorage = resourceInfo.getResourceName();
- // TODO 文件名称
- String resourceRelativePath = storageOperate.getResourceFileName(tenant, resourceAbsolutePathInStorage);
- // TODO 本地的绝对路径
- String resourceAbsolutePathInLocal = Paths.get(taskWorkingDirectory, resourceRelativePath).toString();
- File file = new File(resourceAbsolutePathInLocal);
- if (!file.exists()) {
- try {
- long resourceDownloadStartTime = System.currentTimeMillis();
- // TODO 资源进行下载
- storageOperate.download(resourceAbsolutePathInStorage, resourceAbsolutePathInLocal, true);
- log.debug("Download resource file {} under: {} successfully", resourceAbsolutePathInStorage,
- resourceAbsolutePathInLocal);
- FileUtils.setFileTo755(file);
- WorkerServerMetrics
- .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
- WorkerServerMetrics
- .recordWorkerResourceDownloadSize(Files.size(Paths.get(resourceAbsolutePathInLocal)));
- WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
- } catch (Exception ex) {
- WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
- throw new TaskException(
- String.format("Download resource file: %s error", resourceAbsolutePathInStorage), ex);
- }
- }
- // TODO 封装resourceContext
- ResourceContext.ResourceItem resourceItem = ResourceContext.ResourceItem.builder()
- .resourceAbsolutePathInStorage(resourceAbsolutePathInStorage)
- .resourceRelativePath(resourceRelativePath)
- .resourceAbsolutePathInLocal(resourceAbsolutePathInLocal)
- .build();
- resourceContext.addResourceItem(resourceItem);
- }
- return resourceContext;
- }
复制代码 5、下载上游文件(上鄙俚文件的传递)
示例如下 :

upTask :

downTask :

焦点逻辑 : 上鄙俚文件传递实在也很简朴,就是针对本节点来说就是在本地生成对应的文件,然后上传到比如说HDFS范例的资源中心,然后鄙俚节点会跟进上游taskName.输出变量进行指定资源中心文件的下载。
downTask中的downloadUpstreamFiles逻辑:
org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils#downloadUpstreamFiles- public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
- // TODO 上游传递过来的变量池
- List<Property> varPools = getVarPools(taskExecutionContext);
- // get map of varPools for quick search
- Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
- // get "IN FILE" parameters
- // TODO 其实就是看localParams的参数有没有为IN的FILE的本地参数
- List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
- // TODO 一般情况下,就是这里就结束了
- if (localParamsProperty.isEmpty()) {
- return;
- }
- String executePath = taskExecutionContext.getExecutePath();
- // data path to download packaged data
- // TODO 下载的临时目录
- String downloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);
- log.info("Download upstream files...");
- for (Property property : localParamsProperty) {
- // TODO 这里其实就是获取
- /**
- * varPoolsMap 如下 :
- * {"prop":"upTask.file-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240624/13978983404960/2_1893/upTask_1320_text.txt"}
- * {"prop":"upTask.dir-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240624/13978983404960/2_1893/upTask_1320_data_ds_pack.zip"}
- *
- * localParamsProperty 如下 :
- * {"prop":"input_dir","direct":"IN","type":"FILE","value":"upTask.dir-data"}
- */
- // TODO 所以这里是不为null的
- Property inVarPool = varPoolsMap.get(property.getValue());
- if (inVarPool == null) {
- log.error("{} not in {}", property.getValue(), varPoolsMap.keySet());
- throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
- property.getValue()));
- }
- String resourcePath = inVarPool.getValue();
- // TODO 其实就是在封装本地的路径
- // TODO 这里注意啊,比如说脚本中 cat input_dir/test1/text.txt,input_dir这个东西是下载路径拼接上的
- String targetPath = String.format("%s/%s", executePath, property.getProp());
- String downloadPath;
- // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
- // targetPath
- // TODO 判断是否是zip压缩
- boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
- if (isPack) {
- downloadPath = String.format("%s/%s", downloadTmpPath, new File(resourcePath).getName());
- } else {
- downloadPath = targetPath;
- }
- try {
- // TODO 资源中心路径
- String resourceWholePath =
- storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
- log.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
- // TODO 系在到本地
- storageOperate.download(resourceWholePath, downloadPath, true);
- } catch (IOException ex) {
- throw new TaskException("Download file from storage error", ex);
- }
- // unpack if the data is packaged
- if (isPack) {
- File downloadFile = new File(downloadPath);
- log.info("Unpack {} to {}", downloadPath, targetPath);
- // TODO 如果是zip就是将本地临时目录下的压缩文件解压到目标路径下
- ZipUtil.unpack(downloadFile, new File(targetPath));
- }
- }
- // delete DownloadTmp Folder if DownloadTmpPath exists
- try {
- // TODO 临时目录下文件删除掉
- org.apache.commons.io.FileUtils.deleteDirectory(new File(downloadTmpPath));
- } catch (IOException e) {
- log.error("Delete DownloadTmpPath {} failed, this will not affect the task status", downloadTmpPath, e);
- }
- }
复制代码 6、创建任务并初始化
实在就是步骤3中,创建完毕TaskChannel,然后调用createTask,返回AbstractTask,然后调用init方法- ......
- // TODO 创建任务
- task = taskChannel.createTask(taskExecutionContext);
- log.info("Task plugin instance: {} create successfully", taskExecutionContext.getTaskType());
- // todo: remove the init method, this should initialize in constructor method
- // TODO 任务进行初始化
- task.init();
- log.info("Success initialized task plugin instance successfully");
- ......
复制代码 7、给AbstractParameters设置变量池- // TODO 给任务设置变量池
- // TODO 一般情况下 taskExecutionContext.getVarPool()这里就为null
- task.getParameters().setVarPool(taskExecutionContext.getVarPool());
- log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());
复制代码 注意: 默认环境下,这个taskExecutionContext.getVarPool()是空的,除非上游有OUT变量。
- // TODO 回调函数,这个还是很关键的把workerMessageSender、taskExecutionContext以构造函数放到了TaskCallBack中
- // TODO 所以taskExecutionContext里面是有之前的内容的
- TaskCallBack taskCallBack = TaskCallbackImpl.builder()
- .workerMessageSender(workerMessageSender)
- .taskExecutionContext(taskExecutionContext)
- .build();
- .......
- // TODO 执行
- executeTask(taskCallBack);
复制代码 executeTask(taskCallBack):是焦点代码,封装了Worker任务的真正的执行逻辑,参数传递的TaskCallBack,用于任务状态的回报(向Master)
下面就来细说executeTask(taskCallBack)的逻辑 :- public void executeTask(TaskCallBack taskCallBack) throws TaskException {
- if (task == null) {
- throw new IllegalArgumentException("The task plugin instance is not initialized");
- }
- // TODO 这里会进行真正的任务处理
- task.handle(taskCallBack);
- }
复制代码 其中的task实在就是AbstractTask,在beforeExecute中 taskChannel.createTask。是Task抽象父类(以ShellTask为例展开说明,其他任务范例类似)
org.apache.dolphinscheduler.plugin.task.shell.ShellTask#handle- public void handle(TaskCallBack taskCallBack) throws TaskException {
- try {
- IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
- .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap())) // TODO 这里就是要进行变量的替换
- .appendScript(shellParameters.getRawScript());
- // TODO shell执行
- TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
- // TODO 执行结果,退出状态码
- setExitStatusCode(commandExecuteResult.getExitStatusCode());
- // TODO 设置进程ID
- setProcessId(commandExecuteResult.getProcessId());
- // TODO shellCommandExecutor.getTaskOutputParams()这返回的是 output -> 123
- shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("The current Shell task has been interrupted", e);
- setExitStatusCode(EXIT_CODE_FAILURE);
- throw new TaskException("The current Shell task has been interrupted", e);
- } catch (Exception e) {
- log.error("shell task error", e);
- setExitStatusCode(EXIT_CODE_FAILURE);
- throw new TaskException("Execute shell task error", e);
- }
- }
复制代码 org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory
默认走的是 BashShellInterceptorBuilder- public class ShellInterceptorBuilderFactory {
- private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");
- @SuppressWarnings("unchecked")
- public static IShellInterceptorBuilder newBuilder() {
- // TODO 默认的走的是这个逻辑
- if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
- return new BashShellInterceptorBuilder();
- }
- if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
- return new ShShellInterceptorBuilder();
- }
- if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
- return new CmdShellInterceptorBuilder();
- }
- throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
- }
- }
复制代码- .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
复制代码 是向BaseShellInterceptorBuilder的propertyMap中进行taskExecutionContext.getPrepareParamsMap()参数的设置(注意 : taskExecutionContext.getPrepareParamsMap()是在Master中进行的封装。- .appendScript(shellParameters.getRawScript())
复制代码 是向BaseShellInterceptorBuilder的scripts进行设置值。
org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#run- public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
- TaskCallBack taskCallBack) throws Exception {
- TaskResponse result = new TaskResponse();
- // todo: we need to use state like JDK Thread to make sure the killed task should not be executed
- iShellInterceptorBuilder = iShellInterceptorBuilder
- // TODO 设置执行路径
- .shellDirectory(taskRequest.getExecutePath())
- // TODO 这里设置shell 名字
- .shellName(taskRequest.getTaskAppId());
-
- // Set system env
- // TODO 在这里是设置默认的,比如说可以设置为 /etc/profile
- if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
- // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表
- ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
- }
- // Set custom env
- // TODO 设置自定义的env
- if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
- // TODO 向 customEnvScripts 中加入
- iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
- }
- // Set k8s config (This is only work in Linux)
- if (taskRequest.getK8sTaskExecutionContext() != null) {
- iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
- }
- // Set sudo (This is only work in Linux)
- // TODO 设置sudo为true的模式
- iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
- // Set tenant (This is only work in Linux)
- // TODO 设置租户
- iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
- // Set CPU Quota (This is only work in Linux)
- if (taskRequest.getCpuQuota() != null) {
- iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
- }
- // Set memory Quota (This is only work in Linux)
- if (taskRequest.getMemoryMax() != null) {
- iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
- }
- // TODO 这个是重点
- IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
- // TODO 使用ProcessBuilder进行执行,支持sudo模式,和bash模式
- process = iShellInterceptor.execute();
- // parse process output
- // TODO 这里解析到进程的输出
- parseProcessOutput(this.process);
- // collect pod log
- collectPodLogIfNeeded();
- int processId = getProcessId(this.process);
- result.setProcessId(processId);
- // cache processId
- taskRequest.setProcessId(processId);
- // print process id
- log.info("process start, process id is: {}", processId);
- // if timeout occurs, exit directly
- long remainTime = getRemainTime();
- // update pid before waiting for the run to finish
- if (null != taskCallBack) {
- // TODO 更新任务实例信息
- taskCallBack.updateTaskInstanceInfo(processId);
- }
- // waiting for the run to finish
- boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
- TaskExecutionStatus kubernetesStatus =
- ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
- if (taskOutputFuture != null) {
- try {
- // Wait the task log process finished.
- taskOutputFuture.get();
- } catch (ExecutionException e) {
- log.error("Handle task log error", e);
- }
- }
- if (podLogOutputFuture != null) {
- try {
- // Wait kubernetes pod log collection finished
- podLogOutputFuture.get();
- // delete pod after successful execution and log collection
- ProcessUtils.cancelApplication(taskRequest);
- } catch (ExecutionException e) {
- log.error("Handle pod log error", e);
- }
- }
- // if SHELL task exit
- if (status && kubernetesStatus.isSuccess()) {
- // SHELL task state
- result.setExitStatusCode(this.process.exitValue());
- } else {
- log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
- taskRequest.getTaskTimeout());
- result.setExitStatusCode(EXIT_CODE_FAILURE);
- cancelApplication();
- }
- int exitCode = this.process.exitValue();
- String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
- log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
- exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
- return result;
- }
复制代码 设置默认的环境变量:- // Set system env
- // TODO 在这里是设置默认的,比如说可以设置为 /etc/profile
- if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
- // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表 ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
- }
复制代码 org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils- public List<String> ENV_SOURCE_LIST = Arrays.stream(
- Optional.ofNullable(PropertyUtils.getString("shell.env_source_list")).map(s -> s.split(","))
- .orElse(new String[0]))
- .map(String::trim)
- .filter(StringUtils::isNotBlank)
- .collect(Collectors.toList());
复制代码 读取的是 common.properties,这里可以设置默认的环境变量- # The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile
- # 默认是空,比如说可以是
- shell.env_source_list=/etc/profile
复制代码- // TODO 这个是重点
- IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
复制代码 org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder#build- public BashShellInterceptor build() throws FileOperateException, IOException {
- // TODO 这里是生成shell脚本的核心点,写到指定目录下
- generateShellScript();
- // TODO 封装命令
- List<String> bootstrapCommand = generateBootstrapCommand();
- // TODO 实例化BashShellInterceptor
- return new BashShellInterceptor(bootstrapCommand, shellDirectory);
- }
复制代码 org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#bootstrapCommandInSudoMode
注意 : 这个方法里面有两层含义,如果是资源限制走的是bootstrapCommandInResourceLimitMode,实在这里还蕴藏着一个大大的BUG(我只修改了ShellTask),针对其他范例的Shell封装的任务,比如说MR、Spark、Flink等等,如果走资源限制,这里就有问题,由于这些任务在页面上不能设置CPU和内存的Quota),否则走的是sudo -u 租户 -i /opt/xx.sh。- private List<String> bootstrapCommandInSudoMode() {
- // TODO 如果task.resource.limit.state为false,这里的逻辑不会走,也不会走CPU和内存的限制
- if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) {
- return bootstrapCommandInResourceLimitMode();
- }
- List<String> bootstrapCommand = new ArrayList<>();
- bootstrapCommand.add("sudo");
- if (StringUtils.isNotBlank(runUser)) {
- bootstrapCommand.add("-u");
- bootstrapCommand.add(runUser);
- }
- bootstrapCommand.add("-i");
- bootstrapCommand.add(shellAbsolutePath().toString());
- return bootstrapCommand;
- }
复制代码- // TODO 使用ProcessBuilder进行执行,支持sudo模式,和bash模式
- process = iShellInterceptor.execute();
复制代码 org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor#execute- public Process execute() throws IOException {
- // init process builder
- ProcessBuilder processBuilder = new ProcessBuilder();
- // setting up a working directory
- // TODO 设置工作路径,目的其实就是在执行脚本的时候,可以在该目录的位置来加载比如说jar包什么的
- processBuilder.directory(new File(workingDirectory));
- // merge error information to standard output stream
- processBuilder.redirectErrorStream(true);
- processBuilder.command(executeCommands);
- log.info("Executing shell command : {}", String.join(" ", executeCommands));
- return processBuilder.start();
- }
复制代码 实在就是使用 ProcessBuilder 进行任务的提交。
org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#parseProcessOutput- // TODO 解析输出
- private void parseProcessOutput(Process process) {
- // todo: remove this this thread pool.
- ExecutorService getOutputLogService = ThreadUtils
- .newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName());
- getOutputLogService.execute(() -> {
- TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
- // TODO 这里正好的读取process.getInputStream()的输入
- try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
- // TODO 这里设置了任务的日志路径
- LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
- String line;
- while ((line = inReader.readLine()) != null) {
- // TODO 日志缓冲区
- logBuffer.add(line);
- // TODO 这里解析taskOutputParams,解析比如说 echo '${setValue(output=1)}'。其实就是字符串 ${setValue(output=1)}
- taskOutputParameterParser.appendParseLog(line);
- }
- processLogOutputIsSuccess = true;
- } catch (Exception e) {
- log.error("Parse var pool error", e);
- processLogOutputIsSuccess = true;
- } finally {
- // TODO 在这里的时候就将 taskInstanceLogFullPath 删除了
- LogUtils.removeTaskInstanceLogFullPathMDC();
- }
- taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
- });
- getOutputLogService.shutdown();
- ExecutorService parseProcessOutputExecutorService = ThreadUtils
- .newSingleDaemonScheduledExecutorService("TaskInstanceLogOutput-thread-" + taskRequest.getTaskName());
- taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
- try {
- LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
- // TODO 对于非pod(k8s)的任务,其实就是processLogOutputIsSuccess这个标识,这个标识是在上面,就是任务运行完毕了
- while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
- if (logBuffer.size() > 1) {
- logHandler.accept(logBuffer);
- logBuffer.clear();
- logBuffer.add(EMPTY_STRING);
- } else {
- // TODO 如果没有日志输出,默认等待1s
- Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
- }
- }
- } catch (Exception e) {
- log.error("Output task log error", e);
- } finally {
- LogUtils.removeTaskInstanceLogFullPathMDC();
- }
- });
- parseProcessOutputExecutorService.shutdown();
- }
复制代码 解说里面焦点的两个逻辑:
- protected LinkedBlockingQueue<String> logBuffer;
- public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
- TaskExecutionContext taskRequest) {
- this.logHandler = logHandler;
- this.taskRequest = taskRequest;
- this.logBuffer = new LinkedBlockingQueue<>();
- this.logBuffer.add(EMPTY_STRING);
- if (this.taskRequest != null) {
- // set logBufferEnable=true if the task uses logHandler and logBuffer to buffer log messages
- this.taskRequest.setLogBufferEnable(true);
- }
- }
复制代码 通过 logBuffer 暂时存放日记,供parseProcessOutputExecutorService现成消费- 日志的生产端 :
- while ((line = inReader.readLine()) != null) {
- // TODO 日志缓冲区
- logBuffer.add(line);
- // TODO 这里解析taskOutputParams,解析比如说 echo '${setValue(output=1)}'。其实就是字符串 ${setValue(output=1)}
- taskOutputParameterParser.appendParseLog(line);
- }
- 日志的消费端 :
- this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
- public void logHandle(LinkedBlockingQueue<String> logs) {
- StringJoiner joiner = new StringJoiner("\n\t");
- while (!logs.isEmpty()) {
- joiner.add(logs.poll());
- }
- log.info(" -> {}", joiner);
- }
- while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
- if (logBuffer.size() > 1) {
- logHandler.accept(logBuffer);
- logBuffer.clear();
- logBuffer.add(EMPTY_STRING);
- } else {
- // TODO 如果没有日志输出,默认等待1s
- Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
- }
- }
复制代码 以是查看如果是Shell输出日记都是 -> 开头的,比如说- [INFO] 2024-06-24 09:35:44.678 +0800 - ->
- .
- ├── 1893_1321.sh
- └── input_dir
- ├── test1
- │ └── text.txt
- └── test2
- └── text.txt
-
- 3 directories, 3 files
- test1 message
- test2 message
复制代码- while ((line = inReader.readLine()) != null) {
- // TODO 日志缓冲区
- logBuffer.add(line);
- // TODO 这里解析taskOutputParams,解析比如说 echo '${setValue(output=1)}'。其实就是字符串 ${setValue(output=1)}
- taskOutputParameterParser.appendParseLog(line);
- }
复制代码 org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser#appendParseLog- public void appendParseLog(String logLine) {
- if (logLine == null) {
- return;
- }
- // TODO 刚开始进来,是不会走这里的
- if (currentTaskOutputParam != null) {
- if (currentTaskOutputParam.size() > maxOneParameterRows
- || currentTaskOutputParamLength > maxOneParameterLength) {
- log.warn(
- "The output param expression '{}' is too long, the max rows is {}, max length is {}, will skip this param",
- String.join("\n", currentTaskOutputParam), maxOneParameterLength, maxOneParameterRows);
- currentTaskOutputParam = null;
- currentTaskOutputParamLength = 0;
- return;
- }
- // continue to parse the rest of line
- int i = logLine.indexOf(")}");
- if (i == -1) {
- // the end of var pool not found
- currentTaskOutputParam.add(logLine);
- currentTaskOutputParamLength += logLine.length();
- } else {
- // the end of var pool found
- currentTaskOutputParam.add(logLine.substring(0, i + 2));
- Pair<String, String> keyValue = parseOutputParam(String.join("\n", currentTaskOutputParam));
- if (keyValue.getKey() != null && keyValue.getValue() != null) {
- // TODO 解析完毕就放入到taskOutputParams中
- taskOutputParams.put(keyValue.getKey(), keyValue.getValue());
- }
- currentTaskOutputParam = null;
- currentTaskOutputParamLength = 0;
- // continue to parse the rest of line
- if (i + 2 != logLine.length()) {
- appendParseLog(logLine.substring(i + 2));
- }
- }
- return;
- }
- int indexOfVarPoolBegin = logLine.indexOf("${setValue(");
- if (indexOfVarPoolBegin == -1) {
- indexOfVarPoolBegin = logLine.indexOf("#{setValue(");
- }
- if (indexOfVarPoolBegin == -1) {
- return;
- }
- currentTaskOutputParam = new ArrayList<>();
- appendParseLog(logLine.substring(indexOfVarPoolBegin));
- }
复制代码 解析完毕就放入到taskOutputParams中
更新Pid(向Master汇报)- // update pid before waiting for the run to finish
- if (null != taskCallBack) {
- // TODO 更新任务实例信息
- taskCallBack.updateTaskInstanceInfo(processId);
- }
复制代码 超时判断- long remainTime = getRemainTime();
- private long getRemainTime() {
- long usedTime = (System.currentTimeMillis() - taskRequest.getStartTime()) / 1000;
- long remainTime = taskRequest.getTaskTimeout() - usedTime;
- if (remainTime < 0) {
- throw new RuntimeException("task execution time out");
- }
- return remainTime;
- }
- ......
- // waiting for the run to finish
- // TODO 这里其实就是一个超时等待,其实就是说如果不设置超时等待时间,无限等待
- boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
复制代码- // TODO 设置退出码
- // if SHELL task exit
- if (status && kubernetesStatus.isSuccess()) {
- // SHELL task state
- result.setExitStatusCode(this.process.exitValue());
- } else {
- log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
- taskRequest.getTaskTimeout());
- result.setExitStatusCode(EXIT_CODE_FAILURE);
- cancelApplication();
- }
- int exitCode = this.process.exitValue();
- String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
- log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
- exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
复制代码- // TODO 执行结果,退出状态码
- setExitStatusCode(commandExecuteResult.getExitStatusCode());
- // TODO 设置进程ID
- setProcessId(commandExecuteResult.getProcessId());
- // TODO shellCommandExecutor.getTaskOutputParams()这返回的是比如说 output -> 123
- shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
复制代码 org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters#dealOutParam- public void dealOutParam(Map<String, String> taskOutputParams) {
- // TODO 其实就是说如果localParams不存在,就算设置了输出也不管用
- if (CollectionUtils.isEmpty(localParams)) {
- return;
- }
- // TODO 这里其实就是过滤出来localParams为OUT的参数
- List<Property> outProperty = getOutProperty(localParams);
- if (CollectionUtils.isEmpty(outProperty)) {
- return;
- }
- // TODO 如果taskOutputParams为空,输出参数会放入到varPool中
- if (MapUtils.isEmpty(taskOutputParams)) {
- outProperty.forEach(this::addPropertyToValPool);
- return;
- }
- // TODO 这里其实就是想说,找到outProperty和taskOutputParams相同的key,然后把对应的value换成taskOutputParams中的value
- // TODO 最终放到变量池中
- for (Property info : outProperty) {
- String propValue = taskOutputParams.get(info.getProp());
- if (StringUtils.isNotEmpty(propValue)) {
- info.setValue(propValue);
- addPropertyToValPool(info);
- } else {
- log.warn("Cannot find the output parameter {} in the task output parameters", info.getProp());
- }
- }
- }
复制代码 这里实在就是想说,找到outProperty和taskOutputParams相同的key,然后把对应的value换成taskOutputParams中的value,期待向Master汇报存在TaskInstance的变量池中。
- protected void afterExecute() throws TaskException {
- if (task == null) {
- throw new TaskException("The current task instance is null");
- }
- // TODO 是否要发送告警,使用JDK动态代理 RPC通信调用alert模块AlertBootstrapService
- sendAlertIfNeeded();
- // TODO 发送结果
- sendTaskResult();
- WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
- // TODO common.properties development.state=false,默认是false。如果设置true
- // TODO 就会开发模式,意味着Dolpinscheduler封装的脚本、jar包不清理
- log.info("Remove the current task execute context from worker cache");
- clearTaskExecPathIfNeeded();
- }
复制代码 发送效果- protected void sendTaskResult() {
- taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
- taskExecutionContext.setProcessId(task.getProcessId());
- taskExecutionContext.setAppIds(task.getAppIds());
- // TODO 其实就是发送变量池,这里是变量池
- taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
- taskExecutionContext.setEndTime(System.currentTimeMillis());
- // upload out files and modify the "OUT FILE" property in VarPool
- // TODO 上传输出文件并修改输出文件到变量池中
- TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate);
- log.info("Upload output files: {} successfully",
- TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT));
- // TODO 发送任务的结果
- workerMessageSender.sendMessageWithRetry(taskExecutionContext,
- ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
- log.info("Send task execute status: {} to master : {}", taskExecutionContext.getCurrentExecutionStatus().name(),
- taskExecutionContext.getWorkflowInstanceHost());
- }
复制代码Shell状态码小插曲
- [root@node opt]# vim test.sh
- [root@node opt]# sh test.sh
- me is journey
- [root@node opt]# echo $?
- 0
- [root@node opt]# vim test.sh
- [root@node opt]# sh test.sh
- test.sh: line 2: echo1: command not found
- [root@node opt]# echo $?
- 127
- [root@node opt]# vim test.sh
- [root@node opt]# sh test.sh
- me is 10.253.26.85
- Killed
- [root@node opt]# echo $?
- 137
复制代码 总结 : 实在就是想说SHELL任务正常的退出码为0,被kill掉的状态码为137。其他为非常。
任务状态码判断逻辑:- taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
- org.apache.dolphinscheduler.plugin.task.api.AbstractTask#getExitStatus
- // 其实就是说如果状态码返回为0,任务为成功;状态码为137为KILL。其他状态为失败。而task.getExitStatus()状态是由executeTask中设置完成的
- public TaskExecutionStatus getExitStatus() {
- switch (getExitStatusCode()) {
- case TaskConstants.EXIT_CODE_SUCCESS:
- return TaskExecutionStatus.SUCCESS;
- case TaskConstants.EXIT_CODE_KILL:
- return TaskExecutionStatus.KILL;
- default:
- return TaskExecutionStatus.FAILURE;
- }
- }
复制代码 上传输出文件到资源中心:
org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils#uploadOutputFiles- public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
- StorageOperate storageOperate) throws TaskException {
- List<Property> varPools = getVarPools(taskExecutionContext);
- // get map of varPools for quick search
- Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
- // get OUTPUT FILE parameters
- List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
- if (localParamsProperty.isEmpty()) {
- return;
- }
- log.info("Upload output files ...");
- for (Property property : localParamsProperty) {
- // get local file path
- String path = String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue());
- // TODO packIfDir 和 crc其实就是想说,如果是目录,就对目录进行打zip包,然后生成crc。如果是文件就对文件生成crc
- String srcPath = packIfDir(path);
- // get crc file path
- String srcCRCPath = srcPath + CRC_SUFFIX;
- try {
- FileUtils.writeContent2File(FileUtils.getFileChecksum(path), srcCRCPath);
- } catch (IOException ex) {
- throw new TaskException(ex.getMessage(), ex);
- }
- // get remote file path
- // TODO DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
- String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
- String resourceCRCPath = resourcePath + CRC_SUFFIX;
- try {
- // upload file to storage
- // TODO 以hdfs来说
- // TODO hdfs跟路径/tenantCode/resources/DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
- String resourceWholePath =
- storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
- String resourceCRCWholePath =
- storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourceCRCPath);
- log.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
- storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
- log.info("{} --- Local:{} to Remote:{}", "CRC file", srcCRCPath, resourceCRCWholePath);
- storageOperate.upload(taskExecutionContext.getTenantCode(), srcCRCPath, resourceCRCWholePath, false,
- true);
- } catch (IOException ex) {
- throw new TaskException("Upload file to storage error", ex);
- }
- // update varPool
- Property oriProperty;
- // if the property is not in varPool, add it
- if (varPoolsMap.containsKey(property.getProp())) { // 理论上不会走到这个分支
- oriProperty = varPoolsMap.get(property.getProp());
- } else {
- oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
- // TODO 添加到变量池中
- varPools.add(oriProperty);
- }
- // TODO 这里就设置了任务名称.property name
- oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
- // TODO 这里很关键,其实就是把资源的相对路径放入到了变量池对应的value中
- oriProperty.setValue(resourcePath);
- }
- // TODO 这里是设置FILE的变量池
- taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
- }
复制代码 发送任务的效果 :- workerMessageSender.sendMessageWithRetry(taskExecutionContext,
- ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
复制代码 3、WorkerMessageSender组件作用
4、Kill任务逻辑
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#operate- public TaskInstanceKillResponse operate(TaskInstanceKillRequest taskInstanceKillRequest) {
- log.info("Receive TaskInstanceKillRequest: {}", taskInstanceKillRequest);
- // TODO 任务实例
- int taskInstanceId = taskInstanceKillRequest.getTaskInstanceId();
- try {
- LogUtils.setTaskInstanceIdMDC(taskInstanceId);
- // TODO Worker任务执行器
- WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
- if (workerTaskExecutor == null) {
- log.error("Cannot find WorkerTaskExecutor for taskInstance: {}", taskInstanceId);
- return TaskInstanceKillResponse.fail("Cannot find WorkerTaskExecutor");
- }
- // TODO 任务执行上下文
- TaskExecutionContext taskExecutionContext = workerTaskExecutor.getTaskExecutionContext();
- LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
- // TODO 这里会进行kill
- boolean result = doKill(taskExecutionContext);
- // TODO 使用 Process.destroy() 是 Java 中 Process 类的一个方法,用于销毁与该 Process 对象关联的子进程
- this.cancelApplication(workerTaskExecutor);
- int processId = taskExecutionContext.getProcessId();
- // TODO 这里其实想说的是,如果processId为0,直接把该任务的状态设置为KILL,然后在Worker上报信息的时候就会把KILL状态上报上去
- // TODO 一定要注意,当前情况不一定是真正的kill掉,只是让DS里面的状态是对的
- if (processId == 0) {
- workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
- taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
- // todo: the task might be executed, but the processId is 0
- WorkerTaskExecutorHolder.remove(taskInstanceId);
- log.info("The task has not been executed and has been cancelled, task id:{}", taskInstanceId);
- return TaskInstanceKillResponse.success(taskExecutionContext);
- }
- // TODO 这个其实就是说明,我kill掉了。成功了。然后这个时候Worker其实会感知到任务被kill掉,在他的sendResult FINISH的时候上报
- // TODO 上去就可以了
- taskExecutionContext
- .setCurrentExecutionStatus(result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
- WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
- // TODO 删除重试消息
- messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
- return TaskInstanceKillResponse.success(taskExecutionContext);
- } finally {
- LogUtils.removeTaskInstanceIdMDC();
- LogUtils.removeTaskInstanceLogFullPathMDC();
- }
- }
复制代码 杀历程和yarn上的任务 :- // TODO 这里会进行kill
- boolean result = doKill(taskExecutionContext);
复制代码 org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#doKill- private boolean doKill(TaskExecutionContext taskExecutionContext) {
- // kill system process
- // TODO 杀死Shell关联的进程
- boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());
- // TODO kill yarn or k8s application
- try {
- ProcessUtils.cancelApplication(taskExecutionContext);
- } catch (TaskException e) {
- return false;
- }
- return processFlag;
- }
复制代码 org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#killProcess
杀历程和子历程: 注意,这里和官网有修改,如果有非常打印warn就好,由于有时候不能有权限杀死所有历程。- protected boolean killProcess(String tenantCode, Integer processId) {
- // todo: directly interrupt the process
- if (processId == null || processId.equals(0)) {
- return true;
- }
- try {
- String pidsStr = ProcessUtils.getPidsStr(processId);
- if (!Strings.isNullOrEmpty(pidsStr)) {
- String cmd = String.format("kill -9 %s", pidsStr);
- cmd = OSUtils.getSudoCmd(tenantCode, cmd);
- log.info("process id:{}, cmd:{}", processId, cmd);
- OSUtils.exeCmd(cmd);
- }
- } catch (Exception e) {
- log.warn("kill task error", e);
- }
- return true;
- }
复制代码 杀死yarn上的任务
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils#cancelApplication- public static void cancelApplication(TaskExecutionContext taskExecutionContext) {
- try {
- // TODO k8s
- if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext())) {
- if (!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
- // Set empty container name for Spark on K8S task
- applicationManagerMap.get(ResourceManagerType.KUBERNETES)
- .killApplication(new KubernetesApplicationManagerContext(
- taskExecutionContext.getK8sTaskExecutionContext(),
- taskExecutionContext.getTaskAppId(), ""));
- }
- } else {
- // TODO YARN
- String host = taskExecutionContext.getHost();
- String executePath = taskExecutionContext.getExecutePath();
- String tenantCode = taskExecutionContext.getTenantCode();
- List<String> appIds;
- // TODO 容错的走这个逻辑
- if (StringUtils.isNotEmpty(taskExecutionContext.getAppIds())) {
- // is failover
- appIds = Arrays.asList(taskExecutionContext.getAppIds().split(COMMA));
- } else {
- String logPath = taskExecutionContext.getLogPath();
- String appInfoPath = taskExecutionContext.getAppInfoPath();
- if (logPath == null || appInfoPath == null || executePath == null || tenantCode == null) {
- log.error(
- "Kill yarn job error, the input params is illegal, host: {}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode: {}",
- host, logPath, appInfoPath, executePath, tenantCode);
- throw new TaskException("Cancel application failed!");
- }
- log.info("Get appIds from worker {}, taskLogPath: {}", host, logPath);
- // TODO 这里就是正则解析log获取appIds
- appIds = LogUtils.getAppIds(logPath, appInfoPath,
- PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
- taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
- }
- // TODO 如果这里说明appIds是不存在的
- if (CollectionUtils.isEmpty(appIds)) {
- log.info("The appId is empty");
- return;
- }
- ApplicationManager applicationManager = applicationManagerMap.get(ResourceManagerType.YARN);
- applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));
- }
- } catch (Exception e) {
- log.error("Cancel application failed.", e);
- }
- }
复制代码 task日记中使用正则表达式来解析appIds,这里默认走log,不走aop。- appIds = LogUtils.getAppIds(logPath, appInfoPath,
- PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
- public List<String> getAppIds(String logPath, String appInfoPath, String fetchWay) {
- if (!StringUtils.isEmpty(fetchWay) && fetchWay.equals("aop")) {
- log.info("Start finding appId in {}, fetch way: {} ", appInfoPath, fetchWay);
- // TODO 如果走aop拦截的写的日志文件中读取
- return getAppIdsFromAppInfoFile(appInfoPath);
- } else {
- log.info("Start finding appId in {}, fetch way: {} ", logPath, fetchWay);
- // TODO 从日志中进行正则匹配
- return getAppIdsFromLogFile(logPath);
- }
- }
复制代码 真正地来杀yarn上的任务- applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));
- org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager#killApplication
- public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException {
- YarnApplicationManagerContext yarnApplicationManagerContext =
- (YarnApplicationManagerContext) applicationManagerContext;
- String executePath = yarnApplicationManagerContext.getExecutePath();
- String tenantCode = yarnApplicationManagerContext.getTenantCode();
- List<String> appIds = yarnApplicationManagerContext.getAppIds();
- try {
- String commandFile = String.format("%s/%s.kill", executePath, String.join(Constants.UNDERLINE, appIds));
- String cmd = getKerberosInitCommand() + "yarn application -kill " + String.join(Constants.SPACE, appIds);
- execYarnKillCommand(tenantCode, commandFile, cmd);
- } catch (Exception e) {
- log.warn("Kill yarn application {} failed", appIds, e);
- }
- return true;
- }
- execYarnKillCommand需要注意,因为使用 yarn application -kill。yarn命令可能没有。增加ENV_SOURCE_LIST
- private void execYarnKillCommand(String tenantCode, String commandFile,
- String cmd) throws Exception {
- StringBuilder sb = new StringBuilder();
- sb.append("#!/bin/sh\n");
- sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
- sb.append("cd $BASEDIR\n");
- // TODO 在这里是设置默认的,比如说可以设置为 /etc/profile
- if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
- // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表
- ShellUtils.ENV_SOURCE_LIST.forEach(env -> sb.append("source " + env + "\n"));
- }
- sb.append("\n\n");
- sb.append(cmd);
- File f = new File(commandFile);
- if (!f.exists()) {
- org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
- StandardCharsets.UTF_8);
- }
- String runCmd = String.format("%s %s", Constants.SH, commandFile);
- runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
- log.info("kill cmd:{}", runCmd);
- org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
- }
复制代码 总结 : 如果成功把任务kill掉了,WorkerTaskExecutor会感知到的,进而进行KILL状态的FINISH汇报。如果任务已经完成,如果PID=0,将任务上下文状态设置为KILL,线程池中移除该WorkerTaskExecutor线程,WorkerTaskExecutorHolder移除该缓存。
5、停息
- public class TaskInstancePauseOperationFunction
- implements
- ITaskInstanceOperationFunction<TaskInstancePauseRequest, TaskInstancePauseResponse> {
- @Override
- public TaskInstancePauseResponse operate(TaskInstancePauseRequest taskInstancePauseRequest) {
- try {
- LogUtils.setTaskInstanceIdMDC(taskInstancePauseRequest.getTaskInstanceId());
- log.info("Receive TaskInstancePauseRequest: {}", taskInstancePauseRequest);
- log.warn("TaskInstancePauseOperationFunction is not support for worker task yet!");
- return TaskInstancePauseResponse.success();
- } finally {
- LogUtils.removeTaskInstanceIdMDC();
- }
- }
- }
复制代码 划重点 :
实在停息来说对于Worker来说,什么也不做。也做不了,你想想真的都能让任务停息么?除非是引擎程序中有所控制,像MR、SPARK、FLINK这种是不能停息,停息的焦点逻辑是给流程实例发送一个通知,告诉流程实例我要进行流程的停息,让正在运行任务的下一个任务进行停息,当然比如说只有一个任务,任务停息不了,末了只能成功。还有一种环境就是比如说是末了一个任务,也停息不了。还有就是执行的很快,你停息的时候,正好程序要往下执行,而鄙俚已没有任务的环境。这种都是停息不了的。
6、更新流程实例host
这个属于容错,容错章节再详细说。
转载自Journey
原文链接:https://segmentfault.com/a/1190000044966573
本文由 白鲸开源 提供发布支持!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |