一些常见并发编程错误

Go语言是一门天然支持并发的编程语言。 通过使用go关键字,我们可以很轻松地创建协程;通过使用通道Go中提供的其它各种同步技术,并发编程变得简单、轻松和有趣。

另一方面,Go并不阻止程序员在并发编程中因为粗心或者经验不足而犯错。 本文的余下部分将展示一些常见的并发错误,来帮助Go程序员在实践中避免这些错误。

当需要同步的时候没有同步

我们已经知道,源文件中的代码行在运行时刻并非总是按照它们的出现次序被执行

下面这个示例程序犯了两个错误:

  • 首先,主协程中对变量b的读取和匿名协程中的对变量b的写入可能会产生数据竞争;
  • 其次,在主协程中,条件b == true成立并不能确保条件a != nil也成立。 编译器和CPU可能会对调整此程序中匿名协程中的某些指令的顺序已获取更快的执行速度。 所以,站在主协程的视角看,对变量b的赋值可能会发生在对变量a的赋值之前,这将造成在修改a的元素时a依然为一个nil切片。
  1. package main
  2. import (
  3. "time"
  4. "runtime"
  5. )
  6. func main() {
  7. var a []int // nil
  8. var b bool // false
  9. // 一个匿名协程。
  10. go func () {
  11. a = make([]int, 3)
  12. b = true // 写入b
  13. }()
  14. for !b { // 读取b
  15. time.Sleep(time.Second)
  16. runtime.Gosched()
  17. }
  18. a[0], a[1], a[2] = 0, 1, 2 // 可能会发生恐慌
  19. }

上面这个程序可能在很多计算机上运行良好,但是可能会在某些计算机上因为恐慌而崩溃退出;或者使用某些编译器编译的时候运行良好,但使用另外的某个编译器编译的时候将造成程序运行时崩溃退出。

我们应该使用通道或者sync标准库包中的同步技术来确保内存顺序。比如:

  1. package main
  2. func main() {
  3. var a []int = nil
  4. c := make(chan struct{})
  5. go func () {
  6. a = make([]int, 3)
  7. c <- struct{}{}
  8. }()
  9. <-c
  10. a[0], a[1], a[2] = 0, 1, 2 // 绝不会造成恐慌
  11. }

使用time.Sleep调用来做同步

让我们看一个简单的例子:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. var x = 123
  8. go func() {
  9. x = 789 // 写入x
  10. }()
  11. time.Sleep(time.Second)
  12. fmt.Println(x) // 读取x
  13. }

我们期望着此程序打印出789。 事实上,则其运行结果常常正如我们所期待的。 但是,此程序中的同步处理实现的正确吗?否!原因很简单,Go运行时并不能保证对x的写入一定发生在对x的读取之前。 在某些特定的情形下,比如CPU资源被很一些其它计算密集的程序所占用,则对x的写入有可能发生在对x的读取之后。 因此,我们不应该在正式的项目中使用time.Sleep调用来做同步。

让我们看另一个简单的例子:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. var x = 0
  7. func main() {
  8. var num = 123
  9. var p = &num
  10. c := make(chan int)
  11. go func() {
  12. c <- *p + x
  13. }()
  14. time.Sleep(time.Second)
  15. num = 789
  16. fmt.Println(<-c)
  17. }

你觉得此程序会输出什么?123还是789? 事实上,它的输出是和具体使用的编译器相关的。 对于标准编译器1.16版本来说,它很可能输出123。 但是从理论上说,它输出789或者另外一个预想不到的值也是有可能的。

让我们将此例中的c <- *p + x一行换成c <- *p,然后重新运行它,你将会发现它的输出变成了789(如果它使用标准编译器1.16版本编译的话)。 重申一次,此结果是和具体使用的编译器和编译器的版本相关的。

是的,此程序中存在数据竞争。表达式*p的估值可能发生在赋值num = 789之前、之后、或者同时。 time.Sleep调用并不能保证*p的估值发生在此赋值之后。

对于这个特定的例子,我们应该将欲发送的值在开启新协程之前存储在一个临时变量中来避免数据竞争。

  1. ...
  2. tmp := *p
  3. go func() {
  4. c <- tmp
  5. }()
  6. ...

使一些协程永久处于阻塞状态

有很多原因导致某个协程永久阻塞,比如:

  • 从一个永远不会有其它协程向其发送数据的通道接收数据;
  • 向一个永远不会有其它协程从中读取数据的通道发送数据;
  • 被自己死锁了;
  • 和其它协程相互死锁了;
  • 等等。

除了有时我们故意地将主协程永久阻塞以防止程序退出外,其它大多数造成协程永久阻塞的情况都不是我们所期待的。 Go运行时很难分辨出一个处于阻塞状态的协程是否将永久阻塞下去,所以Go运行时不会释放永久处于阻塞状态的协程占用的资源。

