开源轻量级工作流WorkflowCore介绍

打印 上一主题 下一主题

主题 546|帖子 546|积分 1638

在.Net Framework环境下,我们使用Windows Workflow Foundation(WF)作为项目的工作流引擎,可是.Net Core已经不支持WF了,需要为基于.Net Core的项目选择新的工作流引擎。基本要求如下:

  • 轻量级,部署和使用都很简单。
  • 有相当数量的用户,往往使用的人越多,产品也就越可靠,遇到问题也容易找到解决办法。
  • 支持使用配置文件定义工作流,而不仅仅是使用代码定义。
符合上述要求的开源项目有几个,这里介绍开源项目WorkflowCore,项目地址:https://github.com/danielgerlag/workflow-core。
本文的示例可以从github下载:https://github.com/zhenl/ZL.WorflowCoreDemo
简单的控制台项目

首先,使用Visual Studio创建一个.Net Core的控制台项目,在NuGet管理器中引入下面程序包:

  • WorkflowCore
  • Microsoft.Extensions.DependencyInjection
  • Microsoft.Extensions.Logging
然后,创建两个工作流的步骤:
  1. using WorkflowCore.Interface;
  2. using WorkflowCore.Models;
  3. namespace WorkflowCoreTest
  4. {
  5.     public class HelloWorld : StepBody
  6.     {
  7.         public override ExecutionResult Run(IStepExecutionContext context)
  8.         {
  9.             Console.WriteLine("你好");
  10.             return ExecutionResult.Next();
  11.         }
  12.     }
  13. }
复制代码
  1. using WorkflowCore.Interface;
  2. using WorkflowCore.Models;
  3. namespace WorkflowCoreTest
  4. {
  5.     public class GoodbyeWorld : StepBody
  6.     {
  7.         public override ExecutionResult Run(IStepExecutionContext context)
  8.         {
  9.             Console.WriteLine("再见");
  10.             return ExecutionResult.Next();
  11.         }
  12.     }
  13. }
复制代码
接下来使用这两个步骤定义一个工作流:
  1. using WorkflowCore.Interface;
  2. namespace WorkflowCoreTest
  3. {
  4.     public class HelloWorldWorkflow : IWorkflow
  5.     {
  6.         public string Id => "HelloWorld";
  7.         public int Version => 1;
  8.         public void Build(IWorkflowBuilder<object> builder)
  9.         {
  10.             builder
  11.                 .StartWith<HelloWorld>()
  12.                 .Then<GoodbyeWorld>();
  13.         }
  14.     }
  15. }
复制代码
最后,在主程序中,创建WorkflowHost,注册并运行工作流,代码如下:
  1. using Microsoft.Extensions.DependencyInjection;
  2. using System;
  3. using System.Threading;
  4. using WorkflowCore.Interface;
  5. namespace WorkflowCoreTest
  6. {
  7.     class Program
  8.     {
  9.         static void Main(string[] args)
  10.         {
  11.             IServiceProvider serviceProvider = ConfigureServices();
  12.             var host = serviceProvider.GetService<IWorkflowHost>();
  13.             host.RegisterWorkflow<HelloWorldWorkflow>();
  14.             host.Start();
  15.             host.StartWorkflow("HelloWorld", 1, null);
  16.             Console.ReadLine();
  17.             host.Stop();
  18.         }
  19.         private static IServiceProvider ConfigureServices()
  20.         {
  21.             //setup dependency injection
  22.             IServiceCollection services = new ServiceCollection();
  23.             services.AddLogging();
  24.             services.AddWorkflow();
  25.                         
  26.             var serviceProvider = services.BuildServiceProvider();
  27.             return serviceProvider;
  28.         }
  29.     }
  30. }
复制代码
简单的工作流就完成了。
WorkflowHost

上一节通过一个简单的控制台例子介绍了WorkflowCore工作流的定义和运行过程,从例子中可以看到,工作流是运行在WorkflowHost实例中的,再看一下代码:
  1. static void Main(string[] args)
  2.         {
  3.             IServiceProvider serviceProvider = ConfigureServices();
  4.             var host = serviceProvider.GetService<IWorkflowHost>();
  5.                        
  6.             host.RegisterWorkflow<HelloWorldWorkflow>();
  7.             host.Start();
  8.             host.StartWorkflow("HelloWorld", 1, null);
  9.             
  10.             Console.ReadLine();
  11.             host.Stop();
  12.         }
复制代码
WorkflowHost的工作过程是这样的,首先需要获取WorkflowHost的实例,然后注册工作流,这里可以注册多个工作流,接下来,启动host,然后可以启动工作流,这里可以启动多个工作流实例,最后,关闭host。
我们需要对WorkflowHost有进一步的了解,第一个问题,每次使用serviceProvider.GetService()获得的host是否是同一对象?为了回答这个问题,我们增加一些代码:
  1.             var host = serviceProvider.GetService<IWorkflowHost>();
  2.             var host1 = serviceProvider.GetService<IWorkflowHost>();
  3.             Console.WriteLine(host == host1);
复制代码
我们获取两个host变量比较一下看是否指向相同的对象,结果是True,也就是使用serviceProvider.GetService "HelloWithNameWorkflow";        public int Version => 1;        public void Build(IWorkflowBuilder builder)        {            builder                .StartWith(context => ExecutionResult.Next())                .WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)                    .Output(data => data.MyName, step => step.EventData)                .Then()                    .Input(step => step.Name, data => data.MyName)                .Then()                    .Input(step => step.Name, data => data.MyName);        }    }}[/code]这里,流程声明为 IWorkflow,说明流程使用这个类存储数据,在流程定义中,可以使用data操作相关的数据对象,比如: .Input(step => step.Name, data => data.MyName) 就是将流程数据中的MyName传递给步骤中的Name(step.Name)。
这段代码中还使用WaitFor定义了一个事件,这个事件的输出是将事件接收的外部参数(step.EventData)传递给流程的MyName属性。
还需要修改两个步骤,增加名称字段:
  1.             host.RegisterWorkflow<HelloWorldWorkflow>();
  2.             host.Start();
  3.             host.StartWorkflow("HelloWorld", 1, null);
  4.             host.Stop();
  5.             Console.ReadLine();
  6.             
