elastic-job源码(1)- job自动装配

打印 上一主题 下一主题

主题 886|帖子 886|积分 2660

版本:3.1.0-SNAPSHOTgit地址https://github.com/apache/shardingsphere-elasticjob Maven 坐标
  1. <dependency>
  2.     <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3.     <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
  4.     <version>${latest.version}</version>
  5. </dependency>
复制代码
 Spring.factories配置
  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2.   org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration
复制代码
在添加elasticjob-lite-spring-boot-starter启动类的时候,会自动加载ElasticJobLiteAutoConfiguration,接下来看下ElasticJobLiteAutoConfiguration中所做的处理。 ElasticJobLiteAutoConfiguration.java
  1. /**
  2. * ElasticJob-Lite auto configuration.
  3. */
  4. @Configuration(proxyBeanMethods = false)
  5. @AutoConfigureAfter(DataSourceAutoConfiguration.class)
  6. /**
  7. * elastic job 开关
  8. * elasticjob.enabled.ture默认为true
  9. */
  10. @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)
  11. /**
  12. * 导入
  13. * ElasticJobRegistryCenterConfiguration.class 注册中心配置
  14. * ElasticJobTracingConfiguration.class job事件追踪配置
  15. * ElasticJobSnapshotServiceConfiguration.class 快照配置
  16. */
  17. @Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})
  18. /**
  19. * job相关配置信息
  20. */
  21. @EnableConfigurationProperties(ElasticJobProperties.class)
  22. public class ElasticJobLiteAutoConfiguration {
  23.    
  24.     @Configuration(proxyBeanMethods = false)
  25.     /**
  26.      * ElasticJobBootstrapConfiguration.class  创建job beans 注入spring容器
  27.      * ScheduleJobBootstrapStartupRunner.class  执行类型为ScheduleJobBootstrap.class 的job开始运行
  28.      */
  29.     @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
  30.     protected static class ElasticJobConfiguration {
  31.     }
  32. }
复制代码
Elastic-job 是利用zookeeper 实现分布式job的功能,所以在自动装配的时候,需要有zookeeper注册中心的配置。自动装配主要做了4件事事1.配置zookeeper 客户端信息,启动连接zookeeper.2.配置事件追踪数据库,用于保存job运行记录3.解析所有job配置文件,将所有job的bean放置在spring 单例bean中4.识别job类型,在zookeeper节点上处理job节点数据,运行定时任务job.  第一件事:配置zookeeper 客户端信息,启动连接zookeeper. ZookeeperRegistryCenter.class
  1. public void init() {
  2.     log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
  3.     CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
  4.             //设置zookeeper 服务器地址
  5.             .connectString(zkConfig.getServerLists())
  6.             //设置重试机制
  7.             .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
  8.             //设置命名空间,zookeeper节点名称
  9.             .namespace(zkConfig.getNamespace());
  10.     //设置session超时时间
  11.     if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
  12.         builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
  13.     }
  14.     //设置连接超时时间
  15.     if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
  16.         builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
  17.     }
  18.     if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
  19.         builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
  20.                 .aclProvider(new ACLProvider() {
  21.                
  22.                     @Override
  23.                     public List<ACL> getDefaultAcl() {
  24.                         return ZooDefs.Ids.CREATOR_ALL_ACL;
  25.                     }
  26.                
  27.                     @Override
  28.                     public List<ACL> getAclForPath(final String path) {
  29.                         return ZooDefs.Ids.CREATOR_ALL_ACL;
  30.                     }
  31.                 });
  32.     }
  33.     client = builder.build();
  34.     //zookeeper 客户端开始启动
  35.     client.start();
  36.     try {
  37.         //zookeeper 客户端一直连接
  38.         if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
  39.             client.close();
  40.             throw new KeeperException.OperationTimeoutException();
  41.         }
  42.         //CHECKSTYLE:OFF
  43.     } catch (final Exception ex) {
  44.         //CHECKSTYLE:ON
  45.         RegExceptionHandler.handleException(ex);
  46.     }
  47. }
复制代码
 
第二件事: 配置事件追踪数据库,用于保存job运行记录
ElasticJobTracingConfiguration.java
 
  1. /**
  2. * Create a bean of tracing DataSource.
  3. *
  4. * @param tracingProperties tracing Properties
  5. * @return tracing DataSource
  6. */
  7. @Bean("tracingDataSource")
  8. //spring中注入bean name 为tracingDataSource的job数据库连接信息
  9. public DataSource tracingDataSource(final TracingProperties tracingProperties) {
  10.     //获取elastic-job 数据库配置
  11.     DataSourceProperties dataSource = tracingProperties.getDataSource();
  12.     if (dataSource == null) {
  13.         return null;
  14.     }
  15.     HikariDataSource tracingDataSource = new HikariDataSource();
  16.     tracingDataSource.setJdbcUrl(dataSource.getUrl());
  17.     BeanUtils.copyProperties(dataSource, tracingDataSource);
  18.     return tracingDataSource;
  19. }
  20. /**
  21. * Create a bean of tracing configuration.
  22. *
  23. * @param dataSource required by constructor
  24. * @param tracingDataSource tracing ataSource
  25. * @return a bean of tracing configuration
  26. */
  27. @Bean
  28. @ConditionalOnBean(DataSource.class)
  29. @ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB")
  30. public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
  31.     /**
  32.      * dataSource 是业务数据库
  33.      * tracingDataSource 是job数据库
  34.      * 当配置elasticjob.tracing.type = RDB时,如果单独配置job数据库是,默认使用job数据库作为job运行轨迹的记录
  35.      * 但这边同时业务数据库和job追踪数据库同时注入是,mybatis-plus 结合@Table 使用的时候,很有可能找不到正确对应的数据源
  36.      */
  37.     DataSource ds = tracingDataSource;
  38.     if (ds == null) {
  39.         ds = dataSource;
  40.     }
  41.     return new TracingConfiguration<>("RDB", ds);
  42. }
