jina.orchestrate.flow.base module
class jina.orchestrate.flow.base.Flow(*, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', https: Optional[bool] = False, port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, results_as_docarray: Optional[bool] = False, **kwargs)
class jina.orchestrate.flow.base.Flow(*, compress: Optional[str] = 'NONE', compress_min_bytes: Optional[int] = 1024, compress_min_ratio: Optional[float] = 1.1, connection_list: Optional[str] = None, cors: Optional[bool] = False, daemon: Optional[bool] = False, default_swagger_ui: Optional[bool] = False, deployments_addresses: Optional[str] = '{}', description: Optional[str] = None, env: Optional[dict] = None, expose_endpoints: Optional[str] = None, expose_public: Optional[bool] = False, graph_description: Optional[str] = '{}', host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, name: Optional[str] = 'gateway', native: Optional[bool] = False, no_crud_endpoints: Optional[bool] = False, no_debug_endpoints: Optional[bool] = False, polling: Optional[str] = 'ANY', port_expose: Optional[int] = None, port_in: Optional[int] = None, prefetch: Optional[int] = 0, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, replicas: Optional[int] = 1, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'GRPCGatewayRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, title: Optional[str] = None, uses: Optional[Union[str, Type[BaseExecutor], dict]] = 'BaseExecutor', uses_after_address: Optional[str] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, workspace: Optional[str] = None, **kwargs)
class jina.orchestrate.flow.base.Flow(*, env: Optional[dict] = None, inspect: Optional[str] = 'COLLECT', log_config: Optional[str] = None, name: Optional[str] = None, polling: Optional[str] = 'ANY', quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, timeout_ctrl: Optional[int] = 60, uses: Optional[str] = None, workspace: Optional[str] = None, **kwargs)
Bases: jina.clients.mixin.PostMixin, jina.jaml.JAMLCompatible, contextlib.ExitStack
Flow is how Jina streamlines and distributes Executors.
property last_deployment
Last deployment
needs(needs, name='joiner', *args, **kwargs)
Add a blocker to the Flow that waits until all pods named in needs have completed.
Parameters
needs (Union[Tuple[str], List[str]]) – list of service names to wait for
name (str) – the name of this joiner; defaults to joiner
args – additional positional arguments forwarded to the add function
kwargs – additional keyword arguments forwarded to the add function
Return type
Flow
Returns
the modified Flow
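The joiner idea can be pictured with a small stand-alone model. This is only a sketch and does not use Jina: ToyFlow, its graph attribute, and the rule that a new node waits on the previously added one are all assumptions made for illustration, not the library's implementation.

```python
from typing import Dict, List, Optional, Sequence


class ToyFlow:
    """Hypothetical stand-in for a Flow: maps each node name to the names it waits on."""

    def __init__(self) -> None:
        self.graph: Dict[str, List[str]] = {"gateway": []}
        self.last: str = "gateway"

    def add(self, name: str, needs: Optional[Sequence[str]] = None) -> "ToyFlow":
        # Assumption: without an explicit `needs`, a node waits on the last added node.
        self.graph[name] = list(needs) if needs is not None else [self.last]
        self.last = name
        return self

    def needs(self, needs: Sequence[str], name: str = "joiner") -> "ToyFlow":
        # A joiner is modelled as a node that waits on every name listed in `needs`.
        missing = [n for n in needs if n not in self.graph]
        if missing:
            raise ValueError(f"unknown nodes: {missing}")
        return self.add(name, needs=needs)


flow = (
    ToyFlow()
    .add("encoder_a", needs=["gateway"])
    .add("encoder_b", needs=["gateway"])
    .needs(["encoder_a", "encoder_b"])
)
print(flow.graph["joiner"])
```

In this model the joiner node only becomes runnable once both encoder_a and encoder_b have produced their output, which is the blocking behaviour described above.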
needs_all(name='joiner', *args, **kwargs)
Collect all hanging Deployments so far and add a blocker to the Flow that waits until all hanging pods have completed.
Parameters
name (str) – the name of this joiner; defaults to joiner
args – additional positional arguments forwarded to the add and needs functions
kwargs – additional keyword arguments forwarded to the add and needs functions
Return type
Flow
Returns
the modified Flow
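One way to read "hanging" is "a node whose output no other node consumes". The sketch below works under that assumption on a plain dictionary; hanging_nodes and the example graph are hypothetical and are not taken from Jina.

```python
from typing import Dict, List


def hanging_nodes(graph: Dict[str, List[str]]) -> List[str]:
    """Return the nodes that no other node lists in its dependencies."""
    needed = {dep for deps in graph.values() for dep in deps}
    return [name for name in graph if name not in needed]


# Each key waits on the names in its list.
graph = {
    "gateway": [],
    "a": ["gateway"],
    "b": ["gateway"],
    "c": ["a"],
}

hanging = hanging_nodes(graph)
# A needs_all-style joiner would then wait on every hanging node.
graph["joiner"] = list(hanging)
print(hanging)
```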
add(*, connection_list: Optional[str] = None, daemon: Optional[bool] = False, docker_kwargs: Optional[dict] = None, entrypoint: Optional[str] = None, env: Optional[dict] = None, expose_public: Optional[bool] = False, external: Optional[bool] = False, force_update: Optional[bool] = False, gpus: Optional[str] = None, host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', install_requirements: Optional[bool] = False, log_config: Optional[str] = None, name: Optional[str] = None, native: Optional[bool] = False, polling: Optional[str] = 'ANY', port_in: Optional[int] = None, port_jinad: Optional[int] = 8000, pull_latest: Optional[bool] = False, py_modules: Optional[List[str]] = None, quiet: Optional[bool] = False, quiet_error: Optional[bool] = False, quiet_remote_logs: Optional[bool] = False, replicas: Optional[int] = 1, runtime_backend: Optional[str] = 'PROCESS', runtime_cls: Optional[str] = 'WorkerRuntime', shards: Optional[int] = 1, timeout_ctrl: Optional[int] = 60, timeout_ready: Optional[int] = 600000, upload_files: Optional[List[str]] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_after_address: Optional[str] = None, uses_before: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_before_address: Optional[str] = None, uses_metas: Optional[dict] = None, uses_requests: Optional[dict] = None, uses_with: Optional[dict] = None, volumes: Optional[List[str]] = None, workspace: Optional[str] = None, **kwargs) → Union['Flow', 'AsyncFlow']
Add a Deployment to the current Flow object and return the new modified Flow object. The attributes of the Deployment can later be changed with set() or deleted with remove().
Parameters
needs (Union[str, Tuple[str], List[str], None]) – the name of the Deployment(s) that this Deployment receives data from. One can also use ‘gateway’ to indicate the connection with the gateway.
deployment_role (DeploymentRoleType) – the role of the Deployment, used for visualization and route planning
copy_flow (bool) – when set to true, always copy the current Flow, apply the modification to the copy, and return it; otherwise, modify the Flow in place
kwargs – other keyword-value arguments that the Deployment CLI supports
Return type
Union[Flow, AsyncFlow]
Returns
a (new) Flow object with modification
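The difference between copying and in-place modification that copy_flow describes can be illustrated without Jina. In this sketch ToyFlow and its deployments list are hypothetical; only the copy-then-modify versus modify-in-place pattern is the point.

```python
import copy
from typing import List


class ToyFlow:
    """Hypothetical container that mimics the copy_flow switch."""

    def __init__(self) -> None:
        self.deployments: List[str] = []

    def add(self, name: str, copy_flow: bool = True) -> "ToyFlow":
        # With copy_flow=True the original object is left untouched.
        target = copy.deepcopy(self) if copy_flow else self
        target.deployments.append(name)
        return target


base = ToyFlow()
copied = base.add("a", copy_flow=True)      # base is unchanged, copied holds "a"
in_place = base.add("b", copy_flow=False)   # base itself now holds "b"
print(base.deployments, copied.deployments, in_place is base)
```

Copying makes chained calls safe to branch from a shared starting point, at the cost of one deep copy per call.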
inspect(name='inspect', *args, **kwargs)
Add an inspection on the last changed Deployment in the Flow.
Internally, it adds two Deployments to the Flow. The overhead is minimized, and you can remove them by using Flow(inspect=FlowInspectType.REMOVE) before using the Flow.
    Flow -- PUB-SUB -- BaseDeployment(_pass) -- Flow
            |
            -- PUB-SUB -- InspectDeployment (Hanging)
In this way, InspectDeployment looks like a simple _pass from outside and does not introduce side effects (e.g. changing the socket type) to the original Flow. The original incoming and outgoing socket types are preserved.
This function is very handy for introducing an Evaluator into the Flow.
See also
gather_inspect()
Parameters
name (str) – name of the Deployment
args – args for .add()
kwargs – kwargs for .add()
Return type
Flow
Returns
the new instance of the Flow
gather_inspect(name='gather_inspect', include_last_deployment=True, *args, **kwargs)
Gather the output of all inspect Deployments into one Deployment. When the Flow has no inspect Deployment, the Flow itself is returned.
Note
If --no-inspect is not given, then gather_inspect() is called automatically before build(), so in general you don’t need to call gather_inspect() manually.
Parameters
name (str) – the name of the gather Deployment
include_last_deployment (bool) – whether to include the last modified Deployment in the Flow
args – args for .add()
kwargs – kwargs for .add()
Return type
Flow
Returns
the modified Flow or the copy of it
See also
inspect()
build(copy_flow=False)
Build the current Flow and make it ready to use.
Note
No need to manually call it since 0.0.8. When using Flow with the context manager, or using start(), build() will be invoked.
Parameters
copy_flow (bool) – when set to true, always copy the current Flow, apply the modification to the copy, and return it; otherwise, modify the Flow in place
Return type
Flow
Returns
the current Flow (by default)
Note
copy_flow=True is recommended if you are building the same Flow multiple times in a row, e.g.
    from jina import Flow

    f = Flow()
    with f:
        f.index()

    # build a copy so the original Flow can be reused
    with f.build(copy_flow=True) as fl:
        fl.search()
start()
Start to run all Deployments in this Flow.
Remember to close the Flow with close().
Note that this method has a timeout of timeout_ready set in CLI, which is inherited all the way from jina.orchestrate.pods.Pod.
Returns
this instance
property num_deployments: int
Get the number of Deployments in this Flow
Return type
int
property num_pods: int
Get the number of pods (shards count) in this Flow
Return type
int
property client: BaseClient
Return a BaseClient object attached to this Flow.
Return type
BaseClient
plot(output=None, vertical_layout=False, inline_display=False, build=True, copy_flow=True)
Visualize the Flow up to the current point. If a file name is provided, it will create an image with that name; otherwise it will display the URL for mermaid. If called within an IPython notebook, it will be rendered inline; otherwise an image will be created.
Example,
    flow = Flow().add(name='deployment_a').plot('flow.svg')
Parameters
output (Optional[str]) – a filename specifying the name of the image to be created; the suffix svg/jpg determines the file type of the output image
vertical_layout (bool) – top-down or left-right layout
inline_display (bool) – show image directly inside the Jupyter Notebook
build (bool) – build the Flow first before plotting, so that the gateway connection is shown better
copy_flow (bool) – when set to true, always copy the current Flow, apply the modification to the copy, and return it; otherwise, modify the Flow in place
Return type
Flow
Returns
the Flow
property port_expose: int
Return the exposed port of the gateway
Return type
int
property host: str
Return the local address of the gateway
Return type
str
property address_private: str
Return the private IP address of the gateway, for connecting from another machine in the same network
Return type
str
property address_public: str
Return the public IP address of the gateway, for connecting from another machine in the public network
Return type
str
block(stop_event=None)
Block the Flow until stop_event is set or the user triggers a KeyboardInterrupt.
Parameters
stop_event (Union[threading.Event, multiprocessing.Event, None]) – a threading event or a multiprocessing event that, once set, returns control flow to the main thread
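The wait-until-set pattern described here can be sketched with the standard library alone. The function block_until below is a hypothetical stand-in, not Jina's block(); it only shows how a threading.Event set from another thread hands control back to the caller, with Ctrl+C as the other way out.

```python
import threading


def block_until(stop_event: threading.Event, poll_seconds: float = 0.1) -> str:
    """Wait until stop_event is set or the user presses Ctrl+C."""
    try:
        # wait() returns True as soon as the event is set, False on timeout.
        while not stop_event.wait(timeout=poll_seconds):
            pass
        return "stopped by event"
    except KeyboardInterrupt:
        return "stopped by user"


stop_event = threading.Event()
# Another thread sets the event shortly after the main thread starts waiting.
timer = threading.Timer(0.2, stop_event.set)
timer.start()

result = block_until(stop_event)
timer.join()
print(result)
```

Polling with a short timeout, rather than waiting indefinitely, is a design choice in this sketch that keeps the loop responsive to KeyboardInterrupt on every platform.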
property protocol: jina.enums.GatewayProtocolType
Return the protocol of this Flow
Return type
GatewayProtocolType
Returns
the protocol of this Flow
property workspace: str
Return the workspace path of the flow.
Return type
str
property workspace_id: Dict[str, str]
Get all Deployments’ workspace_id values in a dict
Return type
Dict[str, str]
property env: Optional[Dict]
Get all envs to be set in the Flow
Return type
Optional[Dict]
Returns
envs as dict
expose_endpoint(exec_endpoint: str, path: Optional[str] = None)
expose_endpoint(exec_endpoint: str, *, path: Optional[str] = None, status_code: int = 200, tags: Optional[List[str]] = None, summary: Optional[str] = None, description: Optional[str] = None, response_description: str = 'Successful Response', deprecated: Optional[bool] = None, methods: Optional[List[str]] = None, operation_id: Optional[str] = None, response_model_by_alias: bool = True, response_model_exclude_unset: bool = False, response_model_exclude_defaults: bool = False, response_model_exclude_none: bool = False, include_in_schema: bool = True, name: Optional[str] = None)
Expose an Executor’s endpoint (defined by @requests(on=…)) as an HTTP endpoint for easier access.
Once exposed, you can send data requests directly to http://hostname:port/endpoint.
Parameters
exec_endpoint (str) – the endpoint string, by convention starts with /
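As a small illustration of the http://hostname:port/endpoint address form, the helper below assembles such a URL with the standard library. The name endpoint_url, the host, the port, and the /foo endpoint are all hypothetical example values, and adding a missing leading slash is this sketch's choice rather than documented Jina behaviour.

```python
from urllib.parse import urlunsplit


def endpoint_url(host: str, port: int, exec_endpoint: str) -> str:
    """Build the HTTP address for an endpoint string such as '/foo'."""
    # By convention the endpoint starts with '/'; add it if the caller left it out.
    if not exec_endpoint.startswith("/"):
        exec_endpoint = "/" + exec_endpoint
    return urlunsplit(("http", f"{host}:{port}", exec_endpoint, "", ""))


url = endpoint_url("localhost", 12345, "/foo")
print(url)
```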
join(needs, name='joiner', *args, **kwargs)
Add a blocker to the Flow that waits until all pods named in needs have completed.
Parameters
needs (Union[Tuple[str], List[str]]) – list of service names to wait for
name (str) – the name of this joiner; defaults to joiner
args – additional positional arguments forwarded to the add function
kwargs – additional keyword arguments forwarded to the add function
Return type
Flow
Returns
the modified Flow
rolling_update(deployment_name, uses_with=None)
Reload all replicas of a deployment sequentially
Parameters
deployment_name (str) – deployment to update
uses_with (Optional[Dict]) – a dictionary of arguments to restart the executor with
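Reloading replicas one after another means that only one replica is offline at any moment. The toy model below illustrates that property; it is not Jina's implementation, and rolling_update_sketch together with the ready and uses_with dictionary keys are hypothetical names chosen for the example.

```python
from typing import Dict, List, Optional


def rolling_update_sketch(replicas: List[Dict], uses_with: Optional[Dict] = None) -> List[int]:
    """Restart replicas one at a time; return how many stayed ready during each restart."""
    ready_counts = []
    for replica in replicas:
        replica["ready"] = False  # take only this replica offline
        ready_counts.append(sum(1 for r in replicas if r["ready"]))
        replica["uses_with"] = dict(uses_with or {})  # restart with the new arguments
        replica["ready"] = True  # bring it back before touching the next one
    return ready_counts


replicas = [{"ready": True, "uses_with": {}} for _ in range(3)]
ready_counts = rolling_update_sketch(replicas, uses_with={"param": "new"})
print(ready_counts)
```

With three replicas, two remain ready during every restart, which is why a sequential reload can keep a deployment serving while its arguments change.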
to_k8s_yaml(output_base_path, k8s_namespace=None, k8s_connection_pool=True)
Converts the Flow into a set of yaml deployments to deploy in Kubernetes
Parameters
output_base_path (str) – The base path where to dump all the yaml files
k8s_namespace (Optional[str]) – The name of the k8s namespace to set for the configurations. If None, the name of the Flow will be used.
k8s_connection_pool (bool) – Boolean indicating whether the kubernetes connection pool should be used inside the Executor Runtimes.
to_docker_compose_yaml(output_path=None, network_name=None)
Converts the Flow into a yaml file to run with docker-compose up
Parameters
output_path (Optional[str]) – The output path for the yaml file
network_name (Optional[str]) – The name of the network that will be used by the deployment name
scale(deployment_name, replicas)
Scale the number of replicas of a given Executor.
Parameters
deployment_name (str) – deployment to update
replicas (int) – The number of replicas to scale to
property client_args: argparse.Namespace
Get Client settings.
Return type
Namespace
property gateway_args: argparse.Namespace
Get Gateway settings.
Return type
Namespace
update_network_interface(**kwargs)
Update the network interface of this Flow (affects Gateway & Client)
Parameters
kwargs – new network settings