grpc hello world client 解析

上一节我们介绍了 grpc 输出 hello world 过程中 server 监听和处理请求的过程。这一节中我们将介绍 client 发出请求的过程。

来先看代码:

  1. func main() {
  2. // Set up a connection to the server.
  3. conn, err := grpc.Dial(address, grpc.WithInsecure())
  4. if err != nil {
  5. log.Fatalf("did not connect: %v", err)
  6. }
  7. defer conn.Close()
  8. c := pb.NewGreeterClient(conn)
  9. // Contact the server and print out its response.
  10. name := defaultName
  11. if len(os.Args) > 1 {
  12. name = os.Args[1]
  13. }
  14. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  15. defer cancel()
  16. r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
  17. if err != nil {
  18. log.Fatalf("could not greet: %v", err)
  19. }
  20. log.Printf("Greeting: %s", r.Message)
  21. }

可以看到 client 的建立也可以大致分为 3 步:

1)创建一个客户端连接 conn

2)通过一个 conn 创建一个客户端

3)发起 rpc 调用

ok,那我们开始 step by step ,具体看看每一步做了啥

1)创建一个客户端连接 conn

  1. conn, err := grpc.Dial(address, grpc.WithInsecure())

通过 Dial 方法创建 conn,Dial 调用了 DialContext 方法

  1. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  2. return DialContext(context.Background(), target, opts...)
  3. }

跟进 DialContext,发现 DialContext 这个方法非常长,这里就不贴代码了,具体就是先实例化了一个 ClientConn 的结构体,然后主要为 ClientConn 的 dopts 的各个属性进行初始化赋值。

  1. cc := &ClientConn{
  2. target: target,
  3. csMgr: &connectivityStateManager{},
  4. conns: make(map[*addrConn]struct{}),
  5. dopts: defaultDialOptions(),
  6. blockingpicker: newPickerWrapper(),
  7. czData: new(channelzData),
  8. firstResolveEvent: grpcsync.NewEvent(),
  9. }

我们先来看看 ClientConn 的结构

  1. type ClientConn struct {
  2. ctx context.Context
  3. cancel context.CancelFunc
  4. target string
  5. parsedTarget resolver.Target
  6. authority string
  7. dopts dialOptions
  8. csMgr *connectivityStateManager
  9. balancerBuildOpts balancer.BuildOptions
  10. blockingpicker *pickerWrapper
  11. mu sync.RWMutex
  12. resolverWrapper *ccResolverWrapper
  13. sc *ServiceConfig
  14. conns map[*addrConn]struct{}
  15. // Keepalive parameter can be updated if a GoAway is received.
  16. mkp keepalive.ClientParameters
  17. curBalancerName string
  18. balancerWrapper *ccBalancerWrapper
  19. retryThrottler atomic.Value
  20. firstResolveEvent *grpcsync.Event
  21. channelzID int64 // channelz unique identification number
  22. czData *channelzData
  23. }

dialOptions 其实就是对客户端属性的一些设置,包括压缩解压缩、是否需要认证、超时时间、是否重试等信息。

这里我们来看一下初始化了哪些属性:

connectivityStateManager

  1. type connectivityStateManager struct {
  2. mu sync.Mutex
  3. state connectivity.State
  4. notifyChan chan struct{}
  5. channelzID int64
  6. }

连接的状态管理器,每个连接具有 “IDLE”、“CONNECTING”、“READY”、“TRANSIENT_FAILURE”、“SHUTDOW N”、“Invalid-State” 这几种状态。

pickerWrapper

  1. type pickerWrapper struct {
  2. mu sync.Mutex
  3. done bool
  4. blockingCh chan struct{}
  5. picker balancer.Picker
  6. // The latest connection happened.
  7. connErrMu sync.Mutex
  8. connErr error
  9. }

pickerWrapper 是对 balancer.Picker 的一层封装,balancer.Picker 其实是一个负载均衡器,它里面只有一个 Pick 方法,它返回一个 SubConn 连接。

  1. type Picker interface {
  2. Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
  3. }

什么是 SubConn 呢?看一下这个类的介绍

  1. // Each sub connection contains a list of addresses. gRPC will
  2. // try to connect to them (in sequence), and stop trying the
  3. // remainder once one connection is successful.

这里我们就明白了,在分布式环境下,可能会存在多个 client 和 多个 server,client 发起一个 rpc 调用之前,需要通过 balancer 去找到一个 server 的 address,balancer 的 Picker 类返回一个 SubConn,SubConn 里面包含了多个 server 的 address,假如返回的 SubConn 是 “READY” 状态,grpc 会发送 RPC 请求,否则则会阻塞,等待 UpdateBalancerState 这个方法更新连接的状态并且通过 picker 获取一个新的 SubConn 连接。

channelz

