1. RPC

1.1.1. RPC简介

  • 远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议
  • 该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
  • 如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用

1.1.2. 流行RPC框架的对比

RPC - 图1

1.1.3. golang中如何实现RPC

  • golang中实现RPC非常简单,官方提供了封装好的库,还有一些第三方的库

  • golang官方的net/rpc库使用encoding/gob进行编解码,支持tcp和http数据传输方式,由于其他语言不支持gob编解码方式,所以golang的RPC只支持golang开发的服务器与客户端之间的交互

  • 官方还提供了net/rpc/jsonrpc库实现RPC方法,jsonrpc采用JSON进行数据编解码,因而支持跨语言调用,目前jsonrpc库是基于tcp协议实现的,暂不支持http传输方式

  • 例题:golang实现RPC程序,实现求矩形面积和周长

服务端

  1. package main
  2. import (
  3. "log"
  4. "net/http"
  5. "net/rpc"
  6. )
  7. //  例题:golang实现RPC程序,实现求矩形面积和周长
  8. type Params struct {
  9. Width, Height int
  10. }
  11. type Rect struct{}
  12. // RPC服务端方法,求矩形面积
  13. func (r *Rect) Area(p Params, ret *int) error {
  14. *ret = p.Height * p.Width
  15. return nil
  16. }
  17. // 周长
  18. func (r *Rect) Perimeter(p Params, ret *int) error {
  19. *ret = (p.Height + p.Width) * 2
  20. return nil
  21. }
  22. // 主函数
  23. func main() {
  24. // 1.注册服务
  25. rect := new(Rect)
  26. // 注册一个rect的服务
  27. rpc.Register(rect)
  28. // 2.服务处理绑定到http协议上
  29. rpc.HandleHTTP()
  30. // 3.监听服务
  31. err := http.ListenAndServe(":8000", nil)
  32. if err != nil {
  33. log.Panicln(err)
  34. }
  35. }

客户端

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "net/rpc"
  6. )
  7. // 传的参数
  8. type Params struct {
  9. Width, Height int
  10. }
  11. // 主函数
  12. func main() {
  13. // 1.连接远程rpc服务
  14. conn, err := rpc.DialHTTP("tcp", ":8000")
  15. if err != nil {
  16. log.Fatal(err)
  17. }
  18. // 2.调用方法
  19. // 面积
  20. ret := 0
  21. err2 := conn.Call("Rect.Area", Params{50, 100}, &ret)
  22. if err2 != nil {
  23. log.Fatal(err2)
  24. }
  25. fmt.Println("面积:", ret)
  26. // 周长
  27. err3 := conn.Call("Rect.Perimeter", Params{50, 100}, &ret)
  28. if err3 != nil {
  29. log.Fatal(err3)
  30. }
  31. fmt.Println("周长:", ret)
  32. }
  • golang写RPC程序,必须符合4个基本条件,不然RPC用不了

    • 结构体字段首字母要大写,可以别人调用

    • 函数名必须首字母大写

    • 函数第一参数是接收参数,第二个参数是返回给客户端的参数,必须是指针类型

    • 函数还必须有一个返回值error

  • 练习:模仿前面例题,自己实现RPC程序,服务端接收2个参数,可以做乘法运算,也可以做商和余数的运算,客户端进行传参和访问,得到结果如下:

服务端代码:

  1. package main
  2. import (
  3. "errors"
  4. "log"
  5. "net/http"
  6. "net/rpc"
  7. )
  8. // 结构体,用于注册的
  9. type Arith struct{}
  10. // 声明参数结构体
  11. type ArithRequest struct {
  12. A, B int
  13. }
  14. // 返回给客户端的结果
  15. type ArithResponse struct {
  16. // 乘积
  17. Pro int
  18. // 商
  19. Quo int
  20. // 余数
  21. Rem int
  22. }
  23. // 乘法
  24. func (this *Arith) Multiply(req ArithRequest, res *ArithResponse) error {
  25. res.Pro = req.A * req.B
  26. return nil
  27. }
  28. // 商和余数
  29. func (this *Arith) Divide(req ArithRequest, res *ArithResponse) error {
  30. if req.B == 0 {
  31. return errors.New("除数不能为0")
  32. }
  33. // 除
  34. res.Quo = req.A / req.B
  35. // 取模
  36. res.Rem = req.A % req.B
  37. return nil
  38. }
  39. // 主函数
  40. func main() {
  41. // 1.注册服务
  42. rect := new(Arith)
  43. // 注册一个rect的服务
  44. rpc.Register(rect)
  45. // 2.服务处理绑定到http协议上
  46. rpc.HandleHTTP()
  47. // 3.监听服务
  48. err := http.ListenAndServe(":8000", nil)
  49. if err != nil {
  50. log.Fatal(err)
  51. }
  52. }

