并发

V语言并发的思路和语法跟go一样,甚至关键字也一样:go/chan/select

目前已经实现了一个早期版本的并发,可以初步使用

go可以添加在函数调用,方法调用,匿名函数调用前,即可创建并发任务单元

  1. module main
  2. const (
  3. num_iterations = 10000
  4. )
  5. fn do_send(ch chan int) {
  6. for i in 0 .. num_iterations {
  7. ch <- i //写入channel,也叫发送消息
  8. }
  9. }
  10. fn main() {
  11. ch := chan int{cap: 1000}
  12. go do_send(ch) //在函数调用前使用go关键字,即可创建并发任务单元
  13. mut sum := i64(0)
  14. for _ in 0 .. num_iterations {
  15. sum += <-ch //读取channel,也叫接收消息
  16. }
  17. println(sum)
  18. }

声明channel变量

channel变量声明时,可以指定cap,cap表示缓冲区大小/容量,指定后不可改变,如果不指定,默认为0

len表示当前被使用的缓冲大小,len不能在声明时指定,初始值为0,只读,根据写入/读取channel自动改变,写入增加len,读取减少len

没有指定cap,就是同步模式,同步模式下,发送和接收双方配对,然后读写同时完成,如果接收之前,还没有发送,就会出现阻塞

有指定cap,就是异步模式,异步模式下,在缓冲大小的范围内,发送方不用等待接收方,数据写入后,继续往下执行,不会出现阻塞,如果超出了缓冲大小范围,发送方还是要阻塞等待接收方接收数据

channel从底层实现上来说,是一个队列,通过push()把数据写入到队列中,通过pop()把数据读取出来,

  1. fn main() {
  2. ch := chan int{cap: 1000} //声明一个channel变量,类型为int,缓冲区大小为1000,即异步channel
  3. println(ch.len) // 0
  4. println(ch.cap) // 1000
  5. ch2 := chan int{} //不指定cap,默认为0,即同步channel
  6. println(ch2.len) // 0
  7. println(ch2.cap) // 0
  8. ch <- 1
  9. println(ch.len) // 1
  10. println(ch.cap) // 1000
  11. }

读取channel/接收消息

  1. fn main() {
  2. ch := chan int{cap: 100}
  3. sum := <-ch //读取channel
  4. println(sum)
  5. //也可以使用try_pop()
  6. //尝试读channel,把channel的值,读取到i变量中,并返回ChanState枚举:.success/.not_ready/.colsed
  7. i := 0
  8. res := ch.try_pop(&i)
  9. println(res)
  10. }

写入channel/发送消息

  1. fn main() {
  2. ch := chan int{cap: 100}
  3. ch <- 2 //写入channel
  4. //也可以使用try_push()
  5. //尝试写channel,把i的值写入到channel中,并返回ChanState枚举:.success/.not_ready/.colsed
  6. i := 3
  7. res := ch.try_push(&i)
  8. println(res)
  9. }

go表达式

除了使用标准的chanel和waitgroup方式外,还可以使用go表达式来简化并发代码,go表达式更像是一种并发语法糖/简化版.

  1. module main
  2. import time
  3. //无返回值的函数
  4. fn do_something() {
  5. println('start do_something...')
  6. time.sleep(2*time.second) //休眠2秒,模拟并发持续的时间
  7. println('end do_something')
  8. }
  9. //有返回值的函数
  10. fn add(x int, y int) int {
  11. println('add并发开始...')
  12. time.sleep(4*time.second) //休眠4秒,模拟并发持续的时间
  13. println('add并发完成')
  14. return x + y
  15. }
  16. fn main() {
  17. //并发调用无返回值的函数
  18. g:=go do_something()
  19. //并发调用带有返回值的函数,完成后返回
  20. g2 := go add(3, 2)
  21. //这段期间主线程可以继续执行别的代码...
  22. g.wait() //阻塞等待线程完成
  23. result := g2.wait() //阻塞等待线程结果返回
  24. println(result)
  25. }

thread线程数组

go表达式实现了并发执行后,然后返回单个结果给主线程

