基于 Celery 的后台任务¶

如果应用有一个长时间运行的任务,如处理上传数据或者发送电子邮件,而你不想在请求中等待任务结束,那么可以使用任务队列发送必须的数据给另一个进程。这样就可以在后台运行任务,立即返回请求。

Celery 是强大的任务队列库,它可以用于简单的后台任务,也可用于复杂的多阶段应用的计划。本文主要说明如何在 Flask 中配置使用 Celery 。本文假设你已经阅读过了其官方文档中的 Celery 入门

安装¶

Celery 是一个独立的 Python 包。使用 pip 从 PyPI 安装:

  1. $ pip install celery

配置¶

你首先需要有一个 Celery 实例,这个实例称为 celery 应用。其地位就相当于Flask 中 Flask 一样。这个实例被用作所有 Celery 相关事务的入口,如创建任务和管理工人,因此它必须可以被其他模块导入。

例如,你可以把它放在一个 tasks 模块中。这样不需要重新配置,你就可以使用tasks 的子类,增加 Flask 应用情境的支持,并钩接 Flask 的配置。

只要如下这样就可以在 Falsk 中使用 Celery 了:

  1. from celery import Celery
  2.  
  3. def make_celery(app):
  4. celery = Celery(
  5. app.import_name,
  6. backend=app.config['CELERY_RESULT_BACKEND'],
  7. broker=app.config['CELERY_BROKER_URL']
  8. )
  9. celery.conf.update(app.config)
  10.  
  11. class ContextTask(celery.Task):
  12. def __call__(self, *args, **kwargs):
  13. with app.app_context():
  14. return self.run(*args, **kwargs)
  15.  
  16. celery.Task = ContextTask
  17. return celery

这个函数创建了一个新的 Celery 对象,使用了应用配置中的 broker ,并从 Flask配置中更新了 Celery 的其余配置。然后创建了一个任务子类,在一个应用情境中包装了任务执行。

一个示例任务¶

让我们来写一个任务,该任务把两个数字相加并返回结果。我们配置 Celery 的broker ,后端使用 Redis 。使用上文的工厂创建一个 celery 应用,并用它定义任务。:

  1. from flask import Flask
  2.  
  3. flask_app = Flask(__name__)
  4. flask_app.config.update(
  5. CELERY_BROKER_URL='redis://localhost:6379',
  6. CELERY_RESULT_BACKEND='redis://localhost:6379'
  7. )
  8. celery = make_celery(flask_app)
  9.  
  10. @celery.task()
  11. def add_together(a, b):
  12. return a + b

这个任务现在可以在后台调用了:

  1. result = add_together.delay(23, 42)
  2. result.wait() # 65

运行 Celery 工人¶

至此,如果你已经按上文一步一步执行,你会失望地发现你的 .wait() 不会真正返回。这是因为还需要运行一个 Celery 工人来接收和执行任务。:

  1. $ celery -A your_application.celery worker

yourapplication 字符串替换为你创建 _celery 对像的应用包或模块。

现在工人已经在运行中,一旦任务结束, wait 就会返回结果。

原文: https://dormousehole.readthedocs.io/en/latest/patterns/celery.html