ToB企服应用市场:ToB评测及商务社交产业平台

标题: xxl-job定时调度任务Java代码分析 [打印本页]

作者: 熊熊出没    时间: 2022-12-20 21:23
标题: xxl-job定时调度任务Java代码分析
简介

用xxl-job做后台任务管理, 主要是快速解决定时任务的HA问题, 项目代码量不大, 功能精简, 没有特殊依赖. 因为产品中用到了这个项目, 上午花了点时间研究了一下运行机制. 把看到的记一下.
环境
  1. <dependency>
  2.     <groupId>com.xuxueli</groupId>
  3.     <artifactId>xxl-job-core</artifactId>
  4.     <version>${最新稳定版本}</version>
  5. </dependency>
复制代码
运行需要 JDK1.8, MySQL5.7
数据库结构

代码结构

项目文件结构如下
  1. ├───doc
  2. │   ├───db                                               # 初始化的sql
  3. │   └───images
  4. ├───xxl-job-admin                                        # 运行的服务端模块, 提供界面和调度
  5. │   └───src
  6. │       ├───main
  7. │       │   ├───java
  8. │       │   │   └───com
  9. │       │   │       └───xxl
  10. │       │   │           └───job
  11. │       │   │               └───admin
  12. │       │   │                   ├───controller
  13. │       │   │                   │   ├───annotation
  14. │       │   │                   │   ├───interceptor
  15. │       │   │                   │   └───resolver
  16. │       │   │                   ├───core
  17. │       │   │                   ├───dao
  18. │       │   │                   └───service
  19. │       │   │                       └───impl
  20. │       │   └───resources
  21. │       │       ├───i18n                                 # 多国化, 简繁英
  22. │       │       ├───mybatis-mapper                       # xml形式的mapper
  23. │       │       ├───static                               # 前端静态文件
  24. │       │       └───templates                            # Freemarker模板
  25. │       └───test
  26. │           └───java
  27. ├───xxl-job-core                                         # 公用jar包, 模块内部依赖
  28. │   └───src
  29. │       └───main
  30. │           └───java
  31. └───xxl-job-executor-samples
  32.     ├───xxl-job-executor-sample-frameless                # 任务执行层示例
  33.     │   └───src
  34.     │       ├───main
  35.     │       │   ├───java
  36.     │       │   └───resources
  37.     │       └───test
  38.     │           └───java
  39.     └───xxl-job-executor-sample-springboot               # 使用SpringBoot的执行层示例
  40.         └───src
  41.             ├───main
  42.             │   ├───java
  43.             │   └───resources
  44.             └───test
复制代码
运行机制

执行端需要准备以下信息
非 Spring 的场景

通过调用 FrameLessXxlJobConfig.getInstance().initXxlJobExecutor() 这个方法将 XxlJobSimpleExecutor 实例化, 并注册到xxl_job服务端
Spring 场景

远程调用服务

xxl_job 并未使用Spring的服务机制, 而是内部实现了一个侦听指定IP+端口的服务. 这个实现对应的类是 EmbedServer, 服务基于 Netty, 核心代码是
  1. // start server
  2. ServerBootstrap bootstrap = new ServerBootstrap();
  3. bootstrap.group(bossGroup, workerGroup)
  4.         .channel(NioServerSocketChannel.class)
  5.         .childHandler(new ChannelInitializer<SocketChannel>() {
  6.             @Override
  7.             public void initChannel(SocketChannel channel) throws Exception {
  8.                 channel.pipeline()
  9.                         .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
  10.                         .addLast(new HttpServerCodec())
  11.                         .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
  12.                         .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
  13.             }
  14.         })
  15.         .childOption(ChannelOption.SO_KEEPALIVE, true);
复制代码
这行代码注册了内部的XxlJob方法
  1. .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)
复制代码
处理远程请求时, 在下面的代码中, 通过executorBiz.run(triggerParam)调用XxlJob方法
  1. private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
  2.     //...
  3.     // services mapping
  4.     try {
  5.         switch (uri) {
  6.             case "/beat":
  7.                 return executorBiz.beat();
  8.             case "/idleBeat":
  9.                 IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
  10.                 return executorBiz.idleBeat(idleBeatParam);
  11.             case "/run":
  12.                 TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
  13.                 return executorBiz.run(triggerParam);
  14.             case "/kill":
  15.                 KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
  16.                 return executorBiz.kill(killParam);
  17.             case "/log":
  18.                 LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
  19.                 return executorBiz.log(logParam);
  20.             default:
  21.                 return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
  22.         }
  23.     } catch (Exception e) {
  24.     //...
  25. }
复制代码
锁机制

通过select ... for update实现的, 这个表并没有放到 MyBatis, 在 JobScheduleHelper 中, 通过
  1. preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
  2. preparedStatement.execute();
复制代码
得到锁, 在方法末尾释放
  1. // close PreparedStatement
  2. if (null != preparedStatement) {
  3.     try {
  4.         preparedStatement.close();
  5.     } catch (SQLException e) {
  6.         if (!scheduleThreadToStop) {
  7.             logger.error(e.getMessage(), e);
  8.         }
  9.     }
  10. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4