服务发现

在了解 grpc 服务发现之前,我们先来了解一下服务发现的路由方式。一般来说,我们有客户端路由和代理层路由两种方式。

客户端路由

客户端路由模式,也就是调用方负责获取被调用方的地址信息,并使用相应的负载均衡算法发起请求。调用方访问服务注册服务,获取对应的服务 IP 地址和端口,可能还包括对应的服务负载信息(负载均衡算法、服务实例权重等)。调用方通过负载均衡算法选取其中一个发起请求。如下:

在这里插入图片描述 server 启动的时候向 config server 注册自身的服务地址,server 正常退出的时候调用接口移除自身地址,通过定时心跳保证服务是否正常以及地址的有效性。

代理层路由

代理层路由,不是由调用方去获取被调方的地址,而是通过代理的方式,由代理去获取被调方的地址、发起调用请求。如下:

在这里插入图片描述 代理层路由这种模式,对 server 的寻址不再是由 client 去实现,而是由代理去实现。client 只是会对代理层发起简单请求,代理层去进行 server 寻址、负载均衡等。

grpc 官方介绍的服务发现流程如下:

在这里插入图片描述 由这张图可以看出,grpc 是使用客户端路由的方式。具体的过程我们在介绍负载均衡时再继续介绍

grpc 服务发现

之前在介绍 grpc client 时谈到了 resolver 这个类,对下面这段代码并未做详细介绍,我们来详细看看

  1. if cc.dopts.resolverBuilder == nil {
  2. // Only try to parse target when resolver builder is not already set.
  3. cc.parsedTarget = parseTarget(cc.target)
  4. grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
  5. cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
  6. if cc.dopts.resolverBuilder == nil {
  7. // If resolver builder is still nil, the parsed target's scheme is
  8. // not registered. Fallback to default resolver and set Endpoint to
  9. // the original target.
  10. grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
  11. cc.parsedTarget = resolver.Target{
  12. Scheme: resolver.GetDefaultScheme(),
  13. Endpoint: target,
  14. }
  15. cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
  16. }
  17. } else {
  18. cc.parsedTarget = resolver.Target{Endpoint: target}
  19. }

这段代码主要干了两件事情,parseTarget 和 resolver.Get 获取了一个 resolverBuilder

parseTarget 其实就是将 target 赋值给了 resolver target 对象的 endpoint 属性,如下

  1. func parseTarget(target string) (ret resolver.Target) {
  2. var ok bool
  3. ret.Scheme, ret.Endpoint, ok = split2(target, "://")
  4. if !ok {
  5. return resolver.Target{Endpoint: target}
  6. }
  7. ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
  8. if !ok {
  9. return resolver.Target{Endpoint: target}
  10. }
  11. return ret
  12. }

这里来看 resolver.Get 方法 ,这里从一个 map 中取出了一个 Builder

  1. var (
  2. // m is a map from scheme to resolver builder.
  3. m = make(map[string]Builder)
  4. // defaultScheme is the default scheme to use.
  5. defaultScheme = "passthrough"
  6. )
  7. func Get(scheme string) Builder {
  8. if b, ok := m[scheme]; ok {
  9. return b
  10. }
  11. return nil
  12. }

Builder 是在 resolver 中定义的,在了解 Builder 是啥之前,我们先来看看 resolver 这个结构体

resolver

  1. // Package resolver defines APIs for name resolution in gRPC.
  2. // All APIs in this package are experimental.

resolver 主要提供了一个名字解析的规范,所有的名字解析服务可以实现这个规范,包括 dns 解析类 dns_resolver 就是实现了这个规范的一个解析器。

resolver 中定义了 Builder ,通过调用 Build 去获取一个 resolver 实例

  1. // Builder creates a resolver that will be used to watch name resolution updates.
  2. type Builder interface {
  3. // Build creates a new resolver for the given target.
  4. //
  5. // gRPC dial calls Build synchronously, and fails if the returned error is
  6. // not nil.
  7. Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
  8. // Scheme returns the scheme supported by this resolver.
  9. // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
  10. Scheme() string
  11. }

