4.3 gRPC Streaming, Client and Server

项目地址:https://github.com/EDDYCJY/go-grpc-example

前言

本章节将介绍 gRPC 的流式,分为三种类型:

  • Server-side streaming RPC:服务器端流式 RPC
  • Client-side streaming RPC:客户端流式 RPC
  • Bidirectional streaming RPC:双向流式 RPC

任何技术,因为有痛点,所以才有了存在的必要性。如果您想要了解 gRPC 的流式调用,请继续

image

gRPC Streaming 是基于 HTTP/2 的,后续章节再进行详细讲解

为什么不用 Simple RPC

流式为什么要存在呢,是 Simple RPC 有什么问题吗?通过模拟业务场景,可得知在使用 Simple RPC 时,有如下问题:

  • 数据包过大造成的瞬时压力
  • 接收数据包时,需要所有数据包都接受成功且正确后,才能够回调响应,进行业务处理(无法客户端边发送,服务端边处理)

为什么用 Streaming RPC

  • 大规模数据包
  • 实时场景

模拟场景

每天早上 6 点,都有一批百万级别的数据集要同从 A 同步到 B,在同步的时候,会做一系列操作(归档、数据分析、画像、日志等)。这一次性涉及的数据量确实大

在同步完成后,也有人马上会去查阅数据,为了新的一天筹备。也符合实时性。

两者相较下,这个场景下更适合使用 Streaming RPC

gRPC

在讲解具体的 gRPC 流式代码时,会着重在第一节讲解,因为三种模式其实是不同的组合。希望你能够注重理解,举一反三,其实都是一样的知识点 👍

目录结构

  1. $ tree go-grpc-example
  2. go-grpc-example
  3. ├── client
  4. ├── simple_client
  5. └── client.go
  6. └── stream_client
  7. └── client.go
  8. ├── proto
  9. ├── search.proto
  10. └── stream.proto
  11. └── server
  12. ├── simple_server
  13. └── server.go
  14. └── stream_server
  15. └── server.go

增加 stream_server、stream_client 存放服务端和客户端文件,proto/stream.proto 用于编写 IDL

IDL

在 proto 文件夹下的 stream.proto 文件中,写入如下内容:

  1. syntax = "proto3";
  2. package proto;
  3. service StreamService {
  4. rpc List(StreamRequest) returns (stream StreamResponse) {};
  5. rpc Record(stream StreamRequest) returns (StreamResponse) {};
  6. rpc Route(stream StreamRequest) returns (stream StreamResponse) {};
  7. }
  8. message StreamPoint {
  9. string name = 1;
  10. int32 value = 2;
  11. }
  12. message StreamRequest {
  13. StreamPoint pt = 1;
  14. }
  15. message StreamResponse {
  16. StreamPoint pt = 1;
  17. }

注意关键字 stream,声明其为一个流方法。这里共涉及三个方法,对应关系为

  • List:服务器端流式 RPC
  • Record:客户端流式 RPC
  • Route:双向流式 RPC

基础模板 + 空定义

Server

  1. package main
  2. import (
  3. "log"
  4. "net"
  5. "google.golang.org/grpc"
  6. pb "github.com/EDDYCJY/go-grpc-example/proto"
  7. )
  8. type StreamService struct{}
  9. const (
  10. PORT = "9002"
  11. )
  12. func main() {
  13. server := grpc.NewServer()
  14. pb.RegisterStreamServiceServer(server, &StreamService{})
  15. lis, err := net.Listen("tcp", ":"+PORT)
  16. if err != nil {
  17. log.Fatalf("net.Listen err: %v", err)
  18. }
  19. server.Serve(lis)
  20. }
  21. func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
  22. return nil
  23. }
  24. func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
  25. return nil
  26. }
  27. func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
  28. return nil
  29. }

写代码前,建议先将 gRPC Server 的基础模板和接口给空定义出来。若有不清楚可参见上一章节的知识点

