xxl-job架构原理讲解

打印 上一主题 下一主题

主题 1020|帖子 1020|积分 3060

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
1、调度中心

调度中心是一个单独的Web服务,主要是用来触发定时使命的执行
它提供了一些页面操纵,我们可以很方便地去管理这些定时使命的触发逻辑
调度中心依赖数据库,以是数据都是存在数据库中的
调度中心也支持集群模式,但是它们所依赖的数据库必须是同一个
以是同一个集群中的调度中心实例之间是没有任何通信的,数据都是通过数据库共享的


2、执行器

执行器是用来执行具体的使命逻辑的
执行器你可以理解为就是平常开辟的服务,一个服务实例对应一个执行器实例
每个执行器有自己的名字,为了方便,你可以将执行器的名字设置成服务名
3、使命

一个执行器中也是可以有多个使命的
   总的来说,调用中心是用来控订定时使命的触发逻辑,而执行器是具体执行使命的,这是一种使命和触发逻辑分离的设计思想,这种方式的好处就是使使命更加灵活,可以随时被调用,还可以被不同的调度规则触发。
  

来个Demo

1、搭建调度中心

调度中心搭建很简朴,先下载源码
   github.com/xuxueli/xxl…
  然后改一下数据库连接信息,执行一下在项目源码中的/doc/db下的sql文件


启动可以打成一个jar包,或者本地启动就是可以的
启动完成之后,访问下面这个地址就可以访问到控制台页面了
   http://localhost:8080/xxl-job-admin/toLogin
  用户名暗码默认是 admin/123456
2、执行器和使命添加

添加一个名为sanyou-xxljob-demo执行器


使命添加


执行器选择我们刚刚添加的,指定使命名称为TestJob,corn表达式的意思是每秒执行一次
创建完之后需要启动一下使命,默认是关闭状态,也就不会执行


   创建执行器和使命其实就是CRUD,并没有复杂的业务逻辑
  按照如上配置的整个Demo的意思就是
每隔1s,执行一次sanyou-xxljob-demo这个执行器中的TestJob使命
3、创建执行器和使命

引入依赖
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.springframework.boot</groupId>
  4.         <artifactId>spring-boot-starter-web</artifactId>
  5.         <version>2.2.5.RELEASE</version>
  6.     </dependency>
  7.     <dependency>
  8.         <groupId>com.xuxueli</groupId>
  9.         <artifactId>xxl-job-core</artifactId>
  10.         <version>2.4.0</version>
  11.     </dependency>
  12. </dependencies>
复制代码
配置XxlJobSpringExecutor这个Bean
  1. @Configuration
  2. public class XxlJobConfiguration {
  3.     @Bean
  4.     public XxlJobSpringExecutor xxlJobExecutor() {
  5.         XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
  6.         //设置调用中心的连接地址
  7.         xxlJobSpringExecutor.setAdminAddresses("http://localhost:8080/xxl-job-admin");
  8.         //设置执行器的名称
  9.         xxlJobSpringExecutor.setAppname("sanyou-xxljob-demo");
  10.         //设置一个端口,后面会讲作用
  11.         xxlJobSpringExecutor.setPort(9999);
  12.         //这个token是保证访问安全的,默认是这个,当然可以自定义,
  13.         // 但需要保证调度中心配置的xxl.job.accessToken属性跟这个token是一样的
  14.         xxlJobSpringExecutor.setAccessToken("default_token");
  15.         //任务执行日志存放的目录
  16.         xxlJobSpringExecutor.setLogPath("./");
  17.         return xxlJobSpringExecutor;
  18.     }
  19. }
复制代码
XxlJobSpringExecutor这个类的作用,背面会着重讲
通过@XxlJob指定一个名为TestJob的使命,这个使命名需要跟前面页面配置的对应上
  1. @Component
  2. public class TestJob {
  3.     private static final Logger logger = LoggerFactory.getLogger(TestJob.class);
  4.     @XxlJob("TestJob")
  5.     public void testJob() {
  6.         logger.info("TestJob任务执行了。。。");
  7.     }
  8. }
复制代码
以是假如顺利的话,每隔1s钟就会打印一句TestJob使命执行了。。。
启动项目,注意修改一下端口,因为调用中心默认也是8080,本地起会端口冲突
终极执行结果如下,符合预期


讲完概念和使用部门,接下来就来好好讲一讲Xxl-Job核心的实现原理
从执行器启动说起

