9.11 Go 并发控制

前言

提到Go语言的并发,就不得不提goroutine,其作为Go语言的一大特色,在日常开发中使用很多。

在日常应用场景就会涉及一个goroutine启动或结束,启动一个goroutine很简单只需要在函数前面加关键词go即可,而由于每个goroutine都是独立运行的,其退出有自身决定的,除非main主程序结束或程序崩溃的情况发生。

那么,如何控制goroutine或者说通知goroutine结束运行呢?

解决的方式其实很简单,那就是想办法和goroutine通讯,通知goroutine什么时候结束,goroutine结束也可以通知其他goroutine或main主程序。

并发控制方法主要有:

全局变量

channel

WaitGroup

context

全局变量

这是并发控制最简单的实现方式

1、声明一个全局变量。

2、所有子goroutine共享这个变量,并不断轮询这个变量检查是否有更新;

3、在主进程中变更该全局变量;

4、子goroutine检测到全局变量更新,执行相应的逻辑。

示例

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. open := true
  8. go func() {
  9. for open {
  10. println("goroutineA running")
  11. time.Sleep(1 * time.Second)
  12. }
  13. println("goroutineA exit")
  14. }()
  15. go func() {
  16. for open {
  17. println("goroutineB running")
  18. time.Sleep(1 * time.Second)
  19. }
  20. println("goroutineB exit")
  21. }()
  22. time.Sleep(2 * time.Second)
  23. open = false
  24. time.Sleep(2 * time.Second)
  25. fmt.Println("main fun exit")
  26. }

输出

goroutineA running goroutineB running goroutineA running goroutineB running goroutineB running goroutineA exit goroutineB exit main fun exit

这种实现方式

优点:实现简单。

缺点:适用一些逻辑简单的场景,全局变量的信息量比较少,为了防止不同goroutine同时修改变量需要用到加锁来解决。

channel

channel是goroutine之间主要的通讯方式,一般会和select搭配使用。

如想了解channel实现原理可参考

$zh-9.9.md

1、声明一个stop的chan。

2、在goroutine中,使用select判断stop是否可以接收到值,如果可以接收到,就表示可以退出停止了;如果没有接收到,就会执行default里逻辑。直到收到stop的通知。

3、主程序发送了stop<- true结束的指令后。

4、子goroutine接到结束指令case <-stop退出return。

示例

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. stop := make(chan bool)
  8. go func() {
  9. for {
  10. select {
  11. case <-stop:
  12. fmt.Println("goroutine exit")
  13. return
  14. default:
  15. fmt.Println("goroutine running")
  16. time.Sleep(1 * time.Second)
  17. }
  18. }
  19. }()
  20. time.Sleep(2 * time.Second)
  21. stop <- true
  22. time.Sleep(2 * time.Second)
  23. fmt.Println("main fun exit")
  24. }

输出

goroutine running goroutine running goroutine running goroutine exit main fun exit

这种select+chan是一种比较优雅的并发控制方式,但也有局限性,如多个goroutine 需要结束,以及嵌套goroutine 的场景。

WaitGroup

Go语言提供同步包(sync),源码(src/sync/waitgroup.go)。

Sync包同步提供基本的同步原语,如互斥锁。除了Once和WaitGroup类型之外,大多数类型都是供低级库例程使用的。通过Channel和沟通可以更好地完成更高级别的同步。并且此包中的值在使用过后不要拷贝。

Sync.WaitGroup是一种实现并发控制方式,WaitGroup 对象内部有一个计数器,最初从0开始,它有三个方法:Add(), Done(), Wait() 用来控制计数器的数量。

  • Add(n) 把计数器设置为n
  • Done() 每次把计数器-1
  • wait() 会阻塞代码的运行,直到计数器地值减为0。

示例

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. func main() {
  8. //定义一个WaitGroup
  9. var wg sync.WaitGroup
  10. //计数器设置为2
  11. wg.Add(2)
  12. go func() {
  13. time.Sleep(2 * time.Second)
  14. fmt.Println("goroutineA finish")
  15. //计数器减1
  16. wg.Done()
  17. }()
  18. go func() {
  19. time.Sleep(2 * time.Second)
  20. fmt.Println("goroutineB finish")
  21. //计数器减1
  22. wg.Done()
  23. }()
  24. //会阻塞代码的运行,直到计数器地值减为0。
  25. wg.Wait()
  26. time.Sleep(2 * time.Second)
  27. fmt.Println("main fun exit")
  28. }

这种控制并发的方式适用于,好多个goroutine协同做一件事情的时候,因为每个goroutine做的都是这件事情的一部分,只有全部的goroutine都完成,这件事情才算是完成,这是等待的方式。WaitGroup相对于channel并发控制方式比较轻巧。

注意:

1、计数器不能为负值

2、WaitGroup对象不是一个引用类型

Context

应用场景:在 Go http 包的 Server 中,每个Request都需要开启一个goroutine做一些事情,这些goroutine又可能会开启其他的goroutine。所以我们需要一种可以跟踪goroutine的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的Context,称之为上下文。

控制并发的实现方式:

1、 context.Background():返回一个空的Context,这个空的Context一般用于整个Context树的根节点。

2、context.WithCancel(context.Background()),创建一个可取消的子Context,然后当作参数传给goroutine使用,这样就可以使用这个子Context跟踪这个goroutine。

3、在goroutine中,使用select调用<-ctx.Done()判断是否要结束,如果接收到值的话,就可以返回结束goroutine了;如果接收不到,就会继续进行监控。

4、cancel(),取消函数(context.WithCancel()返回的第二个参数,名字和声明的名字一致)。作用是给goroutine发送结束指令。

示例:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. "golang.org/x/net/context"
  6. )
  7. func main() {
  8. //创建一个可取消子context,context.Background():返回一个空的Context,这个空的Context一般用于整个Context树的根节点。
  9. ctx, cancel := context.WithCancel(context.Background())
  10. go func(ctx context.Context) {
  11. for {
  12. select {
  13. //使用select调用<-ctx.Done()判断是否要结束
  14. case <-ctx.Done():
  15. fmt.Println("goroutine exit")
  16. return
  17. default:
  18. fmt.Println("goroutine running.")
  19. time.Sleep(2 * time.Second)
  20. }
  21. }
  22. }(ctx)
  23. time.Sleep(10 * time.Second)
  24. fmt.Println("main fun exit")
  25. //取消context
  26. cancel()
  27. time.Sleep(5 * time.Second)
  28. }

输出:

goroutine running. goroutine running. goroutine running. goroutine running. goroutine running. main fun exit goroutine exit

如果想控制多个goroutine ,也很简单。

示例

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. "golang.org/x/net/context"
  6. )
  7. func main() {
  8. //创建一个可取消子context,context.Background():返回一个空的Context,这个空的Context一般用于整个Context树的根节点。
  9. ctx, cancel := context.WithCancel(context.Background())
  10. ctxTwo, cancelTwo := context.WithCancel(context.Background())
  11. go func(ctx context.Context) {
  12. for {
  13. select {
  14. //使用select调用<-ctx.Done()判断是否要结束
  15. case <-ctx.Done():
  16. fmt.Println("goroutineA exit")
  17. return
  18. default:
  19. fmt.Println("goroutineA running.")
  20. time.Sleep(2 * time.Second)
  21. }
  22. }
  23. }(ctx)
  24. go func(ctx context.Context) {
  25. for {
  26. select {
  27. //使用select调用<-ctx.Done()判断是否要结束
  28. case <-ctx.Done():
  29. fmt.Println("goroutineB exit")
  30. return
  31. default:
  32. fmt.Println("goroutineB running.")
  33. time.Sleep(2 * time.Second)
  34. }
  35. }
  36. }(ctx)
  37. go func(ctxTwo context.Context) {
  38. for {
  39. select {
  40. //使用select调用<-ctx.Done()判断是否要结束
  41. case <-ctxTwo.Done():
  42. fmt.Println("goroutineC exit")
  43. return
  44. default:
  45. fmt.Println("goroutineC running.")
  46. time.Sleep(2 * time.Second)
  47. }
  48. }
  49. }(ctxTwo)
  50. time.Sleep(4 * time.Second)
  51. fmt.Println("main fun exit")
  52. //取消context
  53. cancel()
  54. cancelTwo()
  55. time.Sleep(5 * time.Second)
  56. }

结果:

goroutineA running. goroutineB running. goroutineC running. goroutineB running. goroutineC running. goroutineA running. goroutineC running. goroutineA running. goroutineB running. main fun exit goroutineC exit goroutineA exit goroutineB exit

context还适用于更复杂的场景,如主动取消goroutine或goroutine定时取消等。context接口除了func WithCancel(parent Context) (ctx Context, cancel CancelFunc),还有衍生以下方法

  • func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc): 此函数返回其父项的派生 context,当截止日期超过或取消函数被调用时,该 context 将被取消。例如,您可以创建一个将在以后的某个时间自动取消的 context,并在子函数中传递它。当因为截止日期耗尽而取消该 context 时,获此 context 的所有函数都会收到通知去停止运行并返回。

  • func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc):

    此函数类似于 context.WithDeadline。不同之处在于它将持续时间作为参数输入而不是时间对象。此函数返回派生 context,如果调用取消函数或超出超时持续时间,则会取消该派生 context。

  • func WithValue(parent Context, key, val interface{}) Context:

    此函数接收 context 并返回派生 context,其中值 val 与 key 关联,并通过 context 树与 context 一起传递。这意味着一旦获得带有值的 context,从中派生的任何 context 都会获得此值。不建议使用 context 值传递关键参数,而是函数应接收签名中的那些值,使其显式化。

有兴趣的同学请阅读:https://studygolang.com/pkgdoc

参考:

https://tutorialedge.net/golang/go-waitgroup-tutorial/

http://goinbigdata.com/golang-wait-for-all-goroutines-to-finish/

https://medium.com/code-zen/concurrency-in-go-5fcba11acb0f

https://blog.csdn.net/u013029603/article/details/81232395

links