Introduction

While goroutines in Go are relatively lightweight compared to system threads (the initial stack size is only 2KB and grows dynamically), threads created in languages such as Java or C++ are generally kernel-mode threads that occupy about 4MB of memory each. Assuming our server has 4GB of memory, the total number of concurrent kernel-mode threads is limited to about 1024. In contrast, the number of goroutines can reach 4*1024*1024/2 = 2 million. This illustrates why Go naturally supports high concurrency.

Pain Points

Resource Consumption of Coroutine Execution

However, under high concurrency, the frequent creation and destruction of goroutines can become a performance bottleneck and put pressure on the GC. Reusing goroutines to reduce the overhead of creation and destruction is exactly what grpool does: it pools goroutines. For example, executing 1 million tasks would normally require creating and destroying 1 million goroutines, whereas with grpool only a few tens of thousands of goroutines may be needed to execute all of them.

Tests show that goroutine pooling offers little improvement in the execution efficiency of the business logic itself (execution time/CPU usage). It may even be slower than native goroutine execution, because the pool's own scheduling is not as efficient as the underlying Go scheduler. However, thanks to goroutine reuse, memory usage decreases significantly.

A Large Number of Coroutines Affects Global Scheduling

Some business modules need to dynamically create coroutines to execute tasks, such as asynchronous collection tasks and metric calculation tasks. These tasks are not the core logic of the service, but in extreme cases they can cause the number of coroutines to explode, affecting the global scheduling of the Go runtime and causing significant service latency.

For example, an asynchronous collection task might execute every 5 seconds, creating 100 coroutines each time to collect data from different targets. If the network is slow, tasks from previous rounds may not have finished before new coroutines are created, which can eventually cause a coroutine explosion and affect the execution of the whole service. In such scenarios, pooling can be used to cap the amount of work in flight: once a certain number of tasks have accumulated, subsequent submissions block. For example, if the maximum number of pending tasks is set to 10,000, submission blocks when this limit is reached, which slows down only that module without delaying the rest of the service. A minimal sketch of this idea is shown below.
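A minimal sketch of this backlog-capping idea, assuming a worker limit of 100 and a hypothetical maxPendingJobs threshold of 10,000; the busy-wait loop is only illustrative and not part of the grpool API:

    package main

    import (
        "context"
        "time"

        "github.com/gogf/gf/v2/os/gctx"
        "github.com/gogf/gf/v2/os/grpool"
    )

    const maxPendingJobs = 10000 // hypothetical backlog threshold

    var (
        ctx  = gctx.New()
        pool = grpool.New(100) // at most 100 worker goroutines
    )

    func collect(ctx context.Context) {
        // ... collect data from one target (placeholder) ...
    }

    func main() {
        for {
            for i := 0; i < 100; i++ {
                // If too many tasks are already pending, block here instead of
                // piling up more work; only this module slows down.
                for pool.Jobs() >= maxPendingJobs {
                    time.Sleep(10 * time.Millisecond)
                }
                _ = pool.Add(ctx, collect)
            }
            time.Sleep(5 * time.Second)
        }
    }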

Concept Introduction

Pool

A goroutine pool that manages several reusable goroutine resources.

Job

A task waiting to be executed, added to the pool's task queue. A Job is a function of type Func and can only be fetched and executed by a single Worker. Func is defined as:

    type Func func(ctx context.Context)

Worker

A goroutine in the pool that executes tasks. A Worker can execute multiple Jobs in sequence, until no Jobs remain in the queue.

Usage Introduction

Usage:

    import "github.com/gogf/gf/v2/os/grpool"

Scenarios:

Managing a large number of asynchronous tasks, reusing asynchronous coroutines, and reducing memory usage.

Interface Documentation:

    func Add(ctx context.Context, f Func) error
    func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error
    func Jobs() int
    func Size() int
    func New(limit ...int) *Pool
    func (p *Pool) Add(ctx context.Context, f Func) error
    func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error
    func (p *Pool) Cap() int
    func (p *Pool) Close()
    func (p *Pool) IsClosed() bool
    func (p *Pool) Jobs() int
    func (p *Pool) Size() int

Create a goroutine pool object with grpool.New. The optional limit parameter limits the number of worker goroutines in the pool; by default there is no limit. Tasks can be added to the pool continuously without restriction, but the number of worker goroutines can be capped. Use Size() to query the current number of worker goroutines and Jobs() to query the current number of pending tasks in the pool.

For convenience, the grpool package also provides a default goroutine pool, which does not limit the number of goroutines. Tasks can be added directly to the default pool via grpool.Add; the task parameter must be a function of type Func, i.e. func(ctx context.Context).
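For reference, a minimal sketch of submitting a task to the default pool and querying its status with the package-level functions (the printed message and the sleep duration are arbitrary):

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/gogf/gf/v2/os/gctx"
        "github.com/gogf/gf/v2/os/grpool"
    )

    func main() {
        ctx := gctx.New()
        // Add a task of type Func to the default pool.
        err := grpool.Add(ctx, func(ctx context.Context) {
            fmt.Println("task executed by the default pool")
        })
        if err != nil {
            panic(err)
        }
        // Worker goroutines and pending jobs of the default pool.
        fmt.Println("workers:", grpool.Size(), "jobs:", grpool.Jobs())
        // Give the asynchronous task a moment to finish before main exits.
        time.Sleep(100 * time.Millisecond)
    }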

The most common question about this module is how to pass parameters to tasks within grpool from the outside. Please see example 2 for details.

Usage Examples

Using a goroutine Pool, Limiting 100 goroutines to Execute 1000 Tasks

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/gogf/gf/v2/os/gctx"
        "github.com/gogf/gf/v2/os/grpool"
        "github.com/gogf/gf/v2/os/gtimer"
    )

    var (
        ctx = gctx.New()
    )

    func job(ctx context.Context) {
        time.Sleep(1 * time.Second)
    }

    func main() {
        pool := grpool.New(100)
        for i := 0; i < 1000; i++ {
            pool.Add(ctx, job)
        }
        fmt.Println("worker:", pool.Size())
        fmt.Println(" jobs:", pool.Jobs())
        gtimer.SetInterval(ctx, time.Second, func(ctx context.Context) {
            fmt.Println("worker:", pool.Size())
            fmt.Println(" jobs:", pool.Jobs())
            fmt.Println()
        })
        select {}
    }

The task function in this program sleeps for 1 second, which clearly demonstrates the effect of limiting the goroutine count. A gtimer.SetInterval timer prints the number of worker goroutines and pending tasks every second.

Asynchronous Parameter Passing: A Common Mistake for Beginners

This example no longer reproduces the problem on Go versions ≥ 1.22, as the loop variable trap was removed in Go 1.22.

    package main

    import (
        "context"
        "fmt"
        "sync"

        "github.com/gogf/gf/v2/os/gctx"
        "github.com/gogf/gf/v2/os/grpool"
    )

    var (
        ctx = gctx.New()
    )

    func main() {
        wg := sync.WaitGroup{}
        for i := 0; i < 10; i++ {
            wg.Add(1)
            grpool.Add(ctx, func(ctx context.Context) {
                fmt.Println(i)
                wg.Done()
            })
        }
        wg.Wait()
    }

The goal of this code is to sequentially print numbers from 0-9, but the output is:

    10
    10
    10
    10
    10
    10
    10
    10
    10
    10

Why does this happen? This behavior occurs whether the go keyword or grpool is used for execution. When the function is registered for asynchronous execution, it has not started running yet; at registration time the closure only captures a reference to the variable i. By the time it actually runs, it reads the current value of i, which has already been incremented to 10 (in Go versions before 1.22, all iterations share a single i variable). Knowing the cause, the fix is simple: pass the value of i at the moment of registration, or assign the current value of i to a temporary variable and use that variable inside the function instead of using i directly.

Revised example code:

1) Using the go keyword

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        wg := sync.WaitGroup{}
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(v int) {
                fmt.Println(v)
                wg.Done()
            }(i)
        }
        wg.Wait()
    }

The output is:

    9
    0
    3
    4
    5
    6
    7
    8
    1
    2

Note that asynchronous execution does not guarantee the output order matches the registration order; the same applies to the following examples.

2) Using a temporary variable

    package main

    import (
        "context"
        "fmt"
        "sync"

        "github.com/gogf/gf/v2/os/gctx"
        "github.com/gogf/gf/v2/os/grpool"
    )

    var (
        ctx = gctx.New()
    )

    func main() {
        wg := sync.WaitGroup{}
        for i := 0; i < 10; i++ {
            wg.Add(1)
            v := i
            grpool.Add(ctx, func(ctx context.Context) {
                fmt.Println(v)
                wg.Done()
            })
        }
        wg.Wait()
    }

The output is:

    0
    1
    2
    3
    4
    5
    6
    7
    8
    9

Here, when registering a task with grpool, the registered function signature is func(ctx context.Context), so the value of i cannot be passed directly as an argument at registration time (and passing business parameters via ctx is not recommended). Therefore, a temporary variable is used to capture the current value of i.

Automatically Catch goroutine Errors: AddWithRecover

AddWithRecover pushes a new task into the pool with the specified recovery function. recoverFunc is called when userFunc panics during execution; if recoverFunc is nil, the panic from userFunc is ignored. The task is executed asynchronously.

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/gogf/gf/v2/container/garray"
        "github.com/gogf/gf/v2/os/gctx"
        "github.com/gogf/gf/v2/os/grpool"
    )

    var (
        ctx = gctx.New()
    )

    func main() {
        array := garray.NewArray(true)
        grpool.AddWithRecover(ctx, func(ctx context.Context) {
            array.Append(1)
            array.Append(2)
            panic(1)
        }, func(err error) {
            // recoverFunc: called because userFunc panicked.
            array.Append(1)
        })
        grpool.AddWithRecover(ctx, func(ctx context.Context) {
            panic(1)
            array.Append(1)
        }, nil) // nil recoverFunc: the panic is ignored.
        time.Sleep(500 * time.Millisecond)
        fmt.Print(array.Len()) // 3: 1 and 2 from the first task, plus 1 from its recoverFunc
    }

Performance Test: grpool vs Native goroutine

1) grpool

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"

        "github.com/gogf/gf/v2/os/gctx"
        "github.com/gogf/gf/v2/os/grpool"
        "github.com/gogf/gf/v2/os/gtime"
    )

    var (
        ctx = gctx.New()
    )

    func main() {
        start := gtime.TimestampMilli()
        wg := sync.WaitGroup{}
        for i := 0; i < 10000000; i++ {
            wg.Add(1)
            grpool.Add(ctx, func(ctx context.Context) {
                time.Sleep(time.Millisecond)
                wg.Done()
            })
        }
        wg.Wait()
        fmt.Println(grpool.Size())
        fmt.Println("time spent:", gtime.TimestampMilli()-start)
    }

2) goroutine

    package main

    import (
        "fmt"
        "sync"
        "time"

        "github.com/gogf/gf/v2/os/gtime"
    )

    func main() {
        start := gtime.TimestampMilli()
        wg := sync.WaitGroup{}
        for i := 0; i < 10000000; i++ {
            wg.Add(1)
            go func() {
                time.Sleep(time.Millisecond)
                wg.Done()
            }()
        }
        wg.Wait()
        fmt.Println("time spent:", gtime.TimestampMilli()-start)
    }

3) Comparison of Results

The test results are the averages of three runs for both programs.

    grpool:
        goroutine count: 847,313
        memory used:     ~2.1 GB
        time spent:      37,792 ms
    goroutine:
        goroutine count: 10,000,000
        memory used:     ~4.8 GB
        time spent:      27,085 ms

It's clear that with pooling, both the number of goroutines and the memory usage for executing the same number of tasks are reduced significantly, while the extra execution time is acceptable.