Delay Queue

Overview

A delay queue is a special queue whose elements can be consumed only after a specified delay has elapsed. Implementations usually rely on a scheduled task that periodically scans the queue and moves expired elements to a second queue, from which consumers read.
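
The scan-and-move mechanism described above can be sketched in memory. This is only an illustration, not how go-queue or beanstalkd implement it: the DelayQueue type, its Put, Scan and Take methods, and the simulated one-second clock in demo are all assumptions made for this example.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// item is a queued element together with the time it becomes consumable.
type item struct {
	body    string
	readyAt time.Time
}

// delayed is a min-heap of items ordered by readyAt.
type delayed []item

func (d delayed) Len() int           { return len(d) }
func (d delayed) Less(i, j int) bool { return d[i].readyAt.Before(d[j].readyAt) }
func (d delayed) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
func (d *delayed) Push(x any)        { *d = append(*d, x.(item)) }
func (d *delayed) Pop() any {
	old := *d
	n := len(old)
	it := old[n-1]
	*d = old[:n-1]
	return it
}

// DelayQueue (hypothetical) keeps delayed elements in a heap and
// expired elements in a separate ready queue for consumers.
type DelayQueue struct {
	pending delayed
	ready   []string
}

// Put schedules body to become consumable at readyAt.
func (q *DelayQueue) Put(body string, readyAt time.Time) {
	heap.Push(&q.pending, item{body: body, readyAt: readyAt})
}

// Scan is the work of the scheduled task: it moves every element whose
// delay has elapsed at time now from the pending heap to the ready queue.
func (q *DelayQueue) Scan(now time.Time) {
	for q.pending.Len() > 0 && !q.pending[0].readyAt.After(now) {
		it := heap.Pop(&q.pending).(item)
		q.ready = append(q.ready, it.body)
	}
}

// Take removes and returns the next ready element, if there is one.
func (q *DelayQueue) Take() (string, bool) {
	if len(q.ready) == 0 {
		return "", false
	}
	body := q.ready[0]
	q.ready = q.ready[1:]
	return body, true
}

// demo runs a fixed scenario on a simulated clock and records when each
// element is consumed.
func demo() []string {
	start := time.Unix(0, 0)
	q := &DelayQueue{}
	q.Put("late", start.Add(10*time.Second))
	q.Put("early", start.Add(5*time.Second))

	var consumed []string
	// Simulate the scheduled task firing once per second.
	for s := 0; s <= 12; s++ {
		q.Scan(start.Add(time.Duration(s) * time.Second))
		for {
			body, ok := q.Take()
			if !ok {
				break
			}
			consumed = append(consumed, fmt.Sprintf("%ds:%s", s, body))
		}
	}
	return consumed
}

func main() {
	fmt.Println(demo()) // [5s:early 10s:late]
}
```

Passing the current time into Scan, rather than reading the system clock inside it, keeps the sketch deterministic; a real scheduler would call it from a ticker.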

In go-zero, delay queues are implemented with go-queue, whose dq package is backed by beanstalkd.

Task Targets

  • Learn about the basic usage of go-queue
  • Learn how to use the delay queue in go-zero

Preparing

Producer

Creating a dq producer is simple: it needs only the beanstalkd server addresses and tube names.

    type Beanstalk struct {
        Endpoint string // beanstalkd server address
        Tube     string // tube name
    }

    func NewProducer(beanstalks []Beanstalk) Producer

Code Example

    package main

    import (
        "fmt"
        "time"

        "github.com/zeromicro/go-queue/dq"
    )

    func main() {
        // Create a producer that writes to two beanstalkd nodes.
        producer := dq.NewProducer([]dq.Beanstalk{
            {
                Endpoint: "localhost:11300",
                Tube:     "tube",
            },
            {
                Endpoint: "localhost:11301",
                Tube:     "tube",
            },
        })

        // Deliver the message 5 seconds from now.
        _, err := producer.Delay([]byte("hello"), time.Second*5)
        if err != nil {
            fmt.Println(err)
        }

        // Deliver the message at an absolute point in time.
        _, err = producer.At([]byte("hello"), time.Now().Add(time.Second*10))
        if err != nil {
            fmt.Println(err)
        }
    }

Consumer

Configuration

    type DqConf struct {
        Beanstalks []Beanstalk
        Redis      redis.RedisConf
    }

    type Beanstalk struct {
        Endpoint string // beanstalkd server address
        Tube     string // tube name
    }

DqConf

    Params      DataType     Required?  Default value  Note
    Beanstalks  []Beanstalk  YES        -              beanstalkd configuration
    Redis       RedisConf    YES        -              Redis configuration; see the Redis configuration reference

Beanstalk

    Params    DataType  Required?  Default value  Note
    Endpoint  string    YES        -              beanstalkd address
    Tube      string    YES        -              tube name
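
Since every field listed above is required, it can help to check a configuration before building a consumer from it. The sketch below is one possible way to do that. The Beanstalk, RedisConf and DqConf types here are local stand-ins that mirror only the fields shown in this document, and validate is a hypothetical helper, not part of go-queue; real code would use dq.DqConf and redis.RedisConf.

```go
package main

import (
	"errors"
	"fmt"
)

// Beanstalk is a local stand-in for dq.Beanstalk.
type Beanstalk struct {
	Endpoint string // beanstalkd server address
	Tube     string // tube name
}

// RedisConf is a local stand-in with only the two fields used in this
// document; the real redis.RedisConf has more.
type RedisConf struct {
	Host string
	Type string
}

// DqConf is a local stand-in for dq.DqConf.
type DqConf struct {
	Beanstalks []Beanstalk
	Redis      RedisConf
}

// validate (hypothetical) reports the first required field that is missing.
func validate(c DqConf) error {
	if len(c.Beanstalks) == 0 {
		return errors.New("missing required field: Beanstalks")
	}
	for i, b := range c.Beanstalks {
		if b.Endpoint == "" {
			return fmt.Errorf("missing required field: Beanstalks[%d].Endpoint", i)
		}
		if b.Tube == "" {
			return fmt.Errorf("missing required field: Beanstalks[%d].Tube", i)
		}
	}
	if c.Redis.Host == "" {
		return errors.New("missing required field: Redis.Host")
	}
	return nil
}

func main() {
	// The Redis section is left out on purpose to trigger an error.
	conf := DqConf{
		Beanstalks: []Beanstalk{{Endpoint: "localhost:11300", Tube: "tube"}},
	}
	fmt.Println(validate(conf)) // missing required field: Redis.Host
}
```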

Sample

    package main

    import (
        "fmt"

        "github.com/zeromicro/go-queue/dq"
        "github.com/zeromicro/go-zero/core/stores/redis"
    )

    func main() {
        // Create a consumer that reads from two beanstalkd nodes.
        consumer := dq.NewConsumer(dq.DqConf{
            Beanstalks: []dq.Beanstalk{
                {
                    Endpoint: "localhost:11300",
                    Tube:     "tube",
                },
                {
                    Endpoint: "localhost:11301",
                    Tube:     "tube",
                },
            },
            Redis: redis.RedisConf{
                Host: "localhost:6379",
                Type: redis.NodeType,
            },
        })

        // Print the body of each message once its delay has elapsed.
        consumer.Consume(func(body []byte) {
            fmt.Println(string(body))
        })
    }
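
Both the producer calls and the consumer callback in this document work with raw []byte bodies, so structured messages have to be encoded on one side and decoded on the other. The following is a minimal sketch that assumes JSON as the wire format. The OrderTimeout type and the encode and decode helpers are hypothetical names introduced for illustration, and the example uses only the standard library so that it runs without a beanstalkd server.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderTimeout is a hypothetical message type used only for illustration.
type OrderTimeout struct {
	OrderID string `json:"order_id"`
	Reason  string `json:"reason"`
}

// encode builds the []byte body a producer would hand to Delay or At.
func encode(msg OrderTimeout) ([]byte, error) {
	return json.Marshal(msg)
}

// decode parses a body received by the consumer callback.
func decode(body []byte) (OrderTimeout, error) {
	var msg OrderTimeout
	err := json.Unmarshal(body, &msg)
	return msg, err
}

func main() {
	body, err := encode(OrderTimeout{OrderID: "42", Reason: "unpaid"})
	if err != nil {
		fmt.Println(err)
		return
	}

	// consume has the same signature as the callback passed to Consume.
	// It cannot return an error, so malformed bodies are reported and skipped.
	consume := func(body []byte) {
		msg, err := decode(body)
		if err != nil {
			fmt.Println("skipping malformed message:", err)
			return
		}
		fmt.Printf("closing order %s (%s)\n", msg.OrderID, msg.Reason)
	}

	consume(body) // closing order 42 (unpaid)
}
```

In a real service, the consume function would be passed to consumer.Consume in place of the printing callback shown in the sample.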

References

  1. go-beanstalk
  2. beanstalkd