扇出和扇入
通常情况下多个函数可以同时从一个channel接收数据,直到channel关闭,这种情况被称作扇出。这是一种将工作分布给一组工作者的方法,目的是并行使用CPU和I/O。
如果一个函数同时接收并处理多个channel输入并转化为一个输出channel,直到所有的输入channel都关闭后,关闭输出channel,这种情况就被称作扇入。
但是main可以容易的通过关闭done channel来释放所有的发送者。关闭是个高效的发送给所有发送者的信号。我们扩展channel管道里的每个函数,让其以参数方式接收done,并通过defer语句在函数退出时执行关闭操作,这样main里所有的退出路径都会触发管道里的所有状态退出。
func main() {
// 构建done channel,整个管道里分享done,并在管道退出时关闭这个channel
// 以此通知所有Goroutine该推出了。
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// 发布sq的工作到两个都从in里读取数据的Goroutine
c1 := sq(done, in)
c2 := sq(done, in)
// 处理来自output的第一个数值
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 或者 9
// done会通过defer调用而关闭
}
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n*n
}
close(out)
}()
return out
}
merge对每个流入channel启动一个Goroutine,并将流入的数值复制到流出channel,由此将一组channel转换到一个channel。一旦启动了所有的output Goroutine,merge函数会多启动一个Goroutine,这个Goroutine在所有的输入channel输入完毕后,关闭流出channel。sq函数是把上一个函数的chan最为参数,下一个输出的chan作为返回值。
但是往一个已经关闭的channel输出会产生异常(panic),所以一定要保证所有数据发送完成后再执行关闭。
所以发送Goroutine将发送操作替换为一个select语句,要么把数据发送给out,要么处理来自done的数值。done的类型是个空结构,因为具体数值并不重要:接收事件本身就指明了应当放弃继续发送给out的动作。而output Goroutine会继续循环处理流入的channel,c,而不会阻塞上游状态.
//gen函数启动一个Goroutine,将整数数列发送给channel,如果所有数都发送完成,关闭这个channel
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
// 从一个channel接收整数,并求整数的平方,发送给另一个channel.
// mission的循环中退出,因为我们知道如果done已经被关闭了,也会关闭上游的gen状态.
// mission通过defer语句,保证不管从哪个返回路径,它的out channel都会被关闭.
func mission(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个cs中的输入channel启动一个output Goroutine。outpu从c里复制数值直到c被关闭
// 或者从done里接收到数值,之后output调用wg.Done
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// 启动一个Goroutine,当所有output Goroutine都工作完后(wg.Done),关闭out,
// 保证只关闭一次。这个Goroutine必须在wg.Add之后启动
go func() {
wg.Wait()
close(out)
}()
return out
}
在channel模式中有个模式:
状态会在所有发送操作做完后,关闭它们的流出channel
状态会持续接收从流入channel输入的数值,直到channel关闭
这个模式使得每个接收状态可以写为一个range循环,并保证所有的Goroutine在将所有的数值发送成功给下游后立刻退出。
所以在构建channel的时候:
- 状态会在所有发送操作做完后,关闭它们的流出channel.
- 状态会持续接收从流入channel输入的数值,直到channel关闭或者其发送者被释放.
因而管道要么保证足够能存下所有发送数据的缓冲区,要么接收来自接收者明确的要放弃channel的信号,来保证释放发送者。