CodLe

Keep calm and make sure the growth is my own business.

Celery 任务编排入坑

Codle / Nov 11, 2024


基本概念和用法

签名(Signatures)

基本概念

Celery 函数签名的意思是构造一个任务签名对象,该签名对象构造后并不会去执行(类比于一个静态对象),该对象具有函数的名称,参数等信息,可以供任意对象进行调用后启动任务。

signature() 实现了对函数参数,关键字参数和执行选项的包装,使其可以作为参数对象传递到其他函数,也可以序列化后作为消息(Message)传递给其他任务。

以下为一些具体使用的示例:

>>> from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)
tasks.add(2, 2)
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
>>> add.s(2, 2)
tasks.add(2, 2)

Signature 的参数被分为三个部分 args, kwargs 和 options,参考示例:

>>> s = add.signature((2, 2), {'debug': True}, countdown=10)
>>> s.args
(2, 2)
>>> s.kwargs
{'debug': True}
>>> s.options
{'countdown': 10}

Signature 支持所有的调用 API,类似 delay apply_async 等,也包括直接使用。

Partials

Partials 其实是 Signature 的一种特性,用于描述参数不完全的 Signature。

>>> partial = add.s(2)          # incomplete signature
>>> partial.delay(4)            # 4 + 2

本质上来讲就是任务的一些参数是需要外部传入的,但是另一部分是希望固定下来的,因此就会使用这一特性。

虽然可讲的内容不多,但这一特性在后续的原语中非常重要,对于任务的链式执行中结果传输非常有用。

Immutability

前面提到了 Partials 是 Signature 的特性,那么所有的 Signature 必定都支持这一特性,但是有些任务不希望接受到别的任务传来参数或者手动填入参数,那么就需要固定参数值,这一方法就使用 Immutability 来实现。

完整的使用为设置 Signature 参数中的 immutable=True

immutable_task = task.signature(immutable=True)

可以缩写为

immutable_task = task.si()

Callbacks

任务完成后需要回调去执行其他函数,这以操作通过设置执行方法的 link 参数来执行。

add.delay((2, 2), link=add.s(4))      # (2 + 2) + 4

原语(Primitives)

Chains

链式任务是最为常见的一种任务形式,用于表示任务的串行执行,在上一节中的回调部分其实就是一种构建任务链的方法。这种链式的操作不仅仅是在执行函数中设置,也可以在 Signature 层面进行设置,例如:

>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
>>> s.link(log_result.s())

有两种简化链式任务声明的方法,第一种为使用 chain 函数:

>>> from celery import chain
>>> from proj.tasks import add, mul

>>> # (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))

>>> tasks = [add.s(4, 4), mul.s(8), mul.s(10)]
>>> res = chain(*task)

第二种为使用 | 运算符:

res = add.s(4, 4) | mul.s(8) | mul.s(10)

Groups

Group 用于并行执行多个任务。传入参数为 Signature 列表:

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))

Chords

Chord 其实就是带回调的 Group,一般用于 Group 的所有任务完成后执行下一个任务这种形式,形式为:

>>> from celery import chord
>>> from tasks import add, tsum

>>> chord(add.s(i, i)
...       for i in range(100))(tsum.s()).get()
>>> 9900

当然这种写法其实和 Group + Chain 的组合效果是一致的:

chain(group(add.s(i, i)
...         for i in range(100)), (tsum.s()).get())
>>> 9900

高阶操作

动态编排任务

在某些任务中,其任务调度图依赖于上一个任务的结果,因此无法一开始就创建号任务图,这就需要动态生成任务图去执行了。首先,给出一个错误范例:

@app.task
def run_task(data):
    params = prepare_params(data)      # 一些参数抽取
    workflow = build_workflow(params)  # 构造任务流
    result = workflow.get()            # 同步获取任务流结果
    return result                      # 返回

run_task 本身是一个 Task,在任务中准备数据形成需要的 workflow ,workflow 可能是 Graph 等原语,然后同步获取结果返回。这一做法在 Celery 中是被禁止的,Celery 禁止对子任务使用 get 来同步获取结果,这一结果可能造成性能或者任务死锁的问题。

一种 Celery 希望我们进行的方式为:

@app.task
def prepare_params(data):
    # 准备任务
    return params

@app.task
def run_workflow(params)
    # 编写完整的任务流过程,返回结果
    return result

# 基于 Chain 传递结果并执行
run_task = prepare_params.s(data) | run_workflow.s()@app.task
def run_task(data):
    params = prepare_params(data)      # 一些参数抽取
    workflow = build_workflow(params)  # 构造任务流
    result = workflow.get()            # 同步获取任务流结果
    return result                      # 返回

这种方式中没有 build_workflow 执行,而是静态写在 run_workflow 中,缺失了灵活性。

Celery 对这种在任务中构建任务流并执行的操作提供了 replace 方法,用于在一个任务中保留当前任务的 ID 但是替换任务内容为给出的任务,关于该方法的具体声明为:

Replace this task, with a new task inheriting the task id.
Execution of the host task ends immediately and no subsequent statements will be run.
New in version 4.0.

Parameters

Raises

该方法接受一个 Signature 对象,可以使用 Group 、Chain 或者 Chord 方法来获取编排号的 Signature 对象,从而实现动态任务图执行,实现方法为:

def build_workflow(params):
    tasks = []
    for param in params:
        task = build_task(param)
        tasks.append(tasks)
    return chain(*tasks)

@app.task
def run_workflow()
    params = load_params()
    workflow = build_workflow(params)
    raise self.replace(workflow)

异常捕获

Celery 中依旧使用任务(Task)来定义异常处理方法,且有多种使用方法(有些乱七八糟)。可以参考为:

@app.task
def on_err_func(request, exc, traceback):
    print('-------Task {0!r} raised error: {1!r}'.format(request.id, exc))

# 第一种形式,run_task 为单个任务
task = run_task.delay(link_error=on_err_func.s())

# 第二种,针对 Signature
signature = run_task.s()

# 第三种,针对 Chain、Group、Chord
signature = chain(*tasks, link_error=on_err_func.s())
signature = chain(*tasks).on_error(on_err_func.s())

Celery 中的异常捕获非常坑,有很多问题至今还在 GitHub Issue 中没解决,目前从个人踩坑经验来看,主要是以下一些问题:

  1. 如果 workflow 是一个复杂结构 Signature,类似于 Group of Chains,那么不能只对 workflow 设置错误异常捕获,这样的话依旧无法执行异常函数;

一些可以使用的解决方案:

  1. 任何使用 Chain、Group、Chord 的地方都需要加上错误异常,对于中间过程可以只是打印一下错误,对于最后 workflow 的异常使用完整的错误处理(例如保存异常状态到数据库);
  2. 对任务使用 try-except 进行包装,即确保任务一定能执行,这种方法对 celery 或者集群导致的任务丢失之类的问题没法解决,但是对节点出错依旧能执行这种形式可以由很好解决;

参考资料

  1. Documentation for Task.replace for Dynamic Tasks · Issue #3437 · celery/celery
  2. Canvas: Designing Work-flows - Celery 5.1.2 documentation