一文读懂,硬核 Apache DolphinScheduler3.0 源码解析

打印 上一主题 下一主题

主题 660|帖子 660|积分 1980


点亮 ⭐️ Star · 照亮开源之路
https://github.com/apache/dolphinscheduler

本文目录


  • 1 DolphinScheduler的设计与策略
  • 1.1 分布式设计
  • 1.1.1 中心化
  • 1.1.2 去中心化
  • 1.2 DophinScheduler架构设计
  • 1.3 容错问题
  • 1.3.1 宕机容错
  • 1.3.2 失败重试
  • 1.4 远程日志访问
  • 2 DolphinScheduler源码分析
  • 2.1 工程模块介绍与配置文件
  • 2.1.1 工程模块介绍
  • 2.1.2 配置文件
  • 2.2 Api主要任务操作接口
  • 2.3 Quaterz架构与运行流程
  • 2.3.1 概念与架构
  • 2.3.2 初始化与执行流程
  • 2.3.3 集群运转
  • 2.4 Master启动与执行流程
  • 2.4.1 概念与执行逻辑
  • 2.4.2 集群与槽(slot)
  • 2.4.3 代码执行流程
  • 2.5 Work启动与执行流程
  • 2.5.1 概念与执行逻辑
  • 2.5.2 代码执行流程
  • 2.6 rpc交互
  • 2.6.1 Master与Worker交互
  • 2.6.2 其他服务与Master交互
  • 2.7 负载均衡算法
  • 2.7.1 加权随机
  • 2.7.2 线性负载
  • 2.7.3 平滑轮询
  • 2.8 日志服务
  • 2.9 报警
  • 3 后记
  • 3.1 Make friends
  • 3.2 参考文献
前言

研究Apache Dolphinscheduler也是机缘巧合,平时负责基于xxl-job二次开发出来的调度平台,因为遇到了并发性能瓶颈,到了不得不优化重构的地步,所以搜索市面上应用较广的调度平台以借鉴优化思路。
在阅读完DolphinScheduler代码之后,便生出了将其设计与思考记录下来的念头,这便是此篇文章的来源。因为没有正式生产使用,业务理解不一定透彻,理解可能有偏差,欢迎大家交流讨论。
1 DolphinScheduler的设计与策略

大家能关注DolphinScheduler那么一定对调度系统有了一定的了解,对于调度所涉及的到一些专有名词在这里就不做过多的介绍,重点介绍一下流程定义,流程实例,任务定义,任务实例。(没有作业这个概念确实也很新奇,可能是不想和Quartz的JobDetail重叠)。

  • 任务定义:各种类型的任务,是流程定义的关键组成,如sql,shell,spark,mr,python等;
  • 任务实例:任务的实例化,标识着具体的任务执行状态;
  • 流程定义:一组任务节点通过依赖关系建立的起来的有向无环图(DAG);
  • 流程实例:通过手动或者定时调度生成的流程实例;
  • 定时调度:系统采用Quartz 分布式调度器,并同时支持cron表达式可视化的生成;
1.1 分布式设计

分布式系统的架构设计基本分为中心化和去中心化两种,各有优劣,凭借各自的业务选择。
1.1.1 中心化

中心化设计比较简单,集群中的节点安装角色可以分为Master和Slave两种,如下图:


Master: Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。
中心化设计存在一些问题。
第一点,一旦Master出现了问题,则群龙无首,整个集群就会崩溃。
为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性。
第二点,如果Scheduler在Master上,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。如果Scheduler在Slave上,一个DAG中所有的任务都只能在某一台机器上进行作业提交,在并行任务比较多的时候,Slave的压力可能会比较大。
xxl-job就是采用这种设计方式,但是存在相应的问题。管理器(admin)宕机集群会崩溃,Scheduler在管理器上,管理器负责所有任务的校验和分发,管理器存在过载的风险,需要开发者想方案解决。
1.1.2 去中心化



