Streaming Communication

Streaming communication

Streaming communication is a new RPC data transfer mode offered by Dubbo3, suitable for the following scenarios:

  • Interfaces that need to send large amounts of data that cannot be placed in a single RPC request or response, requiring batch sending. However, traditional multiple RPC calls cannot resolve issues of order and performance, and if order is to be guaranteed, they must be sent serially.
  • Streaming scenarios where data needs to be processed in the order sent, and the data itself has no definite boundaries.
  • Push scenarios where multiple messages are sent and processed within the same call context.

There are three types of Streaming communication:

  • SERVER_STREAM (Server Stream)
  • CLIENT_STREAM (Client Stream)
  • BIDIRECTIONAL_STREAM (Bidirectional Stream)

1. Introduction

This document demonstrates how to use streaming communication in Dubbo-go. You can view the complete example source code here.

2. How to use Dubbo-go streaming communication

In the proto file, add stream before the parameters of the methods that require streaming communication and generate the corresponding files using proto-gen-triple.

  1. service GreetService {
  2. rpc Greet(GreetRequest) returns (GreetResponse) {}
  3. rpc GreetStream(stream GreetStreamRequest) returns (stream GreetStreamResponse) {}
  4. rpc GreetClientStream(stream GreetClientStreamRequest) returns (GreetClientStreamResponse) {}
  5. rpc GreetServerStream(GreetServerStreamRequest) returns (stream GreetServerStreamResponse) {}
  6. }

Write the server handler file.

Source file path: dubbo-go-sample/streaming/go-server/cmd/server.go

  1. type GreetTripleServer struct {
  2. }
  3. func (srv *GreetTripleServer) Greet(ctx context.Context, req *greet.GreetRequest) (*greet.GreetResponse, error) {
  4. resp := &greet.GreetResponse{Greeting: req.Name}
  5. return resp, nil
  6. }
  7. func (srv *GreetTripleServer) GreetStream(ctx context.Context, stream greet.GreetService_GreetStreamServer) error {
  8. for {
  9. req, err := stream.Recv()
  10. if err != nil {
  11. if triple.IsEnded(err) {
  12. break
  13. }
  14. return fmt.Errorf("triple BidiStream recv error: %s", err)
  15. }
  16. if err := stream.Send(&greet.GreetStreamResponse{Greeting: req.Name}); err != nil {
  17. return fmt.Errorf("triple BidiStream send error: %s", err)
  18. }
  19. }
  20. return nil
  21. }
  22. func (srv *GreetTripleServer) GreetClientStream(ctx context.Context, stream greet.GreetService_GreetClientStreamServer) (*greet.GreetClientStreamResponse, error) {
  23. var reqs []string
  24. for stream.Recv() {
  25. reqs = append(reqs, stream.Msg().Name)
  26. }
  27. if stream.Err() != nil && !triple.IsEnded(stream.Err()) {
  28. return nil, fmt.Errorf("triple ClientStream recv err: %s", stream.Err())
  29. }
  30. resp := &greet.GreetClientStreamResponse{
  31. Greeting: strings.Join(reqs, ","),
  32. }
  33. return resp, nil
  34. }
  35. func (srv *GreetTripleServer) GreetServerStream(ctx context.Context, req *greet.GreetServerStreamRequest, stream greet.GreetService_GreetServerStreamServer) error {
  36. for i := 0; i < 5; i++ {
  37. if err := stream.Send(&greet.GreetServerStreamResponse{Greeting: req.Name}); err != nil {
  38. return fmt.Errorf("triple ServerStream send err: %s", err)
  39. }
  40. }
  41. return nil
  42. }

Write the client file.