Client

  1. package main
  2. import (
  3. "log"
  4. "google.golang.org/grpc"
  5. pb "github.com/EDDYCJY/go-grpc-example/proto"
  6. )
  7. const (
  8. PORT = "9002"
  9. )
  10. func main() {
  11. conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
  12. if err != nil {
  13. log.Fatalf("grpc.Dial err: %v", err)
  14. }
  15. defer conn.Close()
  16. client := pb.NewStreamServiceClient(conn)
  17. err = printLists(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: List", Value: 2018}})
  18. if err != nil {
  19. log.Fatalf("printLists.err: %v", err)
  20. }
  21. err = printRecord(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Record", Value: 2018}})
  22. if err != nil {
  23. log.Fatalf("printRecord.err: %v", err)
  24. }
  25. err = printRoute(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Route", Value: 2018}})
  26. if err != nil {
  27. log.Fatalf("printRoute.err: %v", err)
  28. }
  29. }
  30. func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
  31. return nil
  32. }
  33. func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
  34. return nil
  35. }
  36. func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
  37. return nil
  38. }

一、Server-side streaming RPC:服务器端流式 RPC

服务器端流式 RPC,显然是单向流,并代指 Server 为 Stream 而 Client 为普通 RPC 请求

简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。大致如图:

image

Server

  1. func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
  2. for n := 0; n <= 6; n++ {
  3. err := stream.Send(&pb.StreamResponse{
  4. Pt: &pb.StreamPoint{
  5. Name: r.Pt.Name,
  6. Value: r.Pt.Value + int32(n),
  7. },
  8. })
  9. if err != nil {
  10. return err
  11. }
  12. }
  13. return nil
  14. }

在 Server,主要留意 stream.Send 方法。它看上去能发送 N 次?有没有大小限制?

  1. type StreamService_ListServer interface {
  2. Send(*StreamResponse) error
  3. grpc.ServerStream
  4. }
  5. func (x *streamServiceListServer) Send(m *StreamResponse) error {
  6. return x.ServerStream.SendMsg(m)
  7. }

通过阅读源码,可得知是 protoc 在生成时,根据定义生成了各式各样符合标准的接口方法。最终再统一调度内部的 SendMsg 方法,该方法涉及以下过程:

  • 消息体(对象)序列化
  • 压缩序列化后的消息体
  • 对正在传输的消息体增加 5 个字节的 header
  • 判断压缩+序列化后的消息体总字节长度是否大于预设的 maxSendMessageSize(预设值为 math.MaxInt32),若超出则提示错误
  • 写入给流的数据集

Client

  1. func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
  2. stream, err := client.List(context.Background(), r)
  3. if err != nil {
  4. return err
  5. }
  6. for {
  7. resp, err := stream.Recv()
  8. if err == io.EOF {
  9. break
  10. }
  11. if err != nil {
  12. return err
  13. }
  14. log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
  15. }
  16. return nil
  17. }

在 Client,主要留意 stream.Recv() 方法。什么情况下 io.EOF ?什么情况下存在错误信息呢?

  1. type StreamService_ListClient interface {
  2. Recv() (*StreamResponse, error)
  3. grpc.ClientStream
  4. }
  5. func (x *streamServiceListClient) Recv() (*StreamResponse, error) {
  6. m := new(StreamResponse)
  7. if err := x.ClientStream.RecvMsg(m); err != nil {
  8. return nil, err
  9. }
  10. return m, nil
  11. }

RecvMsg 会从流中读取完整的 gRPC 消息体,另外通过阅读源码可得知:

(1)RecvMsg 是阻塞等待的

(2)RecvMsg 当流成功/结束(调用了 Close)时,会返回 io.EOF

(3)RecvMsg 当流出现任何错误时,流会被中止,错误信息会包含 RPC 错误码。而在 RecvMsg 中可能出现如下错误:

  • io.EOF
  • io.ErrUnexpectedEOF
  • transport.ConnectionError
  • google.golang.org/grpc/codes

