sync - 处理同步需求

golang 是一门语言级别支持并发的程序语言。golang 中使用 go 语句来开启一个新的协程。 goroutine 是非常轻量的,除了给它分配栈空间,它所占用的内存空间是微乎其微的。

但当多个 goroutine 同时进行处理的时候,就会遇到比如同时抢占一个资源,某个 goroutine 等待另一个 goroutine 处理完某一个步骤之后才能继续的需求。 在 golang 的官方文档上,作者明确指出,golang 并不希望依靠共享内存的方式进行进程的协同操作。而是希望通过管道 channel 的方式进行。 当然,golang 也提供了共享内存,锁,等机制进行协同操作的包。sync 包就是为了这个目的而出现的。

sync 包中定义了 Locker 结构来代表锁。

  1. type Locker interface {
  2. Lock()
  3. Unlock()
  4. }

并且创造了两个结构来实现 Locker 接口:Mutex 和 RWMutex。

Mutex 就是互斥锁,互斥锁代表着当数据被加锁了之后,除了加锁的程序,其他程序不能对数据进行读操作和写操作。 这个当然能解决并发程序对资源的操作。但是,效率上是个问题。当加锁后,其他程序要读取操作数据,就只能进行等待了。 这个时候就需要使用读写锁。

读写锁分为读锁和写锁,读数据的时候上读锁,写数据的时候上写锁。有写锁的时候,数据不可读不可写。有读锁的时候,数据可读,不可写。 互斥锁就不举例子,读写锁可以看下面的例子:

  1. package main
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. var m *sync.RWMutex
  7. var val = 0
  8. func main() {
  9. m = new(sync.RWMutex)
  10. go read(1)
  11. go write(2)
  12. go read(3)
  13. time.Sleep(5 * time.Second)
  14. }
  15. func read(i int) {
  16. m.RLock()
  17. time.Sleep(1 * time.Second)
  18. println("val: ", val)
  19. time.Sleep(1 * time.Second)
  20. m.RUnlock()
  21. }
  22. func write(i int) {
  23. m.Lock()
  24. val = 10
  25. time.Sleep(1 * time.Second)
  26. m.Unlock()
  27. }
  28. 返回:
  29. val: 0
  30. val: 10

但是如果我们把 read 中的 RLock 和 RUnlock 两个函数给注释了,就返回了 :

  1. val: 10
  2. val: 10

这个就是由于读的时候没有加读锁,在准备读取 val 的时候,val 被 write 函数进行修改了。

临时对象池

当多个 goroutine 都需要创建同一个对象的时候,如果 goroutine 过多,可能导致对象的创建数目剧增。 而对象又是占用内存的,进而导致的就是内存回收的 GC 压力徒增。造成“并发大-占用内存大- GC 缓慢-处理并发能力降低-并发更大”这样的恶性循环。 在这个时候,我们非常迫切需要有一个对象池,每个 goroutine 不再自己单独创建对象,而是从对象池中获取出一个对象(如果池中已经有的话)。 这就是 sync.Pool 出现的目的了。

sync.Pool 的使用非常简单,提供两个方法 :Get 和 Put 和一个初始化回调函数 New。

看下面这个例子(取自gomemcache):

  1. // keyBufPool returns []byte buffers for use by PickServer's call to
  2. // crc32.ChecksumIEEE to avoid allocations. (but doesn't avoid the
  3. // copies, which at least are bounded in size and small)
  4. var keyBufPool = sync.Pool{
  5. New: func() interface{} {
  6. b := make([]byte, 256)
  7. return &b
  8. },
  9. }
  10. func (ss *ServerList) PickServer(key string) (net.Addr, error) {
  11. ss.mu.RLock()
  12. defer ss.mu.RUnlock()
  13. if len(ss.addrs) == 0 {
  14. return nil, ErrNoServers
  15. }
  16. if len(ss.addrs) == 1 {
  17. return ss.addrs[0], nil
  18. }
  19. bufp := keyBufPool.Get().(*[]byte)
  20. n := copy(*bufp, key)
  21. cs := crc32.ChecksumIEEE((*bufp)[:n])
  22. keyBufPool.Put(bufp)
  23. return ss.addrs[cs%uint32(len(ss.addrs))], nil
  24. }

这是实际项目中的一个例子,这里使用 keyBufPool 的目的是为了让 crc32.ChecksumIEEE 所使用的[]bytes 数组可以重复使用,减少 GC 的压力。

但是这里可能会有一个问题,我们没有看到 Pool 的手动回收函数。 那么是不是就意味着,如果我们的并发量不断增加,这个 Pool 的体积会不断变大,或者一直维持在很大的范围内呢?

答案是不会的,sync.Pool 的回收是有的,它是在系统自动 GC 的时候,触发 pool.go 中的 poolCleanup 函数。

  1. func poolCleanup() {
  2. for i, p := range allPools {
  3. allPools[i] = nil
  4. for i := 0; i < int(p.localSize); i++ {
  5. l := indexLocal(p.local, i)
  6. l.private = nil
  7. for j := range l.shared {
  8. l.shared[j] = nil
  9. }
  10. l.shared = nil
  11. }
  12. p.local = nil
  13. p.localSize = 0
  14. }
  15. allPools = []*Pool{}
  16. }

这个函数会把 Pool 中所有 goroutine 创建的对象都进行销毁。