复制代码
  1.             host.RegisterWorkflow<HelloWorldWorkflow>();
  2.             host.Start();
  3.             host.StartWorkflow("HelloWorld", 1, null);
  4.             host.StartWorkflow("HelloWorld", 1, null);
  5.             host.StartWorkflow("HelloWorld", 1, null);
  6.             host.Stop();
  7.             Console.ReadLine();
复制代码
下面是流程注册和运行的代码:
  1. namespace ZL.WorflowCoreDemo.InputDataToStep
  2. {
  3.     public class MyNameClass
  4.     {
  5.         public string MyName { get; set; }
  6.     }
  7. }
复制代码
  1. using System;
  2. using WorkflowCore.Interface;
  3. using WorkflowCore.Models;
  4. using ZL.WorflowCoreDemo.InputDataToStep.Steps;
  5. namespace ZL.WorflowCoreDemo.InputDataToStep
  6. {
  7.     public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
  8.     {
  9.         public string Id => "HelloWithNameWorkflow";
  10.         public int Version => 1;
  11.         public void Build(IWorkflowBuilder<MyNameClass> builder)
  12.         {
  13.             builder
  14.                 .StartWith(context => ExecutionResult.Next())
  15.                 .WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
  16.                     .Output(data => data.MyName, step => step.EventData)
  17.                 .Then<HelloWithName>()
  18.                     .Input(step => step.Name, data => data.MyName)
  19.                 .Then<GoodbyeWithName>()
  20.                     .Input(step => step.Name, data => data.MyName);
  21.         }
  22.     }
  23. }
复制代码
我们也可以使用字典作为数据对象,流程的定义如下:
  1. using System;
  2. using System.Collections.Generic;
  3. using WorkflowCore.Interface;
  4. using WorkflowCore.Models;
  5. namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
  6. {
  7.     public class HelloWithName : StepBody
  8.     {
  9.         public string Name { get; set; }
  10.         public override ExecutionResult Run(IStepExecutionContext context)
  11.         {
  12.             Console.WriteLine("你好," + Name);
  13.             return ExecutionResult.Next();
  14.         }
  15.     }
  16. }
复制代码
这里没有使用自定义的类,而是使用了字典Dictionary,流程的运行代码如下:
  1. using System;
  2. using WorkflowCore.Interface;
  3. using WorkflowCore.Models;
  4. namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
  5. {
  6.     public class GoodbyeWithName : StepBody
  7.     {
  8.         public string Name { get; set; }
  9.         public override ExecutionResult Run(IStepExecutionContext context)
  10.         {
  11.             Console.WriteLine(Name + ",再见");
  12.             return ExecutionResult.Next();
  13.         }
  14.     }
  15. }
复制代码
采用JSON格式定义流程

WorkflowCore 支持采用JSON或者YAML格式定义流程,使用时通过使用IDefintionLoader加载流程来替代RegisterWorkflow。我们仍然通过简单的例子来说明。在我们现有的工程中已经定义了几个简单的流程步骤,我们用JSON格式将这几个步骤组成简单的工作流。
首先,在现有的解决方案中增加一个.Net Core的控制台项目,名称为ZL.WorkflowCoreDemo.Json,使用NuGet引入WorkflowCore,Microsoft.Extensions.Logging,还有WorkflowCore.DSL,然后,我们在项目中增加一个json文件,将文件的属性“复制到输出目录”修改为“始终复制”:

在json文件中定义流程:
  1. using System;
  2. using WorkflowCore.Interface;
  3. using WorkflowCore.Models;
  4. using ZL.WorflowCoreDemo.InputDataToStep.Steps;
  5. namespace ZL.WorflowCoreDemo.InputDataToStep
  6. {
  7.     public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
  8.     {
  9.         public string Id => "HelloWithNameWorkflow";
  10.         public int Version => 1;
  11.         public void Build(IWorkflowBuilder<MyNameClass> builder)
  12.         {
  13.             builder
  14.                 .StartWith(context => ExecutionResult.Next())
  15.                 .WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
  16.                     .Output(data => data.MyName, step => step.EventData)
  17.                 .Then<HelloWithName>()
  18.                     .Input(step => step.Name, data => data.MyName)
  19.                 .Then<GoodbyeWithName>()
  20.                     .Input(step => step.Name, data => data.MyName);
  21.         }
  22.     }
  23. }
复制代码
Json定义格式符合WorkflowCore的DSL,这里不进行DSL的详细介绍,我们重点关注流程如何定义,加载和运行。
我们可以将前面项目中的代码拷贝过来进行修改,首先修改下面的函数:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using Microsoft.Extensions.DependencyInjection;
  5. using WorkflowCore.Interface;
  6. namespace ZL.WorflowCoreDemo.InputDataToStep
  7. {
  8.     public class FlowRun
  9.     {
  10.         public static void Run()
  11.         {
  12.             IServiceProvider serviceProvider = ConfigureServices();
  13.             var host = serviceProvider.GetService<IWorkflowHost>();
  14.             
  15.             host.RegisterWorkflow<HelloWithNameWorkflow, MyNameClass>();
  16.             host.Start();
  17.             var initialData = new MyNameClass();
  18.             var workflowId = host.StartWorkflow("HelloWithNameWorkflow", 1, initialData).Result;
  19.             
  20.             Console.WriteLine("输入名字");
  21.             string value = Console.ReadLine();
  22.             using System;
  23. using Xunit;
  24. using WorkflowCore.Testing;
  25. using ZL.WorflowCoreDemo.Basic;
  26. using WorkflowCore.Models;
  27. using System.Threading;
  28. using FluentAssertions;
  29. namespace ZL.WorkflowCoreDemo.Test
  30. {
  31.     public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
  32.     {
  33.         public DemoUnitTest()
  34.         {
  35.             Setup();
  36.         }
  37.         [Fact]
  38.         public void Test1()
  39.         {
  40.             dynamic data = new { };
  41.             var workflowId = StartWorkflow(data);
  42.             WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
  43.             WorkflowStatus status = GetStatus(workflowId);
  44.             status.Should().Be(WorkflowStatus.Complete);
  45.             UnhandledStepErrors.Count.Should().Be(0);
  46.            
  47.         }
  48.         protected new WorkflowStatus GetStatus(string workflowId)
  49.         {
  50.             var instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result;
  51.             return instance.Status;
  52.         }
  53.         protected new void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut)
  54.         {
  55.             var status = GetStatus(workflowId);
  56.             var counter = 0;
  57.             while ((status == WorkflowStatus.Runnable) && (counter < (timeOut.TotalMilliseconds / 100)))
  58.             {
  59.                 Thread.Sleep(100);
  60.                 counter++;
  61.                 status = GetStatus(workflowId);
  62.             }
  63.         }
  64.     }
  65. }
  66.             Console.ReadLine();
  67.             host.Stop();
  68.         }
  69.         private static IServiceProvider ConfigureServices()
  70.         {
  71.             //setup dependency injection
  72.             IServiceCollection services = new ServiceCollection();
  73.             services.AddLogging();
  74.             services.AddWorkflow();
  75.             var serviceProvider = services.BuildServiceProvider();
  76.             return serviceProvider;
  77.         }
  78.     }
  79. }
