DolphinScheduler心脏:Quartz的定时任务调度框架深度解析

打印 上一主题 下一主题

主题 815|帖子 815|积分 2445

Quartz是一个开源的Java作业调度框架,它提供了强大的定时任务调度功能。在DolphinScheduler中,Quartz用于实现定时任务的调度和管理。DolphinScheduler通过QuartzExecutorImpl类与Quartz集成,将工作流及其定时管理操作与Quartz调度框架相结合,实现任务的调度实行。
本文将详细分析Quartz的原理机制,以及在Dolphinscheduler中使用Quartz的原理。
Quartz ER图



  • QRTZ_JOB_DETAILS 和 QRTZ_TRIGGERS 是中央表,定义了任务与触发器之间的关系;
  • QRTZ_TRIGGERS 表通过外键关联了多个触发器类型表,如 QRTZ_SIMPLE_TRIGGERS 和 QRTZ_CRON_TRIGGERS,用于实现不同类型的触发方式;
  • QRTZ_FIRED_TRIGGERS 用于记载每次任务实行的历史,与任务和触发器表都有关联;
  • QRTZ_CALENDARS 用于定义触发器的日历排除规则,QRTZ_PAUSED_TRIGGER_GRPS 用于管理触发器组的暂停状态;
  • QRTZ_SCHEDULER_STATE 和 QRTZ_LOCKS 重要用于集群环境中的任务调度协调,确保高可用性。
Dolphinscheduler Quartz使用

新建SHELL任务


流程定义上线并配置调度



定时上线


流程实例运行效果


原理分析

创建调度
  1. org.apache.dolphinscheduler.api.controller.SchedulerController#createSchedule
  2. --org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#insertSchedule
  3. ....
  4. Schedule scheduleObj = new Schedule();
  5. Date now = new Date();
  6. scheduleObj.setTenantCode(tenantCode);
  7. scheduleObj.setProjectName(project.getName());
  8. scheduleObj.setProcessDefinitionCode(processDefineCode);
  9. scheduleObj.setProcessDefinitionName(processDefinition.getName());
  10. ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
  11. scheduleObj.setCrontab(scheduleParam.getCrontab());
  12. scheduleObj.setTimezoneId(scheduleParam.getTimezoneId());
  13. scheduleObj.setWarningType(warningType);
  14. scheduleObj.setWarningGroupId(warningGroupId);
  15. scheduleObj.setFailureStrategy(failureStrategy);
  16. scheduleObj.setCreateTime(now);
  17. scheduleObj.setUpdateTime(now);
  18. scheduleObj.setUserId(loginUser.getId());
  19. scheduleObj.setUserName(loginUser.getUserName());
  20. scheduleObj.setReleaseState(ReleaseState.OFFLINE);
  21. scheduleObj.setProcessInstancePriority(processInstancePriority);
  22. scheduleObj.setWorkerGroup(workerGroup);
  23. scheduleObj.setEnvironmentCode(environmentCode);
  24. scheduleMapper.insert(scheduleObj);
  25. ....
复制代码
核心其实就是向 schedule 表中插入了一条数据而已,如下 :

调度上线
  1. org.apache.dolphinscheduler.api.controller.SchedulerController#publishScheduleOnline
  2. --org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#onlineScheduler
  3. ----org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#doOnlineScheduler
  4. ------org.apache.dolphinscheduler.scheduler.quartz.QuartzScheduler#insertOrUpdateScheduleTask
  5. 精简代码如下 :
  6. // TODO 使用schedule id和projectId封装 JobKey,比如jobName=job_25(schedulerId),jobGroup=jobgroup_1(projectId)
  7. JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId(), projectId);
  8. // TODO 使用projectId和schedule封装jobDataMap,里面封装的是projectId、scheduleId和schedule(JSON存储)
  9. Map<String, Object> jobDataMap = QuartzTaskUtils.buildDataMap(projectId, schedule);
  10. // TODO 获取cron表达式
  11. String cronExpression = schedule.getCrontab();
  12. // TODO 获取时区
  13. String timezoneId = schedule.getTimezoneId();
  14. // TODO 定时调度的开启时间
  15. Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
  16. // TODO 定时调度的结束时间
  17. Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);
  18. jobDetail jobDetail = newJob(ProcessScheduleTask.class).withIdentity(jobKey).build();
  19. jobDetail.getJobDataMap().putAll(jobDataMap);
  20. // TODO 创建一个Job
  21. scheduler.addJob(jobDetail, false, true);
  22. // TODO 封装Trigger
  23. TriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup());
  24. CronTrigger cronTrigger = newTrigger()
  25.                     .withIdentity(triggerKey)
  26.                     .startAt(startDate)
  27.                     .endAt(endDate)
  28.                     .withSchedule(
  29.                             cronSchedule(cronExpression)
  30.                                     .withMisfireHandlingInstructionIgnoreMisfires()
  31.                                     .inTimeZone(DateUtils.getTimezone(timezoneId)))
  32.                     .forJob(jobDetail).build();
  33. // TODO 开始调度
  34. scheduler.scheduleJob(cronTrigger);
复制代码
对应的表

存储每个任务的详细信息

存储触发器的基本信息,是全部触发器类型的父表

存储 Cron 表达式触发器(Cron Trigger)的信息

调度实行
  1. org.apache.dolphinscheduler.scheduler.quartz.ProcessScheduleTask,这个类是 qrtz_job_details 中的 JOB_CLASS_NAME 字段
  2. protected void executeInternal(JobExecutionContext context) {
  3.     JobDataMap dataMap = context.getJobDetail().getJobDataMap();
  4.     int projectId = dataMap.getInt(QuartzTaskUtils.PROJECT_ID);
  5.     int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID);
  6.     Date scheduledFireTime = context.getScheduledFireTime();
  7.     Date fireTime = context.getFireTime();
  8.     Command command = new Command();
  9.     command.setCommandType(CommandType.SCHEDULER);
  10.     command.setExecutorId(schedule.getUserId());
  11.     command.setFailureStrategy(schedule.getFailureStrategy());
  12.     command.setProcessDefinitionCode(schedule.getProcessDefinitionCode());
  13.     command.setScheduleTime(scheduledFireTime);
  14.     command.setStartTime(fireTime);
  15.     command.setWarningGroupId(schedule.getWarningGroupId());
  16.     String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP
  17.             : schedule.getWorkerGroup();
  18.     command.setWorkerGroup(workerGroup);
  19.     command.setTenantCode(schedule.getTenantCode());
  20.     command.setEnvironmentCode(schedule.getEnvironmentCode());
  21.     command.setWarningType(schedule.getWarningType());
  22.     command.setProcessInstancePriority(schedule.getProcessInstancePriority());
  23.     command.setProcessDefinitionVersion(processDefinition.getVersion());
  24.     commandService.createCommand(command);
  25. }
复制代码
说白了,这个就是quartz的一个回调函数,终极生成Command。
转载自Journey
原文链接:https://segmentfault.com/a/1190000045471756
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连密封材料

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表