通过surging的后台托管服务编写任务调度并支持规则引擎自定义脚本 ...

打印 上一主题 下一主题

主题 595|帖子 595|积分 1785

简介

     过去,如果在业务中需要处理任务调度的时候,大家都会使用第三方的任务调度组件,而第三方组件有一套自己的规则,在微服务的中显得那么格格不入,这样就会造成代码臃肿,耦合性高,如果有分布式还需要搭建新的分布式环境,如果把任务调度做成组件服务,这个就完全满足了微服务的模块化,组件化,而下面谈的是在surging 中如何支持规则引擎自定义脚本。
调度频率设置

       首先在开始之前,先看看如何通过脚本分配多种调度计划,先看下表:
方法描述EveryMinute()每分钟执行一次任务EveryFiveMinutes();每五分钟执行一次任务EveryTenMinutes(); 每十分钟执行一次任务EveryThirtyMinutes()每半小时执行一次任务Hourly();每小时执行一次任务HourlyAt(10)每一个小时的第 10 分钟运行一次Daily()每到午夜执行一次任务DailyAt("3:00")在 3:00 执行一次任务TwiceDaily(1, 3)在 1:00 和 3:00 分别执行一次任务Weekly()每周执行一次任务Monthly()每月执行一次任务MonthlyOn(4, "3:00")在每个月的第四天的 3:00 执行一次任务Quarterly()每季度执行一次任务Yearly()每年执行一次任务Timezone("utc")设置utc时区 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 


举个例子,在工作日每三秒在时间8:00-23:30内执行任务。脚本如下:
  1. parser.TimeZone(""utc"")
  2.       .Weekdays()<br>      .SecondAt(3)<br>       .Between(""8:00"", ""23:30"")
复制代码
 额外的限制条件列表如下:
方法描述Weekdays()限制任务在工作日Sundays()限制任务在星期日Mondays()限制任务在星期一Tuesdays()限制任务在星期二Wednesdays()限制任务在星期三Thursdays()限制任务在星期四Fridays()限制任务在星期五Saturdays()限制任务在星期六When( function(lastExecTime))限制任务基于一个script脚本返回为真的验证Skip( function(lastExecTime))限制任务基于一个script脚本返回为假的验证 
 
 
 
 
 
 
 
 
 
 
 
举个例子,在工作日每三秒在时间8:00-23:30内执行任务。如果设置When返回为true,skip返回false 就会执行,脚本如下:
  1. parser.TimeZone(""utc"")
  2.        .When(function(lastExecTime){
  3.                 return true;
  4.             })
  5.        .Skip(
  6.              function(lastExecTime){
  7.                 return false;
  8.             })
  9.       .Weekdays()
  10.       .SecondAt(3)
  11.        .Between(""8:00"", ""23:30"")
复制代码
然后在function 脚本中支持DateUtils对象,可以针对lastExecTime进行逻辑判断,比如是否是周末:DateUtils.IsWeekend(lastExecTime), 是否是今天DateUtils.IsToday(lastExecTime),代码如下:
 
  1. parser.TimeZone(""utc"")
  2.        .When(function(lastExecTime){
  3.                return DateUtils.IsToday(lastExecTime);
  4.             })
  5.        .Skip(
  6.              function(lastExecTime){
  7.                 return DateUtils.IsWeekend(lastExecTime);
  8.             })
  9.       .Weekdays()
  10.       .SecondAt(3)
  11.        .Between(""8:00"", ""23:30"")
复制代码
 
编写调度服务

surging微服务引擎是支持后台管理托管服务的,如果要基于BackgroundService编写任务调度,那服务就要继承BackgroundServiceBehavior,还要继承ISingleInstance以设置注入单例模式,
首先,创建接口服务,这样就可以远程添加任务,开启关闭服务了,代码如下:
  1.    [ServiceBundle("Background/{Service}")]
  2.     public interface IWorkService : IServiceKey
  3.     {
  4.         Task<bool> AddWork(Message message);
  5.          Task StartAsync();
  6.         Task StopAsync();
  7.     }
