Streaming Communication

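Streaming is a new RPC data transmission mode provided by Dubbo3. It is suitable for the following scenarios:

  • An interface needs to send a large amount of data that cannot fit in a single RPC request or response, so it must be sent in batches. Issuing multiple traditional RPCs at the application layer cannot solve both ordering and performance: if order must be guaranteed, the calls can only be sent serially
  • Streaming scenarios, where data must be processed in the order it was sent and the data itself has no definite boundary
  • Push scenarios, where multiple messages are sent and processed within the context of a single call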
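The first scenario can be pictured with a minimal sketch that uses only the Go standard library. A Go channel stands in for one stream here; this is an assumption made for illustration, not the Dubbo-go API, and the names chunk, split, sendAll, receiveAll and transfer are hypothetical. The sender cuts a payload into pieces and writes them in order within one "call", and the receiver rebuilds the payload without any reordering logic.

```go
package main

import (
	"fmt"
	"strings"
)

// chunk is one piece of a payload that is too large for a single message.
type chunk struct {
	seq  int
	data string
}

// split cuts payload into pieces of at most size bytes. size must be positive.
func split(payload string, size int) []chunk {
	var out []chunk
	for i, seq := 0, 0; i < len(payload); i, seq = i+size, seq+1 {
		end := i + size
		if end > len(payload) {
			end = len(payload)
		}
		out = append(out, chunk{seq: seq, data: payload[i:end]})
	}
	return out
}

// sendAll plays the sender side of one stream: it writes every chunk in
// order and then closes the stream to signal the end of the data.
func sendAll(stream chan<- chunk, chunks []chunk) {
	defer close(stream)
	for _, c := range chunks {
		stream <- c
	}
}

// receiveAll plays the receiver side: chunks arrive in send order, so the
// payload is rebuilt by plain concatenation.
func receiveAll(stream <-chan chunk) string {
	var sb strings.Builder
	for c := range stream {
		sb.WriteString(c.data)
	}
	return sb.String()
}

// transfer runs sender and receiver concurrently over a single stream.
func transfer(payload string, size int) string {
	stream := make(chan chunk)
	go sendAll(stream, split(payload, size))
	return receiveAll(stream)
}

func main() {
	fmt.Println(transfer("abcdefghij", 3)) // prints "abcdefghij"
}
```

With separate unary calls, the application would have to either wait for each call to finish before sending the next or carry the seq field and reorder on the receiving side. Within one stream, ordering comes from the transport.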

Streaming communication is divided into the following three types:

  • SERVER_STREAM (server streaming)
  • CLIENT_STREAM (client streaming)
  • BIDIRECTIONAL_STREAM (bidirectional streaming)
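The difference between the three types is only the shape of the message flow. The sketch below models each shape with plain Go channels, assuming an echo-style service. It does not use the Dubbo-go API, and the helper names feed and collect are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// serverStream: one request in, many responses out.
func serverStream(name string, n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- name
		}
	}()
	return out
}

// clientStream: many requests in, one response out once the sender closes.
func clientStream(in <-chan string) string {
	var names []string
	for name := range in {
		names = append(names, name)
	}
	return strings.Join(names, ",")
}

// bidiStream: each request is answered as it arrives, until the sender closes.
func bidiStream(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for name := range in {
			out <- name
		}
	}()
	return out
}

// feed sends the given values on a new channel and then closes it.
func feed(values ...string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, v := range values {
			ch <- v
		}
	}()
	return ch
}

// collect drains a channel into a slice.
func collect(ch <-chan string) []string {
	var out []string
	for v := range ch {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(collect(serverStream("triple", 3)))  // SERVER_STREAM
	fmt.Println(clientStream(feed("a", "b", "c")))   // CLIENT_STREAM
	fmt.Println(collect(bidiStream(feed("x", "y")))) // BIDIRECTIONAL_STREAM
}
```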

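1. Introduction

This document demonstrates how to use streaming communication in Dubbo-go. See the complete sample source code for the full project.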

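2. How to use streaming communication in Dubbo-go

In the proto file, add the stream keyword before the request type, the response type, or both, of each method that needs streaming. Then use proto-gen-triple to generate the corresponding files.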

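    service GreetService {
        // Unary call: one request, one response.
        rpc Greet(GreetRequest) returns (GreetResponse) {}
        // Bidirectional stream: stream on both the request and the response.
        rpc GreetStream(stream GreetStreamRequest) returns (stream GreetStreamResponse) {}
        // Client stream: stream on the request only.
        rpc GreetClientStream(stream GreetClientStreamRequest) returns (GreetClientStreamResponse) {}
        // Server stream: stream on the response only.
        rpc GreetServerStream(GreetServerStreamRequest) returns (stream GreetServerStreamResponse) {}
    }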

Write the server-side handler file

Source file path: dubbo-go-sample/streaming/go-server/cmd/server.go

    // Excerpt from the handler file. It relies on the context, fmt and strings
    // packages, the generated greet package, and the dubbo-go Triple protocol
    // package referenced here as triple.
    type GreetTripleServer struct {
    }

    // Greet handles the unary call and echoes the name back.
    func (srv *GreetTripleServer) Greet(ctx context.Context, req *greet.GreetRequest) (*greet.GreetResponse, error) {
        resp := &greet.GreetResponse{Greeting: req.Name}
        return resp, nil
    }

    // GreetStream handles the bidirectional stream: it echoes every request
    // until the client ends the stream.
    func (srv *GreetTripleServer) GreetStream(ctx context.Context, stream greet.GreetService_GreetStreamServer) error {
        for {
            req, err := stream.Recv()
            if err != nil {
                // The client closing its side is the normal end of the stream.
                if triple.IsEnded(err) {
                    break
                }
                return fmt.Errorf("triple BidiStream recv error: %s", err)
            }
            if err := stream.Send(&greet.GreetStreamResponse{Greeting: req.Name}); err != nil {
                return fmt.Errorf("triple BidiStream send error: %s", err)
            }
        }
        return nil
    }

    // GreetClientStream collects every name sent by the client and replies
    // once with the comma-joined result.
    func (srv *GreetTripleServer) GreetClientStream(ctx context.Context, stream greet.GreetService_GreetClientStreamServer) (*greet.GreetClientStreamResponse, error) {
        var reqs []string
        // Recv reports whether a message is available; Msg returns it.
        for stream.Recv() {
            reqs = append(reqs, stream.Msg().Name)
        }
        // Any error other than the normal end of the stream is a failure.
        if stream.Err() != nil && !triple.IsEnded(stream.Err()) {
            return nil, fmt.Errorf("triple ClientStream recv err: %s", stream.Err())
        }
        resp := &greet.GreetClientStreamResponse{
            Greeting: strings.Join(reqs, ","),
        }
        return resp, nil
    }

    // GreetServerStream answers a single request with five responses.
    func (srv *GreetTripleServer) GreetServerStream(ctx context.Context, req *greet.GreetServerStreamRequest, stream greet.GreetService_GreetServerStreamServer) error {
        for i := 0; i < 5; i++ {
            if err := stream.Send(&greet.GreetServerStreamResponse{Greeting: req.Name}); err != nil {
                return fmt.Errorf("triple ServerStream send err: %s", err)
            }
        }
        return nil
    }
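The client-stream handler above reads its input with a Recv / Msg / Err loop: Recv reports whether another message is available, Msg returns it, and Err is checked once after the loop. The following standard-library sketch reproduces that control flow with a hypothetical in-memory type, memStream. Representing the normal end of the stream with io.EOF, and the helper isEnded that mirrors the role of triple.IsEnded, are assumptions made for illustration and are not taken from the Dubbo-go source.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

var (
	// errEnded stands in for the normal end-of-stream condition (assumed io.EOF).
	errEnded = io.EOF
	// errBroken stands in for a real transport failure.
	errBroken = errors.New("connection reset")
)

// isEnded plays the role that triple.IsEnded plays in the handler above.
func isEnded(err error) bool { return errors.Is(err, errEnded) }

// memStream is a hypothetical in-memory stream of names.
type memStream struct {
	pending []string // messages not yet delivered
	end     error    // error reported once pending is exhausted
	msg     string   // message returned by Msg
	err     error    // error returned by Err
}

func newMemStream(end error, msgs ...string) *memStream {
	return &memStream{pending: msgs, end: end}
}

// Recv advances to the next message and reports whether one was available.
func (s *memStream) Recv() bool {
	if len(s.pending) == 0 {
		s.err = s.end
		return false
	}
	s.msg, s.pending = s.pending[0], s.pending[1:]
	return true
}

// Msg returns the message made available by the last successful Recv.
func (s *memStream) Msg() string { return s.msg }

// Err returns the reason Recv stopped.
func (s *memStream) Err() error { return s.err }

// joinAll drains the stream and joins the names, treating a normal end of
// stream as success and anything else as a failure.
func joinAll(s *memStream) (string, error) {
	var names []string
	for s.Recv() {
		names = append(names, s.Msg())
	}
	if err := s.Err(); err != nil && !isEnded(err) {
		return "", fmt.Errorf("recv: %w", err)
	}
	return strings.Join(names, ","), nil
}

func main() {
	got, err := joinAll(newMemStream(errEnded, "a", "b"))
	fmt.Println(got, err)

	_, err = joinAll(newMemStream(errBroken, "a"))
	fmt.Println(err)
}
```

Checking Err only after the loop keeps the loop body free of error handling, at the cost of having to remember the final check; skipping it would make a broken connection look like a short but successful stream.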

Write the client file

Source file path: dubbo-go-sample/streaming/go-client/cmd/client.go

    // Excerpt from the client file. It relies on the context package, the
    // dubbo-go client package, the generated greet package, and a logger.
    func main() {
        // Connect directly to the server over the Triple protocol.
        cli, err := client.NewClient(
            client.WithClientURL("tri://127.0.0.1:20000"),
        )
        if err != nil {
            panic(err)
        }
        svc, err := greet.NewGreetService(cli)
        if err != nil {
            panic(err)
        }
        TestClient(svc)
    }

    // TestClient exercises the unary call and the three streaming types in turn.
    func TestClient(cli greet.GreetService) {
        if err := testUnary(cli); err != nil {
            logger.Error(err)
        }
        if err := testBidiStream(cli); err != nil {
            logger.Error(err)
        }
        if err := testClientStream(cli); err != nil {
            logger.Error(err)
        }
        if err := testServerStream(cli); err != nil {
            logger.Error(err)
        }
    }

    func testUnary(cli greet.GreetService) error {
        logger.Info("start to test TRIPLE unary call")
        resp, err := cli.Greet(context.Background(), &greet.GreetRequest{Name: "triple"})
        if err != nil {
            return err
        }
        logger.Infof("TRIPLE unary call resp: %s", resp.Greeting)
        return nil
    }

    func testBidiStream(cli greet.GreetService) error {
        logger.Info("start to test TRIPLE bidi stream")
        stream, err := cli.GreetStream(context.Background())
        if err != nil {
            return err
        }
        // Return the send error itself; err is nil at this point.
        if sendErr := stream.Send(&greet.GreetStreamRequest{Name: "triple"}); sendErr != nil {
            return sendErr
        }
        resp, err := stream.Recv()
        if err != nil {
            return err
        }
        logger.Infof("TRIPLE bidi stream resp: %s", resp.Greeting)
        // Close the request side first, then the response side.
        if err := stream.CloseRequest(); err != nil {
            return err
        }
        if err := stream.CloseResponse(); err != nil {
            return err
        }
        return nil
    }

    func testClientStream(cli greet.GreetService) error {
        logger.Info("start to test TRIPLE client stream")
        stream, err := cli.GreetClientStream(context.Background())
        if err != nil {
            return err
        }
        for i := 0; i < 5; i++ {
            // Return the send error itself; err is nil at this point.
            if sendErr := stream.Send(&greet.GreetClientStreamRequest{Name: "triple"}); sendErr != nil {
                return sendErr
            }
        }
        // Finish sending and wait for the single response.
        resp, err := stream.CloseAndRecv()
        if err != nil {
            return err
        }
        logger.Infof("TRIPLE client stream resp: %s", resp.Greeting)
        return nil
    }

    func testServerStream(cli greet.GreetService) error {
        logger.Info("start to test TRIPLE server stream")
        stream, err := cli.GreetServerStream(context.Background(), &greet.GreetServerStreamRequest{Name: "triple"})
        if err != nil {
            return err
        }
        for stream.Recv() {
            logger.Infof("TRIPLE server stream resp: %s", stream.Msg().Greeting)
        }
        // Report the stream's own error; the outer err is nil at this point.
        if err := stream.Err(); err != nil {
            return err
        }
        if err := stream.Close(); err != nil {
            return err
        }
        return nil
    }

3. Results

Run the server and then the client. The requests return normally:

    [start to test TRIPLE unary call]
    TRIPLE unary call resp: [triple]
    [start to test TRIPLE bidi stream]
    TRIPLE bidi stream resp: [triple]
    [start to test TRIPLE client stream]
    TRIPLE client stream resp: [triple,triple,triple,triple,triple]
    [start to test TRIPLE server stream]
    TRIPLE server stream resp: [triple]
    TRIPLE server stream resp: [triple]
    TRIPLE server stream resp: [triple]
    TRIPLE server stream resp: [triple]
    TRIPLE server stream resp: [triple]

Last modified September 13, 2024: Refactor website structure (#2860) (1a4b998f54b)