channelz 主要用来监测 server 和 channel 的状态,这里的概念和实现比较复杂,暂时不进行深入研究,感兴趣的同学可以参考:https://github.com/grpc/proposal/blob/master/A14-channelz.md 这个 proposal ,初始化的代码如下:

  1. if channelz.IsOn() {
  2. if cc.dopts.channelzParentID != 0 {
  3. cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
  4. channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
  5. Desc: "Channel Created",
  6. Severity: channelz.CtINFO,
  7. Parent: &channelz.TraceEventDesc{
  8. Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
  9. Severity: channelz.CtINFO,
  10. },
  11. })
  12. } else {
  13. cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
  14. channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
  15. Desc: "Channel Created",
  16. Severity: channelz.CtINFO,
  17. })
  18. }
  19. cc.csMgr.channelzID = cc.channelzID
  20. }

Authentication

这一段是对认证信息的初始化校验,这里暂不研究,感兴趣的同学可以了解下 :https://grpc.io/docs/guides/auth/

  1. if !cc.dopts.insecure {
  2. if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
  3. return nil, errNoTransportSecurity
  4. }
  5. if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
  6. return nil, errTransportCredsAndBundle
  7. }
  8. } else {
  9. if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
  10. return nil, errCredentialsConflict
  11. }
  12. for _, cd := range cc.dopts.copts.PerRPCCredentials {
  13. if cd.RequireTransportSecurity() {
  14. return nil, errTransportCredentialsMissing
  15. }
  16. }
  17. }

Dialer

Dialer 是发起 rpc 请求的调用器,dialer 中实现了对 rpc 请求调用的具体细节,所以可以算是我们的重点研究对象之一。dialer 中包括了连接建立、地址解析、服务发现、长连接等等具体策略的实现。

  1. if cc.dopts.copts.Dialer == nil {
  2. cc.dopts.copts.Dialer = newProxyDialer(
  3. func(ctx context.Context, addr string) (net.Conn, error) {
  4. network, addr := parseDialTarget(addr)
  5. return (&net.Dialer{}).DialContext(ctx, network, addr)
  6. },
  7. )
  8. }

这一段方法只是简单进行了地址的规则解析,我们具体看 DialContext 方法,其中有一行:

  1. addrs, err := d.resolver().resolveAddrList(resolveCtx, "dial", network, address, d.LocalAddr)

可以看到通过 dialer 的 resolver 来进行服务发现,这里以后我们再单独详细讲解。

这里值得一提的是,通过 dialContext 可以看出,这里的 dial 有两种请求方式,一种是 dialParallel , 另一种是 dialSerial。dialParallel 发出两个完全相同的请求,采用第一个返回的结果,抛弃掉第二个请求。dialSerial 则是发出一串(多个)请求。然后采取第一个返回的请求结果( 成功或者失败)。

scChan

scChan 是 dialOptions 中的一个属性,定义如下:

  1. scChan <-chan ServiceConfig

可以看到其实他是一个 ServiceConfig类型的一个 channel,那么 ServiceConfig 是什么呢?源码中对这个类的介绍如下:

  1. // ServiceConfig is provided by the service provider and contains parameters for how clients that connect to the service should behave.

通过介绍得知 ServiceConfig 是服务提供方约定的一些参数。这里说明 client 提供给 server 一个可以通过 channel 来修改这些参数的入口。这里到时候我们介绍服务发现时可以细讲,我们在这里只需要知道 client 的某些属性是可以被 server 修改的就行了

  1. if cc.dopts.scChan != nil {
  2. // Try to get an initial service config.
  3. select {
  4. case sc, ok := <-cc.dopts.scChan:
  5. if ok {
  6. cc.sc = &sc
  7. scSet = true
  8. }
  9. default:
  10. }
  11. }

2)通过一个 conn 创建一个客户端

通过一个 conn 创建客户端的代码如下:

  1. c := pb.NewGreeterClient(conn)

这一步非常简单,其实是 pb 文件中生成的代码,就是创建一个 greeterClient 的客户端。

  1. type greeterClient struct {
  2. cc *grpc.ClientConn
  3. }
  4. func NewGreeterClient(cc *grpc.ClientConn) GreeterClient {
  5. return &greeterClient{cc}
  6. }

3)发起 rpc 调用

前面在创建 Dialer 的时候,我们已经将请求的 target 解析成了 address。我们猜这一步应该是向指定 address 发起 rpc 请求了。来具体看看

  1. func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
  2. out := new(HelloReply)
  3. err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
  4. if err != nil {
  5. return nil, err
  6. }
  7. return out, nil
  8. }

SayHello 方法是通过调用 Invoke 的方法去发起 rpc 调用, Invoke 方法如下:

  1. func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
  2. // allow interceptor to see all applicable call options, which means those
  3. // configured as defaults from dial option as well as per-call options
  4. opts = combine(cc.dopts.callOptions, opts)
  5. if cc.dopts.unaryInt != nil {
  6. return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
  7. }
  8. return invoke(ctx, method, args, reply, cc, opts...)
  9. }
  10. func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
  11. cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
  12. if err != nil {
  13. return err
  14. }
  15. if err := cs.SendMsg(req); err != nil {
  16. return err
  17. }
  18. return cs.RecvMsg(reply)
  19. }

