MQ¶

zan框架MQ采用go语言开源消息队列NSQ,提供操作NSQ的客户端SDK,对应的包名称为nsq-client。

配置¶

MQ连接配置在项目 resource/config/$ENV/nsq.php下。

  1. <?php
  2. /**
  3. * 说明:
  4. * 1. 只有lookup项必填, 其他全部选填
  5. * 2. 所有时间配置 单位: ms
  6. */
  7. return [
  8. // ["必填"]lookup 节点地址
  9. "lookup" => [
  10. "http://www.example.com"
  11. ]
  12. ];

另外,workerStart文件夹下需要新增配置文件.config.php

  1. <?php
  2. use Zan\Framework\Components\Nsq\InitializeSQS;
  3.  
  4. return [
  5. InitializeSQS::class,
  6. ];

接口¶

NSQ目前提供pub和sub接口,接口规范为

  1. <?php
  2. class SQS {
  3. /**
  4. * 订阅
  5. * @param string $topic
  6. * @param string $channel
  7. * @param MsgHandler|callable $msgHandler
  8. * @param int $maxInFlight
  9. * @return \Generator yield return Consumer
  10. * @throws NsqException
  11. */
  12. public static function subscribe($topic, $channel, $msgHandler, $maxInFlight = -1);
  13.  
  14. /**
  15. * 取消订阅
  16. * @param string $topic
  17. * @param string $channel
  18. * @return bool
  19. */
  20. public static function unSubscribe($topic, $channel);
  21.  
  22. /**
  23. * 发布
  24. * @param string $topic
  25. * @param string[] ...$messages
  26. * @return \Generator yield bool
  27. * @throws NsqException
  28. */
  29. public static function publish($topic, ...$messages);
  30.  
  31. /**
  32. * 统计信息
  33. * @return array
  34. */
  35. public static function stat();
  36. }

使用示例¶

  • 订阅
  1. $topic = "zan_mqworker_test";
  2. $ch = "ch";
  3. //msgHandler为callable function
  4. yield SQS::subscribe($topic, $ch, function(Message $msg, Consumer $consumer) {});
  5. //msgHandler为interface MsgHandler
  6. yield SQS::subscribe($topic, $ch, new BenchMsgHandler(), 1);
  • 取消订阅
  1. yield SQS::unSubscribe($topic, $ch);
  • 发布
  1. $oneMsg = "hello";
  2. $multiMsgs = [
  3. "hello",
  4. "hi",
  5. ];
  6. yield SQS::publish($topic, $oneMsg);
  7. yield SQS::publish($topic, "hello", "hi");
  8. yield SQS::publish($topic, ...$multiMsgs);

原文: http://zanphpdoc.zanphp.io/libs/sdks/mq.html