executors

go-zero 中,executors 充当任务池,做多任务缓冲,适用于做批量处理的任务。如:clickhouse 大批量 insertsql batch insert。同时也可以在 go-queue 中看到 executors 【在 queue 里面使用的是 ChunkExecutor ,限定任务提交字节大小】。

所以当你存在以下需求,都可以使用这个组件:

  • 批量提交任务
  • 缓冲一部分任务,惰性提交
  • 延迟任务提交

具体解释之前,先给一个大致的概览图: c42c34e8d33d48ec8a63e56feeae882a

接口设计

executors 包下,有如下几个 executor

Name Margin value
bulkexecutor 达到 maxTasks 【最大任务数】 提交
chunkexecutor 达到 maxChunkSize【最大字节数】提交
periodicalexecutor basic executor
delayexecutor 延迟执行传入的 fn()
lessexecutor

你会看到除了有特殊功能的 delayless ,其余 3 个都是 executor + container 的组合设计:

  1. func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
  2. // 选项模式:在 go-zero 中多处出现。在多配置下,比较好的设计思路
  3. // https://halls-of-valhalla.org/beta/articles/functional-options-pattern-in-go,54/
  4. options := newBulkOptions()
  5. for _, opt := range opts {
  6. opt(&options)
  7. }
  8. // 1. task container: [execute 真正做执行的函数] [maxTasks 执行临界点]
  9. container := &bulkContainer{
  10. execute: execute,
  11. maxTasks: options.cachedTasks,
  12. }
  13. // 2. 可以看出 bulkexecutor 底层依赖 periodicalexecutor
  14. executor := &BulkExecutor{
  15. executor: NewPeriodicalExecutor(options.flushInterval, container),
  16. container: container,
  17. }
  18. return executor
  19. }

而这个 container是个 interface

  1. TaskContainer interface {
  2. // 把 task 加入 container
  3. AddTask(task interface{}) bool
  4. // 实际上是去执行传入的 execute func()
  5. Execute(tasks interface{})
  6. // 达到临界值,移除 container 中全部的 task,通过 channel 传递到 execute func() 执行
  7. RemoveAll() interface{}
  8. }

由此可见之间的依赖关系:

  • bulkexecutorperiodicalexecutor + bulkContainer
  • chunkexecutorperiodicalexecutor + chunkContainer

[!TIP] 所以你想完成自己的 executor,可以实现 container 的这 3 个接口,再结合 periodicalexecutor 就行

所以回到👆那张图,我们的重点就放在 periodicalexecutor,看看它是怎么设计的?

如何使用

首先看看如何在业务中使用这个组件:

现有一个定时服务,每天固定时间去执行从 mysqlclickhouse 的数据同步:

  1. type DailyTask struct {
  2. ckGroup *clickhousex.Cluster
  3. insertExecutor *executors.BulkExecutor
  4. mysqlConn sqlx.SqlConn
  5. }

初始化 bulkExecutor

  1. func (dts *DailyTask) Init() {
  2. // insertIntoCk() 是真正insert执行函数【需要开发者自己编写具体业务逻辑】
  3. dts.insertExecutor = executors.NewBulkExecutor(
  4. dts.insertIntoCk,
  5. executors.WithBulkInterval(time.Second*3), // 3s会自动刷一次container中task去执行
  6. executors.WithBulkTasks(10240), // container最大task数。一般设为2的幂次
  7. )
  8. }

[!TIP] 额外介绍一下:clickhouse 适合大批量的插入,因为 insert 速度很快,大批量 insert 更能充分利用 clickhouse

主体业务逻辑编写:

  1. func (dts *DailyTask) insertNewData(ch chan interface{}, sqlFromDb *model.Task) error {
  2. for item := range ch {
  3. if r, vok := item.(*model.Task); !vok {
  4. continue
  5. }
  6. err := dts.insertExecutor.Add(r)
  7. if err != nil {
  8. r.Tag = sqlFromDb.Tag
  9. r.TagId = sqlFromDb.Id
  10. r.InsertId = genInsertId()
  11. r.ToRedis = toRedis == constant.INCACHED
  12. r.UpdateWay = sqlFromDb.UpdateWay
  13. // 1. Add Task
  14. err := dts.insertExecutor.Add(r)
  15. if err != nil {
  16. logx.Error(err)
  17. }
  18. }
  19. }
  20. // 2. Flush Task container
  21. dts.insertExecutor.Flush()
  22. // 3. Wait All Task Finish
  23. dts.insertExecutor.Wait()
  24. }

[!TIP] 可能会疑惑为什么要 Flush(), Wait() ,后面会通过源码解析一下

使用上总体分为 3 步:

  • Add():加入 task
  • Flush():刷新 container 中的 task
  • Wait():等待全部 task 执行完成

源码分析

[!TIP] 此处主要分析 periodicalexecutor,因为其他两个常用的 executor 都依赖它

初始化

  1. func New...(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
  2. executor := &PeriodicalExecutor{
  3. commander: make(chan interface{}, 1),
  4. interval: interval,
  5. container: container,
  6. confirmChan: make(chan lang.PlaceholderType),
  7. newTicker: func(d time.Duration) timex.Ticker {
  8. return timex.NewTicker(interval)
  9. },
  10. }
  11. ...
  12. return executor
  13. }
  • commander:传递 tasks 的 channel
  • container:暂存 Add() 的 task
  • confirmChan:阻塞 Add() ,在开始本次的 executeTasks() 会放开阻塞
  • ticker:定时器,防止 Add() 阻塞时,会有一个定时执行的机会,及时释放暂存的 task