而thread线程数组实现了并发执行多个线程,然后返回结果数组给主线程,用起来挺简洁明了的

  1. module main
  2. fn f(x f64) f64 {
  3. y := x * x
  4. return y
  5. }
  6. fn g(shared a []int, i int) {
  7. lock a { //读写锁定,共享变量a
  8. a[i] *= a[i] + 1
  9. }
  10. }
  11. fn main() {
  12. mut r := []thread f64{cap: 10} //轻量级线程数组,每一个线程的返回值类型是f64
  13. for i in 0 .. 10 {
  14. r << go f(f64(i) + 0.5)
  15. }
  16. x := r.wait() //线程数组有一个内置的wait方法,等待线程数组的所有线程全部运行完毕,并返回结果数组
  17. println(x) //[0.25, 2.25, 6.25, 12.25, 20.25, 30.25, 42.25, 56.25, 72.25, 90.25]
  18. shared a := [2 3 5 7 11 13 17]
  19. t := [
  20. go g(shared a, 0)
  21. go g(shared a, 3)
  22. go g(shared a, 6)
  23. go g(shared a, 2)
  24. go g(shared a, 1)
  25. go g(shared a, 5)
  26. go g(shared a, 4)
  27. ]
  28. println('threads started')
  29. t.wait()
  30. rlock a {
  31. println(a) //[6, 12, 30, 56, 132, 182, 306]
  32. }
  33. }

错误处理

可以在读写channel中增加or代码块,实现错误处理

  1. module main
  2. const n = 1000
  3. const c = 100
  4. fn f(ch chan int) {
  5. for _ in 0 .. n {
  6. _ := <-ch
  7. }
  8. ch.close()
  9. }
  10. fn main() {
  11. ch := chan int{cap: c}
  12. go f(ch)
  13. mut j := 0
  14. for {
  15. ch <- j or { //错误处理
  16. break
  17. }
  18. // ch <-j ? //向上抛转错误
  19. j++
  20. }
  21. }

go表达式和go线程数组也支持同样的错误处理方式

  1. module main
  2. import time
  3. //无返回值的函数
  4. fn do_something() ? { //函数带错误处理
  5. println('start do_something...')
  6. time.sleep(2*time.second) //休眠2秒,模拟并发持续的时间
  7. println('end do_something')
  8. }
  9. //有返回值的函数
  10. fn add(x int, y int) ?int { //函数带错误处理
  11. println('add并发开始...')
  12. time.sleep(4*time.second) //休眠4秒,模拟并发持续的时间
  13. println('add并发完成')
  14. return x + y
  15. }
  16. fn main() {
  17. //并发调用无返回值的函数
  18. g :=go do_something()
  19. //并发调用带有返回值的函数,完成后返回
  20. g2 := go add(3, 2)
  21. //这段期间主线程可以继续执行别的代码...
  22. g.wait() or { panic(err) } //支持错误处理的并发
  23. result := g2.wait() or { panic(err) } //支持错误处理的并发
  24. println(result)
  25. }

if条件语句读取chan

  1. fn main() {
  2. mut res := []f64{cap:3}
  3. ch := chan f64{cap: 10}
  4. ch <- 6.75
  5. ch <- -3.25
  6. ch.close()
  7. for _ in 0 .. 3 {
  8. //读取chan成功,则if条件表达式返回true;读取chan失败,chan被关闭,if条件表达式返回false
  9. if x:= <-ch {
  10. res << x
  11. } else {
  12. res << -37.5
  13. }
  14. }
  15. println(res) // 返回[6.75, -3.25, -37.5]
  16. }

关闭channel

  1. ch.close()

关闭channel或者写入channel都会解除阻塞

关闭channel以后,使用try_push()和try_pop函数都会返回.closed枚举

select语句

select语句可以同时监听多个channel的读写事件,并且可以进行监听的超时处理

一般都会结合for循环使用,实现持续监听

  1. import time
  2. fn main() {
  3. ch1 := chan int{}
  4. ch2 := chan int{}
  5. go send(ch1, ch2)
  6. mut x := 0
  7. mut y := 0
  8. for {
  9. select { // select可以同时监听多个channel的读写事件
  10. x = <-ch1 { // 监听读channel
  11. println('$x')
  12. }
  13. y = <-ch2 {
  14. println('$y')
  15. }
  16. > 2 * time.second { // 监听的超时处理
  17. break
  18. }
  19. }
  20. }
  21. }
  22. fn send(ch1 chan int, ch2 chan int) {
  23. ch1 <- 1
  24. ch2 <- 2
  25. ch1 <- 3
  26. ch2 <- 4
  27. ch1 <- 5
  28. ch2 <- 6
  29. }