客户端代码:

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "net/rpc"
  6. )
  7. type ArithRequest struct {
  8. A, B int
  9. }
  10. // 返回给客户端的结果
  11. type ArithResponse struct {
  12. // 乘积
  13. Pro int
  14. // 商
  15. Quo int
  16. // 余数
  17. Rem int
  18. }
  19. func main() {
  20. conn, err := rpc.DialHTTP("tcp", ":8000")
  21. if err != nil {
  22. log.Fatal(err)
  23. }
  24. req := ArithRequest{9, 2}
  25. var res ArithResponse
  26. err2 := conn.Call("Arith.Multiply", req, &res)
  27. if err2 != nil {
  28. log.Fatal(err2)
  29. }
  30. fmt.Printf("%d * %d = %d\n", req.A, req.B, res.Pro)
  31. err3 := conn.Call("Arith.Divide", req, &res)
  32. if err3 != nil {
  33. log.Fatal(err3)
  34. }
  35. fmt.Printf("%d / %d 商 %d,余数 = %d\n", req.A, req.B, res.Quo, res.Rem)
  36. }

另外,net/rpc/jsonrpc库通过json格式编解码,支持跨语言调用

服务端代码:

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "net/rpc"
  7. "net/rpc/jsonrpc"
  8. )
  9. type Params struct {
  10. Width, Height int
  11. }
  12. type Rect struct {
  13. }
  14. func (r *Rect) Area(p Params, ret *int) error {
  15. *ret = p.Width * p.Height
  16. return nil
  17. }
  18. func (r *Rect) Perimeter(p Params, ret *int) error {
  19. *ret = (p.Height + p.Width) * 2
  20. return nil
  21. }
  22. func main() {
  23. rpc.Register(new(Rect))
  24. lis, err := net.Listen("tcp", ":8080")
  25. if err != nil {
  26. log.Panicln(err)
  27. }
  28. for {
  29. conn, err := lis.Accept()
  30. if err != nil {
  31. continue
  32. }
  33. go func(conn net.Conn) {
  34. fmt.Println("new client")
  35. jsonrpc.ServeConn(conn)
  36. }(conn)
  37. }
  38. }

客户端代码:

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "net/rpc/jsonrpc"
  6. )
  7. type Params struct {
  8. Width, Height int
  9. }
  10. func main() {
  11. conn, err := jsonrpc.Dial("tcp", ":8080")
  12. if err != nil {
  13. log.Panicln(err)
  14. }
  15. ret := 0
  16. err2 := conn.Call("Rect.Area", Params{50, 100}, &ret)
  17. if err2 != nil {
  18. log.Panicln(err2)
  19. }
  20. fmt.Println("面积:", ret)
  21. err3 := conn.Call("Rect.Perimeter", Params{50, 100}, &ret)
  22. if err3 != nil {
  23. log.Panicln(err3)
  24. }
  25. fmt.Println("周长:", ret)
  26. }

1.1.4. RPC调用流程

  • 微服务架构下数据交互一般是对内 RPC,对外 REST
  • 将业务按功能模块拆分到各个微服务,具有提高项目协作效率、降低模块耦合度、提高系统可用性等优点,但是开发门槛比较高,比如 RPC 框架的使用、后期的服务监控等工作
  • 一般情况下,我们会将功能代码在本地直接调用,微服务架构下,我们需要将这个函数作为单独的服务运行,客户端通过网络调用