复制代码
ConfigureServices新增加了services.AddWorkflowDSL();
在主函数中,使用IDefintionLoader加载JSON格式的流程定义:
  1. using System;
  2. using System.Collections.Generic;
  3. using WorkflowCore.Interface;
  4. using WorkflowCore.Models;
  5. using ZL.WorflowCoreDemo.InputDataToStep.Steps;
  6. namespace ZL.WorflowCoreDemo.InputDataToStep
  7. {
  8.     public class HelloWithNameWorkflowDynamic : IWorkflow<Dictionary<string,string>>
  9.     {
  10.         public string Id => "HelloWithNameWorkflowDynamic";
  11.         public int Version => 1;
  12.         public void Build(IWorkflowBuilder<Dictionary<string, string>> builder)
  13.         {
  14.             builder
  15.                 .StartWith(context => ExecutionResult.Next())
  16.                 .WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
  17.                     .Output((step,data)=>data.Add("Name",(string)step.EventData))
  18.                 .Then<HelloWithName>()
  19.                     .Input(step => step.Name, data => data["Name"])
  20.                 .Then<GoodbyeWithName>()
  21.                     .Input(step => step.Name, data => data["Name"]);
  22.         }
  23.     }
  24. }
复制代码
现在,流程可以运行了。
在研究过程中发现了一个坑,可能需要注意。在这个例子中我们使用了前面项目定义的流程步骤,如果在本项目中定义流程步骤,会出现找不到相应动态库的错误,不知道是否是一个缺陷。
JSON格式(DSL)定义流程与使用Fluent API定义流程的比较

前面我们分别讨论了使用Fluent API定义流程和使用JSON格式定义流程,按照以前的使用经验,感觉这两种定义方式应该可以互相转换,互相代替,但在实际应用中发现并不是如此,两种方式都有不能被替代的功能。
使用Fluent API可以使用Lambda 表达式定义步骤

我们可以在流程中直接使用Lambda表达式定义步骤,而不需要定义类,比如:
  1. IServiceProvider serviceProvider = ConfigureServices();
  2.             var host = serviceProvider.GetService<IWorkflowHost>();
  3.             
  4.             host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>();
  5.             host.Start();
  6.             var initialData = new Dictionary<string,string>();
  7.             var workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData).Result;
  8.             
  9.             Console.WriteLine("输入名字");
  10.             string value = Console.ReadLine();
  11.             using System;
  12. using Xunit;
  13. using WorkflowCore.Testing;
  14. using ZL.WorflowCoreDemo.Basic;
  15. using WorkflowCore.Models;
  16. using System.Threading;
  17. using FluentAssertions;
  18. namespace ZL.WorkflowCoreDemo.Test
  19. {
  20.     public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
  21.     {
  22.         public DemoUnitTest()
  23.         {
  24.             Setup();
  25.         }
  26.         [Fact]
  27.         public void Test1()
  28.         {
  29.             dynamic data = new { };
  30.             var workflowId = StartWorkflow(data);
  31.             WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
  32.             WorkflowStatus status = GetStatus(workflowId);
  33.             status.Should().Be(WorkflowStatus.Complete);
  34.             UnhandledStepErrors.Count.Should().Be(0);
  35.            
  36.         }
  37.         protected new WorkflowStatus GetStatus(string workflowId)
  38.         {
  39.             var instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result;
  40.             return instance.Status;
  41.         }
  42.         protected new void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut)
  43.         {
  44.             var status = GetStatus(workflowId);
  45.             var counter = 0;
  46.             while ((status == WorkflowStatus.Runnable) && (counter < (timeOut.TotalMilliseconds / 100)))
  47.             {
  48.                 Thread.Sleep(100);
  49.                 counter++;
  50.                 status = GetStatus(workflowId);
  51.             }
  52.         }
  53.     }
  54. }
  55.             
  56.             Console.ReadLine();
  57.             foreach (var key in initialData.Keys)
  58.             {
  59.                 Console.WriteLine(key + ":" + initialData[key]);
  60.             }
  61.             Console.ReadLine();
  62.             host.Stop();
复制代码
这种方式无法使用JSON等格式实现。
采用JSON等DSL格式可以方便地定义步骤间的跳转

采用JSON等DSL格式时,每个步骤有明示的ID,步骤转移通过ID标识进行,这样可以很方便地进行步骤间的跳转。而采用Fluent API则没有这么灵活,我们看以下的定义:
  1. {
  2.   "Id": "HelloWorld",
  3.   "Version": 1,
  4.   "Steps": [
  5.     {
  6.       "Id": "Hello",
  7.       "StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
  8.       "NextStepId": "Bye"
  9.     },
  10.     {
  11.       "Id": "Bye",
  12.       "StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
  13.       
  14.     }
  15.   ]
  16. }
复制代码
步骤“Hello”执行完成后,执行"Bye",“Bye”执行完又回到“Hello”,如此循环。但在Fluent API中就没有这么方便,必须使用循环或者其它的方式。而这种跳转方式在实际应用中非常常见,最常见的场景就是审批流程中的提交/驳回,提交-驳回过程可以形成多次循环,这种流程模式,采用带有步骤标记的跳转很容易实现。
流程数据类的局限性

流程相关的数据类和流程步骤中的属性在理论上是没有限制的,我们可以使用复杂的数据类型,比如Dictionary或者具有复杂层次的数据类,但在研究中我们发现由于JSON DSL定义的限制,我们无法实现复杂数据结构的数据传递。使用Fluent API定义的流程中,可以使用Lamdba 表达式,但在JSON DSL中没找到更好的方法。
下面的代码展示通过Lamdba表达式实现两个Dictionary之间的数据传递,但在DSL中没有对应的方式:
  1. private static IServiceProvider ConfigureServices()
  2.         {
  3.             //setup dependency injection
  4.             IServiceCollection services = new ServiceCollection();
  5.             services.AddLogging();
  6.             services.AddWorkflow();
  7.             //这是新增加的服务
  8.             services.AddWorkflowDSL();
  9.             var serviceProvider = services.BuildServiceProvider();
  10.             return serviceProvider;
  11.         }
