协议编解码器

一般的协议都会包括协议头和协议体,对于业务而言,一般只关心需要发送的业务数据。所以,协议头的内容一般是框架自动帮忙填充。将业务数据包装成指定协议格式的数据包就是编码的过程,从指定协议格式中的数据包中取出业务数据的过程就是解码的过程。

每个 rpc 框架基本都有自己的编解码器,下面我们就来说说 grpc 的编解码过程。

grpc 解码

我们还是从我们的 examples 目录下的 helloworld demo 中 server 的 main 函数入手

  1. func main() {
  2. lis, err := net.Listen("tcp", port)
  3. if err != nil {
  4. log.Fatalf("failed to listen: %v", err)
  5. }
  6. s := grpc.NewServer()
  7. pb.RegisterGreeterServer(s, &server{})
  8. if err := s.Serve(lis); err != nil {
  9. log.Fatalf("failed to serve: %v", err)
  10. }
  11. }

在 s.Serve(lis) ——> s.handleRawConn(rawConn) —— > s.serveStreams(st) ——> s.handleStream(st, stream, s.traceInfo(st, stream)) ——> s.processUnaryRPC(t, stream, srv, md, trInfo) 方法中有一段代码:

  1. sh := s.opts.statsHandler
  2. ...
  3. df := func(v interface{}) error {
  4. if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
  5. return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
  6. }
  7. if sh != nil {
  8. sh.HandleRPC(stream.Context(), &stats.InPayload{
  9. RecvTime: time.Now(),
  10. Payload: v,
  11. WireLength: payInfo.wireLength,
  12. Data: d,
  13. Length: len(d),
  14. })
  15. }
  16. if binlog != nil {
  17. binlog.Log(&binarylog.ClientMessage{
  18. Message: d,
  19. })
  20. }
  21. if trInfo != nil {
  22. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  23. }
  24. return nil
  25. }

这段代码的逻辑先调 getCodec 获取解包类,然后调用这个类的 Unmarshal 方法进行解包。将业务数据取出来,然后调用 handler 进行处理。

  1. func (s *Server) getCodec(contentSubtype string) baseCodec {
  2. if s.opts.codec != nil {
  3. return s.opts.codec
  4. }
  5. if contentSubtype == "" {
  6. return encoding.GetCodec(proto.Name)
  7. }
  8. codec := encoding.GetCodec(contentSubtype)
  9. if codec == nil {
  10. return encoding.GetCodec(proto.Name)
  11. }
  12. return codec
  13. }

我们来看 getCodec 这个方法,它是通过 contentSubtype 这个字段来获取解包类的。假如不设置 contentSubtype ,那么默认会用名字为 proto 的解码器。

我们来看看 contentSubtype 是如何设置的。之前说到了 grpc 的底层默认是基于 http2 的。在 serveHttp 时调用了 NewServerHandlerTransport 这个方法来创建一个 ServerTransport,然后我们发现,其实就是根据 content-type 这个字段去生成的。

  1. func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
  2. ...
  3. contentType := r.Header.Get("Content-Type")
  4. // TODO: do we assume contentType is lowercase? we did before
  5. contentSubtype, validContentType := contentSubtype(contentType)
  6. if !validContentType {
  7. return nil, errors.New("invalid gRPC request content-type")
  8. }
  9. if _, ok := w.(http.Flusher); !ok {
  10. return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
  11. }
  12. st := &serverHandlerTransport{
  13. rw: w,
  14. req: r,
  15. closedCh: make(chan struct{}),
  16. writes: make(chan func()),
  17. contentType: contentType,
  18. contentSubtype: contentSubtype,
  19. stats: stats,
  20. }
  21. }

我们来看看 contentSubtype 这个方法 。

  1. ...
  2. baseContentType = "application/grpc"
  3. ...
  4. func contentSubtype(contentType string) (string, bool) {
  5. if contentType == baseContentType {
  6. return "", true
  7. }
  8. if !strings.HasPrefix(contentType, baseContentType) {
  9. return "", false
  10. }
  11. // guaranteed since != baseContentType and has baseContentType prefix
  12. switch contentType[len(baseContentType)] {
  13. case '+', ';':
  14. // this will return true for "application/grpc+" or "application/grpc;"
  15. // which the previous validContentType function tested to be valid, so we
  16. // just say that no content-subtype is specified in this case
  17. return contentType[len(baseContentType)+1:], true
  18. default:
  19. return "", false
  20. }
  21. }