Source file path: dubbo-go-sample/streaming/go-client/cmd/client.go

  1. func main() {
  2. cli, err := client.NewClient(
  3. client.WithClientURL("tri://127.0.0.1:20000"),
  4. )
  5. if err != nil {
  6. panic(err)
  7. }
  8. svc, err := greet.NewGreetService(cli)
  9. if err != nil {
  10. panic(err)
  11. }
  12. TestClient(svc)
  13. }
  14. func TestClient(cli greet.GreetService) {
  15. if err := testUnary(cli); err != nil {
  16. logger.Error(err)
  17. }
  18. if err := testBidiStream(cli); err != nil {
  19. logger.Error(err)
  20. }
  21. if err := testClientStream(cli); err != nil {
  22. logger.Error(err)
  23. }
  24. if err := testServerStream(cli); err != nil {
  25. logger.Error(err)
  26. }
  27. }
  28. func testUnary(cli greet.GreetService) error {
  29. logger.Info("start to test TRIPLE unary call")
  30. resp, err := cli.Greet(context.Background(), &greet.GreetRequest{Name: "triple"})
  31. if err != nil {
  32. return err
  33. }
  34. logger.Infof("TRIPLE unary call resp: %s", resp.Greeting)
  35. return nil
  36. }
  37. func testBidiStream(cli greet.GreetService) error {
  38. logger.Info("start to test TRIPLE bidi stream")
  39. stream, err := cli.GreetStream(context.Background())
  40. if err != nil {
  41. return err
  42. }
  43. if sendErr := stream.Send(&greet.GreetStreamRequest{Name: "triple"}); sendErr != nil {
  44. return err
  45. }
  46. resp, err := stream.Recv()
  47. if err != nil {
  48. return err
  49. }
  50. logger.Infof("TRIPLE bidi stream resp: %s", resp.Greeting)
  51. if err := stream.CloseRequest(); err != nil {
  52. return err
  53. }
  54. if err := stream.CloseResponse(); err != nil {
  55. return err
  56. }
  57. return nil
  58. }
  59. func testClientStream(cli greet.GreetService) error {
  60. logger.Info("start to test TRIPLE client stream")
  61. stream, err := cli.GreetClientStream(context.Background())
  62. if err != nil {
  63. return err
  64. }
  65. for i := 0; i < 5; i++ {
  66. if sendErr := stream.Send(&greet.GreetClientStreamRequest{Name: "triple"}); sendErr != nil {
  67. return err
  68. }
  69. }
  70. resp, err := stream.CloseAndRecv()
  71. if err != nil {
  72. return err
  73. }
  74. logger.Infof("TRIPLE client stream resp: %s", resp.Greeting)
  75. return nil
  76. }
  77. func testServerStream(cli greet.GreetService) error {
  78. logger.Info("start to test TRIPLE server stream")
  79. stream, err := cli.GreetServerStream(context.Background(), &greet.GreetServerStreamRequest{Name: "triple"})
  80. if err != nil {
  81. return err
  82. }
  83. for stream.Recv() {
  84. logger.Infof("TRIPLE server stream resp: %s", stream.Msg().Greeting)
  85. }
  86. if stream.Err() != nil {
  87. return err
  88. }
  89. if err := stream.Close(); err != nil {
  90. return err
  91. }
  92. return nil
  93. }

3. Running Effect

Run the server and client, and you will see the requests return normally.

  1. [start to test TRIPLE unary call]
  2. TRIPLE unary call resp: [triple]
  3. [start to test TRIPLE bidi stream]
  4. TRIPLE bidi stream resp: [triple]
  5. [start to test TRIPLE client stream]
  6. TRIPLE client stream resp: [triple,triple,triple,triple,triple]
  7. [start to test TRIPLE server stream]
  8. TRIPLE server stream resp: [triple]
  9. TRIPLE server stream resp: [triple]
  10. TRIPLE server stream resp: [triple]
  11. TRIPLE server stream resp: [triple]
  12. TRIPLE server stream resp: [triple]

Feedback

Was this page helpful?

Yes No

Last modified September 30, 2024: Update & Translate Overview Docs (#3040) (d37ebceaea7)