实战分享:DolphinScheduler 中 Shell 使命环境变量最佳设置方式 ...

打印 上一主题 下一主题

主题 2150|帖子 2150|积分 6450

在使用 Apache DolphinScheduler 编排使命的过程中,Shell 类型使命是最常见的使命类型之一。然而,很多用户在实际使用中都会遇到一个看似简朴却常常引发问题的问题——环境变量怎么设置才有效?
如果你也曾经因为使命执行环境差别等、找不到命令路径、引用变量失败等问题而抓狂,那么这篇文章将为你拨开迷雾。本文将深入解析 DolphinScheduler 中 Shell 使命的环境变量设置机制,分享几种常见的设置方式、注意事项以及实战踩坑履历,帮助你高效、稳定地设置使命运行环境。
使命类型总结


  • SHELL使命类型:
    SHELL、JAVA、PYTHON、FLINK、MR、FLINK_STREAM、HIVECLI、SPARK、SEATUNNEL、DATAX、SQOOP、DATA_QUALICY、JUPYTER、MLFLOW、OPENMLDB、DVC、PYTORCH、KUBEFLOW、CHUNJUN、LINKIS
注意 : 所谓的SHELL使命类型,都是对SHELL使命类型举行的封装,说白了底层调用的就是Java ProcessBudiler封装的SHELL。

  • SQL使命类型(JDBC):
    SQL、PROCEDURE
注意 : SQL使命类型实在使用的就是各个DB驱动的JDBC举行的操作。

  • HTTP使命类型:
    HTTP、DINKY、PIGEON(WebSocket)
注意 : HTTP使命类型实在就是访问其OPEN API,举行HttpClient封装调用的操作。

  • 逻辑节点:
    SUB_PROCESS、DEPENDENT、CONDITIONS、SWITHC、DYNAMIC
注意 : 所谓的逻辑节点是虚拟使命,这类使命不会调治到Worker节点上去运行,只会在Master节点作为控制节点。

  • Client使命类型:
    EMR、K8S、DMS、DATA_FACTORY、SAGEMAKER、ZEPPELIN、DATASYNC、REMOTESHELL
注意 : 实在就是调用各个使命的开放的Client举利用命的封装。
Shell使命怎么设置环境变量呢?

因为可能涉及到一个组件的差别的版本的客户端,比如说Spark2、Spark3。还有就是针对差别集群的差别客户端,比如说集群1的Spark3客户端和集群2的Spark客户端。 像这样的需求,怎么在dolphinscheduler中举行设置呢?大概说有几种设置方式呢?
两种方式 : 1、通过task差别的环境变量 2、默认的环境变量
1. 通过task差别的环境变量

安全中心 -> 环境管理

使命中引用

默认的环境变量

common.properties
  1. # The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile
  2. shell.env_source_list=/etc/profile
  3. # The interceptor type of Shell task, e.g. bash, sh, cmd
  4. shell.interceptor.type=bash
复制代码
org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory
  1. public class ShellInterceptorBuilderFactory {
  2.     private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");
  3.     @SuppressWarnings("unchecked")
  4.     public static IShellInterceptorBuilder newBuilder() {
  5.         // TODO 默认的走的是这个逻辑
  6.         if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
  7.             return new BashShellInterceptorBuilder();
  8.         }
  9.         if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
  10.             return new ShShellInterceptorBuilder();
  11.         }
  12.         if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
  13.             return new CmdShellInterceptorBuilder();
  14.         }
  15.         throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
  16.     }
  17. }
复制代码
org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder
  1. public class BashShellInterceptorBuilder
  2.         extends
  3.             BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {
  4.     @Override
  5.     public BashShellInterceptorBuilder newBuilder() {
  6.         return new BashShellInterceptorBuilder();
  7.     }
  8.     @Override
  9.     public BashShellInterceptor build() throws FileOperateException, IOException {
  10.         // TODO 这里是生成shell脚本的核心点
  11.         generateShellScript();
  12.         List<String> bootstrapCommand = generateBootstrapCommand();
  13.         // TODO 实例化BashShellInterceptor
  14.         return new BashShellInterceptor(bootstrapCommand, shellDirectory);
  15.     }
  16.     @Override
  17.     protected String shellInterpreter() {
  18.         return "bash";
  19.     }
  20.     @Override
  21.     protected String shellExtension() {
  22.         return ".sh";
  23.     }
  24.     @Override
  25.     protected String shellHeader() {
  26.         return "#!/bin/bash";
  27.     }
  28. }
