EasySwoole 基于Redis组件实现延迟队列

介绍

在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉。当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源。当面对千万级、上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了。

使用延迟队列解决的痛点无非是

  1. 实现了数据延迟
  2. 数据摊开(仔细去理解)

知识点

  1. redis有序集合
  2. EasySwoole Redis协程客户端

案例

生成订单id —-> 扔到延迟队列 —-> 延迟队列消费进程不停获取30分钟前的订单满足条件的订单 —-> 处理订单

直接上代码

EasySwooleEvent.php 注册redis连接池、注册延迟队列消费进程

  1. <?php
  2. namespace EasySwoole\EasySwoole;
  3. use App\Process\Consumer;
  4. use EasySwoole\EasySwoole\Swoole\EventRegister;
  5. use EasySwoole\EasySwoole\AbstractInterface\Event;
  6. use EasySwoole\Http\Request;
  7. use EasySwoole\Http\Response;
  8. use EasySwoole\Pool\Manager;
  9. use EasySwoole\Redis\Config\RedisConfig;
  10. use App\RedisPool\RedisPool;
  11. use EasySwoole\Pool\Config;
  12. class EasySwooleEvent implements Event
  13. {
  14. public static function initialize()
  15. {
  16. // TODO: Implement initialize() method.
  17. date_default_timezone_set('Asia/Shanghai');
  18. }
  19. public static function mainServerCreate(EventRegister $register)
  20. {
  21. //TODO:: 注册redis连接池
  22. $config = new Config();
  23. $redisConfig1 = new RedisConfig([
  24. 'host' => '127.0.0.1',
  25. 'port' => '6379'
  26. ]);
  27. // 这里的redis连接池看文档配吧
  28. Manager::getInstance()->register(new RedisPool($config,$redisConfig1),'redis');
  29. //TODO:: 延迟队列消费进程
  30. $processConfig= new \EasySwoole\Component\Process\Config();
  31. $processConfig->setProcessName('testProcess');
  32. \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new Consumer($processConfig));
  33. }
  34. public static function onRequest(Request $request, Response $response): bool
  35. {
  36. // TODO: Implement onRequest() method.
  37. return true;
  38. }
  39. public static function afterRequest(Request $request, Response $response): void
  40. {
  41. // TODO: Implement afterAction() method.
  42. }
  43. }

扔到延迟队列

  1. <?php
  2. namespace App\HttpController;
  3. use EasySwoole\Http\AbstractInterface\Controller;
  4. use EasySwoole\Pool\Manager;
  5. class Index extends Controller
  6. {
  7. function index()
  8. {
  9. /** @var $redis \EasySwoole\Redis\Redis*/
  10. $orderId = date('YmdHis', time());
  11. $redis = Manager::getInstance()->get('redis')->getObj();
  12. $res = $redis->zAdd('delay_queue_test1', time(), $orderId);
  13. if ($res) {
  14. $this->writeJson(200, '订单添加成功:'.$orderId);
  15. }
  16. }
  17. }

延迟队列消费进程

  1. <?php
  2. namespace App\Process;
  3. use EasySwoole\Component\Process\AbstractProcess;
  4. use EasySwoole\Pool\Manager;
  5. use Swoole\Coroutine;
  6. class Consumer extends AbstractProcess {
  7. protected function run($arg)
  8. {
  9. go(function (){
  10. while (true) {
  11. //TODO:: 拿到redis
  12. /** @var $redis \EasySwoole\Redis\Redis*/
  13. $redis = Manager::getInstance()->get('redis')->defer();
  14. //TODO:: 从有序集合中拿到三秒(模拟30分钟)以前的订单
  15. $orderIds = $redis->zRangeByScore('delay_queue_test1', 0, time()-3, ['withscores' => TRUE]);
  16. if (empty($orderIds)) {
  17. Coroutine::sleep(1);
  18. continue;
  19. }
  20. //TODO::拿出后立马删除
  21. $redis->zRem('delay_queue_test1', ...$orderIds);
  22. foreach ($orderIds as $orderId)
  23. {
  24. var_dump($orderId);
  25. //TODO::判断此订单30分钟后,是否仍未完成,做相应处理
  26. }
  27. }
  28. });
  29. }
  30. }

测试

请求index/index 投递订单到延迟队列

  1. ~ curl 127.0.0.1:9501/index/index
  2. {"code":200,"result":"订单添加成功:20200422004046","msg":null}%

等3s看终端是否输出

  1. easyswoole php easyswoole start
  2. ______ _____ _
  3. | ____| / ____| | |
  4. | |__ __ _ ___ _ _ | (___ __ __ ___ ___ | | ___
  5. | __| / _` | / __| | | | | \___ \ \ \ /\ / / / _ \ / _ \ | | / _ \
  6. | |____ | (_| | \__ \ | |_| | ____) | \ V V / | (_) | | (_) | | | | __/
  7. |______| \__,_| |___/ \__, | |_____/ \_/\_/ \___/ \___/ |_| \___|
  8. __/ |
  9. |___/
  10. main server SWOOLE_WEB
  11. listen address 0.0.0.0
  12. listen port 9501
  13. ip@en0 192.168.43.57
  14. worker_num 8
  15. reload_async true
  16. max_wait_time 3
  17. pid_file /Users/xx/sites/easyswoole/Temp/pid.pid
  18. log_file /Users/xx/sites/easyswoole/Log/swoole.log
  19. user xx
  20. daemonize false
  21. swoole version 4.4.15
  22. php version 7.2.18
  23. easy swoole 3.3.7
  24. develop/produce develop
  25. temp dir /Users/xx/sites/easyswoole/Temp
  26. log dir /Users/xx/sites/easyswoole/Log
  27. string(14) "20200422004046"

总结

这只是一个思路,大家可以根据实际业务做不同调整