简洁的并发TCP服务器

尽管上节的并发 TCP 服务器运作良好,但是它还不能为实际应用提供服务。因此,在这节您将学到怎样把第四章中的 keyValue.go 文件对于复杂类型的使用转化为一个功能齐全的并发 TCP 应用。

为了能够和网络中的 key-value 存储交互,我们创建自定义的 TCP 协议。您将需要为 key-value 存储的每一个函数定义关键字。为简单起见,每个关键字都跟着相关数据。大多数命令的结果将是成功或失败消息。

设计您自己的 TCP 或 UDP 协议不是一个简单的工作。这意味着设计一个新协议时,您必须特别细致小心。这里的关键是在您开始编写生产代码之前文档化所有内容。

这个主题使用的工具命名为 kvTCP.go,它被分为六个部分。

kvTCP.go 的第一部分如下:

  1. package main
  2. import (
  3. "bufio"
  4. "encoding/gob"
  5. "fmt"
  6. "net"
  7. "os"
  8. "strings"
  9. )
  10. type myElement struct {
  11. Name string
  12. Surname string
  13. Id string
  14. }
  15. const welcome = "Welcome to the Key-value store!\n"
  16. var DATA = make(map[string]myElement)
  17. var DATAFILE = "/tmp/dataFile.gob"

kvTCP.go 的第二部分如下:

  1. func handleConnection(c net.Conn) {
  2. c.Write([]byte(welcome))
  3. for {
  4. netData, err := bufio.NewReader(c).ReadString('\n')
  5. if err != nil {
  6. fmt.Println(err)
  7. return
  8. }
  9. command := strings.TrimSpace(string(netData))
  10. tokens := strings.Fields(command)
  11. switch len(tokens) {
  12. case 0:
  13. continue
  14. case 1:
  15. tokens = append(tokens, "")
  16. tokens = append(tokens, "")
  17. tokens = append(tokens, "")
  18. tokens = append(tokens, "")
  19. case 2:
  20. tokens = append(tokens, "")
  21. tokens = append(tokens, "")
  22. tokens = append(tokens, "")
  23. case 3:
  24. tokens = append(tokens, "")
  25. tokens = append(tokens, "")
  26. case 4:
  27. tokens = append(tokens, "")
  28. }
  29. switch tokens[0] {
  30. case "STOP":
  31. err = save()
  32. if err != nil {
  33. fmt.Println(err)
  34. }
  35. c.Close()
  36. return
  37. case "PRINT":
  38. PRINT(c)
  39. case "DELETE":
  40. if !DELETE(tokens[1]) {
  41. netData := "Delete operation failed!\n"
  42. c.Write([]byte(netData))
  43. }else{
  44. netData := "Delete operation successful!\n"
  45. c.Write([]byte(netData))
  46. }
  47. case "ADD":
  48. n := myElement{tokens[2], tokens[3], tokens[4]}
  49. if !ADD(tokens[1], n) {
  50. netData := "Add operation failed!\n"
  51. c.Write([]byte(netData))
  52. } else {
  53. netData := "Add operation successful!\n"
  54. c.Write([]byte(netData))
  55. }
  56. err = save()
  57. if err != nil {
  58. fmt.Println(err)
  59. }
  60. case "LOOKUP":
  61. n := LOOKUP(tokens[1])
  62. if n != nil {
  63. netData := fmt.Sprintf("%v\n", *n)
  64. c.Write([]byte(netData))
  65. } else {
  66. netData := "Did not find key!\n"
  67. c.Write([]byte(netData))
  68. }
  69. case "CHANGE":
  70. n := myElement{tokens[2], tokens[3], tokens[4]}
  71. if !CHANGE(tokens[1], n) {
  72. netData := "Update operation failed!\n"
  73. c.Write([]byte(netData))
  74. } else {
  75. netData := "Update operation successful!\n"
  76. c.Write([]byte(netData))
  77. }
  78. err = save()
  79. if err != nil {
  80. fmt.Println(err)
  81. }
  82. default:
  83. netData := "Unknown command - please try again!\n"
  84. c.Write([]byte(netData))
  85. }
  86. }
  87. }

handleConnection() 函数和每个 TCP 客户端交互并解析客户端的输入。