前面Demo中使用到了一个很重要的一个类
   XxlJobSpringExecutor
  这个类就是整个执行器启动的入口


这个类实现了SmartInitializingSingleton接口
以是颠末Bean的生命周期,肯定会调用afterSingletonsInstantiated这个方法的实现
这个方法干了很多初始化的事,这里我挑三个重要的讲,别的的比及具体的功能的时候再提
1、初始化JobHandler

JobHandler是个什么?
所谓的JobHandler其实就是一个定时使命的封装


一个定时使命会对应一个JobHandler对象
当执行器执行使命的时候,就会调用JobHandler的execute方法
JobHandler有三种实现:


  • MethodJobHandler
  • GlueJobHandler
  • ScriptJobHandler
MethodJobHandler是通过反射来调用方法执行使命


以是MethodJobHandler的使命的实现就是一个方法,刚好我们demo中的例子使命其实就是一个方法
以是Demo中的使命终极被封装成一个MethodJobHandler
GlueJobHandler比力有意思,它支持动态修改使命执行的代码
当你在创建使命的时候,需要指定运行模式为GLUE(Java)


之后需要在操纵按钮点击GLUE IDE编写Java代码


代码必须得实现IJobHandler接口,之后使命执行的时候就会执行execute方法的实现
假如你需要修改使命的逻辑,只需要重新编辑即可,不需要重启服务
ScriptJobHandler,通过名字也可以看出,是专门处理一些脚本的
运行模式除了BEAN和GLUE(Java)之外,别的都是脚本模式
而本节的主旨,所谓的初始化JobHandler就是指,执行器启动的时候会去Spring容器中找到加了@XxlJob注解的Bean
解析注解,然后封装成一个MethodJobHandler对象,终极存到XxlJobSpringExecutor成员变量的一个本地的Map缓存中


缓存key就是使命的名字


至于GlueJobHandler和ScriptJobHandler都是使命触发时才会创建
除了上面这几种,你也自己实现JobHandler,手动注册到JobHandler的缓存中,也是可以通过调度中心触发的
2、创建一个Http服务器

除了初始化JobHandler之外,执行器还会创建一个Http服务器
这个服务器端标语就是通过XxlJobSpringExecutor配置的端口,demo中就是设置的是9999,底层是基于Netty实现的


这个Http服务端会接收来自调度中心的哀求
当执行器接收到调度中心的哀求时,会把哀求交给ExecutorBizImpl来处理


这个类非常重要,所有调度中心的哀求都是这里处理的
ExecutorBizImpl实现了ExecutorBiz接口
当你翻源码的时候会发现,ExecutorBiz还有一个ExecutorBizClient实现


ExecutorBizClient的实现就是发送http哀求,以是这个实现类是在调度中心使用的,用来访问执行器提供的http接口


3、注册到调度中心

当执行器启动的时候,会启动一个注册线程,这个线程会往调度中心注册当前执行器的信息,包括两部门数据


  • 执行器的名字,也就是设置的appname
  • 执行器所在呆板的ip和端口,这样调度中心就可以访问到这个执行器提供的Http接口
前面提到每个服务实例都会对应一个执行器实例,以是调用中心会生存每个执行器实例的地址


   这里你可以把调度中心的功能类比成注册中心
  使命触发原理

弄明白执行器启动时干了哪些事,接下来讲一讲Xxl-Job最最核心的功能,那就是使命触发的原理
使命触发原理我会分下面5个小点来讲解


  • 使命怎样触发?
  • 快慢线程池的异步触发使命优化
  • 怎样选择执行器实例?
  • 执行器怎样去执行使命?
  • 使命执行结果的回调
1、使命怎样触发?

调度中心在启动的时候,会开启一个线程,这个线程的作用就是来计算使命触发机遇,这里我把这个线程称为调度线程
这个调度线程会去查询xxl_job_info这张表
这张表存了使命的一些基本信息和使命下一次执行的时间
调度线程会去查询下一次执行的时间 <= 当前时间 + 5s的使命
这个5s是XxlJob写死的,被称为预读时间,提前读出来,保证使命能准时触发
举个例子,假设当前时间是2023-11-29 08:00:10,这里的查询就会查出下一次使命执行时间在2023-11-29 08:00:15之前执行的使命