复制代码
org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#run
  1. public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
  2.                             TaskCallBack taskCallBack) throws Exception {
  3.         TaskResponse result = new TaskResponse();
  4.         int taskInstanceId = taskRequest.getTaskInstanceId();
  5.         // todo: we need to use state like JDK Thread to make sure the killed task should not be executed
  6.         iShellInterceptorBuilder = iShellInterceptorBuilder
  7.                 // TODO 设置执行路径
  8.                 .shellDirectory(taskRequest.getExecutePath())
  9.                 // TODO 这里设置shell 名字
  10.                 .shellName(taskRequest.getTaskAppId());
  11.         // Set system env
  12.         // TODO 在这里是设置默认的,其实也是可以设置为 /opt/dolphinscheduler/bin/env/dolphinscheduler_env.sh
  13.         if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
  14.             // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表
  15.             ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
  16.         }
  17.         // Set custom env
  18.         // TODO 设置自定义的env
  19.         if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
  20.             // TODO 向 customEnvScripts 中加入
  21.             iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
  22.         }
  23.         // Set k8s config (This is only work in Linux)
  24.         if (taskRequest.getK8sTaskExecutionContext() != null) {
  25.             iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
  26.         }
  27.         // Set sudo (This is only work in Linux)
  28.         // TODO 设置sudo为true的模式
  29.         iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
  30.         // Set tenant (This is only work in Linux)
  31.         // TODO 设置租户
  32.         iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
  33.         // Set CPU Quota (This is only work in Linux)
  34.         if (taskRequest.getCpuQuota() != null) {
  35.             iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
  36.         }
  37.         // Set memory Quota (This is only work in Linux)
  38.         if (taskRequest.getMemoryMax() != null) {
  39.             iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
  40.         }
  41.         IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
  42.         // TODO 使用ProcessBuilder进行执行,支持sudo模式,和bash模式
  43.         process = iShellInterceptor.execute();
  44.         // parse process output
  45.         // TODO 这里解析到进程的输出
  46.         parseProcessOutput(this.process);
  47.         // collect pod log
  48.         collectPodLogIfNeeded();
  49.         int processId = getProcessId(this.process);
  50.         result.setProcessId(processId);
  51.         // cache processId
  52.         taskRequest.setProcessId(processId);
  53.         // print process id
  54.         log.info("process start, process id is: {}", processId);
  55.         // if timeout occurs, exit directly
  56.         long remainTime = getRemainTime();
  57.         // update pid before waiting for the run to finish
  58.         if (null != taskCallBack) {
  59.             // TODO 这里其实就是更新任务实例西悉尼
  60.             taskCallBack.updateTaskInstanceInfo(taskInstanceId);
  61.         }
  62.         // waiting for the run to finish
  63.         boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
  64.         TaskExecutionStatus kubernetesStatus =
  65.                 ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
  66.         if (taskOutputFuture != null) {
  67.             try {
  68.                 // Wait the task log process finished.
  69.                 taskOutputFuture.get();
  70.             } catch (ExecutionException e) {
  71.                 log.error("Handle task log error", e);
  72.             }
  73.         }
  74.         if (podLogOutputFuture != null) {
  75.             try {
  76.                 // Wait kubernetes pod log collection finished
  77.                 podLogOutputFuture.get();
  78.                 // delete pod after successful execution and log collection
  79.                 ProcessUtils.cancelApplication(taskRequest);
  80.             } catch (ExecutionException e) {
  81.                 log.error("Handle pod log error", e);
  82.             }
  83.         }
  84.         // if SHELL task exit
  85.         if (status && kubernetesStatus.isSuccess()) {
  86.             // SHELL task state
  87.             result.setExitStatusCode(this.process.exitValue());
  88.         } else {
  89.             log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
  90.                     taskRequest.getTaskTimeout());
  91.             result.setExitStatusCode(EXIT_CODE_FAILURE);
  92.             cancelApplication();
  93.         }
  94.         int exitCode = this.process.exitValue();
  95.         String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
  96.         log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
  97.                 exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
  98.         return result;
  99.     }
复制代码
重点就是:
  1. // Set system env
  2.         // TODO 在这里是设置默认的,其实也是可以设置为 /opt/dolphinscheduler/bin/env/dolphinscheduler_env.sh
  3.         if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
  4.             // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表
  5.             ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
  6.         }
  7.         // Set custom env
  8.         // TODO 设置自定义的env
  9.         if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
  10.             // TODO 向 customEnvScripts 中加入
  11.             iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
  12.         }
复制代码
实在就是说自定的环境变量是可以覆盖默认的环境变量的。
转载自Journey
原文链接:https://segmentfault.com/a/1190000044954252
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

钜形不锈钢水箱

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表