Invoke 方法调用了 invoke, 在 invoke 这个方法里面,果然不出所料,我们看到了 sendMsg 和 recvMsg 接口,这两个接口在 clientStream 中被实现了。

SendMsg

我们先来看看 clientStream 中定义的 sendMsg,关键代码如下:

  1. func (cs *clientStream) SendMsg(m interface{}) (err error) {
  2. ...
  3. // load hdr, payload, data
  4. hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
  5. if err != nil {
  6. return err
  7. }
  8. ...
  9. op := func(a *csAttempt) error {
  10. err := a.sendMsg(m, hdr, payload, data)
  11. // nil out the message and uncomp when replaying; they are only needed for
  12. // stats which is disabled for subsequent attempts.
  13. m, data = nil, nil
  14. return err
  15. }
  16. }

先准备数据,然后再调用 csAttempt 这个结构体中的 sendMsg 方法,

  1. func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
  2. cs := a.cs
  3. if a.trInfo != nil {
  4. a.mu.Lock()
  5. if a.trInfo.tr != nil {
  6. a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
  7. }
  8. a.mu.Unlock()
  9. }
  10. if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
  11. if !cs.desc.ClientStreams {
  12. // For non-client-streaming RPCs, we return nil instead of EOF on error
  13. // because the generated code requires it. finish is not called; RecvMsg()
  14. // will call it with the stream's status independently.
  15. return nil
  16. }
  17. return io.EOF
  18. }
  19. if a.statsHandler != nil {
  20. a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
  21. }
  22. if channelz.IsOn() {
  23. a.t.IncrMsgSent()
  24. }
  25. return nil
  26. }

最终是通过 a.t.Write 发出的数据写操作,a.t 是一个 ClientTransport 类型,所以最终是通过 ClientTransport 这个结构体的 Write 方法发送数据

RecvMsg

发送数据是通过 ClientTransport 的 Write 方法,我们猜测接收数据肯定是某个结构体的 Read 方法。这里我们来详细看一下

  1. func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
  2. ...
  3. err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
  4. ...
  5. if a.statsHandler != nil {
  6. a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
  7. Client: true,
  8. RecvTime: time.Now(),
  9. Payload: m,
  10. // TODO truncate large payload.
  11. Data: payInfo.uncompressedBytes,
  12. WireLength: payInfo.wireLength,
  13. Length: len(payInfo.uncompressedBytes),
  14. })
  15. }
  16. ...
  17. }

可以看到调用了 recv 方法:

  1. func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
  2. d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
  3. ...
  4. }

再看 recvAndDecompress 方法,调用了 recvMsg

  1. func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
  2. pf, d, err := p.recvMsg(maxReceiveMessageSize)
  3. ...
  4. }
  5. func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
  6. if _, err := p.r.Read(p.header[:]); err != nil {
  7. return 0, nil, err
  8. }
  9. ...
  10. }

这里比较清楚了,最终还是调用了 p.r.Read 方法,p.r 是一个 io.Reader 类型。果然万变不离其中,最终都是要落到 IO 上。

到这里,整个 client 结构已经基本解析清楚了,but wait,总感觉哪里不太对,接收数据是调用 io.Reader ,按道理发送数据应该也是调用 io.Writer 才对。可是追溯到 ClientTransport 这里,发现它是一个 interface ,并没有实现 Write 方法,所以,Write 也是一个接口,这里是不是可以继续追溯呢?

  1. Write(s *Stream, hdr []byte, data []byte, opts *Options) error

返回去从头看,我们找到了 transport 的来源,在 Serve() 方法 的 handleRawConn 方法中,newHttp2Transport,创建了一个 Http2Transport ,然后通过 serveStreams 方法将这个 Http2Transport 层层透传下去。

  1. // Finish handshaking (HTTP2)
  2. st := s.newHTTP2Transport(conn, authInfo)
  3. if st == nil {
  4. return
  5. }
  6. rawConn.SetDeadline(time.Time{})
  7. if !s.addConn(st) {
  8. return
  9. }
  10. go func() {
  11. s.serveStreams(st)
  12. s.removeConn(st)
  13. }()

继续看一下 http2Client 的 Write 方法,如下:

  1. func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  2. ...
  3. hdr = append(hdr, data[:emptyLen]...)
  4. data = data[emptyLen:]
  5. df := &dataFrame{
  6. streamID: s.id,
  7. h: hdr,
  8. d: data,
  9. onEachWrite: t.setResetPingStrikes,
  10. }
  11. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  12. select {
  13. case <-t.ctx.Done():
  14. return ErrConnClosing
  15. default:
  16. }
  17. return ContextErr(s.ctx.Err())
  18. }
  19. return t.controlBuf.put(df)
  20. }

可以看到,最终是把 data 放到了一个 controlBuf 的结构体里面

  1. // controlBuf delivers all the control related tasks (e.g., window
  2. // updates, reset streams, and various settings) to the controller.
  3. controlBuf *controlBuffer

controlBuf 是 http2 客户端发送数据的实现,这里留待后续研究