延时队列
概述
延时队列是一种特殊的队列,它的元素只有在指定的延时时间到达之后才能被消费。延时队列的实现通常依赖于定时任务,定时任务会定期扫描队列中的元素,将到期的元素移动到另一个队列中,消费者从这个队列中消费元素。
在 go-zero 中我们使用 go-queue 来实现延时队列。
任务目标
- 了解 go-queue 的基本使用
- 了解如何在 go-zero 中使用延时队列
准备条件
- 安装 beanstalkd
生产者
dq 生产者很简单,只需要 beanstalkd 地址,tube 即可创建一个 Producer 对象。
type Beanstalk struct {
Endpoint string // beanstalkd 地址
Tube string // tube 名称
}
NewProducer(beanstalks []Beanstalk) Producer
代码示例
package main
import (
"fmt"
"strconv"
"time"
"github.com/zeromicro/go-queue/dq"
)
func main() {
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
})
// 延迟 5s 后处理
_, err := producer.Delay([]byte("hello"), time.Second*5)
if err != nil {
fmt.Println(err)
}
// 在指定时间点处理
_, err = producer.At([]byte("hello"), time.Now().Add(time.Second*10))
if err != nil {
fmt.Println(err)
}
}
消费者
配置介绍
type DqConf struct {
Beanstalks []Beanstalk
Redis redis.RedisConf
}
type Beanstalk struct {
Endpoint string // beanstalkd 地址
Tube string // tube 名称
}
参数 | 类型 | 是否必填 | 默认值 | 说明 |
---|---|---|---|---|
Beanstalks | []Beanstalk | 是 | beanstalkd 配置 | |
Redis | RedisConf | 是 | redis 配置,详情参考Redis 配置 |
Beanstalk
参数 | 类型 | 是否必填 | 默认值 | 说明 |
---|---|---|---|---|
Endpoint | string | 是 | beanstalkd 地址 | |
Tube | string | 是 | tube 名称 |
示例
package main
import (
"fmt"
"github.com/zeromicro/go-queue/dq"
"github.com/zeromicro/go-zero/core/stores/redis"
)
func main() {
consumer := dq.NewConsumer(dq.DqConf{
Beanstalks: []dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
},
Redis: redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
},
})
consumer.Consume(func(body []byte) {
fmt.Println(string(body))
})
}