jina.serve.networking module
class jina.serve.networking.ReplicaList[source]
Bases: object
Maintains a list of connections to replicas and uses round robin for selecting a replica
add_connection(address)[source]
Add connection with address to the connection list :type address:
str
:param address: Target address of this connectionasync remove_connection(address)[source]
Remove connection with address from the connection list :type address:
str
:param address: Remove connection for this address :returns: The removed connection or None if there was not any for the given addressget_next_connection()[source]
Returns a connection from the list. Strategy is round robin :returns: A connection from the pool
get_all_connections()[source]
Returns all available connections :returns: A complete list of all connections from the pool
has_connection(address)[source]
Checks if a connection for ip exists in the list :type address:
str
:param address: The address to check :rtype:bool
:returns: True if a connection for the ip exists in the listhas_connections()[source]
Checks if this contains any connection :rtype:
bool
:returns: True if any connection is managed, False otherwiseasync close()[source]
Close all connections and clean up internal state
class jina.serve.networking.GrpcConnectionPool(logger=None)[source]
Bases: object
Manages a list of grpc connections.
Parameters
logger (
Optional
[JinaLogger]) – the logger to usesend_request(request, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None)[source]
Send a single message to target via one or all of the pooled connections, depending on polling_type. Convenience function wrapper around send_request. :type request: Request :param request: a single request to send :type deployment:
str
:param deployment: name of the Jina deployment to send the message to :type head:bool
:param head: If True it is send to the head, otherwise to the worker pods :type shard_id:Optional
[int
] :param shard_id: Send to a specific shard of the deployment, ignored for polling ALL :type polling_type: PollingType :param polling_type: defines if the message should be send to any or all pooled connections for the target :type endpoint:Optional
[str
] :param endpoint: endpoint to target with the request :rtype:List
[Task
] :return: list of asyncio.Task items for each send callsend_requests(requests, deployment, head=False, shard_id=None, polling_type=PollingType.ANY, endpoint=None)[source]
Send a request to target via one or all of the pooled connections, depending on polling_type
Parameters
requests (
List
[Request]) – request (DataRequest/ControlRequest) to senddeployment (
str
) – name of the Jina deployment to send the request tohead (
bool
) – If True it is send to the head, otherwise to the worker podsshard_id (
Optional
[int
]) – Send to a specific shard of the deployment, ignored for polling ALLpolling_type (PollingType) – defines if the request should be send to any or all pooled connections for the target
endpoint (
Optional
[str
]) – endpoint to target with the requests
Return type
List
[Task
]Returns
list of asyncio.Task items for each send call
send_request_once(request, deployment, head=False, shard_id=None)[source]
Send msg to target via only one of the pooled connections :type request: Request :param request: request to send :type deployment:
str
:param deployment: name of the Jina deployment to send the message to :type head:bool
:param head: If True it is send to the head, otherwise to the worker pods :type shard_id:Optional
[int
] :param shard_id: Send to a specific shard of the deployment, ignored for polling ALL :rtype:Task
:return: asyncio.Task representing the send callsend_requests_once(requests, deployment, head=False, shard_id=None, endpoint=None)[source]
Send a request to target via only one of the pooled connections
Parameters
requests (
List
[Request]) – request to senddeployment (
str
) – name of the Jina deployment to send the request tohead (
bool
) – If True it is send to the head, otherwise to the worker podsshard_id (
Optional
[int
]) – Send to a specific shard of the deployment, ignored for polling ALLendpoint (
Optional
[str
]) – endpoint to target with the requests
Return type
Task
Returns
asyncio.Task representing the send call
add_connection(deployment, address, head=False, shard_id=None)[source]
Adds a connection for a deployment to this connection pool
Parameters
deployment (
str
) – The deployment the connection belongs to, like ‘encoder’head (
Optional
[bool
]) – True if the connection is for a headaddress (
str
) – Address used for the grpc connection, format is <host>:<port>shard_id (
Optional
[int
]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads
async remove_connection(deployment, address, head=False, shard_id=None)[source]
Removes a connection to a deployment
Parameters
deployment (
str
) – The deployment the connection belongs to, like ‘encoder’address (
str
) – Address used for the grpc connection, format is <host>:<port>head (
Optional
[bool
]) – True if the connection is for a headshard_id (
Optional
[int
]) – Optional parameter to indicate this connection belongs to a shard, ignored for heads
Returns
The removed connection, None if it did not exist
start()[source]
Starts the connection pool
async close()[source]
Closes the connection pool
static get_grpc_channel(address, options=None, asyncio=False, https=False, root_certificates=None)[source]
Creates a grpc channel to the given address
Parameters
address (
str
) – The address to connect to, format is <host>:<port>options (
Optional
[list
]) – A list of options to pass to the grpc channelasyncio (
Optional
[bool
]) – If True, use the asyncio implementation of the grpc channelhttps (
Optional
[bool
]) – If True, use https for the grpc channelroot_certificates (
Optional
[str
]) – The path to the root certificates for https, only used if https is True
Return type
Channel
Returns
A grpc channel or an asyncio channel
static activate_worker_sync(worker_host, worker_port, target_head, shard_id=None)[source]
Register a given worker to a head by sending an activate request
Parameters
worker_host (
str
) – the host address of the workerworker_port (
int
) – the port of the workertarget_head (
str
) – address of the head to send the activate request toshard_id (
Optional
[int
]) – id of the shard the worker belongs to
Return type
Returns
the response request
async static activate_worker(worker_host, worker_port, target_head, shard_id=None)[source]
Register a given worker to a head by sending an activate request
Parameters
worker_host (
str
) – the host address of the workerworker_port (
int
) – the port of the workertarget_head (
str
) – address of the head to send the activate request toshard_id (
Optional
[int
]) – id of the shard the worker belongs to
Return type
Returns
the response request
async static deactivate_worker(worker_host, worker_port, target_head, shard_id=None)[source]
Remove a given worker to a head by sending a deactivate request
Parameters
worker_host (
str
) – the host address of the workerworker_port (
int
) – the port of the workertarget_head (
str
) – address of the head to send the deactivate request toshard_id (
Optional
[int
]) – id of the shard the worker belongs to
Return type
Returns
the response request
static send_request_sync(request, target, timeout=100.0, https=False, root_certificates=None, endpoint=None)[source]
Sends a request synchronically to the target via grpc
Parameters
request (Request) – the request to send
target (
str
) – where to send the request to, like 127.0.0.1:8080timeout – timeout for the send
https – if True, use https for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for https, only used if https is Trueendpoint (
Optional
[str
]) – endpoint to target with the request
Return type
Returns
the response request
static get_default_grpc_options()[source]
Returns a list of default options used for creating grpc channels. Documentation is here https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/grpc_types.h :returns: list of tuples defining grpc parameters
async static send_request_async(request, target, timeout=1.0, https=False, root_certificates=None)[source]
Sends a request asynchronously to the target via grpc
Parameters
request (Request) – the request to send
target (
str
) – where to send the request to, like 127.0.0.1:8080timeout (
float
) – timeout for the sendhttps (
bool
) – if True, use https for the grpc channelroot_certificates (
Optional
[str
]) – the path to the root certificates for https, only u
Return type
Returns
the response request
static create_async_channel_stub(address, https=False, root_certificates=None)[source]
Creates an async GRPC Channel. This channel has to be closed eventually!
Parameters
address – the address to create the connection to, like 127.0.0.0.1:8080
https – if True, use https for the grpc channel
root_certificates (
Optional
[str
]) – the path to the root certificates for https, only u
Return type
Tuple
[JinaSingleDataRequestRPCStub, JinaDataRequestRPCStub, JinaControlRequestRPCStub,Channel
]Returns
DataRequest/ControlRequest stubs and an async grpc channel
class jina.serve.networking.K8sGrpcConnectionPool(namespace, client, logger=None)[source]
Bases: jina.serve.networking.GrpcConnectionPool
Manages grpc connections to replicas in a K8s deployment.
Parameters
namespace (
str
) – K8s namespace to operate inclient (kubernetes.client.CoreV1Api) – K8s client
logger (JinaLogger) – the logger to use
K8S_PORT_EXPOSE = 8080
K8S_PORT_IN = 8081
K8S_PORT_USES_BEFORE = 8082
K8S_PORT_USES_AFTER = 8083
start()[source]
Subscribe to the K8s API and watch for changes in Pods
run()[source]
Subscribes on MODIFIED events from list_namespaced_pod AK8s PI
async close()[source]
Closes the connection pool
jina.serve.networking.is_remote_local_connection(first, second)[source]
Decides, whether first
is remote host and second
is localhost
Parameters
first (
str
) – the ip or host name of the first runtimesecond (
str
) – the ip or host name of the second runtime
Returns
True, if first is remote and second is local
jina.serve.networking.create_connection_pool(k8s_connection_pool=False, k8s_namespace=None, logger=None)[source]
Creates the appropriate connection pool based on parameters :type k8s_namespace: Optional
[str
] :param k8s_namespace: k8s namespace the pool will live in, None if outside K8s :type k8s_connection_pool: bool
:param k8s_connection_pool: flag to indicate if K8sGrpcConnectionPool should be used, defaults to true in K8s :type logger: Optional
[JinaLogger] :param logger: the logger to use :rtype: GrpcConnectionPool :return: A connection pool object
jina.serve.networking.host_is_local(hostname)[source]
Check if hostname is point to localhost :param hostname: host to check :return: True if hostname means localhost, False otherwise