响应式编程

使用 Reactive API 操作 Triple 流式调用

特性说明

此特性基于 Triple 协议和 Project Reactor 实现,3.1.0 版本以上支持。用户仅需编写 IDL 文件,并指定 protobuf 插件的相应 Generator,即可生成并使用支持响应式API的 Stub 代码。

有四种调用模式,分别是 OneToOne、OneToMany、ManyToOne、ManyToMany,分别对应 Unary调用、服务端流、客户端流、双向流。在 Reactor 的实现中,One 对应 Mono,Many 对应 Flux。

背景

Reactive Stream 提供了一套标准的异步流处理 API, 在能够让应用写出事件驱动的程序的同时,也通过 BackPressure 的方式保证了节点的稳定。Triple 协议在通信协议层面为 Dubbo 框架增加了流式场景的支持,在此基础上能够实现上层包括大文件传输和推送机制的业务需求。

Dubbo + Reactive Stream Stub 的组合模式可以给用户带来最方便的流式使用方式以及全链路异步性能提升。

参考用例

https://github.com/apache/dubbo-samples/tree/master/dubbo-samples-triple-reactor

使用场景

使用方式

Triple 使用及配置可参考 IDL 方式使用 Triple,并确保 Dubbo 版本 >= 3.1.0。

添加必要的依赖

若要使用 Reactor Triple,需要额外添加如下依赖。

  1. <dependency>
  2. <groupId>org.reactivestreams</groupId>
  3. <artifactId>reactive-streams</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>io.projectreactor</groupId>
  7. <artifactId>reactor-core</artifactId>
  8. </dependency>

设置 protobuf Maven 插件

仅需将 mainClass 修改为 org.apache.dubbo.gen.tri.reactive.ReactorDubbo3TripleGenerator,并确保 ${compiler.version} >= 3.1.0

  1. <build>
  2. <plugins>
  3. <plugin>
  4. <groupId>org.xolstice.maven.plugins</groupId>
  5. <artifactId>protobuf-maven-plugin</artifactId>
  6. <version>0.6.1</version>
  7. <configuration>
  8. <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
  9. </protocArtifact>
  10. <pluginId>grpc-java</pluginId>
  11. <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
  12. </pluginArtifact>
  13. <protocPlugins>
  14. <protocPlugin>
  15. <id>dubbo</id>
  16. <groupId>org.apache.dubbo</groupId>
  17. <artifactId>dubbo-compiler</artifactId>
  18. <version>${compiler.version}</version>
  19. <mainClass>org.apache.dubbo.gen.tri.reactive.ReactorDubbo3TripleGenerator</mainClass>
  20. </protocPlugin>
  21. </protocPlugins>
  22. </configuration>
  23. <executions>
  24. <execution>
  25. <goals>
  26. <goal>compile</goal>
  27. </goals>
  28. </execution>
  29. </executions>
  30. </plugin>
  31. </plugins>
  32. </build>

编写并编译 IDL 文件

IDL 文件编写与原生的 Triple 协议完全一致,编译后默认会在 target/generated-sources/protobuf/java 目录下看到相应代码。

  1. syntax = "proto3";
  2. option java_multiple_files = true;
  3. package org.apache.dubbo.samples.triple.reactor;
  4. // The request message containing the user's name.
  5. message GreeterRequest {
  6. string name = 1;
  7. }
  8. // The response message containing the greetings
  9. message GreeterReply {
  10. string message = 1;
  11. }
  12. service GreeterService {
  13. rpc greetOneToOne(GreeterRequest) returns (GreeterReply);
  14. rpc greetOneToMany(GreeterRequest) returns (stream GreeterReply);
  15. rpc greetManyToOne(stream GreeterRequest) returns (GreeterReply);
  16. rpc greetManyToMany(stream GreeterRequest) returns (stream GreeterReply);
  17. }

