celery.events.state
celery.events.state
This module implements a data structure used to keep track of the state of a cluster of workers and the tasks they are working on (by consuming events).
For every event consumed the state is updated, so the state represents the state of the cluster at the time of the last event.
Snapshots (celery.events.snapshot) can be used to take “pictures” of this state at regular intervals, e.g. to store them in a database.
class celery.events.state.Worker(hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None)[源代码]
Worker State.
active None
alive None[源代码]
clock None
event None
expire_window = 200
freq None
heartbeat_expires None[源代码]
heartbeat_max = 4
heartbeats None
hostname None
id None
loadavg None
on_heartbeat(*args, **kwargs)[源代码]
on_offline(*args, **kwargs)[源代码]
on_online(*args, **kwargs)[源代码]
pid None
processed None
status_string None
sw_ident None
sw_sys None
sw_ver None
update(f, **kw)
update_heartbeat(*args, **kwargs)
class celery.events.state.Task(uuid=None, **kwargs)[源代码]
Task State.
args = None
as_dict()
client = None
clock = 0
eta = None
event(type_, timestamp=None, local_received=None, fields=None, precedence=<function precedence at 0xadd495c>, items=<function items at 0x93eea04>, dict=<type 'dict'>, PENDING='PENDING', RECEIVED='RECEIVED', STARTED='STARTED', FAILURE='FAILURE', RETRY='RETRY', SUCCESS='SUCCESS', REVOKED='REVOKED')
exception = None
exchange = None
expires = None
failed = None
info(fields=None, extra=[])[源代码]
Information about this task suitable for on-screen display.
kwargs = None
merge(*args, **kwargs)[源代码]
merge_rules = {'RECEIVED': ('name', 'args', 'kwargs', 'retries', 'eta', 'expires')}
How to merge out of order events. Disorder is detected by logical ordering (e.g. task-received must have happened before a task-failed event).
A merge rule consists of a state and a list of fields to keep from that state. (RECEIVED, ('name', 'args')) means the name and args fields are always taken from the RECEIVED state, and any values for these fields received before or after are simply ignored.
name = None
on_failed(*args, **kwargs)[源代码]
on_received(*args, **kwargs)[源代码]
on_retried(*args, **kwargs)[源代码]
on_revoked(*args, **kwargs)[源代码]
on_sent(*args, **kwargs)[源代码]
on_started(*args, **kwargs)[源代码]
on_succeeded(*args, **kwargs)[源代码]
on_unknown_event(*args, **kwargs)[源代码]
origin None
ready None[源代码]
received = None
result = None
retried = None
retries = None
revoked = None
routing_key = None
runtime = None
sent = None
started = None
state = ‘PENDING’
succeeded = None
timestamp = None
traceback = None
update(*args, **kwargs)[源代码]
worker = None
class celery.events.state.State(callback=None, workers=None, tasks=None, taskheap=None, max_workers_in_memory=5000, max_tasks_in_memory=10000, on_node_join=None, on_node_leave=None)[源代码]
Records the cluster's state.
class Task(uuid=None, **kwargs)
Task State.
args = None
as_dict()
client = None
clock = 0
eta = None
event(type_, timestamp=None, local_received=None, fields=None, precedence=<function precedence at 0xadd495c>, items=<function items at 0x93eea04>, dict=<type 'dict'>, PENDING='PENDING', RECEIVED='RECEIVED', STARTED='STARTED', FAILURE='FAILURE', RETRY='RETRY', SUCCESS='SUCCESS', REVOKED='REVOKED')
exception = None
exchange = None
expires = None
failed = None
info(fields=None, extra=[])
Information about this task suitable for on-screen display.
kwargs = None
merge(*args, **kwargs)
merge_rules = {'RECEIVED': ('name', 'args', 'kwargs', 'retries', 'eta', 'expires')}
name = None
on_failed(*args, **kwargs)
on_received(*args, **kwargs)
on_retried(*args, **kwargs)
on_revoked(*args, **kwargs)
on_sent(*args, **kwargs)
on_started(*args, **kwargs)
on_succeeded(*args, **kwargs)
on_unknown_event(*args, **kwargs)
origin None
ready None
received = None
result = None
retried = None
retries = None
revoked = None
routing_key = None
runtime = None
sent = None
started = None
state = ‘PENDING’
succeeded = None
timestamp = None
traceback = None
update(*args, **kwargs)
worker = None
class State.Worker(hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None)
Worker State.
active None
alive None
clock None
event None
expire_window = 200
freq None
heartbeat_expires None
heartbeat_max = 4
heartbeats None
hostname None
id None
loadavg None
on_heartbeat(*args, **kwargs)
on_offline(*args, **kwargs)
on_online(*args, **kwargs)
pid None
processed None
status_string None
sw_ident None
sw_sys None
sw_ver None
update(f, **kw)
update_heartbeat(*args, **kwargs)
State.alive_workers()[源代码]
Return a list of (seemingly) alive workers.
State.clear(ready=True)[源代码]
State.clear_tasks(ready=True)[源代码]
State.event(event)[源代码]
State.event_count = 0
State.freeze_while(fun, *args, **kwargs)[源代码]
State.get_or_create_task(uuid)[源代码]
Get or create task by uuid.
State.get_or_create_worker(hostname, **kwargs)[源代码]
Get or create worker by hostname.
Return tuple of (worker, was_created).
State.heap_multiplier = 4
State.itertasks(limit=None)[源代码]
State.rebuild_taskheap(timetuple=<class 'kombu.clocks.timetuple'>, heapify=<built-in function heapify>)
State.task_count = 0
State.task_event(type_, fields)[源代码]
Deprecated, use event().
State.task_types()[源代码]
Return a list of all seen task types.
State.tasks_by_time(limit=None)
Generator giving tasks ordered by time, in (uuid, Task) tuples.
State.tasks_by_timestamp(limit=None)[源代码]
Generator giving tasks ordered by time, in (uuid, Task) tuples.
State.tasks_by_type(name, limit=None)[源代码]
Get all tasks by type.
Return a list of (uuid, Task) tuples.
State.tasks_by_worker(hostname, limit=None)[源代码]
Get all tasks by worker.
State.worker_event(type_, fields)[源代码]
Deprecated, use event().
celery.events.state.heartbeat_expires(timestamp, freq=60, expire_window=200, Decimal=<class 'decimal.Decimal'>, float=<type 'float'>, isinstance=<built-in function isinstance>)[源代码]