基于 Celery 的后台任务

如果应用有一个长时间运行的任务,如处理上传数据或者发送电子邮件,而你不想在 请求中等待任务结束,那么可以使用任务队列发送必须的数据给另一个进程。这样就 可以在后台运行任务,立即返回请求。

Celery 是强大的任务队列库,它可以用于简单的后台任务,也可用于复杂的多阶段 应用的计划。本文主要说明如何在 Flask 中配置使用 Celery 。本文假设你 已经阅读过了其官方文档中的 Celery 入门

安装

Celery 是一个独立的 Python 包。使用 pip 从 PyPI 安装:

  1. $ pip install celery

配置

你首先需要有一个 Celery 实例,这个实例称为 celery 应用。其地位就相当于 Flask 中 Flask 一样。这个实例被用作所有 Celery 相关事务的 入口,如创建任务和管理工人,因此它必须可以被其他模块导入。

例如,你可以把它放在一个 tasks 模块中。这样不需要重新配置,你就可以使用 tasks 的子类,增加 Flask 应用情境的支持,并钩接 Flask 的配置。

只要如下这样就可以在 Falsk 中使用 Celery 了:

  1. from celery import Celery
  2. def make_celery(app):
  3. celery = Celery(
  4. app.import_name,
  5. backend=app.config['CELERY_RESULT_BACKEND'],
  6. broker=app.config['CELERY_BROKER_URL']
  7. )
  8. celery.conf.update(app.config)
  9. class ContextTask(celery.Task):
  10. def __call__(self, *args, **kwargs):
  11. with app.app_context():
  12. return self.run(*args, **kwargs)
  13. celery.Task = ContextTask
  14. return celery

这个函数创建了一个新的 Celery 对象,使用了应用配置中的 broker ,并从 Flask 配置中更新了 Celery 的其余配置。然后创建了一个任务子类,在一个应用情境中包 装了任务执行。

一个示例任务

让我们来写一个任务,该任务把两个数字相加并返回结果。我们配置 Celery 的 broker ,后端使用 Redis 。使用上文的工厂创建一个 celery 应用,并用它定 义任务。:

  1. from flask import Flask
  2. flask_app = Flask(__name__)
  3. flask_app.config.update(
  4. CELERY_BROKER_URL='redis://localhost:6379',
  5. CELERY_RESULT_BACKEND='redis://localhost:6379'
  6. )
  7. celery = make_celery(flask_app)
  8. @celery.task()
  9. def add_together(a, b):
  10. return a + b

这个任务现在可以在后台调用了:

  1. result = add_together.delay(23, 42)
  2. result.wait() # 65

运行 Celery 工人

至此,如果你已经按上文一步一步执行,你会失望地发现你的 .wait() 不会真正 返回。这是因为还需要运行一个 Celery 工人来接收和执行任务。:

  1. $ celery -A your_application.celery worker

your_application 字符串替换为你创建 celery 对像的应用包或模块。

现在工人已经在运行中,一旦任务结束, wait 就会返回结果。