From 671f55129a63f1d7ceeca5018c061de82d925875 Mon Sep 17 00:00:00 2001 From: titrxw <1120309488@qq.com> Date: Thu, 24 Feb 2022 15:51:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4tcp,websocket=20session?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Src/App.php | 2 +- Src/Core/Listener/PipeMessageListener.php | 2 +- Src/Core/Listener/TaskListener.php | 2 +- Src/Tcp/Listener/AfterWorkerStopListener.php | 32 ------------------- Src/Tcp/Listener/BeforeStartListener.php | 2 +- Src/Tcp/Listener/CloseListener.php | 3 -- Src/Tcp/Listener/ConnectListener.php | 21 ++---------- Src/Tcp/Listener/ReceiveListener.php | 21 ++++++++---- .../Listener/AfterWorkerStopListener.php | 32 ------------------- .../Listener/BeforeStartListener.php | 2 +- Src/WebSocket/Listener/CloseListener.php | 14 -------- Src/WebSocket/Listener/HandShakeListener.php | 15 --------- Src/WebSocket/Listener/MessageListener.php | 25 +++++++++------ 13 files changed, 36 insertions(+), 137 deletions(-) delete mode 100644 Src/Tcp/Listener/AfterWorkerStopListener.php delete mode 100644 Src/WebSocket/Listener/AfterWorkerStopListener.php diff --git a/Src/App.php b/Src/App.php index 4d135d50..4d6a139f 100644 --- a/Src/App.php +++ b/Src/App.php @@ -30,7 +30,7 @@ */ class App { public const NAME = 'w7-rangine'; - public const VERSION = '2.5.7'; + public const VERSION = '2.5.8'; public static $self; protected $appNamespace; diff --git a/Src/Core/Listener/PipeMessageListener.php b/Src/Core/Listener/PipeMessageListener.php index 75af55f3..f6d95f01 100644 --- a/Src/Core/Listener/PipeMessageListener.php +++ b/Src/Core/Listener/PipeMessageListener.php @@ -12,7 +12,7 @@ namespace W7\Core\Listener; -use Swoole\Http\Server; +use Swoole\Server; use W7\Core\Message\Message; use W7\Core\Server\ServerEvent; diff --git a/Src/Core/Listener/TaskListener.php b/Src/Core/Listener/TaskListener.php index e877689e..59bd5d17 100644 --- a/Src/Core/Listener/TaskListener.php +++ b/Src/Core/Listener/TaskListener.php @@ -12,7 +12,7 @@ namespace W7\Core\Listener; -use Swoole\Http\Server; +use Swoole\Server; use Swoole\Server\Task; use W7\Core\Exception\HandlerExceptions; use W7\Core\Helper\Traiter\TaskDispatchTrait; diff --git a/Src/Tcp/Listener/AfterWorkerStopListener.php b/Src/Tcp/Listener/AfterWorkerStopListener.php deleted file mode 100644 index 832a544a..00000000 --- a/Src/Tcp/Listener/AfterWorkerStopListener.php +++ /dev/null @@ -1,32 +0,0 @@ - - * - * document http://s.w7.cc/index.php?c=wiki&do=view&id=317&list=2284 - * - * visited https://www.rangine.com/ for more details - */ - -namespace W7\Tcp\Listener; - -use W7\App; -use W7\Core\Listener\ListenerAbstract; -use W7\Tcp\Collector\FdCollector; - -class AfterWorkerStopListener extends ListenerAbstract { - public function run(...$params) { - $clientCollector = FdCollector::instance()->all(); - if (empty($clientCollector)) { - return true; - } - - foreach ($clientCollector as $fd => $client) { - App::$server->getServer()->exists($fd) && App::$server->getServer()->close($fd); - } - - FdCollector::instance()->clear(); - } -} diff --git a/Src/Tcp/Listener/BeforeStartListener.php b/Src/Tcp/Listener/BeforeStartListener.php index 39a7e6c6..d7ee50de 100644 --- a/Src/Tcp/Listener/BeforeStartListener.php +++ b/Src/Tcp/Listener/BeforeStartListener.php @@ -22,7 +22,7 @@ class BeforeStartListener extends ListenerAbstract { public function run(...$params) { $this->registerRouter(); - $this->registerMiddleware(); +// $this->registerMiddleware(); } private function registerRouter() { diff --git a/Src/Tcp/Listener/CloseListener.php b/Src/Tcp/Listener/CloseListener.php index 1bb4a570..0dfc716b 100644 --- a/Src/Tcp/Listener/CloseListener.php +++ b/Src/Tcp/Listener/CloseListener.php @@ -16,7 +16,6 @@ use W7\Core\Listener\ListenerAbstract; use W7\Core\Server\ServerEnum; use W7\Core\Server\ServerEvent; -use W7\Tcp\Collector\FdCollector; class CloseListener extends ListenerAbstract { public function run(...$params) { @@ -25,8 +24,6 @@ public function run(...$params) { } private function onClose(Server $server, int $fd, int $reactorId): void { - FdCollector::instance()->delete($fd); - $this->getEventDispatcher()->dispatch(ServerEvent::ON_USER_AFTER_CLOSE, [$server, $fd, $reactorId, ServerEnum::TYPE_TCP]); } } diff --git a/Src/Tcp/Listener/ConnectListener.php b/Src/Tcp/Listener/ConnectListener.php index 6909a5f3..94660666 100644 --- a/Src/Tcp/Listener/ConnectListener.php +++ b/Src/Tcp/Listener/ConnectListener.php @@ -13,35 +13,18 @@ namespace W7\Tcp\Listener; use Swoole\Server; -use W7\Contract\Session\SessionInterface; use W7\Core\Listener\ListenerAbstract; use W7\Core\Server\ServerEnum; use W7\Core\Server\ServerEvent; -use W7\Http\Message\Outputer\TcpResponseOutputer; use W7\Http\Message\Server\Request as Psr7Request; -use W7\Http\Message\Server\Response as Psr7Response; -use W7\Tcp\Collector\FdCollector; class ConnectListener extends ListenerAbstract { public function run(...$params) { [$server, $fd, $reactorId] = $params; - return $this->onConnect($server, $fd, $reactorId); + $this->onConnect($server, $fd, $reactorId); } private function onConnect(Server $server, $fd, $reactorId) { - /** - * @var Psr7Request $psr7Request - */ - $psr7Request = new Psr7Request('', ''); - $psr7Response = new Psr7Response(); - $psr7Response->setOutputer(new TcpResponseOutputer($server, $fd)); - - //TCP session guarantees that data is shared in this connection, and Response cannot delegate SessionID, so there is no data shared between two connections - $psr7Request->session = $this->getContainer()->clone(SessionInterface::class); - $psr7Request->session->start($psr7Request); - - FdCollector::instance()->set($fd, [$psr7Request, $psr7Response]); - - $this->getEventDispatcher()->dispatch(ServerEvent::ON_USER_AFTER_OPEN, [$server, $fd, $psr7Request, ServerEnum::TYPE_TCP]); + $this->getEventDispatcher()->dispatch(ServerEvent::ON_USER_AFTER_OPEN, [$server, $fd, new Psr7Request('', ''), ServerEnum::TYPE_TCP]); } } diff --git a/Src/Tcp/Listener/ReceiveListener.php b/Src/Tcp/Listener/ReceiveListener.php index 87fada06..03c237fc 100644 --- a/Src/Tcp/Listener/ReceiveListener.php +++ b/Src/Tcp/Listener/ReceiveListener.php @@ -14,9 +14,9 @@ use Swoole\Server; use W7\Core\Listener\ListenerAbstract; +use W7\Http\Message\Outputer\TcpResponseOutputer; use W7\Http\Message\Server\Request as Psr7Request; use W7\Http\Message\Server\Response as Psr7Response; -use W7\Tcp\Collector\FdCollector; use W7\Tcp\Server\Dispatcher as RequestDispatcher; class ReceiveListener extends ListenerAbstract { @@ -26,23 +26,30 @@ public function run(...$params) { $this->dispatch($server, $reactorId, $fd, $data); } + //待优化 session + protected function getRequestAndResponse(Server $server, $fd) { + $psr7Request = new Psr7Request('POST', '/'); + $psr7Request = $psr7Request->withAttribute('fd', $fd); + + $psr7Response = new Psr7Response(); + $psr7Response->setOutputer(new TcpResponseOutputer($server, $fd)); + + return [$psr7Request, $psr7Response]; + } + private function dispatch(Server $server, $reactorId, $fd, $data) { - $this->getContext()->setContextDataByKey('fd', $fd); $this->getContext()->setContextDataByKey('reactorid', $reactorId); $this->getContext()->setContextDataByKey('workid', $server->worker_id); $this->getContext()->setContextDataByKey('coid', $this->getContext()->getCoroutineId()); - $collector = FdCollector::instance()->get($fd, []); - /** * @var Psr7Request $psr7Request */ - $psr7Request = $collector[0]; - $psr7Request = $psr7Request->loadFromTcpData($data); /** * @var Psr7Response $psr7Response */ - $psr7Response = $collector[1]; + [$psr7Request, $psr7Response] = $this->getRequestAndResponse($server, $fd); + $psr7Request = $psr7Request->loadFromTcpData($data); /** * @var RequestDispatcher $dispatcher diff --git a/Src/WebSocket/Listener/AfterWorkerStopListener.php b/Src/WebSocket/Listener/AfterWorkerStopListener.php deleted file mode 100644 index 7f394afd..00000000 --- a/Src/WebSocket/Listener/AfterWorkerStopListener.php +++ /dev/null @@ -1,32 +0,0 @@ - - * - * document http://s.w7.cc/index.php?c=wiki&do=view&id=317&list=2284 - * - * visited https://www.rangine.com/ for more details - */ - -namespace W7\WebSocket\Listener; - -use W7\App; -use W7\Core\Listener\ListenerAbstract; -use W7\WebSocket\Collector\FdCollector; - -class AfterWorkerStopListener extends ListenerAbstract { - public function run(...$params) { - $clientCollector = FdCollector::instance()->all(); - if (empty($clientCollector)) { - return true; - } - - foreach ($clientCollector as $fd => $client) { - App::$server->getServer()->isEstablished($fd) && App::$server->getServer()->disconnect($fd, 0, ''); - } - - FdCollector::instance()->clear(); - } -} diff --git a/Src/WebSocket/Listener/BeforeStartListener.php b/Src/WebSocket/Listener/BeforeStartListener.php index 09262f28..f9526e60 100644 --- a/Src/WebSocket/Listener/BeforeStartListener.php +++ b/Src/WebSocket/Listener/BeforeStartListener.php @@ -22,7 +22,7 @@ class BeforeStartListener extends ListenerAbstract { public function run(...$params) { $this->registerRouter(); - $this->registerMiddleware(); +// $this->registerMiddleware(); } private function registerRouter() { diff --git a/Src/WebSocket/Listener/CloseListener.php b/Src/WebSocket/Listener/CloseListener.php index 668e6125..dd59b063 100644 --- a/Src/WebSocket/Listener/CloseListener.php +++ b/Src/WebSocket/Listener/CloseListener.php @@ -16,8 +16,6 @@ use W7\Core\Listener\ListenerAbstract; use W7\Core\Server\ServerEnum; use W7\Core\Server\ServerEvent; -use W7\Http\Message\Server\Request as Psr7Request; -use W7\WebSocket\Collector\FdCollector; class CloseListener extends ListenerAbstract { public function run(...$params) { @@ -26,18 +24,6 @@ public function run(...$params) { } private function onClose(Server $server, int $fd, int $reactorId): void { - $fdCollector = FdCollector::instance(); - $collector = $fdCollector->get($fd, []); - if ($collector) { - /** - * @var Psr7Request $psr7Request - */ - $psr7Request = $collector[0]; - $psr7Request->session->close(); - } - - $fdCollector->delete($fd); - $this->getEventDispatcher()->dispatch(ServerEvent::ON_USER_AFTER_CLOSE, [$server, $fd, $reactorId, ServerEnum::TYPE_WEBSOCKET]); } } diff --git a/Src/WebSocket/Listener/HandShakeListener.php b/Src/WebSocket/Listener/HandShakeListener.php index 3cbfb679..13ff82ac 100644 --- a/Src/WebSocket/Listener/HandShakeListener.php +++ b/Src/WebSocket/Listener/HandShakeListener.php @@ -15,14 +15,12 @@ use Swoole\Http\Request; use Swoole\Http\Response; use W7\App; -use W7\Contract\Session\SessionInterface; use W7\Core\Listener\ListenerAbstract; use W7\Core\Server\ServerEnum; use W7\Core\Server\ServerEvent; use W7\Http\Message\Outputer\SwooleResponseOutputer; use W7\Http\Message\Server\Request as Psr7Request; use W7\Http\Message\Server\Response as Psr7Response; -use W7\WebSocket\Collector\FdCollector; class HandShakeListener extends ListenerAbstract { public function run(...$params) { @@ -73,26 +71,13 @@ private function handshake(Request $request, Response $response) { $response = $psr7Response->withHeaders($headers)->withStatus(101); - $psr7Request->session = $this->getContainer()->clone(SessionInterface::class); - $psr7Request->session->start($psr7Request); - $response = $psr7Request->session->replenishResponse($response); - try { - $localIps = swoole_get_local_ip(); - $psr7Request->session->set('fd', $request->fd); - $psr7Request->session->set('server', [ - 'ip' => array_values($localIps)[0] ?? '', - 'mac' => swoole_get_local_mac()[array_keys($localIps)[0] ?? 0] ?? '' - ]); - $this->getEventDispatcher()->dispatch(ServerEnum::TYPE_WEBSOCKET . ':' . ServerEvent::ON_OPEN, [App::$server->getServer(), $psr7Request]); } catch (\Throwable $e) { $this->getLogger()->debug($e->getMessage(), ['exception' => $e]); return false; } - FdCollector::instance()->set($request->fd, [$psr7Request, $response]); - $response->send(); return true; } diff --git a/Src/WebSocket/Listener/MessageListener.php b/Src/WebSocket/Listener/MessageListener.php index d2215542..6efdab5e 100644 --- a/Src/WebSocket/Listener/MessageListener.php +++ b/Src/WebSocket/Listener/MessageListener.php @@ -18,7 +18,6 @@ use W7\Http\Message\Outputer\WebSocketResponseOutputer; use W7\Http\Message\Server\Request as Psr7Request; use W7\Http\Message\Server\Response as Psr7Response; -use W7\WebSocket\Collector\FdCollector; use W7\WebSocket\Server\Dispatcher; class MessageListener extends ListenerAbstract { @@ -27,28 +26,34 @@ public function run(...$params) { $this->onMessage($server, $frame); } - private function onMessage(Server $server, SwooleFrame $frame): bool { + //待优化 session + protected function getRequestAndResponse(Server $server, SwooleFrame $frame) { + $psr7Request = new Psr7Request('POST', '/'); + $psr7Request = $psr7Request->withAttribute('fd', $frame->fd); + + $psr7Response = new Psr7Response(); + $psr7Response->setOutputer(new WebSocketResponseOutputer($server, $frame->fd)); + + return [$psr7Request, $psr7Response]; + } + + private function onMessage(Server $server, SwooleFrame $frame) { $this->getContext()->setContextDataByKey('workid', $server->worker_id); $this->getContext()->setContextDataByKey('coid', $this->getContext()->getCoroutineId()); - $collector = FdCollector::instance()->get($frame->fd, []); - /** * @var Psr7Request $psr7Request */ - $psr7Request = $collector[0]; - $psr7Request = $psr7Request->loadFromWSFrame($frame); /** * @var Psr7Response $psr7Response */ - $psr7Response = $collector[1]; - $psr7Response->setOutputer(new WebSocketResponseOutputer($server, $frame->fd)); + [$psr7Request, $psr7Response] = $this->getRequestAndResponse($server, $frame); + $psr7Request = $psr7Request->withAttribute('frame', $frame); + $psr7Request = $psr7Request->loadFromWSFrame($frame); $dispatcher = $this->getContainer()->get(Dispatcher::class); $psr7Response = $dispatcher->dispatch($psr7Request, $psr7Response); $psr7Response->send(); - - return true; } }