Celery 任务编排入坑

基本概念和用法

签名(Signatures)

基本概念

Celery 函数签名的意思是构造一个任务签名对象,该签名对象构造后并不会去执行(类比于一个静态对象),该对象具有函数的名称,参数等信息,可以供任意对象进行调用后启动任务。

signature() 实现了对函数参数,关键字参数和执行选项的包装,使其可以作为参数对象传递到其他函数,也可以序列化后作为消息(Message)传递给其他任务。

以下为一些具体使用的示例:

  • 通过函数名来创建 add 任务的签名,下列代码创建了一个参数为两个整数输入 2, 2add 任务签名,并设置了倒计时为 10。
>>> from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)
tasks.add(2, 2)
  • 或者可以使用任务的 signature 方法来创建一个签名对象:
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
  • 也可以使用 signature 的首字母 s 来代替:
>>> add.s(2, 2)
tasks.add(2, 2)

Signature 的参数被分为三个部分 args, kwargs 和 options,参考示例:

>>> s = add.signature((2, 2), {'debug': True}, countdown=10)
>>> s.args
(2, 2)
>>> s.kwargs
{'debug': True}
>>> s.options
{'countdown': 10}

Signature 支持所有的调用 API,类似 delay apply_async 等,也包括直接使用。

Partials

Partials 其实是 Signature 的一种特性,用于描述参数不完全的 Signature。

>>> partial = add.s(2)          # incomplete signature
>>> partial.delay(4)            # 4 + 2

本质上来讲就是任务的一些参数是需要外部传入的,但是另一部分是希望固定下来的,因此就会使用这一特性。

虽然可讲的内容不多,但这一特性在后续的原语中非常重要,对于任务的链式执行中结果传输非常有用。

Immutability

前面提到了 Partials 是 Signature 的特性,那么所有的 Signature 必定都支持这一特性,但是有些任务不希望接受到别的任务传来参数或者手动填入参数,那么就需要固定参数值,这一方法就使用 Immutability 来实现。

完整的使用为设置 Signature 参数中的 immutable=True

immutable_task = task.signature(immutable=True)

可以缩写为

immutable_task = task.si()

Callbacks

任务完成后需要回调去执行其他函数,这以操作通过设置执行方法的 link 参数来执行。

add.delay((2, 2), link=add.s(4))      # (2 + 2) + 4

原语(Primitives)

Chains

链式任务是最为常见的一种任务形式,用于表示任务的串行执行,在上一节中的回调部分其实就是一种构建任务链的方法。这种链式的操作不仅仅是在执行函数中设置,也可以在 Signature 层面进行设置,例如:

>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
>>> s.link(log_result.s())

有两种简化链式任务声明的方法,第一种为使用 chain 函数:

>>> from celery import chain
>>> from proj.tasks import add, mul

>>> # (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))

>>> tasks = [add.s(4, 4), mul.s(8), mul.s(10)]
>>> res = chain(*task)

第二种为使用 | 运算符:

res = add.s(4, 4) | mul.s(8) | mul.s(10)

Groups

Group 用于并行执行多个任务。传入参数为 Signature 列表:

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))

Chords

Chord 其实就是带回调的 Group,一般用于 Group 的所有任务完成后执行下一个任务这种形式,形式为:

>>> from celery import chord
>>> from tasks import add, tsum

>>> chord(add.s(i, i)
...       for i in range(100))(tsum.s()).get()
>>> 9900

当然这种写法其实和 Group + Chain 的组合效果是一致的:

chain(group(add.s(i, i)
...         for i in range(100)), (tsum.s()).get())
>>> 9900

高阶操作

动态编排任务

在某些任务中,其任务调度图依赖于上一个任务的结果,因此无法一开始就创建号任务图,这就需要动态生成任务图去执行了。首先,给出一个错误范例:

@app.task
def run_task(data):
    params = prepare_params(data)      # 一些参数抽取
    workflow = build_workflow(params)  # 构造任务流
    result = workflow.get()            # 同步获取任务流结果
    return result                      # 返回

run_task 本身是一个 Task,在任务中准备数据形成需要的 workflow ,workflow 可能是 Graph 等原语,然后同步获取结果返回。这一做法在 Celery 中是被禁止的,Celery 禁止对子任务使用 get 来同步获取结果,这一结果可能造成性能或者任务死锁的问题。

一种 Celery 希望我们进行的方式为:

@app.task
def prepare_params(data):
    # 准备任务
    return params

@app.task
def run_workflow(params)
    # 编写完整的任务流过程,返回结果
    return result

# 基于 Chain 传递结果并执行
run_task = prepare_params.s(data) | run_workflow.s()@app.task
def run_task(data):
    params = prepare_params(data)      # 一些参数抽取
    workflow = build_workflow(params)  # 构造任务流
    result = workflow.get()            # 同步获取任务流结果
    return result                      # 返回

这种方式中没有 build_workflow 执行,而是静态写在 run_workflow 中,缺失了灵活性。

Celery 对这种在任务中构建任务流并执行的操作提供了 replace 方法,用于在一个任务中保留当前任务的 ID 但是替换任务内容为给出的任务,关于该方法的具体声明为:

Replace this task, with a new task inheriting the task id.
Execution of the host task ends immediately and no subsequent statements will be run.
New in version 4.0.

Parameters

  • sig (Signature) – signature to replace with.

Raises

  • Ignore – This is always raised when called in asynchronous context.
  • It is best to always use return self.replace(..) to convey to the reader that the task won't continue after being replaced.

该方法接受一个 Signature 对象,可以使用 Group 、Chain 或者 Chord 方法来获取编排号的 Signature 对象,从而实现动态任务图执行,实现方法为:

def build_workflow(params):
    tasks = []
    for param in params:
        task = build_task(param)
        tasks.append(tasks)
    return chain(*tasks)

@app.task
def run_workflow()
    params = load_params()
    workflow = build_workflow(params)
    raise self.replace(workflow)

异常捕获

Celery 中依旧使用任务(Task)来定义异常处理方法,且有多种使用方法(有些乱七八糟)。可以参考为:

@app.task
def on_err_func(request, exc, traceback):
    print('-------Task {0!r} raised error: {1!r}'.format(request.id, exc))

# 第一种形式,run_task 为单个任务
task = run_task.delay(link_error=on_err_func.s())

# 第二种,针对 Signature
signature = run_task.s()

# 第三种,针对 Chain、Group、Chord
signature = chain(*tasks, link_error=on_err_func.s())
signature = chain(*tasks).on_error(on_err_func.s())

Celery 中的异常捕获非常坑,有很多问题至今还在 GitHub Issue 中没解决,目前从个人踩坑经验来看,主要是以下一些问题:

  1. 如果 workflow 是一个复杂结构 Signature,类似于 Group of Chains,那么不能只对 workflow 设置错误异常捕获,这样的话依旧无法执行异常函数;

一些可以使用的解决方案:

  1. 任何使用 Chain、Group、Chord 的地方都需要加上错误异常,对于中间过程可以只是打印一下错误,对于最后 workflow 的异常使用完整的错误处理(例如保存异常状态到数据库);
  2. 对任务使用 try-except 进行包装,即确保任务一定能执行,这种方法对 celery 或者集群导致的任务丢失之类的问题没法解决,但是对节点出错依旧能执行这种形式可以由很好解决;

参考资料

  1. Documentation for Task.replace for Dynamic Tasks · Issue #3437 · celery/celery
  2. Canvas: Designing Work-flows - Celery 5.1.2 documentation