复制代码
而在实际应用中,我们需要使用流程定义文件而不是写死的代码来定义流程,这样在流程修改时,就不需要修改代码和重新编译部署。这个限制是WorkflowCore在实际项目中落地的一个主要障碍。
工作流持久化与恢复

WorkflowCore提供了几乎针对流行数据库的各种持久化方式,支持SqlServer、Sqlite等关系数据库,也支持MongoDb、Redis等非关系数据库。缺省使用的是在内存中保存流程数据,但在实际应用中,必须将流程数据持久化以保证系统的可靠性。当系统因为计划内或者意外原因出现异常后,正在执行的流程应该能够在断点处恢复并继续执行。我们改造一下第一部分的例子,增加持久化设置,并模拟流程中断和恢复过程。
首先,我们需要使用NuGet引入SqlServer持久化Provider:WorkflowCore.Persistence.SqlServer,当然也可以使用其它类型的数据存储。
然后,修改ConfigureServices,将services.AddWorkflow()修改为:
  1. static void Main(string[] args)
  2.         {
  3.             IServiceProvider serviceProvider = ConfigureServices();
  4.             var loader = serviceProvider.GetService<IDefinitionLoader>();
  5.             var json = System.IO.File.ReadAllText("myflow.json");
  6.             loader.LoadDefinition(json, Deserializers.Json);
  7.             var host = serviceProvider.GetService<IWorkflowHost>();
  8.             host.Start();
  9.             host.StartWorkflow("HelloWorld", 1, null);
  10.             
  11.             Console.ReadLine();
  12.             host.Stop();
  13.         }
复制代码
最后修改一下执行代码,增加流程Id输入和恢复代码:
  1. public class HelloWorldWorkflow : IWorkflow
  2. {
  3.     public string Id => "HelloWorld";
  4.     public int Version => 1;
  5.     public void Build(IWorkflowBuilder<object> builder)
  6.     {
  7.         builder
  8.             .StartWith(context =>
  9.             {
  10.                 Console.WriteLine("你好");
  11.                 return ExecutionResult.Next();
  12.             })
  13.             .Then(context =>
  14.             {
  15.                 Console.WriteLine("再见");
  16.                 return ExecutionResult.Next();
  17.             });
  18.     }
  19. }
复制代码
下面,我们模拟中断-恢复过程。首先,运行程序,不输入流程id,直接按回车,会生成新的流程,并输出流程Id,拷贝这个流程ID,并退出程序:

再次执行程序,输入或粘贴上一次生成的流程编号,可以继续执行流程:

单元测试

我们已经创建简单的工作流,并可以在控制台环境运行,现在我们可以为工作流创建简单的单元测试,这里我们使用xUnit作为测试框架。
在ZL.WorkflowCoreDemo解决方案中增加一个xUnit测试项目,命名为ZL.WorkflowCoreDemo.Test,创建好的项目中已经包含xunit和xunit.runner.visualstudio。我们还需要使用NuGet引入其它的框架,首先要引入FluentAssertions,这个框架结合xUnit,可以让 我们在测试中使用Should断言。还需要引入WorkflowCore和WorkflowCore.Testing以及我们需要测试的项目。这里我们测试最简单的HelloWorldWorkflow。
接下来编写测试代码,测试类需要继承WorkflowTest,由于HelloWorldWorkflow没有相关的数据类,我们使用dynamic代替,类的定义如下:
  1. {
  2.   "Id": "HelloWorld",
  3.   "Version": 1,
  4.   "Steps": [
  5.     {
  6.       "Id": "Hello",
  7.       "StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
  8.       "NextStepId": "Bye"
  9.     },
  10.     {
  11.       "Id": "Bye",
  12.       "StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
  13.       "NextStepId": "Hello"
  14.     }
  15.   ]
  16. }
复制代码
需要注意的是在测试类的构造函数中调用Setup(),用来初始化流程引擎。
现在我们可以在测试资源管理器中运行测试项目,如果一切顺利的化,结果是这样的:

但有时候理想和现实总是有些差距,我在执行时遇到了如下的异常:

通过研究发现我引用的WorkflowCore是最新的3.1.2版本,而WorkflowCore.Testing的版本是2.2,应该是版本不一致造成的问题,WorkflowCore和WorkflowCore.Testing的更新不同步。这时,开源项目的好处就体现出来了,通过查看代码,改写测试类如下:
  1.                     .Output((step, data)=> {
  2.                         var dic = step.EventData as Dictionary<string, object>;
  3.                         foreach (var key in dic.Keys)
  4.                         {
  5.                             if (data.MyDic.ContainsKey(key)) data.MyDic[key] = dic[key];
  6.                             else data.MyDic.Add(key, dic[key]);
  7.                         }
复制代码
再次运行,测试通过了。
Activity Workers

前面提到了使用WaitFor暂停工作流,等待人工输入后发布事件重新激活流程,今天介绍另一种方式,使用WorkflowCore的Activity,它的作用就是等待数据输入,数据输入完成后,工作流继续执行。下面是简单的例子:
  1. services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=True;", true, true));
复制代码
这个例子很简单,使用了我们前面定义的两个步骤,HelloWithName和GoodbyeWithName,Activity在这里就是接收外部输入的Name。流程的运行代码如下:
  1. IServiceProvider serviceProvider = ConfigureServices();
  2.             var host = serviceProvider.GetService<IWorkflowHost>();
  3.             
  4.             host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>();
  5.             host.Start();
  6.             var initialData = new Dictionary<string,string>();
  7.             Console.WriteLine("请输入需要恢复的流程编号,如执行新流程直接回车:");
  8.             string workflowId = Console.ReadLine();
  9.             
  10.             if (string.IsNullOrEmpty(workflowId))
  11.             {
  12.                 workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData).Result;
  13.                 Console.WriteLine(workflowId);
  14.             }
  15.             else
  16.             {
  17.                 host.ResumeWorkflow(workflowId);
  18.             }
  19.               
  20.             
  21.             Console.WriteLine("输入名字");
  22.             string value = Console.ReadLine();
  23.             using System;
  24. using Xunit;
  25. using WorkflowCore.Testing;
  26. using ZL.WorflowCoreDemo.Basic;
  27. using WorkflowCore.Models;
  28. using System.Threading;
  29. using FluentAssertions;
  30. namespace ZL.WorkflowCoreDemo.Test
  31. {
  32.     public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
  33.     {
  34.         public DemoUnitTest()
  35.         {
  36.             Setup();
  37.         }
  38.         [Fact]
  39.         public void Test1()
  40.         {
  41.             dynamic data = new { };
  42.             var workflowId = StartWorkflow(data);
  43.             WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
  44.             WorkflowStatus status = GetStatus(workflowId);
  45.             status.Should().Be(WorkflowStatus.Complete);
  46.             UnhandledStepErrors.Count.Should().Be(0);
  47.            
  48.         }
  49.         protected new WorkflowStatus GetStatus(string workflowId)
  50.         {
  51.             var instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result;
  52.             return instance.Status;
  53.         }
  54.         protected new void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut)
  55.         {
  56.             var status = GetStatus(workflowId);
  57.             var counter = 0;
  58.             while ((status == WorkflowStatus.Runnable) && (counter < (timeOut.TotalMilliseconds / 100)))
  59.             {
  60.                 Thread.Sleep(100);
  61.                 counter++;
  62.                 status = GetStatus(workflowId);
  63.             }
  64.         }
  65.     }
  66. }
