Streaming 通信模型

在 Dubbo Python 使用 Client streaming、Server streaming、Bidirectional streaming 模型的服务。

在此查看完整示例

Dubbo-Python 支持流式调用,包括 ClientStreamServerStreamBidirectionalStream 三种模式。

在流式调用中,操作可以分为写入流和读取流两部分。对于 ClientStream,是多次写入、单次读取;对于 ServerStream,是单次写入、多次读取;而 BidirectionalStream 支持多次写入和多次读取。

写入流

流式调用的写入操作分为单次写入(ServerStream)和多次写入(ClientStreamBidirectionalStream)。

单次写入

单次写入流的调用方式与 unary 模式类似。例如:

  1. stub.server_stream(greeter_pb2.GreeterRequest(name="hello world from dubbo-python"))

多次写入

对于多次写入流,用户可以通过迭代器或 writeStream 方式写入数据(两者只能选其一)。

  1. 迭代器写入:写入方式类似于 unary 模式,唯一的区别是传入的是迭代器。例如:

    1. # Use an iterator to send multiple requests
    2. def request_generator():
    3. for i in ["hello", "world", "from", "dubbo-python"]:
    4. yield greeter_pb2.GreeterRequest(name=str(i))
    5. # Call the remote method and return a read_stream
    6. stream = stub.client_stream(request_generator())
  2. 使用 writeStream 写入:此方法不传入参数,使用空参调用,然后通过 write 方法逐条写入数据,写入完成后调用 done_writing 方法结束流。例如:

    1. stream = stub.bi_stream()
    2. # Use the write method to send messages
    3. stream.write(greeter_pb2.GreeterRequest(name="jock"))
    4. stream.write(greeter_pb2.GreeterRequest(name="jane"))
    5. stream.write(greeter_pb2.GreeterRequest(name="alice"))
    6. stream.write(greeter_pb2.GreeterRequest(name="dave"))
    7. # Call done_writing to notify the server that the client has finished writing
    8. stream.done_writing()

读取流

流式调用的读取操作分为单次读取(ClientStream)和多次读取(ServerStreamBidirectionalStream)。在流式调用中,无论是哪种模式,返回的都是一个 ReadStream。我们可以使用 read 方法或迭代器读取数据,针对 read 方法,需要注意以下几点:

  1. read 方法支持 timeout 参数,用于设置阻塞等待时间(单位:秒)。
  2. read 方法的返回结果可能为三种:所需信息(正常情况)、None(等待超时)、EOF(读取流结束)。

单次读取

调用 read 方法一次即可读取数据,例如:

  1. result = stream.read()
  2. print(f"Received response: {result.message}")

多次读取

可以通过多次调用 read 方法读取数据,但需要处理 NoneEOF 等非期望值。因为 ReadStream 实现了 __iter____next__ 等迭代方法,我们可以通过迭代调用进行多次读取,此方法无需处理非期望值,但不支持设置阻塞超时参数。

  1. 迭代调用(推荐):

    1. def client_stream(self, request_iterator):
    2. response = ""
    3. for request in request_iterator:
    4. print(f"Received request: {request.name}")
    5. response += f"{request.name} "
    6. return greeter_pb2.GreeterReply(message=response)
  2. 多次调用 read 方法

    1. # Use read method to receive messages
    2. # If no message arrives within the specified time, returns None
    3. # If the server has finished sending messages, returns EOF
    4. while True:
    5. i = stream.read(timeout=0.5)
    6. if i is dubbo.classes.EOF:
    7. break
    8. elif i is None:
    9. print("No message received")
    10. continue
    11. print(f"Received response: {i.message}")

最后修改 November 7, 2024: docs: add python sdk manual (#3056) (26c58b388ff)