采用最快回应通道用例中,如果被当作future/promise来用的通道的容量不足够大,则较慢回应的协程在准备发送回应结果时将永久阻塞。 比如,下面的例子中,每个请求将导致4个协程永久阻塞。

  1. func request() int {
  2. c := make(chan int)
  3. for i := 0; i < 5; i++ {
  4. i := i
  5. go func() {
  6. c <- i // 4个协程将永久阻塞在这里
  7. }()
  8. }
  9. return <-c
  10. }

为了防止有4个协程永久阻塞,被当作future/promise使用的通道的容量必须至少为4.

第二种“采用最快回应”实现方法中,如果被当作future/promise使用的通道是一个非缓冲通道(如下面的代码所示),则有可能导致其通道的接收者可能会错过所有的回应而导致处于永久阻塞状态。

  1. func request() int {
  2. c := make(chan int)
  3. for i := 0; i < 5; i++ {
  4. i := i
  5. go func() {
  6. select {
  7. case c <- i:
  8. default:
  9. }
  10. }()
  11. }
  12. return <-c // 有可能永久阻塞在此
  13. }

接收者协程可能会永久阻塞的原因是如果5个尝试发送操作都发生在接收操作<-c准备好之前,亦即5个个尝试发送操作都失败了,则接收者协程将永远无值可接收(从而将处于永久阻塞状态)。

将通道c改为一个缓冲通道,则至少会有一个尝试发送将成功,从而接收者协程肯定不会永久阻塞。

复制sync标准库包中的类型的值

在实践中,sync标准库包中的类型(除了Locker接口类型)的值不应该被复制。 我们只应该复制它们的指针值。

下面是一个有问题的并发编程的例子。 在此例子中,当Counter.Value方法被调用时,一个Counter属主值将被复制,此属主值的字段Mutex也将被一同复制。 此复制并没有被同步保护,因此复制结果可能是不完整的,并非被复制的属主值的一个快照。 即使此Mutex字段得以侥幸完整复制,它的副本所保护的是对字段n的一个副本的访问,因此一般是没有意义的。

  1. import "sync"
  2. type Counter struct {
  3. sync.Mutex
  4. n int64
  5. }
  6. // 此方法实现是没问题的。
  7. func (c *Counter) Increase(d int64) (r int64) {
  8. c.Lock()
  9. c.n += d
  10. r = c.n
  11. c.Unlock()
  12. return
  13. }
  14. // 此方法的实现是有问题的。当它被调用时,
  15. // 一个Counter属主值将被复制。
  16. func (c Counter) Value() (r int64) {
  17. c.Lock()
  18. r = c.n
  19. c.Unlock()
  20. return
  21. }

我们应该将Value方法的属主参数类型更改为指针类型*Counter来避免复制sync.Mutex值。

Go官方工具链中提供的go vet命令将提示此例中的Value方法的声明可能是一个潜在的逻辑错误。

在错误的地方调用sync.WaitGroup.Add方法

每个sync.WaitGroup值内部维护着一个计数。此计数的初始值为0。 如果一个sync.WaitGroup值的Wait方法在此计数为0的时候被调用,则此调用不会阻塞,否则此调用将一直阻塞到此计数变为0为止。

为了让一个WaitGroup值的使用有意义,在此值的计数为0的情况下,对它的下一次Add方法的调用必须出现在对它的下一次Wait方法的调用之前。

比如,在下面的例子中,Add方法的调用位置是不合适的。 此例子程序的打印结果并不总是100,而可能是0100间的任何一个值。 原因是没有任何一个Add方法调用可以确保发生在唯一的Wait方法调用之前,结果导致没有任何一个Done方法调用可以确保发生在唯一的Wait方法调用返回之前。

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. )
  7. func main() {
  8. var wg sync.WaitGroup
  9. var x int32 = 0
  10. for i := 0; i < 100; i++ {
  11. go func() {
  12. wg.Add(1)
  13. atomic.AddInt32(&x, 1)
  14. wg.Done()
  15. }()
  16. }
  17. fmt.Println("等待片刻...")
  18. wg.Wait()
  19. fmt.Println(atomic.LoadInt32(&x))
  20. }

我们应该将对Add方法的调用移出匿名协程之外,像下面这样,使得任何一个Done方法调用都确保发生在唯一的Wait方法调用返回之前。

  1. ...
  2. for i := 0; i < 100; i++ {
  3. wg.Add(1)
  4. go func() {
  5. atomic.AddInt32(&x, 1)
  6. wg.Done()
  7. }()
  8. }
  9. ...

不当地使用用做Future/Promise的通道

通道用例大全一文中,我们了解到一些函数可以返回用做future/promise的通道结果。 假设fafb是这样的两个函数,则下面的调用方式并没有体现出这两个函数的真正价值。

  1. doSomethingWithFutureArguments(<-fa(), <-fb())

在上面这行调用中,两个实参值(promise回应结果)的生成实际上是串行进行的,future/promise的价值没有体现出来。

