Apache DolphinScheduler使用图关系解决核心链路告警问题,减轻任务运维负 ...

打印 上一主题 下一主题

主题 1698|帖子 1698|积分 5094

转载自程序员小陶
Apache DolphinScheduler 在使用过程中,肯定会有任务出现失败的情况,那么问题来了:调理任务的告警是必要人为配置的,在生产环境中,面对海量的任务,如何找到重要的任务,并且在失败的时候,第一时间告警呢?
先思索一下。
先看思路

本文提供一个思路,接着往下看吧。
不卖关子了。
本质是路径查找,本文这里使用了图数据库,或者你也可以自己使用Java实现路径查找。
下面是必要实现的目的,看一组任务的关系,如下图所示,存在 A/B/C/D/E 五个任务,E 任务被配置为核心任务,当 B 任务报错时,检测到 B 和 E 之前存在路径,则必要电话告警。

以是在配置核心链路告警的时候,我们只必要配置叶子节点,在实际生产中,一般是应用层的任务,比如报表、标签、接口数据等任务。
清洗依赖数据

核心逻辑就是把所有工作流内部、跨工作流以及跨项目的依赖全部清洗出来,生成一张关系表。具体清洗逻辑,可以看:海豚调理监控:新增依赖缺失巡检,上游改动再也不用担心了!

最终生成了
t_ds_task_node_base_data 任务基础表,后续会用于 Nebula Graph,这个后面会讲。
t_ds_dag_task_relation_data_df 关系最终表,后续会用于 Nebula Graph,这个后面会讲。
t_ds_dag_task_relation_data_df 这个表结构如下:

关系导入图数据库

这里用的国产图数据库 Nebula Graph,固然你也可以自己使用 Java 实现路径查找。
为什么我们一定要引入图数据库呢?有下面几方面考虑:

  • 可以减轻调理体系Mysql的压力,把负责的路径计算放在图数据库内里。
  • 探索更多调理任务数据治理和运维的大概性,比如任务权重,影响分析等。

用到的组件是 Nebula Graph,最关键的函数是 find path 查询最短链路
① 用到的语法是:FIND SHORTEST PATH必要留意的是,留意查询步长,UPTO  {STEP|STEPS}:路径的最大跳数。默认值为5。
② 3.3.0 开始,子图支持了边的条件限制了,查询的时候只拿最新的一批关系。

  • 创建图空间
  1. CREATE SPACE s_schedule_job (partition_num = 225, replica_factor = 3, vid_type = FIXED_STRING(180)) COMMENT = "大数据平台调度系统任务的血缘关系";
复制代码

  • 创建边和点
  1. ## 任务标签
  2. DROP tag if exists t_task;
  3. CREATE tag if not exists t_task(  id string NULL COMMENT "project_code,dag_code,task_code,拼接,",  project_name string NULL COMMENT "project_name",  project_code string NULL COMMENT "project_code",  dag_name string NULL COMMENT "dag_name",  dag_code string NULL COMMENT "dag_code",  dag_version string NULL COMMENT "dag_version",  task_code string NULL COMMENT "task_code",  task_version string NULL COMMENT "task_version",  task_name string NULL COMMENT "task_name",  task_type string NULL COMMENT "task_type",  create_time string NULL COMMENT "时间戳") comment='调度任务节点';
  4. ## 调度任务关系
  5. drop edge if exists e_task;
  6. create edge if not exists e_task(  pre_project_name string NULL COMMENT "project_name",  pre_project_code string NULL COMMENT "project_code",  pre_dag_name string NULL COMMENT "dag_name",  pre_dag_code string NULL COMMENT "dag_code",  pre_dag_version string NULL COMMENT "dag_version",  pre_task_code string NULL COMMENT "task_code",  pre_task_version string NULL COMMENT "task_version",  pre_task_name string NULL COMMENT "task_name",  pre_task_type string NULL COMMENT "task_type",  post_project_name string NULL COMMENT "project_name",  post_project_code string NULL COMMENT "project_code",  post_dag_name string NULL COMMENT "dag_name",  post_dag_code string NULL COMMENT "dag_code",  post_dag_version string NULL COMMENT "dag_version",  post_task_code string NULL COMMENT "task_code",  post_task_version string NULL COMMENT "task_version",  post_task_name string NULL COMMENT "task_name",  post_task_type string NULL COMMENT "task_type",  create_time string NULL COMMENT "时间戳") comment='调度任务关系';
