ToB企服应用市场:ToB评测及商务社交产业平台

标题: .net工作流elsa-触发器 [打印本页]

作者: 我爱普洱茶    时间: 2025-1-16 23:56
标题: .net工作流elsa-触发器
必备知识

触发器会用到书签和调理,这个在我的另外两篇文章中有分析。
什么是触发器

可以直接调用流程引擎的IWorkflowRuntime获取IWorkflowClient,然后调用它的CreateAndRunInstanceAsync来启动一个全新的流程。
也可以让流程引擎监听一个变乱,当变乱触发时,自动创建并执行(或借助书签恢复)一个流程实例,这就是触发器。
比如 定义一个触发器,当指定文件变化时,自动启动一个指定的流程。 再比如 定义一个触发器,每隔5分钟自动触发执行某个流程。
在流程定义中配置触发器

elsa提供代码或可视化计划器的方式定义流程,由于触发器仅仅是一个特殊的Activity,以是一样的,通过代码或在计划器拖拽触发器到流程定义中即可。
不同类型的触发器需要配置不同参数,
如:elsa内置的StartAt,它表示在指定时间点自动触发执行,以是需要设置它的DateTime,表示在这个时间点自动触发
再比如:HttpEndpoint是另一个elsa内置的触发器,它监听到指定请求时自动触发,以是需要配置它的 监听地址、Http方法、是否做授权判断等等属性。
触发器存储(索引化)

触发器都是定义在流程定义中的,且一个流程定义中,可能有多个雷同或不同类型的触发器, 从所有流程定义中把触发器都抽取出来,单独存储到一个列表中,当系统启动时,或别的情况需要访问整个系统中的触发器配置时,可以直接从这个列表中快速获取触发器, 这比每次都遍历所有流程定义,再从中抽取触发器更快,这就是触发器索引化,假如你用的ef配置为elsa的持久化,那么它会存储在Triggers表中
触发器索引化器由ITriggerIndexer接口表示,默认实现是TriggerIndexer,它就提供生存、删除、获取触发器的功能。 它在生存时会根据流程或流程定义,获取里面定义的触发器列表,然后调用其GetTriggerPayloadsAsync方法,获取触发器配置时的参数,这个参数通常是根据触发器属性生成的, 比力特别的是某些触发器中,GetTriggerPayloadsAsync会返回多个payload,这会导致触发器索引列表中存储多个记录,比如内置触发器HttpEndpoint,会根据用户配置的多个Http方法, 返回多个数据,假如你配置了GET POST,触发器索引列表会存储对应的两条记录,未来外部请求同一个url地址时,无论是get 照旧post,HttpEndpoint这个节点都会被执行。
流程定义变更后发布流程时,或直接刷新流程定义时,或别的情况,总之流程定义变化后,都会调用ITriggerIndexer重新生产并生存触发器,生存后会触发WorkflowTriggersIndexed变乱。
以是索引器还起到一个希奇的作用,就是让我们在流程定义中配置触发器相关参数,而配合触发器的外部监听功能可以通过从持久化获取,或从变乱参数中 得到 触发器的配置数据,从而控制监听逻辑。
触发器外部的监听部分

监听这件事并不是定义在触发器节点内部的,而是外部配合的,比如HttpEndpont触发器,监听是单独的asp.net core 中间件来实现的,但这个中间件应该依靠我们配置流程时给HttpEndpont触发器定义的参数。
配合HttpPoint触发的外部部分有个UpdateRouteTable,它监听WorkflowTriggersIndexed,并根据变乱参数获取监听的地址,进而配置路由。另外asp.net core中间件中还可以直接从持久化中获取触发器, 进而访问器payload中的触发器配置参数,并根据这些参数控制此中间件的执行流程。
而配合定时器相关触发器Timer StartAt Cron等的外部分是ScheduleWorkflows,它也监听WorkflowTriggersIndexed变乱,在变乱处理中,调用elsa调理器安排后台作业来,以实现到指定时间后让触发器执行。
触发器节点被执行

触发器是特殊的Activity,假如有个流程:A → B → C,其中B是触发器,当前流程可能并不是因为B的外部监听触发此流程的执行,可能是A执行后,流转到B,导致B的ExecuteAsync被执行。
ActivityExecutionContext.IsTriggerOfWorkflow就是用来判断这种情况的,若当前流程就是自己这个节点触发的,则为true,否则为false
以是触发器执行时ExecuteAsync方法中通常需要判断这两种情况。
内置HttpEndpoint触发器分析

