啥是书签
流程引擎的核心关注点是安排流程,
如:第1步做什么 → 第2步做什么 → 第n步做什么...,
至于各步调具体是怎么做的,是由你来决定的,这不是流程引擎关注的重点。
流程安排大概会涉及到分叉、并行多个线路执行,但这只是流程安排的一种形式。
别的如果你硬要在步调中直接把某件事做了也行,这种情况一样平常是在自界说的Activity中去完成具体的业务功能,通常这种参杂特定业务逻辑的Activity复用性不高罢了。
有时间需要在执行到某个Activity时愣住,等待别的某个事情完成后,流程再继承执行。 举个简单的请假审批流程的例子:
- 界说流程
员工发起一个请假单的审核流程,先由部门主管审核,然后部门经理审核,最后流程结束。 流程引擎主要关注流程,即这里的:主管审核 → 部门经理审核,主管审核和经理审核的具体动作可以回调业务系统的api或实现自界说的activity去实现, 好比更新数据库中关联请假单的状态。
- 填写表单 这跟流程引擎无关,一样平常是在业务系统中填写表单提交保存到数据库
- 提交流程 在业务系统中,起首将请假单的状态改为:已进入审核流程,此时跟流程引擎无关。
然后调用流程引擎,发起一个执行请假流程,(通常会将请假单的id作为流程引擎的相关性id参数传入)
- 流程引擎执行
- [流程引擎] 开始节点啥也没做,只是表明流程开始执行了,这个似乎不是必须的
- ⭐[流程引擎] 流程进入【主管审核】这个节点,关键就在这里,它做两件事情,1创建一个书签使流程卡在这里,不要继承流转了;2回调业务系统实现主管审核的具体逻辑
- ⭐[业务系统] 在业务系统去做主管审核的事,也就是将状态改为【主管已审核等待经理审核】然后携带书签调用流程引擎,让流程规复继承执行辖区
- [流程引擎] 流程继承执行,一样的道理会通过书签卡住在经理审核步调,经理审核后流程继承,最终完成。
在执行某个Activity时,若需要先卡这里等着,就创建一个书签,然后等待外部事情做完后,用刚才这个书签告诉流程引擎继承执行后续步调。
如果把一个流程看成一本书、把流程中的Activity看成书中每一页,那么书签就是跟真实天下的书签差不多,我看到某一页时,需要去做另一件事时,就在当前页插入一个书签,等我忙完回来后 我翻到书签地点也继承看后续的内容。
书签应用基本流程
Activity核心执行方法界说如下:- ValueTask ExecuteAsync(ActivityExecutionContext context);
复制代码
- 当elsa预界说的Activity或我们自界说的Activity执行时,若需要卡住流程则创建书签,调用context.CreateBookmark即可。
- 在ExecuteAsync执行完成后,context中的书签会被复制到WorkflowExecutionContext中,由于当前Activity是卡住的,以是当前流程的本次执行就结束(虽然整个流程未结束,还在等下次从书签规复)
- 流程引擎有个书签持久化中间件,会从当前流程的WorkflowExecutionContext中取出书签,并做持久化存储。
- 当业务系统完成工作后,携带书签id再次执行此流程时,流程引擎会从持久化中获取书签,并根据书签信息从上次卡住的Activity规复流程实例继承执行。
携带书签规复流程继承执行有多种方式,本文【规复】节会阐明。
书签包罗哪些信息
上面基本流程提到,书签有两种数据格式,一种是存储在流程执行上下文WorkflowExecutionContext中的书签,它是用Bookmark类表示的;另一种是持久化存储的用StoredBookmark表示。 这两种格式差不多,比较持久化是通过上下文中的转换来的, 下面描述书签的几个关键属性:
属性描述id书签实例的唯一idWorkflowInstanceId书签所属的工作流实例的idActivityTypeName书签卡住的Activity类型的名称ActivityInstanceId书签卡住的Activity实例的idHash书签的hash值CorrelationId相关性id(通常关联到业务系统的业务实体id,如:请假单的id)Payload书签数据,差别的activity在创建和规复书签之间传递数据Metadata自界说的一些额外数据CreatedAt创建时间创建书签时,可以在Payload和Metadata存储复杂数据,以便在后续书签规复时,从书签中获取这些数据,属于一种参数传递手段。
它俩的区别我也没看懂,不外猜测Payload是跟地点Activity要完成的任务的业务相关的,而Metadata应该是对书签本身属性的一种扩充手段。 意思是我们自界说Activity时,通常利用Payload,而流程引擎大概会利用Metadata来实现一些特殊功能。
创建书签
书签是在流程实例执行到某个Activity时创建的。
一些elsa预界说的Activity本身就会创建书签,如:RunTask,当流程流转到 这种类型的节点时,它会创建一个书签,使流程卡在这里,然后会触发一个事件,我们的代码可以订阅此事件执行业务逻辑,之后通过书签使流程继承。- protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
- {
- // Create bookmark.
- var taskName = TaskName.Get(context); //从设计器中配置的任务名称获取任务名称字符串
- var identityGenerator = context.GetRequiredService<IIdentityGenerator>();
- var taskId = identityGenerator.GenerateId();//创建一个任务的唯一id
- var stimulus = new RunTaskStimulus(taskId, taskName);//暂时理解此对象表示一个要执行的任务,作为bookmark的payload存储起来了。
- context.CreateBookmark(stimulus, ResumeAsync, includeActivityInstanceId: false);//重点是这里指定了 当有人携带一个书签请求此流程时,将恢复执行的方法
-
- // Dispatch task request. 后续会触发事件,不是描述书签的重点,请查看官方文档
- var taskParams = Payload.GetOrDefault(context);
- var runTaskRequest = new RunTaskRequest(context, taskId, taskName, taskParams);
- var dispatcher = context.GetRequiredService<ITaskDispatcher>();
- await dispatcher.DispatchAsync(runTaskRequest, context.CancellationToken);
- }
- //通过书签恢复此流程时,流程会从这里继续执行下去
- private async ValueTask ResumeAsync(ActivityExecutionContext context)
- {
- var input = context.GetWorkflowInput<object>(InputKey);
- context.Set(Result, input);
- await context.CompleteActivityAsync();
- }
复制代码 别的就是我们自界说的Activity了,如果需要让流程卡这里,后面再通过书签规复流程继承,可以在其ExecuteAsync方法中调用 ActivityExecutionContext的CreateBookmark方法创建书签,此方法有多个重载,不外最核心的重载界说如下,注意查看注释:- public Bookmark CreateBookmark(CreateBookmarkArgs? options = default)
- {
- var payload = options?.Stimulus;//通过存储参数到这里,以便在通过书签恢复流程时从中获取此数据,属于一种参数传递方式
- var callback = options?.Callback;//当后续通过此书签恢复流程继续时,回调这里指定的方法,不指定的话,默认执行流程书签所在Activity的下一个Activity
- var bookmarkName = options?.BookmarkName ?? Activity.Type;//书签名称
- var bookmarkHasher = GetRequiredService<IStimulusHasher>();
- var identityGenerator = GetRequiredService<IIdentityGenerator>();
- var includeActivityInstanceId = options?.IncludeActivityInstanceId ?? true;
- var hash = bookmarkHasher.Hash(bookmarkName, payload, includeActivityInstanceId ? Id : null);//书签hash值
- var bookmarkId = options?.BookmarkId ?? identityGenerator.GenerateId();//书签唯一id
- var bookmark = new Bookmark(
- bookmarkId,
- bookmarkName,
- hash,
- payload,
- Activity.Id,
- ActivityNode.NodeId,
- Id,
- _systemClock.UtcNow,
- options?.AutoBurn ?? true,
- callback?.Method.Name,
- options?.AutoComplete ?? true,
- options?.Metadata);
- AddBookmark(bookmark);//存储到当前上下文中
- return bookmark;
- }
复制代码
这里注意,可以指定将来流程规复时,从callback指定的委托处开始执行,否则会直接执行书签地点Activity的下一个Activity。
删除书签
Activity无论是主动完成还是外部调用完成,总会调用ActivityExecutionContext的CompleteActivityAsync方法,内部会清空当前Activity的书签, 之后书签持久化中间件PersistBookmarkMiddleware会删除书签。
书签存储(持久化)
通过上述步调创建的书签对象开始存储在当前ActivityExecutionContext中,之后会被转存到WorkflowExecutionContext。 elsa的执行引擎类似asp.net core的中间件管道模型,在执行流程 【前/后】 会执行一系列中间件,此中PersistBookmarkMiddleware中间件就是在流程执行后 从WorkflowExecutionContext提取并存储书签到数据库或其它持久化,同时还会触发书签变动事件WorkflowBookmarksIndexed,后续的书签调度会订阅此事件。
PersistBookmarkMiddleware会同时存储新的书签,也会删除已经从WorkflowExecutionContext移除的书签。
调度(后台作业让书签主动规复)
我们可以携带书签信息主动调用流程引擎来让指定流程实例从书签处继承执行,某些时间可以安排一个后台任务,让书签在指定时间主动规复执行。 这种在后台安排一个作业,以在指定时间通过书签规复指定流程实例继承执行的事就是书签调度。
书签调度的基本流程如下:
- 在流程界说中放一个Activity节点,它必须是Delay、StartAt、Timer、Cron中的一种
- 这种Activity被执行时会创建书签,并在书签的payload携带跟时间相关参数
- 根据本文前面说的“存储(持久化)”会保存书签到数据库,并触发WorkflowBookmarksIndexed事件
- 事件订阅器ScheduleWorkflows会调用书签调度器IWorkflowScheduler去后台作业中安排书签规复任务
- 安排时从payload中获取安排时间,并调用elsa的后台作业框架安排后台作业。
当后台作业执行时,会根据书签信息规复流程继承,这个在文章中的“规复”节描述。
elsa内置的能主动规复的4种Activity
- Delay:当执行到此类型的Activity时,内部会创建一个书签,并在延迟指定时长后主动根据此书签规复。
- StartAt:当执行到此类型的Activity时,内部会创建一个书签,并在到达指定时间点时主动根据此书签规复。
- Timer:当执行到此类型的Activity时,内部会创建一个书签,并在到达指定时间点时主动根据此书签规复。
- Cron:与Timer类似,只不外是按Cron表达式的周期携带书签规复流程执行。
这几个Activity挺特殊,此中Timer、StartAt、Cron是触发器,同时触发器本就是特殊的Activity,而Delay仅仅是普通的Activity
书签调度器
由IBookmarkScheduler表示,默认实现为DefaultBookmarkScheduler,它仅关注前面说的4种类型的Activity创建的书签,书签调度器做如下两件事:
- 安排后台作业,在指定时间点执行 根据书签规复流程实例
- 删除指定书签的后台作业的安排
触发器调度仅关注Timer、StartAt、Cron,而且总是调度执行一个新的流程实例;而书签调度除了关注这3个外还多一个Delay,而且总是调度作业,通过书签规复现有流程实例的执行
触发器是在流程界说发布后调度的;而书签调度是Activity执行后,持久化书签时触发调度的。
Timer、Cron、StartAt
触发器是一个特殊的Activity,以Timer为例,它就是一个触发器,当界说的流程发布时,elsa会从流程中提取所有触发器节点,并保存在数据库中, 以便将来需要触发器时直接从数据库中获取,而不是又去找到流程并提取一次,这也可以称为触发器索引化。
Timer触发器也需要安排回台作业的,以便在固定时间到达时主动触发流程执行,关键在于这里的触发流程执行是每次都会创建新流程实例。- protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
- {
- //当前流程被执行,是否是因为当前节点触发的
- if(context.IsTriggerOfWorkflow())
- {
- //若是,则直接完成,以便流程后续节点立即执行
- await context.CompleteActivityAsync();
- return;
- }
-
- var clock = context.ExpressionExecutionContext.GetRequiredService<ISystemClock>();
- var timeSpan = context.ExpressionExecutionContext.Get(Interval);
- var resumeAt = clock.UtcNow.Add(timeSpan);
-
- context.JournalData.Add("ResumeAt", resumeAt);
- context.CreateBookmark(new TimerBookmarkPayload(resumeAt));
- }
复制代码 若当前流程就是当前触发器自己触发执行的,则直接让当前Activity完成,以便流程后续节点立刻执行。if(context.IsTriggerOfWorkflow())就是做这事的。假设Timer是流程首个节点,每隔指定时间, 流程都会被执行,且每次都会创建新的流程实例。
若当前执行不是被Timer自己这个节点触发的,阐明流程是流转到此节点的,此时会创建书签,按本文之前的阐明,此书签会被安排后台任务,以便在指定时长后主动规复,此时就跟延迟固定时间后规复一样,以是Timer在书签中并不会一直执行,而是每次仅仅规复自己这个流程实例。
书签匹配模式
有时间我们想触发书签规复流程,但又不想或不方便直接调用api,可以界说一个书签的filter参数(BookmarkQueueItem)来描述我们希望规复的书签,
书签参数BookmarkQueueItem
它其实界说的时书签filter参数,当系统中出现书签时,若与这里界说的过滤条件匹配时,则主动规复书签。
入队IBookmarkQueue
默认实现StoreBookmarkQueue- public async Task EnqueueAsync(NewBookmarkQueueItem item, CancellationToken cancellationToken = default)
- {
- //使用书签过滤条件描述对象item创建一个书签过滤器
- var filter = new BookmarkFilter
- {
- BookmarkId = item.BookmarkId,
- Hash = item.StimulusHash,
- WorkflowInstanceId = item.WorkflowInstanceId,
- ActivityTypeName = item.ActivityTypeName
- };
- //尝试立即恢复此书签
- var result = await resumer.ResumeAsync(filter, item.Options, cancellationToken);
- if (result.Matched)
- {
- //若确实执行成功说明,说明已经直接恢复书签了,这里直接返回即可
- logger.LogDebug("Successfully resumed workflow instance {WorkflowInstance} using bookmark {BookmarkId} for activity type {ActivityType}", item.WorkflowInstanceId, item.BookmarkId, item.ActivityTypeName);
- return;
- }
- // There was no matching bookmark yet. Store the queue item for the system to pick up whenever the bookmark becomes present.
- logger.LogDebug("No bookmark with ID {BookmarkId} found for workflow {WorkflowInstance} for activity type {ActivityType}. Adding the request to the bookmark queue", item.BookmarkId, item.WorkflowInstanceId, item.ActivityTypeName);
-
- var entity = new BookmarkQueueItem
- {
- Id = identityGenerator.GenerateId(),
- WorkflowInstanceId = item.WorkflowInstanceId,
- BookmarkId = item.BookmarkId,
- StimulusHash = item.StimulusHash,
- ActivityInstanceId = item.ActivityInstanceId,
- ActivityTypeName = item.ActivityTypeName,
- Options = item.Options,
- CreatedAt = systemClock.UtcNow,
- };
- //否则持久化此书签过滤条件,IBookmarkQueueProcessor会在后台线程中来消费这个队列
- await store.AddAsync(entity, cancellationToken);
- // Trigger the bookmark queue processor.
- // 触发一个信号,通知消费端,有新的书签恢复请求
- await bookmarkQueueSignaler.TriggerAsync(cancellationToken);
- }
复制代码 消费IBookmarkQueueProcessor
IBookmarkQueueWorker是个死循环后台任务,它里面等待书签变动信号,一旦有信号,它就开始调用BookmarkQueueProcessor- private async Task ProcessItemAsync(BookmarkQueueItem item, CancellationToken cancellationToken = default)
- {
- //根据参数创建过滤器
- var filter = item.CreateBookmarkFilter();
- var options = item.Options;
-
- logger.LogDebug("Processing bookmark queue item {BookmarkQueueItemId} for workflow instance {WorkflowInstanceId} for activity type {ActivityType}", item.Id, item.WorkflowInstanceId, item.ActivityTypeName);
- //尝试恢复书签
- var result = await bookmarkResumer.ResumeAsync(filter, options, cancellationToken);
- if (result.Matched)
- {
- //若成功则从书签参数队列中删除
- logger.LogDebug("Successfully resumed workflow instance {WorkflowInstance} using bookmark {BookmarkId} for activity type {ActivityType}", item.WorkflowInstanceId, item.BookmarkId, item.ActivityTypeName);
- await store.DeleteAsync(item.Id, cancellationToken);
- }
- else
- {
- //否则,等待下次尝试
- logger.LogDebug("No matching bookmark found for bookmark queue item {BookmarkQueueItemId} for workflow instance {WorkflowInstanceId} for activity type {ActivityType}", item.Id, item.WorkflowInstanceId, item.ActivityTypeName);
- }
- }
复制代码 书签队列工作者IBookmarkQueueWorker
它内部搞个死循环等信号,信号一来,立刻调用IBookmarkQueueProcessor消费。
书签队列信号IBookmarkQueueSignaler
貌似是书签入队与消费之间的一个信号器,在入队后触发这个信号,消费看到信号就马上消费下。 书签本身变动时,也会触发它
队列清理IBookmarkQueuePurger
它也是个后台任务,默认实现是DefaultBookmarkQueuePurger,它不管啥情况,太老的数据直接清理,无论是否乐成规复书签
多种持久化方式IBookmarkQueueStore
内存、ef、dapper等方式
elsa内部的利用场景
直接看哪些地方引用IBookmarkQueue就晓得,下面只说一个 ResumeExecuteWorkflowActivity它订阅事件,当一个流程执行完成时,它尝试去规复ExecuteWorkflow这个Activity卡住的活动,也就是子流程执行完成后,通知父活动继承。
书签规复
如论是执行一个新的流程实例,还是利用书签规复执行一个已有流程实例,最底层的接口都是IWorkflowRunner.RunAsync,只不外如果通过书签规复流程实例继承执行时传递参数中要包罗BookmarkId。
如下方式内部本质都是调用IWorkflowRunner
- 通过IWorkflowRuntime创建IWorkflowClient,默认实现LocalWorkflowClient(额外模块的实现:DistributedWorkflowClient、ProtoActorWorkflowClient )后调用其RunInstanceAsync
- 通过IWorkflowStarter的默认实现DefaultWorkflowStarter,内部是调用的IWorkflowRuntime,然后创建IWorkflowClient,最后调用执行流程
- 通过IBookmarkResumer的默认实现BookmarkResumer,内部也是调用的IWorkflowRuntime,然后创建IWorkflowClient,最后调用执行流程
- 调用elsa的http api 也可以利用书签规复流程执行,内部也一样
- 前面提到的“书签匹配模式”
- 前面提到的“调度”会主动规复书签
看命名也懂,若我们要编程的方式规复,可以优先思量利用IBookmarkResumer,或者用“书签匹配模式”中的IBookmarkQueue
若我们是独立摆设elsa,业务系统与elsa服务器分开的,则可以利用http api方式通过书签通知elsa规复已有的流程实例。
别的在Activity中创建书签时,若未指定书签规复时要执行的委托,则主动执行书签地点Activity的下一个Activity,否则就执行这个指定的委托。
总结
略...
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |