Apache DolphinScheduler-1.3.9源码分析(二)

打印 上一主题 下一主题

主题 1809|帖子 1809|积分 5431

引言

随着大数据的发展,任务调度系统成为了数据处理处罚和管理中至关紧张的部分。Apache DolphinScheduler 是一款优秀的开源分布式工作流调度平台,在大数据场景中得到广泛应用。

在本文中,我们将对 Apache DolphinScheduler 1.3.9 版本的源码进行深入分析,主要分析一下Master和Worker的交互设计。
感兴趣的朋侪也可以回顾我们上一篇文章:Apache DolphinScheduler-1.3.9源码分析(一)
Worker配置文件
  1. # worker listener port
  2. worker.listen.port=1234
  3. # worker execute thread number to limit task instances in parallel
  4. # worker可并行的任务数限制
  5. worker.exec.threads=100
  6. # worker heartbeat interval, the unit is second
  7. # worker发送心跳间隔
  8. worker.heartbeat.interval=10
  9. # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
  10. # worker最大cpu平均负载,只有系统cpu平均负载低于该值,才能执行任务
  11. # 默认值为-1,则最大cpu平均负载=系统cpu核数 * 2
  12. worker.max.cpuload.avg=-1
  13. # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
  14. # worker的预留内存,只有当系统可用内存大于等于该值,才能执行任务,单位为GB
  15. # 默认0.3G
  16. worker.reserved.memory=0.3
  17. # default worker groups separated by comma, like 'worker.groups=default,test'
  18. # 工作组名称,多个用,隔开
  19. worker.groups=default
复制代码
WorkerServer启动
  1. public void run() {
  2.     // init remoting server
  3.     NettyServerConfig serverConfig = new NettyServerConfig();
  4.     serverConfig.setListenPort(workerConfig.getListenPort());
  5.     this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
  6.     this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
  7.     this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
  8.     this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
  9.     this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
  10.     this.nettyRemotingServer.start();
  11.     // worker registry
  12.     try {
  13.         this.workerRegistry.registry();
  14.         this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
  15.         Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths();
  16.         this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
  17.     } catch (Exception e) {
  18.         logger.error(e.getMessage(), e);
  19.         throw new RuntimeException(e);
  20.     }
  21.     // retry report task status
  22.     this.retryReportTaskStatusThread.start();
  23.     /**
  24.      * register hooks, which are called before the process exits
  25.      */
  26.     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  27.         if (Stopper.isRunning()) {
  28.             close("shutdownHook");
  29.         }
  30.     }));
  31. }
复制代码
注册四个Command:


  • TASK_EXECUTE_REQUEST:task执行哀求
  • TASK_KILL_REQUEST:task制止哀求
  • DB_TASK_ACK:Worker担当到Master的调度哀求,回应master
  • DB_TASK_RESPONSE:


  • 注册WorkerServer到Zookeeper,并发送心跳
  • 陈诉Task执行状态
RetryReportTaskStatusThread

这是一个兜底机制,主要负责定时轮询向Master汇报任务的状态,直到Master回复状态的ACK,避免任务状态丢失;
每隔5分钟,查抄一下responceCache中的ACK Cache和Response Cache是否为空,如果不为空则向Master发送ack_command和response command哀求。
  1. public void run() {
  2.     ResponceCache responceCache = ResponceCache.get();
  3.     while (Stopper.isRunning()){
  4.         // sleep 5 minutes
  5.         ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
  6.         try {
  7.             if (!responceCache.getAckCache().isEmpty()){
  8.                 Map<Integer,Command> ackCache =  responceCache.getAckCache();
  9.                 for (Map.Entry<Integer, Command> entry : ackCache.entrySet()){
  10.                     Integer taskInstanceId = entry.getKey();
  11.                     Command ackCommand = entry.getValue();
  12.                     taskCallbackService.sendAck(taskInstanceId,ackCommand);
  13.                 }
  14.             }
  15.             if (!responceCache.getResponseCache().isEmpty()){
  16.                 Map<Integer,Command> responseCache =  responceCache.getResponseCache();
  17.                 for (Map.Entry<Integer, Command> entry : responseCache.entrySet()){
  18.                     Integer taskInstanceId = entry.getKey();
  19.                     Command responseCommand = entry.getValue();
  20.                     taskCallbackService.sendResult(taskInstanceId,responseCommand);
  21.                 }
  22.             }
  23.         }catch (Exception e){
  24.             logger.warn("retry report task status error", e);
  25.         }
  26.     }
  27. }
复制代码
Master与Worker的交互设计

Apache DolphinScheduler Master和Worker模块是两个独立的JVM进程,可以部署在差别的服务器上,Master与Worker的通信都是通过Netty实现RPC交互的,一共用到7种处理处罚器。
模块处理处罚器作用mastermasterTaskResponseProcessor处理处罚TaskExecuteResponseCommand消息,将消息添加到TaskResponseService的任务相应队列中mastermasterTaskAckProcessor处理处罚TaskExecuteAckCommand消息,将消息添加到TaskResponseService的任务相应队列中mastermasterTaskKillResponseProcessor处理处罚TaskKillResponseCommand消息,并在日记中打印消息内容workerworkerTaskExecuteProcessor处理处罚TaskExecuteRequestCommand消息,并发送TaskExecuteAckCommand到master,提交任务执行workerworkerTaskKillProcessor处理处罚TaskKillRequestCommand消息,调用kill -9 pid杀死任务对应的进程,并向master发送TaskKillResponseCommand消息workerworkerDBTaskAckProcessor处理处罚DBTaskAckCommand消息,针对执行成功的任务,从ResponseCache中删除workerworkerDBTaskResponseProcessor处理处罚DBTaskResponseCommand消息,针对执行成功的任务,从ResponseCache中删除分发任务如何交互


master#TaskPriorityQueueConsumer

Master任务里有一个TaskPriorityQueueConsumer,会从TaskPriorityQueue里每次取3个Task分发给Worker执行,这里会创建TaskExecuteRequestCommand。
TaskPriorityQueueConsumer#run()

[code]@Overridepublic void run() {    List failedDispatchTasks = new ArrayList();    while (Stopper.isRunning()){        try {            // 每一批次分发任务数量,master.dispatch.task.num = 3            int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();            failedDispatchTasks.clear();            for(int i = 0; i < fetchTaskNum; i++){                if(taskPriorityQueue.size()

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

刘俊凯

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表