if select语句

  1. fn main() {
  2. ch1 := chan int{}
  3. ch2 := chan int{}
  4. go send(ch1, ch2)
  5. mut x := 0
  6. mut y := 0
  7. // ch1.close()
  8. // ch2.close()
  9. if select {
  10. x = <-ch1 {
  11. println('from x')
  12. }
  13. y = <-ch2 {
  14. println('from y')
  15. }
  16. } { // 如果select中的所有channel未关闭,则执行if代码块
  17. println('from if')
  18. } else { // 如果select中的所有channel都关闭,则执行else代码块
  19. println('from else')
  20. }
  21. }
  22. fn send(ch1 chan int, ch2 chan int) {
  23. ch1 <- 1
  24. ch2 <- 2
  25. println('from send')
  26. }

for select语句

for select语句主要在并发中使用,用来循环监听多个chanel

  1. fn main() {
  2. ch1 := chan int{}
  3. ch2 := chan f64{}
  4. go do_send(ch1, ch2)
  5. mut a := 0
  6. mut b := 0
  7. for select { // 循环监听channel的写入,写入后执行for代码块,直到所有监听的channel都已关闭
  8. x := <-ch1 {
  9. a += x
  10. }
  11. y := <-ch2 {
  12. a += int(y)
  13. }
  14. } { // for代码块
  15. b++ // 写入3次
  16. println('${b}. event')
  17. }
  18. println(a)
  19. println(b)
  20. }
  21. fn do_send(ch1 chan int, ch2 chan f64) {
  22. ch1 <- 3
  23. ch2 <- 5.0
  24. ch2.close()
  25. ch1 <- 2
  26. ch1.close()
  27. }

主进程阻塞等待

一般来说,主进程执行完毕后,不会等待其他子线程的结果,就直接退出返回,其他子线程也随着终止

可以在主进程末尾增加阻塞等待子线程的运行结果

  1. module main
  2. import time
  3. fn main() {
  4. ch := chan int{} //创建同步channel
  5. go fn (c chan int) {
  6. time.sleep(3*time.second)
  7. println('goroutine done')
  8. c.close() //关闭子线程或者写channel
  9. // c <- 333
  10. }(ch)
  11. println('main...')
  12. i := <-ch // 主线程阻塞,等待子线程返回数据或者关闭channel
  13. println('main exit...$i')
  14. }

泛型函数/方法

除了使用标准函数/方法作为go的并发单元,泛型函数/方法也可以

  1. module main
  2. import time
  3. struct Point {
  4. }
  5. fn (p Point) add<T>(c chan int, x T, y T) T {
  6. println('from generic method')
  7. c <- 1
  8. return x + y
  9. }
  10. fn add<T>(c chan int, x T, y T) T {
  11. println('from generic function')
  12. c <- 2
  13. return x + y
  14. }
  15. fn main() {
  16. ch := chan int{}
  17. //泛型函数
  18. go add<int>(ch, 1, 3)
  19. go add<f64>(ch, 1.1, 3.3)
  20. //泛型方法
  21. p := Point{}
  22. go p.add<int>(ch, 2, 4)
  23. go p.add<f64>(ch, 2.2, 4.4)
  24. i := <-ch
  25. println(i)
  26. time.sleep(1*time.second)
  27. }

线程之间的变量共享/锁定

可以使用shared/lock/rlock来实现

多个线程之间,可以通过定义shared类型的变量,来实现线程间共享,所有线程都可以读写该变量.

当某个线程要进行读写共享变量时,为了防止线程之间的数据竞争:

在读写之前,要使用lock代码块(读写锁)来锁定共享变量

在只读之前,要使用rlock代码块(只读锁)来锁定共享变量

