如何优雅地关闭通道

在本文发表数日前,我曾写了一篇文章来解释通道的规则。 那篇文章在redditHN上获得了很多点赞,但也有很多人对Go通道的细节设计提出了一些批评意见。

这些批评主要针对于通道设计中的下列细节:

  1. 没有一个简单和通用的方法用来在不改变一个通道的状态的情况下检查这个通道是否已经关闭。
  2. 关闭一个已经关闭的通道将产生一个恐慌,所以在不知道一个通道是否已经关闭的时候关闭此通道是很危险的。
  3. 向一个已关闭的通道发送数据将产生一个恐慌,所以在不知道一个通道是否已经关闭的时候向此通道发送数据是很危险的。

这些批评看上去有几分道理(实际上属于对通道的不正确使用导致的偏见)。 是的,Go语言中并没有提供一个内置函数来检查一个通道是否已经关闭。

在Go中,如果我们能够保证从不会向一个通道发送数据,那么有一个简单的方法来判断此通道是否已经关闭。 此方法已经在上一篇文章通道用例大全中展示过了。 这里为了本文的连贯性,在下面的例子中重新列出了此方法。

  1. package main
  2. import "fmt"
  3. type T int
  4. func IsClosed(ch <-chan T) bool {
  5. select {
  6. case <-ch:
  7. return true
  8. default:
  9. }
  10. return false
  11. }
  12. func main() {
  13. c := make(chan T)
  14. fmt.Println(IsClosed(c)) // false
  15. close(c)
  16. fmt.Println(IsClosed(c)) // true
  17. }

如前所述,此方法并不是一个通用的检查通道是否已经关闭的方法。

事实上,即使有一个内置closed函数用来检查一个通道是否已经关闭,它的有用性也是十分有限的。 原因是当此函数的一个调用的结果返回时,被查询的通道的状态可能已经又改变了,导致此调用结果并不能反映出被查询的通道的最新状态。 虽然我们可以根据一个调用closed(ch)的返回结果为true而得出我们不应该再向通道ch发送数据的结论, 但是我们不能根据一个调用closed(ch)的返回结果为false而得出我们可以继续向通道ch发送数据的结论。

通道关闭原则

一个常用的使用Go通道的原则是不要在数据接收方或者在有多个发送者的情况下关闭通道。 换句话说,我们只应该让一个通道唯一的发送者关闭此通道。

下面我们将称此原则为通道关闭原则

当然,这并不是一个通用的关闭通道的原则。通用的原则是不要关闭已关闭的通道。 如果我们能够保证从某个时刻之后,再没有协程将向一个未关闭的非nil通道发送数据,则一个协程可以安全地关闭此通道。 然而,做出这样的保证常常需要很大的努力,从而导致代码过度复杂。 另一方面,遵循通道关闭原则是一件相对简单的事儿。

粗鲁地关闭通道的方法

如果由于某种原因,你一定非要从数据接收方或者让众多发送者中的一个关闭一个通道,你可以使用恢复机制来防止可能产生的恐慌而导致程序崩溃。 下面就是这样的一个实现(假设通道的元素类型为T)。

  1. func SafeClose(ch chan T) (justClosed bool) {
  2. defer func() {
  3. if recover() != nil {
  4. // 一个函数的返回结果可以在defer调用中修改。
  5. justClosed = false
  6. }
  7. }()
  8. // 假设ch != nil。
  9. close(ch) // 如果ch已关闭,则产生一个恐慌。
  10. return true // <=> justClosed = true; return
  11. }

此方法违反了通道关闭原则

同样的方法可以用来粗鲁地向一个关闭状态未知的通道发送数据。

  1. func SafeSend(ch chan T, value T) (closed bool) {
  2. defer func() {
  3. if recover() != nil {
  4. closed = true
  5. }
  6. }()
  7. ch <- value // 如果ch已关闭,则产生一个恐慌。
  8. return false // <=> closed = false; return
  9. }

这样的粗鲁方法不仅违反了通道关闭原则,而且Go白皮书和标准编译器不保证它的实现中不存在数据竞争

礼貌地关闭通道的方法

