celery.worker.consumer
celery.worker.consumer
This module contains the components responsible for consuming messages from the broker, processing the messages and keeping the broker connections up and running.
class celery.worker.consumer.Consumer(on_task_request, init_callback=noop, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)[源代码]
class Blueprint(steps=None, name=None, app=None, on_start=None, on_close=None, on_stopped=None)
default_steps = ['celery.worker.consumer:Connection', 'celery.worker.consumer:Mingle', 'celery.worker.consumer:Events', 'celery.worker.consumer:Gossip', 'celery.worker.consumer:Heart', 'celery.worker.consumer:Control', 'celery.worker.consumer:Tasks', 'celery.worker.consumer:Evloop', 'celery.worker.consumer:Agent']
name = 'Consumer'
shutdown(parent)
Consumer.Strategies
alias of dict
Consumer.add_task_queue(queue, exchange=None, exchange_type=None, routing_key=None, **options)[源代码]
Consumer.apply_eta_task(task)[源代码]
Method called by the timer to apply a task with an ETA/countdown.
Consumer.bucket_for_task(type)
Consumer.cancel_task_queue(queue)[源代码]
Consumer.connect()
Establish the broker connection.
Will retry establishing the connection if the BROKER_CONNECTION_RETRY setting is enabled.
Consumer.create_task_handler()
Consumer.in_shutdown = False
Set when the consumer is shutting down.
Consumer.init_callback = None
Optional callback called the first time the worker is ready to receive tasks.
Consumer.loop_args()
Consumer.on_close()
Consumer.on_decode_error(message, exc)[源代码]
Callback called if an error occurs while decoding a message received.
Simply logs the error and acknowledges the message so it doesn’t enter a loop.
Parameters:
- message – The message with errors.
- exc – The original exception instance.
Consumer.on_invalid_task(body, message, exc)
Consumer.on_ready()
Consumer.on_unknown_message(body, message)
Consumer.on_unknown_task(body, message, exc)
Consumer.pool = None
The current worker pool instance.
Consumer.register_with_event_loop(hub)
Consumer.reset_rate_limits()
Consumer.restart_count = -1
Consumer.shutdown()
Consumer.start()[源代码]
Consumer.stop()[源代码]
Consumer.timer = None
A timer used for high-priority internal tasks, such as sending heartbeats.
Consumer.update_strategies()[源代码]
class celery.worker.consumer.Connection(c, **kwargs)
info(c, params='N/A')
name = u'celery.worker.consumer.Connection'
requires = ()
shutdown(c)
start(c)
class celery.worker.consumer.Events(c, send_events=None, **kwargs)
name = u'celery.worker.consumer.Events'
requires = (step:celery.worker.consumer.Connection{()},)
shutdown(c)
start(c)
stop(c)
class celery.worker.consumer.Heart(c, without_heartbeat=False, **kwargs)
name = u'celery.worker.consumer.Heart'
requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)
shutdown(c)
start(c)
stop(c)
class celery.worker.consumer.Control(c, **kwargs)
include_if(c)
name = u'celery.worker.consumer.Control'
requires = (step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)
class celery.worker.consumer.Tasks(c, **kwargs)
info(c)
name = u'celery.worker.consumer.Tasks'
requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)
shutdown(c)
start(c)
stop(c)
class celery.worker.consumer.Evloop(parent, **kwargs)
label = 'event loop'
last = True
name = u'celery.worker.consumer.Evloop'
patch_all(c)
requires = ()
start(c)
class celery.worker.consumer.Agent(c, **kwargs)
conditional = True
create(c)
name = u'celery.worker.consumer.Agent'
requires = (step:celery.worker.consumer.Connection{()},)
class celery.worker.consumer.Mingle(c, without_mingle=False, **kwargs)
compatible_transport(app)
compatible_transports = set(['redis', 'amqp'])
label = 'Mingle'
name = u'celery.worker.consumer.Mingle'
requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)
start(c)
class celery.worker.consumer.Gossip(c, without_gossip=False, interval=5.0, **kwargs)
call_task(task)
compatible_transport(app)
compatible_transports = set(['redis', 'amqp'])
election(id, topic, action=None)
get_consumers(channel)
label = 'Gossip'
name = u'celery.worker.consumer.Gossip'
on_elect(event)
on_elect_ack(event)
on_message(prepare, message)
on_node_join(worker)
on_node_leave(worker)
on_node_lost(worker)
periodic()
register_timer()
requires = (step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)
start(c)
celery.worker.consumer.dump_body(m, body)[源代码]