扇出和扇入

通常情况下多个函数可以同时从一个channel接收数据,直到channel关闭,这种情况被称作扇出。这是一种将工作分布给一组工作者的方法,目的是并行使用CPU和I/O。

如果一个函数同时接收并处理多个channel输入并转化为一个输出channel,直到所有的输入channel都关闭后,关闭输出channel,这种情况就被称作扇入。

但是main可以容易的通过关闭done channel来释放所有的发送者。关闭是个高效的发送给所有发送者的信号。我们扩展channel管道里的每个函数,让其以参数方式接收done,并通过defer语句在函数退出时执行关闭操作,这样main里所有的退出路径都会触发管道里的所有状态退出。

  1. func main() {
  2. // 构建done channel,整个管道里分享done,并在管道退出时关闭这个channel
  3. // 以此通知所有Goroutine该推出了。
  4. done := make(chan struct{})
  5. defer close(done)
  6. in := gen(done, 2, 3)
  7. // 发布sq的工作到两个都从in里读取数据的Goroutine
  8. c1 := sq(done, in)
  9. c2 := sq(done, in)
  10. // 处理来自output的第一个数值
  11. out := merge(done, c1, c2)
  12. fmt.Println(<-out) // 4 或者 9
  13. // done会通过defer调用而关闭
  14. }
  15. func sq(in <-chan int) <-chan int {
  16. out := make(chan int)
  17. go func() {
  18. for n := range in {
  19. out <- n*n
  20. }
  21. close(out)
  22. }()
  23. return out
  24. }

merge对每个流入channel启动一个Goroutine,并将流入的数值复制到流出channel,由此将一组channel转换到一个channel。一旦启动了所有的output Goroutine,merge函数会多启动一个Goroutine,这个Goroutine在所有的输入channel输入完毕后,关闭流出channel。sq函数是把上一个函数的chan最为参数,下一个输出的chan作为返回值。

但是往一个已经关闭的channel输出会产生异常(panic),所以一定要保证所有数据发送完成后再执行关闭。

所以发送Goroutine将发送操作替换为一个select语句,要么把数据发送给out,要么处理来自done的数值。done的类型是个空结构,因为具体数值并不重要:接收事件本身就指明了应当放弃继续发送给out的动作。而output Goroutine会继续循环处理流入的channel,c,而不会阻塞上游状态.

  1. //gen函数启动一个Goroutine,将整数数列发送给channel,如果所有数都发送完成,关闭这个channel
  2. func gen(nums ...int) <-chan int {
  3. out := make(chan int, len(nums))
  4. for _, n := range nums {
  5. out <- n
  6. }
  7. close(out)
  8. return out
  9. }
  10. // 从一个channel接收整数,并求整数的平方,发送给另一个channel.
  11. // mission的循环中退出,因为我们知道如果done已经被关闭了,也会关闭上游的gen状态.
  12. // mission通过defer语句,保证不管从哪个返回路径,它的out channel都会被关闭.
  13. func mission(in <-chan int) <-chan int {
  14. out := make(chan int)
  15. go func() {
  16. defer close(out)
  17. for n := range in {
  18. select {
  19. case out <- n * n:
  20. case <-done:
  21. return
  22. }
  23. }
  24. }()
  25. return out
  26. }
  27. func merge(cs ...<-chan int) <-chan int {
  28. var wg sync.WaitGroup
  29. out := make(chan int)
  30. // 为每个cs中的输入channel启动一个output Goroutine。outpu从c里复制数值直到c被关闭
  31. // 或者从done里接收到数值,之后output调用wg.Done
  32. output := func(c <-chan int) {
  33. for n := range c {
  34. select {
  35. case out <- n:
  36. case <-done:
  37. }
  38. }
  39. wg.Done()
  40. }
  41. wg.Add(len(cs))
  42. for _, c := range cs {
  43. go output(c)
  44. }
  45. // 启动一个Goroutine,当所有output Goroutine都工作完后(wg.Done),关闭out,
  46. // 保证只关闭一次。这个Goroutine必须在wg.Add之后启动
  47. go func() {
  48. wg.Wait()
  49. close(out)
  50. }()
  51. return out
  52. }

在channel模式中有个模式:

  • 状态会在所有发送操作做完后,关闭它们的流出channel

  • 状态会持续接收从流入channel输入的数值,直到channel关闭

这个模式使得每个接收状态可以写为一个range循环,并保证所有的Goroutine在将所有的数值发送成功给下游后立刻退出。

所以在构建channel的时候:

  • 状态会在所有发送操作做完后,关闭它们的流出channel.
  • 状态会持续接收从流入channel输入的数值,直到channel关闭或者其发送者被释放.

因而管道要么保证足够能存下所有发送数据的缓冲区,要么接收来自接收者明确的要放弃channel的信号,来保证释放发送者。