Streaming 通信

演示了服务端流、双向流等 Streaming 流式通信的基本使用方法。

选择 RPC 通信协议 一节提到,Streaming 是 Dubbo3 新提供的一种 RPC 数据传输模式,适用于以下场景:

  • 接口需要发送大量数据,这些数据无法被放在一个 RPC 的请求或响应中,需要分批发送,但应用层如果按照传统的多次 RPC 方式无法解决顺序和性能的问题,如果需要保证有序,则只能串行发送
  • 流式场景,数据需要按照发送顺序处理, 数据本身是没有确定边界的
  • 推送类场景,多个消息在同一个调用的上下文中被发送和处理

Streaming 分为以下三种:

  • SERVER_STREAM(服务端流)
  • CLIENT_STREAM(客户端流)
  • BIDIRECTIONAL_STREAM(双向流)

以下示例演示 triple streaming 流式通信的基本使用方法,涵盖了客户端流、服务端流、双向流等三种模式,示例使用 Protocol Buffers 的服务开发模式,对于 Java 接口模式的开发者可以在本文最后查看相应说明。可在此查看 本示例完整代码

运行示例

首先,可通过以下命令下载示例源码:

  1. git clone --depth=1 https://github.com/apache/dubbo-samples.git

进入示例源码目录:

  1. cd dubbo-samples/2-advanced/dubbo-samples-triple-streaming

编译项目,由 IDL 生成代码,这会调用 dubbo 提供的 protoc 插件生成对应的服务定义代码:

  1. mvn clean compile

启动server

运行以下命令,启动 server:

  1. $ mvn compile exec:java -Dexec.mainClass="org.apache.dubbo.samples.tri.streaming.TriStreamServer"

启动client

运行以下命令,启动 client:

  1. $ mvn compile exec:java -Dexec.mainClass="org.apache.dubbo.samples.tri.streaming.TriStreamClient"

源码解读

使用 Protobuf(IDL) 开发 triple 协议服务 一节中提到的一样,这个示例使用 protobuf 定义服务,因此示例需要的依赖、配置等基本是一致的,请参考那一节了解完整详情。接下来,我们将重点讲解流式通信部分的内容。

protobuf服务定义

  1. syntax = "proto3";
  2. option java_multiple_files = true;
  3. package org.apache.dubbo.samples.tri.streaming;
  4. message GreeterRequest {
  5. string name = 1;
  6. }
  7. message GreeterReply {
  8. string message = 1;
  9. }
  10. service Greeter{
  11. rpc biStream(stream GreeterRequest) returns (stream GreeterReply);
  12. rpc serverStream(GreeterRequest) returns (stream GreeterReply);
  13. }

在上面的 proto 文件中,我们定义了两个方法:

  • biStream(stream GreeterRequest) returns (stream GreeterReply) 双向流
  • serverStream(GreeterRequest) returns (stream GreeterReply) 服务端流

生成代码

接下来,我们需要从 .proto 服务定义生成 Dubbo 客户端和服务器接口。protoc dubbo 插件可以帮助我们生成需要的代码,在使用 Gradle 或 Maven 时,protoc 构建插件可以生成必要的代码作为构建的一部分。具体 maven 配置及代码生成步骤我们在 上一节 中有具体的描述。

target/generated-sources/protobuf/java/org/apache/dubbo/samples/tri/streaming/ 目录中可以发现如下生成代码,其中我们将重点讲解 DubboGreeterTriple.java

  1. ├── DubboGreeterTriple.java
  2. ├── Greeter.java
  3. ├── GreeterOuterClass.java
  4. ├── GreeterReply.java
  5. ├── GreeterReplyOrBuilder.java
  6. ├── GreeterRequest.java
  7. └── GreeterRequestOrBuilder.java

Server

首先,让我们看一下如何定义服务实现并启动提供者:

  1. 实现 IDL 代码生成过程中定义的服务基类,提供自定义的业务逻辑。
  2. 运行 Dubbo 服务以侦听来自客户端的请求并返回服务响应。