复制代码

  • 导入数据
  1. 同步点:
  2. {
  3.   spark: {
  4.     app: {
  5.       name: Nebula_Exchange_t_task
  6.     }
  7.     driver: {
  8.       cores: 2
  9.       maxResultSize: 5G
  10.     }
  11.   }
  12.   nebula: {
  13.     address:{
  14.       graph:["10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx3:9669","10.1.x.xx:9669"]
  15.       meta:["10.1.x.xx:9559","10.1.x.xx:9559","10.1.x.xx:9559"]
  16.     }
  17.     user: root
  18.     pswd: "nebula密码"
  19.     space: s_schedule_job
  20.     connection {
  21.       timeout: 60000
  22.       retry: 3
  23.     }
  24.     execution {
  25.       retry: 3
  26.     }
  27.     error: {
  28.       max: 32
  29.       output: /tmp/errors/t_task
  30.     }
  31.     rate: {
  32.       limit: 1024
  33.       timeout: 10000
  34.     }
  35.   }
  36.   tags: [
  37.     {
  38.       name: t_task
  39.       type: {
  40.         source: mysql
  41.         sink: client
  42.       }
  43.       host:"调度系统MYSQL数据库IP"
  44.       port:3307
  45.       database:"调度系统MYSQL数据库"
  46.       table:"t_ds_task_node_base_data"
  47.       user:"调度系统MYSQL用户"
  48.       password:"调度系统MYSQL用户密码"
  49.       sentence:"SELECT concat(project_code,'_',dag_code,'_',task_code) as id,project_name,project_code,dag_name,dag_code,dag_version,task_code,task_version,task_name,task_type,create_time FROM t_ds_task_node_base_data"
  50.       fields: [project_name,project_code,dag_name,dag_code,dag_version,task_code,task_version,task_name,task_type,create_time]
  51.       nebula.fields: [project_name,project_code,dag_name,dag_code,dag_version,task_code,task_version,task_name,task_type,create_time]
  52.       vertex:{
  53.         field:id
  54.       }
  55.       batch: 256
  56.       partition: 32
  57.     }
  58.   ]
  59. }
复制代码
同步边:
  1. {
  2.   spark: {
  3.     app: {
  4.       name: Nebula_Exchange_e_task
  5.     }
  6.     driver: {
  7.       cores: 2
  8.       maxResultSize: 5G
  9.     }
  10.   }
  11.   nebula: {
  12.     address:{
  13.       graph:["10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx:9669","10.1.x.xx3:9669","10.1.x.xx:9669"]
  14.       meta:["10.1.x.xx:9559","10.1.x.xx:9559","10.1.x.xx:9559"]
  15.     }
  16.     user: root
  17.     pswd: "aD@VX2018#"
  18.     space: s_schedule_job
  19.     connection {
  20.       timeout: 60000
  21.       retry: 3
  22.     }
  23.     execution {
  24.       retry: 3
  25.     }
  26.     error: {
  27.       max: 32
  28.       output: /tmp/errors/e_task
  29.     }
  30.     rate: {
  31.       limit: 1024
  32.       timeout: 10000
  33.     }
  34.   }
  35.   edges: [
  36.     {
  37.       name: e_task
  38.       type: {
  39.         source: mysql
  40.         sink: client
  41.       }
  42.       host:"调度系统MYSQL数据库IP"
  43.       port:3307
  44.       database:"调度系统MYSQL数据库"
  45.       table:"t_ds_task_node_base_data"
  46.       user:"调度系统MYSQL用户"
  47.       password:"调度系统MYSQL用户密码"
  48.       sentence:"SELECT concat(pre_project_code,'_',pre_dag_code,'_',pre_task_code) as from_id,concat(post_project_code,'_',post_dag_code,'_',post_task_code) as to_id,pre_project_name,pre_project_code,pre_dag_name,pre_dag_code,pre_dag_version,pre_task_code,pre_task_name,pre_task_type,pre_task_version,post_project_name,post_project_code,post_dag_name,post_dag_code,post_dag_version,post_task_code,post_task_name,post_task_type,post_task_version,create_time FROM t_ds_dag_task_relation_data_df"
  49.       fields: [pre_project_name,pre_project_code,pre_dag_name,pre_dag_code,pre_dag_version,pre_task_code,pre_task_name,pre_task_type,pre_task_version,post_project_name,post_project_code,post_dag_name,post_dag_code,post_dag_version,post_task_code,post_task_name,post_task_type,post_task_version,create_time]
  50.       nebula.fields: [pre_project_name,pre_project_code,pre_dag_name,pre_dag_code,pre_dag_version,pre_task_code,pre_task_name,pre_task_type,pre_task_version,post_project_name,post_project_code,post_dag_name,post_dag_code,post_dag_version,post_task_code,post_task_name,post_task_type,post_task_version,create_time]
  51.       source: {
  52.         field: from_id
  53.       }
  54.       target: {
  55.         field: to_id
  56.       }
  57.       batch: 256
  58.       partition: 225
  59.     }
  60.   ]
  61. }