可以看到 grpc 协议默认以 application/grpc 开头,假如不一这个开头会返回错误,假如我们想使用 json 的解码器,应该设置 content-type = application/grpc+json 。下面是一个基于 grpc 协议的请求 request :

  1. HEADERS (flags = END_HEADERS)
  2. :method = POST
  3. :scheme = http
  4. :path = /google.pubsub.v2.PublisherService/CreateTopic
  5. :authority = pubsub.googleapis.com
  6. grpc-timeout = 1S
  7. content-type = application/grpc+proto
  8. grpc-encoding = gzip
  9. authorization = Bearer y235.wef315yfh138vh31hv93hv8h3v
  10. DATA (flags = END_STREAM)
  11. <Length-Prefixed Message>

详细可参考 proto-http2

怎么拿的呢,再看一下 encoding.getCodec 方法

  1. func GetCodec(contentSubtype string) Codec {
  2. return registeredCodecs[contentSubtype]
  3. }

它其实取得是 registeredCodecs 这个 map 中的 codec,这个 map 是 RegisterCodec 方法注册进去的。

  1. var registeredCodecs = make(map[string]Codec)
  2. func RegisterCodec(codec Codec) {
  3. if codec == nil {
  4. panic("cannot register a nil Codec")
  5. }
  6. if codec.Name() == "" {
  7. panic("cannot register Codec with empty string result for Name()")
  8. }
  9. contentSubtype := strings.ToLower(codec.Name())
  10. registeredCodecs[contentSubtype] = codec
  11. }

毫无疑问, encoding 目录的 proto 包下肯定在初始化时调用注册方法了。果然

  1. func init() {
  2. encoding.RegisterCodec(codec{})
  3. }

绕了一圈,调用的其实是 proto 的 Unmarshal 方法,如下:

  1. func (codec) Unmarshal(data []byte, v interface{}) error {
  2. protoMsg := v.(proto.Message)
  3. protoMsg.Reset()
  4. if pu, ok := protoMsg.(proto.Unmarshaler); ok {
  5. // object can unmarshal itself, no need for buffer
  6. return pu.Unmarshal(data)
  7. }
  8. cb := protoBufferPool.Get().(*cachedProtoBuffer)
  9. cb.SetBuf(data)
  10. err := cb.Unmarshal(protoMsg)
  11. cb.SetBuf(nil)
  12. protoBufferPool.Put(cb)
  13. return err
  14. }

grpc 编码

在剖析解码代码的基础上,编码代码就很轻松了,其实直接找到 encoding 目录的 proto 包,看 Marshal 方法在哪儿被调用就行了。

于是我们很快就找到了调用路径,也是这个路径:

s.Serve(lis) ——> s.handleRawConn(rawConn) —— > s.serveStreams(st) ——> s.handleStream(st, stream, s.traceInfo(st, stream)) ——> s.processUnaryRPC(t, stream, srv, md, trInfo)

processUnaryRPC 方法中有一段 server 发送响应数据的代码。其实也就是这一行:

  1. if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {

我们其实也能猜到,发送数据给 client 之前肯定要编码。果然调用了 encode 方法

  1. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  2. data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
  3. if err != nil {
  4. grpclog.Errorln("grpc: server failed to encode response: ", err)
  5. return err
  6. }
  7. ...
  8. }

来看一下 encode

  1. func encode(c baseCodec, msg interface{}) ([]byte, error) {
  2. if msg == nil { // NOTE: typed nils will not be caught by this check
  3. return nil, nil
  4. }
  5. b, err := c.Marshal(msg)
  6. if err != nil {
  7. return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
  8. }
  9. if uint(len(b)) > math.MaxUint32 {
  10. return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
  11. }
  12. return b, nil
  13. }

它调用了 c.Marshal 方法, Marshal 方法其实是 baseCodec 定义的一个通用抽象方法

  1. type baseCodec interface {
  2. Marshal(v interface{}) ([]byte, error)
  3. Unmarshal(data []byte, v interface{}) error
  4. }

proto 实现了 baseCodec,前面说到了通过 s.getCodec(stream.ContentSubtype(),msg) 获取到的其实是 contentType 里面设置的协议名称,不设置的话默认取 proto 的编码器。所以最终是调用了 proto 包下的 Marshal 方法,如下:

  1. func (codec) Marshal(v interface{}) ([]byte, error) {
  2. if pm, ok := v.(proto.Marshaler); ok {
  3. // object can marshal itself, no need for buffer
  4. return pm.Marshal()
  5. }
  6. cb := protoBufferPool.Get().(*cachedProtoBuffer)
  7. out, err := marshal(v, cb)
  8. // put back buffer and lose the ref to the slice
  9. cb.SetBuf(nil)
  10. protoBufferPool.Put(cb)
  11. return out, err
  12. }

ok,那么至此,grpc 的整个编解码的流程我们就已经剖析完了