Application

The Celery library must be instantiated before use; this instance is called an application (or app for short).

The application is thread-safe so that multiple Celery applications with different configurations, components, and tasks can co-exist in the same process space.

Let’s create one now:

    >>> from celery import Celery
    >>> app = Celery()
    >>> app
    <Celery __main__:0x100469fd0>

The last line shows the textual representation of the application, which includes the name of the celery class (Celery), the name of the current main module (__main__), and the memory address of the object (0x100469fd0).

Main Name

Only one of these is important: the main module name. Let’s look at why that is.

When you send a task message in Celery, that message will not contain any source code, but only the name of the task you want to execute. This works similarly to how host names work on the internet: every worker maintains a mapping of task names to their actual functions, called the task registry.
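The name-to-function lookup can be pictured with a plain dictionary. This is only a minimal sketch of the idea, not Celery's implementation; `registry`, `register`, and `handle_message` are hypothetical names chosen for illustration.

```python
# Illustrative only: a plain dict standing in for a worker's task registry.
registry = {}


def register(name):
    """Return a decorator that stores a function under the given task name."""
    def decorator(func):
        registry[name] = func
        return func
    return decorator


@register("tasks.add")
def add(x, y):
    return x + y


def handle_message(message):
    """Look up the task by name and call it with the arguments in the message."""
    func = registry[message["task"]]
    return func(*message["args"])


# The message carries only a name and arguments, never source code.
result = handle_message({"task": "tasks.add", "args": (2, 3)})
```

A worker that does not have `tasks.add` in its registry cannot run the message, which is why the sender and the worker must agree on task names.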

Whenever you define a task, that task will also be added to the local registry:

    >>> @app.task
    ... def add(x, y):
    ...     return x + y

    >>> add
    <@task: __main__.add>

    >>> add.name
    '__main__.add'

    >>> app.tasks['__main__.add']
    <@task: __main__.add>

There you see __main__ again: whenever Celery is not able to detect what module the function belongs to, it uses the main module name to generate the beginning of the task name.

This is only a problem in a limited set of use cases:

  1. If the module that the task is defined in is run as a program.
  2. If the application is created in the Python shell (REPL).

For example here, where the tasks module is also used to start a worker:

tasks.py:

    from celery import Celery

    app = Celery()

    @app.task
    def add(x, y):
        return x + y

    if __name__ == '__main__':
        app.worker_main()

When this module is executed the tasks will be named starting with “__main__”, but when the module is imported by another process, say to call a task, the tasks will be named starting with “tasks” (the real name of the module):

    >>> from tasks import add
    >>> add.name
    'tasks.add'

You can specify another name for the main module:

    >>> app = Celery('tasks')
    >>> app.main
    'tasks'

    >>> @app.task
    ... def add(x, y):
    ...     return x + y

    >>> add.name
    'tasks.add'
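The naming rule described in this section can be sketched as a small function. This is a simplified illustration under the assumption that the only special case is the `__main__` module; `make_task_name` is a hypothetical helper, not a Celery API.

```python
def make_task_name(main, func_name, module_name):
    """Build a task name, substituting the app's main name for '__main__'."""
    if module_name == "__main__" and main:
        module_name = main
    return ".".join([module_name, func_name])


# Module run as a program, app created without a main name.
run_as_script = make_task_name(None, "add", "__main__")

# Same module run as a program, but the app was created as Celery('tasks').
with_main_name = make_task_name("tasks", "add", "__main__")

# Module imported by another process: the real module name is known.
imported = make_task_name("tasks", "add", "tasks")
```

With a main name set, the task gets the same name whether the module is executed or imported, so the caller and the worker look up the same registry entry.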

See also

Names

Configuration

There are several options you can set that will change how Celery works. These options can be set directly on the app instance, or you can use a dedicated configuration module.

The configuration is available as Celery.conf:

    >>> app.conf.CELERY_TIMEZONE
    'Europe/London'

where you can also set configuration values directly:

    >>> app.conf.CELERY_ENABLE_UTC = True

and update several keys at once by using the update method:

    >>> app.conf.update(
    ...     CELERY_ENABLE_UTC=True,
    ...     CELERY_TIMEZONE='Europe/London',
    ... )

The configuration object consists of multiple dictionaries that are consulted in order:

  1. Changes made at runtime.
  2. The configuration module (if any)
  3. The default configuration (celery.app.defaults).

You can even add new default sources by using the Celery.add_defaults() method.
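The "consulted in order" behavior resembles the standard library's `collections.ChainMap`. The sketch below only illustrates the lookup order and is not a description of how Celery stores its configuration; the default values shown are placeholders, and `MY_CUSTOM_SETTING` is a made-up key.

```python
from collections import ChainMap

# Placeholder values for illustration, not Celery's real defaults.
defaults = {"CELERY_ENABLE_UTC": False, "CELERY_TIMEZONE": "UTC"}
config_module = {"CELERY_TIMEZONE": "Europe/London"}
runtime_changes = {}

# Lookups try each mapping from left to right and return the first hit.
conf = ChainMap(runtime_changes, config_module, defaults)

timezone = conf["CELERY_TIMEZONE"]      # found in the configuration module
utc_before = conf["CELERY_ENABLE_UTC"]  # falls through to the defaults

# Writes go to the first mapping, so the lower layers stay untouched.
conf["CELERY_ENABLE_UTC"] = True

# Adding one more source of defaults at the end of the chain.
conf.maps.append({"MY_CUSTOM_SETTING": 10})
```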

See also

Go to the Configuration reference for a complete listing of all the available settings, and their default values.

config_from_object

The Celery.config_from_object() method loads configuration from a configuration object.

This can be a configuration module, or any object with configuration attributes.

Note that any configuration that was previously set will be reset when config_from_object() is called. If you want to set additional configuration you should do so afterwards.

Example 1: Using the name of a module

    from celery import Celery

    app = Celery()
    app.config_from_object('celeryconfig')

The celeryconfig module may then look like this:

celeryconfig.py:

    CELERY_ENABLE_UTC = True
    CELERY_TIMEZONE = 'Europe/London'

Example 2: Using a configuration module

Tip

Using the name of a module is recommended, as this means that the module doesn’t need to be serialized when the prefork pool is used. If you’re experiencing configuration pickle errors then please try using the name of a module instead.

    from celery import Celery

    app = Celery()
    import celeryconfig
    app.config_from_object(celeryconfig)

Example 3: Using a configuration class/object

    from celery import Celery

    app = Celery()

    class Config:
        CELERY_ENABLE_UTC = True
        CELERY_TIMEZONE = 'Europe/London'

    app.config_from_object(Config)
    # or using the fully qualified name of the object:
    #   app.config_from_object('module:Config')
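To make the three forms above concrete, here is a rough sketch of how a name such as 'module:Config' could be resolved and how settings could be read from any object with configuration attributes. It uses only the standard library; `resolve` and `settings_from` are hypothetical helpers, and collecting only upper-case attributes is an assumption of this sketch rather than documented Celery behavior.

```python
import importlib


def resolve(name):
    """Resolve 'pkg.module' to a module, or 'pkg.module:attr' to an attribute."""
    module_name, _, attr = name.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr) if attr else module


def settings_from(obj):
    """Collect the upper-case attributes of a module, class, or instance."""
    if isinstance(obj, str):
        obj = resolve(obj)
    return {key: getattr(obj, key) for key in dir(obj) if key.isupper()}


class Config:
    CELERY_ENABLE_UTC = True
    CELERY_TIMEZONE = "Europe/London"


# Passing the object itself.
settings = settings_from(Config)

# Passing a 'module:attribute' name, shown here with a standard library class.
ordered_dict_cls = resolve("collections:OrderedDict")
```

Passing a name instead of the object means only a short string has to be kept around, which is the reason the tip above prefers module names when the configuration may need to be pickled.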

config_from_envvar

The Celery.config_from_envvar() method takes the configuration module name from an environment variable.

For example – to load configuration from a module specified in the environment variable named CELERY_CONFIG_MODULE:

    import os
    from celery import Celery

    #: Set default configuration module name
    os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

    app = Celery()
    app.config_from_envvar('CELERY_CONFIG_MODULE')

You can then specify the configuration module to use via the environment:

    $ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info

Censored configuration

If you ever want to print out the configuration, as debugging information or similar, you may also want to filter out sensitive information like passwords and API keys.

Celery comes with several utilities used for presenting the configuration, one is humanize():

    >>> app.conf.humanize(with_defaults=False, censored=True)

This method returns the configuration as a tabulated string. This will only contain changes to the configuration by default, but you can include the default keys and values by changing the with_defaults argument.

If you instead want to work with the configuration as a dictionary, then you can use the table() method:

    >>> app.conf.table(with_defaults=False, censored=True)

Please note that Celery will not be able to remove all sensitive information, as it merely uses a regular expression to search for commonly named keys. If you add custom settings containing sensitive information you should name the keys using a name that Celery identifies as secret.

A configuration setting will be censored if the name contains any of these substrings:

API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE
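The substring rule can be approximated with a single regular expression. This is a sketch of the idea using the substrings listed above, not Celery's own code; the mask string, the case-insensitive match, and the sample settings are assumptions made for illustration.

```python
import re

# Substrings from the list above; case-insensitive matching is an assumption.
SECRET_PATTERN = re.compile(
    r"API|TOKEN|KEY|SECRET|PASS|SIGNATURE|DATABASE", re.IGNORECASE
)


def censor(settings, mask="********"):
    """Return a copy of the settings with secret-looking values masked."""
    return {
        key: (mask if SECRET_PATTERN.search(key) else value)
        for key, value in settings.items()
    }


censored = censor({
    "CELERY_TIMEZONE": "Europe/London",
    "MY_SERVICE_API_TOKEN": "abc123",  # hypothetical custom setting
    "MAIL_LOGIN": "hunter2",           # sensitive, but the name gives no hint
})
```

The last entry shows the limitation mentioned above: a sensitive value stored under a key that contains none of the substrings passes through unmasked.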

Laziness

The application instance is lazy, meaning that it will not be evaluated until something is actually needed.

Creating a Celery instance will only do the following:

  1. Create a logical clock instance, used for events.
  2. Create the task registry.
  3. Set itself as the current app (but not if the set_as_current argument was disabled)
  4. Call the Celery.on_init() callback (does nothing by default).

The task() decorator does not actually create the task at the point when it’s called; instead it defers the creation of the task until either the task is used or the application has been finalized.

This example shows how the task is not created until you use the task, or access an attribute (in this case repr()):

    >>> @app.task
    ... def add(x, y):
    ...     return x + y

    >>> type(add)
    <class 'celery.local.PromiseProxy'>

    >>> add.__evaluated__()
    False

    >>> add  # <-- causes repr(add) to happen
    <@task: __main__.add>

    >>> add.__evaluated__()
    True
Finalization of the app happens either explicitly by calling Celery.finalize() – or implicitly by accessing the tasks attribute.

Finalizing the object will:

  1. Copy tasks that must be shared between apps

    Tasks are shared by default, but if the shared argument to the task decorator is disabled, then the task will be private to the app it’s bound to.

  2. Evaluate all pending task decorators.

  3. Make sure all tasks are bound to the current app.

    Tasks are bound to apps so that they can read default values from the configuration.
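The second step, evaluating pending task decorators, can be sketched with a toy app that queues decorated functions and registers them on finalization. `MiniApp` is a hypothetical class for illustration only; it does not model shared tasks or binding, and it is not how Celery is implemented.

```python
class MiniApp:
    """Toy app that defers task registration until it is finalized."""

    def __init__(self):
        self.registry = {}
        self.pending = []
        self.finalized = False

    def task(self, func):
        # Before finalization, only remember that the function was decorated.
        if self.finalized:
            self._register(func)
        else:
            self.pending.append(func)
        return func

    def _register(self, func):
        self.registry[func.__name__] = func

    def finalize(self):
        # Explicit finalization: evaluate everything that was queued.
        if not self.finalized:
            self.finalized = True
            while self.pending:
                self._register(self.pending.pop(0))

    @property
    def tasks(self):
        # Accessing the tasks attribute finalizes the app implicitly.
        self.finalize()
        return self.registry


app = MiniApp()


@app.task
def add(x, y):
    return x + y


pending_before = len(app.pending)  # the task is queued, not yet registered
task_names = sorted(app.tasks)     # implicit finalization happens here
```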

The “default app”.

Celery did not always work this way. It used to be that there was only a module-based API, and for backwards compatibility the old API is still there.

Celery always creates a special app that is the “default app”, and this is used if no custom application has been instantiated.

The celery.task module is there to accommodate the old API, and should not be used if you use a custom app. You should always use the methods on the app instance, not the module based API.

For example, the old Task base class enables many compatibility features where some may be incompatible with newer features, such as task methods:

    from celery.task import Task   # << OLD Task base class.

    from celery import Task        # << NEW base class.

The new base class is recommended even if you use the old module-based API.

Breaking the chain

While it’s possible to depend on the current app being set, the best practice is to always pass the app instance around to anything that needs it.

I call this the “app chain”, since it creates a chain of instances depending on the app being passed.

The following example is considered bad practice:

    from celery import current_app

    class Scheduler(object):

        def run(self):
            app = current_app

Instead it should take the app as an argument:

    class Scheduler(object):

        def __init__(self, app):
            self.app = app

Internally Celery uses the celery.app.app_or_default() function so that everything also works in the module-based compatibility API:

    from celery.app import app_or_default

    class Scheduler(object):

        def __init__(self, app=None):
            self.app = app_or_default(app)
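The chain itself can be illustrated without Celery: each component receives the app from whatever created it and hands it on. In this sketch `FakeApp`, `fallback_app`, and `Worker` are hypothetical names; `fallback_app` plays the role that app_or_default() plays above.

```python
class FakeApp:
    """Hypothetical stand-in for an application instance."""

    def __init__(self, name):
        self.name = name


default_app = FakeApp("default")


def fallback_app(app=None):
    """Return the given app, or the default app when none was passed."""
    return default_app if app is None else app


class Scheduler:
    def __init__(self, app=None):
        self.app = fallback_app(app)


class Worker:
    def __init__(self, app):
        self.app = app
        # Pass the app down the chain instead of relying on a global.
        self.scheduler = Scheduler(app=self.app)


custom = FakeApp("custom")
worker = Worker(custom)
standalone = Scheduler()  # no app passed, so the default is used
```

If `Worker` forgot to pass `app` to `Scheduler`, the scheduler would silently fall back to the default app; that kind of break in the chain is what explicit passing is meant to avoid.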

In development you can set the CELERY_TRACE_APP environment variable to raise an exception if the app chain breaks:

    $ CELERY_TRACE_APP=1 celery worker -l info

Evolving the API

Celery has changed a lot in the 3 years since it was initially created.

For example, in the beginning it was possible to use any callable as a task:

    def hello(to):
        return 'hello {0}'.format(to)

    >>> from celery.execute import apply_async
    >>> apply_async(hello, ('world!', ))

or you could also create a Task class to set certain options, or override other behavior:

    from celery.task import Task
    from celery.registry import tasks

    class Hello(Task):
        send_error_emails = True

        def run(self, to):
            return 'hello {0}'.format(to)

    tasks.register(Hello)

    >>> Hello.delay('world!')

Later, it was decided that passing arbitrary callables was an anti-pattern, since it makes it very hard to use serializers other than pickle. The feature was removed in 2.0 and replaced by task decorators:

    from celery.task import task

    @task(send_error_emails=True)
    def hello(to):
        return 'hello {0}'.format(to)

Abstract Tasks

All tasks created using the task() decorator will inherit from the application’s base Task class.

You can specify a different base class with the base argument:

    @app.task(base=OtherTask)
    def add(x, y):
        return x + y

To create a custom task class you should inherit from the neutral base class: celery.Task.

    from celery import Task

    class DebugTask(Task):
        abstract = True

        def __call__(self, *args, **kwargs):
            print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
            return super(DebugTask, self).__call__(*args, **kwargs)

Tip

If you override the task’s __call__ method, then it’s very important that you also call super so that the base call method can set up the default request used when a task is called directly.

The neutral base class is special because it’s not bound to any specific app yet. Concrete subclasses of this class will be bound, so you should always mark generic base classes as abstract.

Once a task is bound to an app it will read configuration to set default values and so on.

It’s also possible to change the default base class for an application by changing its Celery.Task attribute:

    >>> from celery import Celery, Task

    >>> app = Celery()

    >>> class MyBaseTask(Task):
    ...     abstract = True
    ...     send_error_emails = True

    >>> app.Task = MyBaseTask
    >>> app.Task
    <unbound MyBaseTask>

    >>> @app.task
    ... def add(x, y):
    ...     return x + y

    >>> add
    <@task: __main__.add>

    >>> add.__class__.mro()
    [<class add of <Celery __main__:0x1012b4410>>,
     <unbound MyBaseTask>,
     <unbound Task>,
     <type 'object'>]
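The class hierarchy in the last output can be reproduced in miniature with plain Python. The sketch below is an illustration of a decorator that builds a subclass of a chosen base class, not Celery's implementation; `Task`, `DebugTask`, `task`, and `log` here are hypothetical stand-ins defined in the example itself.

```python
log = []


class Task:
    """Hypothetical stand-in for a neutral task base class."""

    name = None

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        raise NotImplementedError


class DebugTask(Task):
    def __call__(self, *args, **kwargs):
        log.append("TASK STARTING: {0}".format(self.name))
        # Call super so the base class still does its part of the work.
        return super().__call__(*args, **kwargs)


def task(base=Task):
    """Create a new class named after the function, inheriting from base."""
    def decorator(func):
        attrs = {"name": func.__name__, "run": staticmethod(func)}
        cls = type(func.__name__, (base,), attrs)
        return cls()
    return decorator


@task(base=DebugTask)
def add(x, y):
    return x + y


result = add(2, 3)
bases = [cls.__name__ for cls in type(add).mro()]
```

Leaving out the `super().__call__` line in `DebugTask` would log the call but never run the function, which mirrors the warning in the tip above.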