Quartz是一个开源的Java作业调度框架,它提供了强大的定时任务调度功能。在DolphinScheduler中,Quartz用于实现定时任务的调度和管理。DolphinScheduler通过QuartzExecutorImpl类与Quartz集成,将工作流及其定时管理操作与Quartz调度框架相结合,实现任务的调度实行。
本文将详细分析Quartz的原理机制,以及在Dolphinscheduler中使用Quartz的原理。
Quartz ER图
- QRTZ_JOB_DETAILS 和 QRTZ_TRIGGERS 是中央表,定义了任务与触发器之间的关系;
- QRTZ_TRIGGERS 表通过外键关联了多个触发器类型表,如 QRTZ_SIMPLE_TRIGGERS 和 QRTZ_CRON_TRIGGERS,用于实现不同类型的触发方式;
- QRTZ_FIRED_TRIGGERS 用于记载每次任务实行的历史,与任务和触发器表都有关联;
- QRTZ_CALENDARS 用于定义触发器的日历排除规则,QRTZ_PAUSED_TRIGGER_GRPS 用于管理触发器组的暂停状态;
- QRTZ_SCHEDULER_STATE 和 QRTZ_LOCKS 重要用于集群环境中的任务调度协调,确保高可用性。
Dolphinscheduler Quartz使用
新建SHELL任务
流程定义上线并配置调度
定时上线
流程实例运行效果
原理分析
创建调度
- org.apache.dolphinscheduler.api.controller.SchedulerController#createSchedule
- --org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#insertSchedule
- ....
- Schedule scheduleObj = new Schedule();
- Date now = new Date();
- scheduleObj.setTenantCode(tenantCode);
- scheduleObj.setProjectName(project.getName());
- scheduleObj.setProcessDefinitionCode(processDefineCode);
- scheduleObj.setProcessDefinitionName(processDefinition.getName());
- ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
- scheduleObj.setCrontab(scheduleParam.getCrontab());
- scheduleObj.setTimezoneId(scheduleParam.getTimezoneId());
- scheduleObj.setWarningType(warningType);
- scheduleObj.setWarningGroupId(warningGroupId);
- scheduleObj.setFailureStrategy(failureStrategy);
- scheduleObj.setCreateTime(now);
- scheduleObj.setUpdateTime(now);
- scheduleObj.setUserId(loginUser.getId());
- scheduleObj.setUserName(loginUser.getUserName());
- scheduleObj.setReleaseState(ReleaseState.OFFLINE);
- scheduleObj.setProcessInstancePriority(processInstancePriority);
- scheduleObj.setWorkerGroup(workerGroup);
- scheduleObj.setEnvironmentCode(environmentCode);
- scheduleMapper.insert(scheduleObj);
- ....
复制代码 核心其实就是向 schedule 表中插入了一条数据而已,如下 :

调度上线
- org.apache.dolphinscheduler.api.controller.SchedulerController#publishScheduleOnline
- --org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#onlineScheduler
- ----org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#doOnlineScheduler
- ------org.apache.dolphinscheduler.scheduler.quartz.QuartzScheduler#insertOrUpdateScheduleTask
- 精简代码如下 :
- // TODO 使用schedule id和projectId封装 JobKey,比如jobName=job_25(schedulerId),jobGroup=jobgroup_1(projectId)
- JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId(), projectId);
- // TODO 使用projectId和schedule封装jobDataMap,里面封装的是projectId、scheduleId和schedule(JSON存储)
- Map<String, Object> jobDataMap = QuartzTaskUtils.buildDataMap(projectId, schedule);
- // TODO 获取cron表达式
- String cronExpression = schedule.getCrontab();
- // TODO 获取时区
- String timezoneId = schedule.getTimezoneId();
- // TODO 定时调度的开启时间
- Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
- // TODO 定时调度的结束时间
- Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);
- jobDetail jobDetail = newJob(ProcessScheduleTask.class).withIdentity(jobKey).build();
- jobDetail.getJobDataMap().putAll(jobDataMap);
- // TODO 创建一个Job
- scheduler.addJob(jobDetail, false, true);
- // TODO 封装Trigger
- TriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup());
- CronTrigger cronTrigger = newTrigger()
- .withIdentity(triggerKey)
- .startAt(startDate)
- .endAt(endDate)
- .withSchedule(
- cronSchedule(cronExpression)
- .withMisfireHandlingInstructionIgnoreMisfires()
- .inTimeZone(DateUtils.getTimezone(timezoneId)))
- .forJob(jobDetail).build();
- // TODO 开始调度
- scheduler.scheduleJob(cronTrigger);
复制代码 对应的表
存储每个任务的详细信息
存储触发器的基本信息,是全部触发器类型的父表
存储 Cron 表达式触发器(Cron Trigger)的信息
调度实行
- org.apache.dolphinscheduler.scheduler.quartz.ProcessScheduleTask,这个类是 qrtz_job_details 中的 JOB_CLASS_NAME 字段
- protected void executeInternal(JobExecutionContext context) {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- int projectId = dataMap.getInt(QuartzTaskUtils.PROJECT_ID);
- int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID);
- Date scheduledFireTime = context.getScheduledFireTime();
- Date fireTime = context.getFireTime();
- Command command = new Command();
- command.setCommandType(CommandType.SCHEDULER);
- command.setExecutorId(schedule.getUserId());
- command.setFailureStrategy(schedule.getFailureStrategy());
- command.setProcessDefinitionCode(schedule.getProcessDefinitionCode());
- command.setScheduleTime(scheduledFireTime);
- command.setStartTime(fireTime);
- command.setWarningGroupId(schedule.getWarningGroupId());
- String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP
- : schedule.getWorkerGroup();
- command.setWorkerGroup(workerGroup);
- command.setTenantCode(schedule.getTenantCode());
- command.setEnvironmentCode(schedule.getEnvironmentCode());
- command.setWarningType(schedule.getWarningType());
- command.setProcessInstancePriority(schedule.getProcessInstancePriority());
- command.setProcessDefinitionVersion(processDefinition.getVersion());
- commandService.createCommand(command);
- }
复制代码 说白了,这个就是quartz的一个回调函数,终极生成Command。
转载自Journey
原文链接:https://segmentfault.com/a/1190000045471756
本文由 白鲸开源 提供发布支持!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |