grpc负载均衡

负载均衡流程

grpc 官方的 doc 中介绍了,grpc 的负载均衡是基于一次请求而不是一次连接的。也就是说,假如所有的请求都来自同一个客户端的连接,这些请求还是会被均衡到所有服务器。

整个 grpc 负载均衡流程如下图:

 grpc 负载均衡 1、启动时,grpc client 通过服名字解析服务得到一个 address list,每个 address 将指示它是服务器地址还是负载平衡器地址,以及指示要哪个客户端负载平衡策略的服务配置(例如,round_robin 或 grpclb)

2、客户端实例化负载均衡策略 如果解析程序返回的任何一个地址是负载均衡器地址,则无论 service config 中定义了什么负载均衡策略,客户端都将使用grpclb策略。否则,客户端将使用 service config 中定义的负载均衡策略。如果服务配置未请求负载均衡策略,则客户端将默认使用选择第一个可用服务器地址的策略。

3、负载平衡策略为每个服务器地址创建一个 subchannel,假如是 grpclb 策略,客户端会根据名字解析服务返回的地址列表,请求负载均衡器,由负载均衡器决定请求哪个 subConn,然后打开一个数据流,对这个 subConn 中的所有服务器 adress 都建立连接,从而实现 client stream 的效果

4、当有rpc请求时,负载均衡策略决定哪个子通道即grpc服务器将接收请求,当可用服务器为空时客户端的请求将被阻塞。

源码实现

接下来我们来看一下源码里面关于 grpc 负载均衡的实现,这里主要分为初始化 balancer 和寻址两步

1. 初始化 balancer

之前介绍 grpc 服务发现时,我们知道了通过 dns_resolver 的 lookup 方法可以得到一个 address list,那么拿到地址列表之后具体干了什么事呢?下面我们来看看

  1. result, sc := d.lookup()
  2. // Next lookup should happen within an interval defined by d.freq. It may be
  3. // more often due to exponential retry on empty address list.
  4. if len(result) == 0 {
  5. d.retryCount++
  6. d.t.Reset(d.backoff.Backoff(d.retryCount))
  7. } else {
  8. d.retryCount = 0
  9. d.t.Reset(d.freq)
  10. }
  11. d.cc.NewServiceConfig(sc)
  12. d.cc.NewAddress(result)

这里调用了 NewAddress 方法,在 NewAddress 这个方法里面又调用了 updateResolverState 这个方法,对负载均衡器的初始化就是在这个方法中进行的,如下:

  1. if cc.dopts.balancerBuilder == nil {
  2. // Only look at balancer types and switch balancer if balancer dial
  3. // option is not set.
  4. var newBalancerName string
  5. if cc.sc != nil && cc.sc.lbConfig != nil {
  6. newBalancerName = cc.sc.lbConfig.name
  7. balCfg = cc.sc.lbConfig.cfg
  8. } else {
  9. var isGRPCLB bool
  10. for _, a := range s.Addresses {
  11. if a.Type == resolver.GRPCLB {
  12. isGRPCLB = true
  13. break
  14. }
  15. }
  16. if isGRPCLB {
  17. newBalancerName = grpclbName
  18. } else if cc.sc != nil && cc.sc.LB != nil {
  19. newBalancerName = *cc.sc.LB
  20. } else {
  21. newBalancerName = PickFirstBalancerName
  22. }
  23. }
  24. cc.switchBalancer(newBalancerName)
  25. } else if cc.balancerWrapper == nil {
  26. // Balancer dial option was set, and this is the first time handling
  27. // resolved addresses. Build a balancer with dopts.balancerBuilder.
  28. cc.curBalancerName = cc.dopts.balancerBuilder.Name()
  29. cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
  30. }
  31. cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})

之前说到了我们在 dns_resolver 中查找 address 时是通过 grpclb 去进行查找的,所以它返回的 resolver 的策略就是 grpclb 策略。这里会进入到 switchBalancer 方法,我们来看看这个方法干了啥

  1. func (cc *ClientConn) switchBalancer(name string) {
  2. builder := balancer.Get(name)
  3. ...
  4. cc.curBalancerName = builder.Name()
  5. cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
  6. }

