Task Message Protocol v2 (Draft Spec.)
Notes
Support for multiple languages via the lang header.
Worker may redirect the message to a worker that supports the language.
Metadata moved to headers.
This means that workers/intermediates can inspect the message and make decisions based on the headers without decoding the payload (which may be language specific, e.g. serialized by the Python specific pickle serializer).
Body is only for language specific data.
- Python stores args/kwargs in body.
- If a message uses raw encoding then the raw data will be passed as a single argument to the function.
- Java/C, etc. can use a thrift/protobuf document as the body
Dispatches to actor based on c_type, c_meth headers
c_meth is unused by python, but may be used in the future to specify class+method pairs.
Chain gains a dedicated field.
Reducing the chain into a recursive callbacks argument causes problems when the recursion limit is exceeded.
This is fixed in the new message protocol by specifying a list of signatures, each task will then pop a task off the list when sending the next message:
execute_task(message)
chain = message.headers['chain']
if chain:
sig = maybe_signature(chain.pop())
sig.apply_async(chain=chain)
correlation_id replaces task_id field.
c_shadow lets you specify a different name for logs, monitors can be used for e.g. meta tasks that calls any function:
from celery.utils.imports import qualname
class PickleTask(Task):
abstract = True
def unpack_args(self, fun, args=()):
return fun, args
def apply_async(self, args, kwargs, **options):
fun, real_args = self.unpack_args(*args)
return super(PickleTask, self).apply_async(
(fun, real_args, kwargs), shadow=qualname(fun), **options
)
@app.task(base=PickleTask)
def call(fun, args, kwargs):
return fun(*args, **kwargs)
Undecided
May consider moving callbacks/errbacks/chain into body.
Will huge lists in headers cause overhead? The downside of keeping them in the body is that intermediates won’t be able to introspect these values.
Definition
# protocol v2 implies UTC=True
# 'class' header existing means protocol is v2
properties = {
'correlation_id': (uuid)task_id,
'content_type': (string)mime,
'content_encoding': (string)encoding,
# optional
'reply_to': (string)queue_or_url,
}
headers = {
'lang': (string)'py'
'c_type': (string)task,
# optional
'c_meth': (string)unused,
'c_shadow': (string)replace_name,
'eta': (iso8601)eta,
'expires'; (iso8601)expires,
'callbacks': (list)Signature,
'errbacks': (list)Signature,
'chain': (list)Signature, # non-recursive, reversed list of signatures
'group': (uuid)group_id,
'chord': (uuid)chord_id,
'retries': (int)retries,
'timelimit': (tuple)(soft, hard),
}
body = (args, kwargs)
Example
# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
task_id = uuid()
basic_publish(
message=json.dumps([[2, 2], {}]),
application_headers={
'lang': 'py',
'c_type': 'proj.tasks.add',
'chain': [
# reversed chain list
{'task': 'proj.tasks.add', 'args': (8, )},
{'task': 'proj.tasks.add', 'args': (4, )},
]
}
properties={
'correlation_id': task_id,
'content_type': 'application/json',
'content_encoding': 'utf-8',
}
)