这里分析下内置的HttpEndpoint触发器,但仅关注触发器的原理部分,以帮助我们更深刻地明白触发器的工作原理。 它定义在Elsa.Http模块中,它继承至Trigger
与触发器相关输入参数

参数名形貌SupportedMethods监听哪些http方法,可选值:"GET", "OST", "UT", "HEAD", "DELETE"Authorize监听的地址被请求时,是否做权限判断Policy权限判断用的策略名称Path监听的url地址RequestTimeout请求超时设置RequestSizeLimit请求体大小限制HttpEndpoint.GetTriggerPayloads

焦点源码:
  1.     protected override IEnumerable<object> GetTriggerPayloads(TriggerIndexingContext context) => GetBookmarkPayloads(context.ExpressionExecutionContext);
  2.     private IEnumerable<object> GetBookmarkPayloads(ExpressionExecutionContext context)
  3.     {
  4.         // Generate bookmark data for path and selected methods.
  5.         var normalizedRoute = context.Get(Path)!.NormalizeRoute();
  6.         var methods = SupportedMethods.GetOrDefault(context) ?? new List<string> { HttpMethods.Get };
  7.         var authorize = Authorize.GetOrDefault(context);
  8.         var policy = Policy.GetOrDefault(context);
  9.         var requestTimeout = RequestTimeout.GetOrDefault(context);
  10.         var requestSizeLimit = RequestSizeLimit.GetOrDefault(context);
  11.         //根据http请求方法,返回多个数据,会在触发器索引列表中创建多条记录
  12.         return methods
  13.             .Select(x => new HttpEndpointBookmarkPayload(normalizedRoute, x.ToLowerInvariant(), authorize, policy, requestTimeout, requestSizeLimit))
  14.             .Cast<object>()
  15.             .ToArray();
  16.     }
复制代码
在所在流程被发布时,会调用GetTriggerPayloads方法,而它会返回上述输入参数,这些输入参数终极被生存到数据库中,还会触发WorkflowTriggersIndexed变乱,这些监听相关的 参数还会生存到这个变乱的参数中。
这个方法会根据配置的SupportedMethods返回一个或多个对象,终极导致触发器索引列表中出现多条对应记录。
UpdateRouteTable