这里通过 grpclb 这个 name 去获取到了 grpclb 策略的一个 balancer 实现,然后调用了 newCCBalancerWrapper 这个方法,继续跟踪

  1. func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
  2. ccb := &ccBalancerWrapper{
  3. cc: cc,
  4. stateChangeQueue: newSCStateUpdateBuffer(),
  5. ccUpdateCh: make(chan *balancer.ClientConnState, 1),
  6. done: make(chan struct{}),
  7. subConns: make(map[*acBalancerWrapper]struct{}),
  8. }
  9. go ccb.watcher()
  10. ccb.balancer = b.Build(ccb, bopts)
  11. return ccb
  12. }

来看一下这里的 Build 方法,去 grpclb 这个策略的实现类里面看,发现它返回了一个 lbBalancer 实例

  1. func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  2. ...
  3. lb := &lbBalancer{
  4. cc: newLBCacheClientConn(cc),
  5. target: opt.Target.Endpoint,
  6. opt: opt,
  7. fallbackTimeout: b.fallbackTimeout,
  8. doneCh: make(chan struct{}),
  9. manualResolver: r,
  10. subConns: make(map[resolver.Address]balancer.SubConn),
  11. scStates: make(map[balancer.SubConn]connectivity.State),
  12. picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
  13. clientStats: newRPCStats(),
  14. backoff: defaultBackoffConfig, // TODO: make backoff configurable.
  15. }
  16. ...
  17. return lb
  18. }

2. 寻址

之前我们说到了,helloworld demo 中 client 发送请求主要分为三步,对 balancer 的初始化其实是在第一步 grpc.Dial 时初始化 dialContext 时完成的。那么寻址过程,就是在第三步调用 sayHello 时完成的。

我们进入 sayHello ——> c.cc.Invoke ——> invoke ——> newClientStream 方法中,有下面一段代码:

  1. if err := cs.newAttemptLocked(sh, trInfo); err != nil {
  2. cs.finish(err)
  3. return nil, err
  4. }

进入 newAttemptLocked 方法,如下:

  1. func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error {
  2. cs.attempt = &csAttempt{
  3. cs: cs,
  4. dc: cs.cc.dopts.dc,
  5. statsHandler: sh,
  6. trInfo: trInfo,
  7. }
  8. if err := cs.ctx.Err(); err != nil {
  9. return toRPCErr(err)
  10. }
  11. t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
  12. ...
  13. }

我们发现它调用了 getTransport 方法,进入这个方法,我们找到了 pick 方法的调用

  1. func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
  2. t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
  3. FullMethodName: method,
  4. })
  5. if err != nil {
  6. return nil, nil, toRPCErr(err)
  7. }
  8. return t, done, nil
  9. }

pick 方法即是具体寻址的方法,仔细看 pick 方法,它先 Pick 获取了一个 SubConn,SubConn 结构体中包含了一个 address list,然后它会对每一个 address 都会发送 rpc 请求。

  1. func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
  2. var ch chan struct{}
  3. for {
  4. ...
  5. p := bp.picker
  6. ...
  7. subConn, done, err := p.Pick(ctx, opts)
  8. ...
  9. }
  10. }

它调用了 pickWrapper 中的 Pick 方法,在第一步初始化 balancer 的时候我们说到,它返回的其实是 lbBalancer 实例,所以这里去看 lbBalancer 实例的 Pick 实现

  1. func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
  2. p.mu.Lock()
  3. defer p.mu.Unlock()
  4. // Layer one roundrobin on serverList.
  5. s := p.serverList[p.serverListNext]
  6. p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
  7. // If it's a drop, return an error and fail the RPC.
  8. if s.Drop {
  9. p.stats.drop(s.LoadBalanceToken)
  10. return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
  11. }
  12. // If not a drop but there's no ready subConns.
  13. if len(p.subConns) <= 0 {
  14. return nil, nil, balancer.ErrNoSubConnAvailable
  15. }
  16. // Return the next ready subConn in the list, also collect rpc stats.
  17. sc := p.subConns[p.subConnsNext]
  18. p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
  19. done := func(info balancer.DoneInfo) {
  20. if !info.BytesSent {
  21. p.stats.failedToSend()
  22. } else if info.BytesReceived {
  23. p.stats.knownReceived()
  24. }
  25. }
  26. return sc, done, nil
  27. }

