Celery 任务编排入坑
Codle / Nov 11, 2024
基本概念和用法
签名(Signatures)
基本概念
Celery 函数签名的意思是构造一个任务签名对象,该签名对象构造后并不会去执行(类比于一个静态对象),该对象具有函数的名称,参数等信息,可以供任意对象进行调用后启动任务。
signature()
实现了对函数参数,关键字参数和执行选项的包装,使其可以作为参数对象传递到其他函数,也可以序列化后作为消息(Message)传递给其他任务。
以下为一些具体使用的示例:
- 通过函数名来创建
add
任务的签名,下列代码创建了一个参数为两个整数输入2, 2
的add
任务签名,并设置了倒计时为 10。
>>> from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)
tasks.add(2, 2)
- 或者可以使用任务的
signature
方法来创建一个签名对象:
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
- 也可以使用
signature
的首字母s
来代替:
>>> add.s(2, 2)
tasks.add(2, 2)
Signature 的参数被分为三个部分 args, kwargs 和 options,参考示例:
>>> s = add.signature((2, 2), {'debug': True}, countdown=10)
>>> s.args
(2, 2)
>>> s.kwargs
{'debug': True}
>>> s.options
{'countdown': 10}
Signature 支持所有的调用 API,类似 delay
apply_async
等,也包括直接使用。
Partials
Partials 其实是 Signature 的一种特性,用于描述参数不完全的 Signature。
>>> partial = add.s(2) # incomplete signature
>>> partial.delay(4) # 4 + 2
本质上来讲就是任务的一些参数是需要外部传入的,但是另一部分是希望固定下来的,因此就会使用这一特性。
虽然可讲的内容不多,但这一特性在后续的原语中非常重要,对于任务的链式执行中结果传输非常有用。
Immutability
前面提到了 Partials 是 Signature 的特性,那么所有的 Signature 必定都支持这一特性,但是有些任务不希望接受到别的任务传来参数或者手动填入参数,那么就需要固定参数值,这一方法就使用 Immutability 来实现。
完整的使用为设置 Signature 参数中的 immutable=True
immutable_task = task.signature(immutable=True)
可以缩写为
immutable_task = task.si()
Callbacks
任务完成后需要回调去执行其他函数,这以操作通过设置执行方法的 link
参数来执行。
add.delay((2, 2), link=add.s(4)) # (2 + 2) + 4
原语(Primitives)
Chains
链式任务是最为常见的一种任务形式,用于表示任务的串行执行,在上一节中的回调部分其实就是一种构建任务链的方法。这种链式的操作不仅仅是在执行函数中设置,也可以在 Signature 层面进行设置,例如:
>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
>>> s.link(log_result.s())
有两种简化链式任务声明的方法,第一种为使用 chain
函数:
>>> from celery import chain
>>> from proj.tasks import add, mul
>>> # (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
>>> tasks = [add.s(4, 4), mul.s(8), mul.s(10)]
>>> res = chain(*task)
第二种为使用 |
运算符:
res = add.s(4, 4) | mul.s(8) | mul.s(10)
Groups
Group 用于并行执行多个任务。传入参数为 Signature 列表:
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))
Chords
Chord 其实就是带回调的 Group,一般用于 Group 的所有任务完成后执行下一个任务这种形式,形式为:
>>> from celery import chord
>>> from tasks import add, tsum
>>> chord(add.s(i, i)
... for i in range(100))(tsum.s()).get()
>>> 9900
当然这种写法其实和 Group + Chain 的组合效果是一致的:
chain(group(add.s(i, i)
... for i in range(100)), (tsum.s()).get())
>>> 9900
高阶操作
动态编排任务
在某些任务中,其任务调度图依赖于上一个任务的结果,因此无法一开始就创建号任务图,这就需要动态生成任务图去执行了。首先,给出一个错误范例:
@app.task
def run_task(data):
params = prepare_params(data) # 一些参数抽取
workflow = build_workflow(params) # 构造任务流
result = workflow.get() # 同步获取任务流结果
return result # 返回
run_task
本身是一个 Task,在任务中准备数据形成需要的 workflow ,workflow 可能是 Graph
等原语,然后同步获取结果返回。这一做法在 Celery 中是被禁止的,Celery 禁止对子任务使用 get
来同步获取结果,这一结果可能造成性能或者任务死锁的问题。
一种 Celery 希望我们进行的方式为:
@app.task
def prepare_params(data):
# 准备任务
return params
@app.task
def run_workflow(params)
# 编写完整的任务流过程,返回结果
return result
# 基于 Chain 传递结果并执行
run_task = prepare_params.s(data) | run_workflow.s()@app.task
def run_task(data):
params = prepare_params(data) # 一些参数抽取
workflow = build_workflow(params) # 构造任务流
result = workflow.get() # 同步获取任务流结果
return result # 返回
这种方式中没有 build_workflow
执行,而是静态写在 run_workflow
中,缺失了灵活性。
Celery 对这种在任务中构建任务流并执行的操作提供了 replace
方法,用于在一个任务中保留当前任务的 ID 但是替换任务内容为给出的任务,关于该方法的具体声明为:
Replace this task, with a new task inheriting the task id.
Execution of the host task ends immediately and no subsequent statements will be run.
New in version 4.0.
Parameters
- sig (Signature) – signature to replace with.
Raises
- Ignore – This is always raised when called in asynchronous context.
- It is best to always use return self.replace(..) to convey to the reader that the task won't continue after being replaced.
该方法接受一个 Signature 对象,可以使用 Group 、Chain 或者 Chord 方法来获取编排号的 Signature 对象,从而实现动态任务图执行,实现方法为:
def build_workflow(params):
tasks = []
for param in params:
task = build_task(param)
tasks.append(tasks)
return chain(*tasks)
@app.task
def run_workflow()
params = load_params()
workflow = build_workflow(params)
raise self.replace(workflow)
异常捕获
Celery 中依旧使用任务(Task)来定义异常处理方法,且有多种使用方法(有些乱七八糟)。可以参考为:
@app.task
def on_err_func(request, exc, traceback):
print('-------Task {0!r} raised error: {1!r}'.format(request.id, exc))
# 第一种形式,run_task 为单个任务
task = run_task.delay(link_error=on_err_func.s())
# 第二种,针对 Signature
signature = run_task.s()
# 第三种,针对 Chain、Group、Chord
signature = chain(*tasks, link_error=on_err_func.s())
signature = chain(*tasks).on_error(on_err_func.s())
Celery 中的异常捕获非常坑,有很多问题至今还在 GitHub Issue 中没解决,目前从个人踩坑经验来看,主要是以下一些问题:
- 如果 workflow 是一个复杂结构 Signature,类似于 Group of Chains,那么不能只对 workflow 设置错误异常捕获,这样的话依旧无法执行异常函数;
一些可以使用的解决方案:
- 任何使用 Chain、Group、Chord 的地方都需要加上错误异常,对于中间过程可以只是打印一下错误,对于最后 workflow 的异常使用完整的错误处理(例如保存异常状态到数据库);
- 对任务使用 try-except 进行包装,即确保任务一定能执行,这种方法对 celery 或者集群导致的任务丢失之类的问题没法解决,但是对节点出错依旧能执行这种形式可以由很好解决;
参考资料
- Documentation for Task.replace for Dynamic Tasks · Issue #3437 · celery/celery
- Canvas: Designing Work-flows - Celery 5.1.2 documentation