复制代码
定时脚本: 使用 Nebula Graph 社区提供的 exchange 工具把数据从 mysql 导入 Nebula Graph。
  1. #!/bin/bash
  2. # 作业参数
  3. basepath='/opt/vcredit-graph-db/s_schedule_job/exchange'
  4. tmpdir='/tmp/nebula/s_schedule_job'
  5. mkdir -p $tmpdir
  6. sourcefile=${basepath}/${jobname}.conf
  7. targetfile=${tmpdir}/${jobname}_${vardate}.conf
  8. cat ${sourcefile} > ${targetfile}
  9. sed -i "s/vardate/${vardate}/g" ${targetfile}
  10. sed -i "s/varhivetable/${varhivetable}/g" ${targetfile}
  11. # 运行环境
  12. export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
  13. spark_submit="/opt/spark-2.4.8-bin-hadoop2.7/bin/spark-submit"
  14. # 开始运行
  15. ${spark_submit} \
  16. --principal hive@VCREDIT.COM \
  17. --keytab /etc/security/hive.keytab \
  18. --master "local[*]" \
  19. --class com.vesoft.nebula.exchange.Exchange /opt/nebula/nebula-exchange_spark_2.4-3.0.0.jar  -c ${targetfile} -h
复制代码
Java 服务
  1. /**
  2. * 判断这个任务是否会影响核心任务
  3. * @param projectName
  4. * @param dagName
  5. * @param taskName
  6. * @return
  7. */
  8. @ApiOperation(value = "dolphinTaskIsOnCall", notes = "判断这个任务是否会影响核心任务,是 1 ,否 0")
  9. @ApiImplicitParams({
  10.         @ApiImplicitParam(name = "projectName", value = "T-1", required = false, dataType = "String", example = "BigData"),
  11.         @ApiImplicitParam(name = "dagName", value = "T-1", required = false, dataType = "String", example = "公共和自定义域(pub)_daily"),
  12.         @ApiImplicitParam(name = "taskName", value = "T-1", required = false, dataType = "String", example = "dwd_pub_screen_zxd_cust_df")
  13. })
  14. @GetMapping("/dolphinTaskIsOnCall")
  15. @ResponseBody
  16. public DataResult dolphinTaskIsOnCall(
  17.         @RequestParam(value = "projectName", required = true) String projectName,
  18.         @RequestParam(value = "dagName", required = true) String dagName,
  19.         @RequestParam(value = "taskName", required = true) String taskName) throws GraphDatabaseException, UnsupportedEncodingException {
  20.     HashMap<String,Object> res = dolphinService.dolphinTaskIsOnCall(projectName, dagName, taskName);
  21.     return DataResult.ok(res);
  22. }
复制代码
核心代码,在第 17 行:
  1. @Override
  2. public HashMap<String, Object> dolphinTaskIsOnCall(String projectName, String dagName, String taskName) throws GraphDatabaseException, UnsupportedEncodingException {
  3.     HashMap<String,Object> resMap = new HashMap<>();
  4.     // 查询该任务 codes
  5.     HashMap<String,Object> task = dolphinTaskInstanceMapper.getTaskCode(projectName,dagName,taskName);
  6.     if (task == null){
  7.         resMap.put("res","任务不存在!");
  8.         return resMap;
  9.     }
  10.     String fromCodes = task.get("project_code") + "_" + task.get("dag_code") + "_" + task.get("task_code");
  11.     // 查询核心任务 codes
  12.     List<HashMap<String,Object>> tasks = dolphinTaskInstanceMapper.getOnCallTasks();
  13.     // 查询最短链路
  14.     for (HashMap<String,Object> t : tasks){
  15.         String toCodes = t.get("project_code") + "_" + t.get("dag_code") + "_" + t.get("task_code");
  16.         // 查询Nebula
  17.         String NgSql = "FIND SHORTEST PATH with PROP FROM "" + fromCodes + "" TO "" + toCodes + "" OVER * WHERE e_task.create_time > '" + DateUtils.dayToString(DateUtils.getSomeDay(new Date(), -1)) + "' UPTO 100 STEPS  YIELD path AS p;";
  18.         int res = nebulaService.isOnCallTask("s_schedule_job",NgSql);
  19.         if (res > 0){
  20.             resMap.put("res",res);
  21.             return resMap;
  22.         }
  23.     }
  24.     resMap.put("res",0);
  25.     return resMap;
  26. }
复制代码
返回值阐明:
① 影响核心任务,必要打电话
② 不影响核心任务,不必要打电话
③ 任务不存在,忽略
④ code 不等于 0 ,接口异常,忽略。
封装好接口之后,任务失败的程序调这个接口,判断失败任务是否影响核心任务,假如影响就打电话。
钉钉告警样式:

电话告警,直接给对应负责人打电话。
至此,我们淘汰了很多任务告警的配置工作,只必要关注核心的叶子节点是什么,也就是核心的应用任务是什么,大大提高了任务告警的配置服从!!!

  • 留意:清洗数据 和 导入图数据库,在天天的 23:30 分进行,一天初始化一次,确保破晓的任务关系是最新的,重要是用于破晓告警。
以上就使用图关系网络解决核心链路告警的全部内容,假如有任何疑问,都可以与我交流,希望可以帮到你,下次见。
原文链接:https://blog.csdn.net/qq_31975963/article/details/139839102
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万万哇

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表