diff --git a/Src/Core/Process/ProcessAbstract.php b/Src/Core/Process/ProcessAbstract.php index d899bf6d..33f7db98 100644 --- a/Src/Core/Process/ProcessAbstract.php +++ b/Src/Core/Process/ProcessAbstract.php @@ -103,6 +103,12 @@ public function onStart() { $this->beforeStart(); $this->channel = new Coroutine\Channel(1); + + $this->listen(); + $this->start(); + } + + private function listen() { Coroutine::create(function () { /** * @var Coroutine\Socket $socket @@ -129,47 +135,58 @@ public function onStart() { } $this->channel->close(); }); + } + private function start() { if (method_exists($this, 'read')) { - $this->startByEvent(); + $pipe = $this->pipe ? $this->pipe : $this->process->pipe; + Event::add($pipe, function () { + $this->doRun(function () { + $data = $this->pipe ? '' : $this->process->read(); + if (!$this->read($data)) { + Event::del($this->pipe ? $this->pipe : $this->process->pipe); + } + }); + }); } else { - $this->startByTimer(); - } - } - - private function startByTimer() { - $this->doRun(function () { - $this->run($this->process); - }); - } - - private function startByEvent() { - $pipe = $this->pipe ? $this->pipe : $this->process->pipe; - Event::add($pipe, function () { $this->doRun(function () { - $data = $this->pipe ? '' : $this->process->read(); - if (!$this->read($data)) { - Event::del($this->pipe ? $this->pipe : $this->process->pipe); - } + $this->run($this->process); }); - }); + } } private function doRun(\Closure $callback) { try { $callback(); + $this->stopProcessIfNecessary(); } catch (\Throwable $throwable) { if ((ENV & DEBUG) == DEBUG) { (new Output())->error($throwable->getMessage() . ' at file ' . $throwable->getFile() . ' line ' . $throwable->getLine()); } $this->getContainer()->singleton(HandlerExceptions::class)->getHandler()->report($throwable); - $this->channel->push(true); - Timer::clearAll(); - $this->getProcess()->exit(); + $this->stop(); } } + private function stopProcessIfNecessary() { + $timerNum = 0; + foreach (Timer::list() as $item) { + ++$timerNum; + } + if ($timerNum == 0) { + $this->stop(); + } + } + + private function stop() { + $this->getLogger()->debug('process ' . $this->getProcessName() . ' exit'); + + $this->channel->push(true); + Timer::clearAll(); + $this->getProcess()->exit(); + } + abstract protected function run(Process $process); /** @@ -192,8 +209,4 @@ public function sendMsg($msg) { public function readMsg($size = null) { return $this->getProcess()->pop($size); } - - public function onStop() { - $this->getLogger()->debug('process ' . $this->getProcessName() . ' exit'); - } }