前提

服务注册与发现最简单的就是direct固定服务端地址的直连方式。也就是服务端正常监听端口启动不进行额外操作,客户端使用如下target

  1. direct://default/127.0.0.1:9000,127.0.0.1:9091

target就是标准的URL资源定位符查看WIKI

其中direct为协议类型,此处表示直接使用该URL内提供的地址127.0.0.1:9000,127.0.0.1:9091进行连接,而default在此处无意义仅当做占位符。

gRPC Resolver

gRPC暴露了服务发现的接口resolver.Builderresolver.ClientConnresolver.Resolver官方代码位置

  1. // Builder creates a resolver that will be used to watch name resolution updates.
  2. type Builder interface {
  3. // Build creates a new resolver for the given target.
  4. //
  5. // gRPC dial calls Build synchronously, and fails if the returned error is
  6. // not nil.
  7. Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
  8. // Scheme returns the scheme supported by this resolver.
  9. // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
  10. Scheme() string
  11. }
  12. // ClientConn contains the callbacks for resolver to notify any updates
  13. // to the gRPC ClientConn.
  14. //
  15. // This interface is to be implemented by gRPC. Users should not need a
  16. // brand new implementation of this interface. For the situations like
  17. // testing, the new implementation should embed this interface. This allows
  18. // gRPC to add new methods to this interface.
  19. type ClientConn interface {
  20. // UpdateState updates the state of the ClientConn appropriately.
  21. UpdateState(State)
  22. // NewAddress is called by resolver to notify ClientConn a new list
  23. // of resolved addresses.
  24. // The address list should be the complete list of resolved addresses.
  25. //
  26. // Deprecated: Use UpdateState instead.
  27. NewAddress(addresses []Address)
  28. // NewServiceConfig is called by resolver to notify ClientConn a new
  29. // service config. The service config should be provided as a json string.
  30. //
  31. // Deprecated: Use UpdateState instead.
  32. NewServiceConfig(serviceConfig string)
  33. }
  34. // Resolver watches for the updates on the specified target.
  35. // Updates include address updates and service config updates.
  36. type Resolver interface {
  37. // ResolveNow will be called by gRPC to try to resolve the target name
  38. // again. It's just a hint, resolver can ignore this if it's not necessary.
  39. //
  40. // It could be called multiple times concurrently.
  41. ResolveNow(ResolveNowOption)
  42. // Close closes the resolver.
  43. Close()
  44. }

下面依次分析这三个接口的作用:

  • Builder用于gRPC内部创建Resolver接口的实现,但注意声明的Build方法将接口ClientConn作为参数传入了
  • ClientConn接口有两个废弃方法不用管,看UpdateState方法需要传入State结构,看代码可以发现其中包含了Addresses []Address // Resolved addresses for the target,可以看出是需要将服务发现得到的Address对象列表告诉ClientConn的对象
  • Resolver提供了ResolveNow用于被gRPC尝试重新进行服务发现

看完这三个接口就可以明白gRPC的服务发现实现逻辑,通过Builder进行Reslover的创建,在Build的过程中将服务发现的地址信息丢给ClientConn用于内部连接创建等逻辑。主要逻辑可以按下面顺序来看源码理解:

  • clientDial时会根据target解析的scheme获取对应的Builder官方代码位置
  • Dial成功会创建出结构体ClientConn的对象官方代码位置(注意不是上面的ClientConn接口),可以看到结构体ClientConn内的成员resolverWrapper又实现了接口ClientConn的方法官方代码位置
  • resolverWrapper被初始化时就会调用Build方法官方代码位置,其中参数为接口ClientConn传入的是ccResolverWrapper
  • 当用户基于Builder的实现进行UpdateState调用时,则会触发结构体ClientConnupdateResolverState方法官方代码位置updateResolverState则会对传入的Address进行初始化等逻辑官方代码位置

如此整个服务发现过程就结束了。从中也可以看出gRPC官方提供的三个接口还是很灵活的,但也正因为灵活要实现稍微麻烦一些,而Address官方代码位置如果直接被业务拿来用于服务节点信息的描述结构则显得有些过于简单。

所以warden包装了gRPC的整个服务发现实现逻辑,代码分别位于pkg/naming/naming.gowarden/resolver/resolver.go,其中:

  • naming.go内定义了用于描述业务实例的Instance结构、用于服务注册的Registry接口、用于服务发现的Resolver接口
  • resolver.go内实现了gRPC官方的resolver.Builderresolver.Resolver接口,但也暴露了naming.go内的naming.Buildernaming.Resolver接口

warden Resolver

接下来看naming内的接口如下:

  1. // Resolver resolve naming service
  2. type Resolver interface {
  3. Fetch(context.Context) (*InstancesInfo, bool)
  4. Watch() <-chan struct{}
  5. Close() error
  6. }
  7. // Builder resolver builder.
  8. type Builder interface {
  9. Build(id string) Resolver
  10. Scheme() string
  11. }

可以看到封装方式与gRPC官方的方法一样,通过Builder进行Resolver的初始化。不同的是通过封装将参数进行了简化:

  • Build只需要传对应的服务id即可:warden/resolver/resolver.go在gRPC进行调用后,会根据Scheme方法查询对应的naming.Builder实现并调用Buildid传入,而naming.Resolver的实现即可通过id去对应的服务发现中间件进行实例信息的查询
  • Resolver则对方法进行了扩展,除了简单进行Fetch操作外还多了Watch方法,用于监听服务发现中间件的节点变化情况,从而能够实时的进行服务实例信息的更新

naming/discovery内实现了基于discovery为中间件的服务注册与发现逻辑。如果要实现其他中间件如etcd|zookeeper等的逻辑,参考naming/discovery/discovery.go内的逻辑,将与discovery的交互逻辑替换掉即可(后续会默认将etcd/zk等实现,敬请期待)。