在去中心化设计里,通常没有Master/Slave的概念,所有的角色都是一样的,地位是平等的,去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的“管理者”,因此不存在单点故障问题。
但由于不存在“管理者”节点所以每个节点都需要跟其他节点通信才得到必须要的机器信息,而分布式系统通信的不可靠性,则大大增加了上述功能的实现难度。实际上,真正去中心化的分布式系统并不多见。
反而动态中心化分布式系统正在不断涌出。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,并且集群在发生故障的时候,集群的节点会自发的举行会议来选举新的管理者去主持工作。
一般都是基于Raft算法实现的选举策略。Raft算法,目前社区也有相应的PR,还没合并。
DolphinScheduler的去中心化是Master/Worker注册到注册中心,实现Master集群和Worker集群无中心。
1.2 DophinScheduler架构设计

随手盗用一张官网的系统架构图,可以看到调度系统采用去中心化设计,由UI,API,MasterServer,Zookeeper,WorkServer,Alert等几部分组成。


API: API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。
MasterServer: MasterServer采用分布式无中心设计理念,MasterServer集成了Quartz,主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。WorkServer:WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。
ZooKeeper: ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。另外系统还基于ZooKeeper进行事件监听和分布式锁。
**Alert:**提供告警相关接口,接口主要包括两种类型的告警数据的存储、查询和通知功能,支持丰富的告警插件自由拓展配置。
1.3 容错问题

容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况;
1.3.1 宕机容错

服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:


其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错,容错流程图相对官方文档里面的流程图,人性化了些,大家可以参考一下,具体如下所示。

ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到“正在运行”和“提交成功”的任务,对“正在运行”的任务监控其任务实例的状态,对“提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。

Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。注意由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。
对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。
1.3.2 失败重试

这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:

  • 任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次。
  • 流程失败恢复是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行或从当前节点开始执行。流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行。
接下来说正题,我们将工作流中的任务节点分了两种类型。

  • 一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点、MR节点、Spark节点、依赖节点等。
  • 还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。
每一个业务节点都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。逻辑节点不支持失败重试。但是逻辑节点里的任务支持重试。
如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作。
1.4 远程日志访问

由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。
有两种方案:

  • 将日志放到ES搜索引擎上;
  • 通过netty通信获取远程日志信息;
介于考虑到尽可能的DolphinScheduler的轻量级性,所以选择了RPC实现远程访问日志信息,具体代码的实践见2.8章节。
2 DolphinScheduler源码分析

上一章的讲解可能初步看起来还不是很清晰,本章的主要目的是从代码层面一一介绍第一张讲解的功能。关于系统的安装在这里并不会涉及,安装运行请大家自行探索。
2.1 工程模块介绍与配置文件

2.1.1 工程模块介绍


  • dolphinscheduler-alert 告警模块,提供告警服务;
  • dolphinscheduler-api web应用模块,提供 Rest Api 服务,供 UI 进行调用;
  • dolphinscheduler-common 通用的常量枚举、工具类、数据结构或者基类 dolphinscheduler-dao 提供数据库访问等操作;
  • dolphinscheduler-remote 基于netty的客户端、服务端 ;
  • dolphinscheduler-server 日志与心跳服务 ;
  • dolphinscheduler-log-server LoggerServer 用于Rest Api通过RPC查看日志;
  • dolphinscheduler-master MasterServer服务,主要负责 DAG 的切分和任务状态的监控 ;
  • dolphinscheduler-worker WorkerServer服务,主要负责任务的提交、执行和任务状态的更新;
  • dolphinscheduler-service service模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用 ;
  • dolphinscheduler-ui 前端模块;
2.1.2 配置文件