我们在调用 Dial 方法发起 rpc 请求之前需要创建一个 ClientConn 连接,在 DialContext 这个方法中对 ClientConn 各属性进行了赋值,其中有一行代码就完成了 build resolver 的工作。

  1. // Build the resolver.
  2. rWrapper, err := newCCResolverWrapper(cc)
  3. func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
  4. rb := cc.dopts.resolverBuilder
  5. if rb == nil {
  6. return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
  7. }
  8. ccr := &ccResolverWrapper{
  9. cc: cc,
  10. addrCh: make(chan []resolver.Address, 1),
  11. scCh: make(chan string, 1),
  12. }
  13. var err error
  14. ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
  15. if err != nil {
  16. return nil, err
  17. }
  18. return ccr, nil
  19. }

不出意料,我们之前通过 get 去获取了一个 Builder, 这里调用了 Builder 的 Build 方法产生一个 resolver。

register()

上面我们说到了,resolver 通过 get 方法,根据一个 string key 去一个 builder map 中获取一个 builder,这个 map 在 resolver 中初始化如下,那么是怎么进行赋值的呢?

  1. var (
  2. // m is a map from scheme to resolver builder.
  3. m = make(map[string]Builder)
  4. // defaultScheme is the default scheme to use.
  5. defaultScheme = "passthrough"
  6. )

我们猜测肯定会有一个服务注册的过程,果然看到了一个 Register 方法

  1. func Register(b Builder) {
  2. m[b.Scheme()] = b
  3. }

所有的 resolver 实现类通过 Register 方法去实现 Builder 的注册,比如 grpc 提供的 dnsResolver 这个类中调用了 init 方法,在服务初始化时实现了 Builder 的注册

  1. func init() {
  2. resolver.Register(NewBuilder())
  3. }

获取服务地址

resolver 和 builder 都是 interface,也就是说它们只是定义了一套规则。具体实现由实现他们的子类去完成。例如在 helloworld 例子中,默认是通过默认的 passthrough 这个 scheme 去获取的 passthroughResolver 和 passthroughBuilder,我们来看 passthroughBuilder 的 Build 方法返回了一个带有 address 的 resolver,这个地址就是 server 的地址列表。在 helloworld demo 中,就是 “localhost:50051”。

  1. func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
  2. r := &passthroughResolver{
  3. target: target,
  4. cc: cc,
  5. }
  6. r.start()
  7. return r, nil
  8. }
  9. func (r *passthroughResolver) start() {
  10. r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
  11. }

dns_resolver

grpc 支持自定义 resolver 实现服务发现。同时 grpc 官方提供了一个基于 dns 的服务发现 resolver,这就是 dns_resolver,dns_resolver 通过 Build() 创建一个 resolver 实例,我们来具体看一下 Build() 方法:

  1. // Build creates and starts a DNS resolver that watches the name resolution of the target.
  2. func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
  3. host, port, err := parseTarget(target.Endpoint, defaultPort)
  4. if err != nil {
  5. return nil, err
  6. }
  7. // IP address.
  8. if net.ParseIP(host) != nil {
  9. host, _ = formatIP(host)
  10. addr := []resolver.Address{{Addr: host + ":" + port}}
  11. i := &ipResolver{
  12. cc: cc,
  13. ip: addr,
  14. rn: make(chan struct{}, 1),
  15. q: make(chan struct{}),
  16. }
  17. cc.NewAddress(addr)
  18. go i.watcher()
  19. return i, nil
  20. }
  21. // DNS address (non-IP).
  22. ctx, cancel := context.WithCancel(context.Background())
  23. d := &dnsResolver{
  24. freq: b.minFreq,
  25. backoff: backoff.Exponential{MaxDelay: b.minFreq},
  26. host: host,
  27. port: port,
  28. ctx: ctx,
  29. cancel: cancel,
  30. cc: cc,
  31. t: time.NewTimer(0),
  32. rn: make(chan struct{}, 1),
  33. disableServiceConfig: opts.DisableServiceConfig,
  34. }
  35. if target.Authority == "" {
  36. d.resolver = defaultResolver
  37. } else {
  38. d.resolver, err = customAuthorityResolver(target.Authority)
  39. if err != nil {
  40. return nil, err
  41. }
  42. }
  43. d.wg.Add(1)
  44. go d.watcher()
  45. return d, nil
  46. }