复制代码
然后创建业务领域服务,以下代码是通过规则引擎自定义脚本设置执行频率,并且可以设置execsize 以标识同时执行任务的大小,通过以下业务逻辑代码大家可以扩展支持持久化。
  1. public class WorkService : BackgroundServiceBehavior, IWorkService, ISingleInstance
  2.     {
  3.         private readonly ILogger<WorkService> _logger;
  4.         private readonly Queue<Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>> _queue = new Queue<Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>>();
  5.         private readonly ConcurrentDictionary<string, DateTime> _keyValuePairs = new ConcurrentDictionary<string, DateTime>();
  6.         private readonly IServiceProxyProvider _serviceProxyProvider;
  7.         private AtomicLong _atomic=new AtomicLong(1);
  8.         private const int EXECSIZE = 1;
  9.         private CancellationToken _token;
  10.         public WorkService(ILogger<WorkService> logger, IServiceProxyProvider serviceProxyProvider)
  11.         {
  12.             _logger = logger;
  13.             _serviceProxyProvider = serviceProxyProvider;
  14.             /*   var script = @"parser
  15.                                .Weekdays().SecondAt(3).Between(""8:00"", ""22:00"")";*/
  16.             var script = @"parser
  17.                               .TimeZone(""utc"")
  18.                                .When(
  19.                               function(lastExecTime){
  20.                 return DateUtils.IsToday(lastExecTime);
  21.             }).Skip(
  22.              function(lastExecTime){
  23.                 return DateUtils.IsWeekend(lastExecTime);
  24.             }).Weekdays().SecondAt(3).Between(""8:00"", ""23:30"")";
  25.             var ruleWorkflow = GetSchedulerRuleWorkflow(script);
  26.             var messageId = Guid.NewGuid().ToString();
  27.             _keyValuePairs.AddOrUpdate(messageId, DateTime.Now, (key, value) => DateTime.Now);
  28.             _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(new Message() { MessageId= messageId,Config=new SchedulerConfig() {  IsPersistence=true} }, GetRuleEngine(ruleWorkflow), ruleWorkflow));
  29.         }
  30.         public  Task<bool> AddWork(Message message)
  31.         {
  32.             var ruleWorkflow = GetSchedulerRuleWorkflow(message.Config.Script);
  33.             _keyValuePairs.AddOrUpdate(message.MessageId, DateTime.Now, (key, value) => DateTime.Now);
  34.             _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(message, GetRuleEngine(ruleWorkflow), ruleWorkflow));
  35.             return Task.FromResult(true);
  36.         }
  37.         protected override async  Task ExecuteAsync(CancellationToken stoppingToken)
  38.         {
  39.             try
  40.             {
  41.                 _token = stoppingToken;
  42.                 _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
  43.                 _queue.TryDequeue(out Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>? message);
  44.                 if (message != null)
  45.                 {
  46.                     var parser = await GetParser(message.Item3, message.Item2);
  47.                     await PayloadSubscribe(parser, message.Item1, message.Item2, message.Item3);
  48.                     _keyValuePairs.TryGetValue(message.Item1.MessageId, out DateTime dateTime);
  49.                     parser.Build(dateTime == DateTime.MinValue ? DateTime.Now : dateTime);
  50.                 }
  51.                 if (!_token.IsCancellationRequested && (message == null || _atomic.GetAndAdd(1) == EXECSIZE))
  52.                 {
  53.                     _atomic = new AtomicLong(1);
  54.                     await Task.Delay(1000, stoppingToken);
  55.                 }
  56.             }
  57.             catch (Exception ex){
  58.                 _logger.LogError("WorkService execute error, message:{message} ,trace info:{trace} ", ex.Message, ex.StackTrace);
  59.             }
  60.         }
  61.         public async Task StartAsync()
  62.         {
  63.             if (_token.IsCancellationRequested)
  64.             {
  65.                 await base.StartAsync(_token);
  66.             }
  67.         }
  68.         public async Task StopAsync()
  69.         {
  70.             if (!_token.IsCancellationRequested)
  71.             {
  72.                await  base.StopAsync(_token);
  73.             }
  74.         }
  75.         private async Task PayloadSubscribe(RulePipePayloadParser parser, Message message, RulesEngine.RulesEngine rulesEngine, SchedulerRuleWorkflow ruleWorkflow)
  76.         {
  77.             parser.HandlePayload().Subscribe(async (temperature) =>
  78.             {
  79.                 try
  80.                 {
  81.                     if (temperature)
  82.                     {
  83.                        await  ExecuteByPlanAsyn(message);
  84.                         _logger.LogInformation("Worker exec at: {time}", DateTimeOffset.Now);
  85.                     }
  86.                 }
  87.                 catch (Exception ex) { }
  88.                 finally
  89.                 {
  90.                     if (message.Config.IsPersistence || (!temperature && !message.Config.IsPersistence))
  91.                         _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(message, rulesEngine, ruleWorkflow));
  92.                 }
  93.             });
  94.         }
  95.         private async Task<bool> ExecuteByPlanAsyn(Message message)
  96.         {
  97.             var result = false;
  98.             var isExec = true;
  99.             try
  100.             {
  101.                 if (!string.IsNullOrEmpty(message.RoutePath))
  102.                 {
  103.                     var serviceResult = await _serviceProxyProvider.Invoke<object>(message.Parameters, message.RoutePath, message.ServiceKey);
  104.                     bool.TryParse(serviceResult?.ToString(), out result);
  105.                     isExec = true;
  106.                 }
  107.             }
  108.             catch { }
  109.             finally
  110.             {
  111.                 if (isExec && message.Config.IsPersistence)
  112.                     _keyValuePairs.AddOrUpdate(message.MessageId, DateTime.Now, (key, value) => DateTime.Now);
  113.                 else if (!message.Config.IsPersistence)
  114.                     _keyValuePairs.TryRemove(message.MessageId, out DateTime dateTime);
  115.             }
  116.             return result;
  117.         }
  118.         private async Task<RulePipePayloadParser> GetParser(SchedulerRuleWorkflow ruleWorkflow, RulesEngine.RulesEngine engine)
  119.         {
  120.             var payloadParser = new RulePipePayloadParser();
  121.             var ruleResult = await engine.ExecuteActionWorkflowAsync(ruleWorkflow.WorkflowName, ruleWorkflow.RuleName, new RuleParameter[] { new RuleParameter("parser", payloadParser) });
  122.             if (ruleResult.Exception != null && _logger.IsEnabled(LogLevel.Error))
  123.                 _logger.LogError(ruleResult.Exception, ruleResult.Exception.Message);
  124.             return payloadParser;
  125.         }
  126.         private RulesEngine.RulesEngine GetRuleEngine(SchedulerRuleWorkflow ruleWorkFlow)
  127.         {
  128.             var reSettingsWithCustomTypes = new ReSettings { CustomTypes = new Type[] { typeof(RulePipePayloadParser) } };
  129.             var result = new RulesEngine.RulesEngine(new Workflow[] { ruleWorkFlow.GetWorkflow() }, null, reSettingsWithCustomTypes);
  130.             return result;
  131.         }
  132.         private SchedulerRuleWorkflow GetSchedulerRuleWorkflow(string script)
  133.         {
  134.             var result = new SchedulerRuleWorkflow("1==1");
  135.             if (!string.IsNullOrEmpty(script))
  136.             {
  137.                 result = new SchedulerRuleWorkflow(script);
  138.             }
  139.             return result;
  140.         }
  141.     }
复制代码
总结

因为工作繁忙,微服务平台暂时搁置,等公司基于surging 的物联网项目上线后,再投入时间研发,surging 一直开发中未曾放弃,也许你没看到的版本才是最强大的。之前的QQ群被封了,如果感兴趣可以加:744677125
开源地址:https://github.com/fanliang11/surging
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

自由的羽毛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表