目录
前文列表
扩展阅读
简介
TaskFlow is a Python library that helps to make task execution easy, consistent and reliable.
A library to do [jobs, tasks, flows] in a highly available, easy to understand and declarative manner (and more!) to be used with OpenStack and other projects.简而言之, TaskFlow 能够控制应用程序中的长流程业务逻辑任务的暂停、重启、恢复以及回滚, 主要用于保证长流程任务执行的可靠性和一致性。
主要应用场景有如 Cinder 的 create volume 这般复杂、冗长、容易失败, 却又要求保持数据与环境一致的业务逻辑.
从 create volume 流程图看, Cinder 在 create_volume.py(cinder/volume/flows/manager/create_volume.py
) 模块中定义了大量的 Tasks class 来组成 TaskFlow:
- OnFailureRescheduleTask
- ExtractVolumeRefTask
- ExtractVolumeSpecTask
- NotifyVolumeActionTask
- CreateVolumeFromSpecTask
- CreateVolumeOnFinishTask
如果在执行任务流的过程中失败了, TaskFlow 的回滚机制能够让程序流和执行环境回滚到初始状态, 并且可以重新开始执行.
总的来说, TaskFlow 非常适合于 面向任务 的应用场景.
基本概念
Atom: An atom is the smallest unit in TaskFlow which acts as the base for other classes
Atom: Atom 是 TaskFlow 的最小单位, 其他的所有类, 包括 Task 类都是 Atom 类的子类.Task: A task (derived from an atom) is a unit of work that can have an execute & rollback sequence associated with it (they are nearly analogous to functions).
Task: task 是拥有执行和回滚功能额最小工作单元. 在 Task 类中开发者能够自定义 execute(执行) 和 revert(回滚) method.Flow: Linear/Unordered/Graph
Flow: 在 TaskFlow 中使用 flow(流) 来关联各个 Task, 并且规定这些 Task 之间的执行和回滚顺序. flow 中不仅能够包含 task 还能够嵌套 flow. 常见类型有以下几种:Linear(
linear_flow.Flow
): 线性流, 该类型 flow 中的 task/flow 按照加入的顺序来依次执行, 按照加入的倒序依次回滚.Unordered(
unordered_flow.Flow
): 无序流, 该类型 flow 中的 task/flow 可能按照任意的顺序来执行和回滚.Graph(
graph_flow.Flow
): 图流, 该类型 flow 中的 task/flow 按照显式指定的依赖关系或通过其间的 provides/requires 属性的隐含依赖关系来执行和回滚.
Retry: A retry (derived from an atom) is a special unit of work that handles errors, controls flow execution and can (for example) retry other atoms with other parameters if needed.
Retry: Retry 是一个控制当错误发生时, 如何进行重试的特殊工作单元, 而且当你需要的时候还能够以其他参数来重试执行别的 Atom 子类. 常见类型:- AlwaysRevert: 错误发生时, 回滚子流
- AlwaysRevertAll: 错误发生时, 回滚所有流
- Times: 错误发生时, 重试子流
- ForEach: 错误发生时, 为子流中的 Atom 提供一个新的值, 然后重试, 直到成功或 retry 中定义的值用完为止.
- ParameterizedForEach: 错误发生时, 从后台存储(由 store 参数提供)中获取重试的值, 然后重试, 直到成功或后台存储中的值用完为止.
Engine: Engines are what really runs your atoms.
Engine: Engines 才是真正运行 Atoms 的对象, 用于 load(载入) 一个 flow, 然后驱动这个 flow 中的 task/flow 开始运行. 我们可以通过 engine_conf 参数来指定不同的 engine 类型. 常见的 engine 类型如下:- serial: 所有的 task 都在调用了 engine.run 的线程中运行.
- parallel: task 可以被调度到不同的线程中运行.
- worker-based: task 可以被调度到不同的 woker 中运行.
实现样例
源码请浏览
#!/usr/bin/env python#filename: tasks.pyimport taskflow.enginesfrom taskflow.patterns import linear_flow as ltfrom taskflow import taskfrom taskflow.types import failure as task_failedclass CallJim(task.Task): default_provides = set(['jim_new_number']) def execute(self, jim_number, *args, **kwargs): print "Calling Jim %s." % jim_number print '=' * 10 jim_new_number = jim_number + 'new' return { 'jim_new_number': jim_new_number} def revert(self, result, *args, **kwargs): if isinstance(result, task_failed.Failure): print "jim result" return None jim_new_number = result['jim_new_number'] print "Calling jim %s and apologizing." % jim_new_numberclass CallJoe(task.Task): default_provides = set(['joe_new_number', 'jim_new_number']) def execute(self, joe_number, jim_new_number, *args, **kwargs): print "Calling jim %s." % jim_new_number print "Calling Joe %s." % joe_number print '=' * 10 joe_new_number = joe_number + 'new' return { 'jim_new_number': jim_new_number, 'joe_new_number': joe_new_number} def revert(self, result, *args, **kwargs): if isinstance(result, task_failed.Failure): print "joe result" return None jim_new_number = result['jim_new_number'] joe_new_number = result['joe_new_number'] print "Calling joe %s and apologizing." % joe_new_numberclass CallJmilkFan(task.Task): default_provides = set(['new_numbers']) def execute(self, jim_new_number, joe_new_number, jmilkfan_number, *args, **kwargs): print "Calling jim %s" % jim_new_number print "Calling joe %s" % joe_new_number print "Calling jmilkfan %s" % jmilkfan_number print '=' * 10 jmilkfan_new_number = jmilkfan_number + 'new' raise ValueError('Error') new_numbers = { 'jim_new_number': jim_new_number, 'joe_new_number': joe_new_number, 'jmilkfan_new_number': jmilkfan_new_number} return { 'new_numbers': new_numbers} def revert(self, result, *args, **kwargs): if isinstance(result, task_failed.Failure): print "jmilkfan result" return None jim_new_number = result['jim_new_number'] joe_new_number = result['joe_new_number'] jmilkfan_new_number = result['jmilkfan_new_number'] print "Calling jmilkfan %s and apologizing." % jmilkfan_new_numberdef get_flow(flow, numbers): flow_name = flow flow_api = lt.Flow(flow_name) flow_api.add(CallJim(), CallJoe(), CallJmilkFan()) return taskflow.engines.load(flow_api, engine_conf={ 'engine': 'serial'}, store=numbers)def main(): numbers = { 'jim_number': '1'*6, 'joe_number': '2'*6, 'jmilkfan_number': '3'*6} try: flow_engine = get_flow(flow='taskflow-demo', numbers=numbers) flow_engine.run() except Exception: print "TaskFlow Failed!" raise new_numbers = flow_engine.storage.fetch('new_numbers')if __name__ == '__main__': main()
Output:
fanguiju@fanguiju:~/project/my-code-repertory/TaskFlow-demo$ python tasks.pyCalling Jim 111111.==========Calling jim 111111new.Calling Joe 222222.==========Calling jim 111111newCalling joe 222222newCalling jmilkfan 333333==========jmilkfan resultCalling joe 222222new and apologizing.Calling jim 111111new and apologizing.TaskFlow Failed!Traceback (most recent call last): File "tasks.py", line 114, inmain() File "tasks.py", line 105, in main flow_engine.run() File "/usr/local/lib/python2.7/dist-packages/taskflow/engines/action_engine/engine.py", line 159, in run for _state in self.run_iter(): File "/usr/local/lib/python2.7/dist-packages/taskflow/engines/action_engine/engine.py", line 223, in run_iter failure.Failure.reraise_if_any(it) File "/usr/local/lib/python2.7/dist-packages/taskflow/types/failure.py", line 292, in reraise_if_any failures[0].reraise() File "/usr/local/lib/python2.7/dist-packages/taskflow/types/failure.py", line 299, in reraise six.reraise(*self._exc_info) File "/usr/local/lib/python2.7/dist-packages/taskflow/engines/action_engine/executor.py", line 82, in _execute_task result = task.execute(**arguments) File "tasks.py", line 66, in execute raise ValueError('Error')ValueError: Error
NOTE 1: 在 function get_flow
中使用 linear_flow.Flow
生成一个 TaskFlow(线性任务流) 对象 flow_api , 再通过flow_api.add
method 添加要 顺序执行且倒序回滚 的 Task class(CallJim/CallJom/CallJmilkFan).
NOTE 2: 使用 taskflow.engines.load
method 来加载 TaskFlow(flow_api)对象/后台存储数据(store)/ engine配置 等信息并生成 Task Engine 对象.
NOTE 3: 最后调用 Task Engine 对象的 flow_engine.run
method 来开始执行该任务流.
NOTE 4: 后台存储 store 的数据在该任务流中被所有 Task class 共享, 并且以 Task class 中的 execute method 的形参作为对接入口. e.g. 上述实现的 store 后台存储中含有 {jim_number: '1'*6}
, 那么 CallJim 的 execute method 就可以通过形参 jim_number 来获取 '1'*6
的值.
NOTE 5: Task class 的属性 default_provides 用于声明在执行过程中新添到后台存储的元素的名称, 其相应的值会自动的从 execute method 返回值中匹配获取, 最终存储后台存储. e.g. CallJim 的属性 default_provides = set(['jim_new_number'])
其中 jim_new_number
的值会从 execute method 的返回 return {'jim_new_number': jim_new_number}
中获取.
NOTE 6: provides 的实现能够有效的帮助传递 Task class 之间在执行时产生的新属性对象. 将上一个 Task 的结果传递给后一个 Task 使用.
最后
当实现的 TaskFlow 中包含了多个 Task(的确可能存在只有一个 Task 的 TaskFlow) 时, 有两点是需要注意的:
在使用线性流类型的 TaskFlow 时, Task class revert method 回滚的应该是上一个 Task class execute method 的业务. e.g. BTask.revert 应该回滚 ATask.execute, 因为只有在 ATask.execute 成功执行的前提之下才有 revert 的价值. 所以在 revert method 的定义中需要实现语句
if isinstance(result, task_failed.Failure): return None
. 当一个 Task 的 execute method 执行失败时, 那么 revert method 接收的 result 实参就是taskflow.types.failure.Failure
的实例对象.尽量让每个 Task class 都仅处理一件事情, 这是为了让每一次回滚都足够精准. e.g. 尽管创建虚拟机和开启虚拟机都同属于对虚拟机的操作, 但是我们仍然应该将两者各自定义一个 Task class. 假如启动虚拟机失败时, 我们只需再次重试启动虚拟机, 而无须再次重复创建虚拟机.