提供服务实现 GreeterImplBase

定义类 GreeterImpl 实现 DubboGreeterTriple.GreeterImplBase

  1. public class GreeterImpl extends DubboGreeterTriple.GreeterImplBase {
  2. // ...
  3. }
服务端流

GreeterImpl 实现了所有 rpc 定义中的方法。接下里,我们看一下 server-side streaming 的具体定义。

不同于普通的方法定义,serverStream 方法有两个参数,第一个参数 request 是入参,第二个参数 responseObserver 为响应值,其参数类型是 StreamObserver<GreeterReply>。在方法实现中,我们不停的调用 responseObserver.onNext(...) 将结果发送回消费方,并在最后调用 onCompleted() 表示流式响应结束。

  1. @Override
  2. public void serverStream(GreeterRequest request, StreamObserver<GreeterReply> responseObserver) {
  3. LOGGER.info("receive request: {}", request.getName());
  4. for (int i = 0; i < 10; i++) {
  5. GreeterReply reply = GreeterReply.newBuilder().setMessage("reply from serverStream. " + i).build();
  6. responseObserver.onNext(reply);
  7. }
  8. responseObserver.onCompleted();
  9. }
双向流

双向流方法 biStream 的参数和返回值都是 StreamObserver<...> 类型。但需要注意的是,它与我们传统方法定义中参数是请求值、返回值是响应的理解是反过来的,在这里,参数 StreamObserver<GreeterReply> responseObserver 是响应,我们通过 responseObserver 不停的写回响应。

请注意这里请求流响应流是独立的,我们在写回响应流数据的过程中,随时可能有请求流到达,对于每个流而言,值都是有序的。

  1. @Override
  2. public StreamObserver<GreeterRequest> biStream(StreamObserver<GreeterReply> responseObserver) {
  3. return new StreamObserver<GreeterRequest>() {
  4. @Override
  5. public void onNext(GreeterRequest data) {
  6. GreeterReply resp = GreeterReply.newBuilder().setMessage("reply from biStream " + data.getName()).build();
  7. responseObserver.onNext(resp);
  8. }
  9. @Override
  10. public void onError(Throwable throwable) {
  11. }
  12. @Override
  13. public void onCompleted() {
  14. }
  15. };
  16. }

启动 server

启动 Dubbo 服务的过程与普通应用完全一致:

  1. public static void main(String[] args) throws IOException {
  2. ServiceConfig<Greeter> service = new ServiceConfig<>();
  3. service.setInterface(Greeter.class);
  4. service.setRef(new GreeterImpl("tri-stub"));
  5. ApplicationConfig applicationConfig = new ApplicationConfig("tri-stub-server");
  6. applicationConfig.setQosEnable(false);
  7. DubboBootstrap bootstrap = DubboBootstrap.getInstance();
  8. bootstrap.application(applicationConfig)
  9. .registry(new RegistryConfig(TriSampleConstants.ZK_ADDRESS))
  10. .protocol(new ProtocolConfig(CommonConstants.TRIPLE, TriSampleConstants.SERVER_PORT))
  11. .service(service)
  12. .start();
  13. }

Client

和普通的 Dubbo 服务调用一样,我们首先需要声明 rpc 服务引用:

  1. public static void main(String[] args) throws IOException {
  2. ReferenceConfig<Greeter> ref = new ReferenceConfig<>();
  3. ref.setInterface(Greeter.class);
  4. ref.setProtocol(CommonConstants.TRIPLE);
  5. DubboBootstrap.getInstance().reference(ref).start();
  6. Greeter greeter = ref.get();
  7. }

接下来,我们就可以利用 greeter 像调用本地方法一样发起调用了。

服务端流

调用 serverStream() 传入能够处理流式响应的 SampleStreamObserver 对象,调用发起后即快速返回,之后流式响应会不停的发送到 SampleStreamObserver

  1. GreeterRequest request = GreeterRequest.newBuilder().setName("server stream request.").build();
  2. greeter.serverStream(request, new SampleStreamObserver());

