RPCX 起步

rpcx是一个分布式的服务框架,致力于提供一个产品级的、高性能、透明化的RPC远程服务调用框架。它参考了目前在国内非常流行的Java生态圈的RPC框架Dubbo、Motan等,为Go生态圈提供一个丰富功能的RPC平台。

随着互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,亟需一个治理系统确保架构有条不紊的演进。

目前,随着网站的规模的扩大,一般会将单体程序逐渐演化为微服务的架构方式,这是目前流行的一种架构模式。

RPCX起步 - 图1

即使不是微服务,也会将业务拆分成不同的服务的方式,服务之间互相调用。

那么,如何实现服务(微服务)之间的调用的?一般来说最常用的是两种方式: RESTful API和RPC两种服务通讯方式。本书的第一章就介绍了这两种方式的特点和优缺点,那么本书重点介绍的是RPC的方式。

RPCX就是为Go生态圈提供的一个全功能的RPC框架,它参考了国内电商圈流行的RPC框架Dubbo的功能特性,实现了一个高性能的、可容错的,插件式的RPC框架。

RPCX的目标:

  1. 简单: 易于学习、易于开发、易于集成和易于发布
  2. 高性能:远远高于grpc-go, 更不用说dubbo和motan
  3. 服务发现和服务治理:方便开发大规模的微服务集群
  4. 跨平台: rpcx 3.0底层不再使用标准rpc库,而是采用跨平台的二进制协议,高效但是方便多语言开发

它的特点包括:

1、指出纯的go方法, 不需要额外的定义
2、可插拔的设计,可以方便扩展服务发现插件、tracing等
3、支持 TCP、HTTP、QUIC、KCP等协议
4、支持多种编码方式, 比如JSON、Protobuf、MessagePack 和 原始字节数据
5、服务发现支持 单机对单机、单机对多机、zookeeper、etcd、consul、mDNS等多种发现方式
6、容错支持 Failover、Failfast、Failtry等多种模式
7、负载均衡支持随机选取、顺序选取、一致性哈希、基于权重的选取、基于网络质量的选取和就近选取等多种均衡方式
8、支持压缩
9、支持扩展信息传递(元数据)
10、支持身份验证
11、支持自动heartbeat和单向请求
12、支持metrics、log、timeout、别名、断路器、TLS等特性

本章就让我们举两个的例子,看看如何利用rpcx进行开发。

端对端的例子

首先让我们看一个简单的例子,一个服务器和一个客户端。
这并不是一个常用的应用场景,因为部署的规模太小,其实可以直接官方的库或者其它的RPC框架如gRPC、Thrift就可以实现,当然使用rpcx也很简单,我们就以这个例子,先熟悉一下rpcx的开发。

本书中常用的一个例子就是提供一个乘法的服务,客户端提供两个数,服务器计算这两个数的乘积返回。

服务器端开发

首先,我们需要实现自己的服务,这很简单,就是定义普通的方法即可:

  1. type Args struct {
  2. A int
  3. B int
  4. }
  5. type Reply struct {
  6. C int
  7. }
  8. type Arith int
  9. func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
  10. reply.C = args.A * args.B
  11. return nil
  12. }

Args作为传入的参数,它的两个字段AB代表两个乘数。
ReplyC代表返回的结果。
Mul就是业务方法,对乘数进行相乘,然后返回结果。

然后注册这个服务启动就可以了:

  1. func main() {
  2. s := server.NewServer()
  3. s.RegisterName("Arith", new(example.Arith), "")
  4. s.Serve("tcp", *addr)
  5. }

三行代码就可以将一个方法暴露成一个服务, 它使用默认的编码和传输方式(TCP)。但是如果你想使用定制的服务器,你可以使用rpcx.NewServer生成一个新的服务器对象。

客户端同步调用

客户端代码也很简单,首先你需要定义一些访问服务器的一些策略以及服务器的一些信息,然后创建一个XClient对象,并且在程序不再使用这个对象的时候关闭它:

  1. d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
  2. xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  3. defer xclient.Close()

NewXClient第一个参数是服务路径,在Go实现的服务中就是服务端注册的服务名,Java实现的服务器可以是类名全路径。
第二个参数是要调用的方法名。
第三个参数是容错模式,这个使用的是同一个服务器多次重试, 默认重试三次。
第四个参数是在多个服务器同时存在的情况下的负载均衡模式。因为我们这个例子就一个服务器,所以也无所谓随机了,总是选择我们的那一个服务器。
第五个参数是服务器的信息,这里我们使用的是点对点的服务发现模式, 如果是其它的服务模式,比如etcd,那么就要提供etcd的相关信息。
第六个参数可以提供一些额外的信息,比如编码模式、TLS信息等。