同时需要注意,默认的 MaxReceiveMessageSize 值为 1024 1024 4,建议不要超出

验证

运行 stream_server/server.go:

  1. $ go run server.go

运行 stream_client/client.go:

  1. $ go run client.go
  2. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2018
  3. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2019
  4. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2020
  5. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2021
  6. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2022
  7. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2023
  8. 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2024

二、Client-side streaming RPC:客户端流式 RPC

客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端,大致如图:

image

Server

  1. func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
  2. for {
  3. r, err := stream.Recv()
  4. if err == io.EOF {
  5. return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gRPC Stream Server: Record", Value: 1}})
  6. }
  7. if err != nil {
  8. return err
  9. }
  10. log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
  11. }
  12. return nil
  13. }

多了一个从未见过的方法 stream.SendAndClose,它是做什么用的呢?

在这段程序中,我们对每一个 Recv 都进行了处理,当发现 io.EOF (流关闭) 后,需要将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recv

Client

  1. func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
  2. stream, err := client.Record(context.Background())
  3. if err != nil {
  4. return err
  5. }
  6. for n := 0; n < 6; n++ {
  7. err := stream.Send(r)
  8. if err != nil {
  9. return err
  10. }
  11. }
  12. resp, err := stream.CloseAndRecv()
  13. if err != nil {
  14. return err
  15. }
  16. log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
  17. return nil
  18. }

stream.CloseAndRecvstream.SendAndClose 是配套使用的流方法,相信聪明的你已经秒懂它的作用了

验证

重启 stream_server/server.go,再次运行 stream_client/client.go:

stream_client:
  1. $ go run client.go
  2. 2018/09/24 16:23:03 resp: pj.name: gRPC Stream Server: Record, pt.value: 1
stream_server:
  1. $ go run server.go
  2. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
  3. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
  4. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
  5. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
  6. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
  7. 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018

三、Bidirectional streaming RPC:双向流式 RPC

双向流式 RPC,顾名思义是双向流。由客户端以流式的方式发起请求,服务端同样以流式的方式响应请求

首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)

假设该双向流是按顺序发送的话,大致如图:

image

还是要强调,双向流变化很大,因程序编写的不同而不同。双向流图示无法适用不同的场景

Server

  1. func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
  2. n := 0
  3. for {
  4. err := stream.Send(&pb.StreamResponse{
  5. Pt: &pb.StreamPoint{
  6. Name: "gPRC Stream Client: Route",
  7. Value: int32(n),
  8. },
  9. })
  10. if err != nil {
  11. return err
  12. }
  13. r, err := stream.Recv()
  14. if err == io.EOF {
  15. return nil
  16. }
  17. if err != nil {
  18. return err
  19. }
  20. n++
  21. log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
  22. }
  23. return nil
  24. }

Client

  1. func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
  2. stream, err := client.Route(context.Background())
  3. if err != nil {
  4. return err
  5. }
  6. for n := 0; n <= 6; n++ {
  7. err = stream.Send(r)
  8. if err != nil {
  9. return err
  10. }
  11. resp, err := stream.Recv()
  12. if err == io.EOF {
  13. break
  14. }
  15. if err != nil {
  16. return err
  17. }
  18. log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
  19. }
  20. stream.CloseSend()
  21. return nil
  22. }

验证

重启 stream_server/server.go,再次运行 stream_client/client.go:

stream_server
  1. $ go run server.go
  2. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
  3. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
  4. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
  5. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
  6. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
  7. 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
stream_client
  1. $ go run client.go
  2. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 0
  3. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 1
  4. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 2
  5. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 3
  6. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 4
  7. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 5
  8. 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 6

总结

在本文共介绍了三类流的交互方式,可以根据实际的业务场景去选择合适的方式。会事半功倍哦 🎑

参考

本系列示例代码