f Oracle-纯干货 | Dolphinscheduler Master模块源码分析 - Powered by qidao123.com技术社区

纯干货 | Dolphinscheduler Master模块源码分析

打印 上一主题 下一主题

主题 2049|帖子 2049|积分 6147

此前我们曾用万字长文解释了Apache DolphinScheduler的Worker模块源码,今天,我们再来一起看看Master模块源码的原理。
Master Slot盘算


核心代码逻辑
org.apache.dolphinscheduler.server.master.registry.MasterSlotManager.SlotChangeListener#notify
  1. public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
  2.     List<Server> serverList = masterNodeInfo.values().stream()
  3.             // TODO 这里其实就是过滤掉buzy的master节点
  4.             .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY))
  5.             .map(this::convertHeartBeatToServer).collect(Collectors.toList());
  6.     // TODO 同步master节点
  7.     syncMasterNodes(serverList);
  8. }
复制代码
盘算 totalSlot和currentSlot
  1. private void syncMasterNodes(List<Server> masterNodes) {
  2.     slotLock.lock();
  3.     try {
  4.         this.masterPriorityQueue.clear();
  5.         // TODO 这里会把所有的master节点都放入到masterPriorityQueue中,比如说 [192.168.220.1:12345,192.168.220.2:12345]
  6.         this.masterPriorityQueue.putAll(masterNodes);
  7.         // TODO 就是获取到本地ip的在队列中的位置
  8.         int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
  9.         // TODO 所有节点数量
  10.         int tempTotalSlot = masterNodes.size();
  11.         // TODO 正常情况下不会小于0
  12.         if (tempCurrentSlot < 0) {
  13.             totalSlot = 0;
  14.             currentSlot = 0;
  15.             log.warn("Current master is not in active master list");
  16.         } else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {
  17.             // TODO 这里其实就是记录的是比如说一共有两个slot,我的slot是0或者1
  18.             totalSlot = tempTotalSlot;
  19.             currentSlot = tempCurrentSlot;
  20.             log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
  21.         }
  22.     } finally {
  23.         slotLock.unlock();
  24.     }
  25. }
复制代码
this.masterPriorityQueue.putAll(masterNodes); 管帐算索引
  1. public void putAll(Collection<Server> serverList) {
  2.     for (Server server : serverList) {
  3.         this.queue.put(server);
  4.     }
  5.     // TODO 这里更新了hostIndexMap,存放的是 <host:port> -> 索引
  6.     refreshMasterList();
  7. }
  8. private void refreshMasterList() {
  9.     hostIndexMap.clear();
  10.     Iterator<Server> iterator = queue.iterator();
  11.     int index = 0;
  12.     while (iterator.hasNext()) {
  13.         Server server = iterator.next();
  14.         String addr = NetUtils.getAddr(server.getHost(), server.getPort());
  15.         hostIndexMap.put(addr, index);
  16.         index += 1;
  17.     }
  18. }
复制代码
Master消费Command生成流程实例


command终极的获取逻辑:
  1. 比如说两个Master节点 :
  2. masterCount=2 thisMasterSlot=0  master1
  3. masterCount=2 thisMasterSlot=1  master2
  4. command中的数据如下 :
  5. 1 master2
  6. 2 master1
  7. 3 master2
  8. 4 master1
  9. select *
  10.         from t_ds_command
  11.         where id % #{masterCount} = #{thisMasterSlot}
  12.         order by process_instance_priority, id asc
  13.             limit #{limit}
复制代码
有没有感到疑惑,就是如果一个master更新到的最新的,一个没有更新到,怎么办?
  1. 比如说,master1节点是这样的
  2. 1  master2
  3. 2  master1
  4. 3  master2
  5. 4  master1
  6. 比如说,master2节点是这样的,是不是发现master2节点都是他的,都可以拉取消费?那就导致重复消费,比如说1这个command
  7. 1 master1
  8. 2 master1
  9. 3 master1
  10. 4 master1
复制代码
org.apache.dolphinscheduler.service.process.ProcessServiceImpl#handleCommand
  1. @Transactional
  2. public @Nullable ProcessInstance handleCommand(String host,
  3.                                                    Command command) throws CronParseException, CodeGenerateException {
  4.     // TODO 创建流程实例
  5.     ProcessInstance processInstance = constructProcessInstance(command, host);
  6.     // cannot construct process instance, return null
  7.     if (processInstance == null) {
  8.         log.error("scan command, command parameter is error: {}", command);
  9.         commandService.moveToErrorCommand(command, "process instance is null");
  10.         return null;
  11.     }
  12.     processInstance.setCommandType(command.getCommandType());
  13.     processInstance.addHistoryCmd(command.getCommandType());
  14.     processInstance.setTestFlag(command.getTestFlag());
  15.     // if the processDefinition is serial
  16.     ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
  17.             processInstance.getProcessDefinitionVersion());
  18.     // TODO 是否是串行执行
  19.     if (processDefinition.getExecutionType().typeIsSerial()) {
  20.         saveSerialProcess(processInstance, processDefinition);
  21.         if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {
  22.             setSubProcessParam(processInstance);
  23.             triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
  24.             deleteCommandWithCheck(command.getId());
  25.             // todo: this is a bad design to return null here, whether trigger the task
  26.             return null;
  27.         }
  28.     } else {
  29.         // TODO 并行执行
  30.         processInstanceDao.upsertProcessInstance(processInstance);
  31.     }
  32.     // TODO 这里其实还会向triggerRelation表中插入一条数据,是流程实例和triggerCode的关系
  33.     triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
  34.     // TODO 设置子流程参数
  35.     setSubProcessParam(processInstance);
  36.     // TODO 删除command
  37.     deleteCommandWithCheck(command.getId());
  38.     return processInstance;
  39. }
复制代码
留意:这个方法是加@Transactional的,以是说创建流程实例和删除Command是在一个事物内里的,如果差别的Master消费到同一个Command。肯定会有一个删除Command失败,这时会抛出一个非常,这样就会让数据库举行回滚。
工作流启动流程


DAG切分 & 任务提交


Master事件状态流转


图连接 : Master事件状态流转
TaskEventService组件中的TaskEventDispatchThread(线程)和TaskEventHandlerThread(线程)解析

其实就是Master本身状态(DISPATCH)和Worker汇报上来的状态(RUNNING、UPDATE_PID、RESULT)都会放入到eventQueue,TaskEventDispatchThread(线程)会阻塞的方式举行获取,然后放入到对应的TaskExecuteRunnable中(留意 : 不执行的),只有通过TaskEventHandlerThread(线程)才会使用TaskExecuteThreadPool线程举行TaskExecuteRunnable的提交。
转载自Journey
原文链接:https://segmentfault.com/a/1190000044992842
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

雁过留声

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表