dolphinscheduler-common common.properties
  1. #本地工作目录,用于存放临时文件
  2. data.basedir.path=/tmp/dolphinscheduler
  3. #资源文件存储类型: HDFS,S3,NONE
  4. resource.storage.type=NONE
  5. #资源文件存储路径
  6. resource.upload.path=/dolphinscheduler
  7. #hadoop是否开启kerberos权限
  8. hadoop.security.authentication.startup.state=false
  9. #kerberos配置目录
  10. java.security.krb5.conf.path=/opt/krb5.conf
  11. #kerberos登录用户
  12. login.user.keytab.username=hdfs-mycluster@ESZ.COM
  13. #kerberos登录用户keytab
  14. login.user.keytab.path=/opt/hdfs.headless.keytab
  15. #kerberos过期时间,整数,单位为小时
  16. kerberos.expire.time=2
  17. #        如果存储类型为HDFS,需要配置拥有对应操作权限的用户
  18. hdfs.root.user=hdfs
  19. #请求地址如果resource.storage.type=S3,该值类似为: s3a://dolphinscheduler. 如果resource.storage.type=HDFS, 如果 hadoop 配置了 HA,需要复制core-site.xml 和 hdfs-site.xml 文件到conf目录
  20. fs.defaultFS=hdfs://mycluster:8020
  21. aws.access.key.id=minioadmin
  22. aws.secret.access.key=minioadmin
  23. aws.region=us-east-1
  24. aws.endpoint=http://localhost:9000
  25. # resourcemanager port, the default value is 8088 if not specified
  26. resource.manager.httpaddress.port=8088
  27. #yarn resourcemanager 地址, 如果resourcemanager开启了HA, 输入HA的IP地址(以逗号分隔),如果resourcemanager为单节点, 该值为空即可
  28. yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
  29. #如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname
  30. yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
  31. # job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
  32. yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
  33. # datasource encryption enable
  34. datasource.encryption.enable=false
  35. # datasource encryption salt
  36. datasource.encryption.salt=!@#$%^&*
  37. # data quality option
  38. data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar
  39. #data-quality.error.output.path=/tmp/data-quality-error-data
  40. # Network IP gets priority, default inner outer
  41. # Whether hive SQL is executed in the same session
  42. support.hive.oneSession=false
  43. # use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
  44. sudo.enable=true
  45. # network interface preferred like eth0, default: empty
  46. #dolphin.scheduler.network.interface.preferred=
  47. # network IP gets priority, default: inner outer
  48. #dolphin.scheduler.network.priority.strategy=default
  49. # system env path
  50. #dolphinscheduler.env.path=dolphinscheduler_env.sh
  51. #是否处于开发模式
  52. development.state=false
  53. # rpc port
  54. alert.rpc.port=50052
  55. # Url endpoint for zeppelin RESTful API
  56. zeppelin.rest.url=http://localhost:8080
