Registry

Registry implements service discovery.

[Figure 1: Registry]

Services register themselves with a registry, and clients find those services through the registry.
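The register-then-lookup flow can be sketched with a tiny in-memory map. This is only an illustration: the `registry` type and its `register` and `lookup` methods are hypothetical names, not part of rpcx, and a real registry such as zookeeper or etcd lives in a separate process.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical in-memory stand-in for a real registry
// such as zookeeper, etcd or consul. It is not an rpcx type.
type registry struct {
	mu       sync.RWMutex
	services map[string][]string // service name -> registered addresses
}

func newRegistry() *registry {
	return &registry{services: make(map[string][]string)}
}

// register is what a service does when it starts.
func (r *registry) register(name, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.services[name] = append(r.services[name], addr)
}

// lookup is what a client does before it calls a service.
func (r *registry) lookup(name string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	// Return a copy so callers cannot modify the registry's own slice.
	return append([]string(nil), r.services[name]...)
}

func main() {
	r := newRegistry()
	r.register("Arith", "tcp@localhost:8972")
	r.register("Arith", "tcp@localhost:8973")
	fmt.Println(r.lookup("Arith"))
}
```

The client only needs the service name; the registry tells it which nodes currently serve that name.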

Several popular products can be used as registries, for example zookeeper, etcd and consul. Many companies also implement their own registry center.

rpcx provides out-of-the-box registries for zookeeper, etcd, consul and mDNS, and it also provides peer-to-peer and peer-to-multiple registries. For testing, rpcx also has an in-process registry.

peer2peer

Example:102basic

This mode has no registry at all: clients connect to the service directly. It suits a small system whose service runs on a single node. When the system scales, switching to another registry takes only a few changes.

The server needs no additional configuration.

The client uses Peer2PeerDiscovery and only sets the network and address of the service.

Since there is only one node, the selector is meaningless.

    d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

Note: rpcx uses the format network@Host:port to identify a service. The network can be tcp, http, unix, quic or kcp. The Host can be a host name or an IP address.
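To make the address format concrete, here is a minimal sketch that splits such a string into its network and address parts. The helper `splitServiceAddress` is hypothetical and written only for illustration; it is not how rpcx itself parses addresses.

```go
package main

import (
	"fmt"
	"strings"
)

// splitServiceAddress is a hypothetical helper, not part of rpcx.
// It splits "network@host:port" at the first "@".
func splitServiceAddress(s string) (string, string, error) {
	parts := strings.SplitN(s, "@", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("invalid service address %q, want network@host:port", s)
	}
	return parts[0], parts[1], nil
}

func main() {
	network, address, err := splitServiceAddress("tcp@localhost:8972")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("network:", network)
	fmt.Println("address:", address)
}
```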

NewXClient takes the service name as its first argument, followed by the fail mode, the selector, the discovery and the options.

peer2multiple

Example:multiple

If you have multiple services but no central registry, you can configure the service addresses in the client programmatically.

The server needs no additional configuration.

The client uses MultipleServersDiscovery and only sets the network and address of each service.

    d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

You must set the service info and metadata in MultipleServersDiscovery. If services are added or removed, you can call MultipleServersDiscovery.Update to update the service list dynamically.

    func (d *MultipleServersDiscovery) Update(pairs []*KVPair)
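The idea behind a dynamic update can be sketched as a server list guarded by a lock. The `staticDiscovery` and `kvPair` types below are hypothetical stand-ins written for this illustration, and the assumption that Key holds the address and Value the metadata is mine; this is not the rpcx implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// kvPair is a hypothetical stand-in for a key/value pair.
// Assumption: Key is the service address, Value is its metadata.
type kvPair struct {
	Key   string
	Value string
}

// staticDiscovery is a hypothetical client-side server list.
type staticDiscovery struct {
	mu    sync.RWMutex
	pairs []*kvPair
}

// update replaces the whole server list, for example after a node
// has been added or removed.
func (d *staticDiscovery) update(pairs []*kvPair) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.pairs = pairs
}

// services returns the current server list.
func (d *staticDiscovery) services() []*kvPair {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.pairs
}

func main() {
	d := &staticDiscovery{}
	d.update([]*kvPair{{Key: "tcp@localhost:8972"}, {Key: "tcp@localhost:8973"}})
	fmt.Println("nodes:", len(d.services()))

	// One node goes away; the caller pushes the new list.
	d.update([]*kvPair{{Key: "tcp@localhost:8972"}})
	fmt.Println("nodes:", len(d.services()))
}
```

Replacing the whole list in one step keeps readers from ever seeing a half-updated set of servers.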

zookeeper

Example:zookeeper

Apache ZooKeeper is a software project of the Apache Software Foundation. It is essentially a distributed hierarchical key-value store, which is used to provide a distributed configuration service, synchronization service, and naming registry for large distributed systems.

Services must use the ZooKeeperRegisterPlugin to register their info with zookeeper, while clients must use ZookeeperDiscovery to get the service info.

You must set ServiceAddress (network@address) and the zookeeper address. You should also set basePath so that your services do not conflict with services developed by others.
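To illustrate why a distinct basePath avoids conflicts, here is a minimal sketch. It assumes that registry keys are laid out as basePath/serviceName/serviceAddress; both that layout and the helper `nodePath` are assumptions made for this illustration, not something this page states.

```go
package main

import (
	"fmt"
	"path"
)

// nodePath builds a registry key under an ASSUMED layout:
// basePath/serviceName/serviceAddress.
// It is a hypothetical helper, not an rpcx API.
func nodePath(basePath, serviceName, serviceAddress string) string {
	return path.Join(basePath, serviceName, serviceAddress)
}

func main() {
	// Two teams register a service with the same name and address.
	a := nodePath("/team_a", "Arith", "tcp@localhost:8972")
	b := nodePath("/team_b", "Arith", "tcp@localhost:8972")
	fmt.Println(a)
	fmt.Println(b)
	fmt.Println("conflict:", a == b)
}
```

With different base paths the two keys never collide, even though the service name and address are identical.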

You can set Metrics, and rpcx will update the throughput periodically. To update metrics, you must also add the Metrics plugin to the server.

You can set UpdateInterval to refresh the lease. rpcx sets the lease to UpdateInterval * 3. That means that if the server has not refreshed the lease within UpdateInterval * 3, the node is removed from zookeeper.
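The lease arithmetic can be written out as a short sketch, reading the lease as UpdateInterval multiplied by 3. The functions `leaseTTL` and `expired` are hypothetical names used only for this illustration; the actual expiry is enforced by the registry, not by code like this.

```go
package main

import (
	"fmt"
	"time"
)

// leaseTTL returns the lease duration described in the text:
// three update intervals. Hypothetical helper.
func leaseTTL(updateInterval time.Duration) time.Duration {
	return updateInterval * 3
}

// expired reports whether a node that last refreshed its lease
// sinceLastRefresh ago has outlived that lease. Hypothetical helper.
func expired(sinceLastRefresh, updateInterval time.Duration) bool {
	return sinceLastRefresh > leaseTTL(updateInterval)
}

func main() {
	interval := time.Minute
	fmt.Println("lease:", leaseTTL(interval))
	fmt.Println("expired after 2m:", expired(2*time.Minute, interval))
	fmt.Println("expired after 4m:", expired(4*time.Minute, interval))
}
```

With UpdateInterval set to one minute, as in the server example, a node can miss two refreshes and still be listed; it is dropped only after three minutes of silence.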

```go
// server.go
func main() {
	flag.Parse()

	s := server.NewServer()
	addRegistryPlugin(s)

	s.RegisterName("Arith", new(example.Arith), "")
	s.Serve("tcp", *addr)
}

// addRegistryPlugin registers this server with zookeeper.
func addRegistryPlugin(s *server.Server) {
	r := &serverplugin.ZooKeeperRegisterPlugin{
		ServiceAddress:   "tcp@" + *addr,
		ZooKeeperServers: []string{*zkAddr},
		BasePath:         *basePath,
		Metrics:          metrics.NewRegistry(),
		UpdateInterval:   time.Minute,
	}
	err := r.Start()
	if err != nil {
		log.Fatal(err)
	}
	s.Plugins.Add(r)
}
```

Client configuration is simple. You need to set the zookeeper address and the same basePath as the service.

```go
// client.go
d := client.NewZookeeperDiscovery(*basePath, "Arith", []string{*zkAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
```

etcd

Example:etcd

etcd is a distributed, reliable key-value store for the most critical data of a distributed system.

Services must use the EtcdRegisterPlugin to register their info with etcd, while clients must use EtcdDiscovery to get the service info.

You must set ServiceAddress (network@address) and the etcd address. You should also set basePath so that your services do not conflict with services developed by others.

You can set Metrics, and rpcx will update the throughput periodically. To update metrics, you must also add the Metrics plugin to the server.

You can set UpdateInterval to refresh the lease. rpcx sets the lease to UpdateInterval * 3. That means that if the server has not refreshed the lease within UpdateInterval * 3, the node is removed from etcd.

```go
// server.go
func main() {
	flag.Parse()

	s := server.NewServer()
	addRegistryPlugin(s)

	s.RegisterName("Arith", new(example.Arith), "")
	s.Serve("tcp", *addr)
}

// addRegistryPlugin registers this server with etcd.
func addRegistryPlugin(s *server.Server) {
	r := &serverplugin.EtcdRegisterPlugin{
		ServiceAddress: "tcp@" + *addr,
		EtcdServers:    []string{*etcdAddr},
		BasePath:       *basePath,
		Metrics:        metrics.NewRegistry(),
		UpdateInterval: time.Minute,
	}
	err := r.Start()
	if err != nil {
		log.Fatal(err)
	}
	s.Plugins.Add(r)
}
```

Client configuration is simple. You need to set the etcd address and the same basePath as the service.

```go
// client.go
d := client.NewEtcdDiscovery(*basePath, "Arith", []string{*etcdAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
```

consul

Example:consul

Consul is a highly available and distributed service discovery and KV store designed with support for the modern data center to make distributed systems and configuration easy.

Services must use the ConsulRegisterPlugin to register their info with consul, while clients must use ConsulDiscovery to get the service info.

You must set ServiceAddress (network@address) and the consul address. You should also set basePath so that your services do not conflict with services developed by others.

You can set Metrics, and rpcx will update the throughput periodically. To update metrics, you must also add the Metrics plugin to the server.

You can set UpdateInterval to refresh the lease. rpcx sets the lease to UpdateInterval * 3. That means that if the server has not refreshed the lease within UpdateInterval * 3, the node is removed from consul.

```go
// server.go
func main() {
	flag.Parse()

	s := server.NewServer()
	addRegistryPlugin(s)

	s.RegisterName("Arith", new(example.Arith), "")
	s.Serve("tcp", *addr)
}

// addRegistryPlugin registers this server with consul.
func addRegistryPlugin(s *server.Server) {
	r := &serverplugin.ConsulRegisterPlugin{
		ServiceAddress: "tcp@" + *addr,
		ConsulServers:  []string{*consulAddr},
		BasePath:       *basePath,
		Metrics:        metrics.NewRegistry(),
		UpdateInterval: time.Minute,
	}
	err := r.Start()
	if err != nil {
		log.Fatal(err)
	}
	s.Plugins.Add(r)
}
```

Client configuration is simple. You need to set the consul address and the same basePath as the service.

```go
// client.go
d := client.NewConsulDiscovery(*basePath, "Arith", []string{*consulAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
```

mDNS

Example:mDNS

mDNS resolves host names to IP addresses within small networks that do not include a local name server. It is a zero-configuration service, using essentially the same programming interfaces, packet formats and operating semantics as the unicast Domain Name System (DNS). Although Stuart Cheshire designed mDNS to be stand-alone capable, it can work in concert with unicast DNS servers.

The mDNS protocol is published as RFC 6762, uses IP multicast User Datagram Protocol (UDP) packets, and is implemented by the Apple Bonjour, Spotify Connect, Philips Hue, Google Chromecast and open source Avahi software packages. Android contains an mDNS implementation. mDNS has also been implemented in Windows 10, but its use there is limited to discovering networked printers.

mDNS can work in conjunction with DNS Service Discovery (DNS-SD), a companion zero-configuration technique specified separately in RFC 6763.

Services must use the MDNSRegisterPlugin to register their info with mDNS, while clients must use MDNSDiscovery to get the service info.

You must set ServiceAddress (network@address). You should also set basePath so that your services do not conflict with services developed by others.

You can set Metrics, and rpcx will update the throughput periodically. To update metrics, you must also add the Metrics plugin to the server.

You can set UpdateInterval to control how often the TPS (throughput) value is refreshed. If the server goes down, it is removed from mDNS automatically.

```go
// server.go
func main() {
	flag.Parse()

	s := server.NewServer()
	addRegistryPlugin(s)

	s.RegisterName("Arith", new(example.Arith), "")
	s.Serve("tcp", *addr)
}

// addRegistryPlugin announces this server over mDNS.
func addRegistryPlugin(s *server.Server) {
	r := serverplugin.NewMDNSRegisterPlugin("tcp@"+*addr, 8972, metrics.NewRegistry(), time.Minute, "")
	err := r.Start()
	if err != nil {
		log.Fatal(err)
	}
	s.Plugins.Add(r)
}
```

Client configuration is simple. mDNS has no registry address to set; the client passes the service name to NewMDNSDiscovery.

```go
// client.go
d := client.NewMDNSDiscovery("Arith", 10*time.Second, 10*time.Second, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
```

In process

Example:inprocess

The in-process registry is intended for testing. Do not use it in production.

When you develop clients, you may want to create mock services for integration tests. You can use the in-process registry for that and switch to another registry when the clients are deployed to production.

This plugin uses reflection to invoke services in the same process.
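Invoking a method by name through reflection can be sketched with the standard reflect package. The types `Args`, `Reply` and `Arith` and the helpers `callByName` and `mulViaReflect` below are stand-ins written for this illustration; they show the general technique, not the plugin's actual code.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

// Args, Reply and Arith are illustrative stand-ins for the example service.
type Args struct{ A, B int }
type Reply struct{ C int }
type Arith struct{}

// Mul multiplies the two arguments and stores the product in reply.
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
	reply.C = args.A * args.B
	return nil
}

// callByName is a hypothetical helper. It looks up an exported method by
// name and calls it with (ctx, args, reply), with no network involved.
// It assumes the method has exactly that signature and returns an error;
// reflect panics if the arguments do not match.
func callByName(ctx context.Context, svc interface{}, method string, args, reply interface{}) error {
	m := reflect.ValueOf(svc).MethodByName(method)
	if !m.IsValid() {
		return fmt.Errorf("method %q not found", method)
	}
	out := m.Call([]reflect.Value{
		reflect.ValueOf(ctx),
		reflect.ValueOf(args),
		reflect.ValueOf(reply),
	})
	// The single return value is the method's error, which may be nil.
	if err, ok := out[0].Interface().(error); ok {
		return err
	}
	return nil
}

// mulViaReflect wraps callByName for the Mul method of Arith.
func mulViaReflect(a, b int) (int, error) {
	reply := &Reply{}
	err := callByName(context.Background(), new(Arith), "Mul", &Args{A: a, B: b}, reply)
	return reply.C, err
}

func main() {
	c, err := mulViaReflect(10, 20)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("10 * 20 =", c)
}
```

Because the call never leaves the process, a test can exercise the same client code path without starting a registry or opening a port.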

You add client.InprocessClient to the server's plugin container, and clients use InpreocessDiscovery to find the registered services to invoke.

```go
// server.go
func main() {
	flag.Parse()

	s := server.NewServer()
	addRegistryPlugin(s)

	s.RegisterName("Arith", new(example.Arith), "")

	go func() {
		s.Serve("tcp", *addr)
	}()

	// The client runs in the same process as the server.
	d := client.NewInpreocessDiscovery()
	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
	defer xclient.Close()

	args := &example.Args{
		A: 10,
		B: 20,
	}

	for i := 0; i < 100; i++ {
		reply := &example.Reply{}
		err := xclient.Call(context.Background(), "Mul", args, reply)
		if err != nil {
			log.Fatalf("failed to call: %v", err)
		}

		log.Printf("%d * %d = %d", args.A, args.B, reply.C)
	}
}

// addRegistryPlugin adds the in-process client to the server's plugins.
func addRegistryPlugin(s *server.Server) {
	r := client.InprocessClient
	s.Plugins.Add(r)
}
```

By smallnest updated 2018-12-04 11:47:26