FastCacheQueue

EasySwoole FastCache组件在>= 1.2.1的时候新增类似· beanstalkd消息队列 ·特性。

  • 可以创建多个queue
  • 支持延迟投递
  • 任务超时恢复执行
  • 任务重发执行
  • 任务最大重发次数
  • 支持putJob、delayJob、releaseJob、reserveJob、buryJob、kickJob等命令

基本使用

FastCacheQueue依托于FastCache,具体安装请查看FastCache

服务注册

更新后,EasySwoole\FastCache\CacheProcessConfig类多出以下方法

  1. /** 设置进程最大内存 默认512M */
  2. public function setMaxMem(string $maxMem): void
  3. /** 设置消息队列保留时间 默认60s (取出任务后没有及时确认会重新放回队列) */
  4. public function setQueueReserveTime(int $queueReserveTime): void
  5. /** 设置消息队列最大重发次数 默认10 达到次数后重发将会被丢弃 */
  6. public function setQueueMaxReleaseTimes(int $queueMaxReleaseTimes): void

开始使用

下文示例代码的Job和Cache都使用以下命名空间

  1. use EasySwoole\FastCache\Cache;
  2. use EasySwoole\FastCache\Job;

投递任务

投递成功之后 将会返回该任务的jobId。

没有失败情况,除非fastCache注册注册失败。

  1. $job = new Job();
  2. $job->setData("siam"); // 任意类型数据
  3. $job->setQueue("siam_queue");
  4. $jobId = Cache::getInstance()->putJob($job);
  5. var_dump($jobId);

取出任务

可以开启自定义进程当消费者,循环监听队列,执行任务处理。

注意:任务执行完成一定要有一个结果。要么删除该任务,要么重发。否则当任务取出一定时间后(默认60s)会自动放回队列中。

  1. $job = Cache::getInstance()->getJob('siam_queue');// Job对象或者null
  2. if ($job === null){
  3. echo "没有任务\n";
  4. }else{
  5. // 执行业务逻辑
  6. var_dump($job);
  7. // 执行完了要删除或者重发,否则超时会自动重发
  8. Cache::getInstance()->deleteJob($job);
  9. }

清空ready任务队列

  1. var_dump(Cache::getInstance()->flushReadyJobQueue('siam_queue'));
  2. var_dump(Cache::getInstance()->jobQueueSize('siam_queue'));

延迟执行任务

  1. $job = new Job();
  2. $job->setData("siam");
  3. $job->setQueue("siam_queue_delay");
  4. $job->setDelay(5);// 延时5s
  5. $jobId = Cache::getInstance()->putJob($job);
  6. var_dump($jobId);
  7. // 马上取会失败 隔5s取才成功
  8. $job = Cache::getInstance()->getJob('siam_queue_delay');
  9. var_dump($job);

删除任务

可以是由getJob取出的对象,也可以自己声明Job对象,传入JobId来删除。

  1. $job = new Job();
  2. $job->setJobId(1);
  3. $job->setQueue('siam_queue_delay');
  4. Cache::getInstance()->deleteJob($job);

任务重发

任务执行失败,或者某些场景需要重新执行,则可以重发。

重发时,可以指定是否延迟执行。

  1. // get出来的任务执行失败可以重发
  2. $job = new Job();
  3. $job->setData("siam");
  4. $job->setQueue("siam_queue");
  5. $jobId = Cache::getInstance()->putJob($job);
  6. $job = Cache::getInstance()->getJob('siam_queue');
  7. if ($job === null){
  8. echo "没有任务\n";
  9. }else{
  10. // 执行业务逻辑
  11. $doRes = false;
  12. if (!$doRes){
  13. // 业务逻辑失败,需要重发
  14. // 如果延迟队列需要马上重发,在这里需要清空delay属性
  15. // $job->setDelay(0);
  16. // 如果普通队列需要延迟重发,则设置delay属性
  17. // $job->setDelay(5);
  18. $res = Cache::getInstance()->releaseJob($job);
  19. var_dump($res);
  20. }else{
  21. // 执行完了要删除或者重发,否则超时会自动重发
  22. Cache::getInstance()->deleteJob($job);
  23. }
  24. }