查询到使命之后,调度线程会去将这些使命根据执行时间分别为三个部门:


  • 当前时间已经超过使命下一次执行时间5s以上,也就是需要在2023-11-29 08:00:05(不包括05s)之前的执行的使命
  • 当前时间已经超过使命下一次执行时间,但是但不足5s,也就是在2023-11-29 08:00:05和2023-11-29 08:00:10(不包括10s)之间执行的使命
  • 还未到触发时间,但是肯定是5s内就会触发执行的


对于第一部门的已经超过5s以上时间的使命,会根据使命配置的调度逾期计谋来选择要不要执行


调度逾期计谋就两种,就是字面意思


  • 直接忽略这个已颠末期的使命
  • 立马执行一次这个逾期的使命
对于第二部门的超时时间在5s以内的使命,就直接立马执行一次,之后假如判断使命下一次执行时间就在5s内,会直接放到一个时间轮内里,等候下一次触发执行
对于第三部门使命,由于还没到执行时间,以是不会立马执行,也是直接放到时间轮内里,等候触发执行
当这批使命处理完成之后,岂论是前面是什么情况,调度线程都会去重新计算每个使命的下一次触发时间,然后更新xxl_job_info这张表的下一次执行时间
到此,一次调度的计算就算完成了
之后调度线程还会继承重复上面的步骤,查使命,调度使命,更新使命下次执行时间,不停死循环下去,这就实现了使命到了执行时间就会触发的功能
这里在使命触发的时候还有一个很有意思的细节
由于调度中心可以是集群的情势,每个调度中心实例都有调度线程,那么怎样保证使命在同一时间只会被其中的一个调度中心触发一次?
我猜你第一时间肯定想到分布式锁,但是怎么加呢?
XxlJob实现就比力有意思了,它是基于八股文中常说的通过数据库来实现的分布式锁的
在调度之前,调度线程会实验执行下面这句sql


就是这个sql
   select * from xxl_job_lock where lock_name = 'schedule_lock' for update
  一旦执行乐成,阐明当前调度中心乐成抢到了锁,接下来就可以执行调度使命了
当调度使命执行完之后再去关闭连接,从而释放锁
由于每次执行之前都需要去获取锁,这样就保证在调度中心集群中,同时只有一个调度中心执行调度使命


2、快慢线程池的异步触发使命优化

当使命达到了触发条件,并不是由调度线程直接去触发执行器的使命执行
调度线程会将这个触发的使命交给线程池去执行
以是上图中的最后一部门触发使命执行其实是线程池异步去执行的
那么,为什么要使用线程池异步呢?
主要是因为触发使命,需要通过Http接口调用具体的执行器实例去触发使命


这一过程肯定会耗费时间,假如调度线程去做,就会耽误调度的服从
以是就通过异步线程去做,调度线程只负责判断使命是否需要执行
并且,Xxl-Job为了进一步优化使命的触发,将这个触发使命执行的线程池分别成快线程池慢线程池两个线程池


在调用执行器的Http接口触发使命执行的时候,Xxl-Job会去记录每个使命的触发所耗费的时间
注意并不是使命执行时间,只是整个Http哀求耗时时间,这是因为执行器执行使命是异步执行的,以是整个时间不包括使命执行时间,这个背面会具体说
当使命一次触发的时间超过500ms,那么这个使命的慢次数就会加1
假如这个使命一分钟内触发的慢次数超过10次,接下来就会将触发使命交给慢线程池去执行
以是快慢线程池就是避免那种频繁触发并且每次触发时间还很长的使命壅闭其它使命的触发的情况发生
3、怎样选择执行器实例?

上一节说到,当使命需要触发的时候,调度中心会向执行器发送Http哀求,执行器去执行具体的使命
那么问题来了
   由于一个执行器会有很多实例,那么应该向哪个实例哀求?
  这其实就跟使命配置时设置的路由计谋有关了


从图上可以看出xxljob支持多种路由计谋
除了分片广播,别的的具体的算法实现都是通过ExecutorRouter的实现类来实现的


这里简朴讲一讲各种算法的原理,有爱好的小搭档可以去看看内部的实现细节
第一个、最后一个、轮询、随机都很简朴,没什么好说的
同等性Hash讲起来比力复杂,你可以先看看这篇文章,再去检察Xxl-Job的代码实现
   zhuanlan.zhihu.com/p/470368641
  最不经常使用(LFU:Least Frequently Used):Xxl-Job内部会有一个缓存,统计每个使命每个地址的使用次数,每次都选择使用次数最少的地址,这个缓存每隔24小时重置一次