复制代码
dolphinscheduler-api application.yaml
  1. server:
  2.   port: 12345
  3.   servlet:
  4.     session:
  5.       timeout: 120m
  6.     context-path: /dolphinscheduler/
  7.   compression:
  8.     enabled: true
  9.     mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
  10.   jetty:
  11.     max-http-form-post-size: 5000000
  12. spring:
  13.   application:
  14.     name: api-server
  15.   banner:
  16.     charset: UTF-8
  17.   jackson:
  18.     time-zone: UTC
  19.     date-format: "yyyy-MM-dd HH:mm:ss"
  20.   servlet:
  21.     multipart:
  22.       max-file-size: 1024MB
  23.       max-request-size: 1024MB
  24.   messages:
  25.     basename: i18n/messages
  26.   datasource:
  27. #    driver-class-name: org.postgresql.Driver
  28. #    url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
  29.     driver-class-name: com.mysql.jdbc.Driver
  30.     url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
  31.     username: root
  32.     password: root
  33.     hikari:
  34.       connection-test-query: select 1
  35.       minimum-idle: 5
  36.       auto-commit: true
  37.       validation-timeout: 3000
  38.       pool-name: DolphinScheduler
  39.       maximum-pool-size: 50
  40.       connection-timeout: 30000
  41.       idle-timeout: 600000
  42.       leak-detection-threshold: 0
  43.       initialization-fail-timeout: 1
  44.   quartz:
  45.     auto-startup: false
  46.     job-store-type: jdbc
  47.     jdbc:
  48.       initialize-schema: never
  49.     properties:
  50.       org.quartz.threadPool:threadPriority: 5
  51.       org.quartz.jobStore.isClustered: true
  52.       org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
  53.       org.quartz.scheduler.instanceId: AUTO
  54.       org.quartz.jobStore.tablePrefix: QRTZ_
  55.       org.quartz.jobStore.acquireTriggersWithinLock: true
  56.       org.quartz.scheduler.instanceName: DolphinScheduler
  57.       org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
  58.       org.quartz.jobStore.useProperties: false
  59.       org.quartz.threadPool.makeThreadsDaemons: true
  60.       org.quartz.threadPool.threadCount: 25
  61.       org.quartz.jobStore.misfireThreshold: 60000
  62.       org.quartz.scheduler.makeSchedulerThreadDaemon: true
  63. #      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
  64.       org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
  65.       org.quartz.jobStore.clusterCheckinInterval: 5000
  66. management:
  67.   endpoints:
  68.     web:
  69.       exposure:
  70.         include: '*'
  71.   metrics:
  72.     tags:
  73.       application: ${spring.application.name}
  74. registry:
  75.   type: zookeeper
  76.   zookeeper:
  77.     namespace: dolphinscheduler
  78. #    connect-string: localhost:2181
  79.     connect-string: 10.255.158.70:2181
  80.     retry-policy:
  81.       base-sleep-time: 60ms
  82.       max-sleep: 300ms
  83.       max-retries: 5
  84.     session-timeout: 30s
  85.     connection-timeout: 9s
  86.     block-until-connected: 600ms
  87.     digest: ~
  88. audit:
  89.   enabled: false
  90. metrics:
  91.   enabled: true
  92. python-gateway:
  93.   # Weather enable python gateway server or not. The default value is true.
  94.   enabled: true
  95.   # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
  96.   # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
  97.   gateway-server-address: 0.0.0.0
  98.   # The port of Python gateway server start. Define which port you could connect to Python gateway server from
  99.   # Python API side.
  100.   gateway-server-port: 25333
  101.   # The address of Python callback client.
  102.   python-address: 127.0.0.1
  103.   # The port of Python callback client.
  104.   python-port: 25334
  105.   # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
  106.   # and socket server would never close even though no requests accept
  107.   connect-timeout: 0
  108.   # Close each active connection of socket server if python program not active after x milliseconds. Define value is
  109.   # (0 = infinite), and socket server would never close even though no requests accept
  110.   read-timeout: 0
  111. # Override by profile
  112. ---
  113. spring:
  114.   config:
  115.     activate:
  116.       on-profile: mysql
  117.   datasource:
  118.     driver-class-name: com.mysql.jdbc.Driver
  119.     url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
  120.   quartz:
  121.     properties:
  122.       org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
复制代码