很多Go程序员喜欢使用sync.Once来关闭通道。

  1. type MyChannel struct {
  2. C chan T
  3. once sync.Once
  4. }
  5. func NewMyChannel() *MyChannel {
  6. return &MyChannel{C: make(chan T)}
  7. }
  8. func (mc *MyChannel) SafeClose() {
  9. mc.once.Do(func() {
  10. close(mc.C)
  11. })
  12. }

当然,我们也可以使用sync.Mutex来防止多次关闭一个通道。

  1. type MyChannel struct {
  2. C chan T
  3. closed bool
  4. mutex sync.Mutex
  5. }
  6. func NewMyChannel() *MyChannel {
  7. return &MyChannel{C: make(chan T)}
  8. }
  9. func (mc *MyChannel) SafeClose() {
  10. mc.mutex.Lock()
  11. defer mc.mutex.Unlock()
  12. if !mc.closed {
  13. close(mc.C)
  14. mc.closed = true
  15. }
  16. }
  17. func (mc *MyChannel) IsClosed() bool {
  18. mc.mutex.Lock()
  19. defer mc.mutex.Unlock()
  20. return mc.closed
  21. }

这些实现确实比上一节中的方法礼貌一些,但是它们不能完全有效地避免数据竞争。 目前的Go白皮书并不保证发生在一个通道上的并发关闭操作和发送操纵不会产生数据竞争。 如果一个SafeClose函数和同一个通道上的发送操作同时运行,则数据竞争可能发生(虽然这样的数据竞争一般并不会带来什么危害)。

优雅地关闭通道的方法

上一节中介绍的SafeSend函数有一个弊端,它的调用不能做为case操作而被使用在select代码块中。 另外,很多Go程序员(包括我)认为上面两节展示的关闭通道的方法不是很优雅。 本节下面将介绍一些在各种情形下使用纯通道操作来关闭通道的方法。

(为了演示程序的完整性,下面这些例子中使用到了sync.WaitGroup。在实践中,sync.WaitGroup并不是必需的。)

情形一:M个接收者和一个发送者。发送者通过关闭用来传输数据的通道来传递发送结束信号

这是最简单的一种情形。当发送者欲结束发送,让它关闭用来传输数据的通道即可。

  1. package main
  2. import (
  3. "time"
  4. "math/rand"
  5. "sync"
  6. "log"
  7. )
  8. func main() {
  9. rand.Seed(time.Now().UnixNano())
  10. log.SetFlags(0)
  11. // ...
  12. const Max = 100000
  13. const NumReceivers = 100
  14. wgReceivers := sync.WaitGroup{}
  15. wgReceivers.Add(NumReceivers)
  16. // ...
  17. dataCh := make(chan int)
  18. // 发送者
  19. go func() {
  20. for {
  21. if value := rand.Intn(Max); value == 0 {
  22. // 此唯一的发送者可以安全地关闭此数据通道。
  23. close(dataCh)
  24. return
  25. } else {
  26. dataCh <- value
  27. }
  28. }
  29. }()
  30. // 接收者
  31. for i := 0; i < NumReceivers; i++ {
  32. go func() {
  33. defer wgReceivers.Done()
  34. // 接收数据直到通道dataCh已关闭
  35. // 并且dataCh的缓冲队列已空。
  36. for value := range dataCh {
  37. log.Println(value)
  38. }
  39. }()
  40. }
  41. wgReceivers.Wait()
  42. }

情形二:一个接收者和N个发送者,此唯一接收者通过关闭一个额外的信号通道来通知发送者不要在发送数据了

