管道

管道是一个虚拟的方法用来连接 goroutines 和 通道,使一个 goroutine 的输出成为另一个的输入,使用通道传递数据。

使用管道的一个好处是程序中有不变的数据流,因此 goroutine 和 通道不必等所有都就绪才开始执行。另外,因为您不必把所有内容都保存为变量,就节省了变量和内存空间的使用。最后,管道简化了程序设计并提升了维护性。

我们使用 pipeline.go 代码来说明管道的使用。这个程序分六部分来介绍。pipeling.go 程序执行的任务是在给定范围内产生随机数,当任何数字随机出现第二次时就结束。但在终止前,程序将打印第一个随机数出现第二次之前的所有随机数之和。您需要三个函数来连接程序的通道。程序的逻辑在这三个函数中,但数据流在管道的通道内。

这个程序有两个通道。第一个(通道A)用于从第一个函数获取随机数并发送它们到第二个函数。第二个(通道B)被第二个函数用来发送可接受的随机数到第三个函数。第三个函数负责从通道 B 获取数据,并计算结果和展示。

pipeline.go 的第一段代码如下:

  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "os"
  6. "strconv"
  7. "time"
  8. )
  9. var CLOSEA = false
  10. var DATA = make(map[int]bool)

因为 second() 函数需要通过一种方式告诉 first() 函数关闭第一个通道,所以我使用一个全局变量 CLOSEA 来处理。CLOSEA 变量只在 first() 函数中检查,并只能在 second() 函数中修改。

pipeline.go 的第二段代码如下:

  1. func random(min, max int) int {
  2. return rand.Intn(max-min) + min
  3. }
  4. func first(min, max int, out chan<- int) {
  5. for {
  6. if CLOSEA {
  7. close(out)
  8. return
  9. }
  10. out <- random(min, max)
  11. }
  12. }

上面的代码展示了 randomfirst 函数的实现。您已经对 random() 函数比较熟悉了,它在一定范围内产生随机数。但真正有趣的是 first() 函数。它使用 for 循环持续运行,直到 CLOSEA 变为 true。那样的话,它就关闭 out 通道。

pipeline.go 的第三段代码如下:

  1. func second(out chan<- int, in <-chan int) {
  2. for x := range in {
  3. fmt.Print(x, " ")
  4. _, ok := DATA[x]
  5. if ok {
  6. CLOSEA = true
  7. }else {
  8. DATA[x] = true
  9. out <- x
  10. }
  11. }
  12. fmt.Println()
  13. close(out)
  14. }

second() 函数从 in 通道接收数据,并发送该数据到 out 通道。但是,second() 函数一旦发现 DATA map 中已经存在了该随机数,它就把 CLOSEA 全局变量设为 true 并停止发送任何数据到 out 通道。然后,它就关闭 out 通道。

pipeline.go 的第四段代码如下:

  1. func third(in <-chan int){
  2. var sum int
  3. sum = 0
  4. for x2 := range in {
  5. sum = sum + x2
  6. }
  7. fmt.Println("The sum of the random numbers is %d\n", sum)
  8. }

third 函数持续从 in 通道读取数据。当通道被 second() 函数关闭时,for 循环停止获取数据,函数打印输出。从这很清楚的看到 second() 函数控制许多事情。

pipeline.go 的第五段代码如下:

  1. func main() {
  2. if len(os.Args) != 3 {
  3. fmt.Println("Need two integer paramters!")
  4. os.Exit(1)
  5. }
  6. n1, _ := strconv.Atoi(os.Args[1])
  7. n2, _ := strconv.Atoi(os.Args[2])
  8. if n1 > n2 {
  9. fmt.Printf("%d should be smaller than %d\n", n1, n2)
  10. return
  11. }

pipeline.go 的最后一段如下:

  1. rand.Seed(time.Now().UnixNano())
  2. A := make(chan int)
  3. B := make(chan int)
  4. go first(n1, n2, A)
  5. go second(B, A)
  6. third(B)
  7. }

这里,定义了需要的通道,并执行两个 goroutines 和一个函数。third() 函数阻止 main 返回,因为它不作为 goroutine 执行。

执行 pipeline.go 产生如下输出:

  1. $go run pipeline.go 1 10
  2. 2 2
  3. The sum of the random numbers is 2
  4. $go run pipeline.go 1 10
  5. 9 7 8 4 3 3
  6. The sum of the random numbers is 31
  7. $go run pipeline.go 1 10
  8. 1 6 9 7 1
  9. The sum of the random numbers is 23
  10. $go run pipeline.go 10 20
  11. 16 19 16
  12. The sum of the random numbers is 35
  13. $go run pipeline.go 10 20
  14. 10 16 17 11 15 10
  15. The sum of the random numbers is 69
  16. $go run pipeline.go 10 20
  17. 12 11 14 15 10 15
  18. The sum of the random numbers is 62

这里重点是尽管 first 函数按自己的节奏持续产生随机数,并且 second() 函数把它们打印在屏幕上,不需要的随机数也就是已经出现的随机数,不会发送给 third() 函数,因此就不会包含在最终的总和中。