近来最久未使用(LRU:Least Recently Used):将地址存到LinkedHashMap中,它使用LinkedHashMap可以根据元素访问(get/put)顺序来给元素排序的特性,快速找到近来最久未使用(未访问)的节点
故障转移:调度中心都会去哀求每个执行器,只要能接收到相应,阐明执行器正常,那么使命就会交给这个执行器去执行
繁忙转移:调度中心也会去哀求每个执行器,判断执行器是不是正在执行当前需要执行的使命(使命执行时间过长,导致上一次使命还没执行完,下一次又触发了),假如在执行,阐明繁忙,不能用,否则就可以用
分片广播:XxlJob给每个执行器分配一个编号,从0开始递增,然后向所有执行器触发使命,告诉每个执行器自己的编号和统共执行器的数据
我们可以通过XxlJobHelper#getShardIndex获取到编号,XxlJobHelper#getShardTotal获取到执行器的总数据量
分片广播就是将使命量分散到各个执行器,每个执行器只执行一部门使命,加快使命的处理
举个例子,好比你如今需要处理30w条数据,有3个执行器,此时使用分片广播,那么此时可将使命分成3分,每份10w条数据,执行器根据自己的编号选择对应的那份10w数据处理


当选择好了具体的执行器实例之后,调用中心就会携带一些触发的参数,发送Http哀求,触发使命
4、执行器怎样去执行使命?

相信你肯定记得我前面在说执行器启动是会创建一个Http服务器的时候提到这么一句
   当执行器接收到调度中心的哀求时,会把哀求交给ExecutorBizImpl来处理
  以是前面提到的故障转移和繁忙转移哀求执行器进行判断,终极执行器也是交给ExecutorBizImpl处理的
执行器处理触发哀求是这个ExecutorBizImpl的run方法实现的


当执行器接收到哀求,在正常情况下,执行器会去为这个使命创建一个单独的线程,这个线程被称为JobThread
   每个使命在触发的时候都有单独的线程去执行,保证不同的使命执行互不影响
  之后使命并不是直接交给线程处理的,而是直接放到一个内存队列中,线程直接从队列中获取使命


这里我相信你肯定有个疑惑
   为什么不直接处理,而是交给队列,从队列中获取使命呢?
  那就得讲讲不正常的情况了
假如调度中心选择的执行器实例正在处理定时使命,那么此时该怎么处理呢?**
这时就跟壅闭处理计谋有关了


壅闭处理计谋统共有三种:


  • 单机串行
  • 丢弃后续调度
  • 覆盖之前调度
单机串行的实现就是将使命放到队列中,由于队列是先辈先出的,以是就实现串行,这也是为什么放在队列的原因
丢弃调度的实现就是执行器什么事都不用干就可以了,天然而然使命就丢了
覆盖之前调度的实现就很暴力了,他是直接重新创建一个JobThread来执行使命,并且实验打断之前的正在处理使命的JobThread,丢弃之前队列中的使命
   打断是通过Thread#interrupt方法实现的,以是正在处理的使命还是有大概继承运行,并不是说一打断正在运行的使命就终止了
  这里需要注意的一点就是,壅闭处理计谋是对于单个执行器上的使命来见效的,不同执行器实例上的同一个使命是互不影响的
好比说,有一个使命有两个执行器A和B,路由计谋是轮询
使命第一次触发的时候选择了执行器实例A,由于使命执行时间长,使命第二次触发的时候,执行器的路由到了B,此时A的使命还在执行,但是B感知不到A的使命在执行,以是此时B就直接执行了使命
以是此时你配置的什么壅闭处理计谋就没什么用了
假如业务中需要保证定时使命同一时间只有一个能运行,需要把使命路由到同一个执行器上,好比路由计谋就选择第一个
5、使命执行结果的回调

当使命处理完成之后,执行器会将使命执行的结果发送给调度中心


如上图所示,这整个过程也是异步化的


  • JobThread会将使命执行的结果发送到一个内存队列中
  • 执行器启动的时候会开启一个处发送使命执行结果的线程:TriggerCallbackThread
  • 这个线程会不绝地从队列中获取所有的执行结果,将执行结果批量发送给调度中心
  • 调用中心接收到哀求时,会根据执行的结果修改这次使命的执行状态和进行一些后续的事,好比失败了是否需要重试,是否有子使命需要触发等等
到此,一次使命的就算真正处理完成了






免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宝塔山

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表