复制代码
工作流启动后,需要通过host.GetPendingActivity获取Activity,获取成功,就从外部获取数据,然后使用host.SubmitActivitySuccess提交数据。
WaitFor vs Activity

使用WorkflowCore获取外部数据时,有两种方法可以让流程等待外部数据,一是使用WaitFor注册一个事件,外部数据输入完成后,通过PublishEvent返回流程;另一种是使用Activity,注册一个人工活动,执行到这个活动时,工作流等待,外部代码通过GetPendingActivity获取相应的Activity,通过SubmitActivitySuccess提交数据。看起来两种都可以完成外部数据输入的任务,但实际中发现GetPendingActivity无法获取是哪一个工作流实例的活动,如果有两个实例同时运行,就没有办法分清除向哪个流程提报数据:
  1. using System;
  2. using Xunit;
  3. using WorkflowCore.Testing;
  4. using ZL.WorflowCoreDemo.Basic;
  5. using WorkflowCore.Models;
  6. using System.Threading;
  7. using FluentAssertions;
  8. namespace ZL.WorkflowCoreDemo.Test
  9. {
  10.     public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
  11.     {
  12.         public DemoUnitTest()
  13.         {
  14.             Setup();
  15.         }
  16.         [Fact]
  17.         public void Test1()
  18.         {
  19.             dynamic data = new { };
  20.             var workflowId = StartWorkflow(data);
  21.             WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
  22.             WorkflowStatus status = GetStatus(workflowId);
  23.             status.Should().Be(WorkflowStatus.Complete);
  24.             UnhandledStepErrors.Count.Should().Be(0);
  25.            
  26.         }
  27.     }
  28. }
复制代码
WairFor事件发布时有工作流实例ID传入:
  1. using System;
  2. using Xunit;
  3. using WorkflowCore.Testing;
  4. using ZL.WorflowCoreDemo.Basic;
  5. using WorkflowCore.Models;
  6. using System.Threading;
  7. using FluentAssertions;
  8. namespace ZL.WorkflowCoreDemo.Test
  9. {
  10.     public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
  11.     {
  12.         public DemoUnitTest()
  13.         {
  14.             Setup();
  15.         }
  16.         [Fact]
  17.         public void Test1()
  18.         {
  19.             dynamic data = new { };
  20.             var workflowId = StartWorkflow(data);
  21.             WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
  22.             WorkflowStatus status = GetStatus(workflowId);
  23.             status.Should().Be(WorkflowStatus.Complete);
  24.             UnhandledStepErrors.Count.Should().Be(0);
  25.            
  26.         }
  27.         protected new WorkflowStatus GetStatus(string workflowId)
  28.         {
  29.             var instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result;
  30.             return instance.Status;
  31.         }
  32.         protected new void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut)
  33.         {
  34.             var status = GetStatus(workflowId);
  35.             var counter = 0;
  36.             while ((status == WorkflowStatus.Runnable) && (counter < (timeOut.TotalMilliseconds / 100)))
  37.             {
  38.                 Thread.Sleep(100);
  39.                 counter++;
  40.                 status = GetStatus(workflowId);
  41.             }
  42.         }
  43.     }
  44. }
复制代码
没有上面的缺陷。
使用ForEach并行执行多个流程

如果需要同时执行多个过程相同的而输入不同的流程,可以使用ForEach控制语句,一定要注意,这里的ForEach不是循环,不是一个流程执行完再执行另一个流程,我们仍然使用前面定义的简单的步骤来组织ForEach示例流程,代码如下:
  1. using WorkflowCore.Interface;
  2. using ZL.WorflowCoreDemo.InputDataToStep;
  3. using ZL.WorflowCoreDemo.InputDataToStep.Steps;
  4. namespace ZL.WorflowCoreDemo.ActivityWorker
  5. {
  6.     public class MyActivityWorkflow : IWorkflow<MyNameClass>
  7.     {
  8.         public string Id => "MyActivityWorkflow";
  9.         public int Version => 1;
  10.         public void Build(IWorkflowBuilder<MyNameClass> builder)
  11.         {
  12.             builder
  13.                 .StartWith<HelloWithName>().Input(data => data.Name, step => step.MyName)
  14.                     .Activity("activity-1", (data) => data.MyName)
  15.                         .Output(data => data.MyName, step => step.Result)
  16.                     .Then<GoodbyeWithName>()
  17.                         .Input(step => step.Name, data => data.MyName)
  18.                     .Activity("activity-2", (data) => data.MyName)
  19.                         .Output(data => data.MyName, step => step.Result)
  20.                      .Then<HelloWithName>().Input(step => step.Name, data => data.MyName)
  21.                     .Then<GoodbyeWithName>()
  22.                         .Input(step => step.Name, data => data.MyName);
  23.         }
  24.     }
  25. }