1.1.5. 网络传输数据格式

  • 两端要约定好数据包的格式
  • 成熟的RPC框架会有自定义传输协议,这里网络传输格式定义如下,前面是固定长度消息头,后面是变长消息体

RPC - 图2

  • 自己定义数据格式的读写
  1. package rpc
  2. import (
  3. "encoding/binary"
  4. "io"
  5. "net"
  6. )
  7. // 测试网络中读写数据的情况
  8. // 会话连接的结构体
  9. type Session struct {
  10. conn net.Conn
  11. }
  12. // 构造方法
  13. func NewSession(conn net.Conn) *Session {
  14. return &Session{conn: conn}
  15. }
  16. // 向连接中去写数据
  17. func (s *Session) Write(data []byte) error {
  18. // 定义写数据的格式
  19. // 4字节头部 + 可变体的长度
  20. buf := make([]byte, 4+len(data))
  21. // 写入头部,记录数据长度
  22. binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
  23. // 将整个数据,放到4后边
  24. copy(buf[4:], data)
  25. _, err := s.conn.Write(buf)
  26. if err != nil {
  27. return err
  28. }
  29. return nil
  30. }
  31. // 从连接读数据
  32. func (s *Session) Read() ([]byte, error) {
  33. // 读取头部记录的长度
  34. header := make([]byte, 4)
  35. // 按长度读取消息
  36. _, err := io.ReadFull(s.conn, header)
  37. if err != nil {
  38. return nil, err
  39. }
  40. // 读取数据
  41. dataLen := binary.BigEndian.Uint32(header)
  42. data := make([]byte, dataLen)
  43. _, err = io.ReadFull(s.conn, data)
  44. if err != nil {
  45. return nil, err
  46. }
  47. return data, nil
  48. }

测试类

  1. package rpc
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "testing"
  7. )
  8. func TestSession_ReadWriter(t *testing.T) {
  9. // 定义地址
  10. addr := "127.0.0.1:8000"
  11. my_data := "hello"
  12. // 等待组定义
  13. wg := sync.WaitGroup{}
  14. wg.Add(2)
  15. // 写数据的协程
  16. go func() {
  17. defer wg.Done()
  18. lis, err := net.Listen("tcp", addr)
  19. if err != nil {
  20. t.Fatal(err)
  21. }
  22. conn, _ := lis.Accept()
  23. s := Session{conn: conn}
  24. err = s.Write([]byte(my_data))
  25. if err != nil {
  26. t.Fatal(err)
  27. }
  28. }()
  29. // 读数据的协程
  30. go func() {
  31. defer wg.Done()
  32. conn, err := net.Dial("tcp", addr)
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. s := Session{conn: conn}
  37. data, err := s.Read()
  38. if err != nil {
  39. t.Fatal(err)
  40. }
  41. // 最后一层校验
  42. if string(data) != my_data {
  43. t.Fatal(err)
  44. }
  45. fmt.Println(string(data))
  46. }()
  47. wg.Wait()
  48. }

编码解码

  1. package rpc
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. )
  6. // 定义RPC交互的数据结构
  7. type RPCData struct {
  8. // 访问的函数
  9. Name string
  10. // 访问时的参数
  11. Args []interface{}
  12. }
  13. // 编码
  14. func encode(data RPCData) ([]byte, error) {
  15. //得到字节数组的编码器
  16. var buf bytes.Buffer
  17. bufEnc := gob.NewEncoder(&buf)
  18. // 编码器对数据编码
  19. if err := bufEnc.Encode(data); err != nil {
  20. return nil, err
  21. }
  22. return buf.Bytes(), nil
  23. }
  24. // 解码
  25. func decode(b []byte) (RPCData, error) {
  26. buf := bytes.NewBuffer(b)
  27. // 得到字节数组解码器
  28. bufDec := gob.NewDecoder(buf)
  29. // 解码器对数据节码
  30. var data RPCData
  31. if err := bufDec.Decode(&data); err != nil {
  32. return data, err
  33. }
  34. return data, nil
  35. }