返回现在有什么队列

  1. $queues = Cache::getInstance()->jobQueues();
  2. var_dump($queues);

返回某个队列的长度

  1. $queueSize = Cache::getInstance()->jobQueueSize("siam_queue");
  2. $queueSize2 = Cache::getInstance()->jobQueueSize("siam_queue_delay");
  3. var_dump($queueSize);
  4. var_dump($queueSize2);

清空队列 可指定名称

  1. // 清空全部
  2. $res = Cache::getInstance()->flushJobQueue();
  3. var_dump($res);
  4. // 清空siam_queue队列
  5. $res = Cache::getInstance()->flushJobQueue('siam_queue');
  6. var_dump($res);

将任务改为延迟状态

  1. //添加任务
  2. $job = new Job();
  3. $job->setData("LuffyQAQ");
  4. $job->setQueue("LuffyQAQ_queue_delay");
  5. $jobId = Cache::getInstance()->putJob($job);
  6. //方法一 直接传入jobId
  7. $job->setJobId($jobId);
  8. $job->setDelay(30);
  9. var_dump(Cache::getInstance()->delayJob($job));
  10. //方法二 取出任务
  11. $job = Cache::getInstance()->getJob('LuffyQAQ_queue_delay');
  12. $job->setDelay(30);
  13. var_dump(Cache::getInstance()->delayJob($job));
  14. //使用jobQueueSize查看队列长度
  15. $queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_delay");
  16. var_dump($queueSize);

从延迟执行队列中拿取

  1. //传入队列名
  2. var_dump(Cache::getInstance()->getDelayJob('LuffyQAQ_queue_delay'));

清空delay任务队列

  1. var_dump(Cache::getInstance()->flushDelayJobQueue('LuffyQAQ_queue_delay'));
  2. var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_delay'));

将任务改为保留状态

  1. //添加任务
  2. $job = new Job();
  3. $job->setData("LuffyQAQ");
  4. $job->setQueue("LuffyQAQ_queue_reserve");
  5. $jobId = Cache::getInstance()->putJob($job);
  6. //方法一 直接传入jobId
  7. $job->setJobId($jobId);
  8. var_dump(Cache::getInstance()->reserveJob($job));
  9. //方法二 取出任务
  10. $job = Cache::getInstance()->getJob('LuffyQAQ_queue_reserve');
  11. var_dump(Cache::getInstance()->reserveJob($job));
  12. //使用jobQueueSize查看队列长度
  13. $queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_reserve");
  14. var_dump($queueSize);

从保留队列中拿取

  1. //传入队列名
  2. var_dump(Cache::getInstance()->getReserveJob('LuffyQAQ_queue_reserve'));

清空reserve任务队列

  1. var_dump(Cache::getInstance()->flushReserveJobQueue('LuffyQAQ_queue_reserve'));
  2. var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_reserve'));

将任务改为埋藏状态

  1. $job = new Job();
  2. $job->setQueue('LuffyQAQ_queue_bury');
  3. $job->setData('LuffyQAQ');
  4. $jobId = Cache::getInstance()->putJob($job);
  5. $job->setJobId($jobId);
  6. var_dump(Cache::getInstance()->buryJob($job));
  7. //使用jobQueueSize查看队列长度
  8. $queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_bury");
  9. var_dump($queueSize);

从埋藏队列中拿取

  1. //传入队列名
  2. var_dump(Cache::getInstance()->getBuryJob('LuffyQAQ_queue_bury'));

将埋藏队列任务恢复到ready中

  1. var_dump(Cache::getInstance()->kickJob($job));

清空bury任务队列

  1. var_dump(Cache::getInstance()->flushBuryJobQueue('LuffyQAQ_queue_bury'));
  2. var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_bury'));