自定义进程
EasySwoole中支持添加用户自定义的swoole process。
抽象父类
任何的自定义进程,都应该继承自EasySwoole\Core\Swoole\Process\AbstractProcess,
AbstractProcess实现代码如下:
<?php
/**
* Created by PhpStorm.
* User: yf
* Date: 2018/1/17
* Time: 上午11:28
*/
namespace EasySwoole\Core\Swoole\Process;
use EasySwoole\Core\Swoole\Memory\TableManager;
use EasySwoole\Core\Swoole\ServerManager;
use EasySwoole\Core\Swoole\Time\Timer;
use Swoole\Process;
abstract class AbstractProcess
{
private $swooleProcess;
private $processName;
private $async = null;
private $args = [];
function __construct(string $processName,$async = true,array $args)
{
$this->async = $async;
$this->args = $args;
$this->processName = $processName;
$this->swooleProcess = new \swoole_process([$this,'__start'],false,2);
ServerManager::getInstance()->getServer()->addProcess($this->swooleProcess);
}
public function getProcess():Process
{
return $this->swooleProcess;
}
/*
* 仅仅为了提示:在自定义进程中依旧可以使用定时器
*/
public function addTick($ms,callable $call):?int
{
return Timer::loop(
$ms,$call
);
}
public function clearTick(int $timerId)
{
Timer::clear($timerId);
}
public function delay($ms,callable $call):?int
{
return Timer::delay(
$ms,$call
);
}
/*
* 服务启动后才能获得到pid
*/
public function getPid():?int
{
if(isset($this->swooleProcess->pid)){
return $this->swooleProcess->pid;
}else{
$key = md5($this->processName);
$pid = TableManager::getInstance()->get('process_hash_map')->get($key);
if($pid){
return $pid['pid'];
}else{
return null;
}
}
}
function __start(Process $process)
{
if(PHP_OS != 'Darwin'){
$process->name($this->getProcessName());
}
TableManager::getInstance()->get('process_hash_map')->set(
md5($this->processName),['pid'=>$this->swooleProcess->pid]
);
ProcessManager::getInstance()->setProcess($this->getProcessName(),$this);
if (extension_loaded('pcntl')) {
pcntl_async_signals(true);
}
Process::signal(SIGTERM,function ()use($process){
$this->onShutDown();
TableManager::getInstance()->get('process_hash_map')->del(md5($this->processName));
swoole_event_del($process->pipe);
$this->swooleProcess->exit(0);
});
if($this->async){
swoole_event_add($this->swooleProcess->pipe, function(){
$msg = $this->swooleProcess->read(64 * 1024);
$this->onReceive($msg);
});
}
$this->run($this->swooleProcess);
}
public function getArgs():array
{
return $this->args;
}
public function getProcessName()
{
return $this->processName;
}
public abstract function run(Process $process);
public abstract function onShutDown();
public abstract function onReceive(string $str,...$args);
}
进程管理器
ProcessManager,实现代码如下:
namespace EasySwoole\Core\Swoole\Process;
use EasySwoole\Core\AbstractInterface\Singleton;
use EasySwoole\Core\Swoole\Memory\TableManager;
use EasySwoole\Core\Swoole\ServerManager;
use Swoole\Table;
class ProcessManager
{
use Singleton;
private $processList = [];
function __construct()
{
TableManager::getInstance()->add(
'process_hash_map',[
'pid'=>[
'type'=>Table::TYPE_INT,
'size'=>10
]
],256
);
}
public function addProcess(string $processName,string $processClass,$async = true,array $args = []):bool
{
if(ServerManager::getInstance()->isStart()){
trigger_error('you can not add a process after server start');
return false;
}
$key = md5($processName);
if(!isset($this->processList[$key])){
try{
$process = new $processClass($processName,$async,$args);
$this->processList[$key] = $process;
return true;
}catch (\Throwable $throwable){
trigger_error($throwable->getMessage().$throwable->getTraceAsString());
return false;
}
}else{
trigger_error('you can not add the same name process : '.$processName);
return false;
}
}
public function getProcessByName(string $processName):?AbstractProcess
{
$key = md5($processName);
if(isset($this->processList[$key])){
return $this->processList[$key];
}else{
return null;
}
}
public function getProcessByPid(int $pid):?AbstractProcess
{
$table = TableManager::getInstance()->get('process_hash_map');
foreach ($table as $key => $item){
if($item['pid'] == $pid){
return $this->processList[$key];
}
}
return null;
}
public function setProcess(string $processName,AbstractProcess $process)
{
$key = md5($processName);
$this->processList[$key] = $process;
}
public function reboot(string $processName):bool
{
$p = $this->getProcessByName($processName);
if($p){
\swoole_process::kill($p->getPid(),SIGTERM);
return true;
}else{
return false;
}
}
public function writeByProcessName(string $name,string $data):bool
{
$process = $this->getProcessByName($name);
if($process){
return (bool)$process->getProcess()->write($data);
}else{
return false;
}
}
public function readByProcessName(string $name,float $timeOut = 0.1):?string
{
$process = $this->getProcessByName($name);
if($process){
$process = $process->getProcess();
$read = array($process);
$write = [];
$error = [];
$ret = swoole_select($read, $write,$error, $timeOut);
if($ret){
return $process->read(64 * 1024);
}else{
return null;
}
}else{
return null;
}
}
}
实例
我们以demo中的自定义进程例子来说明:
namespace App\Process;
use EasySwoole\Core\Swoole\Process\AbstractProcess;
use Swoole\Process;
class Test extends AbstractProcess
{
//进程start的时候会执行的事件
public function run(Process $process)
{
// TODO: Implement run() method.
//添加进程内定时器
$this->addTick(2000,function (){
var_dump('this is '.$this->getProcessName().' process tick');
});
}
//当进程关闭的时候会执行该事件
public function onShutDown()
{
// TODO: Implement onShutDown() method.
}
//当有信息发给该进程的时候,会执行此进程
public function onReceive(string $str, ...$args)
{
// TODO: Implement onReceive() method.
var_dump('process rec'.$str);
}
}
以上代码直达连接,
至于如何使用(测试),请见demo中的EasySwooleEvent.php