它监听WorkflowTriggersIndexed变乱,从变乱参数中获取Path,然后更新elsa路由表
  1. public class UpdateRouteTable(IRouteTableUpdater routeTableUpdater, IOptions<HttpActivityOptions> options) :
  2.     INotificationHandler<WorkflowTriggersIndexed>,
  3.     INotificationHandler<WorkflowBookmarksIndexed>
  4. {
  5.     /// <inheritdoc />
  6.     public async Task HandleAsync(WorkflowTriggersIndexed notification, CancellationToken cancellationToken)
  7.     {
  8.         routeTableUpdater.RemoveRoutes(notification.IndexedWorkflowTriggers.RemovedTriggers);
  9.         await routeTableUpdater.AddRoutesAsync(notification.IndexedWorkflowTriggers.AddedTriggers, cancellationToken);
  10.         await routeTableUpdater.AddRoutesAsync(notification.IndexedWorkflowTriggers.UnchangedTriggers, cancellationToken);
  11.     }
复制代码
HttpWorkflowsMiddleware

elsa http endpoint监听中间件,直接看注释吧
  1. public async Task InvokeAsync(HttpContext httpContext, IServiceProvider serviceProvider)
  2.     {
  3.         //当前请求路径
  4.         var path = GetPath(httpContext);
  5.         //根据elsa路由表匹配路由数据
  6.         var matchingPath = GetMatchingRoute(serviceProvider, path).Route;
  7.         //配置elsa时,HttpActivityOptions中指定的基础地址
  8.         var basePath = options.Value.BasePath?.ToString().NormalizeRoute();
  9.         //若请求地址连elsa配置的基础地址都不匹配,则直接执行下个中间件,说明没见听到触发器定义的要求
  10.         // If the request path does not match the configured base path to handle workflows, then skip.
  11.         if (!string.IsNullOrWhiteSpace(basePath))
  12.         {
  13.             if (!path.StartsWith(basePath, StringComparison.OrdinalIgnoreCase))
  14.             {
  15.                 await next(httpContext);
  16.                 return;
  17.             }
  18.             // Strip the base path.
  19.             matchingPath = matchingPath[basePath.Length..];
  20.         }
  21.         matchingPath = matchingPath.NormalizeRoute();
  22.         var input = new Dictionary<string, object>
  23.         {
  24.             [HttpEndpoint.HttpContextInputKey] = true,
  25.             [HttpEndpoint.RequestPathInputKey] = path.NormalizeRoute()
  26.         };
  27.         var cancellationToken = httpContext.RequestAborted;
  28.         var request = httpContext.Request;
  29.         var method = request.Method.ToLowerInvariant();
  30.         var httpWorkflowLookupService = serviceProvider.GetRequiredService<IHttpWorkflowLookupService>();
  31.         var workflowInstanceId = await GetWorkflowInstanceIdAsync(serviceProvider, httpContext, cancellationToken);
  32.         var correlationId = await GetCorrelationIdAsync(serviceProvider, httpContext, cancellationToken);
  33.         //根据请求路径 http方法 和 HttpEndpoint计算hash值
  34.         var bookmarkHash = ComputeBookmarkHash(serviceProvider, matchingPath, method);
  35.         //根据上面的hash值,从存储中获取匹配的工作流及其触发器列表
  36.         var lookupResult = await httpWorkflowLookupService.FindWorkflowAsync(bookmarkHash, cancellationToken);
  37.         if (lookupResult != null)
  38.         {
  39.             //若找到了流程,且里面仅包含一个与当前请求匹配的触发器,则说了匹配上了,执行流程,否则报错
  40.             var triggers = lookupResult.Triggers;
  41.             if (triggers.Count > 1)
  42.             {
  43.                 //报错
  44.                 await HandleMultipleWorkflowsFoundAsync(httpContext, () => triggers.Select(x => new
  45.                 {
  46.                     x.WorkflowDefinitionId
  47.                 }), cancellationToken);
  48.                 return;
  49.             }
  50.             var trigger = triggers.FirstOrDefault();
  51.             if (trigger != null)
  52.             {
  53.                 var workflowGraph = lookupResult.WorkflowGraph!;
  54.                 //执行流程中触发器所在节点
  55.                 await StartWorkflowAsync(httpContext, trigger, workflowGraph, input, workflowInstanceId, correlationId);
  56.                 return;
  57.             }
  58.         }
  59.         //若触发器节点已经执行过,也就是之前从其它节点流转过去的,那次触发器节点被执行时会创建书签的,则直接根据书签恢复执行
  60.         var bookmarks = await FindBookmarksAsync(serviceProvider, bookmarkHash, workflowInstanceId, correlationId, cancellationToken).ToList();
  61.         //若找到多个匹配的,报错
  62.         if (bookmarks.Count > 1)
  63.         {
  64.             await HandleMultipleWorkflowsFoundAsync(httpContext, () => bookmarks.Select(x => new
  65.             {
  66.                 x.WorkflowInstanceId
  67.             }), cancellationToken);
  68.             return;
  69.         }
  70.         var bookmark = bookmarks.SingleOrDefault();
  71.         if (bookmark != null)
  72.         {
  73.             //恢复书签执行
  74.             await ResumeWorkflowAsync(httpContext, bookmark, input, correlationId);
  75.             return;
  76.         }
  77.         // 如果基础地址都匹配上了,却没找到对应的流程,则抛出404错误
  78.         if (basePath != null)
  79.         {
  80.             await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
  81.             return;
  82.         }
  83.         // If no base path was configured, the request should be handled by subsequent middlewares.
  84.         await next(httpContext);
  85.     }
复制代码
HttpEndpoint.ExecuteAsync

若前面的中间件匹配上当前触发器节点
  1.     protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
  2.     {
  3.         var path = Path.Get(context);
  4.         if (path.Contains("//"))
  5.             throw new RoutePatternException(path, "Path cannot contain double slashes (//)");
  6.         //如果本次执行不是由当前HttpEndpoint自己触发的,比如在当前节点前另一个触发器执行了此流程,但是节点流转到这里来了。
  7.         if (!context.IsTriggerOfWorkflow())
  8.         {
  9.             //则直接创建书签卡住流程,等到匹配的http请求被HttpWorkflowsMiddleware流转到这里时,OnResumeAsync将被执行
  10.             context.CreateBookmarks(GetBookmarkPayloads(context.ExpressionExecutionContext), includeActivityInstanceId: false, callback: OnResumeAsync);
  11.             return;
  12.         }
  13.         //否则,说明当前流程的执行,就是这里配置的触发器触发的。
  14.         var httpContextAccessor = context.GetRequiredService<IHttpContextAccessor>();
  15.         var httpContext = httpContextAccessor.HttpContext;
  16.         //触发器也是activity,它可能直接被执行,而不是被http请求执行,则需要等待传统的书签请求来恢复OnResumeAsync
  17.         if (httpContext == null)
  18.         {
  19.             // We're executing in a non-HTTP context (e.g. in a virtual actor).
  20.             // Create a bookmark to allow the invoker to export the state and resume execution from there.
  21.             context.CreateBookmark(OnResumeAsync, BookmarkMetadata.HttpCrossBoundary);
  22.             return;
  23.         }
  24.         //否则说明当前流程的触发器就是自己,并且被http请求触发了
  25.         await HandleRequestAsync(context, httpContext);
  26.     }
复制代码
HttpEndpoint.OnResumeAsync

无论时HttpWorkflowsMiddleware匹配上,通过书签恢复流程执行;照旧走的传统的书签恢复,都会执行这里。
  1.     private async ValueTask OnResumeAsync(ActivityExecutionContext context)
  2.     {
  3.         var httpContextAccessor = context.GetRequiredService<IHttpContextAccessor>();
  4.         var httpContext = httpContextAccessor.HttpContext;
  5.         //在恢复执行时,可能并不是http请求恢复的,可能是直接调用书签恢复的
  6.         if (httpContext == null)
  7.         {
  8.             // We're executing in a non-HTTP context (e.g. in a virtual actor).
  9.             // Create a bookmark to allow the invoker to export the state and resume execution from there.
  10.             context.CreateBookmark(OnResumeAsync, BookmarkMetadata.HttpCrossBoundary);
  11.             return;
  12.         }
  13.         //处理http请求
  14.         await HandleRequestAsync(context, httpContext);
  15.     }
复制代码
触发器调理

有几个跟时间相关的内置触发器:Cron、StartAt、Timer,它们使用elsa的工作流调理框架,在后台作业中,根据设置的时间规则,触发执行流程。
触发器调理器ITriggerScheduler

ITriggerScheduler它定义两个方法,调理触发器、注销触发器调理。 默认实现DefaultTriggerScheduler,它使用elsa的流程调理器,实如今后台作业中实现流程安排。 值得留意的是,它们在触发时,DefaultTriggerScheduler总是创建新的流程实例。焦点源码如下:
  1. public async Task ScheduleAsync(IEnumerable<StoredTrigger> triggers, CancellationToken cancellationToken = default)
  2.     {
  3.         var triggerList = triggers.ToList();
  4.         var timerTriggers = triggerList.Filter<Activities.Timer>();
  5.         var startAtTriggers = triggerList.Filter<StartAt>();
  6.         var cronTriggers = triggerList.Filter<Cron>();
  7.         var now = systemClock.UtcNow;
  8.         // Schedule each Timer trigger.
  9.         foreach (var trigger in timerTriggers)
  10.         {
  11.             var (startAt, interval) = trigger.GetPayload<TimerTriggerPayload>();
  12.             var input = new { StartAt = startAt, Interval = interval }.ToDictionary();
  13.             //安排流程作业时,要求创建新的流程实例
  14.             var request = new ScheduleNewWorkflowInstanceRequest
  15.             {
  16.                 WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionVersionId(trigger.WorkflowDefinitionVersionId),
  17.                 TriggerActivityId = trigger.ActivityId,
  18.                 Input = input
  19.             };
  20.             await workflowScheduler.ScheduleRecurringAsync(trigger.Id, request, startAt, interval, cancellationToken);
  21.         }
  22.         // Schedule each StartAt trigger.
  23.         foreach (var trigger in startAtTriggers)
  24.         {
  25.             var executeAt = trigger.GetPayload<StartAtPayload>().ExecuteAt;
  26.             
  27.             // If the trigger is in the past, log info and skip scheduling.
  28.             if (executeAt < now)
  29.             {
  30.                 logger.LogInformation("StartAt trigger is in the past. TriggerId: {TriggerId}. ExecuteAt: {ExecuteAt}. Skipping scheduling", trigger.Id, executeAt);
  31.                 continue;
  32.             }
  33.             
  34.             var input = new { ExecuteAt = executeAt }.ToDictionary();
  35.             //安排流程作业时,要求创建新的流程实例
  36.             var request = new ScheduleNewWorkflowInstanceRequest
  37.             {
  38.                 WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionVersionId(trigger.WorkflowDefinitionVersionId),
  39.                 TriggerActivityId = trigger.ActivityId,
  40.                 Input = input
  41.             };
  42.             await workflowScheduler.ScheduleAtAsync(trigger.Id, request, executeAt, cancellationToken);
  43.         }
  44.         // Schedule each Cron trigger.
  45.         foreach (var trigger in cronTriggers)
  46.         {
  47.             var payload = trigger.GetPayload<CronTriggerPayload>();
  48.             var cronExpression = payload.CronExpression;
  49.             if (string.IsNullOrWhiteSpace(cronExpression))
  50.             {
  51.                 logger.LogWarning("Cron expression is empty. TriggerId: {TriggerId}. Skipping scheduling of this trigger", trigger.Id);
  52.                 continue;
  53.             }
  54.             
  55.             var input = new { CronExpression = cronExpression }.ToDictionary();
  56.             //安排流程作业时,要求创建新的流程实例
  57.             var request = new ScheduleNewWorkflowInstanceRequest
  58.             {
  59.                 WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionVersionId(trigger.WorkflowDefinitionVersionId),
  60.                 TriggerActivityId = trigger.ActivityId,
  61.                 Input = input
  62.             };
  63.             try
  64.             {
  65.                 await workflowScheduler.ScheduleCronAsync(trigger.Id, request, cronExpression, cancellationToken);
  66.             }
  67.             catch (FormatException ex)
  68.             {
  69.                 logger.LogWarning(ex, "Cron expression format error. CronExpression: {CronExpression}", cronExpression);
  70.             }
  71.         }
  72.     }
复制代码
ScheduleWorkflows监听触发器变更变乱,并进行触发器调理。
  1. public class ScheduleWorkflows : INotificationHandler<WorkflowTriggersIndexed>, INotificationHandler<WorkflowBookmarksIndexed>
  2. {
  3.     //...其它代码
  4.     public async Task HandleAsync(WorkflowTriggersIndexed notification, CancellationToken cancellationToken)
  5.     {
  6.         //注销之前的后台作业
  7.         await _triggerScheduler.UnscheduleAsync(notification.IndexedWorkflowTriggers.RemovedTriggers, cancellationToken);
  8.         //使用后台作业,在指定时间点恢复触发器所在节点
  9.         await _triggerScheduler.ScheduleAsync(notification.IndexedWorkflowTriggers.AddedTriggers, cancellationToken);
  10.     }
复制代码
以StartAt触发器为例

这个相对简单,发布流程时触发触发器以是变更变乱
[code]    protected override object GetTriggerPayload(TriggerIndexingContext context)    {        //从输入参数中获取payload        var executeAt = context.ExpressionExecutionContext.Get(DateTime);        //返回,以供触发器调理器访问        return new StartAtPayload(executeAt);    }    ///     protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)    {        //若当前流程的执行,正是当前触发器导致执行的,则直接完成,因为此时是时间到了,当前方法第二次被执行。        if (context.IsTriggerOfWorkflow())        {            await context.CompleteActivityAsync();            return;        }        //否则说明是另一个触发器导致流程执行,并流转到这里,这个时候应该去调理任务。        //从输入参数中获取指定的触发时间        var executeAt = context.ExpressionExecutionContext.Get(DateTime);        var clock = context.ExpressionExecutionContext.GetRequiredService();        var now = clock.UtcNow;        var logger = context.GetRequiredService();        context.JournalData.Add("Executed At", now);                if (executeAt




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4