一、分布式任务调治系统实现思绪
现在延迟任务系统已经改造成分布式系统了,一个服务多个节点,在延迟任务系统中有个业务逻辑是定时的将未来数据集合中的数据革新到消费者队列,每个节点都在执行,势必会产生冲突,我们必要有同一的分布式任务调治job系统。对定时革新的任务进行同一调治。- @Scheduled(cron = "0/1 * * * * ?") //每个延迟任务节点都在独立的 刷新
- public void refresh() {}
复制代码
必要有同一的分布式任务调治系统:job节点
整体思绪:
每一个节点都在各自革新,如上图。我们要建立一个同一的调治系统,也就是Job节点,在Job节点中做一个定时,每隔一秒钟调用下方每一个节点的定时革新方法,固然每次调用只是调用一个节点的定时方法 。这个调用我们将使用Feign的接口进行调用。之以是使用Feign接口进行调用 ,我们还可以使用它对负载平衡的支持。来分摊每个节点定时革新的压力。
Job 节点进行定时,每隔一秒钟通过Feign接口来调用某一个节点的定时革新方法
首先第一步要做的就是:将延迟系统的定时方法抽取成外部接口
步骤如下:
1:延迟任务系统暴露定时革新的接口refresh,使得job可以通过feign调用
2:开发job系统,在job系统中来定时的革新,革新逻辑就是通过feign调用schedule-service延迟任务服务的革新方法refresh,同时还能使用feign的负载平衡减轻每个节点的压力。
二、延迟任务系统抽取接口
延迟任务系统抽取refresh接口步骤:
步骤1:在chongba_schedule_service模块中,找到com.chongba.schedule.inf包下的TaskService接口,在该接口中添加定时革新的refresh方法
- /**
- * 未来数据集合向消费者队列定时刷新
- */
- public void refresh();
复制代码
步骤2:在chongba_schedule_service模块中,找到com.chongba.schedule.controller包下的TaskController,添加定时革新的web层接口- @GetMapping("/refresh")
- public ResponseMessage refresh(){
- try {
- taskService.refresh();
- log.info("refresh----");
- return ResponseMessage.ok("");
- } catch (Exception e) {
- log.error("refresh exception");
- return ResponseMessage.error(e.getMessage());
- }
- }
复制代码
步骤3:在chongba_schedule_feign模块中,找到com.chongba.feign包下的TaskServiceClient接口,添加定时革新的方法refresh
- @GetMapping("task/refresh")
- public ResponseMessage refresh();
复制代码
三、分布式任务调治系统工程搭建
任务调治系统工程搭建步骤如下:
步骤1:在父工程chongba_parent下创建子模块:chongba_schedule_job
步骤2:job节点必要通过feign调用延迟任务服务中的定时革新方法,以是必要在chongba_schedule_job的pom文件中添加chongba_schedule_feign的依赖- <dependencies>
- <dependency>
- <groupId>com.chongba</groupId>
- <artifactId>chongba_schedule_feign</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- </dependencies>
复制代码
步骤3:在chongba_schedule_job模块src/main/java下创建启启动类:com.chongba.job.JobApplication,注意包,
注意:在job系统中必要通过feign调用延迟任务服务,通过pom我们已经将feign的接口引入了,因此在启动类上必要开启feign的客户端,但同时必要注意的是:feign的接口在包:com.chongba.feign下,而我们当前在:com.chongba.job包下,必要重新指定。- @SpringBootApplication
- @EnableFeignClients(basePackages = {"com.chongba.feign"})
- public class JobApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(JobApplication.class,args);
- }
- }
复制代码
步骤4:由于job系统必要使用feign去调用延迟任务服务,因此job系统必要进行服务发现,在src/main/resources下创建application.yml配置文件并添加如下配置:- server:
- port: 8082
- spring:
- application:
- name: job #指定服务名
- cloud:
- consul:
- host: 127.0.0.1
- port: 8500
- discovery:
- register: false #不需要注册到consul
复制代码
步骤5:在包com.chongba.job下创建同一的job服务JobService,添加同一的定时革新方法refresh()
- @Service
- @Slf4j
- public class JobService {
- @Autowired
- private TaskServiceClient taskServiceClient;
-
- @Scheduled(cron = "*/1 * * * * ?") //注意要将chongba_schedule_service中的@Scheduled注释
- public void refresh(){
- try {
- ResponseMessage refresh = taskServiceClient.refresh();
- log.info("refresh---,{}",refresh);
- } catch (Exception e) {
- log.error("refresh exception {}",e.getMessage());
- }
- }
- }
复制代码
注意:必要从chongba_schedule_service模块中拷贝logback-spring.xml日记配置文件到job系统,同时必要将chongba_schedule_service模块中TaskServiceImpl类中refresh()方法上的定时任务注解@Scheduled注释
步骤6:在chongba_schedule_job模块中开启定时任务注解的支持,在启动类JobApplication上添加注解- @SpringBootApplication
- @EnableFeignClients(basePackages = {"com.chongba.feign"})
- @EnableScheduling
- public class JobApplication {
- public static void main(String[] args) {
- SpringApplication.run(JobApplication.class,args);
- }
- }
复制代码
步骤7:测试
1:在chongba_schedule_service模块中启动ScheduleApplication,在81端口启动,启动完成后修改application.yml中的端口为82,再次启动ScheduleApplication
2:在chongba_schedule_job模块中启动JobApplication,查看测试效果
效果预期:job中每秒革新一次,81/82两个节点中分访问压力。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |