grpc hello world server 解析

我们介绍 grpc quick start 时,通过快速启动一个 grpc server 端和 client 端,然后以 rpc 调用的方式输出一个 hello world。那么输出 hello world 需要经过哪些方法的处理呢?…….这个我也不知道,所以我们先去瞅瞅源码,探究一下 hello world 的背后是连接是如何建立的,然后一起来解读这个问题哈哈。

这节内容我们先来研究一下 server 端连接建立过程。

先放上 server 端的 main 函数。

  1. func main() {
  2. lis, err := net.Listen("tcp", port)
  3. if err != nil {
  4. log.Fatalf("failed to listen: %v", err)
  5. }
  6. s := grpc.NewServer()
  7. pb.RegisterGreeterServer(s, &server{})
  8. if err := s.Serve(lis); err != nil {
  9. log.Fatalf("failed to serve: %v", err)
  10. }
  11. }

我们发现其实 server 端连接的建立主要包括三步:

(1)创建 server

(2)server 的注册

(3)调用 Serve 监听端口并处理请求

ok,弄清楚主流程之后下面我们进入每个步骤里面去看一下代码实现。

1、创建 server

server 的创建比较简单,其实就下面一个方法:

  1. func NewServer(opt ...ServerOption) *Server {
  2. opts := defaultServerOptions
  3. for _, o := range opt {
  4. o.apply(&opts)
  5. }
  6. s := &Server{
  7. lis: make(map[net.Listener]bool),
  8. opts: opts,
  9. conns: make(map[transport.ServerTransport]bool),
  10. m: make(map[string]*service),
  11. quit: grpcsync.NewEvent(),
  12. done: grpcsync.NewEvent(),
  13. czData: new(channelzData),
  14. }
  15. s.cv = sync.NewCond(&s.mu)
  16. if EnableTracing {
  17. _, file, line, _ := runtime.Caller(1)
  18. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  19. }
  20. if channelz.IsOn() {
  21. s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
  22. }
  23. return s
  24. }

这个方法的核心无非是创建了一个 server 结构体,然后为结构体的属性赋值。我们顺便来瞅瞅 server 的结构:

  1. // Server is a gRPC server to serve RPC requests.
  2. type Server struct {
  3. // serverOptions 就是描述协议的各种参数选项,包括发送和接收的消息大小、buffer大小等等各种,跟 http Headers 类似,我们这里就暂时先不管
  4. opts serverOptions
  5. // 一个互斥锁
  6. mu sync.Mutex // guards following
  7. // listener map
  8. lis map[net.Listener]bool
  9. // connections map
  10. conns map[transport.ServerTransport]bool
  11. // server 是否在处理请求的一个状态位
  12. serve bool
  13. drain bool
  14. cv *sync.Cond // signaled when connections close for GracefulStop
  15. // service map
  16. m map[string]*service // service name -> service info
  17. events trace.EventLog
  18. quit *grpcsync.Event
  19. done *grpcsync.Event
  20. channelzRemoveOnce sync.Once
  21. serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
  22. channelzID int64 // channelz unique identification number
  23. czData *channelzData
  24. }

虽然 server 结构体里面各种乱起八糟的字段,但是我们可以先不管哈哈哈,比较重要的无非就是三个 map 表分别用来存放多个 listener 、connection 和 service。其他字段都是为了实现协议描述或者并发控制的功能。我们重点关注下

  1. m map[string]*service

这个结构,service 中主要包含了 MethodDesc 和 StreamDesc 这两个 map

  1. type service struct {
  2. server interface{} // the server for service methods
  3. md map[string]*MethodDesc
  4. sd map[string]*StreamDesc
  5. mdata interface{}
  6. }

enter image description here

2、server 注册

server 的注册调用了 RegisterGreeterServer 方法,这个方法是 pb.go 文件里面的,如下:

  1. func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
  2. s.RegisterService(&_Greeter_serviceDesc, srv)
  3. }

