Celery celery.exceptions.NotRegistered 和 celery.exceptions.TimeoutError: Operation timed out (3.0)

最近公司业务复杂度在提升,因此需要更复杂的celery(3.1.24)任务调度与依赖。

其中一项是chain of groups,需要把group任务进行chain串行调度。

参照Celery的文档:

chain group with other task will be a chord

如果直接把group放在chain里,实际上会被转化为chord, 因此group里面的每一个task,都必须满足chord的要求,参照:

Chord should not ignore result

其中最重要的是,task必须有返回结果(即使默认的None返回),另外一点文中没有提到的,就是CELERY_TASK_RESULT_EXPIRES必须设置得足够长,能够存活到chord task被执行之时

然而在解决了上面的问题后,如果只是单纯的把group用chain连接的话,仍然会发生任务丢失或执行顺序错乱的问题,这似乎是celery的一个bug,参照:

dummy chord finisher

我们需要一个额外的chord finish task和把group都包装成chord, 最后总结示例代码:

1
2
3
# config.py
CELERY_TASK_RESULT_EXPIRES = 1000 # long enough within a chord
CELERY_IGNORE_RESULT = False # cannot ignore result
1
2
3
4
5
6
7
8
9
10
11
12
# task.py
from celery import app
# notice you must not use ignore_result=True
@app.task
def dummy(msg):
# do something
print(msg)
@app.task
def finisher():
pass
1
2
3
4
5
6
7
8
9
10
11
12
# main.py
from celery import chain, chord, group
from task import dummy, finisher
chain(
chord(group(dummy.si('1.1'), dummy.si('1.2')),
finisher.si()),
chord([dummy.si('2')],
finisher.si()),
chord(group(dummy.si('3.1')),
finisher.si())
)()

如果大家有什么更好的解决方案,欢迎联系交流

Zhanzhao Deo Liang wechat
欢迎关注我的个人订阅号: deoXdeo
今天的午餐全赖有你支持!