dolphinscheduler-master application.yaml
  1. spring:
  2.   banner:
  3.     charset: UTF-8
  4.   application:
  5.     name: master-server
  6.   jackson:
  7.     time-zone: UTC
  8.     date-format: "yyyy-MM-dd HH:mm:ss"
  9.   cache:
  10.     # default enable cache, you can disable by `type: none`
  11.     type: none
  12.     cache-names:
  13.       - tenant
  14.       - user
  15.       - processDefinition
  16.       - processTaskRelation
  17.       - taskDefinition
  18.     caffeine:
  19.       spec: maximumSize=100,expireAfterWrite=300s,recordStats
  20.   datasource:
  21.     #driver-class-name: org.postgresql.Driver
  22.     #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
  23.     driver-class-name: com.mysql.jdbc.Driver
  24.     url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
  25.     username: root
  26.     password:
  27.     hikari:
  28.       connection-test-query: select 1
  29.       minimum-idle: 5
  30.       auto-commit: true
  31.       validation-timeout: 3000
  32.       pool-name: DolphinScheduler
  33.       maximum-pool-size: 50
  34.       connection-timeout: 30000
  35.       idle-timeout: 600000
  36.       leak-detection-threshold: 0
  37.       initialization-fail-timeout: 1
  38.   quartz:
  39.     job-store-type: jdbc
  40.     jdbc:
  41.       initialize-schema: never
  42.     properties:
  43.       org.quartz.threadPool:threadPriority: 5
  44.       org.quartz.jobStore.isClustered: true
  45.       org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
  46.       org.quartz.scheduler.instanceId: AUTO
  47.       org.quartz.jobStore.tablePrefix: QRTZ_
  48.       org.quartz.jobStore.acquireTriggersWithinLock: true
  49.       org.quartz.scheduler.instanceName: DolphinScheduler
  50.       org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
  51.       org.quartz.jobStore.useProperties: false
  52.       org.quartz.threadPool.makeThreadsDaemons: true
  53.       org.quartz.threadPool.threadCount: 25
  54.       org.quartz.jobStore.misfireThreshold: 60000
  55.       org.quartz.scheduler.makeSchedulerThreadDaemon: true
  56. #      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
  57.       org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
  58.       org.quartz.jobStore.clusterCheckinInterval: 5000
  59. registry:
  60.   type: zookeeper
  61.   zookeeper:
  62.     namespace: dolphinscheduler
  63. #    connect-string: localhost:2181
  64.     connect-string: 10.255.158.70:2181
  65.     retry-policy:
  66.       base-sleep-time: 60ms
  67.       max-sleep: 300ms
  68.       max-retries: 5
  69.     session-timeout: 30s
  70.     connection-timeout: 9s
  71.     block-until-connected: 600ms
  72.     digest: ~
  73. master:
  74.   listen-port: 5678
  75.   # master fetch command num
  76.   fetch-command-num: 10
  77.   # master prepare execute thread number to limit handle commands in parallel
  78.   pre-exec-threads: 10
  79.   # master execute thread number to limit process instances in parallel
  80.   exec-threads: 100
  81.   # master dispatch task number per batch
  82.   dispatch-task-number: 3
  83.   # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
  84.   host-selector: lower_weight
  85.   # master heartbeat interval, the unit is second
  86.   heartbeat-interval: 10
  87.   # master commit task retry times
  88.   task-commit-retry-times: 5
  89.   # master commit task interval, the unit is millisecond
  90.   task-commit-interval: 1000
  91.   state-wheel-interval: 5
  92.   # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
  93.   max-cpu-load-avg: -1
  94.   # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
  95.   reserved-memory: 0.3
  96.   # failover interval, the unit is minute
  97.   failover-interval: 10
  98.   # kill yarn jon when failover taskInstance, default true
  99.   kill-yarn-job-when-task-failover: true
  100. server:
  101.   port: 5679
  102. management:
  103.   endpoints:
  104.     web:
  105.       exposure:
  106.         include: '*'
  107.   metrics:
  108.     tags:
  109.       application: ${spring.application.name}
  110. metrics:
  111.   enabled: true
  112. # Override by profile
  113. ---
  114. spring:
  115.   config:
  116.     activate:
  117.       on-profile: mysql
  118.   datasource:
  119.     driver-class-name: com.mysql.jdbc.Driver
  120.     url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
  121.   quartz:
  122.     properties:
  123.       org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