使用discovery

因为warden内默认使用direct的方式,所以要使用discovery需要在业务的NewClient前进行注册,代码如下:

  1. package dao
  2. import (
  3. "context"
  4. "github.com/go-kratos/kratos/pkg/naming/discovery"
  5. "github.com/go-kratos/kratos/pkg/net/rpc/warden"
  6. "github.com/go-kratos/kratos/pkg/net/rpc/warden/resolver"
  7. "google.golang.org/grpc"
  8. )
  9. // AppID your appid, ensure unique.
  10. const AppID = "demo.service" // NOTE: example
  11. func init(){
  12. // NOTE: 注意这段代码,表示要使用discovery进行服务发现
  13. // NOTE: 还需注意的是,resolver.Register是全局生效的,所以建议该代码放在进程初始化的时候执行
  14. // NOTE: !!!切记不要在一个进程内进行多个不同中间件的Register!!!
  15. // NOTE: 在启动应用时,可以通过flag(-discovery.nodes) 或者 环境配置(DISCOVERY_NODES)指定discovery节点
  16. resolver.Register(discovery.Builder())
  17. }
  18. // NewClient new member grpc client
  19. func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (DemoClient, error) {
  20. client := warden.NewClient(cfg, opts...)
  21. conn, err := client.Dial(context.Background(), "discovery://default/"+AppID)
  22. if err != nil {
  23. return nil, err
  24. }
  25. // 注意替换这里:
  26. // NewDemoClient方法是在"api"目录下代码生成的
  27. // 对应proto文件内自定义的service名字,请使用正确方法名替换
  28. return NewDemoClient(conn), nil
  29. }

注意:resolver.Register是全局行为,建议放在包加载阶段或main方法开始时执行,该方法执行后会在gRPC内注册构造方法

targetdiscovery://default/${appid},当gRPC内进行解析后会得到scheme=discoveryappid,然后进行以下逻辑:

  1. warden/resolver.Builder会通过scheme获取到naming/discovery.Builder对象(靠resolver.Register注册过的)
  2. 拿到naming/discovery.Builder后执行Build(appid)构造naming/discovery.Discovery
  3. naming/discovery.Discovery对象基于appid就知道要获取哪个服务的实例信息

服务注册

客户端既然使用了discovery进行服务发现,也就意味着服务端启动后必须将自己注册给discovery知道。

相对服务发现来讲,服务注册则简单很多,看naming/discovery/discovery.go内的代码实现了naming/naming.go内的Registry接口,服务端启动时可以参考下面代码进行注册:

  1. // 该代码可放在main.go,当warden server进行初始化之后
  2. // 省略...
  3. ip := "" // NOTE: 必须拿到您实例节点的真实IP,
  4. port := "" // NOTE: 必须拿到您实例grpc监听的真实端口,warden默认监听9000
  5. hn, _ := os.Hostname()
  6. dis := discovery.New(nil)
  7. ins := &naming.Instance{
  8. Zone: env.Zone,
  9. Env: env.DeployEnv,
  10. AppID: "your app id",
  11. Hostname: hn,
  12. Addrs: []string{
  13. "grpc://" + ip + ":" + port,
  14. },
  15. }
  16. cancel, err := dis.Register(context.Background(), ins)
  17. if err != nil {
  18. panic(err)
  19. }
  20. // 省略...
  21. // 特别注意!!!
  22. // cancel必须在进程退出时执行!!!
  23. cancel()

使用ETCD

和使用discovery类似,只需要在注册时使用etcd naming即可。

  1. package dao
  2. import (
  3. "context"
  4. "github.com/go-kratos/kratos/pkg/naming/etcd"
  5. "github.com/go-kratos/kratos/pkg/net/rpc/warden"
  6. "github.com/go-kratos/kratos/pkg/net/rpc/warden/resolver"
  7. "google.golang.org/grpc"
  8. )
  9. // AppID your appid, ensure unique.
  10. const AppID = "demo.service" // NOTE: example
  11. func init(){
  12. // NOTE: 注意这段代码,表示要使用etcd进行服务发现 ,其他事项参考discovery的说明
  13. // NOTE: 在启动应用时,可以通过flag(-etcd.endpoints) 或者 环境配置(ETCD_ENDPOINTS)指定etcd节点
  14. // NOTE: 如果需要自己指定配置时 需要同时设置DialTimeout 与 DialOptions: []grpc.DialOption{grpc.WithBlock()}
  15. resolver.Register(etcd.Builder(nil))
  16. }
  17. // NewClient new member grpc client
  18. func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (DemoClient, error) {
  19. client := warden.NewClient(cfg, opts...)
  20. // 这里使用etcd scheme
  21. conn, err := client.Dial(context.Background(), "etcd://default/"+AppID)
  22. if err != nil {
  23. return nil, err
  24. }
  25. // 注意替换这里:
  26. // NewDemoClient方法是在"api"目录下代码生成的
  27. // 对应proto文件内自定义的service名字,请使用正确方法名替换
  28. return NewDemoClient(conn), nil
  29. }

etcd的服务注册与discovery基本相同,可以传入详细的etcd配置项, 或者传入nil后通过flag(-etcd.endpoints)/环境配置(ETCD_ENDPOINTS)来指定etcd节点。

其他配置项

etcd默认的全局keyPrefix为kratos_etcd,当该keyPrefix与项目中其他keyPrefix冲突时可以通过flag(-etcd.prefix)或者环境配置(ETCD_PREFIX)来指定keyPrefix。

扩展阅读

warden快速开始
warden拦截器
warden基于pb生成
warden负载均衡