复制代码
在这个例子里,我们没有定义相关的数据类,需要输入的人名作为ForEach中的循环变量,这些变量保存在context中,输入到相应的环节中。执行代码如下:
  1. IServiceProvider serviceProvider = ConfigureServices();
  2.             var host = serviceProvider.GetService<IWorkflowHost>();
  3.             host.RegisterWorkflow<MyActivityWorkflow, MyNameClass>();
  4.             host.Start();
  5.             var myClass = new MyNameClass { MyName = "张三" };
  6.             host.StartWorkflow("MyActivityWorkflow", 1, myClass);
  7.             var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
  8.             if (activity != null)
  9.             {
  10.                 Console.WriteLine("输入名字");
  11.                 string value = Console.ReadLine();
  12.                 host.SubmitActivitySuccess(activity.Token, value);
  13.             }
  14.             activity = host.GetPendingActivity("activity-2", "worker2", TimeSpan.FromMinutes(1)).Result;
  15.             if (activity != null)
  16.             {
  17.                 Console.WriteLine("输入名字");
  18.                 string value = Console.ReadLine();
  19.                 host.SubmitActivitySuccess(activity.Token, value);
  20.             }
  21.             Console.ReadLine();
  22.             host.Stop();
复制代码
Parallel并行执行多个流程

前面我们提到了使用ForEach执行并行流程,这些流程的执行过程相同,不同的只是输入的参数。如果需要并行执行多个不同的流程,需要使用Parallel,示例代码如下:
  1.             var id1=host.StartWorkflow("MyActivityWorkflow", 1, myClass).Result;
  2.             var id2 = host.StartWorkflow("MyActivityWorkflow", 1, myClass).Result;
  3.              //上面两个实例中有相同的activity-1,无法知道这里获取的是哪一个实例的活动,         
  4.             var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
复制代码
为了说明分支语句的构成,这个流程没有使用关联的数据类,也没有使用类定义步骤,全部使用Lambda表达式。Parallel的结构是分支的开始是Parallel(),结束是Join(),每个分支在Do语句中表示。流程的运行代码如下:
  1. host.PublishEvent("MyEvent", workflowId, value);
复制代码
While循环

While循环会重复执行某些步骤,直到条件得到满足再继续执行下面的流程。使用While循环可以实现审批流程中的“提交/驳回”,如果审批没有通过,驳回重新输入,直到审批通过或者驳回次数到达上限。这里举一个简单的例子说明使用方法,结合前面提到的Activity,可以实现对输入进行判断,如果输入不满足要求,就重新输入。流程定义如下:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using WorkflowCore.Interface;
  5. using WorkflowCore.Models;
  6. using ZL.WorflowCoreDemo.InputDataToStep.Steps;
  7. namespace ZL.WorflowCoreDemo.Paralle
  8. {
  9.     public class ParalleWorkflow : IWorkflow
  10.     {
  11.         public string Id => "ParalleWorkflow";
  12.         public int Version => 1;
  13.         public void Build(IWorkflowBuilder<object> builder)
  14.         {
  15.             builder
  16.             .StartWith(context => { Console.WriteLine("开始"); ExecutionResult.Next(); })
  17.             .ForEach(data => new List<string>() { "张三", "李四", "王五", "赵六" })
  18.                 .Do(x => x
  19.                     .StartWith<HelloWithName>()
  20.                         .Input(step => step.Name, (data, context) => context.Item as string)
  21.                     .Then<GoodbyeWithName>()
  22.                         .Input(step => step.Name, (data, context) => context.Item as string)
  23.                     )
  24.             .Then(context => { Console.WriteLine("结束"); ExecutionResult.Next(); });
  25.         }
  26.     }
  27. }
复制代码
流程运行的代码如下:
  1.             IServiceProvider serviceProvider = ConfigureServices();
  2.             var host = serviceProvider.GetService<IWorkflowHost>();
  3.             host.RegisterWorkflow<ParalleWorkflow>();
  4.             host.Start();
  5.             host.StartWorkflow("ParalleWorkflow", 1, null);
  6.             Console.ReadLine();
  7.             host.Stop();
复制代码
If判断

If判断比较简单,根据流程关联的数据对象中的值进行判断,如果条件满足执行相应的分支。需要注意的是没有else相关语句,如果需要实现相关逻辑,需要再次进行一次条件相反的判断。下面是简单的例子,仍然使用前面定义的数据类和步骤,输入采用Activity:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using WorkflowCore.Interface;
  5. using WorkflowCore.Models;
  6. using ZL.WorflowCoreDemo.InputDataToStep;
  7. namespace ZL.WorflowCoreDemo.Paralle
  8. {
  9.     public class ParallePathWorkflow : IWorkflow
  10.     {
  11.         public string Id => "ParallePathWorkflow";
  12.         public int Version => 1;
  13.         public void Build(IWorkflowBuilder<object> builder)
  14.         {
  15.             builder
  16.             .StartWith(context => { Console.WriteLine("开始"); ExecutionResult.Next(); })
  17.             .Parallel()
  18.                 .Do(then =>
  19.                     then.StartWith(context=>{ Console.WriteLine("分支一开始"); ExecutionResult.Next(); })
  20.                         .Then(context => { Console.WriteLine("分支一结束"); ExecutionResult.Next(); }))
  21.                 .Do(then =>
  22.                     then.StartWith(context => { Console.WriteLine("分支二开始"); ExecutionResult.Next(); })
  23.                         .Then(context => { Console.WriteLine("分支二结束"); ExecutionResult.Next(); }))
  24.                 .Do(then =>
  25.                     then.StartWith(context => { Console.WriteLine("分支二开始"); ExecutionResult.Next(); })
  26.                         .Then(context => { Console.WriteLine("分支二结束"); ExecutionResult.Next(); }))
  27.             .Join()
  28.             .Then(context => { Console.WriteLine("结束"); ExecutionResult.Next(); });
  29.         }
  30.     }
  31. }
复制代码
流程的运行代码如下:
  1. IServiceProvider serviceProvider = ConfigureServices();
  2.             var host = serviceProvider.GetService<IWorkflowHost>();
  3.             host.RegisterWorkflow<ParallePathWorkflow>();
  4.             host.Start();
  5.             host.StartWorkflow("ParallePathWorkflow", 1, null);
  6.             Console.ReadLine();
  7.             host.Stop();
复制代码
条件分支Decision Branches