那这里另外一个问题也凸显出来了,很可能我上一步刚往 pool 中 PUT 一个对象之后,下一步 GC 触发,导致 pool 的 GET 函数获取不到 PUT 进去的对象。 这个时候,GET 函数就会调用 New 函数,临时创建出一个对象,并存放到 pool 中。

根据以上结论,sync.Pool 其实不适合用来做持久保存的对象池(比如连接池)。它更适合用来做临时对象池,目的是为了降低 GC 的压力。

连接池性能测试

  1. package main
  2. import (
  3. "sync"
  4. "testing"
  5. )
  6. var bytePool = sync.Pool{
  7. New: newPool,
  8. }
  9. func newPool() interface{} {
  10. b := make([]byte, 1024)
  11. return &b
  12. }
  13. func BenchmarkAlloc(b *testing.B) {
  14. for i := 0; i < b.N; i++ {
  15. obj := make([]byte, 1024)
  16. _ = obj
  17. }
  18. }
  19. func BenchmarkPool(b *testing.B) {
  20. for i := 0; i < b.N; i++ {
  21. obj := bytePool.Get().(*[]byte)
  22. _ = obj
  23. bytePool.Put(obj)
  24. }
  25. }

文件目录下执行 go test -bench .

  1. E:\MyGo\sync>go test -bench .
  2. testing: warning: no tests to run
  3. PASS
  4. BenchmarkAlloc-4 50000000 39.3 ns/op
  5. BenchmarkPool-4 50000000 25.4 ns/op
  6. ok _/E_/MyGo/sync 3.345s

通过性能测试可以清楚地看到,使用连接池消耗的 CPU 时间远远小于每次手动分配内存。

Once

有的时候,我们多个 goroutine 都要过一个操作,但是这个操作我只希望被执行一次,这个时候 Once 就上场了。比如下面的例子 :

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. func main() {
  8. var once sync.Once
  9. onceBody := func() {
  10. fmt.Println("Only once")
  11. }
  12. for i := 0; i < 10; i++ {
  13. go func() {
  14. once.Do(onceBody)
  15. }()
  16. }
  17. time.Sleep(3e9)
  18. }

只会打出一次 “Only once”。

WaitGroup 和 Cond

一个 goroutine 需要等待一批 goroutine 执行完毕以后才继续执行,那么这种多线程等待的问题就可以使用 WaitGroup 了。

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. wp := new(sync.WaitGroup)
  8. wp.Add(10);
  9. for i := 0; i < 10; i++ {
  10. go func() {
  11. fmt.Println("done ", i)
  12. wp.Done()
  13. }()
  14. }
  15. wp.Wait()
  16. fmt.Println("wait end")
  17. }

还有个 sync.Cond 是用来控制某个条件下,goroutine 进入等待时期,等待信号到来,然后重新启动。比如:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. func main() {
  8. locker := new(sync.Mutex)
  9. cond := sync.NewCond(locker)
  10. done := false
  11. cond.L.Lock()
  12. go func() {
  13. time.Sleep(2e9)
  14. done = true
  15. cond.Signal()
  16. }()
  17. if (!done) {
  18. cond.Wait()
  19. }
  20. fmt.Println("now done is ", done);
  21. }

这里当主 goroutine 进入 cond.Wait 的时候,就会进入等待,当从 goroutine 发出信号之后,主 goroutine 才会继续往下面走。

sync.Cond 还有一个 BroadCast 方法,用来通知唤醒所有等待的 gouroutine。

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. var locker = new(sync.Mutex)
  8. var cond = sync.NewCond(locker)
  9. func test(x int) {
  10. cond.L.Lock() // 获取锁
  11. cond.Wait() // 等待通知 暂时阻塞
  12. fmt.Println(x)
  13. time.Sleep(time.Second * 1)
  14. cond.L.Unlock() // 释放锁,不释放的话将只会有一次输出
  15. }
  16. func main() {
  17. for i := 0; i < 40; i++ {
  18. go test(i)
  19. }
  20. fmt.Println("start all")
  21. cond.Broadcast() // 下发广播给所有等待的 goroutine
  22. time.Sleep(time.Second * 60)
  23. }

主 gouroutine 开启后,可以创建多个从 gouroutine,从 gouroutine 获取锁后,进入 cond.Wait 状态,当主 gouroutine 执行完任务后,通过 BroadCast 广播信号。 处于 cond.Wait 状态的所有 gouroutine 收到信号后将全部被唤醒并往下执行。需要注意的是,从 gouroutine 执行完任务后,需要通过 cond.L.Unlock 释放锁, 否则其它被唤醒的 gouroutine 将没法继续执行。 通过查看 cond.Wait 的源码就明白为什么需要需要释放锁了

  1. func (c *Cond) Wait() {
  2. c.checker.check()
  3. if raceenabled {
  4. raceDisable()
  5. }
  6. atomic.AddUint32(&c.waiters, 1)
  7. if raceenabled {
  8. raceEnable()
  9. }
  10. c.L.Unlock()
  11. runtime_Syncsemacquire(&c.sema)
  12. c.L.Lock()
  13. }

Cond.Wait 会自动释放锁等待信号的到来,当信号到来后,第一个获取到信号的 Wait 将继续往下执行并从新上锁,如果不释放锁, 其它收到信号的 gouroutine 将阻塞无法继续执行。 由于各个 Wait 收到信号的时间是不确定的,因此每次的输出顺序也都是随机的。

导航

  • 目录
  • 上一节:buildin
  • 下一节:暂未确定