Streaming Communication

Demonstrates the basic usage of streaming communication, including server streaming and bidirectional streaming.

As described in the section Choosing RPC Communication Protocol, Streaming is an RPC data transmission model introduced in Dubbo3. It is suitable for the following scenarios:

  • The interface needs to send a large amount of data that cannot fit in a single RPC request or response and must be sent in batches. Issuing multiple traditional RPC calls at the application layer cannot solve the ordering and performance problems: to guarantee order, the calls can only be sent serially.
  • In streaming scenarios, data needs to be processed in the order it is sent, and the data itself does not have fixed boundaries.
  • In push scenarios, multiple messages are sent and processed within the same call context.

Streaming is divided into the following three types:

  • SERVER_STREAM
  • CLIENT_STREAM
  • BIDIRECTIONAL_STREAM
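
As a rough illustration, the three types could correspond to Java method shapes like the ones below. This is a framework-free sketch, not Dubbo code: `StreamObserver` is a local stand-in with the same three callbacks as Dubbo's `org.apache.dubbo.common.stream.StreamObserver`, `StreamShapes` and `Collector` are hypothetical names, and everything runs in-process without a network. The client-stream shape is assumed to match the bidirectional one (a response observer in, a request observer out).

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same three callbacks as Dubbo's StreamObserver,
// declared here only so the sketch compiles without Dubbo on the classpath.
interface StreamObserver<T> {
    void onNext(T data);
    void onError(Throwable throwable);
    void onCompleted();
}

// Hypothetical in-process "service" showing one method shape per stream type.
class StreamShapes {

    // SERVER_STREAM: one request in, many responses out.
    static void serverStream(int count, StreamObserver<Integer> response) {
        for (int i = 1; i <= count; i++) {
            response.onNext(i);
        }
        response.onCompleted();
    }

    // CLIENT_STREAM: many requests in, a single response once the client completes.
    static StreamObserver<Integer> clientStream(StreamObserver<Integer> response) {
        return new StreamObserver<Integer>() {
            private int sum;

            @Override public void onNext(Integer data) { sum += data; }
            @Override public void onError(Throwable throwable) { response.onError(throwable); }
            @Override public void onCompleted() {
                response.onNext(sum);
                response.onCompleted();
            }
        };
    }

    // BIDIRECTIONAL_STREAM: many requests in, many responses out.
    static StreamObserver<Integer> biStream(StreamObserver<Integer> response) {
        return new StreamObserver<Integer>() {
            @Override public void onNext(Integer data) { response.onNext(data * 2); }
            @Override public void onError(Throwable throwable) { response.onError(throwable); }
            @Override public void onCompleted() { response.onCompleted(); }
        };
    }

    // Records every value it receives and whether the stream completed.
    static class Collector implements StreamObserver<Integer> {
        final List<Integer> values = new ArrayList<>();
        boolean completed;

        @Override public void onNext(Integer data) { values.add(data); }
        @Override public void onError(Throwable throwable) { throw new IllegalStateException(throwable); }
        @Override public void onCompleted() { completed = true; }
    }

    public static void main(String[] args) {
        Collector fromServerStream = new Collector();
        serverStream(3, fromServerStream);

        Collector fromClientStream = new Collector();
        StreamObserver<Integer> requests = clientStream(fromClientStream);
        requests.onNext(1);
        requests.onNext(2);
        requests.onNext(3);
        requests.onCompleted();

        Collector fromBiStream = new Collector();
        StreamObserver<Integer> biRequests = biStream(fromBiStream);
        biRequests.onNext(1);
        biRequests.onNext(2);
        biRequests.onCompleted();

        System.out.println(fromServerStream.values + " " + fromClientStream.values + " " + fromBiStream.values);
    }
}
```

In this sketch the only difference between the client-stream and bidirectional handlers is when they write to the response observer: once on completion, or once per request.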

The following example demonstrates the basic usage of Triple streaming communication, covering client streams, server streams, and bidirectional streams. The example uses the Protocol Buffers service development model; developers using the Java interface model can refer to the corresponding instructions at the end of this article. You can view the complete code for this example in the dubbo-samples repository.

Running the Example

First, download the example source code using the following command:

    git clone --depth=1 https://github.com/apache/dubbo-samples.git

Navigate to the example source directory:

    cd dubbo-samples/2-advanced/dubbo-samples-triple-streaming

Compile the project. This step generates code from the IDL by invoking the protoc plugin provided by Dubbo, which produces the corresponding service definition code:

    mvn clean compile

Start Server

Run the following command to start the server:

    mvn compile exec:java -Dexec.mainClass="org.apache.dubbo.samples.tri.streaming.TriStreamServer"

Start Client

Run the following command to start the client:

    mvn compile exec:java -Dexec.mainClass="org.apache.dubbo.samples.tri.streaming.TriStreamClient"

Source Code Interpretation

This example defines its service with protobuf, so the required dependencies and configuration are essentially the same as those described in the section Using Protobuf (IDL) to Develop Triple Protocol Services; refer to that section for complete details. The rest of this article focuses on the streaming communication part.

Protobuf Service Definition

    syntax = "proto3";

    option java_multiple_files = true;

    package org.apache.dubbo.samples.tri.streaming;

    message GreeterRequest {
      string name = 1;
    }

    message GreeterReply {
      string message = 1;
    }

    service Greeter {
      rpc biStream(stream GreeterRequest) returns (stream GreeterReply);
      rpc serverStream(GreeterRequest) returns (stream GreeterReply);
    }

The proto file above defines two methods:

  • biStream(stream GreeterRequest) returns (stream GreeterReply): bidirectional stream
  • serverStream(GreeterRequest) returns (stream GreeterReply): server stream

Generate Code

Next, we need to generate Dubbo client and server interfaces from the .proto service definition. The protoc dubbo plugin can help us generate the required code, and when using Gradle or Maven, the protoc build plugin can generate necessary code as part of the build. Specific Maven configurations and code generation steps are described in the previous section.

In the target/generated-sources/protobuf/java/org/apache/dubbo/samples/tri/streaming/ directory, you can find the following generated code, where we will focus on DubboGreeterTriple.java:

    ├── DubboGreeterTriple.java
    ├── Greeter.java
    ├── GreeterOuterClass.java
    ├── GreeterReply.java
    ├── GreeterReplyOrBuilder.java
    ├── GreeterRequest.java
    └── GreeterRequestOrBuilder.java

Server

First, let’s look at how to define the service implementation and start the provider:

  1. Implement the service base class defined during the IDL code generation process, providing custom business logic.
  2. Run the Dubbo service to listen for requests from clients and return service responses.

Provide Service Implementation GreeterImplBase

Define the class GreeterImpl, which extends DubboGreeterTriple.GreeterImplBase.

    public class GreeterImpl extends DubboGreeterTriple.GreeterImplBase {
        // ...
    }

Server Stream

GreeterImpl implements all the rpc methods defined in the service. Let's first look at the server stream method.

Unlike an ordinary method definition, the serverStream method takes two parameters: the first, request, is the input, and the second, responseObserver, of type StreamObserver<GreeterReply>, carries the response. In the method implementation, we call responseObserver.onNext(...) repeatedly to send results back to the consumer, and finally call onCompleted() to signal the end of the response stream.

    @Override
    public void serverStream(GreeterRequest request, StreamObserver<GreeterReply> responseObserver) {
        LOGGER.info("receive request: {}", request.getName());
        for (int i = 0; i < 10; i++) {
            GreeterReply reply = GreeterReply.newBuilder().setMessage("reply from serverStream. " + i).build();
            responseObserver.onNext(reply);
        }
        responseObserver.onCompleted();
    }

Bidirectional Stream

Both the parameter and the return value of the bidirectional stream method biStream are of type StreamObserver<...>. Note that their roles are the reverse of what a traditional method definition suggests: the parameter StreamObserver<GreeterReply> responseObserver is the response stream, through which we continuously write back responses, while the returned StreamObserver<GreeterRequest> receives the request stream.

Note that the request stream and the response stream are independent: while response data is being written back, request messages may arrive at any time. Within each stream, messages are ordered.

    @Override
    public StreamObserver<GreeterRequest> biStream(StreamObserver<GreeterReply> responseObserver) {
        // The returned observer receives the request stream from the client.
        return new StreamObserver<GreeterRequest>() {
            @Override
            public void onNext(GreeterRequest data) {
                // Answer each request as it arrives.
                GreeterReply resp = GreeterReply.newBuilder().setMessage("reply from biStream " + data.getName()).build();
                responseObserver.onNext(resp);
            }

            @Override
            public void onError(Throwable throwable) {
                // The request stream failed; log or clean up here.
            }

            @Override
            public void onCompleted() {
                // The client has finished sending, so end the response stream as well.
                responseObserver.onCompleted();
            }
        };
    }
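
To make the independence of the two streams more concrete, the following framework-free sketch replies once per batch of requests instead of once per request, and flushes any remainder when the request stream completes. It is only an illustration under stated assumptions: `StreamObserver` is a local stand-in for Dubbo's interface, and `BatchingEcho`, `Collector`, and `batchSize` are hypothetical names that do not appear in the sample project.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same three callbacks as Dubbo's StreamObserver,
// declared here only so the sketch compiles without Dubbo on the classpath.
interface StreamObserver<T> {
    void onNext(T data);
    void onError(Throwable throwable);
    void onCompleted();
}

// Hypothetical bidirectional handler whose replies are not paired 1:1 with requests.
class BatchingEcho {

    // Returns the request-side observer; replies are written to `response`.
    static StreamObserver<String> biStream(int batchSize, StreamObserver<String> response) {
        return new StreamObserver<String>() {
            private final List<String> pending = new ArrayList<>();

            @Override
            public void onNext(String data) {
                pending.add(data);
                if (pending.size() >= batchSize) {
                    flush();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                pending.clear();
                response.onError(throwable);
            }

            @Override
            public void onCompleted() {
                // Send whatever is left, then close the response stream.
                if (!pending.isEmpty()) {
                    flush();
                }
                response.onCompleted();
            }

            private void flush() {
                response.onNext(String.join(",", pending));
                pending.clear();
            }
        };
    }

    // Records replies in arrival order.
    static class Collector implements StreamObserver<String> {
        final List<String> values = new ArrayList<>();
        boolean completed;

        @Override public void onNext(String data) { values.add(data); }
        @Override public void onError(Throwable throwable) { throw new IllegalStateException(throwable); }
        @Override public void onCompleted() { completed = true; }
    }

    public static void main(String[] args) {
        Collector replies = new Collector();
        StreamObserver<String> requests = biStream(2, replies);
        for (String name : new String[] {"a", "b", "c", "d", "e"}) {
            requests.onNext(name);
        }
        requests.onCompleted();
        System.out.println(replies.values);
    }
}
```

Five requests with a batch size of 2 produce three replies here, which shows that the number and timing of responses need not mirror the requests, while order within each stream is preserved.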

Start Server

Starting the Dubbo service works exactly as it does for an ordinary application:

    public static void main(String[] args) throws IOException {
        ServiceConfig<Greeter> service = new ServiceConfig<>();
        service.setInterface(Greeter.class);
        service.setRef(new GreeterImpl("tri-stub"));

        ApplicationConfig applicationConfig = new ApplicationConfig("tri-stub-server");
        applicationConfig.setQosEnable(false);

        DubboBootstrap bootstrap = DubboBootstrap.getInstance();
        bootstrap.application(applicationConfig)
                .registry(new RegistryConfig(TriSampleConstants.ZK_ADDRESS))
                .protocol(new ProtocolConfig(CommonConstants.TRIPLE, TriSampleConstants.SERVER_PORT))
                .service(service)
                .start();
    }

Client

As with ordinary Dubbo service calls, we first need to declare the rpc service reference:

    public static void main(String[] args) throws IOException {
        ReferenceConfig<Greeter> ref = new ReferenceConfig<>();
        ref.setInterface(Greeter.class);
        ref.setProtocol(CommonConstants.TRIPLE);

        DubboBootstrap.getInstance().reference(ref).start();
        Greeter greeter = ref.get();
    }

Next, we can use greeter to initiate calls as if calling local methods.

Server Stream

Call serverStream(), passing a SampleStreamObserver object that handles the streaming responses. The call returns quickly after it is initiated; from then on, the streamed responses are delivered to SampleStreamObserver as they arrive.

    GreeterRequest request = GreeterRequest.newBuilder().setName("server stream request.").build();
    greeter.serverStream(request, new SampleStreamObserver());

Below is the definition of the SampleStreamObserver class, including how it handles each response it receives.

    private static class SampleStreamObserver implements StreamObserver<GreeterReply> {
        @Override
        public void onNext(GreeterReply data) {
            LOGGER.info("stream <- reply:{}", data);
        }
        // ......
    }
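
Because the call returns before the responses arrive, a caller that needs all of them has to wait for the stream to end. One possible approach, sketched here without Dubbo, is an observer that releases a `CountDownLatch` on completion or error. `StreamObserver` is a local stand-in for Dubbo's interface; `AwaitingObserver` and `fakeServerStream` are hypothetical, and the fake call merely simulates asynchronous delivery with a background thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Local stand-in with the same three callbacks as Dubbo's StreamObserver,
// declared here only so the sketch compiles without Dubbo on the classpath.
interface StreamObserver<T> {
    void onNext(T data);
    void onError(Throwable throwable);
    void onCompleted();
}

class AwaitingObserverDemo {

    // Observer that records replies and releases a latch when the stream ends.
    static class AwaitingObserver implements StreamObserver<String> {
        final List<String> replies = Collections.synchronizedList(new ArrayList<String>());
        volatile Throwable error;
        private final CountDownLatch done = new CountDownLatch(1);

        @Override public void onNext(String data) { replies.add(data); }

        @Override public void onError(Throwable throwable) {
            error = throwable;
            done.countDown();
        }

        @Override public void onCompleted() { done.countDown(); }

        // Blocks until the stream ends or the timeout elapses; returns true if it ended.
        boolean await(long timeout, TimeUnit unit) {
            try {
                return done.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Hypothetical stand-in for a remote server-stream call: it returns
    // immediately and delivers the replies from another thread.
    static void fakeServerStream(String name, StreamObserver<String> response) {
        Thread worker = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                response.onNext("reply " + i + " for " + name);
            }
            response.onCompleted();
        });
        worker.setDaemon(true);
        worker.start();
    }

    public static void main(String[] args) {
        AwaitingObserver observer = new AwaitingObserver();
        fakeServerStream("demo", observer);
        boolean finished = observer.await(5, TimeUnit.SECONDS);
        System.out.println("finished=" + finished + ", replies=" + observer.replies);
    }
}
```

The latch is released on both `onCompleted` and `onError`, so the waiting thread is not left blocked when the stream fails; the caller can then inspect `error` to tell the two outcomes apart.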

Bidirectional Stream

Calling greeter.biStream() immediately returns a requestStreamObserver. You also need to pass the method an observer that can process the responses, here new SampleStreamObserver().

We can then keep sending requests through the returned requestStreamObserver by calling requestStreamObserver.onNext(request). Any responses that come back are handled by SampleStreamObserver, whose definition is shown above.

    StreamObserver<GreeterRequest> requestStreamObserver = greeter.biStream(new SampleStreamObserver());
    for (int i = 0; i < 10; i++) {
        GreeterRequest request = GreeterRequest.newBuilder().setName("name-" + i).build();
        requestStreamObserver.onNext(request);
    }
    requestStreamObserver.onCompleted();

Others

Streaming Communication in Java Interface Mode

Users who do not use Protobuf can enable streaming communication by declaring methods with the streaming signatures directly in a Java interface.

Interface Definition

    public interface WrapperGreeter {
        // Bidirectional stream
        StreamObserver<String> sayHelloStream(StreamObserver<String> response);

        // Server stream
        void sayHelloServerStream(String request, StreamObserver<String> response);
    }

Here, org.apache.dubbo.common.stream.StreamObserver is the parameter type that the Dubbo framework provides for streaming communication; streaming methods must be declared in the forms shown above.

The parameters and return values of streaming methods are strictly specified. To catch mistakes early, the Dubbo3 framework checks the method signatures and throws an exception if they are wrong. For BIDIRECTIONAL_STREAM, note that the StreamObserver in the parameter list is the response stream, while the StreamObserver returned by the method is the request stream.
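
The kind of check described above can be pictured with a small reflection-based sketch. It is not Dubbo's actual validation code: `StreamSignatureCheck`, `SampleApi`, and the returned labels are hypothetical, `StreamObserver` is a local stand-in for Dubbo's interface, and the rules encode only the two shapes shown in this article.

```java
import java.lang.reflect.Method;

// Local stand-in with the same three callbacks as Dubbo's StreamObserver,
// declared here only so the sketch compiles without Dubbo on the classpath.
interface StreamObserver<T> {
    void onNext(T data);
    void onError(Throwable throwable);
    void onCompleted();
}

// Hypothetical interface mixing valid and invalid method shapes.
interface SampleApi {
    StreamObserver<String> biStream(StreamObserver<String> response);
    void serverStream(String request, StreamObserver<String> response);
    String unary(String request);
    // Malformed on purpose: observer first, and a non-void return type.
    String broken(StreamObserver<String> response, String request);
}

class StreamSignatureCheck {

    // Classifies a method by its shape, or throws if it mixes streaming and
    // non-streaming conventions in an unsupported way.
    static String classify(Method method) {
        Class<?>[] params = method.getParameterTypes();
        Class<?> returnType = method.getReturnType();
        boolean returnsObserver = StreamObserver.class.isAssignableFrom(returnType);
        int observerParams = 0;
        for (Class<?> param : params) {
            if (StreamObserver.class.isAssignableFrom(param)) {
                observerParams++;
            }
        }
        if (!returnsObserver && observerParams == 0) {
            return "UNARY";
        }
        // Request stream returned, response stream as the only parameter.
        if (returnsObserver && params.length == 1 && observerParams == 1) {
            return "BIDIRECTIONAL_STREAM";
        }
        // void method(request, responseObserver)
        if (returnType == void.class && params.length == 2 && observerParams == 1
                && StreamObserver.class.isAssignableFrom(params[1])) {
            return "SERVER_STREAM";
        }
        throw new IllegalStateException("Malformed streaming method: " + method.getName());
    }

    // Convenience lookup by name, so callers avoid checked reflection exceptions.
    static String classify(Class<?> type, String methodName) {
        for (Method method : type.getMethods()) {
            if (method.getName().equals(methodName)) {
                return classify(method);
            }
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }

    public static void main(String[] args) {
        for (String name : new String[] {"biStream", "serverStream", "unary"}) {
            System.out.println(name + " -> " + classify(SampleApi.class, name));
        }
    }
}
```

Calling `classify(SampleApi.class, "broken")` throws an `IllegalStateException` in this sketch, mirroring the idea of failing fast when a streaming method is declared incorrectly.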

Implementation Class

    public class WrapGreeterImpl implements WrapperGreeter {

        //...

        @Override
        public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
            // The returned observer receives the request stream.
            return new StreamObserver<String>() {
                @Override
                public void onNext(String data) {
                    System.out.println(data);
                    response.onNext("hello," + data);
                }

                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace();
                }

                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                    response.onCompleted();
                }
            };
        }

        @Override
        public void sayHelloServerStream(String request, StreamObserver<String> response) {
            for (int i = 0; i < 10; i++) {
                response.onNext("hello," + request);
            }
            response.onCompleted();
        }
    }

Calling Method

    // delegate is the WrapperGreeter reference obtained from Dubbo.
    // Server stream: one request, responses delivered to the observer.
    delegate.sayHelloServerStream("server stream", new StreamObserver<String>() {
        @Override
        public void onNext(String data) {
            System.out.println(data);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }
    });

    // Bidirectional stream: the returned observer is used to send requests.
    StreamObserver<String> request = delegate.sayHelloStream(new StreamObserver<String>() {
        @Override
        public void onNext(String data) {
            System.out.println(data);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }
    });
    // n is the number of requests to send.
    for (int i = 0; i < n; i++) {
        request.onNext("stream request" + i);
    }
    request.onCompleted();

Last modified September 30, 2024: Update & Translate Overview Docs (#3040) (d37ebceaea7)