共享变量可以是基本类型,array,map,struct类型

  1. module main
  2. import time
  3. struct St {
  4. mut:
  5. x f64
  6. }
  7. fn f(x int, y f64, shared s St,shared a []string, shared m map[string]string) {
  8. time.sleep(50*time.second)
  9. //在这个线程中,如果要对共享变量进行读写,使用lock代码块来锁定,对于读写锁,其他线程只能阻塞等待,不能读写该变量,退出代码块后,自动解锁
  10. lock s,a,m { //可以同时对多个共享变量进行锁定
  11. s.x = x * y
  12. println(s.x)
  13. a[0]='abc'
  14. unsafe {
  15. m['a']='aa'
  16. }
  17. println(a[0])
  18. println(m['a'])
  19. }
  20. return
  21. }
  22. fn main() {
  23. shared s := St{} // struct共享变量
  24. shared a := []string{len:1} // 数组共享变量
  25. shared m := map[string]string // 字典共享变量
  26. unsafe {
  27. m['a']='aa'
  28. }
  29. r := go f(3, 4.0, shared s,shared a, shared m) //把共享变量传递给另一个线程,默认传递引用
  30. r.wait()
  31. //在这个线程中,如果只是要读共享变量,使用rlock代码来锁定,对于只读锁,其他线程可以读该变量,不能修改,退出代码块后,自动解锁
  32. rlock s {
  33. println(s.x)
  34. }
  35. }

函数返回shared类型

  1. struct St {
  2. mut:
  3. x f64
  4. }
  5. fn f() shared St { //函数可以返回shared的变量,用于线程之间的读写锁
  6. shared x := St{ x: 3.25 }
  7. return x
  8. }
  9. fn g(good bool) ?shared St { //函数可以返回shared的变量,用于线程之间的读写锁,结合错误处理
  10. if !good {
  11. return error('no shared St created')
  12. }
  13. shared x := St{ x: 12.75 }
  14. return x
  15. }
  16. fn shared_opt_propagate(good bool) ?f64 {
  17. shared x := g(good) ?
  18. ret := rlock x { x.x }
  19. return ret
  20. }
  21. fn main() {
  22. shared x := f()
  23. val := rlock x { x.x }
  24. println(val)
  25. res := shared_opt_propagate(true) or { 1.25 }
  26. println(res)
  27. }

读写锁表达式

  1. struct St {
  2. mut:
  3. i int
  4. }
  5. fn main() {
  6. shared xx := St{ i: 173 }
  7. shared y := St{ i: -57 }
  8. mut m := 0
  9. m = lock y { y.i } //读写锁表达式
  10. n := rlock xx { xx.i } //读表达式
  11. println(m)
  12. println(n)
  13. }

sync标准模块

Channel

  1. //使用sync模块创建channel
  2. mut ch := sync.new_channel<int>(0) //泛型风格
  3. ch.cap //返回channel的缓冲区大小
  4. ch.len() //返回channel当前已使用的缓冲大小
  5. ch.push(&i) //写channel,一定要使用指针引用
  6. ch.pop(&i) //读channel,一定要使用指针引用,返回bool类型,true读取成功,false读取失败
  7. ch.try_push() //尝试写channel,返回ChanState枚举:.success/.not_ready/.colsed
  8. ch.try_pop() //尝试读channel,返回ChanState枚举:.success/.not_ready/.colsed
  9. ch.close() //关闭channel
  10. //遍历channel
  11. sync.channel_select()

WaitGroup

如果要等待多个并发任务结束,可以使用WaitGroup

通过设定计数器,让每一个线程开始时递增计数,退出时递减计数,直到计数归零时,解除阻塞

  1. mut wg:=sync.new_waitgroup() //创建WaitGroup
  2. wg.add(int) //递增计数
  3. wg.done() //递减计数
  4. wg.wait() //阻塞等待,直到计数归零
  1. module main
  2. import sync
  3. import time
  4. fn main() {
  5. mut wg := sync.new_waitgroup()
  6. for i := 0; i < 10; i++ {
  7. wg.add(1) //递增计数
  8. go fn (i int, mut w sync.WaitGroup) {
  9. defer {
  10. w.done() //完成后递减计数
  11. }
  12. time.sleep(1*time.second)
  13. println('goroutine $i done')
  14. }(i, mut wg)
  15. }
  16. println('main start...')
  17. wg.wait() //阻塞等待,直到计数器归零
  18. println('main end...')
  19. }

输出:

  1. main start...
  2. goroutine 2 done
  3. goroutine 0 done
  4. goroutine 1 done
  5. goroutine 3 done
  6. goroutine 4 done
  7. goroutine 5 done
  8. goroutine 6 done
  9. goroutine 8 done
  10. goroutine 7 done
  11. goroutine 9 done
  12. main end...

更多参考代码可以查看: vlib/sync