celery.contrib.migrate

celery.contrib.migrate

Migration tools.

exception celery.contrib.migrate.StopFiltering[源代码]

class celery.contrib.migrate.State[源代码]

  • count = 0

  • filtered = 0

  • strtotal None[源代码]

  • total_apx = 0

celery.contrib.migrate.republish(producer, message, exchange=None, routing_key=None, remove_props=[u’application_headers’, u’content_type’, u’content_encoding’, u’headers’])[源代码]

celery.contrib.migrate.migrate_task(producer, body_, message, queues=None)[源代码]

celery.contrib.migrate.migrate_tasks(source, dest, migrate=<function migrate_task at 0xd5c2614>, app=None, queues=None, \*kwargs*)[源代码]

celery.contrib.migrate.move(predicate, connection=None, exchange=None, routing_key=None, source=None, app=None, callback=None, limit=None, transform=None, \*kwargs*)[源代码]

Find tasks by filtering them and move the tasks to a new queue.

参数:
  • predicate

    Filter function used to decide which messages to move. Must accept the standard signature of (body, message) used by Kombu consumer callbacks. If the predicate wants the message to be moved it must return either:

    1. a tuple of (exchange, routing_key), or
    2. a Queue instance, or
    3. any other true value which means the specified exchange and routing_key arguments will be used.
  • connection – Custom connection to use.
  • source – Optional list of source queues to use instead of the default (which is the queues in CELERY_QUEUES). This list can also contain new Queue instances.
  • exchange – Default destination exchange.
  • routing_key – Default destination routing key.
  • limit – Limit number of messages to filter.
  • callback – Callback called after message moved, with signature (state, body, message).
  • transform – Optional function to transform the return value (destination) of the filter function.

Also supports the same keyword arguments as start_filter().

To demonstrate, the move_task_by_id() operation can be implemented like this:

  1. def is_wanted_task(body, message):
  2. if body['id'] == wanted_id:
  3. return Queue('foo', exchange=Exchange('foo'),
  4. routing_key='foo')
  5. move(is_wanted_task)

or with a transform:

  1. def transform(value):
  2. if isinstance(value, string_t):
  3. return Queue(value, Exchange(value), value)
  4. return value
  5. move(is_wanted_task, transform=transform)

The predicate may also return a tuple of (exchange, routing_key) to specify the destination to where the task should be moved, or a Queue instance. Any other true value means that the task will be moved to the default exchange/routing_key.

celery.contrib.migrate.task_id_eq(task_id, body, message)[源代码]

celery.contrib.migrate.task_id_in(ids, body, message)[源代码]

celery.contrib.migrate.start_filter(app, conn, filter, limit=None, timeout=1.0, ack_messages=False, tasks=None, queues=None, callback=None, forever=False, on_declare_queue=None, consume_from=None, state=None, accept=None, \*kwargs*)[源代码]

celery.contrib.migrate.move_task_by_id(task_id, dest, \*kwargs*)[源代码]

Find a task by id and move it to another queue.

参数:
  • task_id – Id of task to move.
  • dest – Destination queue.

Also supports the same keyword arguments as move().

celery.contrib.migrate.move_by_idmap(map, \*kwargs*)[源代码]

Moves tasks by matching from a task_id: queue mapping, where queue is a queue to move the task to.

Example:

  1. >>> move_by_idmap({
  2. ... '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
  3. ... 'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
  4. ... '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
  5. ... queues=['hipri'])

celery.contrib.migrate.move_by_taskmap(map, \*kwargs*)[源代码]

Moves tasks by matching from a task_name: queue mapping, where queue is the queue to move the task to.

Example:

  1. >>> move_by_taskmap({
  2. ... 'tasks.add': Queue('name'),
  3. ... 'tasks.mul': Queue('name'),
  4. ... })