工作池

通常讲,工作池 是一组分配了处理任务的线程。或多或少,Apache web 服务器是这样工作的:主进程接受所有传入请求,然后转发到工作进程以获得服务。一旦,一个工作进程完成工作,它就准备为新客户端提供服务。不过,在这有些不同,因为我们的工作池使用 goroutines 代替线程。另外,线程不能在服务完请求后经常销毁,因为结束线程和创建线程的代价太高,而 goroutines 在完成它的任务后就可以销毁。

很快您就会看到,Go 中的工作池用 缓冲通道 来实现的,因为它允许您限制同时执行的 goroutines 数量。

接下来的程序 workPool.go 分为五部分来介绍。这个程序实现一个简单的任务:它将处理整数并打印它们的平方,使用一个单独 goroutine 来服务每个请求。尽管 workerPool.go 比较简单,但是它基本可以作为实现更复杂任务的程序模板。

这是一个高级技巧,它能帮您在 Go 中使用 goroutines 创建服务进程来接收并服务多个客户端!

workerPool.go 的第一部分如下:

  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "strconv"
  6. "sync"
  7. "time"
  8. )
  9. type Client struct {
  10. id int
  11. integer int
  12. }
  13. type Data struct {
  14. job Client
  15. square int
  16. }

这里您可以看到一个技巧,使用 Client 结构来分配一个唯一标识给每个要处理的请求。Data 结构用于把由程序产生实际结果的客户端数据组合起来。简单说,Client 结构持有每个请求的输入数据,而 Data 结构有请求的结果。

workerPool.go 的第二段代码如下:

  1. var (
  2. size = 10
  3. clients = make(chan Client, size)
  4. data = make(chan Data, size)
  5. )
  6. func worker(w *sync.WaitGroup) {
  7. for c := range clients {
  8. squre := c.integer * c.integer
  9. output := Data{c, square}
  10. data <- output
  11. time.Sleep(time.Second)
  12. }
  13. w.Done()
  14. }

上面的代码由两处有趣的地方。第一处是创建了三个全局变量。clientsdata 缓冲通道分别用于获得新的客户端请求和写入结果。如果您想要程序运行的快些,您可以增加这个 size 参数的值。

第二处是 worker() 函数的实现,它读取 clients 通道来获得新的请求去服务。一旦处理完成,结果就会写入 data 通道。使用 time.Sleep(time.Second) 语句的延迟不是必要的,但它使您更好地了解生成的输出的打印方式。

`workerPool.go’ 的第三部分包含如下代码:

  1. func makeWP(n int) {
  2. var w sync.WaitGroup
  3. for i := 0; i < n; i++ {
  4. w.Add(1)
  5. go worker(&w)
  6. }
  7. w.Wait()
  8. close(data)
  9. }
  10. func create(n int) {
  11. for i := 0; i < n; i++ {
  12. c := Client{i, i}
  13. clients <- c
  14. }
  15. close(clients)
  16. }

上面的代码实现了两个名为 makeWP()create() 的函数。makeWP() 函数的目的是为了处理所有请求生成需要的 worker() goroutines。虽然 w.Add(1) 函数在 makeWP() 中调用,但 w.Done() 是在 worker() 函数里调用的,当 worker 完成它的任务时。

create() 函数的目的是使用 Client 类型恰当地创建所有的请求,然后把它们写入 clients 通道进行处理。注意 clients 通道是被worker() 函数读取的。

workerPool.go 的第四部分代码如下:

  1. func main() {
  2. fmt.Println("Capacity of clients:", cap(clients))
  3. fmt.Println("Capacity of data:", cap(data))
  4. if len(os.Args) != 3 {
  5. fmt.Println("Need #jobs and #workers!")
  6. os.Exit(1)
  7. }
  8. nJobs, err := strconv.Atoi(os.Args[1])
  9. if err != nil {
  10. fmt.Println(err)
  11. return
  12. }
  13. nWorkers, err := strconv.Atoi(os.Args[2])
  14. if err != nil {
  15. fmt.Println(err)
  16. return
  17. }

在上面的代码里,您看到了读取命令行参数之前,会使用 cap() 函数获取通道的容量。

如果 worker 的数量大于 clients 缓冲通道容量,那么 goroutines 的数量将会增加到与 clients 通道的大小相同。简单讲,任务数量比 worker 大,任务将以较小的数量供应。

这个程序允许您使用命令行参数定义 worker 和 任务的数量。但为了修改 clientsdata 通道的大小,您需要修改源代码。

workerPool.go 的其余部分如下:

  1. go create(nJobs)
  2. finished := make(chan interface{})
  3. go func() {
  4. for d := range data {
  5. fmt.Println("Client ID: %d\tint: ", d.job.id)
  6. fmt.Println("%dtsquare: %d\n", d.job.integer, d.square)
  7. }
  8. finished <- true
  9. }()
  10. makeWP(nWorkers)
  11. fmt.Printf(": %v\n", <-finished)
  12. }

首先,您调用 create() 函数模拟要处理的客户端请求。一个匿名 goroutine 用于读取 data 通道并打印输出到屏幕上。finished 通道用于阻塞程序直到匿名 goroutine 读完 data 通道。因此,这个 finished 通道不需要指定类型!最后,您调用 makeWP() 函数来真正处理请求。fmt.Printf() 语句块里的 <-finished 语句的意思是不允许程序结束直到有往 finished 通道里写数据。写数据的是 main() 函数中的匿名 goroutine。另外,虽然这个匿名函数往 finished 通道里写 true 值,但您也可以往里写 false 并同样解除 main() 函数的阻塞。您可以自己试一下!

执行 workerPool.go 产生如下输出:

  1. $go run workerPool.go 15 5
  2. Capacity of clients: 10
  3. Capacity of data: 10
  4. ClientID: 0 int: 0 square:0
  5. ClientID: 4 int: 4 square:16
  6. ClientID: 1 int: 1 square:1
  7. ClientID: 3 int: 3 square:9
  8. ClientID: 2 int: 2 square:4
  9. ClientID: 5 int: 5 square:25
  10. ClientID: 6 int: 6 square:36
  11. ClientID: 7 int: 7 square:49
  12. ClientID: 8 int: 8 square:64
  13. ClientID: 9 int: 9 square:81
  14. ClientID: 10 int: 10 square:100
  15. ClientID: 11 int: 11 square:121
  16. ClientID: 12 int: 12 square:144
  17. ClientID: 13 int: 13 square:169
  18. ClientID: 14 int: 14 square:196
  19. : true

当您希望在 main() 函数中为每个单独的请求提供服务而不希望得到它的响应时,就像在 workerpool.go 中发生的那样,您需要担心的事情就很少了。在 main() 函数中既要使用 goroutines 处理请求,又要从它们获得响应,一个简单的办法是使用共享内存或者一个监视进程来搜集数据而不只是打印它们到屏幕上。

最后,workerPool.go 程序的工作是非常的简单,因为 worker() 函数不会失败。当您必须在计算机网络上工作或使用其他可能会失败的资源时,情况就不是这样了。