复制代码
 
通过elasticjob.tracing.type=RDB的配置开启事件追踪功能,这边job的事件追踪数据源可以和业务数据源配置不一样。
 
第三件事:解析所有job配置文件
ElasticJobBootstrapConfiguration.class
 
  1. public void createJobBootstrapBeans() {
  2.     //获取job配置
  3.     ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
  4.     //获取单利注册对象
  5.     SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
  6.     //获取注入zookeeper 客户端
  7.     CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
  8.     //获取job事件追踪
  9.     TracingConfiguration<?> tracingConfig = getTracingConfiguration();
  10.     //构造JobBootstraps
  11.     constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
  12. }
复制代码
重要的是constructJobBootstraps 这个方法,来看下
  1. private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
  2.                                     final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
  3.     //遍历配置的每一个job
  4.     for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
  5.         ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
  6.         //校验 job class 和 type 都为空抛异常
  7.         Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
  8.                         || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
  9.                 "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
  10.         //校验 job class 和 type 都有 报相互排斥
  11.         Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
  12.                         || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
  13.                 "[elasticJobClass] and [elasticJobType] are mutually exclusive.");
  14.         if (null != jobConfigurationProperties.getElasticJobClass()) {
  15.             //通过class 注入job
  16.             registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
  17.         } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
  18.             //通过type 注入job
  19.             registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
  20.         }
  21.     }
  22. }
复制代码
Job 有两种类型的注入,第一种是是class,配置成job的全路径信息注入  再来看看registerClassedJob 方法里的内容
  1. private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
  2.                                 final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
  3.     //获取job配置
  4.     JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
  5.     //配置job事件追踪
  6.     jobExtraConfigurations(jobConfig, tracingConfig);
  7.     //获取job类型
  8.     ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
  9.     //没有配置cron表达式 就初始化为OneOffJobBootstrap对象,一次性任务
  10.     if (Strings.isNullOrEmpty(jobConfig.getCron())) {
  11.         Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
  12.         singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
  13.     } else {
  14.         //有配置cron表达式 就初始化为ScheduleJobBootstrap对象,定时任务
  15.         //设置bean name
  16.         String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
  17.         //注入ScheduleJobBootstrap对象为单利对象
  18.         singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
  19.     }
  20. }
复制代码
Class 类型注入的job有两种类型1.ScheduleJobBootstrap:定时任务类型的job。2.OneOffJobBootstrap:一定次job类型。  看下定义的new ScheduleJobBootstrap 方法
  1. public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
  2.     Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
  3.     this.regCenter = regCenter;
  4.     //获取job监听器
  5.     Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
  6.     // 集成所有操作zookeeper 节点的services,job 监听器
  7.     setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
  8.     //获取当前job名称
  9.     String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
  10.     //zookeeper节点 {namespace}/{jobclassname}/config 放置job配置信息
  11.     this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
  12.     // 集成所有操作zookeeper 节点的services
  13.     schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
  14.     jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
  15.     //检验job配置
  16.     validateJobProperties();
  17.     //定义job执行器
  18.     jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
  19.     //监听器里注入GuaranteeService
  20.     setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
  21.     //创建定时任务,开始执行
  22.     jobScheduleController = createJobScheduleController();
  23. }
复制代码
 
看下createJobScheduleController
  1. private JobScheduleController createJobScheduleController() {
  2.     JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
  3.     //注册job
  4.     JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
  5.     //注册器开始运行
  6.     registerStartUpInfo();
  7.     return result;
  8. }
复制代码
看下registerStartUpInfo方法
  1. public void registerStartUpInfo(final boolean enabled) {
  2.     //开始所有的监听器
  3.     listenerManager.startAllListeners();
  4.     //选举leader /{namespace}/leader/election/instance 放置选举出来的服务器
  5.     leaderService.electLeader();
  6.     //{namespace}/{ipservers} 设置enable处理
  7.     serverService.persistOnline(enabled);
  8.     //临时节点   /{namespave}/instances 放置运行服务实例信息
  9.     instanceService.persistOnline();
  10.     //开启一个异步服务
  11.     if (!reconcileService.isRunning()) {
  12.         reconcileService.startAsync();
  13.     }
  14. }
复制代码
这里实行的操作:1.开启所有监听器处理2.leader选举3.持久化节点数据4.开启异步服务  第四步:4.识别job类型,在zookeeper节点上处理job节点数据,运行定时任务job. 
  1. @Override
  2. public void run(final String... args) {
  3.     log.info("Starting ElasticJob Bootstrap.");
  4.     applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
  5.     log.info("ElasticJob Bootstrap started.");
  6. }
复制代码
获取到所有的定时任务job(ScheduleJobBootstrap类型),执行schedule方法,底层实际使用quartz框架运行定时任务。     
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

tsx81428

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表