Version: 3.1.0-SNAPSHOT
Git repository: https://github.com/apache/shardingsphere-elasticjob

Maven coordinates
- <dependency>
- <groupId>org.apache.shardingsphere.elasticjob</groupId>
- <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
- <version>${latest.version}</version>
- </dependency>

spring.factories configuration
- org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
- org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration

Once the elasticjob-lite-spring-boot-starter dependency is added, Spring Boot loads ElasticJobLiteAutoConfiguration automatically. The rest of this article walks through what ElasticJobLiteAutoConfiguration sets up.

ElasticJobLiteAutoConfiguration.java
- /**
- * ElasticJob-Lite auto configuration.
- */
- @Configuration(proxyBeanMethods = false)
- @AutoConfigureAfter(DataSourceAutoConfiguration.class)
- /**
- * ElasticJob switch.
- * elasticjob.enabled defaults to true when the property is not set.
- */
- @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)
- /**
- * Imports:
- * ElasticJobRegistryCenterConfiguration.class: registry center configuration
- * ElasticJobTracingConfiguration.class: job event tracing configuration
- * ElasticJobSnapshotServiceConfiguration.class: snapshot service configuration
- */
- @Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})
- /**
- * Binds the job-related configuration properties.
- */
- @EnableConfigurationProperties(ElasticJobProperties.class)
- public class ElasticJobLiteAutoConfiguration {
-
- @Configuration(proxyBeanMethods = false)
- /**
- * ElasticJobBootstrapConfiguration.class: creates the job bootstrap beans and registers them in the Spring container
- * ScheduleJobBootstrapStartupRunner.class: starts every job whose bootstrap is a ScheduleJobBootstrap
- */
- @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
- protected static class ElasticJobConfiguration {
- }
- }
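
The `elasticjob.enabled` switch above can be read as a small decision rule. The following is a minimal sketch in plain Java, not Spring's own implementation: `EnabledSwitchSketch` and `isElasticJobEnabled` are hypothetical names, and the case-insensitive comparison is my understanding of how Spring Boot matches `havingValue`.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical helper that mimics how the elasticjob.enabled switch is evaluated. */
final class EnabledSwitchSketch {

    private EnabledSwitchSketch() {
    }

    /**
     * Mirrors @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true):
     * a missing property counts as a match, otherwise the value must equal "true" (compared ignoring case,
     * which is an assumption about Spring Boot's behavior).
     */
    static boolean isElasticJobEnabled(final Map<String, String> properties) {
        String value = properties.get("elasticjob.enabled");
        if (null == value) {
            // matchIfMissing = true
            return true;
        }
        return "true".equalsIgnoreCase(value);
    }

    public static void main(final String[] args) {
        System.out.println(isElasticJobEnabled(Collections.<String, String>emptyMap()));
        System.out.println(isElasticJobEnabled(Collections.singletonMap("elasticjob.enabled", "false")));
    }
}
```

In other words, the auto-configuration is skipped only when the property is present and set to something other than true.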
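
ElasticJob relies on ZooKeeper to coordinate distributed jobs, so auto-configuration requires a ZooKeeper registry center configuration. Auto-configuration does four things:
1. Configures the ZooKeeper client and connects to ZooKeeper.
2. Configures the event tracing database that stores job execution records.
3. Parses every job configuration and registers each job bootstrap as a Spring singleton bean.
4. Identifies the job type, writes the job's node data to ZooKeeper, and starts the scheduled jobs.

Step 1: configure the ZooKeeper client and connect to ZooKeeper.
ZookeeperRegistryCenter.class
- public void init() {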
- log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
- // ZooKeeper server list
- .connectString(zkConfig.getServerLists())
- // retry policy
- .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
- // namespace, used as the root ZooKeeper node
- .namespace(zkConfig.getNamespace());
- // session timeout, applied only when configured (non-zero)
- if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
- builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
- }
- // connection timeout, applied only when configured (non-zero)
- if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
- builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
- }
- if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
- builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
- .aclProvider(new ACLProvider() {
-
- @Override
- public List<ACL> getDefaultAcl() {
- return ZooDefs.Ids.CREATOR_ALL_ACL;
- }
-
- @Override
- public List<ACL> getAclForPath(final String path) {
- return ZooDefs.Ids.CREATOR_ALL_ACL;
- }
- });
- }
- client = builder.build();
- // start the ZooKeeper client
- client.start();
- try {
- // block until connected, waiting at most maxSleepTimeMilliseconds * maxRetries
- if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
- client.close();
- throw new KeeperException.OperationTimeoutException();
- }
- //CHECKSTYLE:OFF
- } catch (final Exception ex) {
- //CHECKSTYLE:ON
- RegExceptionHandler.handleException(ex);
- }
- }
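
Two timing details in init() are easy to miss: the connect wait is bounded by maxSleepTimeMilliseconds multiplied by maxRetries, and a timeout of 0 means the builder is left untouched. The sketch below restates both in plain Java. `ZkConnectSketch` is a hypothetical stand-in for the configuration object, and the numbers in `main` are illustrative values, not settings taken from this article.

```java
/** Hypothetical stand-in for the timing fields of the registry center configuration. */
final class ZkConnectSketch {

    private final int maxSleepTimeMilliseconds;

    private final int maxRetries;

    private final int sessionTimeoutMilliseconds;

    ZkConnectSketch(final int maxSleepTimeMilliseconds, final int maxRetries, final int sessionTimeoutMilliseconds) {
        this.maxSleepTimeMilliseconds = maxSleepTimeMilliseconds;
        this.maxRetries = maxRetries;
        this.sessionTimeoutMilliseconds = sessionTimeoutMilliseconds;
    }

    /** Upper bound handed to blockUntilConnected: max sleep time multiplied by max retries. */
    long connectWaitMillis() {
        return (long) maxSleepTimeMilliseconds * maxRetries;
    }

    /** A zero timeout means "not configured", so the client library's own default stays in effect. */
    boolean overridesSessionTimeout() {
        return 0 != sessionTimeoutMilliseconds;
    }

    public static void main(final String[] args) {
        // Illustrative values only.
        ZkConnectSketch config = new ZkConnectSketch(3000, 3, 0);
        System.out.println(config.connectWaitMillis());       // prints 9000
        System.out.println(config.overridesSessionTimeout()); // prints false
    }
}
```

If the client is still not connected once that wait elapses, init() closes the client and raises an operation timeout, as the listing above shows.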

Step 2: configure the event tracing database that stores job execution records.
ElasticJobTracingConfiguration.java
- /**
- * Create a bean of tracing DataSource.
- *
- * @param tracingProperties tracing Properties
- * @return tracing DataSource
- */
- @Bean("tracingDataSource")
- // registers the job tracing DataSource in Spring under the bean name tracingDataSource
- public DataSource tracingDataSource(final TracingProperties tracingProperties) {
- // read the ElasticJob tracing data source settings
- DataSourceProperties dataSource = tracingProperties.getDataSource();
- if (dataSource == null) {
- return null;
- }
- HikariDataSource tracingDataSource = new HikariDataSource();
- tracingDataSource.setJdbcUrl(dataSource.getUrl());
- BeanUtils.copyProperties(dataSource, tracingDataSource);
- return tracingDataSource;
- }
- /**
- * Create a bean of tracing configuration.
- *
- * @param dataSource required by constructor
- * @param tracingDataSource tracing DataSource
- * @return a bean of tracing configuration
- */
- @Bean
- @ConditionalOnBean(DataSource.class)
- @ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB")
- public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
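- /**
- * dataSource is the business database.
- * tracingDataSource is the job tracing database.
- * With elasticjob.tracing.type = RDB, a separately configured job database is used to record job execution traces; otherwise the business database is used.
- * Note that when both the business DataSource and the tracing DataSource are registered as beans, mybatis-plus used together with @Table may well fail to find the correct data source.
- */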
- DataSource ds = tracingDataSource;
- if (ds == null) {
- ds = dataSource;
- }
- return new TracingConfiguration<>("RDB", ds);
- }

Setting elasticjob.tracing.type=RDB enables event tracing. The tracing data source can be configured separately from the business data source.
Step 3: parse every job configuration.
ElasticJobBootstrapConfiguration.class
- public void createJobBootstrapBeans() {
- // load the job properties
- ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
- // get the singleton bean registry
- SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
- // get the registry center bean (the ZooKeeper client wrapper)
- CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
- // get the job event tracing configuration
- TracingConfiguration<?> tracingConfig = getTracingConfiguration();
- // build the job bootstraps
- constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
- }

The key method is constructJobBootstraps:
- private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
- final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
- // iterate over every configured job
- for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
- ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
- // fail if both the job class and the job type are missing
- Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
- || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
- "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
- // fail if both the job class and the job type are set, because they are mutually exclusive
- Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
- || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
- "[elasticJobClass] and [elasticJobType] are mutually exclusive.");
- if (null != jobConfigurationProperties.getElasticJobClass()) {
- // register the job by class
- registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
- } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
- // register the job by type
- registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
- }
- }
- }
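
The two precondition checks in constructJobBootstraps amount to "exactly one of elasticJobClass and elasticJobType must be set". A minimal sketch of that rule in plain Java follows. `JobRegistrationSketch`, `Mode`, and `resolveMode` are hypothetical names, the job class is represented by its name as a String for simplicity, and "SCRIPT" is only an example type value.

```java
/** Hypothetical restatement of the class/type validation in constructJobBootstraps. */
final class JobRegistrationSketch {

    enum Mode { CLASS, TYPE }

    private JobRegistrationSketch() {
    }

    /**
     * Returns how the job would be registered, or throws IllegalArgumentException
     * (the exception Guava's Preconditions.checkArgument raises) when the configuration is invalid.
     */
    static Mode resolveMode(final String elasticJobClass, final String elasticJobType) {
        boolean hasClass = null != elasticJobClass;
        boolean hasType = null != elasticJobType && !elasticJobType.isEmpty();
        if (!hasClass && !hasType) {
            throw new IllegalArgumentException("Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
        }
        if (hasClass && hasType) {
            throw new IllegalArgumentException("[elasticJobClass] and [elasticJobType] are mutually exclusive.");
        }
        return hasClass ? Mode.CLASS : Mode.TYPE;
    }

    public static void main(final String[] args) {
        System.out.println(resolveMode("com.example.MyJob", null)); // prints CLASS
        System.out.println(resolveMode(null, "SCRIPT"));            // prints TYPE
    }
}
```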
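
A job can be registered in two ways: by class, where elasticJobClass is set to the job's fully qualified class name, or by type, where elasticJobType is set. Here is registerClassedJob, which handles the class-based case:
- private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,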
- final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
- // build the JobConfiguration from the properties
- JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
- // attach extra configuration such as event tracing
- jobExtraConfigurations(jobConfig, tracingConfig);
- // look up the job bean by its class
- ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
- // no cron expression: register a OneOffJobBootstrap (one-off job)
- if (Strings.isNullOrEmpty(jobConfig.getCron())) {
- Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
- singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
- } else {
- // cron expression present: register a ScheduleJobBootstrap (scheduled job)
- // resolve the bean name
- String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
- // register the ScheduleJobBootstrap as a singleton bean
- singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
- }
- }
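
The branching in registerClassedJob decides both the bootstrap type and the bean name. The sketch below isolates that decision in plain Java. `BootstrapNamingSketch` and its methods are hypothetical, and "orderJob" and the cron expression are made-up example values.

```java
/** Hypothetical restatement of how registerClassedJob picks the bootstrap type and bean name. */
final class BootstrapNamingSketch {

    private BootstrapNamingSketch() {
    }

    private static boolean isNullOrEmpty(final String value) {
        return null == value || value.isEmpty();
    }

    /** No cron expression means a one-off job, otherwise a scheduled job. */
    static String bootstrapKind(final String cron) {
        return isNullOrEmpty(cron) ? "OneOffJobBootstrap" : "ScheduleJobBootstrap";
    }

    /**
     * One-off jobs must name their bean explicitly.
     * Scheduled jobs fall back to jobName + "ScheduleJobBootstrap" when no name is given.
     */
    static String beanName(final String jobName, final String cron, final String jobBootstrapBeanName) {
        if (isNullOrEmpty(cron)) {
            if (isNullOrEmpty(jobBootstrapBeanName)) {
                throw new IllegalArgumentException("The property [jobBootstrapBeanName] is required for One-off job.");
            }
            return jobBootstrapBeanName;
        }
        return !isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobName + "ScheduleJobBootstrap";
    }

    public static void main(final String[] args) {
        System.out.println(bootstrapKind("0/5 * * * * ?"));                  // prints ScheduleJobBootstrap
        System.out.println(beanName("orderJob", "0/5 * * * * ?", null));     // prints orderJobScheduleJobBootstrap
        System.out.println(beanName("orderJob", null, "orderJobBootstrap")); // prints orderJobBootstrap
    }
}
```

The explicit bean name matters for one-off jobs because application code has to look the bootstrap up by that name in order to trigger it.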

Class-based registration produces one of two bootstrap types:
1. ScheduleJobBootstrap: a scheduled job.
2. OneOffJobBootstrap: a one-off job.

new ScheduleJobBootstrap(...) delegates to a JobScheduler, whose constructor looks like this:
- public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
- Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
- this.regCenter = regCenter;
- // collect the job listeners
- Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
- // facade over the services that operate on ZooKeeper nodes, plus the job listeners
- setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
- // resolve the job class name
- String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
- // persist the job configuration at the ZooKeeper node /{namespace}/{jobName}/config
- this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
- // facade over the services that operate on ZooKeeper nodes
- schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
- jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
- // validate the job properties
- validateJobProperties();
- // create the job executor
- jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
- // inject GuaranteeService into the listeners
- setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
- // create the schedule controller; the job is not triggered until schedule() is called
- jobScheduleController = createJobScheduleController();
- }

Next, createJobScheduleController:
- private JobScheduleController createJobScheduleController() {
- JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
- // register the job in the local JobRegistry
- JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
- // register the startup information
- registerStartUpInfo();
- return result;
- }

Next, registerStartUpInfo:
- public void registerStartUpInfo(final boolean enabled) {
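- // start all listeners
- listenerManager.startAllListeners();
- // elect a leader; the winner is stored at /{namespace}/{jobName}/leader/election/instance
- leaderService.electLeader();
- // persist this server's enabled state at /{namespace}/{jobName}/servers/{ip}
- serverService.persistOnline(enabled);
- // ephemeral node under /{namespace}/{jobName}/instances that holds the running instance
- instanceService.persistOnline();
- // start the reconcile service asynchronously if it is not already running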
- if (!reconcileService.isRunning()) {
- reconcileService.startAsync();
- }
- }
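
The ZooKeeper nodes touched during startup all hang off a per-job root under the namespace. The sketch below builds those paths in plain Java, assuming the node layout as I understand it from the ElasticJob documentation (config, leader, servers and instances below /{namespace}/{jobName}). `JobNodePathSketch` is a hypothetical helper rather than the library's own path class, and the IP address and instance id in `main` are made-up values.

```java
/** Hypothetical path builder for the nodes written during job startup. */
final class JobNodePathSketch {

    private final String namespace;

    private final String jobName;

    JobNodePathSketch(final String namespace, final String jobName) {
        this.namespace = namespace;
        this.jobName = jobName;
    }

    private String root() {
        return "/" + namespace + "/" + jobName;
    }

    /** Node that stores the job configuration. */
    String config() {
        return root() + "/config";
    }

    /** Node that stores the elected leader instance. */
    String leaderElectionInstance() {
        return root() + "/leader/election/instance";
    }

    /** Node that stores the enabled or disabled state of one server. */
    String server(final String ip) {
        return root() + "/servers/" + ip;
    }

    /** Ephemeral node for one running job instance. */
    String instance(final String instanceId) {
        return root() + "/instances/" + instanceId;
    }

    public static void main(final String[] args) {
        JobNodePathSketch paths = new JobNodePathSketch("demo-namespace", "orderJob");
        System.out.println(paths.config());                 // prints /demo-namespace/orderJob/config
        System.out.println(paths.leaderElectionInstance()); // prints /demo-namespace/orderJob/leader/election/instance
        System.out.println(paths.server("10.0.0.1"));       // prints /demo-namespace/orderJob/servers/10.0.0.1
        System.out.println(paths.instance("instance-1"));   // prints /demo-namespace/orderJob/instances/instance-1
    }
}
```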
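
This method does four things:
1. Starts all listeners.
2. Elects a leader.
3. Persists the server and instance node data.
4. Starts the asynchronous reconcile service.

Step 4: identify the job type, write the job's node data to ZooKeeper, and start the scheduled jobs.
ScheduleJobBootstrapStartupRunner.java
- @Override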
- public void run(final String... args) {
- log.info("Starting ElasticJob Bootstrap.");
- applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
- log.info("ElasticJob Bootstrap started.");
- }

The runner collects every scheduled job (beans of type ScheduleJobBootstrap) and calls schedule() on each. Under the hood, the Quartz framework runs the scheduled jobs.
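
Because the runner filters beans by ScheduleJobBootstrap, one-off bootstraps are registered but never started by it. The toy model below shows that filtering in plain Java without Spring. Every type in it (`Bootstrap`, `ScheduleBootstrap`, `OneOffBootstrap`) is a hypothetical stand-in, and the bean names are made-up values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the startup runner: only schedule-type bootstraps are started. */
final class StartupRunnerSketch {

    interface Bootstrap {
    }

    static final class ScheduleBootstrap implements Bootstrap {

        private boolean scheduled;

        void schedule() {
            scheduled = true;
        }

        boolean isScheduled() {
            return scheduled;
        }
    }

    static final class OneOffBootstrap implements Bootstrap {
    }

    private StartupRunnerSketch() {
    }

    /** Calls schedule() on every ScheduleBootstrap and returns the bean names that were started. */
    static List<String> scheduleAll(final Map<String, Bootstrap> beans) {
        List<String> started = new ArrayList<>();
        for (Map.Entry<String, Bootstrap> entry : beans.entrySet()) {
            if (entry.getValue() instanceof ScheduleBootstrap) {
                ((ScheduleBootstrap) entry.getValue()).schedule();
                started.add(entry.getKey());
            }
        }
        return started;
    }

    public static void main(final String[] args) {
        Map<String, Bootstrap> beans = new LinkedHashMap<>();
        beans.put("orderJobScheduleJobBootstrap", new ScheduleBootstrap());
        beans.put("reportJobBootstrap", new OneOffBootstrap());
        System.out.println(scheduleAll(beans)); // prints [orderJobScheduleJobBootstrap]
    }
}
```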