Reactive Programming

Using the Reactive API for Triple Streaming Calls

Note: This Document May Be Outdated

The Reactive usage described in this document may be outdated. For current usage, always refer to the latest reactive examples in apache/dubbo-samples.

Feature Description

This feature is built on the Triple protocol and implemented with Project Reactor. It is supported from Dubbo 3.1.0 onwards. You only need to write an IDL file and configure the corresponding generator for the protobuf plugin; the build then generates stub code that exposes the reactive API.

There are four call modes: OneToOne, OneToMany, ManyToOne, and ManyToMany, corresponding to Unary calls, server streams, client streams, and bidirectional streams, respectively. In Reactor’s implementation, One corresponds to Mono, and Many corresponds to Flux.
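The mapping above can be summarized in a few lines of plain Java. This is only an illustrative sketch: the CallModes class and its Mode enum are hypothetical helpers, not part of Dubbo or Reactor, and the signature shape they print assumes that the request side also uses Mono or Flux (the ManyToMany form matches the server implementation later in this document; check the generated stub for the other modes).

```java
// Hypothetical helper, not part of Dubbo or Reactor: it only encodes the
// "stream keyword -> call mode -> Mono/Flux" mapping described in the text.
final class CallModes {

    enum Mode {
        ONE_TO_ONE(false, false),   // unary call
        ONE_TO_MANY(false, true),   // server stream
        MANY_TO_ONE(true, false),   // client stream
        MANY_TO_MANY(true, true);   // bidirectional stream

        private final boolean clientStreaming;
        private final boolean serverStreaming;

        Mode(boolean clientStreaming, boolean serverStreaming) {
            this.clientStreaming = clientStreaming;
            this.serverStreaming = serverStreaming;
        }

        /** Picks the mode from the presence of `stream` on the request and response side. */
        static Mode of(boolean clientStreaming, boolean serverStreaming) {
            for (Mode mode : values()) {
                if (mode.clientStreaming == clientStreaming
                        && mode.serverStreaming == serverStreaming) {
                    return mode;
                }
            }
            throw new AssertionError("unreachable: all four combinations are covered");
        }

        String requestType() {
            return clientStreaming ? "Flux" : "Mono";
        }

        String responseType() {
            return serverStreaming ? "Flux" : "Mono";
        }

        /** Renders the assumed shape of the reactive method signature. */
        String signature(String method, String request, String reply) {
            return responseType() + "<" + reply + "> " + method
                    + "(" + requestType() + "<" + request + "> request)";
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": "
                    + mode.signature("greet", "GreeterRequest", "GreeterReply"));
        }
    }
}
```

For example, an rpc declared with stream on both sides resolves to MANY_TO_MANY, whose request and response types are both Flux.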

Reactive Streams provides a standard API for asynchronous stream processing. It lets applications be written in an event-driven style, while backpressure keeps them stable under load. At the communication-protocol level, the Triple protocol adds streaming support to the Dubbo framework, which enables business scenarios such as large file transfers and push mechanisms.
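To make the backpressure idea concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow API (the JDK's own copy of the Reactive Streams interfaces) instead of Reactor or Dubbo, so it runs without extra dependencies. The names BackpressureSketch, BatchingSubscriber, and run are hypothetical. The subscriber signals demand in fixed-size batches, and the publisher never delivers more items than have been requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it is not part of Dubbo or Reactor.
final class BackpressureSketch {

    /** Subscriber that requests items in fixed-size batches instead of an unbounded stream. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        private final int batchSize;
        private final List<Integer> received = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int remainingInBatch;
        private int requestCount;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch(); // initial demand
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) {
                requestBatch(); // ask for more only after the current batch is consumed
            }
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }

        private void requestBatch() {
            remainingInBatch = batchSize;
            requestCount++;
            subscription.request(batchSize);
        }

        List<Integer> received() {
            return List.copyOf(received);
        }

        int requestCount() {
            return requestCount;
        }
    }

    /** Publishes 1..itemCount and waits until the subscriber has seen onComplete. */
    static BatchingSubscriber run(int itemCount, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= itemCount; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
        } // close() signals onComplete once buffered items have been delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber result = run(10, 3);
        System.out.println("received=" + result.received()
                + ", request calls=" + result.requestCount());
    }
}
```

Reactor's Flux follows the same request-driven contract. The generated Dubbo stubs are expected to carry that demand across the Triple stream, but how they do so is not shown here.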

Combining Dubbo with a Reactive Streams stub makes streaming calls convenient to use and keeps the call path asynchronous from end to end.

Reference example: https://github.com/apache/dubbo-samples/tree/master/dubbo-samples-triple-reactor

Usage Scenarios

This feature suits systems that must handle a large volume of concurrent requests without overloading any single server, and systems that serve real-time data to many users and must sustain that load without crashing or slowing down.

Usage Method

For Triple usage and configuration, please refer to Using Triple with IDL, and ensure Dubbo version >= 3.1.0.

Adding Necessary Dependencies

To use Reactor Triple, you need to add the following dependencies.

    <dependency>
        <groupId>org.reactivestreams</groupId>
        <artifactId>reactive-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>

Configuring Protobuf Maven Plugin

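Set mainClass to org.apache.dubbo.gen.tri.reactive.ReactorDubbo3TripleGenerator and make sure ${compiler.version} is 3.1.0 or later. The ${os.detected.classifier} property used below is typically supplied by the os-maven-plugin build extension.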

    <build>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                    <protocPlugins>
                        <protocPlugin>
                            <id>dubbo</id>
                            <groupId>org.apache.dubbo</groupId>
                            <artifactId>dubbo-compiler</artifactId>
                            <version>${compiler.version}</version>
                            <mainClass>org.apache.dubbo.gen.tri.reactive.ReactorDubbo3TripleGenerator</mainClass>
                        </protocPlugin>
                    </protocPlugins>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Writing and Compiling IDL Files

The IDL file is written in exactly the same way as the native Triple protocol, and the corresponding code will be generated by default in the target/generated-sources/protobuf/java directory after compilation.

    syntax = "proto3";

    option java_multiple_files = true;

    package org.apache.dubbo.samples.triple.reactor;

    // The request message containing the user's name.
    message GreeterRequest {
        string name = 1;
    }

    // The response message containing the greetings.
    message GreeterReply {
        string message = 1;
    }

    service GreeterService {
        rpc greetOneToOne(GreeterRequest) returns (GreeterReply);
        rpc greetOneToMany(GreeterRequest) returns (stream GreeterReply);
        rpc greetManyToOne(stream GreeterRequest) returns (GreeterReply);
        rpc greetManyToMany(stream GreeterRequest) returns (stream GreeterReply);
    }

Usage Example

  1. Implement Server Interface

    package org.apache.dubbo.samples.triple.reactor.impl;

    import org.apache.dubbo.samples.triple.reactor.DubboGreeterServiceTriple;
    import org.apache.dubbo.samples.triple.reactor.GreeterReply;
    import org.apache.dubbo.samples.triple.reactor.GreeterRequest;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.core.publisher.Flux;

    public class GreeterServiceImpl extends DubboGreeterServiceTriple.GreeterServiceImplBase {

        private static final Logger LOGGER = LoggerFactory.getLogger(GreeterServiceImpl.class);

        // Bidirectional stream: each incoming request is mapped to one reply.
        @Override
        public Flux<GreeterReply> greetManyToMany(Flux<GreeterRequest> request) {
            return request.doOnNext(req -> LOGGER.info("greetManyToMany get data: {}", req))
                    .map(req -> GreeterReply.newBuilder().setMessage(req.getName() + " -> server get").build())
                    .doOnNext(res -> LOGGER.info("greetManyToMany response data: {}", res));
        }
    }

  2. Add Server Interface Startup Class

    package org.apache.dubbo.samples.triple.reactor;

    import org.apache.dubbo.common.constants.CommonConstants;
    import org.apache.dubbo.config.ApplicationConfig;
    import org.apache.dubbo.config.ProtocolConfig;
    import org.apache.dubbo.config.RegistryConfig;
    import org.apache.dubbo.config.ServiceConfig;
    import org.apache.dubbo.config.bootstrap.DubboBootstrap;
    import org.apache.dubbo.samples.triple.reactor.impl.GreeterServiceImpl;

    public class ReactorServer {

        private static final int PORT = 50052;

        public static void main(String[] args) {
            // Export the reactive implementation under the generated GreeterService interface.
            ServiceConfig<GreeterService> reactorService = new ServiceConfig<>();
            reactorService.setInterface(GreeterService.class);
            reactorService.setRef(new GreeterServiceImpl());

            DubboBootstrap bootstrap = DubboBootstrap.getInstance();
            bootstrap.application(new ApplicationConfig("tri-reactor-stub-server"))
                    .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
                    .protocol(new ProtocolConfig(CommonConstants.TRIPLE, PORT))
                    .service(reactorService)
                    .start();
        }
    }

  3. Add Client Startup Class and Consumer Program

    package org.apache.dubbo.samples.triple.reactor;

    import org.apache.dubbo.common.constants.CommonConstants;
    import org.apache.dubbo.config.ApplicationConfig;
    import org.apache.dubbo.config.ReferenceConfig;
    import org.apache.dubbo.config.RegistryConfig;
    import org.apache.dubbo.config.bootstrap.DubboBootstrap;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.core.publisher.Flux;

    import java.io.IOException;

    public class ReactorConsumer {

        private static final Logger LOGGER = LoggerFactory.getLogger(ReactorConsumer.class);

        private final GreeterService greeterService;

        public ReactorConsumer() {
            ReferenceConfig<GreeterService> referenceConfig = new ReferenceConfig<>();
            referenceConfig.setInterface(GreeterService.class);
            referenceConfig.setProtocol(CommonConstants.TRIPLE);
            referenceConfig.setProxy(CommonConstants.NATIVE_STUB);
            referenceConfig.setTimeout(10000);

            DubboBootstrap bootstrap = DubboBootstrap.getInstance();
            bootstrap.application(new ApplicationConfig("tri-reactor-stub-server"))
                    .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
                    .reference(referenceConfig)
                    .start();

            // Assign the field; declaring a local variable here would leave the final field unset.
            this.greeterService = referenceConfig.get();
        }

        public static void main(String[] args) throws IOException {
            ReactorConsumer reactorConsumer = new ReactorConsumer();
            reactorConsumer.consumeManyToMany();
            // subscribe() does not block, so keep the JVM alive until a key is pressed.
            System.in.read();
        }

        private void consumeManyToMany() {
            // Send the numbers 1..10 as a request stream and log every reply.
            greeterService.greetManyToMany(Flux.range(1, 10)
                            .map(num -> GreeterRequest.newBuilder().setName(String.valueOf(num)).build())
                            .doOnNext(req -> LOGGER.info("consumeManyToMany request data: {}", req)))
                    .subscribe(res -> LOGGER.info("consumeManyToMany get response: {}", res));
        }
    }

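  4. Start Server: run ReactorServer. The registry configured above (zookeeper://127.0.0.1:2181) must be reachable.

  5. Start Consumer: run ReactorConsumer and check the log output for the responses.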

Last modified September 30, 2024: Update & Translate Overview Docs (#3040) (d37ebceaea7)