客户端所使用的数据结构(只需请求类型和返回类型,不需要服务的定义)和服务端一样。如果客户端和服务器端在一个工程里,所以你可以访问,它们可以共享一套数据结构。但是有些情况下服务方只提供一个说明文档,数据结构还得客户端自己定义:

  1. type Args struct { A int `msg:"a"` B int `msg:"b"`}
  2. type Reply struct { C int `msg:"c"`}

同步调用的代码如下:

  1. args := &example.Args{
  2. A: 10,
  3. B: 20,
  4. }
  5. reply := &example.Reply{}
  6. err := xclient.Call(context.Background(), "Mul", args, reply)
  7. if err != nil {
  8. log.Fatalf("failed to call: %v", err)
  9. }
  10. log.Printf("%d * %d = %d", args.A, args.B, reply.C)

Call调用是一个同步的调用。

输出结果为:

  1. 10 * 20 = 200

客户端异步调用

客户端异步调用其实是使用Go方法,从返回的对象Done字段中可以得到返回的结果信息:

  1. reply := &example.Reply{}
  2. call, err := xclient.Go(context.Background(), "Mul", args, reply, nil)
  3. if err != nil {
  4. log.Fatalf("failed to call: %v", err)
  5. }
  6. replyCall := <-call.Done
  7. if replyCall.Error != nil {
  8. log.Fatalf("failed to call: %v", replyCall.Error)
  9. } else {
  10. log.Printf("%d * %d = %d", args.A, args.B, reply.C)
  11. }

开发起来是不是超简单?

多服务调用

我们再看一个复杂点的例子,这个例子中我们部署了两个相同的服务器,这两个服务注册的名字相同,都是计算两个数的乘积。为了区分客户端调用不同的服务,我们故意为一个服务器的乘积放大了十倍,便于我们观察调用的结果。

数据结构如下:

  1. type Args struct { A int `msg:"a"` B int `msg:"b"`}
  2. type Reply struct { C int `msg:"c"`}
  3. type Arith1 int
  4. func (t *Arith1) Mul(ctx context.Context, args *Args, reply *Reply) error {
  5. reply.C = args.A * args.B
  6. return nil
  7. }
  8. type Arith2 int
  9. func (t *Arith2) Mul(ctx context.Context, args *Args, reply *Reply) error {
  10. reply.C = args.A * args.B * 10
  11. return nil
  12. }

Arith2的Mul方法中我们将计算结果放大了10倍,所以如果传入两个参数10和20,它返回的结果是2000,而Arith返回200。

服务器端的代码

在服务器端我们启动两个服务器,每个服务器都注册了相同名称的一个服务Arith,它们分别监听本地的8972和8973端口:

  1. func main() {
  2. go func() {
  3. s := server.NewServer()
  4. s.RegisterName("Arith", new(Arith1), "")
  5. s.Serve("tcp", "localhost:8972")
  6. }()
  7. go func() {
  8. s := server.NewServer()
  9. s.RegisterName("Arith", new(Arith2), "")
  10. s.Serve("tcp", "localhost:8973")
  11. }()
  12. select{}
  13. }

客户端代码

因为我们没有使用注册中心,所以这里客户端需要定义这两个服务器的信息,然后定义路由方式是随机选取(RandomSelect),调用十次看看服务器返回的结果:

  1. type Args struct { A int `msg:"a"` B int `msg:"b"`}
  2. type Reply struct { C int `msg:"c"`}
  3. func main() {
  4. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: "tcp@localhost:8972"}, {Key: "tcp@localhost:8973"}})
  5. xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  6. defer xclient.Close()
  7. args := &example.Args{
  8. A: 10,
  9. B: 20,
  10. }
  11. for i :=0; i < 10; i++ {
  12. reply := &example.Reply{}
  13. err := xclient.Call(context.Background(), "Mul", args, reply)
  14. if err != nil {
  15. log.Fatalf("failed to call: %v", err)
  16. }
  17. log.Printf("%d * %d = %d", args.A, args.B, reply.C)
  18. }
  19. }

输出结果,可以看到调用基本上随机的分布在两个服务器上。

是不是使用rpcx可以将你的本机方法很方便地转换成服务的调用? 在后面的章节中,我们还会使用这个例子演示更多的功能。

所以的例子都可以在 https://github.com/rpcx-ecosystem/rpcx-examples3 查看。