1.1.6. 实现RPC服务端

  • 服务端接收到的数据需要包括什么?
    • 调用的函数名、参数列表,还有一个返回值error类型
  • 服务端需要解决的问题是什么?

    • Map维护客户端传来调用函数,服务端知道去调谁
  • 服务端的核心功能有哪些?

    • 维护函数map
    • 客户端传来的东西进行解析
    • 函数的返回值打包,传给客户端
  1. package rpc
  2. import (
  3. "fmt"
  4. "net"
  5. "reflect"
  6. )
  7. // 声明服务端
  8. type Server struct {
  9. // 地址
  10. addr string
  11. // map 用于维护关系的
  12. funcs map[string]reflect.Value
  13. }
  14. // 构造方法
  15. func NewServer(addr string) *Server {
  16. return &Server{addr: addr, funcs: make(map[string]reflect.Value)}
  17. }
  18. // 服务端需要一个注册Register
  19. // 第一个参数函数名,第二个传入真正的函数
  20. func (s *Server) Register(rpcName string, f interface{}) {
  21. // 维护一个map
  22. // 若map已经有键了
  23. if _, ok := s.funcs[rpcName]; ok {
  24. return
  25. }
  26. // 若map中没值,则将映射加入map,用于调用
  27. fVal := reflect.ValueOf(f)
  28. s.funcs[rpcName] = fVal
  29. }
  30. // 服务端等待调用的方法
  31. func (s *Server) Run() {
  32. // 监听
  33. lis, err := net.Listen("tcp", s.addr)
  34. if err != nil {
  35. fmt.Printf("监听 %s err :%v", s.addr, err)
  36. return
  37. }
  38. for {
  39. // 服务端循环等待调用
  40. conn, err := lis.Accept()
  41. if err != nil {
  42. return
  43. }
  44. serSession := NewSession(conn)
  45. // 使用RPC方式读取数据
  46. b, err := serSession.Read()
  47. if err != nil {
  48. return
  49. }
  50. // 数据解码
  51. rpcData, err := decode(b)
  52. if err != nil {
  53. return
  54. }
  55. // 根据读到的name,得到要调用的函数
  56. f, ok := s.funcs[rpcData.Name]
  57. if !ok {
  58. fmt.Println("函数 %s 不存在", rpcData.Name)
  59. return
  60. }
  61. // 遍历解析客户端传来的参数,放切片里
  62. inArgs := make([]reflect.Value, 0, len(rpcData.Args))
  63. for _, arg := range rpcData.Args {
  64. inArgs = append(inArgs, reflect.ValueOf(arg))
  65. }
  66. // 反射调用方法
  67. // 返回Value类型,用于给客户端传递返回结果,out是所有的返回结果
  68. out := f.Call(inArgs)
  69. // 遍历out ,用于返回给客户端,存到一个切片里
  70. outArgs := make([]interface{}, 0, len(out))
  71. for _, o := range out {
  72. outArgs = append(outArgs, o.Interface())
  73. }
  74. // 数据编码,返回给客户端
  75. respRPCData := RPCData{rpcData.Name, outArgs}
  76. bytes, err := encode(respRPCData)
  77. if err != nil {
  78. return
  79. }
  80. // 将服务端编码后的数据,写出到客户端
  81. err = serSession.Write(bytes)
  82. if err != nil {
  83. return
  84. }
  85. }
  86. }