这个方法调用了 server 的 RegisterService 方法,然后传入了一个 ServiceDesc 的数据结构,如下 :

  1. var _Greeter_serviceDesc = grpc.ServiceDesc{
  2. ServiceName: "helloworld.Greeter",
  3. HandlerType: (*GreeterServer)(nil),
  4. Methods: []grpc.MethodDesc{
  5. {
  6. MethodName: "SayHello",
  7. Handler: _Greeter_SayHello_Handler,
  8. },
  9. {
  10. MethodName: "SayHelloAgain",
  11. Handler: _Greeter_SayHelloAgain_Handler,
  12. },
  13. },
  14. Streams: []grpc.StreamDesc{},
  15. Metadata: "helloworld.proto",
  16. }

我们来看看 RegisterService 这个方法,可以看到主要是调用了 register 方法,register 方法则按照方法名为 key,将方法注入到 server 的 service map 中。看到这里我们其实可以预测一下,server 不同 rpc 请求的处理,也是根据 service 中不同的 serviceName 去 service map 中取出不同的 handler 进行处理

  1. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  2. ht := reflect.TypeOf(sd.HandlerType).Elem()
  3. st := reflect.TypeOf(ss)
  4. if !st.Implements(ht) {
  5. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  6. }
  7. s.register(sd, ss)
  8. }
  9. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  10. s.mu.Lock()
  11. defer s.mu.Unlock()
  12. s.printf("RegisterService(%q)", sd.ServiceName)
  13. if s.serve {
  14. grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
  15. }
  16. if _, ok := s.m[sd.ServiceName]; ok {
  17. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  18. }
  19. srv := &service{
  20. server: ss,
  21. md: make(map[string]*MethodDesc),
  22. sd: make(map[string]*StreamDesc),
  23. mdata: sd.Metadata,
  24. }
  25. for i := range sd.Methods {
  26. d := &sd.Methods[i]
  27. srv.md[d.MethodName] = d
  28. }
  29. for i := range sd.Streams {
  30. d := &sd.Streams[i]
  31. srv.sd[d.StreamName] = d
  32. }
  33. s.m[sd.ServiceName] = srv
  34. }

3、Serve 过程

回想所有 C/S 模式下,client 和 server 的通信基本是类似的。大致过程无非是 server 通过死循环的方式在某一个端口实现监听,然后 client 对这个端口发起连接请求,握手成功后建立连接,然后 server 处理 client 发送过来的请求数据,根据请求类型和请求参数,调用不同的 handler 进行处理,回写响应数据。

所以,对 server 端来说,主要是了解其如何实现监听,如何为请求分配不同的 handler 和 回写响应数据。

上面我们得知 server 调用了 Serve 方法来进行处理,所以立马就想跟进去看看。 跳过前面一堆条件检查和控制代码,直接锁定了一个 for 循环,如下:(中间的一些代码已省略)

  1. for {
  2. rawConn, err := lis.Accept()
  3. ......
  4. s.serveWG.Add(1)
  5. go func() {
  6. s.handleRawConn(rawConn)
  7. s.serveWG.Done()
  8. }()
  9. }

ok,我们已经看到了监听过程,server 的监听果然是通过一个死循环 调用了 lis.Accept() 进行端口监听。

继续往下看,我们发现新起协程调用了 handleRawConn 这个方法,为了节约篇幅,我们直接看重点代码,如下:

  1. func (s *Server) handleRawConn(rawConn net.Conn) {
  2. ...
  3. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  4. ...
  5. // Finish handshaking (HTTP2)
  6. st := s.newHTTP2Transport(conn, authInfo)
  7. if st == nil {
  8. return
  9. }
  10. ...
  11. go func() {
  12. s.serveStreams(st)
  13. s.removeConn(st)
  14. }()
  15. }