Decision Branches有点类似于switch语句,可以为每个条件创建一个分支,这些分支相对独立,根据不同的条件选择执行。如果使用Fluent API,可以使用CreateBranch方法创建分支,然后在流程中使用分支。为了说明问题,我们改造前面的If流程,使用Decision Branches实现相同的功能,流程定义的代码如下:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using WorkflowCore.Interface;
  5. using WorkflowCore.Models;
  6. using ZL.WorflowCoreDemo.InputDataToStep;
  7. using ZL.WorflowCoreDemo.InputDataToStep.Steps;
  8. namespace ZL.WorflowCoreDemo.ControlStructures
  9. {
  10.     public class WhileWorkflow : IWorkflow<MyNameClass>
  11.     {
  12.         public string Id => "WhileWorkflow";
  13.         public int Version => 1;
  14.         public void Build(IWorkflowBuilder<MyNameClass> builder)
  15.         {
  16.             builder
  17.                 .StartWith<HelloWithName>()
  18.                     .Input(step => step.Name, data => data.MyName)
  19.                 .While(data => data.MyName.Length < 3)
  20.                     .Do(x => x
  21.                         .StartWith(context=> { Console.WriteLine("输入小于3个字符"); ExecutionResult.Next(); })
  22.                         .Activity("activity-1", (data) => data.MyName)
  23.                         .Output(data => data.MyName, step => step.Result))
  24.                 .Then<GoodbyeWithName>()
  25.                    .Input(step => step.Name, data => data.MyName);
  26.         }
  27.     }
  28. }
复制代码
流程执行定义的代码如下:
  1.             IServiceProvider serviceProvider = ConfigureServices();
  2.             var host = serviceProvider.GetService<IWorkflowHost>();
  3.             host.RegisterWorkflow<WhileWorkflow, MyNameClass>();
  4.             host.Start();
  5.             var myClass = new MyNameClass { MyName = "张三" };
  6.             host.StartWorkflow("WhileWorkflow", 1, myClass);
  7.             var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
  8.             
  9.             while (activity != null)
  10.             {
  11.                 Console.WriteLine("输入大于3个字符的名字结束,小于3个字符的名字继续");
  12.                 string value = Console.ReadLine();
  13.                 host.SubmitActivitySuccess(activity.Token, value);
  14.                 activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
  15.             }
  16.                         
  17.             Console.ReadLine();
  18.             host.Stop();
复制代码
使用Schedule执行定时任务

WorkflowCore 提供了定时执行后台任务的功能,使用Schedule可以定义异步执行的任务,在工作流的后台执行。示例代码如下:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using WorkflowCore.Interface;
  5. using WorkflowCore.Models;
  6. using ZL.WorflowCoreDemo.InputDataToStep;
  7. using ZL.WorflowCoreDemo.InputDataToStep.Steps;
  8. namespace ZL.WorflowCoreDemo.ControlStructures
  9. {
  10.     public class IfWorkflow : IWorkflow<MyNameClass>
  11.     {
  12.         public string Id => "IfWorkflow";
  13.         public int Version => 1;
  14.         public void Build(IWorkflowBuilder<MyNameClass> builder)
  15.         {
  16.             builder
  17.                 .StartWith(context=> ExecutionResult.Next())
  18.                 .Activity("activity-1", (data) => data.MyName)
  19.                         .Output(data => data.MyName, step => step.Result)   
  20.                 .If(data => data.MyName.Length < 3)
  21.                     .Do(then=>then
  22.                         .StartWith(context => { Console.WriteLine("输入小于3个字符"); ExecutionResult.Next(); }))
  23.                 .If(data => data.MyName.Length >= 3)
  24.                     .Do(then => then
  25.                         .StartWith(context => { Console.WriteLine("输入大于等于3个字符"); ExecutionResult.Next(); }))
  26.                 .Then<GoodbyeWithName>()
  27.                    .Input(step => step.Name, data => data.MyName);
  28.         }
  29.     }
  30. }
复制代码
在上面的代码中,工作流开始后,定义了一个Schedule,这个任务在延时5秒后,启动一个后台流程。流程的执行代码如下:
  1.             IServiceProvider serviceProvider = ConfigureServices();
  2.             var host = serviceProvider.GetService<IWorkflowHost>();
  3.             host.RegisterWorkflow<IfWorkflow, MyNameClass>();
  4.             host.Start();
  5.             var myClass = new MyNameClass { MyName = "张三" };
  6.             host.StartWorkflow("IfWorkflow", 1, myClass);
  7.             var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
  8.             if (activity != null)
  9.             {
  10.                 Console.WriteLine("输入名字");
  11.                 string value = Console.ReadLine();
  12.                 host.SubmitActivitySuccess(activity.Token, value);
  13.                
  14.             }
  15.             Console.ReadLine();
  16.             host.Stop();
复制代码
流程的执行代码与前面的例子基本类似,执行结果如下:

执行时,前台任务完成5秒后,后台工作才执行。
使用Recur执行重复的后台任务

前面介绍的Schedule可以启动一个后台的定时任务,这个任务只执行一次。如果需要执行多次固定间隔的任务,可以使用Recur,当条件满足时任务不再执行。Recur的定义与Schedule类似,只是多了条件判断输入,流程定义的代码如下:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using WorkflowCore.Interface;
  5. using WorkflowCore.Models;
  6. using ZL.WorflowCoreDemo.InputDataToStep;
  7. using ZL.WorflowCoreDemo.InputDataToStep.Steps;
  8. namespace ZL.WorflowCoreDemo.ControlStructures
  9. {
  10.     public class DecisionWorkflow : IWorkflow<MyNameClass>
  11.     {
  12.         public string Id => "DecisionWorkflow";
  13.         public int Version => 1;
  14.         public void Build(IWorkflowBuilder<MyNameClass> builder)
  15.         {
  16.             var branch1 = builder.CreateBranch()
  17.                 .StartWith(context => { Console.WriteLine("输入小于3个字符"); ExecutionResult.Next(); });
  18.             var branch2 = builder.CreateBranch()
  19.                 .StartWith(context => { Console.WriteLine("输入大于等于3个字符"); ExecutionResult.Next(); });
  20.             builder
  21.                 .StartWith(context => ExecutionResult.Next())
  22.                 .Activity("activity-1", (data) => data.MyName)
  23.                         .Output(data => data.MyName, step => step.Result)
  24.                 .Decide(data => data.MyName.Length)
  25.                      .Branch((data, outcome) => data.MyName.Length<3, branch1)
  26.                      .Branch((data, outcome) => data.MyName.Length >= 3, branch2)
  27.                 .Then<GoodbyeWithName>()
  28.                    .Input(step => step.Name, data => data.MyName);
  29.         }
  30.     }
  31. }
