A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。
基于消息驱动的分布式异步工作流程处理框架,使用 SourceGenerator
简化开发中的重复工作。
- 典型的消息驱动处理流程
在典型的消息驱动处理流程中,阶段的开始消息
与结束消息
、各个消息的触发都需要手动定义,这些多数属于重复工作,FluentWorkflow
是为了减少这些重复劳动而诞生的
- 已在包
FluentWorkflow.RabbitMQ
启用RabbitMQ.Client
的7.0
版本支持,6.*
版本支持使用包FluentWorkflow.RabbitMQ.Legacy
- 基础代码自动生成,开发时只需要关注业务;
- 跨实例、跨服务工作流程驱动;
- 灵活的子工作流程等待/工作流程嵌套;
- 灵活的拓展性(
partial
/继承); Diagnostic
支持;- 目标框架
net7.0
+; - *针对单个消息类型的Qos;
- 更新包时应当
尽可能
的全链路更新
,避免导致的未知问题; WorkflowContext
核心为字符串字典
其属性在赋值时
进行序列化存放
,对象后续的修改不会
反应到上下文中;Workflow
中重写各个阶段的触发事件
方法时,方法内不能往外抛出异常
,会导致该阶段消息重新进入队列,再次执行;- 默认分发器
FluentWorkflow.RabbitMQ
依赖交换机
和队列
进行消息分发,当存在多套环境
需要隔离
时,确保交换机
和队列
都不相同,否则将会出现消息重复消费; - 默认分发器
FluentWorkflow.RabbitMQ
在绑定信息
(交换机
、队列
)变更时不能完全自动调整,需要人工修正,如手动移除队列错误的交换机绑定
和RoutingKey绑定
,否则将会出现消息重复消费; - 框架暂时没有保证消息可靠性,即在
消息队列中间件
异常的情况下可能会出现流程中断、重复消费等情况;
<ItemGroup>
<PackageReference Include="FluentWorkflow.Core" Version="1.4.0" />
</ItemGroup>
public partial class SampleWorkflow : IWorkflow
{}
- 声明类型为
partial
; - 继承接口
IWorkflow
;
public partial class SampleWorkflow : IWorkflow
{
public SampleWorkflow(SampleWorkflowContext context, IServiceProvider serviceProvider) : base(context, serviceProvider)
{
}
protected override void BuildStages(ISampleWorkflowStageBuilder stageBuilder)
{
stageBuilder.Begin()
.Then("SampleStage1")
.Then("SampleStage2")
.Then("SampleStage3")
.Completion();
}
}
到此一个工作流程就声明完成了,该工作流程名为SampleWorkflow
,包含三个阶段 SampleStage1
-> SampleStage2
-> SampleStage3
- 工作流程在
BuildStages
方法中使用参数stageBuilder
定义,必须链式调用,由Begin()
开始Completion()
结束,使用Then("StageName")
声明每个阶段,声明顺序即为阶段顺序,阶段名称必须满足C#标识符命名规则和约定; - 代码生成器已为工作流程生成了必要的工作代码:
- 工作流程上下文
SampleWorkflowContext
(模板:{WorkflowName}Context
) - 工作流程消息 - 每个阶段的开始完成消息等 (模板:
{WorkflowName}{StageName}(Stage|StageCompleted)Message
) - 阶段处理器基类
SampleWorkflowSampleStage1StageHandlerBase
、SampleWorkflowSampleStage2StageHandlerBase
、SampleWorkflowSampleStage3StageHandlerBase
(模板:{WorkflowName}{StageName}StageHandlerBase
) - *其它相关支撑类型
- 工作流程上下文
// SampleStage2 与 SampleStage3 同理
public class SampleWorkflowSampleStage1StageHandler : SampleWorkflowSampleStage1StageHandlerBase
{
public SampleWorkflowSampleStage1StageHandler(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
protected override Task ProcessAsync(ProcessContext processContext, SampleWorkflowSampleStage1StageMessage stageMessage, CancellationToken cancellationToken)
{
//TODO 阶段业务逻辑
return Task.CompletedTask;
}
}
services.AddFluentWorkflow()
.AddSampleWorkflowScheduler() //添加工作流程调度器
.AddSampleWorkflowResultObserver() //添加结果观察器
.UseInMemoryWorkflowMessageDispatcher(); //配置使用的消息分发器,这里使用基于内存的分发器来示范
services.AddFluentWorkflow()
.AddSampleWorkflowSampleStage1StageHandler<SampleWorkflowSampleStage1StageHandler>() //添加对应阶段的处理器, SampleStage2 与 SampleStage3 同理
.UseInMemoryWorkflowMessageDispatcher(); //配置使用的消息分发器,这里使用基于内存的分发器来示范
FluentWorkflow
正常工作的必要条件:
- 流程中的所有
服务
使用同一套
消息分发器; - 有且仅配置了一个(单个服务,可多实例)工作流程调度器 -
WorkflowScheduler
; - 所有阶段的阶段处理器 -
StageHandler
,各个阶段的阶段处理器有且仅有一个(单个服务,可多实例); - *需要等待
子工作流程
时必须配置子工作流程结果观察器 -ResultObserver
; - *需要单次等待多个
子工作流程
时,必须使用支持等待多个子工作流程
的IWorkflowAwaitProcessor
; (默认实现了基于redis
的多流程等待处理器,配置时使用UseRedisWorkflowAwaitProcessor
方法以启用)
//从DI容器中获取工作流程构建器
var workflowBuilder = ServiceProvider.GetRequiredService<IWorkflowBuilder<SampleWorkflow>>();
//创建工作流程上下文
var context = new SampleWorkflowContext();
//构建工作流程
var workflow = workflowBuilder.Build(context);
//启动工作流程,框架会自动触发各个阶段处理器后完成
await workflow.StartAsync(default);
- 启动工作流程的服务可以不是配置工作流程调度器 -
WorkflowScheduler
的服务,但需要接入消息分发器
并在配置时使用Add****Workflow()
添加对应的工作流程构造器; - 源代码生成器生成的绝大部分类型都是
partial
的,可以声明partial
类进行拓展,不可使用partial
类拓展的功能基本上都可以继承后重写,在配置服务时替换默认实现即可; - 定义的
Workflow
类会添加生命周期各个阶段的触发事件
方法,可以继承后重写
其逻辑以在各个阶段执行相关的逻辑(注意每次触发可能不在同一个服务实例中。重写后应当捕获并处理所有异常,不要抛出); WorkflowContext
核心为字符串字典
,对其修改理论上只对后续可见并在整个执行周期可用,可以将执行参数、结果、中间值等存放其中;- 消息的分发、重试等逻辑由具体使用的消息分发器
IWorkflowMessageDispatcher
控制(默认提供了基于CAP
、Abp
以及基础的FluentWorkflow.RabbitMQ
可选); - 默认情况下
StageHandler
出现异常则认为工作流程失败,不会将异常抛给上层IWorkflowMessageDispatcher
(消息分发的重试不会触发),可以重写StageHandler
的OnException
方法来将异常向上抛出; - 更改既有工作流程时,如果
修改
/删除
了既有的阶段定义,会导致还在处理过程中工作流程无法正常运行(但添加不会影响);
部分功能为源码接入的方式,默认不生成,在项目中指定需要的功能后自动生成
<PropertyGroup>
<FluentWorkflowGeneratorAdditional>AbpFoundation,CAPFoundation,AbpMessageDispatcher,CAPMessageDispatcher,RedisAwaitProcessor</FluentWorkflowGeneratorAdditional>
</PropertyGroup>
名称 | 功能 |
---|---|
AbpFoundation | Abp的基础功能支持 |
CAPFoundation | CAP的基础功能支持 |
AbpMessageDispatcher | Abp的消息分发器 |
CAPMessageDispatcher | CAP的消息分发器 |
RedisAwaitProcessor | 基于StackExchange.Redis 的子流程等待处理器 |
- 生成的可能冲突的类型会放到命名空间
FluentWorkflow.GenericExtension.{工作流程命名空间}
下,如配置拓展方法等;
<ItemGroup>
<PackageReference Include="FluentWorkflow.RabbitMQ" Version="1.4.0" />
</ItemGroup>
services.AddFluentWorkflow()
.UseRabbitMQMessageDispatcher(options =>
{
//配置RabbitMQ
});
配置单个消息的消费速率,其它消息不受限
services.Configure<RabbitMQOptions>(options =>
{
//配置阶段Stage1的消息 - SampleWorkflowSampleStage1StageMessage 的消费速率,即当前服务实例同时只会有一个阶段Stage1在处理
options.Option<SampleWorkflowSampleStage1StageMessage>(static handleOptions =>
{
handleOptions.Qos = 1;
});
});
RabbitMQ消息的消费ack超时时间默认为30分钟,进行长时间处理时可能会出现意外情况,可参照 acknowledgement-timeout 进行调整
- 框架已默认尝试设置队列参数
x-consumer-timeout
为 1 小时(如果RabbitMQ版本支持的话); - 可使用
RabbitMQOptions.QueueArgumentsSetup
对队列的x-consumer-timeout
参数进行调整;
在阶段处理器中实现子工作流程等待逻辑
internal class SampleWorkflowSampleStage1StageHandler : SampleWorkflowSampleStage1StageHandlerBase
{
public SampleWorkflowSampleStage1StageHandler(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
protected async override Task ProcessAsync(ProcessContext processContext, SampleWorkflowSampleStage1StageMessage stageMessage, CancellationToken cancellationToken)
{
//构建子工作流程
var workflowBuilder = ServiceProvider.GetRequiredService<IWorkflowBuilder<SampleWorkflow>>();
var workflow = workflowBuilder.Build(new SampleWorkflowContext());
//Other workflow setting
//将未启动的子工作流程传递给当前阶段处理上下文,并命名为 - 'taskName'
processContext.AwaitChildWorkflow("taskName", workflow);
// Other logic
//当前阶段将等待子工作流程处理完成后再完成
}
protected override async Task OnAwaitFinishedAsync(SampleWorkflowContext context, IReadOnlyDictionary<string, IWorkflowContext?> childWorkflowContexts, CancellationToken cancellationToken)
{
//从等待的子工作流程上下文字典中取出 - 'taskName'
var workflowContext = (SampleWorkflowContext)childWorkflowContexts["taskName"];
//处理子工作流程结果,如将 workflowContext 内的结果赋值给 context,以便在当前工作流程的后续阶段中使用等
await base.OnAwaitFinishedAsync(context, childWorkflowContexts, cancellationToken);
//当前阶段将完成
}
}
services.AddFluentWorkFlow().EnableDiagnostic();
在 WorkFlow
的 On*StageAsync
和 On*StageCompletedAsync
中不执行参数委托 fireMessage
,则后续流程不再执行
在 WorkFlow
的 On*StageAsync
和 On*StageCompletedAsync
中不执行参数委托 fireMessage
,中止流程,在此基础上调用 SerializeContext
方法将上下文序列化后存放
// 存放 contextData 以用于流程恢复
var contextData = SerializeContext(message.Context);
调用具体 WorkFlow
的静态方法 ResumeAsync
使用挂起的流程数据进行恢复执行
// contextData 为序列化的上下文数据
await XXXXWorkflow.ResumeAsync(contextData, serviceProvider, cancellationToken)
恢复流程将会再次调用序列化上下文时的方法,需要注意,小心再次被挂起
更多信息参见源码内的测试代码