kvTCP.go 的第三部分包含如下代码:

  1. func save() error {
  2. fmt.Println("Saving", DATAFILE)
  3. err := os.Remove(DATAFILE)
  4. if err != nil {
  5. fmt.Println(err)
  6. }
  7. saveTo, err := os.Create(DATAFILE)
  8. if err != nil {
  9. fmt.Println("Cannot create", DATAFILE)
  10. return err
  11. }
  12. defer saveTo.Close()
  13. encoder := gob.NewEncoder(saveTo)
  14. err = encoder.Encode(DATA)
  15. if err != nil {
  16. fmt.Println("Cannot save to", DATAFILE)
  17. return err
  18. }
  19. return nil
  20. }
  21. func load() error {
  22. fmt.Println("Loading", DATAFILE)
  23. loadFrom, err := os.Open(DATAFILE)
  24. defer loadFrom.Close()
  25. if err != nil {
  26. fmt.Println("Empty key/value store!")
  27. return err
  28. }
  29. decoder := gob.NewDecoder(loadFrom)
  30. decoder.Decode(&DATA)
  31. return nil
  32. }

kvTCP.go 的第四段如下:

  1. func ADD(k string, n myElement) bool {
  2. if k == "" {
  3. return false
  4. }
  5. if LOOKUP(k) == nil {
  6. DATA[k] = n
  7. return true
  8. }
  9. return false
  10. }
  11. func DELETE(k string) bool {
  12. if LOOKUP(k) != nil {
  13. delete(DATA, k)
  14. return true
  15. }
  16. return false
  17. }
  18. func LOOKUP(k string) *myElement {
  19. _, ok := DATA[k]
  20. if ok {
  21. n := DATA[k]
  22. return &n
  23. } else {
  24. return nil
  25. }
  26. }
  27. func CHANGE(k string, n myElement) bool {
  28. DATA[k] = n
  29. return true
  30. }

上面的这些函数实现与 keyValue.go 一样。它们没有直接和 TCP 客户端交互。

kvTCP.go 的第五部分包含代码如下:

  1. func PRINT(c net.Conn) {
  2. for k, d := range DATA {
  3. netData := fmt.Sprintf("key: %s value: %v\n", k, d)
  4. c.Write([]byte(netData))
  5. }
  6. }

PRINT() 函数直接发送数据给 TCP 客户端,一次一行。

这个程序的剩余代码如下:

  1. func main() {
  2. arguments := os.Args
  3. if len(arguments) == 1 {
  4. fmt.Println("Please provide a port number!")
  5. return
  6. }
  7. PORT := ":" + arguments[1]
  8. l, err := net.Listen("tcp", PORT)
  9. if err != nil {
  10. fmt.Println(err)
  11. return
  12. }
  13. defer l.Close()
  14. err = load()
  15. if err != nil {
  16. fmt.Println(err)
  17. }
  18. for {
  19. c, err := l.Accept()
  20. if err != nil {
  21. fmt.Println(err)
  22. os.Exit(100)
  23. }
  24. go handleConnection(c)
  25. }
  26. }

执行 kvTCP.go 将产生如下输出:

  1. $ go run kvTCP.go 9000
  2. Loading /tmp/dataFile.gob
  3. Empty key/value store!
  4. open /tmp/dataFile.gob: no such file or directory
  5. Saving /tmp/dataFile.gob
  6. remove /tmp/dataFile.gob: no such file or directory
  7. Saving /tmp/dataFile.gob
  8. Saving /tmp/dataFile.gob

为了这节的目的,netcat(l) 工具用来作为 kvTCP.go 的客户端:

  1. $ nc localhost 9000
  2. Welcome to the Key-value store!
  3. PRINT
  4. LOOKUP 1
  5. Did not find key!
  6. ADD 1 2 3 4
  7. Add operation successful!
  8. LOOKUP 1
  9. {2 3 4}
  10. ADD 4 -1 -2 -3
  11. Add operation successful!
  12. PRINT
  13. key: 1 value: {2 3 4}
  14. key: 4 value: {-1 -2 -3}
  15. STOP

kvTCP.go 文件是一个使用 goroutines 的并发应用,它能够同时服务多个 TCP 客户端。然而,所有的 TCP 客户端共享相同的数据!