我们应该像下面这样调用这两个函数来并发生成两个回应结果:

  1. ca, cb := fa(), fb()
  2. doSomethingWithFutureArguments(<-ca, <-cb)

没有让最后一个活跃的发送者关闭通道

Go程序员常犯的一个错误是关闭一个后续可能还会有协程向其发送数据的通道。 当向一个已关闭的通道发送数据的时候,一个恐慌将产生。

这样的错误曾经发生在一些很有名的项目中,比如Kubernetes项目中的这个bug这个bug

请阅读此篇文章来了解如何安全和优雅地关闭通道。

对地址不保证为8字节对齐的值执行64位原子操作

截至目前(Go 1.16),64位原子操作中涉及到的实参地址必须为8字节对齐的。不满足此条件的64位原子操作将造成一个恐慌。 对于标准编译器,这样的情形只可能发生在32位的架构中。 请阅读内存布局一文来获知如何确保让64位的整数值的地址在32位的架构中8字节对齐。

没留意过多的time.After函数调用消耗了大量资源

time标准库包中的After函数返回一个用做延迟通知的通道。 此函数给并发编程带来了很多便利,但是它的每个调用都需要创建一个time.Timer值,此新创建的Timer值在传递给After函数调用的时长(实参)内肯定不会被垃圾回收。 如果此函数在某个时段内被多次频繁调用,则可能导致积累很多尚未过期的Timer值从而造成大量的内存和计算消耗。

比如在下面这个例子中,如果longRunning函数被调用并且在一分钟内有一百万条消息到达, 那么在某个特定的很小时间段(大概若干秒)内将存在一百万个活跃的Timer值,即使其中只有一个是真正有用的。

  1. import (
  2. "fmt"
  3. "time"
  4. )
  5. // 如果某两个连续的消息的间隔大于一分钟,此函数将返回。
  6. func longRunning(messages <-chan string) {
  7. for {
  8. select {
  9. case <-time.After(time.Minute):
  10. return
  11. case msg := <-messages:
  12. fmt.Println(msg)
  13. }
  14. }
  15. }

为了避免太多的Timer值被创建,我们应该只使用(并复用)一个Timer值,像下面这样:

  1. func longRunning(messages <-chan string) {
  2. timer := time.NewTimer(time.Minute)
  3. defer timer.Stop()
  4. for {
  5. select {
  6. case <-timer.C: // 过期了
  7. return
  8. case msg := <-messages:
  9. fmt.Println(msg)
  10. // 此if代码块很重要。
  11. if !timer.Stop() {
  12. <-timer.C
  13. }
  14. }
  15. // 必须重置以复用。
  16. timer.Reset(time.Minute)
  17. }
  18. }

注意,此示例中的if代码块用来舍弃一个可能在执行第二个分支代码块的时候发送过来的超时通知。

不正确地使用time.Timer

一个典型的time.Timer的使用已经在上一节中展示了。一些解释:

  • 如果一个Timer值已经过期或者已经被终止(stopped),则相应的Stop方法调用返回false。 在此Timer值尚未终止的时候,Stop方法调用返回false只能意味着此Timer值已经过期。
  • 一个Timer值被终止之后,它的通道字段C最多只能含有一个过期的通知。
  • 在一个Timer终止(stopped)之后并且在重置和重用此Timer值之前,我们应该确保此Timer值中肯定不存在过期的通知。 这就是上一节中的例子中的if代码块的意义所在。

一个*Timer值的Reset方法必须在对应Timer值过期或者终止之后才能被调用; 否则,此Reset方法调用和一个可能的向此Timer值的C通道字段的发送通知操作产生数据竞争。

如果上一节中的例子中的select流程控制代码块中的第一个分支被选中,则这表示相应的Timer值已经过期,所以我们不必终止它。 但是我们必须在第二个分支中通过终止此Timer以检查此Timer中是否存在一个过期的通知。 如果确实有一个过期的通知,我们必须在重用这个Timer之前将此过期的通知取出;否则,此过期的通知将下一个循环步导致在第一个分支立即被选中。

比如,下面这个程序将在运行后大概一秒钟(而不是十秒钟)后退出。 而且此程序存在着潜在的数据竞争。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. start := time.Now()
  8. timer := time.NewTimer(time.Second/2)
  9. select {
  10. case <-timer.C:
  11. default:
  12. time.Sleep(time.Second) // 此分支被选中的可能性较大
  13. }
  14. timer.Reset(time.Second * 10) // 可能数据竞争
  15. <-timer.C
  16. fmt.Println(time.Since(start)) // 大约1s
  17. }

当一个time.Timer值不再被使用后,我们不必(但是推荐)终止之。

在多个协程中使用同一个time.Timer值比较容易写出不当的并发代码,所以尽量不要跨协程使用一个Timer值。

我们不应该依赖于time.TimerReset方法的返回值。此返回值只要是为了历史兼容性而存在的。