可以看到 handleRawConn 里面实现了 http 的 handshake,还记得之前我们说过,grpc 是基于 http2 实现的吗?这里是不是实锤了……. 发现又通过一个新的协程调用了 serveStreams 这个方法,这个方法干了啥呢?

  1. func (s *Server) serveStreams(st transport.ServerTransport) {
  2. defer st.Close()
  3. var wg sync.WaitGroup
  4. st.HandleStreams(func(stream *transport.Stream) {
  5. wg.Add(1)
  6. go func() {
  7. defer wg.Done()
  8. s.handleStream(st, stream, s.traceInfo(st, stream))
  9. }()
  10. }, func(ctx context.Context, method string) context.Context {
  11. if !EnableTracing {
  12. return ctx
  13. }
  14. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  15. return trace.NewContext(ctx, tr)
  16. })
  17. wg.Wait()
  18. }

其实它主要调用了 handleStream ,继续跟进 handleStream 方法,我们发现了重要线索,如下(省略了部分无关代码)

  1. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  2. sm := stream.Method()
  3. ...
  4. service := sm[:pos]
  5. method := sm[pos+1:]
  6. srv, knownService := s.m[service]
  7. if knownService {
  8. if md, ok := srv.md[method]; ok {
  9. s.processUnaryRPC(t, stream, srv, md, trInfo)
  10. return
  11. }
  12. if sd, ok := srv.sd[method]; ok {
  13. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  14. return
  15. }
  16. }
  17. ...
  18. }

重要线索就是这一行

  1. srv, knownService := s.m[service]

还记得我们之前的预测吗?根据 serviceName 去 server 中的 service map,也就是 m 这个字段,里面去取出 handler 进行处理。我们 hello world 这个 demo 的请求不涉及到 stream ,所以直接取出 handler ,然后传给 processUnaryRPC 这个方法进行处理。

  1. if md, ok := srv.md[method]; ok {
  2. s.processUnaryRPC(t, stream, srv, md, trInfo)
  3. return
  4. }

再来看看 processUnaryRpc 这个方法

  1. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  2. ...
  3. sh := s.opts.statsHandler
  4. if sh != nil {
  5. beginTime := time.Now()
  6. begin := &stats.Begin{
  7. BeginTime: beginTime,
  8. }
  9. sh.HandleRPC(stream.Context(), begin)
  10. defer func() {
  11. end := &stats.End{
  12. BeginTime: beginTime,
  13. EndTime: time.Now(),
  14. }
  15. if err != nil && err != io.EOF {
  16. end.Error = toRPCErr(err)
  17. }
  18. sh.HandleRPC(stream.Context(), end)
  19. }()
  20. }
  21. ...
  22. if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
  23. if err == io.EOF {
  24. // The entire stream is done (for unary RPC only).
  25. return err
  26. }
  27. if s, ok := status.FromError(err); ok {
  28. if e := t.WriteStatus(stream, s); e != nil {
  29. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
  30. }
  31. } else {
  32. switch st := err.(type) {
  33. case transport.ConnectionError:
  34. // Nothing to do here.
  35. default:
  36. panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
  37. }
  38. }
  39. if binlog != nil {
  40. h, _ := stream.Header()
  41. binlog.Log(&binarylog.ServerHeader{
  42. Header: h,
  43. })
  44. binlog.Log(&binarylog.ServerTrailer{
  45. Trailer: stream.Trailer(),
  46. Err: appErr,
  47. })
  48. }
  49. return err
  50. }
  51. ...
  52. }

我们终于看到了 handler 对 rpc 的处理:

  1. sh := s.opts.statsHandler
  2. sh.HandleRPC(stream.Context(), begin)
  3. sh.HandleRPC(stream.Context(), end)

同时也看到了 response 的回写

  1. s.sendResponse(t, stream, reply, cp, opts, comp)

至此,server 端我们的目标实现,追踪到了整个请求和监听、handler 处理 和 response 回写的过程。