1.1.7. 实现RPC客户端

  • 客户端只有函数原型,使用reflect.MakeFunc() 可以完成原型到函数的调用

  • reflect.MakeFunc()是Client从函数原型到网络调用的关键

  1. package rpc
  2. import (
  3. "net"
  4. "reflect"
  5. )
  6. // 声明服务端
  7. type Client struct {
  8. conn net.Conn
  9. }
  10. // 构造方法
  11. func NewClient(conn net.Conn) *Client {
  12. return &Client{conn: conn}
  13. }
  14. // 实现通用的RPC客户端
  15. // 传入访问的函数名
  16. // fPtr指向的是函数原型
  17. //var select fun xx(User)
  18. //cli.callRPC("selectUser",&select)
  19. func (c *Client) callRPC(rpcName string, fPtr interface{}) {
  20. // 通过反射,获取fPtr未初始化的函数原型
  21. fn := reflect.ValueOf(fPtr).Elem()
  22. // 需要另一个函数,作用是对第一个函数参数操作
  23. f := func(args []reflect.Value) []reflect.Value {
  24. // 处理参数
  25. inArgs := make([]interface{}, 0, len(args))
  26. for _, arg := range args {
  27. inArgs = append(inArgs, arg.Interface())
  28. }
  29. // 连接
  30. cliSession := NewSession(c.conn)
  31. // 编码数据
  32. reqRPC := RPCData{Name: rpcName, Args: inArgs}
  33. b, err := encode(reqRPC)
  34. if err != nil {
  35. panic(err)
  36. }
  37. // 写数据
  38. err = cliSession.Write(b)
  39. if err != nil {
  40. panic(err)
  41. }
  42. // 服务端发过来返回值,此时应该读取和解析
  43. respBytes, err := cliSession.Read()
  44. if err != nil {
  45. panic(err)
  46. }
  47. // 解码
  48. respRPC, err := decode(respBytes)
  49. if err != nil {
  50. panic(err)
  51. }
  52. // 处理服务端返回的数据
  53. outArgs := make([]reflect.Value, 0, len(respRPC.Args))
  54. for i, arg := range respRPC.Args {
  55. // 必须进行nil转换
  56. if arg == nil {
  57. // reflect.Zero()会返回类型的零值的value
  58. // .out()会返回函数输出的参数类型
  59. outArgs = append(outArgs, reflect.Zero(fn.Type().Out(i)))
  60. continue
  61. }
  62. outArgs = append(outArgs, reflect.ValueOf(arg))
  63. }
  64. return outArgs
  65. }
  66. // 完成原型到函数调用的内部转换
  67. // 参数1是reflect.Type
  68. // 参数2 f是函数类型,是对于参数1 fn函数的操作
  69. // fn是定义,f是具体操作
  70. v := reflect.MakeFunc(fn.Type(), f)
  71. // 为函数fPtr赋值,过程
  72. fn.Set(v)
  73. }

1.1.8. 实现RPC通信测试

  • 给服务端注册一个查询用户的方法,客户端使用RPC方式调用
  1. package rpc
  2. import (
  3. "encoding/gob"
  4. "fmt"
  5. "net"
  6. "testing"
  7. )
  8. //  给服务端注册一个查询用户的方法,客户端使用RPC方式调用
  9. // 定义用户对象
  10. type User struct {
  11. Name string
  12. Age int
  13. }
  14. // 用于测试用户查询的方法
  15. func queryUser(uid int) (User, error) {
  16. user := make(map[int]User)
  17. // 假数据
  18. user[0] = User{"zs", 20}
  19. user[1] = User{"ls", 21}
  20. user[2] = User{"ww", 22}
  21. // 模拟查询用户
  22. if u, ok := user[uid]; ok {
  23. return u, nil
  24. }
  25. return User{}, fmt.Errorf("%d err", uid)
  26. }
  27. func TestRPC(t *testing.T) {
  28. // 编码中有一个字段是interface{}时,要注册一下
  29. gob.Register(User{})
  30. addr := "127.0.0.1:8000"
  31. // 创建服务端
  32. srv := NewServer(addr)
  33. // 将服务端方法,注册一下
  34. srv.Register("queryUser", queryUser)
  35. // 服务端等待调用
  36. go srv.Run()
  37. // 客户端获取连接
  38. conn, err := net.Dial("tcp", addr)
  39. if err != nil {
  40. fmt.Println("err")
  41. }
  42. // 创建客户端对象
  43. cli := NewClient(conn)
  44. // 需要声明函数原型
  45. var query func(int) (User, error)
  46. cli.callRPC("queryUser", &query)
  47. // 得到查询结果
  48. u, err := query(1)
  49. if err != nil {
  50. fmt.Println("err")
  51. }
  52. fmt.Println(u)
  53. }