自定义进程

支持开发者创建一些特殊的工作进程,用于监控、上报或者其他特殊的任务,参考addProcess

  1. 创建Proccess类,实现CustomProcessInterface接口。

    1. namespace App\Processes;
    2. use App\Tasks\TestTask;
    3. use Hhxsv5\LaravelS\Swoole\Process\CustomProcessInterface;
    4. use Hhxsv5\LaravelS\Swoole\Task\Task;
    5. use Swoole\Coroutine;
    6. use Swoole\Http\Server;
    7. use Swoole\Process;
    8. class TestProcess implements CustomProcessInterface
    9. {
    10. /**
    11. * @var bool 退出标记,用于Reload更新
    12. */
    13. private static $quit = false;
    14. public static function callback(Server $swoole, Process $process)
    15. {
    16. // 进程运行的代码,不能退出,一旦退出Manager进程会自动再次创建该进程。
    17. while (!self::$quit) {
    18. \Log::info('Test process: running');
    19. // sleep(1); // Swoole < 2.1
    20. Coroutine::sleep(1); // Swoole>=2.1 已自动为callback()方法创建了协程并启用了协程Runtime。
    21. // 自定义进程中也可以投递Task,但不支持Task的finish()回调。
    22. // 注意:修改config/laravels.php,配置task_ipc_mode为1或2,参考 https://wiki.swoole.com/#/server/setting?id=task_ipc_mode
    23. $ret = Task::deliver(new TestTask('task data'));
    24. var_dump($ret);
    25. // 上层会捕获callback中抛出的异常,并记录到Swoole日志,然后此进程会退出,3秒后Manager进程会重新创建进程,所以需要开发者自行try/catch捕获异常,避免频繁创建进程。
    26. // throw new \Exception('an exception');
    27. }
    28. }
    29. // 要求:LaravelS >= v3.4.0 并且 callback() 必须是异步非阻塞程序。
    30. public static function onReload(Server $swoole, Process $process)
    31. {
    32. // Stop the process...
    33. // Then end process
    34. \Log::info('Test process: reloading');
    35. self::$quit = true;
    36. // $process->exit(0); // 强制退出进程
    37. }
    38. // 要求:LaravelS >= v3.7.4 并且 callback() 必须是异步非阻塞程序。
    39. public static function onStop(Server $swoole, Process $process)
    40. {
    41. // Stop the process...
    42. // Then end process
    43. \Log::info('Test process: stopping');
    44. self::$quit = true;
    45. // $process->exit(0); // 强制退出进程
    46. }
    47. }
  2. 注册TestProcess。

    1. // 修改文件 config/laravels.php
    2. // ...
    3. 'processes' => [
    4. 'test' => [ // Key为进程名
    5. 'class' => \App\Processes\TestProcess::class,
    6. 'redirect' => false, // 是否重定向输入输出
    7. 'pipe' => 0, // 管道类型:0不创建管道,1创建SOCK_STREAM类型管道,2创建SOCK_DGRAM类型管道
    8. 'enable' => true, // 是否启用,默认true
    9. //'queue' => [ // 启用消息队列作为进程间通信,配置空数组表示使用默认参数
    10. // 'msg_key' => 0, // 消息队列的KEY,默认会使用ftok(__FILE__, 1)
    11. // 'mode' => 2, // 通信模式,默认为2,表示争抢模式
    12. // 'capacity' => 8192, // 单个消息长度,长度受限于操作系统内核参数的限制,默认为8192,最大不超过65536
    13. //],
    14. //'restart_interval' => 5, // 进程异常退出后需等待多少秒再重启,默认5秒
    15. ],
    16. ],
  3. 注意:callback()方法不能退出,如果退出,Manager进程将会重新创建进程。

  4. 示例:向自定义进程中写数据。

    1. // config/laravels.php
    2. 'processes' => [
    3. 'test' => [
    4. 'class' => \App\Processes\TestProcess::class,
    5. 'redirect' => false,
    6. 'pipe' => 1,
    7. ],
    8. ],
    1. // app/Processes/TestProcess.php
    2. public static function callback(Server $swoole, Process $process)
    3. {
    4. while ($data = $process->read()) {
    5. \Log::info('TestProcess: read data', [$data]);
    6. $process->write('TestProcess: ' . $data);
    7. }
    8. }
    1. // app/Http/Controllers/TestController.php
    2. public function testProcessWrite()
    3. {
    4. /**@var \Swoole\Process $process */
    5. $process = app('swoole')->customProcesses['test'];
    6. $process->write('TestController: write data' . time());
    7. var_dump($process->read());
    8. }