Create Executor
An Executor processes a DocumentArray in-place via functions decorated with @requests. To create an Executor, you only need to follow three principles:
- An Executor should subclass directly from the jina.Executor class.
- An Executor class is a bag of functions with shared state (via self); it can contain an arbitrary number of functions with arbitrary names.
- Functions decorated by @requests will be invoked according to their on= endpoint. These functions can be coroutines (async def) or regular functions.
Minimum working example
from jina import Executor, requests

class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        print(kwargs)
Use it in a Flow
from jina import Flow, Document

f = Flow().add(uses=MyExecutor)

with f:
    f.post(on='/random_work', inputs=Document(), on_done=print)
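The call above posts to `/random_work` even though `foo` declares no endpoint; it still runs because a plain `@requests` acts as the default handler. The following is a minimal stand-in sketch of that lookup in plain Python, not Jina's actual implementation. The `bind` decorator, the `DEFAULT` key and `ToyExecutor` are hypothetical names introduced only for illustration.

```python
# Toy model of endpoint dispatch; every name here is hypothetical, not part of Jina.
DEFAULT = '/default'  # assumed key for handlers bound without an endpoint


def bind(on=None):
    """Mark a method with the endpoint(s) it should handle."""
    def mark(func):
        # Accept a single endpoint, a list of endpoints, or nothing (default handler).
        if on is None:
            func._endpoints = [DEFAULT]
        elif isinstance(on, str):
            func._endpoints = [on]
        else:
            func._endpoints = list(on)
        return func
    return mark


class ToyExecutor:
    def __init__(self):
        # Build the endpoint-to-method table once from the marked methods.
        self.table = {}
        for name in dir(self):
            method = getattr(self, name)
            for endpoint in getattr(method, '_endpoints', []):
                self.table[endpoint] = method

    def post(self, on, **kwargs):
        # Exact match first, then fall back to the default handler, else do nothing.
        handler = self.table.get(on) or self.table.get(DEFAULT)
        return handler(**kwargs) if handler else None

    @bind()
    def foo(self, **kwargs):
        return 'foo'

    @bind(on=['/index', '/update'])
    def bar(self, **kwargs):
        return 'bar'


ex = ToyExecutor()
print(ex.post('/index'))        # handled by bar
print(ex.post('/random_work'))  # no exact match, falls back to foo
```

Keeping the table as a plain dict makes the fallback rule easy to see: an endpoint either has its own entry or is served by the single default entry.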
Use it as-is
m = MyExecutor()
m.foo()
Using Executors with AsyncIO
For I/O-bound Executors it can be helpful to use Python's asyncio API. This lets several pending Executor function calls wait concurrently instead of one after another.
import asyncio
from jina import Executor, requests


class MyExecutor(Executor):
    @requests
    async def foo(self, **kwargs):
        await asyncio.sleep(1.0)
        print(kwargs)


async def main():
    m = MyExecutor()
    call1 = asyncio.create_task(m.foo())
    call2 = asyncio.create_task(m.foo())
    await asyncio.gather(call1, call2)


asyncio.run(main())
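To make the benefit concrete, the sketch below times two concurrent calls using only the standard library. `SlowWorker` is a hypothetical plain class standing in for an Executor, and the 0.2-second sleep stands in for an I/O wait; neither comes from Jina.

```python
import asyncio
import time


class SlowWorker:
    """Hypothetical stand-in for an I/O-bound Executor (not a Jina class)."""

    async def foo(self, **kwargs):
        await asyncio.sleep(0.2)  # simulated I/O wait
        return kwargs


async def timed_calls():
    worker = SlowWorker()
    start = time.perf_counter()
    # Both coroutines are awaited together, so their waits overlap.
    results = await asyncio.gather(worker.foo(n=1), worker.foo(n=2))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(timed_calls())
# With overlapping waits the total should be near one sleep, not the sum of two.
print(results, f'{elapsed:.2f}s')
```

Running the two calls sequentially with plain `await` would instead take roughly the sum of both sleeps.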
Constructor
Subclass
Every new Executor should subclass jina.Executor.
You can name your executor class freely.
__init__
You do not need to implement __init__ if your Executor has no initial state.
If your Executor has an __init__, it must carry **kwargs in its signature and call super().__init__(**kwargs) in the body:
from jina import Executor


class MyExecutor(Executor):
    def __init__(self, foo: str, bar: int, **kwargs):
        super().__init__(**kwargs)
        self.bar = bar
        self.foo = foo
What is inside kwargs?
Here, kwargs contains the metas and requests (representing the request-to-function mapping) values from the YAML config, plus the runtime_args injected on startup.
You can access these values in the __init__ body via self.metas, self.requests and self.runtime_args, or modify them before passing them on to super().__init__().
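Why the **kwargs pass-through matters can be seen in a stand-in sketch that does not import Jina. `ToyBase` is a hypothetical base class that, following the description above, expects `metas`, `requests` and `runtime_args` as keyword arguments; the real jina.Executor constructor may differ in its details, and the `replicas` key is an invented example value.

```python
from types import SimpleNamespace


class ToyBase:
    """Hypothetical stand-in for a base class that consumes framework kwargs."""

    def __init__(self, metas=None, requests=None, runtime_args=None):
        self.metas = SimpleNamespace(**(metas or {}))
        self.requests = dict(requests or {})
        self.runtime_args = SimpleNamespace(**(runtime_args or {}))


class MyToy(ToyBase):
    def __init__(self, foo: str, bar: int, **kwargs):
        # Values can be adjusted before the base class sees them.
        kwargs.setdefault('metas', {})['name'] = f'toy-{foo}'
        super().__init__(**kwargs)
        self.foo = foo
        self.bar = bar


# A framework would inject the extra keyword arguments; here they are passed by hand.
toy = MyToy(foo='hello', bar=1, runtime_args={'replicas': 2})
print(toy.metas.name, toy.runtime_args.replicas)
```

If `MyToy.__init__` dropped `**kwargs`, the call above would fail with a `TypeError` on the unexpected `runtime_args` argument, which is the situation the rule is meant to prevent.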
Passing arguments
When using an Executor in a Flow, there are two ways of passing arguments to its __init__.
via uses_with
from jina import Flow

f = Flow().add(uses=MyExecutor, uses_with={'foo': 'hello', 'bar': 1})

with f:
    ...
via predefined YAML
my-exec.yml
jtype: MyExecutor
with:
  foo: hello
  bar: 1
``` my-flow.py
from jina import Flow

f = Flow().add(uses='my-exec.yml')

with f:
    ...
Hint
`uses_with` has higher priority than the predefined `with` config in YAML. When both are present, the values in `uses_with` take precedence.
## Methods
Methods of `Executor` can be named and written freely.
Only methods that are decorated with `@requests` can be used in a `Flow`.
### Method decorator
You can import the `requests` decorator via:
from jina import requests
`@requests` defines when a function will be invoked in the Flow. It has a keyword `on=` to define the endpoint.
To call an Executor's function, use `Flow.post(on=..., ...)`. For example, given:
from jina import Executor, Flow, Document, requests

class MyExecutor(Executor):
    @requests(on='/index')
    def foo(self, **kwargs):
        print(f'foo is called: {kwargs}')

    @requests(on='/random_work')
    def bar(self, **kwargs):
        print(f'bar is called: {kwargs}')


f = Flow().add(uses=MyExecutor)

with f:
    f.post(on='/index', inputs=Document(text='index'))
    f.post(on='/random_work', inputs=Document(text='random_work'))
    f.post(on='/blah', inputs=Document(text='blah'))
Then:
- `f.post(on='/index', ...)` will trigger `MyExecutor.foo`;
- `f.post(on='/random_work', ...)` will trigger `MyExecutor.bar`;
- `f.post(on='/blah', ...)` will not trigger any function, as no function is bound to `/blah`;
#### Default binding
A method decorated with plain `@requests` (without `on=`) is the default handler for all endpoints. That means it is the fallback handler for endpoints that have no method bound to them. In the example below, `f.post(on='/blah', ...)` will invoke `MyExecutor.foo`, while `f.post(on='/index', ...)` will invoke `MyExecutor.bar`.
from jina import Executor, requests

class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        print(kwargs)

    @requests(on='/index')
    def bar(self, **kwargs):
        print(kwargs)
#### Multiple bindings
To bind a method with multiple endpoints, you can use `@requests(on=['/foo', '/bar'])`. This allows either `f.post(on='/foo', ...)` or `f.post(on='/bar', ...)` to invoke that function.
#### No binding
A class with no `@requests` binding plays no part in the Flow. The request will simply pass through without any processing.
### Method signature
A method decorated with `@requests` follows the signature below (`async` is optional):
async def foo(
    self,
    docs: DocumentArray,
    parameters: Dict,
    docs_matrix: List[DocumentArray],
    groundtruths: Optional[DocumentArray],
    groundtruths_matrix: List[DocumentArray],
) -> Optional[DocumentArray]:
    pass
If the function is using `async` in its signature, it will be used as a coroutine and the regular `asyncio` functionality is available inside the function.
The Executor’s methods receive the following arguments in order:
<table><thead><tr><th>Name</th><th>Type</th><th>Description</th></tr></thead><tbody><tr><td><code>docs</code></td><td><code>DocumentArray</code></td><td><code>Request.docs</code>. When multiple requests are available, it is a concatenation of all <code>Request.docs</code> as one <code>DocumentArray</code>.</td></tr><tr><td><code>parameters</code></td><td><code>Dict</code></td><td><code>Request.parameters</code>, given by <code>Flow.post(..., parameters=)</code></td></tr><tr><td><code>docs_matrix</code></td><td><code>List[DocumentArray]</code></td><td>When multiple requests are available, it is a list of all <code>Request.docs</code>. On single request, it is <code>None</code></td></tr><tr><td><code>groundtruths</code></td><td><code>Optional[DocumentArray]</code></td><td><code>Request.groundtruths</code>. Same behavior as <code>docs</code></td></tr><tr><td><code>groundtruths_matrix</code></td><td><code>List[DocumentArray]</code></td><td>Same behavior as <code>docs_matrix</code> but on <code>Request.groundtruths</code></td></tr></tbody></table>
Note
Executor methods that are not decorated with `@requests` do not receive these arguments.
Note
The argument order is common-usage-first, not alphabetical and not grouped by semantic closeness.
Hint
If you don't need some arguments, you can collect them into `**kwargs`. For example:
from jina import Executor, requests

class MyExecutor(Executor):
    @requests
    def foo_using_docs_arg(self, docs, **kwargs):
        print(docs)

    @requests
    def foo_using_docs_parameters_arg(self, docs, parameters, **kwargs):
        print(docs)
        print(parameters)

    @requests
    def foo_using_no_arg(self, **kwargs):
        # the unused arguments are collected into kwargs
        print(kwargs['docs_matrix'])
### Method returns
Methods decorated with `@requests` can return `DocumentArray`, `DocumentArrayMemmap`, `Dict` or `None`.
- If the return is `None`, then Jina considers all changes to have happened in-place. The next Executor will receive the updated `docs` modified by the current Executor.
- If the return is `DocumentArray` or `DocumentArrayMemmap`, then the current `docs` field in the `Request` will be overridden by the return, which will be forwarded to the next Executor in the Flow.
- If the return is a `Dict`, then `Request.parameters` will be updated by union with the return. The next Executor will receive this updated `Request.parameters`. One can leverage this feature to pass parameters between Executors.
So do I need a return? Most of the time you don't. Let's see some examples.
#### Embed Documents `blob`
In this example, `encode()` uses some neural network to get the embedding for each `Document.blob`, then assigns it to `Document.embedding`. The whole procedure is in-place and there is no need to return anything.
import numpy as np
from jina import requests, Executor, DocumentArray

from my_model import get_predict_model


class PNEncoder(Executor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.model = get_predict_model()

    @requests
    def encode(self, docs: DocumentArray, *args, **kwargs) -> None:
        # collect the blobs of all chunks together with the Documents they belong to
        _blob, _docs = docs.traverse_flat(['c']).get_attributes_with_docs('blob')
        embeds = self.model.predict(np.stack(_blob))
        for d, b in zip(_docs, embeds):
            d.embedding = b
#### Add Chunks by segmenting Document
In this example, each `Document` is segmented by `get_mesh` and the results are added to `.chunks`. After that, `.buffer` and `.uri` are removed from each `Document`. In this case, all changes happen in-place and there is no need to return anything.
from jina import requests, Document, Executor, DocumentArray

class ConvertSegmenter(Executor):
    @requests
    def segment(self, docs: DocumentArray, **kwargs) -> None:
        for d in docs:
            d.load_uri_to_buffer()
            d.chunks = [Document(blob=_r['blob'], tags=_r['tags']) for _r in get_mesh(d.content)]
            d.pop('buffer', 'uri')
#### Preserve Document `id` only
In this example, a simple indexer stores incoming `docs` in a `DocumentArray`. Then it recreates a new `DocumentArray` by preserving only `id` in the original `docs` and dropping all others, as the developer does not want to carry all rich info over the network. This needs a return.
from jina import requests, Document, Executor, DocumentArray

class MyIndexer(Executor):
    """Simple indexer class"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._docs = DocumentArray()

    @requests(on='/index')
    def index(self, docs: DocumentArray, **kwargs):
        self._docs.extend(docs)
        return DocumentArray([Document(id=d.id) for d in docs])
#### Pass/change request parameters
In this example, `MyExec2` receives the parameters `{'top_k': 10}` from `MyExec1` when the Flow contains `MyExec1 -> MyExec2` in that order.
from jina import requests, Document, Executor

class MyExec1(Executor):
    @requests(on='/index')
    def index(self, **kwargs):
        return {'top_k': 10}


class MyExec2(Executor):
    @requests(on='/index')
    def index(self, docs, parameters, **kwargs):
        # `top_k` was set by MyExec1; int() guards against it arriving as a float
        top_docs = docs[:int(parameters['top_k'])]
```
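The three return cases described under "Method returns" can be summarised with a small stand-in sketch. It uses plain lists and dicts instead of `DocumentArray` and `Request`, and `apply_return` is a hypothetical helper written for illustration, not a Jina function. That returned keys win on conflict during the parameter merge is an assumption of this sketch.

```python
def apply_return(request, result):
    """Toy model of how a handler's return value updates a request dict."""
    if result is None:
        # In-place case: whatever the handler mutated is already in the request.
        return request
    if isinstance(result, dict):
        # Dict case: merge into the parameters (assumed: returned keys win on conflict).
        request['parameters'] = {**request['parameters'], **result}
    else:
        # Document-collection case: the returned docs replace the current ones.
        request['docs'] = result
    return request


def in_place(docs, parameters):
    docs.append('extra')  # mutate and return nothing


def ids_only(docs, parameters):
    return [d[:2] for d in docs]  # return a new, smaller collection


def set_top_k(docs, parameters):
    return {'top_k': 10}  # return parameters for the next step


request = {'docs': ['d1-text', 'd2-text'], 'parameters': {'lang': 'en'}}
for handler in (in_place, ids_only, set_top_k):
    request = apply_return(request, handler(request['docs'], request['parameters']))
print(request)
```

Each handler sees the request as left by the previous one, which mirrors how one Executor's output becomes the next Executor's input in a Flow.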