复制代码
这流程稍微复杂一点,我们增加了使用Activity的输入,目的是看一下前台的输入等待是否会影响后台的进程运行,还有就是前台输入的数据,能否正确传递到后台,流程的运行代码如下:
  1.             IServiceProvider serviceProvider = ConfigureServices();
  2.             var host = serviceProvider.GetService<IWorkflowHost>();
  3.             host.RegisterWorkflow<DecisionWorkflow, MyNameClass>();
  4.             host.Start();
  5.             var myClass = new MyNameClass { MyName = "张三" };
  6.             host.StartWorkflow("DecisionWorkflow", 1, myClass);
  7.             var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
  8.             if (activity != null)
  9.             {
  10.                 Console.WriteLine("输入名字");
  11.                 string value = Console.ReadLine();
  12.                 host.SubmitActivitySuccess(activity.Token, value);
  13.                
  14.             }
  15.             Console.ReadLine();
  16.             host.Stop();
复制代码
运行效果如下:

可以看出,前台需要的输入等待并没有影响后台的执行,我们输入一个新名字后:

集成Elasticsearch

WorkflowCore 自身的查询功能很弱,不过它提供了Elasticsearch的plugin,可以使用Elasticsearch对流程进行索引和查询。不太方便的地方是必须要安装Elasticsearch。这里先简单介绍一下Elasticsearch,它是基于Lucene的搜索服务器,提供了分布式多用户的全文检索引擎,基于RESTful web接口。网上关于Elasticsearch的资料很多,可以自行搜索。
如果希望使用Elasticsearch索引工作流,需要在项目中安装WorkflowCore.Providers.Elasticsearch,使用NuGet安装这个插件,然后在services中进行设置:
  1. using System;
  2. using WorkflowCore.Interface;
  3. namespace ZL.WorflowCoreDemo.ControlStructures
  4. {
  5.     public class ScheduleWorkflow : IWorkflow
  6.     {
  7.         public string Id => "ScheduleWorkflow";
  8.         public int Version => 1;
  9.         public void Build(IWorkflowBuilder<object> builder)
  10.         {
  11.             builder
  12.                 .StartWith(context => Console.WriteLine("开始"))
  13.                     .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
  14.                     .StartWith(context => Console.WriteLine("后台工作")))
  15.                 .Then(context => Console.WriteLine("前台工作"));
  16.         }
  17.     }
  18. }
复制代码
在代码中,通过依赖注入引入ISearchIndex,使用Search方法进行搜索:
  1.            IServiceProvider serviceProvider = ConfigureServices();
  2.             var host = serviceProvider.GetService<IWorkflowHost>();
  3.             host.RegisterWorkflow<ScheduleWorkflow>();
  4.             host.Start();
  5.             
  6.             var workflowId = host.StartWorkflow("ScheduleWorkflow", 1, null).Result;
  7.             Console.ReadLine();
  8.             host.Stop();
复制代码
检索的范围包括流程的定义、描述、状态等。如果流程相关的自定义数据类需要检索,数据类需要实现ISearchable接口。
异常处理

WorkflowCore启动的流程多线程的方式运行,如果流程中出现的异常不会抛出到主程序,很多情况下感觉流程莫名奇妙地结束了。为了避免这种情况,需要显示地声明流程步骤的异常处理。如果使用Fluent API定义流程,可以在流程后附加OnError处理异常,但我们更希望对异常进行集中处理和记录,这时可以使用WorkflowHost服务的OnStepError事件。定义如下:
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using WorkflowCore.Interface;
  5. using ZL.WorflowCoreDemo.InputDataToStep;
  6. using ZL.WorflowCoreDemo.InputDataToStep.Steps;
  7. namespace ZL.WorflowCoreDemo.ControlStructures
  8. {
  9.     public class RecurWorkflow : IWorkflow<MyNameClass>
  10.     {
  11.         public string Id => "RecurWorkflow";
  12.         public int Version => 1;
  13.         public void Build(IWorkflowBuilder<MyNameClass> builder)
  14.         {
  15.             builder
  16.                 .StartWith(context => Console.WriteLine("开始"))
  17.                     .Recur(data => TimeSpan.FromSeconds(5),data=>data.MyName.Length>5).Do(recur => recur
  18.                     .StartWith<HelloWithName>()
  19.                     .Input(step => step.Name, data => data.MyName))
  20.                 .Then(context => Console.WriteLine("前台工作"))
  21.                 .Activity("activity-1", (data) => data.MyName)
  22.                         .Output(data => data.MyName, step => step.Result);
  23.         }
  24.     }
  25. }
复制代码
异常处理代码可以写在Host_OnStepError中:
  1.             IServiceProvider serviceProvider = ConfigureServices();
  2.             var host = serviceProvider.GetService<IWorkflowHost>();
  3.             host.RegisterWorkflow<RecurWorkflow,MyNameClass>();
  4.             host.Start();
  5.             var myClass = new MyNameClass { MyName = "张三" };
  6.             var workflowId = host.StartWorkflow("RecurWorkflow", 1, myClass).Result;
  7.             var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
  8.             if (activity != null)
  9.             {
  10.                 Console.WriteLine("输入名字");
  11.                 string value = Console.ReadLine();
  12.                 host.SubmitActivitySuccess(activity.Token, value);
  13.             }
  14.             Console.ReadLine();
  15.             host.Stop();
复制代码
实际使用中的问题

到这里,我们介绍了WorkflowCore的使用,下面谈一下这个项目在实际使用时遇到一些问题。

  • 轻量级,部署和使用都很简单。项目本身满足这个条件,但对流程相关的查询功能很弱,如果需要增强,需要Elasticsearch的支持。部署和使用Elasticsearch带来了额外的工作量。
  • WorkflowCore支持使用JSON格式定义工作流,然而从功能上要弱于使用Fluent API定义的工作流,因为不具备解析Lambda表达式的能力
  • 参数传递功能相对较弱,无法传递复杂对象。
    上述问题是我们在实际中遇到的,希望对大家有所帮助。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

杀鸡焉用牛刀

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表