复制代码
dolphinscheduler-worker application.yaml
  1. spring:
  2.   banner:
  3.     charset: UTF-8
  4.   application:
  5.     name: worker-server
  6.   jackson:
  7.     time-zone: UTC
  8.     date-format: "yyyy-MM-dd HH:mm:ss"
  9.   datasource:
  10.     #driver-class-name: org.postgresql.Driver
  11.     #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
  12.     driver-class-name: com.mysql.jdbc.Driver
  13.     url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
  14.     username: root
  15.     #password: root
  16.     password:
  17.     hikari:
  18.       connection-test-query: select 1
  19.       minimum-idle: 5
  20.       auto-commit: true
  21.       validation-timeout: 3000
  22.       pool-name: DolphinScheduler
  23.       maximum-pool-size: 50
  24.       connection-timeout: 30000
  25.       idle-timeout: 600000
  26.       leak-detection-threshold: 0
  27.       initialization-fail-timeout: 1
  28. registry:
  29.   type: zookeeper
  30.   zookeeper:
  31.     namespace: dolphinscheduler
  32. #    connect-string: localhost:2181
  33.     connect-string: 10.255.158.70:2181
  34.     retry-policy:
  35.       base-sleep-time: 60ms
  36.       max-sleep: 300ms
  37.       max-retries: 5
  38.     session-timeout: 30s
  39.     connection-timeout: 9s
  40.     block-until-connected: 600ms
  41.     digest: ~
  42. worker:
  43.   # worker listener port
  44.   listen-port: 1234
  45.   # worker execute thread number to limit task instances in parallel
  46.   exec-threads: 100
  47.   # worker heartbeat interval, the unit is second
  48.   heartbeat-interval: 10
  49.   # worker host weight to dispatch tasks, default value 100
  50.   host-weight: 100
  51.   # worker tenant auto create
  52.   tenant-auto-create: true
  53.   # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
  54.   max-cpu-load-avg: -1
  55.   # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
  56.   reserved-memory: 0.3
  57.   # default worker groups separated by comma, like 'worker.groups=default,test'
  58.   groups:
  59.     - default
  60.   # alert server listen host
  61.   alert-listen-host: localhost
  62.   alert-listen-port: 50052
  63. server:
  64.   port: 1235
  65. management:
  66.   endpoints:
  67.     web:
  68.       exposure:
  69.         include: '*'
  70.   metrics:
  71.     tags:
  72.       application: ${spring.application.name}
  73. metrics:
  74.   enabled: true
复制代码
主要关注数据库,quartz, zookeeper, masker, worker配置。
2.2 API主要任务操作接口

其他业务接口可以不用关注,只需要关注最最主要的流程上线功能接口,此接口可以发散出所有的任务调度相关的代码。
接口:/dolphinscheduler/projects/{projectCode}/schedules/{id}/online;此接口会将定义的流程提交到Quartz调度框架;代码如下:
public Map setScheduleState(User loginUser,                                                 long projectCode,                                                 Integer id,                                                 ReleaseState scheduleStatus) {         Map result = new HashMap();
Project project = projectMapper.queryByCode(projectCode);         // check project auth         boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);         if (!hasProjectAndPerm) {             return result;         }
// check schedule exists         Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) {             putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);             return result;         }         // check schedule release state         if (scheduleObj.getReleaseState() == scheduleStatus) {             logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",                     scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);             putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);             return result;         }         ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());         if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));             return result;         }         List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());         if (processTaskRelations.isEmpty()) {             putMsg(result, Status.PROCESS_DAG_IS_EMPTY);             return result;         }         if (scheduleStatus == ReleaseState.ONLINE) {             // check process definition release state             if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {                 logger.info("not release process definition id: {} , name : {}",                         processDefinition.getId(), processDefinition.getName());                 putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());                 return result;             }             // check sub process definition release state             List subProcessDefineCodes = new ArrayList();             processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);             if (!subProcessDefineCodes.isEmpty()) {                 List subProcessDefinitionList =                         processDefinitionMapper.queryByCodes(subProcessDefineCodes);                 if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {                     for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {                         /**                          * if there is no online process, exit directly                          */                         if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {                             logger.info("not release process definition id: {} , name : {}",                                     subProcessDefinition.getId(), subProcessDefinition.getName());                             putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));                             return result;                         }                     }                 }             }         }
// check master server exists         List masterServers = monitorService.getServerListFromRegistry(true);
if (masterServers.isEmpty()) {             putMsg(result, Status.MASTER_NOT_EXISTS);             return result;         }
// set status         scheduleObj.setReleaseState(scheduleStatus);
scheduleMapper.updateById(scheduleObj);
try {             switch (scheduleStatus) {                 case ONLINE:                     logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);                     setSchedule(project.getId(), scheduleObj);                     break;                 case OFFLINE:                     logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);                     deleteSchedule(project.getId(), id);                     break;                 default:                     putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());                     return result;             }         } catch (Exception e) {             result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");             throw new ServiceException(result.get(Constants.MSG).toString(), e);         }
putMsg(result, Status.SUCCESS);         return result;     }
  1. public void setSchedule(int projectId, Schedule schedule) {
  2.         logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());
  3.         quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
  4.     }
复制代码
[code]public void addJob(Class

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

玛卡巴卡的卡巴卡玛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表