延时队列

概述

延时队列是一种特殊的队列,它的元素只有在指定的延时时间到达之后才能被消费。延时队列的实现通常依赖于定时任务,定时任务会定期扫描队列中的元素,将到期的元素移动到另一个队列中,消费者从这个队列中消费元素。

在 go-zero 中我们使用 go-queue 来实现延时队列。

任务目标

  • 了解 go-queue 的基本使用
  • 了解如何在 go-zero 中使用延时队列

准备条件

生产者

dq 生产者很简单,只需要 beanstalkd 地址,tube 即可创建一个 Producer 对象。

  1. type Beanstalk struct {
  2. Endpoint string // beanstalkd 地址
  3. Tube string // tube 名称
  4. }
  5. NewProducer(beanstalks []Beanstalk) Producer

代码示例

  1. package main
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/zeromicro/go-queue/dq"
  7. )
  8. func main() {
  9. producer := dq.NewProducer([]dq.Beanstalk{
  10. {
  11. Endpoint: "localhost:11300",
  12. Tube: "tube",
  13. },
  14. {
  15. Endpoint: "localhost:11301",
  16. Tube: "tube",
  17. },
  18. })
  19. // 延迟 5s 后处理
  20. _, err := producer.Delay([]byte("hello"), time.Second*5)
  21. if err != nil {
  22. fmt.Println(err)
  23. }
  24. // 在指定时间点处理
  25. _, err = producer.At([]byte("hello"), time.Now().Add(time.Second*10))
  26. if err != nil {
  27. fmt.Println(err)
  28. }
  29. }

消费者

配置介绍

  1. type DqConf struct {
  2. Beanstalks []Beanstalk
  3. Redis redis.RedisConf
  4. }
  5. type Beanstalk struct {
  6. Endpoint string // beanstalkd 地址
  7. Tube string // tube 名称
  8. }
延时队列 - 图1参数延时队列 - 图2类型延时队列 - 图3是否必填延时队列 - 图4默认值延时队列 - 图5说明
Beanstalks[]Beanstalkbeanstalkd 配置
RedisRedisConfredis 配置,详情参考Redis 配置

Beanstalk

延时队列 - 图6参数延时队列 - 图7类型延时队列 - 图8是否必填延时队列 - 图9默认值延时队列 - 图10说明
Endpointstringbeanstalkd 地址
Tubestringtube 名称

示例

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/zeromicro/go-queue/dq"
  5. "github.com/zeromicro/go-zero/core/stores/redis"
  6. )
  7. func main() {
  8. consumer := dq.NewConsumer(dq.DqConf{
  9. Beanstalks: []dq.Beanstalk{
  10. {
  11. Endpoint: "localhost:11300",
  12. Tube: "tube",
  13. },
  14. {
  15. Endpoint: "localhost:11301",
  16. Tube: "tube",
  17. },
  18. },
  19. Redis: redis.RedisConf{
  20. Host: "localhost:6379",
  21. Type: redis.NodeType,
  22. },
  23. })
  24. consumer.Consume(func(body []byte) {
  25. fmt.Println(string(body))
  26. })
  27. }

参考文献

  1. 《go-beanstalk》
  2. 《beanstalkd》