简单的工作流引擎的设计

最近工作中有几个项目因为逻辑太复杂,工作流比较复杂,如果手写工作量太大。而又因为不会用微软自家的工作量引擎,也感觉它的功能太多太复杂,觉得出了问题凭我刚学的C#水平不一定HOLD得住,所以设计并开发了一个简单的工作流引擎。

首先,我把这个工作量引擎定义为一个状态机。为了在工作流中持久的保存数据,不因一次请求没有让工作量执行结束或者系统停止而消失,在每个工作流的实例中持有一个存储空间UserSpace,用来存放数据(键值对)。一个工作流有许多个状态,其中有一个开始状态(Start)和若干个结束状态(Final, Error, Timeout等)。一类工作流包含了多个Task,每个Task有自己的依赖条件(状态依赖,请求参数依赖,工作量存储内容的依赖等等)和结束状态。每次要执行工作流时,要构造一个请求传给工作流的实例,工作量引擎判断出在Tasks中满足依赖的所有Task,根据一定规则(状态依赖,优先级等)挑选出一个Task来执行(如果有的话),执行完这个Task之后把请求重新抛给工作流,循环,知道工作流达到结束状态或者没有Task可执行。如果工作流的状态没有到结束状态的话,下次还可以接收请求并继续执行。

又因为工作流接收请求的时间不确定,这段时间可能发生各种事情,包括系统重启等,所以工作流肯定是不能放内存中的,需要序列化(字节流序列化,XML序列化或者其它方式)存储。每次Task执行后都要更新存储一次。第一次创建工作流之后,如果没有执行任何Task,也要存储一次(应该在创建并初始化工作流设置之后)。为了避免无意义的存储和安全考虑,在存储之前要去除掉工作流的UserSpace中的一些内容,比如user/request/response(自定义,这是我在web应用中用到的),如果有的话。

因为工作流是一个状态机循环,一个请求在被一个Task处理之后又会重新抛入状态机循环,为了避免原本只需要被一次处理的参数被多次使用,甚至造成死循环等后果,最好在Task执行结尾清理掉这些参数(要判断是否request参数和workflow的UserSpace中是否都要删除)。当然,Task也可以把数据存入workflow的UserSpace。

为了封装多个Task中可能的类似的行为,将Task的行为(处理函数还有各种事件回调机制)封装成IOperation接口,在这些方法中都是调用IOperation接口的实现类。

每个Task有一个FinalState属性,表示Task正常完成之后工作流的当前状态会改成的属性。如果Task中没错处理,正常执行完成,则workflow的CurrentState就会改成这个值。但考虑到在业务中有类似审核时拒绝,打回编辑状态,或者跳过某些阶段的情况,在Task中,也可以通过ReDispatch(workflow, request, state)手动控制状态跳转。调用了这个函数后就不会发生自动把workflow当前状态修改成当前task的FinalState的情况了。不过要注意的是,这个函数调用后并不会直接去把request重新抛给工作流,执行其他task,而是会简单处理后立刻返回,后面代码马上就好执行的。

顺便说一句,为了增加可扩展性,也方便在需要的时刻做处理,在工作流和Task的各个执行阶段(BeforeRequest, AfterRequst, BeforeProcessTask, OnFailRequirementsCheck等)都有事件回调,通过覆盖方法或注册处理函数的方式可以在这些时刻调用自定义的代码。

再来说下Task的requirements,包含多种条件,比如对状态的依赖(多个中只要满足一个就通过,后面其他的条件是都要满足才可),request参数依赖(string键),workflow的UserSpace内容的依赖(string键),对request的依赖(一个函数(request) => bool),对workflow的UserSpace的依赖(一个函数(request) => bool)。为了方便代码生成已经避免修改Task的定义,将其中函数形式的依赖的值放在一个类Conditions(自定义)中(这个类的静态方法)

考虑到工作流的使用场景非常可能有很多自定义的情况,所有Workflow,Conditions, Operation(s), WorkflowInitor(在workflow初始化和反序列化之后调用),以及工作流的持久化方式等都做成可扩展的形式,根据具体项目需求可以通过继承类或接口来改变实现方式,在workflow的持久化上,使用了策略模式,通过定义新策略可以定义新的序列号和反序列化的方式,比如我们项目中就采用了sql存储(本工作流系统中内置了内存和redis的支持),并且sql存储的表结构也可以自定义。

描述一下工作流的使用场景。 比如在创建一条记录的时候,先创建一个工作流的实例,构造一个请求request,把各种来源的需要的参数(比如HTTP请求参数,session内容,当前用户等)放入request.Parameters中,然后把这个request传给工作流workflow:

var workflow = new Workflow();
var request = new Request();
request.Parameters.Set("user", user);
// ......
var result = workflow.PostRequest(request);

这样工作流引擎就好自动执行状态机了。 每个工作流实例默认都有一个唯一的ID,但如果有特别需求的话,比如工作流ID需要与在CreateTask中创建的记录的ID有关联的话,可以在这个CreateTask的代码中,手动根据新记录ID修改workflow的ID值,只要确保在工作流第一次持久化之前修改ID就好了(从这个角度讲,要控制工作流的持久化是在真正需要时才持久化,如果工作流创建后会执行task的话,不应该多余的持久化,在本工作流引擎中已经这样做了)。

可以看到,这里有一个返回值result。但在状态机中怎么返回值呢,已经返回哪个task的值呢(如果执行了多个task的话)? 在task中有一个属性result,每次执行前都会清空,在Task的处理代码中可以设置这个result属性,里面存放了返回值,这个返回值会传给工作流并最终返回。

工作流的超时机制很简单,就不说了。

最后,关于工作流和Task的定义,如果每次自己手写代码就不方便了。所以采用了代码生成的方式,通过配置xml文件(本来设计了一套类似ruby的语法了,不过写得时候觉得太麻烦,就先用xml了, 将来可能改掉),生产Workflow和Tasks的代码,里面包含各状态的定义,工作流的定义,采用的配置(持久化策略,WorkflowInitor类,继承的Workflow类等,包含的Task列表),各Task的依赖和FinalState等。 然后我们只需要根据生产的代码在编译时报的错写Conditions, Operations(Task的行为), WorkflowInitor等等就好了。

在初步使用了本工作流引擎后,一个很复杂的多级审核(审核人构成也比较复杂),并且有比较多的打回之前阶段的业务,除XML配置之外,主要代码变得只有两块:创建和审核(并且这个审核只需要写一次通用化的代码,用到多个审核中,甚至可以用到其他类似业务中),开发和维护方便多了。