Skip to content

Commit

Permalink
删除tcp,websocket session
Browse files Browse the repository at this point in the history
  • Loading branch information
titrxw committed Feb 24, 2022
1 parent df1a0ed commit 671f551
Show file tree
Hide file tree
Showing 13 changed files with 36 additions and 137 deletions.
2 changes: 1 addition & 1 deletion Src/App.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
class App {
public const NAME = 'w7-rangine';
public const VERSION = '2.5.7';
public const VERSION = '2.5.8';

public static $self;
protected $appNamespace;
Expand Down
2 changes: 1 addition & 1 deletion Src/Core/Listener/PipeMessageListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace W7\Core\Listener;

use Swoole\Http\Server;
use Swoole\Server;
use W7\Core\Message\Message;
use W7\Core\Server\ServerEvent;

Expand Down
2 changes: 1 addition & 1 deletion Src/Core/Listener/TaskListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace W7\Core\Listener;

use Swoole\Http\Server;
use Swoole\Server;
use Swoole\Server\Task;
use W7\Core\Exception\HandlerExceptions;
use W7\Core\Helper\Traiter\TaskDispatchTrait;
Expand Down
32 changes: 0 additions & 32 deletions Src/Tcp/Listener/AfterWorkerStopListener.php

This file was deleted.

2 changes: 1 addition & 1 deletion Src/Tcp/Listener/BeforeStartListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
class BeforeStartListener extends ListenerAbstract {
public function run(...$params) {
$this->registerRouter();
$this->registerMiddleware();
// $this->registerMiddleware();
}

private function registerRouter() {
Expand Down
3 changes: 0 additions & 3 deletions Src/Tcp/Listener/CloseListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use W7\Core\Listener\ListenerAbstract;
use W7\Core\Server\ServerEnum;
use W7\Core\Server\ServerEvent;
use W7\Tcp\Collector\FdCollector;

class CloseListener extends ListenerAbstract {
public function run(...$params) {
Expand All @@ -25,8 +24,6 @@ public function run(...$params) {
}

private function onClose(Server $server, int $fd, int $reactorId): void {
FdCollector::instance()->delete($fd);

$this->getEventDispatcher()->dispatch(ServerEvent::ON_USER_AFTER_CLOSE, [$server, $fd, $reactorId, ServerEnum::TYPE_TCP]);
}
}
21 changes: 2 additions & 19 deletions Src/Tcp/Listener/ConnectListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,18 @@
namespace W7\Tcp\Listener;

use Swoole\Server;
use W7\Contract\Session\SessionInterface;
use W7\Core\Listener\ListenerAbstract;
use W7\Core\Server\ServerEnum;
use W7\Core\Server\ServerEvent;
use W7\Http\Message\Outputer\TcpResponseOutputer;
use W7\Http\Message\Server\Request as Psr7Request;
use W7\Http\Message\Server\Response as Psr7Response;
use W7\Tcp\Collector\FdCollector;

class ConnectListener extends ListenerAbstract {
public function run(...$params) {
[$server, $fd, $reactorId] = $params;
return $this->onConnect($server, $fd, $reactorId);
$this->onConnect($server, $fd, $reactorId);
}

private function onConnect(Server $server, $fd, $reactorId) {
/**
* @var Psr7Request $psr7Request
*/
$psr7Request = new Psr7Request('', '');
$psr7Response = new Psr7Response();
$psr7Response->setOutputer(new TcpResponseOutputer($server, $fd));

//TCP session guarantees that data is shared in this connection, and Response cannot delegate SessionID, so there is no data shared between two connections
$psr7Request->session = $this->getContainer()->clone(SessionInterface::class);
$psr7Request->session->start($psr7Request);

FdCollector::instance()->set($fd, [$psr7Request, $psr7Response]);

$this->getEventDispatcher()->dispatch(ServerEvent::ON_USER_AFTER_OPEN, [$server, $fd, $psr7Request, ServerEnum::TYPE_TCP]);
$this->getEventDispatcher()->dispatch(ServerEvent::ON_USER_AFTER_OPEN, [$server, $fd, new Psr7Request('', ''), ServerEnum::TYPE_TCP]);
}
}
21 changes: 14 additions & 7 deletions Src/Tcp/Listener/ReceiveListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

use Swoole\Server;
use W7\Core\Listener\ListenerAbstract;
use W7\Http\Message\Outputer\TcpResponseOutputer;
use W7\Http\Message\Server\Request as Psr7Request;
use W7\Http\Message\Server\Response as Psr7Response;
use W7\Tcp\Collector\FdCollector;
use W7\Tcp\Server\Dispatcher as RequestDispatcher;

class ReceiveListener extends ListenerAbstract {
Expand All @@ -26,23 +26,30 @@ public function run(...$params) {
$this->dispatch($server, $reactorId, $fd, $data);
}

//待优化 session
protected function getRequestAndResponse(Server $server, $fd) {
$psr7Request = new Psr7Request('POST', '/');
$psr7Request = $psr7Request->withAttribute('fd', $fd);

$psr7Response = new Psr7Response();
$psr7Response->setOutputer(new TcpResponseOutputer($server, $fd));

return [$psr7Request, $psr7Response];
}

private function dispatch(Server $server, $reactorId, $fd, $data) {
$this->getContext()->setContextDataByKey('fd', $fd);
$this->getContext()->setContextDataByKey('reactorid', $reactorId);
$this->getContext()->setContextDataByKey('workid', $server->worker_id);
$this->getContext()->setContextDataByKey('coid', $this->getContext()->getCoroutineId());

$collector = FdCollector::instance()->get($fd, []);

/**
* @var Psr7Request $psr7Request
*/
$psr7Request = $collector[0];
$psr7Request = $psr7Request->loadFromTcpData($data);
/**
* @var Psr7Response $psr7Response
*/
$psr7Response = $collector[1];
[$psr7Request, $psr7Response] = $this->getRequestAndResponse($server, $fd);
$psr7Request = $psr7Request->loadFromTcpData($data);

/**
* @var RequestDispatcher $dispatcher
Expand Down
32 changes: 0 additions & 32 deletions Src/WebSocket/Listener/AfterWorkerStopListener.php

This file was deleted.

2 changes: 1 addition & 1 deletion Src/WebSocket/Listener/BeforeStartListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
class BeforeStartListener extends ListenerAbstract {
public function run(...$params) {
$this->registerRouter();
$this->registerMiddleware();
// $this->registerMiddleware();
}

private function registerRouter() {
Expand Down
14 changes: 0 additions & 14 deletions Src/WebSocket/Listener/CloseListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
use W7\Core\Listener\ListenerAbstract;
use W7\Core\Server\ServerEnum;
use W7\Core\Server\ServerEvent;
use W7\Http\Message\Server\Request as Psr7Request;
use W7\WebSocket\Collector\FdCollector;

class CloseListener extends ListenerAbstract {
public function run(...$params) {
Expand All @@ -26,18 +24,6 @@ public function run(...$params) {
}

private function onClose(Server $server, int $fd, int $reactorId): void {
$fdCollector = FdCollector::instance();
$collector = $fdCollector->get($fd, []);
if ($collector) {
/**
* @var Psr7Request $psr7Request
*/
$psr7Request = $collector[0];
$psr7Request->session->close();
}

$fdCollector->delete($fd);

$this->getEventDispatcher()->dispatch(ServerEvent::ON_USER_AFTER_CLOSE, [$server, $fd, $reactorId, ServerEnum::TYPE_WEBSOCKET]);
}
}
15 changes: 0 additions & 15 deletions Src/WebSocket/Listener/HandShakeListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
use Swoole\Http\Request;
use Swoole\Http\Response;
use W7\App;
use W7\Contract\Session\SessionInterface;
use W7\Core\Listener\ListenerAbstract;
use W7\Core\Server\ServerEnum;
use W7\Core\Server\ServerEvent;
use W7\Http\Message\Outputer\SwooleResponseOutputer;
use W7\Http\Message\Server\Request as Psr7Request;
use W7\Http\Message\Server\Response as Psr7Response;
use W7\WebSocket\Collector\FdCollector;

class HandShakeListener extends ListenerAbstract {
public function run(...$params) {
Expand Down Expand Up @@ -73,26 +71,13 @@ private function handshake(Request $request, Response $response) {

$response = $psr7Response->withHeaders($headers)->withStatus(101);

$psr7Request->session = $this->getContainer()->clone(SessionInterface::class);
$psr7Request->session->start($psr7Request);
$response = $psr7Request->session->replenishResponse($response);

try {
$localIps = swoole_get_local_ip();
$psr7Request->session->set('fd', $request->fd);
$psr7Request->session->set('server', [
'ip' => array_values($localIps)[0] ?? '',
'mac' => swoole_get_local_mac()[array_keys($localIps)[0] ?? 0] ?? ''
]);

$this->getEventDispatcher()->dispatch(ServerEnum::TYPE_WEBSOCKET . ':' . ServerEvent::ON_OPEN, [App::$server->getServer(), $psr7Request]);
} catch (\Throwable $e) {
$this->getLogger()->debug($e->getMessage(), ['exception' => $e]);
return false;
}

FdCollector::instance()->set($request->fd, [$psr7Request, $response]);

$response->send();
return true;
}
Expand Down
25 changes: 15 additions & 10 deletions Src/WebSocket/Listener/MessageListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use W7\Http\Message\Outputer\WebSocketResponseOutputer;
use W7\Http\Message\Server\Request as Psr7Request;
use W7\Http\Message\Server\Response as Psr7Response;
use W7\WebSocket\Collector\FdCollector;
use W7\WebSocket\Server\Dispatcher;

class MessageListener extends ListenerAbstract {
Expand All @@ -27,28 +26,34 @@ public function run(...$params) {
$this->onMessage($server, $frame);
}

private function onMessage(Server $server, SwooleFrame $frame): bool {
//待优化 session
protected function getRequestAndResponse(Server $server, SwooleFrame $frame) {
$psr7Request = new Psr7Request('POST', '/');
$psr7Request = $psr7Request->withAttribute('fd', $frame->fd);

$psr7Response = new Psr7Response();
$psr7Response->setOutputer(new WebSocketResponseOutputer($server, $frame->fd));

return [$psr7Request, $psr7Response];
}

private function onMessage(Server $server, SwooleFrame $frame) {
$this->getContext()->setContextDataByKey('workid', $server->worker_id);
$this->getContext()->setContextDataByKey('coid', $this->getContext()->getCoroutineId());

$collector = FdCollector::instance()->get($frame->fd, []);

/**
* @var Psr7Request $psr7Request
*/
$psr7Request = $collector[0];
$psr7Request = $psr7Request->loadFromWSFrame($frame);
/**
* @var Psr7Response $psr7Response
*/
$psr7Response = $collector[1];
$psr7Response->setOutputer(new WebSocketResponseOutputer($server, $frame->fd));
[$psr7Request, $psr7Response] = $this->getRequestAndResponse($server, $frame);
$psr7Request = $psr7Request->withAttribute('frame', $frame);
$psr7Request = $psr7Request->loadFromWSFrame($frame);

$dispatcher = $this->getContainer()->get(Dispatcher::class);
$psr7Response = $dispatcher->dispatch($psr7Request, $psr7Response);

$psr7Response->send();

return true;
}
}

0 comments on commit 671f551

Please sign in to comment.