可以看到这其实是一个轮询实现。用一个指针表示这次取的位置,取过之后就更新这个指针为下一位。Pick 的返回是一个 SubConn 结构,SubConn 里面就包含了 server 的地址列表,此时寻址就完成了。

3、发起 request

寻址完成之后,我们得到了包含 server 地址列表的 SubConn,接下来是如何发送请求的呢?在 pick 方法中接着往下看,发现了这段代码。

  1. acw, ok := subConn.(*acBalancerWrapper)
  2. if !ok {
  3. grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
  4. continue
  5. }
  6. if t, ok := acw.getAddrConn().getReadyTransport(); ok {
  7. if channelz.IsOn() {
  8. return t, doneChannelzWrapper(acw, done), nil
  9. }
  10. return t, done, nil
  11. }

这段代码先将 SubConn 转换成了一个 acBalancerWrapper ,然后获取其中的 addrConn 对象,接着调用 getReadyTransport 方法,如下:

  1. func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
  2. ac.mu.Lock()
  3. if ac.state == connectivity.Ready && ac.transport != nil {
  4. t := ac.transport
  5. ac.mu.Unlock()
  6. return t, true
  7. }
  8. var idle bool
  9. if ac.state == connectivity.Idle {
  10. idle = true
  11. }
  12. ac.mu.Unlock()
  13. // Trigger idle ac to connect.
  14. if idle {
  15. ac.connect()
  16. }
  17. return nil, false
  18. }

getReadyTransport 这个方法返回一个 Ready 状态的网络连接,假如连接状态是 IDLE 状态,会调用 connect 方法去进行客户端连接,connect 方法如下:

  1. func (ac *addrConn) connect() error {
  2. ac.mu.Lock()
  3. if ac.state == connectivity.Shutdown {
  4. ac.mu.Unlock()
  5. return errConnClosing
  6. }
  7. if ac.state != connectivity.Idle {
  8. ac.mu.Unlock()
  9. return nil
  10. }
  11. // Update connectivity state within the lock to prevent subsequent or
  12. // concurrent calls from resetting the transport more than once.
  13. ac.updateConnectivityState(connectivity.Connecting)
  14. ac.mu.Unlock()
  15. // Start a goroutine connecting to the server asynchronously.
  16. go ac.resetTransport()
  17. return nil
  18. }

通过 go ac.resetTransport() 这一行可以看到 connect 方法新起协程异步去与 server 建立连接。resetTransport 方法中有一行调用了 tryAllAddrs 方法,如下:

  1. newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)

我们猜测是在这个方法中去轮询 address 与 每个 address 的 server 建立连接。

  1. func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
  2. for _, addr := range addrs {
  3. ...
  4. newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
  5. if err == nil {
  6. return newTr, addr, reconnect, nil
  7. }
  8. ac.cc.blockingpicker.updateConnectionError(err)
  9. }
  10. // Couldn't connect to any address.
  11. return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address")
  12. }

一看这个方法,果然如此,遍历所有地址,然后调用了 createTransport 方法,为每个地址的服务器建立连接,看到这里,我们也明白了 Stream 的实现。传统的 client 实现是对某个 server 地址发起 connect,Stream 的实质无非是对一批 server 的 address 进行轮询并建立 connect

总结

grpc 负载均衡的实现是通过客户端路由的方式,先通过服务发现获取一个 resolver.Address 列表,resolver.Address 中包含了服务器地址和负载均衡服务名字,通过这个名字去初始化响应的 balancer,dns_resolver 中默认是使用的 grpclb 这个负载均衡器,寻址方式是轮询。通过调用 picker 去 生成一个 SubConn,SubConn 中包括服务器的地址列表,采用异步的方式对地址列表进行轮询,然后为每一个服务端地址都进行 connect 。

其实关于 resolver、balancer、picker 的实现还有很多细节部分,这里等待后续研究。