Skip to content

Commit

Permalink
优化代码
Browse files Browse the repository at this point in the history
  • Loading branch information
titrxw committed Feb 25, 2021
1 parent 98f830a commit 5c05ec8
Showing 1 changed file with 39 additions and 26 deletions.
65 changes: 39 additions & 26 deletions Src/Core/Process/ProcessAbstract.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ public function onStart() {
$this->beforeStart();

$this->channel = new Coroutine\Channel(1);

$this->listen();
$this->start();
}

private function listen() {
Coroutine::create(function () {
/**
* @var Coroutine\Socket $socket
Expand All @@ -129,47 +135,58 @@ public function onStart() {
}
$this->channel->close();
});
}

private function start() {
if (method_exists($this, 'read')) {
$this->startByEvent();
$pipe = $this->pipe ? $this->pipe : $this->process->pipe;
Event::add($pipe, function () {
$this->doRun(function () {
$data = $this->pipe ? '' : $this->process->read();
if (!$this->read($data)) {
Event::del($this->pipe ? $this->pipe : $this->process->pipe);
}
});
});
} else {
$this->startByTimer();
}
}

private function startByTimer() {
$this->doRun(function () {
$this->run($this->process);
});
}

private function startByEvent() {
$pipe = $this->pipe ? $this->pipe : $this->process->pipe;
Event::add($pipe, function () {
$this->doRun(function () {
$data = $this->pipe ? '' : $this->process->read();
if (!$this->read($data)) {
Event::del($this->pipe ? $this->pipe : $this->process->pipe);
}
$this->run($this->process);
});
});
}
}

private function doRun(\Closure $callback) {
try {
$callback();
$this->stopProcessIfNecessary();
} catch (\Throwable $throwable) {
if ((ENV & DEBUG) == DEBUG) {
(new Output())->error($throwable->getMessage() . ' at file ' . $throwable->getFile() . ' line ' . $throwable->getLine());
}
$this->getContainer()->singleton(HandlerExceptions::class)->getHandler()->report($throwable);

$this->channel->push(true);
Timer::clearAll();
$this->getProcess()->exit();
$this->stop();
}
}

private function stopProcessIfNecessary() {
$timerNum = 0;
foreach (Timer::list() as $item) {
++$timerNum;
}
if ($timerNum == 0) {
$this->stop();
}
}

private function stop() {
$this->getLogger()->debug('process ' . $this->getProcessName() . ' exit');

$this->channel->push(true);
Timer::clearAll();
$this->getProcess()->exit();
}

abstract protected function run(Process $process);

/**
Expand All @@ -192,8 +209,4 @@ public function sendMsg($msg) {
public function readMsg($size = null) {
return $this->getProcess()->pop($size);
}

public function onStop() {
$this->getLogger()->debug('process ' . $this->getProcessName() . ' exit');
}
}

0 comments on commit 5c05ec8

Please sign in to comment.