使用

  1. 添加服务端接口实现
  1. package org.apache.dubbo.samples.triple.reactor.impl;
  2. import org.apache.dubbo.samples.triple.reactor.DubboGreeterServiceTriple;
  3. import org.apache.dubbo.samples.triple.reactor.GreeterReply;
  4. import org.apache.dubbo.samples.triple.reactor.GreeterRequest;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import reactor.core.publisher.Flux;
  8. public class GreeterServiceImpl extends DubboGreeterServiceTriple.GreeterServiceImplBase {
  9. private static final Logger LOGGER = LoggerFactory.getLogger(GreeterServiceImpl.class);
  10. @Override
  11. public Flux<GreeterReply> greetManyToMany(Flux<GreeterRequest> request) {
  12. return request.doOnNext(req -> LOGGER.info("greetManyToMany get data: {}", req))
  13. .map(req -> GreeterReply.newBuilder().setMessage(req.getName() + " -> server get").build())
  14. .doOnNext(res -> LOGGER.info("greetManyToMany response data: {}", res));
  15. }
  16. }
  1. 添加服务端接口启动类
  1. package org.apache.dubbo.samples.triple.reactor;
  2. import org.apache.dubbo.common.constants.CommonConstants;
  3. import org.apache.dubbo.config.ApplicationConfig;
  4. import org.apache.dubbo.config.ProtocolConfig;
  5. import org.apache.dubbo.config.RegistryConfig;
  6. import org.apache.dubbo.config.ServiceConfig;
  7. import org.apache.dubbo.config.bootstrap.DubboBootstrap;
  8. import org.apache.dubbo.samples.triple.reactor.impl.GreeterServiceImpl;
  9. public class ReactorServer {
  10. private static final int PORT = 50052;
  11. public static void main(String[] args) {
  12. ServiceConfig<GreeterService> reactorService = new ServiceConfig<>();
  13. reactorService.setInterface(GreeterService.class);
  14. reactorService.setRef(new GreeterServiceImpl());
  15. DubboBootstrap bootstrap = DubboBootstrap.getInstance();
  16. bootstrap.application(new ApplicationConfig("tri-reactor-stub-server"))
  17. .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
  18. .protocol(new ProtocolConfig(CommonConstants.TRIPLE, PORT))
  19. .service(reactorService)
  20. .start();
  21. }
  22. }
  1. 添加客户端启动类和消费程序
  1. package org.apache.dubbo.samples.triple.reactor;
  2. import org.apache.dubbo.common.constants.CommonConstants;
  3. import org.apache.dubbo.config.ApplicationConfig;
  4. import org.apache.dubbo.config.ReferenceConfig;
  5. import org.apache.dubbo.config.RegistryConfig;
  6. import org.apache.dubbo.config.bootstrap.DubboBootstrap;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import reactor.core.publisher.Flux;
  10. import reactor.core.publisher.Mono;
  11. import java.io.IOException;
  12. public class ReactorConsumer {
  13. private static final Logger LOGGER = LoggerFactory.getLogger(ReactorConsumer.class);
  14. private final GreeterService greeterService;
  15. public ReactorConsumer() {
  16. ReferenceConfig<GreeterService> referenceConfig = new ReferenceConfig<>();
  17. referenceConfig.setInterface(GreeterService.class);
  18. referenceConfig.setProtocol(CommonConstants.TRIPLE);
  19. referenceConfig.setProxy(CommonConstants.NATIVE_STUB);
  20. referenceConfig.setTimeout(10000);
  21. DubboBootstrap bootstrap = DubboBootstrap.getInstance();
  22. bootstrap.application(new ApplicationConfig("tri-reactor-stub-server"))
  23. .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
  24. .reference(referenceConfig)
  25. .start();
  26. GreeterService greeterService = referenceConfig.get();
  27. }
  28. public static void main(String[] args) throws IOException {
  29. ReactorConsumer reactorConsumer = new ReactorConsumer();
  30. reactorConsumer.consumeManyToMany();
  31. System.in.read();
  32. }
  33. private void consumeManyToMany() {
  34. greeterService.greetManyToMany(Flux.range(1, 10)
  35. .map(num ->
  36. GreeterRequest.newBuilder().setName(String.valueOf(num)).build())
  37. .doOnNext(req -> LOGGER.info("consumeManyToMany request data: {}", req)))
  38. .subscribe(res -> LOGGER.info("consumeManyToMany get response: {}", res));
  39. }
  40. }
  1. 启动服务端

  2. 启动消费者端

最后修改 December 16, 2022: Fix check (#1736) (97972c1)