celery.app.task

celery.app.task

Task Implementation: Task request context, and the base task class.

class celery.app.task.Task[源代码]

Task base class.

When called tasks apply the run() method. This method must be defined by all tasks (that is unless the __call__() method is overridden).

  • AsyncResult(task_id, \*kwargs*)[源代码]

    Get AsyncResult instance for this kind of task.

    参数:task_id – Task id to get result for.
  • class ErrorMail(task, \*kwargs*)

    Defines how and when task error e-mails should be sent.

    参数:task – The task instance that raised the error.

    subject and body are format strings which are passed a context containing the following keys:

    • name

      Name of the task.

    • id

      UUID of the task.

    • exc

      String representation of the exception.

    • args

      Positional arguments.

    • kwargs

      Keyword arguments.

    • traceback

      String representation of the traceback.

    • hostname

      Worker nodename.

    • should_send(context, exc)

      Return true or false depending on if a task error mail should be sent for this type of error.

  • exception Task.MaxRetriesExceededError

    The tasks max restart limit has been exceeded.

  • Task.Strategy = ‘celery.worker.strategy:default’

    Execution strategy used, or the qualified name of one.

  • Task.abstract = None

    If True the task is an abstract base class.

  • Task.accept_magic_kwargs = False

    If disabled the worker will not forward magic keyword arguments. Deprecated and scheduled for removal in v4.0.

  • Task.acks_late = False

    When enabled messages for this task will be acknowledged after the task has been executed, and not just before which is the default behavior.

    Please note that this means the task may be executed twice if the worker crashes mid execution (which may be acceptable for some applications).

    The application default can be overridden with the CELERY_ACKS_LATE setting.

  • Task.after_return(status, retval, task_id, args, kwargs, einfo)[源代码]

    Handler called after the task returns.

    参数:
    • status – Current task state.
    • retval – Task return value/exception.
    • task_id – Unique id of the task.
    • args – Original arguments for the task that failed.
    • kwargs – Original keyword arguments for the task that failed.
    • einfoExceptionInfo instance, containing the traceback (if any).

    The return value of this handler is ignored.

  • Task.apply(args=None, kwargs=None, link=None, link_error=None, \*options*)[源代码]

    Execute this task locally, by blocking until the task returns.

    参数:
    • args – positional arguments passed on to the task.
    • kwargs – keyword arguments passed on to the task.
    • throw – Re-raise task exceptions. Defaults to the CELERY_EAGER_PROPAGATES_EXCEPTIONS setting.

    :rtype celery.result.EagerResult:

  • Task.apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, \*options*)[源代码]

    Apply tasks asynchronously by sending a message.

    参数:
    • args – The positional arguments to pass on to the task (a list or tuple).
    • kwargs – The keyword arguments to pass on to the task (a dict)
    • countdown – Number of seconds into the future that the task should execute. Defaults to immediate execution.
    • eta – A datetime object describing the absolute time and date of when the task should be executed. May not be specified if countdown is also supplied.
    • expires – Either a int, describing the number of seconds, or a datetime object that describes the absolute time and date of when the task should expire. The task will not be executed after the expiration time.
    • connection – Re-use existing broker connection instead of establishing a new one.
    • retry – If enabled sending of the task message will be retried in the event of connection loss or failure. Default is taken from the CELERY_TASK_PUBLISH_RETRY setting. Note you need to handle the producer/connection manually for this to work.
    • retry_policy – Override the retry policy used. See the CELERY_TASK_PUBLISH_RETRY setting.
    • routing_key – Custom routing key used to route the task to a worker server. If in combination with a queue argument only used to specify custom routing keys to topic exchanges.
    • queue – The queue to route the task to. This must be a key present in CELERY_QUEUES, or CELERY_CREATE_MISSING_QUEUES must be enabled. See Routing Tasks for more information.
    • exchange – Named custom exchange to send the task to. Usually not used in combination with the queue argument.
    • priority – The task priority, a number between 0 and 9. Defaults to the priority attribute.
    • serializer – A string identifying the default serialization method to use. Can be pickle, json, yaml, msgpack or any custom serialization method that has been registered with kombu.serialization.registry. Defaults to the serializer attribute.
    • compression – A string identifying the compression method to use. Can be one of zlib, bzip2, or any custom compression methods registered with kombu.compression.register(). Defaults to the CELERY_MESSAGE_COMPRESSION setting.
    • link – A single, or a list of tasks to apply if the task exits successfully.
    • link_error – A single, or a list of tasks to apply if an error occurs while executing the task.
    • producer – :class:~@amqp.TaskProducer` instance to use.
    • add_to_parent – If set to True (default) and the task is applied while executing another task, then the result will be appended to the parent tasks request.children attribute. Trailing can also be disabled by default using the trail attribute
    • publisher – Deprecated alias to producer.

    Also supports all keyword arguments supported by kombu.Producer.publish().

    注解

    If the CELERY_ALWAYS_EAGER setting is set, it will be replaced by a local apply() call instead.

  • Task.autoregister = True

    If disabled this task won’t be registered automatically.

  • Task.backend = <celery.backends.base.DisabledBackend object at 0xcd06e2c>

    The result store backend used for this task.

  • Task.chunks(it, n)[源代码]

    Creates a chunks task for this task.

  • Task.default_retry_delay = 180

    Default time in seconds before a retry of the task should be executed. 3 minutes by default.

  • Task.delay(\args, **kwargs*)[源代码]

    Star argument version of apply_async().

    Does not support the extra options enabled by apply_async().

    参数:
    • args – positional arguments passed on to the task.
    • *kwargs – keyword arguments passed on to the task.

    :returns celery.result.AsyncResult:

  • Task.expires = None

    Default task expiry time.

  • Task.ignore_result = False

    If enabled the worker will not store task state and return values for this task. Defaults to the CELERY_IGNORE_RESULT setting.

  • Task.map(it)[源代码]

    Creates a xmap task from it.

  • Task.max_retries = 3

    Maximum number of retries before giving up. If set to None, it will never stop retrying.

  • Task.name = None

    Name of the task.

  • classmethod Task.on_bound(app)[源代码]

    This method can be defined to do additional actions when the task class is bound to an app.

  • Task.on_failure(exc, task_id, args, kwargs, einfo)[源代码]

    Error handler.

    This is run by the worker when the task fails.

    参数:
    • exc – The exception raised by the task.
    • task_id – Unique id of the failed task.
    • args – Original arguments for the task that failed.
    • kwargs – Original keyword arguments for the task that failed.
    • einfoExceptionInfo instance, containing the traceback.

    The return value of this handler is ignored.

  • Task.on_retry(exc, task_id, args, kwargs, einfo)[源代码]

    Retry handler.

    This is run by the worker when the task is to be retried.

    参数:
    • exc – The exception sent to retry().
    • task_id – Unique id of the retried task.
    • args – Original arguments for the retried task.
    • kwargs – Original keyword arguments for the retried task.
    • einfoExceptionInfo instance, containing the traceback.

    The return value of this handler is ignored.

  • Task.on_success(retval, task_id, args, kwargs)[源代码]

    Success handler.

    Run by the worker if the task executes successfully.

    参数:
    • retval – The return value of the task.
    • task_id – Unique id of the executed task.
    • args – Original arguments for the executed task.
    • kwargs – Original keyword arguments for the executed task.

    The return value of this handler is ignored.

  • Task.rate_limit = None

    Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute),`‘100/h’` (hundred tasks an hour)

  • Task.request None

    Get current request object.

  • Task.retry(args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, \*options*)[源代码]

    Retry the task.

    参数:
    • args – Positional arguments to retry with.
    • kwargs – Keyword arguments to retry with.
    • exc

      Custom exception to report when the max restart limit has been exceeded (default: MaxRetriesExceededError).

      If this argument is set and retry is called while an exception was raised (sys.exc_info() is set) it will attempt to reraise the current exception.

      If no exception was raised it will raise the exc argument provided.

    • countdown – Time in seconds to delay the retry for.
    • eta – Explicit time and date to run the retry at (must be a datetime instance).
    • max_retries – If set, overrides the default retry limit.
    • time_limit – If set, overrides the default time limit.
    • soft_time_limit – If set, overrides the default soft time limit.
    • **options – Any extra options to pass on to meth:apply_async.
    • throw – If this is False, do not raise the Retry exception, that tells the worker to mark the task as being retried. Note that this means the task will be marked as failed if the task raises an exception, or successful if it returns.
    引发 celery.exceptions.Retry:
     

    To tell the worker that the task has been re-sent for retry. This always happens, unless the throw keyword argument has been explicitly set to False, and is considered normal operation.

    Example

    1. >>> from imaginary_twitter_lib import Twitter
    2. >>> from proj.celery import app
    3. >>> @app.task()
    4. ... def tweet(auth, message):
    5. ... twitter = Twitter(oauth=auth)
    6. ... try:
    7. ... twitter.post_status_update(message)
    8. ... except twitter.FailWhale as exc:
    9. ... # Retry in 5 minutes.
    10. ... raise tweet.retry(countdown=60 * 5, exc=exc)

    Although the task will never return above as retry raises an exception to notify the worker, we use raise in front of the retry to convey that the rest of the block will not be executed.

  • Task.run(\args, **kwargs*)[源代码]

    The body of the task executed by workers.

  • Task.s(\args, **kwargs*)[源代码]

    .s(*a, **k) -> .subtask(a, k)

  • Task.send_error_emails = False

    If enabled an email will be sent to ADMINS whenever a task of this type fails.

  • Task.serializer = ‘pickle’

    The name of a serializer that are registered with kombu.serialization.registry. Default is ‘pickle’.

  • Task.si(\args, **kwargs*)[源代码]

    .si(*a, **k) -> .subtask(a, k, immutable=True)

  • Task.soft_time_limit = None

    Soft time limit. Defaults to the CELERYD_TASK_SOFT_TIME_LIMIT setting.

  • Task.starmap(it)[源代码]

    Creates a xstarmap task from it.

  • Task.store_errors_even_if_ignored = False

    When enabled errors will be stored even if the task is otherwise configured to ignore results.

  • Task.subtask(args=None, \starargs, **starkwargs*)[源代码]

    Return signature object for this task, wrapping arguments and execution options for a single task invocation.

  • Task.throws = ()

    List/tuple of expected exceptions.

    These are errors that are expected in normal operation and that should not be regarded as a real error by the worker. Currently this means that the state will be updated to an error state, but the worker will not log the event as an error.

  • Task.time_limit = None

    Hard time limit. Defaults to the CELERYD_TASK_TIME_LIMIT setting.

  • Task.track_started = False

    If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behaviour is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

    Having a ‘started’ status can be useful for when there are long running tasks and there is a need to report which task is currently running.

    The application default can be overridden using the CELERY_TRACK_STARTED setting.

  • Task.trail = True

    If enabled the request will keep track of subtasks started by this task, and this information will be sent with the result (result.children).

  • Task.update_state(task_id=None, state=None, meta=None)[源代码]

    Update task state.

    参数:
    • task_id – Id of the task to update, defaults to the id of the current task
    • state – New state (str).
    • meta – State metadata (dict).

class celery.app.task.TaskType[源代码]

Meta class for tasks.

Automatically registers the task in the task registry (except if the Task.abstract` attribute is set).

If no Task.name attribute is provided, then the name is generated from the module and class name.