celery.events.state

celery.events.state

This module implements a datastructure used to keep track of the state of a cluster of workers and the tasks it is working on (by consuming events).

For every event consumed the state is updated, so the state represents the state of the cluster at the time of the last event.

Snapshots (celery.events.snapshot) can be used to take “pictures” of this state at regular intervals to e.g. store that in a database.

class celery.events.state.Worker(hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None)[源代码]

Worker State.

  • active None

  • alive None[源代码]

  • clock None

  • event None

  • expire_window = 200

  • freq None

  • heartbeat_expires None[源代码]

  • heartbeat_max = 4

  • heartbeats None

  • hostname None

  • id None

  • loadavg None

  • on_heartbeat(\args, **kwargs*)[源代码]

  • on_offline(\args, **kwargs*)[源代码]

  • on_online(\args, **kwargs*)[源代码]

  • pid None

  • processed None

  • status_string None

  • sw_ident None

  • sw_sys None

  • sw_ver None

  • update(f, \*kw*)

  • update_heartbeat(\args, **kwargs*)

class celery.events.state.Task(uuid=None, \*kwargs*)[源代码]

Task State.

  • args = None

  • as_dict()

  • client = None

  • clock = 0

  • eta = None

  • event(type_, timestamp=None, local_received=None, fields=None, precedence=<function precedence at 0xadd495c>, items=<function items at 0x93eea04>, dict=<type ‘dict’>, PENDING=’PENDING’, RECEIVED=’RECEIVED’, STARTED=’STARTED’, FAILURE=’FAILURE’, RETRY=’RETRY’, SUCCESS=’SUCCESS’, REVOKED=’REVOKED’)

  • exception = None

  • exchange = None

  • expires = None

  • failed = None

  • info(fields=None, extra=[])[源代码]

    Information about this task suitable for on-screen display.

  • kwargs = None

  • merge(\args, **kwargs*)[源代码]

  • merge_rules = {‘RECEIVED’: (‘name’, ‘args’, ‘kwargs’, ‘retries’, ‘eta’, ‘expires’)}

    How to merge out of order events. Disorder is detected by logical ordering (e.g. task-received must have happened before a task-failed event).

    A merge rule consists of a state and a list of fields to keep from that state. (RECEIVED, (‘name’, ‘args’), means the name and args fields are always taken from the RECEIVED state, and any values for these fields received before or after is simply ignored.

  • name = None

  • on_failed(\args, **kwargs*)[源代码]

  • on_received(\args, **kwargs*)[源代码]

  • on_retried(\args, **kwargs*)[源代码]

  • on_revoked(\args, **kwargs*)[源代码]

  • on_sent(\args, **kwargs*)[源代码]

  • on_started(\args, **kwargs*)[源代码]

  • on_succeeded(\args, **kwargs*)[源代码]

  • on_unknown_event(\args, **kwargs*)[源代码]

  • origin None

  • ready None[源代码]

  • received = None

  • result = None

  • retried = None

  • retries = None

  • revoked = None

  • routing_key = None

  • runtime = None

  • sent = None

  • started = None

  • state = ‘PENDING’

  • succeeded = None

  • timestamp = None

  • traceback = None

  • update(\args, **kwargs*)[源代码]

  • worker = None

class celery.events.state.State(callback=None, workers=None, tasks=None, taskheap=None, max_workers_in_memory=5000, max_tasks_in_memory=10000, on_node_join=None, on_node_leave=None)[源代码]

Records clusters state.

  • class Task(uuid=None, \*kwargs*)

    Task State.

    • args = None

    • as_dict()

    • client = None

    • clock = 0

    • eta = None

    • event(type_, timestamp=None, local_received=None, fields=None, precedence=<function precedence at 0xadd495c>, items=<function items at 0x93eea04>, dict=<type ‘dict’>, PENDING=’PENDING’, RECEIVED=’RECEIVED’, STARTED=’STARTED’, FAILURE=’FAILURE’, RETRY=’RETRY’, SUCCESS=’SUCCESS’, REVOKED=’REVOKED’)

    • exception = None

    • exchange = None

    • expires = None

    • failed = None

    • info(fields=None, extra=[])

      Information about this task suitable for on-screen display.

    • kwargs = None

    • merge(\args, **kwargs*)

    • merge_rules = {‘RECEIVED’: (‘name’, ‘args’, ‘kwargs’, ‘retries’, ‘eta’, ‘expires’)}

    • name = None

    • on_failed(\args, **kwargs*)

    • on_received(\args, **kwargs*)

    • on_retried(\args, **kwargs*)

    • on_revoked(\args, **kwargs*)

    • on_sent(\args, **kwargs*)

    • on_started(\args, **kwargs*)

    • on_succeeded(\args, **kwargs*)

    • on_unknown_event(\args, **kwargs*)

    • origin None

    • ready None

    • received = None

    • result = None

    • retried = None

    • retries = None

    • revoked = None

    • routing_key = None

    • runtime = None

    • sent = None

    • started = None

    • state = ‘PENDING’

    • succeeded = None

    • timestamp = None

    • traceback = None

    • update(\args, **kwargs*)

    • worker = None

  • class State.Worker(hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None)

    Worker State.

    • active None

    • alive None

    • clock None

    • event None

    • expire_window = 200

    • freq None

    • heartbeat_expires None

    • heartbeat_max = 4

    • heartbeats None

    • hostname None

    • id None

    • loadavg None

    • on_heartbeat(\args, **kwargs*)

    • on_offline(\args, **kwargs*)

    • on_online(\args, **kwargs*)

    • pid None

    • processed None

    • status_string None

    • sw_ident None

    • sw_sys None

    • sw_ver None

    • update(f, \*kw*)

    • update_heartbeat(\args, **kwargs*)

  • State.alive_workers()[源代码]

    Return a list of (seemingly) alive workers.

  • State.clear(ready=True)[源代码]

  • State.clear_tasks(ready=True)[源代码]

  • State.event(event)[源代码]

  • State.event_count = 0

  • State.freeze_while(fun, \args, **kwargs*)[源代码]

  • State.get_or_create_task(uuid)[源代码]

    Get or create task by uuid.

  • State.get_or_create_worker(hostname, \*kwargs*)[源代码]

    Get or create worker by hostname.

    Return tuple of (worker, was_created).

  • State.heap_multiplier = 4

  • State.itertasks(limit=None)[源代码]

  • State.rebuild_taskheap(timetuple=<class ‘kombu.clocks.timetuple’>, heapify=<built-in function heapify>)

  • State.task_count = 0

  • State.task_event(type_, fields)[源代码]

    Deprecated, use event().

  • State.task_types()[源代码]

    Return a list of all seen task types.

  • State.tasks_by_time(limit=None)

    Generator giving tasks ordered by time, in (uuid, Task) tuples.

  • State.tasks_by_timestamp(limit=None)[源代码]

    Generator giving tasks ordered by time, in (uuid, Task) tuples.

  • State.tasks_by_type(name, limit=None)[源代码]

    Get all tasks by type.

    Return a list of (uuid, Task) tuples.

  • State.tasks_by_worker(hostname, limit=None)[源代码]

    Get all tasks by worker.

  • State.worker_event(type_, fields)[源代码]

    Deprecated, use event().

celery.events.state.heartbeat_expires(timestamp, freq=60, expire_window=200, Decimal=<class ‘decimal.Decimal’>, float=<type ‘float’>, isinstance=<built-in function isinstance>)[源代码]