简介

Queue Worker是自己实现的队列调度服务,因为在开源的作品发现基本都是耦合了redis队列服务的,不太符合业务的需求。作为组件服务,需要抽象一种非侵入式的queue接口设计,由业务方注入实现了queue接口的驱动,比如redis、alimns、kafka、rabbitMQ。目前只实现了redis和alimns,其他后续看业务使用场景可扩展。

启动

  1. build/bin/snow -a job

目录

app/job

注册Worker

在队列调度任务中注册需要执行的Worker,目前提供以下两种方式进行注册。具体Worker的业务逻辑需要注意,需要保证业务逻辑是同步阻塞的,如果需要进行并发,需要阻塞等待所有的并发结束,否则无法控制worker执行的并发数。

AddFunc

第一个参数为topic,第二个参数为回调函数,第三个为worker的并发数[可选,未设置使用默认并发数]

  1. job.AddFunc("topic:test", test, 1)
  2. //test函数示例,遵循如下的输入返回数据结构
  3. func test(task work.Task) (work.TaskResult) {
  4. //业务逻辑
  5. }

AddWorker

需要传入Worker的数据结构,示例如下:

  1. //test函数输入输出同上
  2. job.AddWorker("topic:test2", &work.Worker{Call: work.MyWorkerFunc(test), MaxConcurrency: 1})

Queue驱动

队列调度要跑起来需要去拉取Queue驱动的数据,只要实现了如下Queue接口的定义,都可以进行Queue驱动注入。

Queue接口

  1. type Queue interface {
  2. Enqueue(ctx context.Context, key string, message string, args ...interface{}) (isOk bool, err error)
  3. Dequeue(ctx context.Context, key string) (message string, token string, err error)
  4. AckMsg(ctx context.Context, key string, token string) (ok bool, err error)
  5. BatchEnqueue(ctx context.Context, key string, messages []string, args ...interface{}) (isOk bool, err error)
  6. }

获取Queue驱动

目前框架已经集成好Queue的组件,可以直接使用。

  1. ## 第一个参数为依赖注入的别名,第二个参数为驱动类型(目前支持redis和alimns)
  2. q, err := queue.GetQueue(redis.SingletonMain, queue.DriverTypeRedis)

注入Queue驱动

针对topic设置相关的queue

  1. job.AddQueue(q, "topic:test1", "topic:test2")

设置默认的queue, 没有设置过的topic会使用默认的queue

  1. job.AddQueue(q)

消息入队

目前在app/jobs子包下提供了4个消息入队的方法,最终消息是以Task结构json序列化的字符串保存到消息队列中,通过队列调度服务从消息队列中取出进行json反序列化,交给对应的worker处理。

  1. /**
  2. * 消息入队 -- 原始message
  3. */
  4. func Enqueue(ctx context.Context, topic string, message string, args ...interface{}) (isOk bool, err error) {
  5. return getJob().Enqueue(ctx, topic, message, args...)
  6. }
  7. /**
  8. * 消息入队 -- Task数据结构
  9. */
  10. func EnqueueWithTask(ctx context.Context, topic string, task work.Task, args ...interface{}) (isOk bool, err error) {
  11. return getJob().EnqueueWithTask(ctx, topic, task, args...)
  12. }
  13. /**
  14. * 消息批量入队 -- 原始message
  15. */
  16. func BatchEnqueue(ctx context.Context, topic string, messages []string, args ...interface{}) (isOk bool, err error) {
  17. return getJob().BatchEnqueue(ctx, topic, messages, args...)
  18. }
  19. /**
  20. * 消息批量入队 -- Task数据结构
  21. */
  22. func BatchEnqueueWithTask(ctx context.Context, topic string, tasks []work.Task, args ...interface{}) (isOk bool, err error) {
  23. return getJob().BatchEnqueueWithTask(ctx, topic, tasks, args...)
  24. }

其他配置

针对性启用Topic消费

目前会有这样的应用场景,一份代码需要注册所有的topic,但是不同机器需要跑不同的topic消费Worker。可以通过如下方法实现:

代码实现:

  1. if config.GetOptions().Queue != "" {
  2. topics := strings.Split(config.GetOptions().Queue, ",")
  3. job.SetEnableTopics(topics...)
  4. }

启动服务控制参数:

  1. # -queue为空或者不配置时,默认启用所有注册的topic
  2. build/bin/snow -a job -queue "topic1,topic2"

设置日志Logger及等级

  1. ## 非侵入式设计,只需要实现work.Logger的接口即可
  2. job.SetLogger(logger.GetLogger())
  3. ## 设置日志等级,默认为work.Info
  4. job.SetLevel(work.WARN)

设置标准输出日志等级

  1. ## 默认为work.Info
  2. job.SetConsoleLevel(work.WARN)

设置Worker默认并发数

  1. ## 默认值为5
  2. job.SetConcurrency(10)

详细文档

https://github.com/qit-team/work