Celery 简介

何为任务队列?

任务队列是一种在线程或机器间分发任务的机制。

消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续 监视队列中是否有需要处理的新任务。

Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程 从客户端向队列添加消息开始,之后中间人把消息派送给职程。

Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。

Celery 是用 Python 编写的,但协议可以用任何语言实现。迄今,已有 Ruby 实现 的 RCelery 、node.js 实现的 node-celery 以及一个 PHP 客户端 ,语言 互通也可以通过 using webhooks 实现。

我需要什么?

版本需求

Celery 的 3.0 版本可运行在

  • Python ❨2.5, 2.6, 2.7, 3.2, 3.3❩
  • PyPy ❨1.8, 1.9❩
  • Jython ❨2.5, 2.7❩.

这是最后一个支持 Python 2.5 的版本,也即从下个版本需要 Python 2.6 或更新版本的 Python。最后一个支持 Python 2.4 的版本为 Celery 2.2 系列。

Celery 需要一个发送和接受消息的传输者。RabbitMQ 和 Redis 中间人 的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括 用 SQLite 进行本地开发。

Celery 可以单机运行,也可以在多台机器上运行,甚至可以跨越数据中心运行。

上手

如果这是你第一次尝试 Celery,或你从以前版本刚步入 Celery 3.0,那么你应该 阅读一下我们的入门教程:

Celery 是…

  • 简单

    Celery 易于使用和维护,并且它 不需要配置文件

    Celery 有一个活跃、友好的社区来让你寻求帮助,包括一个 邮件列表 和一个 IRC 频道

    下面是一个你可以实现的最简应用:

    1. from celery import Celery
    2. app = Celery('hello', broker='amqp:[email protected]//')
    3. @app.task
    4. def hello():
    5. return 'hello world'
  • 高可用性

    倘若连接丢失或失败,职程和客户端会自动重试,并且一些中间人 通过 主/主主/从 方式复制来提高可用性。

  • 快速

    单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟 在亚毫秒级(使用 RabbitMQ、py-librabbitmq 和优化过的设置)。

  • 灵活

    Celery 几乎所有部分都可以扩展或单独使用。可以自制连接池、 序列化、压缩模式、日志、调度器、消费者、生产者、自动扩展、 中间人传输或更多。

它支持

  • 结果存储

    • AMQP, Redis
    • memcached, MongoDB
    • SQLAlchemy, Django ORM
    • Apache Cassandra
  • 序列化

    • pickle, json, yaml, msgpack
    • zlib, bzip2 压缩
    • 密码学消息签名

特性

  • 监视

    整条流水线的监视时间由职程发出,并用于内建或外部的工具 告知你集群的工作状况——而且是实时的。

    深入了解….

  • 工作流

    一系列功能强大的称为“Canvas”的原语(Primitive)用于构建 或简单、或复杂的工作流。包括分组、连锁、分割等等。

    深入了解….

  • 时间和速率限制

    你可以控制每秒/分钟/小时执行的任务数,或任务的最长运行时间, 并且可以为特定职程或不同类型的任务设置为默认值。

    深入了解….

  • 计划任务

    你可以指定任务在若干秒后或在 datetime 运行,或你可以基于单纯的时间间隔或支持分钟、小时、每周的第 几天、每月的第几天以及每年的第几月的 crontab 表达式来使用 周期任务来重现事件。

    深入了解….

  • 自动重载入

    在开发中,职程可以配置为在源码修改时自动重载入,包含对 Linux 上的 inotify(7) 支持。

    深入了解….

  • 自动扩展

    根据负载自动重调职程池的大小或用户指定的测量值,用于限制 共享主机/云环境的内存使用,或是保证给定的服务质量。

    深入了解….

  • 资源泄露保护

    —maxtasksperchild 选项用于控制用户任务泄露的诸如 内存或文件描述符这些易超出掌控的资源。

    深入了解….

  • 用户组件

    每个职程组件都可以自定义,并且额外组件可以由用户定义。职程是用 “bootsteps” 构建的——一个允许细粒度控制职程内构件的依赖图。

框架集成

Celery 易于与 Web 框架集成,其中的一些甚至已经有了集成包:

Djangodjango-celery
Pyramidpyramid_celery
Pylonscelery-pylons
Flask不需要
web2pyweb2py-celery
Tornadotornado-celery

集成包并非是严格必要的,但它们让开发更简便,并且有时它们在 fork(2) 上添加了比如关闭数据库连接这样的重要回调。

快速跳转

我想要阅读⟶

跳转至 ⟶

安装

你可以从 Python Package Index(PyPI)或源码安装 Celery。

用 pip 安装:

  1. $ pip install -U Celery

用 easy_install 安装:

  1. $ easy_install -U Celery

捆绑

Celery 也定义了一组用于安装 Celery 和给定特性依赖的捆绑。

你可以在 requirements.txt 中指定或在 pip 命令中使用方括号。多个捆绑 用逗号分隔。

  1. $ pip install celery[librabbitmq]
  2. $ pip install celery[librabbitmq,redis,auth,msgpack]

以下是可用的捆绑:

序列化

celery[auth]:使用 auth 序列化。
celery[msgpack]:
 使用 msgpack 序列化。
celery[yaml]:使用 yaml 序列化。

并发

celery[eventlet]:
 使用 eventlet 池。
celery[gevent]:使用 gevent 池。
celery[threads]:
 使用线程池。

传输和后端

celery[librabbitmq]:
 使用 librabbitmq 的 C 库.
celery[redis]:使用 Redis 作为消息传输方式或结果后端。
celery[mongodb]:
 使用 MongoDB 作为消息传输方式( 实验性 ),或是结果后端( 已支持 )。
celery[sqs]:使用 Amazon SQS 作为消息传输方式( 实验性 )。
celery[memcache]:
 使用 memcache 作为结果后端。
celery[cassandra]:
 使用 Apache Cassandra 作为结果后端。
celery[couchdb]:
 使用 CouchDB 作为消息传输方式( 实验性 )。
celery[couchbase]:
 使用 CouchBase 作为结果后端。
celery[beanstalk]:
 使用 Beanstalk 作为消息传输方式( 实验性 )。
celery[zookeeper]:
 使用 Zookeeper 作为消息传输方式。
celery[zeromq]:使用 ZeroMQ 作为消息传输方式( 实验性 )。
celery[sqlalchemy]:
 使用 SQLAlchemy 作为消息传输方式( 实验性 ),或作为结果后端( 已支持 )。
celery[pyro]:使用 Pyro4 消息传输方式( 实验性 )。
celery[slmq]:使用 SoftLayer Message Queue 传输( 实验性 )。

从源码安装

http://pypi.python.org/pypi/celery/ 下载最新版本的 Celery。

你可以通过以下步骤安装:

  1. $ tar xvfz celery-0.0.0.tar.gz
  2. $ cd celery-0.0.0
  3. $ python setup.py build
  4. # python setup.py install

如果不是在 virtualenv 里安装,最后一条命令必须以管理员权限执行。

使用开发版本

pip 途径

Celery 开发版本需要开发版本的 kombu 、 amqp 和 billiard 。

你可以用下面的 pip 命令来安装这些库最新的快照:

  1. $ pip install https://github.com/celery/celery/zipball/master#egg=celery
  2. $ pip install https://github.com/celery/billiard/zipball/master#egg=billiard
  3. $ pip install https://github.com/celery/py-amqp/zipball/master#egg=amqp
  4. $ pip install https://github.com/celery/kombu/zipball/master#egg=kombu

Git 途径

请见 Contributing 章节。