Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WebSocket client implementation #6

Merged
merged 24 commits into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/main/php/websocket/Listener.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public abstract function message($connection, $message);
* Closes connection
*
* @param websocket.protocol.Connection $connection
* @param int $code
* @param string $reason
* @return void
*/
public function close($connection) { /* NOOP */ }
public function close($connection, $code, $reason) { /* NOOP */ }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a BC break - all Listener implementations with a close() method need to be adapted!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
223 changes: 223 additions & 0 deletions src/main/php/websocket/WebSocket.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
<?php namespace websocket;

use lang\{Closeable, Throwable};
use peer\{Socket, CryptoSocket, ProtocolException};
use util\Bytes;
use websocket\protocol\{Connection, Handshake, Opcodes};

/**
* WebSocket implementation
*
* @test websocket.unittest.WebSocketTest
*/
class WebSocket implements Closeable {
private $socket, $path, $origin;
private $conn= null;
private $listener= null;
private $random= 'random_bytes';

/**
* Creates a new instance
*
* @param peer.Socket|string $endpoint, e.g. "wss://example.com"
* @param string $origin
*/
public function __construct($endpoint, $origin= 'localhost') {
if ($endpoint instanceof Socket) {
$this->socket= $endpoint;
$this->path= '/';
} else {
$url= parse_url($endpoint);
if ('wss' === $url['scheme']) {
$this->socket= new CryptoSocket($url['host'], $url['port'] ?? 443);
$this->socket->cryptoImpl= STREAM_CRYPTO_METHOD_ANY_CLIENT;
} else {
$this->socket= new Socket($url['host'], $url['port'] ?? 80);
}
$this->path= $url['path'] ?? '/';
}
$this->origin= $origin;
}

/** @return peer.Socket */
public function socket() { return $this->socket; }

/** @return string */
public function path() { return $this->path; }

/** @return string */
public function origin() { return $this->origin; }

/** @return bool */
public function connected() { return $this->socket->isConnected(); }

/** @param function(int): string */
public function random($function) {
$this->random= $function;
}

/**
* Attach listener
*
* @param websocket.Listener $listener
* @return self
*/
public function listening(Listener $listener) {
$this->listener= $listener;
return $this;
}

/**
* Connects to websocket endpoint and performs handshake
*
* @see https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Sec-WebSocket-Accept
* @param [:string|string[]] $headers
* @throws peer.ProtocolException
* @return void
*/
public function connect($headers= []) {
if ($this->socket->isConnected()) return;

$key= base64_encode(($this->random)(16));
$headers+= ['Host' => $this->socket->host, 'Origin' => $this->origin];
$this->socket->connect();
$this->socket->write(
"GET {$this->path} HTTP/1.1\r\n".
"Upgrade: websocket\r\n".
"Sec-WebSocket-Key: {$key}\r\n".
"Sec-WebSocket-Version: 13\r\n".
"Connection: Upgrade\r\n"
);
foreach ($headers as $name => $values) {
foreach ((array)$values as $value) {
$this->socket->write("{$name}: {$value}\r\n");
}
}
$this->socket->write("\r\n");

sscanf($this->socket->readLine(), "HTTP/%s %d %[^\r]", $version, $status, $message);
if (101 !== $status) {
$this->socket->close();
throw new ProtocolException('Unexpected response '.$status.' '.$message);
}

$headers= [];
while ($line= $this->socket->readLine()) {
sscanf($line, "%[^:]: %[^\r]", $header, $value);
$headers[$header][]= $value;
}

$accept= $headers['Sec-Websocket-Accept'][0] ?? '';
$expect= base64_encode(sha1($key.Handshake::GUID, true));
if ($accept !== $expect) {
$this->socket->close();
throw new ProtocolException('Accept key mismatch, have '.$accept.', expect '.$expect);
}

$this->socket->setTimeout(600.0);
$this->conn= new Connection(
$this->socket,
(int)$this->socket->getHandle(),
$this->listener,
$this->path,
$headers
);
$this->conn->open();
}

/**
* Sends a ping
*
* @param string $payload
* @return void
* @throws peer.ProtocolException
*/
public function ping($payload= '') {
if (!$this->socket->isConnected()) throw new ProtocolException('Not connected');

$this->conn->message(Opcodes::PING, $payload, ($this->random)(4));
}

/**
* Sends a message
*
* @param util.Bytes|string $message
* @return void
* @throws peer.ProtocolException
*/
public function send($message) {
if (!$this->socket->isConnected()) throw new ProtocolException('Not connected');

if ($message instanceof Bytes) {
$this->conn->message(Opcodes::BINARY, $message, ($this->random)(4));
} else {
$this->conn->message(Opcodes::TEXT, $message, ($this->random)(4));
}
}

/**
* Receive messages, handling PING and CLOSE
*
* @return iterable
* @throws peer.ProtocolException
*/
public function receive($timeout= null) {
if (!$this->socket->isConnected()) throw new ProtocolException('Not connected');

if (null !== $timeout && !$this->socket->canRead($timeout)) return;
foreach ($this->conn->receive() as $opcode => $packet) {
switch ($opcode) {
case Opcodes::TEXT:
$this->conn->on($packet);
yield $packet;
break;

case Opcodes::BINARY:
$message= new Bytes($packet);
$this->conn->on($message);
yield $message;
break;

case Opcodes::PING:
$this->conn->message(Opcodes::PONG, $packet, ($this->random)(4));
break;

case Opcodes::PONG: // Do not answer PONGs
break;

case Opcodes::CLOSE:
$close= unpack('ncode/a*reason', $packet);
$this->conn->close($close['code'], $close['reason']);
$this->socket->close();

// 1000 is a normal close, all others indicate an error
if (1000 === $close['code']) return;
throw new ProtocolException('Connection closed (#'.$close['code'].'): '.$close['reason']);
}
}
}

/**
* Closes connection
*
* @param int $code
* @param string $reason
* @return void
*/
public function close($code= 1000, $reason= '') {
if (!$this->socket->isConnected()) return;

try {
$this->conn->message(Opcodes::CLOSE, pack('na*', $code, $reason), ($this->random)(4));
} catch (Throwable $ignored) {
// ...
}
$this->conn->close($code, $reason);
$this->socket->close();
}

/** Destructor - ensures connection is closed */
public function __destruct() {
$this->close();
}
}
59 changes: 41 additions & 18 deletions src/main/php/websocket/protocol/Connection.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ class Connection {
*
* @param peer.Socket $socket
* @param int $id
* @param websocket.Listener $listener
* @param ?websocket.Listener $listener
* @param string $path
* @param [:var] $headers
*/
public function __construct($socket, $id, Listener $listener, $path= '/', $headers= []) {
public function __construct($socket, $id, $listener, $path= '/', $headers= []) {
$this->socket= $socket;
$this->id= $id;
$this->listener= $listener;
Expand All @@ -49,7 +49,7 @@ public function headers() { return $this->headers; }
* @return void
*/
public function open() {
$this->listener->open($this);
$this->listener && $this->listener->open($this);
}

/**
Expand All @@ -59,16 +59,21 @@ public function open() {
* @return var
*/
public function on($payload) {
return $this->listener->message($this, $payload);
return $this->listener ? $this->listener->message($this, $payload) : null;
}

/**
* Opens connection
* Closes connection
*
* @param int $code
* @param string $reason
* @return void
*/
public function close() {
$this->listener->close($this);
public function close($code= 1000, $reason= '') {
if ($this->socket->isConnected()) {
$this->listener && $this->listener->close($this, $code, $reason);
$this->socket->close();
}
}

/**
Expand Down Expand Up @@ -102,10 +107,7 @@ public function receive() {
$continue= [];
do {
$packet= $this->read(2);
if (strlen($packet) < 2) {
$this->socket->close();
return;
}
if (strlen($packet) < 2) return;

$final= $packet[0] & "\x80";
$opcode= $packet[0] & "\x0f";
Expand All @@ -118,8 +120,7 @@ public function receive() {

// Verify opcode, send protocol error if unkown
if (!isset($packets[$opcode])) {
$this->transmit(Opcodes::CLOSE, pack('n', 1002));
$this->socket->close();
yield Opcodes::CLOSE => pack('n', 1002);
return;
}

Expand All @@ -133,8 +134,7 @@ public function receive() {

// Verify length
if ($read > self::MAXLENGTH) {
$this->transmit(Opcodes::CLOSE, pack('n', 1003));
$this->socket->close();
yield Opcodes::CLOSE => pack('n', 1003);
return;
}

Expand All @@ -160,6 +160,29 @@ public function receive() {
} while ($continue);
}

/**
* Sends an message
*
* @param string $type One of the class constants TEXT | BINARY | CLOSE | PING | PONG
* @param string $payload
* @param string $mask 4 bytes
* @return void
*/
public function message($type, $payload, $mask) {
$length= strlen($payload);
$data= '';
for ($i = 0; $i < $length; $i+= 4) {
$data.= $mask ^ substr($payload, $i, 4);
}

if ($length < 126) {
$this->socket->write(("\x80" | $type).("\x80" | chr($length)).$mask.$data);
} else if ($length < 65536) {
$this->socket->write(("\x80" | $type)."\xfe".pack('n', $length).$mask.$data);
} else {
$this->socket->write(("\x80" | $type)."\xff".pack('J', $length).$mask.$data);
}
}

/**
* Transmits an answer
Expand All @@ -168,7 +191,7 @@ public function receive() {
* @param string $payload
* @return void
*/
public function transmit($type, $payload) {
public function answer($type, $payload) {
$length= strlen($payload);
if ($length < 126) {
$this->socket->write(("\x80" | $type).chr($length).$payload);
Expand All @@ -187,9 +210,9 @@ public function transmit($type, $payload) {
*/
public function send($arg) {
if ($arg instanceof Bytes) {
$this->transmit(Opcodes::BINARY, $arg);
$this->answer(Opcodes::BINARY, $arg);
} else {
$this->transmit(Opcodes::TEXT, $arg);
$this->answer(Opcodes::TEXT, $arg);
}
}
}
Loading
Loading