开发进阶

针对于短连接而言,每一次发送接收数据即关闭连接,连接的处理逻辑比较简单,当然通信效率也会比较低。在大多数的TCP通信场景中,往往是使用长连接操作,并采用异步全双工的TCP通信模式,即所有的通信完全是异步。在这种场景下, gtcp.Conn 链接对象可能同时处于多个读写操作( gtcp.Conn 对象的数据读写操作是并发安全的),因此 SendRecv 操作在逻辑上将会失效。因为当你在同一逻辑操作中发送完毕数据之后,随后立即获取数据有可能得到的是其他写操作的结果。

无论服务端还是客户端,都需要在独立的异步循环中封装使用 Recv* 方法获取数据并通过 switch...case... 处理数据(仅在一个 goroutine 中全权负责读取数据),根据请求数据进行业务处理的转发。

连接对象-通信开发进阶 - 图1提示

也就是说, Send*/ Recv* 方法是并发安全的,但是发送数据时要一次性发送。由于支持异步并发写, gtcp.Conn 对象不带任何缓冲实现。

使用示例

我们通过一个完成的示例来说明一下如何在程序中实现异步全双工通信,完成示例代码位于: https://github.com/gogf/gf/v2/tree/master/.example/net/gtcp/pkg_operations/common

  1. types/types.go

定义通信的数据格式,随后我们可以使用 SendPkg/ RecvPkg 方法来通信。

考虑到简化测试代码复杂度,因此这里使用 JSON 数据格式来传递数据。在一些对于消息包大小比较严格的场景中, 数据 字段可以自行按照二进制进行封装解析设计。此外,需要注意的是,即使使用 JSON 数据格式,其中的 Act 字段往往定义常量来实现,大部分场景中使用 uint8 类型即可,以减小消息包大小,这里偷一下懒,直接使用字符串,以便演示。

  1. package types
  2. type Msg struct {
  3. Act string // 操作
  4. Data string // 数据
  5. }
  1. funcs/funcs.go

自定义数据格式的发送/获取定义,便于数据结构编码/解析。

  1. package funcs
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/gogf/gf/v2/net/gtcp"
  6. "github.com/gogf/gf/.example/net/gtcp/pkg_operations/common/types"
  7. )
  8. // 自定义格式发送消息包
  9. func SendPkg(conn *gtcp.Conn, act string, data...string) error {
  10. s := ""
  11. if len(data) > 0 {
  12. s = data[0]
  13. }
  14. msg, err := json.Marshal(types.Msg{
  15. Act : act,
  16. Data : s,
  17. })
  18. if err != nil {
  19. panic(err)
  20. }
  21. return conn.SendPkg(msg)
  22. }
  23. // 自定义格式接收消息包
  24. func RecvPkg(conn *gtcp.Conn) (msg *types.Msg, err error) {
  25. if data, err := conn.RecvPkg(); err != nil {
  26. return nil, err
  27. } else {
  28. msg = &types.Msg{}
  29. err = json.Unmarshal(data, msg)
  30. if err != nil {
  31. return nil, fmt.Errorf("invalid package structure: %s", err.Error())
  32. }
  33. return msg, err
  34. }
  35. }
  1. gtcp_common_server.go

通信服务端。在该示例中,服务端并不主动断开连接,而是在 10 秒后向客户端发送 doexit 消息,通知客户端主动断开连接,以结束示例。

  1. package main
  2. import (
  3. "github.com/gogf/gf/v2/net/gtcp"
  4. "github.com/gogf/gf/v2/os/glog"
  5. "github.com/gogf/gf/v2/os/gtimer"
  6. "github.com/gogf/gf/.example/net/gtcp/pkg_operations/common/funcs"
  7. "github.com/gogf/gf/.example/net/gtcp/pkg_operations/common/types"
  8. "time"
  9. )
  10. func main() {
  11. gtcp.NewServer("127.0.0.1:8999", func(conn *gtcp.Conn) {
  12. defer conn.Close()
  13. // 测试消息, 10秒后让客户端主动退出
  14. gtimer.SetTimeout(10*time.Second, func() {
  15. funcs.SendPkg(conn, "doexit")
  16. })
  17. for {
  18. msg, err := funcs.RecvPkg(conn)
  19. if err != nil {
  20. if err.Error() == "EOF" {
  21. glog.Println("client closed")
  22. }
  23. break
  24. }
  25. switch msg.Act {
  26. case "hello": onClientHello(conn, msg)
  27. case "heartbeat": onClientHeartBeat(conn, msg)
  28. default:
  29. glog.Errorf("invalid message: %v", msg)
  30. break
  31. }
  32. }
  33. }).Run()
  34. }
  35. func onClientHello(conn *gtcp.Conn, msg *types.Msg) {
  36. glog.Printf("hello message from [%s]: %s", conn.RemoteAddr().String(), msg.Data)
  37. funcs.SendPkg(conn, msg.Act, "Nice to meet you!")
  38. }
  39. func onClientHeartBeat(conn *gtcp.Conn, msg *types.Msg) {
  40. glog.Printf("heartbeat from [%s]", conn.RemoteAddr().String())
  41. }
  1. gtcp_common_client.go