在 Build 方法中,我们没有看到对 server address 寻址的过程,仔细找找,发现了一个 watcher 方法,如下:

  1. go d.watcher()

看一下 watcher 方法,发现它其实是一个监控进程,顾名思义作用是监控我们产生的 resolver 的状态,这里使用了一个 for 循环无限监听,通过 chan 进行消息通知。

  1. func (d *dnsResolver) watcher() {
  2. defer d.wg.Done()
  3. for {
  4. select {
  5. case <-d.ctx.Done():
  6. return
  7. case <-d.t.C:
  8. case <-d.rn:
  9. if !d.t.Stop() {
  10. // Before resetting a timer, it should be stopped to prevent racing with
  11. // reads on it's channel.
  12. <-d.t.C
  13. }
  14. }
  15. result, sc := d.lookup()
  16. // Next lookup should happen within an interval defined by d.freq. It may be
  17. // more often due to exponential retry on empty address list.
  18. if len(result) == 0 {
  19. d.retryCount++
  20. d.t.Reset(d.backoff.Backoff(d.retryCount))
  21. } else {
  22. d.retryCount = 0
  23. d.t.Reset(d.freq)
  24. }
  25. d.cc.NewServiceConfig(sc)
  26. d.cc.NewAddress(result)
  27. // Sleep to prevent excessive re-resolutions. Incoming resolution requests
  28. // will be queued in d.rn.
  29. t := time.NewTimer(minDNSResRate)
  30. select {
  31. case <-t.C:
  32. case <-d.ctx.Done():
  33. t.Stop()
  34. return
  35. }
  36. }
  37. }

我们定位到里面的 lookup 方法。

  1. result, sc := d.lookup()

进入 lookup 方法,发现它调用了 lookupSRV 这个方法

  1. func (d *dnsResolver) lookup() ([]resolver.Address, string) {
  2. newAddrs := d.lookupSRV()
  3. // Support fallback to non-balancer address.
  4. newAddrs = append(newAddrs, d.lookupHost()...)
  5. if d.disableServiceConfig {
  6. return newAddrs, ""
  7. }
  8. sc := d.lookupTXT()
  9. return newAddrs, canaryingSC(sc)
  10. }

继续追踪,lookupSRV 这个方法最终其实调用了 go 源码包 net 包下的 的 lookupSRV 方法,这个方法实现了 dns 协议对指定的service服务,protocol协议以及name域名进行srv查询,返回server 的 address 列表。经过层层解剖,我们终于找到了返回 server 的 address list 的代码。

  1. _, srvs, err := d.resolver.LookupSRV(d.ctx, "grpclb", "tcp", d.host)
  2. ...
  3. func (r *Resolver) LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*SRV, err error) {
  4. return r.lookupSRV(ctx, service, proto, name)
  5. }

总结

总结一下, grpc 的服务发现,主要通过 resolver 接口去定义,支持业务自己实现服务发现的 resolver。 grpc 提供了默认的 passthrough_resolver,不进行地址解析,直接将 client 发起请求时指定的 address (例如 helloworld client 指定地址为 “localhost:50051” )当成 server address。同时,假如业务使用 dns 进行服务发现,grpc 提供了 dns_resolver,通过对指定的service服务,protocol协议以及name域名进行srv查询,来返回 server 的 address 列表。