Add()

初始化完,在业务逻辑的第一步就是把 task 加入 executor

  1. func (pe *PeriodicalExecutor) Add(task interface{}) {
  2. if vals, ok := pe.addAndCheck(task); ok {
  3. pe.commander <- vals
  4. <-pe.confirmChan
  5. }
  6. }
  7. func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
  8. pe.lock.Lock()
  9. defer func() {
  10. // 一开始为 false
  11. var start bool
  12. if !pe.guarded {
  13. // backgroundFlush() 会将 guarded 重新置反
  14. pe.guarded = true
  15. start = true
  16. }
  17. pe.lock.Unlock()
  18. // 在第一条 task 加入的时候就会执行 if 中的 backgroundFlush()。后台协程刷task
  19. if start {
  20. pe.backgroundFlush()
  21. }
  22. }()
  23. // 控制maxTask,>=maxTask 将container中tasks pop, return
  24. if pe.container.AddTask(task) {
  25. return pe.container.RemoveAll(), true
  26. }
  27. return nil, false
  28. }

addAndCheck()AddTask() 就是在控制最大 tasks 数,如果超过就执行 RemoveAll() ,将暂存 container 的 tasks pop,传递给 commander ,后面有 goroutine 循环读取,然后去执行 tasks。

backgroundFlush()

开启一个后台协程,对 container 中的 task,不断刷新:

  1. func (pe *PeriodicalExecutor) backgroundFlush() {
  2. // 封装 go func(){}
  3. threading.GoSafe(func() {
  4. ticker := pe.newTicker(pe.interval)
  5. defer ticker.Stop()
  6. var commanded bool
  7. last := timex.Now()
  8. for {
  9. select {
  10. // 从channel拿到 []tasks
  11. case vals := <-pe.commander:
  12. commanded = true
  13. // 实质:wg.Add(1)
  14. pe.enterExecution()
  15. // 放开 Add() 的阻塞,而且此时暂存区也为空。才开始新的 task 加入
  16. pe.confirmChan <- lang.Placeholder
  17. // 真正的执行 task 逻辑
  18. pe.executeTasks(vals)
  19. last = timex.Now()
  20. case <-ticker.Chan():
  21. if commanded {
  22. // 由于select选择的随机性,如果同时满足两个条件同时执行完上面的,此处置反,并跳过本段执行
  23. // https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
  24. commanded = false
  25. } else if pe.Flush() {
  26. // 刷新完成,定时器清零。暂存区空了,开始下一次定时刷新
  27. last = timex.Now()
  28. } else if timex.Since(last) > pe.interval*idleRound {
  29. // 既没到maxTask,Flush() err,并且 last->now 时间过长,会再次触发 Flush()
  30. // 只有这置反,才会开启一个新的 backgroundFlush() 后台协程
  31. pe.guarded = false
  32. // 再次刷新,防止漏掉
  33. pe.Flush()
  34. return
  35. }
  36. }
  37. }
  38. })
  39. }

总体两个过程:

  • commander 接收到 RemoveAll() 传递来的 tasks,然后执行,并放开 Add() 的阻塞,得以继续 Add()
  • ticker 到时间了,如果第一步没有执行,则自动 Flush() ,也会去做 task 的执行

Wait()

backgroundFlush() ,提到一个函数:enterExecution()

  1. func (pe *PeriodicalExecutor) enterExecution() {
  2. pe.wgBarrier.Guard(func() {
  3. pe.waitGroup.Add(1)
  4. })
  5. }
  6. func (pe *PeriodicalExecutor) Wait() {
  7. pe.wgBarrier.Guard(func() {
  8. pe.waitGroup.Wait()
  9. })
  10. }

这样列举就知道为什么之前在最后要带上 dts.insertExecutor.Wait(),当然要等待全部的 goroutine task 完成。

思考

在看源码中,思考了一些其他设计上的思路,大家是否也有类似的问题:

  • 在分析 executors 中,会发现很多地方都有 lock

[!TIP] go test 存在竞态,使用加锁来避免这种情况

  • 在分析 confirmChan 时发现,confirmChan 在此次提交才出现,为什么会这么设计?

之前是:wg.Add(1) 是写在 executeTasks() ;现在是:先wg.Add(1),再放开 confirmChan 阻塞 如果 executor func 执行阻塞,Add task 还在进行,因为没有阻塞,可能很快执行到 Executor.Wait(),这时就会出现 wg.Wait()wg.Add() 前执行,这会 panic

具体可以看最新版本的TestPeriodicalExecutor_WaitFast() ,不妨跑在此版本上,就可以重现

总结

剩余还有几个 executors 的分析,就留给大家去看看源码。

总之,整体设计上:

  • 遵循面向接口设计
  • 灵活使用 channelwaitgroup 等并发工具
  • 执行单元+存储单元的搭配使用

go-zero 中还有很多实用的组件工具,用好工具对于提升服务性能和开发效率都有很大的帮助,希望本篇文章能给大家带来一些收获。