此情形比上一种情形复杂一些。我们不能让接收者关闭用来传输数据的通道来停止数据传输,因为这样做违反了通道关闭原则。 但是我们可以让接收者关闭一个额外的信号通道来通知发送者不要在发送数据了。

  1. package main
  2. import (
  3. "time"
  4. "math/rand"
  5. "sync"
  6. "log"
  7. )
  8. func main() {
  9. rand.Seed(time.Now().UnixNano())
  10. log.SetFlags(0)
  11. // ...
  12. const Max = 100000
  13. const NumSenders = 1000
  14. wgReceivers := sync.WaitGroup{}
  15. wgReceivers.Add(1)
  16. // ...
  17. dataCh := make(chan int)
  18. stopCh := make(chan struct{})
  19. // stopCh是一个额外的信号通道。它的
  20. // 发送者为dataCh数据通道的接收者。
  21. // 它的接收者为dataCh数据通道的发送者。
  22. // 发送者
  23. for i := 0; i < NumSenders; i++ {
  24. go func() {
  25. for {
  26. // 这里的第一个尝试接收用来让此发送者
  27. // 协程尽早地退出。对于这个特定的例子,
  28. // 此select代码块并非必需。
  29. select {
  30. case <- stopCh:
  31. return
  32. default:
  33. }
  34. // 即使stopCh已经关闭,此第二个select
  35. // 代码块中的第一个分支仍很有可能在若干个
  36. // 循环步内依然不会被选中。如果这是不可接受
  37. // 的,则上面的第一个select代码块是必需的。
  38. select {
  39. case <- stopCh:
  40. return
  41. case dataCh <- rand.Intn(Max):
  42. }
  43. }
  44. }()
  45. }
  46. // 接收者
  47. go func() {
  48. defer wgReceivers.Done()
  49. for value := range dataCh {
  50. if value == Max-1 {
  51. // 此唯一的接收者同时也是stopCh通道的
  52. // 唯一发送者。尽管它不能安全地关闭dataCh数
  53. // 据通道,但它可以安全地关闭stopCh通道。
  54. close(stopCh)
  55. return
  56. }
  57. log.Println(value)
  58. }
  59. }()
  60. // ...
  61. wgReceivers.Wait()
  62. }

如此例中的注释所述,对于此额外的信号通道stopCh,它只有一个发送者,即dataCh数据通道的唯一接收者。 dataCh数据通道的接收者关闭了信号通道stopCh,这是不违反通道关闭原则的。

在此例中,数据通道dataCh并没有被关闭。是的,我们不必关闭它。 当一个通道不再被任何协程所使用后,它将逐渐被垃圾回收掉,无论它是否已经被关闭。 所以这里的优雅性体现在通过不关闭一个通道来停止使用此通道。

情形三:M个接收者和N个发送者。它们中的任何协程都可以让一个中间调解协程帮忙发出停止数据传送的信号

这是最复杂的一种情形。我们不能让接收者和发送者中的任何一个关闭用来传输数据的通道,我们也不能让多个接收者之一关闭一个额外的信号通道。 这两种做法都违反了通道关闭原则。 然而,我们可以引入一个中间调解者角色并让其关闭额外的信号通道来通知所有的接收者和发送者结束工作。 具体实现见下例。注意其中使用了一个尝试发送操作来向中间调解者发送信号。

  1. package main
  2. import (
  3. "time"
  4. "math/rand"
  5. "sync"
  6. "log"
  7. "strconv"
  8. )
  9. func main() {
  10. rand.Seed(time.Now().UnixNano())
  11. log.SetFlags(0)
  12. // ...
  13. const Max = 100000
  14. const NumReceivers = 10
  15. const NumSenders = 1000
  16. wgReceivers := sync.WaitGroup{}
  17. wgReceivers.Add(NumReceivers)
  18. // ...
  19. dataCh := make(chan int)
  20. stopCh := make(chan struct{})
  21. // stopCh是一个额外的信号通道。它的发送
  22. // 者为中间调解者。它的接收者为dataCh
  23. // 数据通道的所有的发送者和接收者。
  24. toStop := make(chan string, 1)
  25. // toStop是一个用来通知中间调解者让其
  26. // 关闭信号通道stopCh的第二个信号通道。
  27. // 此第二个信号通道的发送者为dataCh数据
  28. // 通道的所有的发送者和接收者,它的接收者
  29. // 为中间调解者。它必须为一个缓冲通道。
  30. var stoppedBy string
  31. // 中间调解者
  32. go func() {
  33. stoppedBy = <-toStop
  34. close(stopCh)
  35. }()
  36. // 发送者
  37. for i := 0; i < NumSenders; i++ {
  38. go func(id string) {
  39. for {
  40. value := rand.Intn(Max)
  41. if value == 0 {
  42. // 为了防止阻塞,这里使用了一个尝试
  43. // 发送操作来向中间调解者发送信号。
  44. select {
  45. case toStop <- "发送者#" + id:
  46. default:
  47. }
  48. return
  49. }
  50. // 此处的尝试接收操作是为了让此发送协程尽早
  51. // 退出。标准编译器对尝试接收和尝试发送做了
  52. // 特殊的优化,因而它们的速度很快。
  53. select {
  54. case <- stopCh:
  55. return
  56. default:
  57. }
  58. // 即使stopCh已关闭,如果这个select代码块
  59. // 中第二个分支的发送操作是非阻塞的,则第一个
  60. // 分支仍很有可能在若干个循环步内依然不会被选
  61. // 中。如果这是不可接受的,则上面的第一个尝试
  62. // 接收操作代码块是必需的。
  63. select {
  64. case <- stopCh:
  65. return
  66. case dataCh <- value:
  67. }
  68. }
  69. }(strconv.Itoa(i))
  70. }
  71. // 接收者
  72. for i := 0; i < NumReceivers; i++ {
  73. go func(id string) {
  74. defer wgReceivers.Done()
  75. for {
  76. // 和发送者协程一样,此处的尝试接收操作是为了
  77. // 让此接收协程尽早退出。
  78. select {
  79. case <- stopCh:
  80. return
  81. default:
  82. }
  83. // 即使stopCh已关闭,如果这个select代码块
  84. // 中第二个分支的接收操作是非阻塞的,则第一个
  85. // 分支仍很有可能在若干个循环步内依然不会被选
  86. // 中。如果这是不可接受的,则上面尝试接收操作
  87. // 代码块是必需的。
  88. select {
  89. case <- stopCh:
  90. return
  91. case value := <-dataCh:
  92. if value == Max-1 {
  93. // 为了防止阻塞,这里使用了一个尝试
  94. // 发送操作来向中间调解者发送信号。
  95. select {
  96. case toStop <- "接收者#" + id:
  97. default:
  98. }
  99. return
  100. }
  101. log.Println(value)
  102. }
  103. }
  104. }(strconv.Itoa(i))
  105. }
  106. // ...
  107. wgReceivers.Wait()
  108. log.Println("被" + stoppedBy + "终止了")
  109. }