以下是 SampleStreamObserver 类的具体定义,包含其收到响应后的具体处理逻辑。

  1. private static class SampleStreamObserver implements StreamObserver<GreeterReply> {
  2. @Override
  3. public void onNext(GreeterReply data) {
  4. LOGGER.info("stream <- reply:{}", data);
  5. }
  6. // ......
  7. }

双向流

调用 greeter.biStream() 方法会立即返回一个 requestStreamObserver,同时,需要为方法传入一个能处理响应的 observer 对象 new SampleStreamObserver()

接下来,我们就可以用才刚才返回值中得到的 requestStreamObserver 持续发送请求 requestStreamObserver.onNext(request);此时,如果有响应返回,则会由 SampleStreamObserver 接收处理,其定义请参考上文。

  1. StreamObserver<GreeterRequest> requestStreamObserver = greeter.biStream(new SampleStreamObserver());
  2. for (int i = 0; i < 10; i++) {
  3. GreeterRequest request = GreeterRequest.newBuilder().setName("name-" + i).build();
  4. requestStreamObserver.onNext(request);
  5. }
  6. requestStreamObserver.onCompleted();

其他

Java接口模式下的流式通信

对于不使用 Protobuf 的用户而言,你可以直接在接口中定义 streaming 格式的方法,这样你就能使用流式通信了。

接口定义

  1. public interface WrapperGreeter {
  2. // 双向流
  3. StreamObserver<String> sayHelloStream(StreamObserver<String> response);
  4. // 服务端流
  5. void sayHelloServerStream(String request, StreamObserver<String> response);
  6. }

其中,org.apache.dubbo.common.stream.StreamObserver 是 Dubbo 框架提供的流式通信参数类型,请务必按照以上示例所示的方式定义

Stream 方法的方法入参和返回值是严格约定的,为防止写错而导致问题,Dubbo3 框架侧做了对参数的检查, 如果出错则会抛出异常。 对于 双向流(BIDIRECTIONAL_STREAM), 需要注意参数中的 StreamObserver 是响应流,返回参数中的 StreamObserver 为请求流。

实现类

  1. public class WrapGreeterImpl implements WrapGreeter {
  2. //...
  3. @Override
  4. public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
  5. return new StreamObserver<String>() {
  6. @Override
  7. public void onNext(String data) {
  8. System.out.println(data);
  9. response.onNext("hello,"+data);
  10. }
  11. @Override
  12. public void onError(Throwable throwable) {
  13. throwable.printStackTrace();
  14. }
  15. @Override
  16. public void onCompleted() {
  17. System.out.println("onCompleted");
  18. response.onCompleted();
  19. }
  20. };
  21. }
  22. @Override
  23. public void sayHelloServerStream(String request, StreamObserver<String> response) {
  24. for (int i = 0; i < 10; i++) {
  25. response.onNext("hello," + request);
  26. }
  27. response.onCompleted();
  28. }
  29. }

调用方式

  1. delegate.sayHelloServerStream("server stream", new StreamObserver<String>() {
  2. @Override
  3. public void onNext(String data) {
  4. System.out.println(data);
  5. }
  6. @Override
  7. public void onError(Throwable throwable) {
  8. throwable.printStackTrace();
  9. }
  10. @Override
  11. public void onCompleted() {
  12. System.out.println("onCompleted");
  13. }
  14. });
  15. StreamObserver<String> request = delegate.sayHelloStream(new StreamObserver<String>() {
  16. @Override
  17. public void onNext(String data) {
  18. System.out.println(data);
  19. }
  20. @Override
  21. public void onError(Throwable throwable) {
  22. throwable.printStackTrace();
  23. }
  24. @Override
  25. public void onCompleted() {
  26. System.out.println("onCompleted");
  27. }
  28. });
  29. for (int i = 0; i < n; i++) {
  30. request.onNext("stream request" + i);
  31. }
  32. request.onCompleted();

最后修改 September 13, 2024: Refactor website structure (#2860) (1a4b998f54b)