ToB企服应用市场:ToB评测及商务社交产业平台

标题: Dolphinscheduler DAG核心源码剖析 [打印本页]

作者: 熊熊出没    时间: 2024-12-5 10:36
标题: Dolphinscheduler DAG核心源码剖析
背景描述


注意 : 在 Dolphinscheduler 中,离线任务是有完备的声明周期的,比如说停止、暂停、暂停恢复、重跑等等,都是以DAG(有向无环图的形式举行任务组织)T+1离线任务的。
Dolphinscheduler DAG实现

org.apache.dolphinscheduler.common.graph.DAG
DAG三个重要的数据结构 :
  1. // 顶点信息
  2. private final Map<Node, NodeInfo> nodesMap;
  3. // 边关联信息,作用是记录顶点和边的关系,可以找到叶子节点,也可以获取下游节点
  4. private final Map<Node, Map<Node, EdgeInfo>> edgesMap;
  5. // 反向边关联信息,作用是可以快速找到入度为0的节点(起始节点),也可以获取上游节点
  6. private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
复制代码
如下示例 :
  1. DAG<String, String, String> graph = new DAG<>();
  2. graph.addNode("A", "A");
  3. graph.addNode("B", "B");
  4. graph.addNode("C", "C");
  5. // 添加一个B -> C的边,当前A还飘着呢
  6. graph.addEdge("B", "C");
  7. // 如果添加A -> B,其实就是会从B开始一直到子节点,看有没有可连接的线到A,如果有,说明这个A -> B的边添加不得,因为会形成环,否则就可以添加
  8. graph.addEdge("A", "B");
复制代码
源码分析 :
org.apache.dolphinscheduler.common.graph.DAG#addEdge
  1. public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {
  2.     lock.writeLock().lock();
  3.     try {
  4.         // TODO 是否可以添加该边
  5.         if (!isLegalAddEdge(fromNode, toNode, createNode)) {
  6.             log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);
  7.             return false;
  8.         }
  9.         // TODO 添加节点
  10.         addNodeIfAbsent(fromNode, null);
  11.         addNodeIfAbsent(toNode, null);
  12.         // TODO 添加边
  13.         addEdge(fromNode, toNode, edge, edgesMap);
  14.         addEdge(toNode, fromNode, edge, reverseEdgesMap);
  15.         return true;
  16.     } finally {
  17.         lock.writeLock().unlock();
  18.     }
  19. }
  20. private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) {
  21.     // TODO 如果fromNode和toNode两个是同一个顶点,这个边是不能添加的
  22.     if (fromNode.equals(toNode)) {
  23.         log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);
  24.         return false;
  25.     }
  26.     // TODO 这里其实就是想说,不是创建节点,也就是说要求fromNode和toNode是需要存在的顶点
  27.     if (!createNode) {
  28.         if (!containsNode(fromNode) || !containsNode(toNode)) {
  29.             log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);
  30.             return false;
  31.         }
  32.     }
  33.     // Whether an edge can be successfully added(fromNode -> toNode),need to determine whether the
  34.     // DAG has cycle!
  35.     // TODO 这里获取节点的数量
  36.     int verticesCount = getNodesCount();
  37.     Queue<Node> queue = new LinkedList<>();
  38.     // TODO 将toNode放入到queue中
  39.     queue.add(toNode);
  40.     // if DAG doesn't find fromNode, it's not has cycle!
  41.     // TODO 当queue不为空,这里肯定就不为空了
  42.     while (!queue.isEmpty() && (--verticesCount > 0)) {
  43.         // TODO 获取队列里面的元素
  44.         Node key = queue.poll();
  45.         for (Node subsequentNode : getSubsequentNodes(key)) {
  46.             // TODO 其实这里判断的是比如说A -> B 有连接的DAG图,传入的是节点B,看B节点的边是不是有A,如果有A说明已经有B -> A的关联了,
  47.             // TODO 就不能添加了。如果比如说B的下游节点,比如说 A -> B -> C,这样的话,B的下游节点就是C,C是需要放入queue中的
  48.             // TODO 核心思想其实就是要找到它要添加的目标节点的连线,是否有目标节点到源节点的连线存在(这样来判断是否存在环)
  49.             if (subsequentNode.equals(fromNode)) {
  50.                 return false;
  51.             }
  52.             queue.add(subsequentNode);
  53.         }
  54.     }
  55.     return true;
  56. }
复制代码
Dolphinscheduler DagHelper讲授

DAG类是一个底子通用的DAG工具类,而DagHelper是任务界说、任务界说直接的关系组装成DAG的一个业务工具类。
org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph
  1. public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception {
  2.     // TODO 这里其实就是获取的流程实例对应的任务数和之间的关系
  3.     List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(
  4.             workflowInstance.getProcessDefinitionCode(),
  5.             workflowInstance.getProcessDefinitionVersion());
  6.     // TODO 获取对应的任务定义log
  7.     List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);
  8.     // TODO 获取TaskNode
  9.     List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
  10.     // generate process to get DAG info
  11.     // TODO 这里其实解析的是是否自己手动指定的启动节点列表,默认不会
  12.     List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam());
  13.     // TODO 如果 默认startNodeNameList为空
  14.     List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam());
  15.     // TODO 构建ProcessDag对象实例
  16.     ProcessDag processDag = DagHelper.generateFlowDag(
  17.             taskNodeList,
  18.             startNodeNameList,
  19.             recoveryTaskNodeCodeList,
  20.             workflowInstance.getTaskDependType());
  21.     if (processDag == null) {
  22.         log.error("ProcessDag is null");
  23.         throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null");
  24.     }
  25.     // TODO 生成DAG
  26.     DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag);
  27.     log.debug("Build dag success, dag: {}", dagGraph);
  28.     // TODO 使用WorkflowGraph来封装任务节点列表和dagGraph
  29.     return new WorkflowGraph(taskNodeList, dagGraph);
  30. }
复制代码
org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag
  1. public static ProcessDag generateFlowDag(
  2.                                              List<TaskNode> totalTaskNodeList,
  3.                                              List<Long> startNodeNameList,
  4.                                              List<Long> recoveryNodeCodeList,
  5.                                              TaskDependType depNodeType) throws Exception {
  6.     // TODO 其实就是拿到所有的节点
  7.     List<TaskNode> destTaskNodeList =
  8.             generateFlowNodeListByStartNode(
  9.                     totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
  10.     if (destTaskNodeList.isEmpty()) {
  11.         return null;
  12.     }
  13.     // TODO 获取任务节点之前的关系
  14.     List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);
  15.     // TODO 其实就是实例化一个ProcessDag
  16.     ProcessDag processDag = new ProcessDag();
  17.     // TODO 设置DAG的边
  18.     processDag.setEdges(taskNodeRelations);
  19.     // TODO 设置DAG的顶点
  20.     processDag.setNodes(destTaskNodeList);
  21.     return processDag;
  22. }
复制代码
设置了destTaskNodeList和taskNodeRelations
org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph
  1. public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
  2.     DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>();
  3.     // TODO 添加顶点
  4.     if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
  5.         for (TaskNode node : processDag.getNodes()) {
  6.             dag.addNode(node.getCode(), node);
  7.         }
  8.     }
  9.     // TODO 添加边
  10.     if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
  11.         for (TaskNodeRelation edge : processDag.getEdges()) {
  12.             dag.addEdge(edge.getStartNode(), edge.getEndNode());
  13.         }
  14.     }
  15.     return dag;
  16. }
复制代码
转载自 Journey
原文链接:https://segmentfault.com/a/1190000045117764
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4