在此例中,通道关闭原则依旧得到了遵守。

请注意,信号通道toStop的容量必须至少为1。 如果它的容量为0,则在中间调解者还未准备好的情况下就已经有某个协程向toStop发送信号时,此信号将被抛弃。

我们也可以不使用尝试发送操作向中间调解者发送信号,但信号通道toStop的容量必须至少为数据发送者和数据接收者的数量之和,以防止向其发送数据时(有一个极其微小的可能)导致某些发送者和接收者协程永久阻塞。

  1. ...
  2. toStop := make(chan string, NumReceivers + NumSenders)
  3. ...
  4. value := rand.Intn(Max)
  5. if value == 0 {
  6. toStop <- "sender#" + id
  7. return
  8. }
  9. ...
  10. if value == Max-1 {
  11. toStop <- "receiver#" + id
  12. return
  13. }
  14. ...

情形四:“M个接收者和一个发送者”情形的一个变种:用来传输数据的通道的关闭请求由第三方发出

有时,数据通道(dataCh)的关闭请求需要由某个第三方协程发出。对于这种情形,我们可以使用一个额外的信号通道来通知唯一的发送者关闭数据通道(dataCh)。

  1. package main
  2. import (
  3. "time"
  4. "math/rand"
  5. "sync"
  6. "log"
  7. )
  8. func main() {
  9. rand.Seed(time.Now().UnixNano())
  10. log.SetFlags(0)
  11. // ...
  12. const Max = 100000
  13. const NumReceivers = 100
  14. const NumThirdParties = 15
  15. wgReceivers := sync.WaitGroup{}
  16. wgReceivers.Add(NumReceivers)
  17. // ...
  18. dataCh := make(chan int)
  19. closing := make(chan struct{}) // 信号通道
  20. closed := make(chan struct{})
  21. // 此stop函数可以被安全地多次调用。
  22. stop := func() {
  23. select {
  24. case closing<-struct{}{}:
  25. <-closed
  26. case <-closed:
  27. }
  28. }
  29. // 一些第三方协程
  30. for i := 0; i < NumThirdParties; i++ {
  31. go func() {
  32. r := 1 + rand.Intn(3)
  33. time.Sleep(time.Duration(r) * time.Second)
  34. stop()
  35. }()
  36. }
  37. // 发送者
  38. go func() {
  39. defer func() {
  40. close(closed)
  41. close(dataCh)
  42. }()
  43. for {
  44. select{
  45. case <-closing: return
  46. default:
  47. }
  48. select{
  49. case <-closing: return
  50. case dataCh <- rand.Intn(Max):
  51. }
  52. }
  53. }()
  54. // 接收者
  55. for i := 0; i < NumReceivers; i++ {
  56. go func() {
  57. defer wgReceivers.Done()
  58. for value := range dataCh {
  59. log.Println(value)
  60. }
  61. }()
  62. }
  63. wgReceivers.Wait()
  64. }