通信客户端,可以看到代码结构和服务端差不多,数据获取独立处于 for 循环中,每个业务逻辑发送消息包时直接使用 SendPkg 方法进行发送。

心跳消息常用 gtimer 定时器实现,在该示例中,客户端每隔 1 秒主动向服务端发送心跳消息,在 3 秒后向服务端发送 hello 测试消息。这些都是由 gtimer 定时器实现的,定时器在TCP通信中比较常见。

客户端连接 10 秒后,服务端会给客户端发送 doexit 消息,客户端收到该消息后便主动断开连接,长连接结束。

  1. package main
  2. import (
  3. "github.com/gogf/gf/v2/net/gtcp"
  4. "github.com/gogf/gf/v2/os/glog"
  5. "github.com/gogf/gf/v2/os/gtimer"
  6. "github.com/gogf/gf/.example/net/gtcp/pkg_operations/common/funcs"
  7. "github.com/gogf/gf/.example/net/gtcp/pkg_operations/common/types"
  8. "time"
  9. )
  10. func main() {
  11. conn, err := gtcp.NewConn("127.0.0.1:8999")
  12. if err != nil {
  13. panic(err)
  14. }
  15. defer conn.Close()
  16. // 心跳消息
  17. gtimer.SetInterval(time.Second, func() {
  18. if err := funcs.SendPkg(conn, "heartbeat"); err != nil {
  19. panic(err)
  20. }
  21. })
  22. // 测试消息, 3秒后向服务端发送hello消息
  23. gtimer.SetTimeout(3*time.Second, func() {
  24. if err := funcs.SendPkg(conn, "hello", "My name's John!"); err != nil {
  25. panic(err)
  26. }
  27. })
  28. for {
  29. msg, err := funcs.RecvPkg(conn)
  30. if err != nil {
  31. if err.Error() == "EOF" {
  32. glog.Println("server closed")
  33. }
  34. break
  35. }
  36. switch msg.Act {
  37. case "hello": onServerHello(conn, msg)
  38. case "doexit": onServerDoExit(conn, msg)
  39. case "heartbeat": onServerHeartBeat(conn, msg)
  40. default:
  41. glog.Errorf("invalid message: %v", msg)
  42. break
  43. }
  44. }
  45. }
  46. func onServerHello(conn *gtcp.Conn, msg *types.Msg) {
  47. glog.Printf("hello response message from [%s]: %s", conn.RemoteAddr().String(), msg.Data)
  48. }
  49. func onServerHeartBeat(conn *gtcp.Conn, msg *types.Msg) {
  50. glog.Printf("heartbeat from [%s]", conn.RemoteAddr().String())
  51. }
  52. func onServerDoExit(conn *gtcp.Conn, msg *types.Msg) {
  53. glog.Printf("exit command from [%s]", conn.RemoteAddr().String())
  54. conn.Close()
  55. }
  1. 执行后

    • 服务端输出结果

      1. 2019-05-03 14:59:13.732 heartbeat from [127.0.0.1:51220]
      2. 2019-05-03 14:59:14.732 heartbeat from [127.0.0.1:51220]
      3. 2019-05-03 14:59:15.733 heartbeat from [127.0.0.1:51220]
      4. 2019-05-03 14:59:15.733 hello message from [127.0.0.1:51220]: My name's John!
      5. 2019-05-03 14:59:16.731 heartbeat from [127.0.0.1:51220]
      6. 2019-05-03 14:59:17.733 heartbeat from [127.0.0.1:51220]
      7. 2019-05-03 14:59:18.731 heartbeat from [127.0.0.1:51220]
      8. 2019-05-03 14:59:19.730 heartbeat from [127.0.0.1:51220]
      9. 2019-05-03 14:59:20.732 heartbeat from [127.0.0.1:51220]
      10. 2019-05-03 14:59:21.732 heartbeat from [127.0.0.1:51220]
      11. 2019-05-03 14:59:22.698 client closed
    • 客户端输出结果

      1. 2019-05-03 14:59:15.733 hello response message from [127.0.0.1:8999]: Nice to meet you!
      2. 2019-05-03 14:59:22.698 exit command from [127.0.0.1:8999]