上述代码中的stop函数中使用的技巧偷自Roger Peppe在此贴中的一个留言。

情形五:“N个发送者”的一个变种:用来传输数据的通道必须被关闭以通知各个接收者数据发送已经结束了

在上面的提到的“N个发送者”情形中,为了遵守通道关闭原则,我们避免了关闭数据通道(dataCh)。 但是有时候,数据通道(dataCh)必须被关闭以通知各个接收者数据发送已经结束。 对于这种“N个发送者”情形,我们可以使用一个中间通道将它们转化为“一个发送者”情形,然后继续使用上一节介绍的技巧来关闭此中间通道,从而避免了关闭原始的dataCh数据通道。

  1. package main
  2. import (
  3. "time"
  4. "math/rand"
  5. "sync"
  6. "log"
  7. "strconv"
  8. )
  9. func main() {
  10. rand.Seed(time.Now().UnixNano())
  11. log.SetFlags(0)
  12. // ...
  13. const Max = 1000000
  14. const NumReceivers = 10
  15. const NumSenders = 1000
  16. const NumThirdParties = 15
  17. wgReceivers := sync.WaitGroup{}
  18. wgReceivers.Add(NumReceivers)
  19. // ...
  20. dataCh := make(chan int) // 将被关闭
  21. middleCh := make(chan int) // 不会被关闭
  22. closing := make(chan string)
  23. closed := make(chan struct{})
  24. var stoppedBy string
  25. stop := func(by string) {
  26. select {
  27. case closing <- by:
  28. <-closed
  29. case <-closed:
  30. }
  31. }
  32. // 中间层
  33. go func() {
  34. exit := func(v int, needSend bool) {
  35. close(closed)
  36. if needSend {
  37. dataCh <- v
  38. }
  39. close(dataCh)
  40. }
  41. for {
  42. select {
  43. case stoppedBy = <-closing:
  44. exit(0, false)
  45. return
  46. case v := <- middleCh:
  47. select {
  48. case stoppedBy = <-closing:
  49. exit(v, true)
  50. return
  51. case dataCh <- v:
  52. }
  53. }
  54. }
  55. }()
  56. // 一些第三方协程
  57. for i := 0; i < NumThirdParties; i++ {
  58. go func(id string) {
  59. r := 1 + rand.Intn(3)
  60. time.Sleep(time.Duration(r) * time.Second)
  61. stop("3rd-party#" + id)
  62. }(strconv.Itoa(i))
  63. }
  64. // 发送者
  65. for i := 0; i < NumSenders; i++ {
  66. go func(id string) {
  67. for {
  68. value := rand.Intn(Max)
  69. if value == 0 {
  70. stop("sender#" + id)
  71. return
  72. }
  73. select {
  74. case <- closed:
  75. return
  76. default:
  77. }
  78. select {
  79. case <- closed:
  80. return
  81. case middleCh <- value:
  82. }
  83. }
  84. }(strconv.Itoa(i))
  85. }
  86. // 接收者
  87. for range [NumReceivers]struct{}{} {
  88. go func() {
  89. defer wgReceivers.Done()
  90. for value := range dataCh {
  91. log.Println(value)
  92. }
  93. }()
  94. }
  95. // ...
  96. wgReceivers.Wait()
  97. log.Println("stopped by", stoppedBy)
  98. }

更多情形?

在日常编程中可能会遇到更多的变种情形,但是上面介绍的情形是最常见和最基本的。 通过聪明地使用通道(和其它并发同步技术),我相信,对于各种变种,我们总会找到相应的遵守通道关闭原则的解决方法。

结论

并没有什么情况非得逼得我们违反通道关闭原则。 如果你遇到了此情形,请考虑修改你的代码流程和结构设计。

使用通道编程宛如在艺术创作一般!