Skip to content

Commit

Permalink
RPC Implementation using multiple relays to enable async communication (
Browse files Browse the repository at this point in the history
  • Loading branch information
L3tum authored Feb 29, 2024
1 parent 05269a1 commit c8fe156
Show file tree
Hide file tree
Showing 16 changed files with 1,792 additions and 80 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"scripts": {
"test": "phpunit --no-coverage --colors=always",
"test-cover": "phpunit --coverage-clover=coverage.xml",
"test-static": "psalm",
"test-static": "psalm --no-cache",
"test-mutations": "infection"
},
"minimum-stability": "dev",
Expand Down
36 changes: 36 additions & 0 deletions src/ConnectedRelayInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

namespace Spiral\Goridge;

use Spiral\Goridge\Exception\RelayException;

/**
* This interface describes a Relay that explictily establishes a connection.
* That connection can also be re-established on the fly (in comparison to StreamRelay, which relies on the existence of the streams).
* The object is also clonable, i.e. supports cloning without data errors due to shared state.
*/
interface ConnectedRelayInterface extends RelayInterface
{
/**
* Returns true if the underlying connection is already established
*/
public function isConnected(): bool;

/**
* Establishes the underlying connection and returns true on success, false on failure, or throws an exception in case of an error.
*
* @throws RelayException
*/
public function connect(): bool;

/**
* Closes the underlying connection.
*/
public function close(): void;

/**
* Enforce implementation of __clone magic method
* @psalm-return void
*/
public function __clone();
}
115 changes: 115 additions & 0 deletions src/MultiRelayHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
<?php

declare(strict_types=1);

namespace Spiral\Goridge;

use Spiral\Goridge\RPC\Exception\RPCException;
use function socket_select;

class MultiRelayHelper
{
/**
* @param array<array-key, RelayInterface> $relays
* @return array-key[]|false
* @internal
* Returns either
* - an array of array keys, even if only one
* - or false if none
*/
public static function findRelayWithMessage(array $relays, int $timeoutInMicroseconds = 0): array|false
{
if (count($relays) === 0) {
return false;
}

if ($relays[array_key_first($relays)] instanceof SocketRelay) {
$sockets = [];
$socketIdToRelayIndexMap = [];
foreach ($relays as $relayIndex => $relay) {
assert($relay instanceof SocketRelay);

// Enforce connection
if ($relay->socket === null) {
// Important: Do not force reconnect here as it would otherwise completely ruin further handling
continue;
}

$sockets[] = $relay->socket;
$socketIdToRelayIndexMap[spl_object_id($relay->socket)] = $relayIndex;
}

if (count($sockets) === 0) {
return false;
}

$writes = null;
$except = null;
$changes = socket_select($sockets, $writes, $except, 0, $timeoutInMicroseconds);

if ($changes > 0) {
$indexes = [];
foreach ($sockets as $socket) {
$indexes[] = $socketIdToRelayIndexMap[spl_object_id($socket)] ?? throw new RPCException("Invalid socket??");
}

return $indexes;
} else {
return false;
}
}

if ($relays[array_key_first($relays)] instanceof StreamRelay) {
$streams = [];
$streamNameToRelayIndexMap = [];
foreach ($relays as $relayIndex => $relay) {
assert($relay instanceof StreamRelay);

$streams[] = $relay->in;
$streamNameToRelayIndexMap[(string)$relay->in] = $relayIndex;
}

$writes = null;
$except = null;
$changes = stream_select($streams, $writes, $except, 0, $timeoutInMicroseconds);

if ($changes > 0) {
$indexes = [];
foreach ($streams as $stream) {
$indexes[] = $streamNameToRelayIndexMap[(string)$stream] ?? throw new RPCException("Invalid stream??");
}

return $indexes;
} else {
return false;
}
}

return false;
}

/**
* @param array<array-key, RelayInterface> $relays
* @return array-key[]|false
* @internal
* Returns either
* - an array of array keys, even if only one
* - or false if none
*/
public static function checkConnected(array $relays): array|false
{
if (count($relays) === 0) {
return false;
}

$keysNotConnected = [];
foreach ($relays as $key => $relay) {
if ($relay instanceof ConnectedRelayInterface && !$relay->isConnected()) {
$relay->connect();
$keysNotConnected[] = $key;
}
}

return $keysNotConnected;
}
}
90 changes: 90 additions & 0 deletions src/RPC/AbstractRPC.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

declare(strict_types=1);

namespace Spiral\Goridge\RPC;

use Spiral\Goridge\Frame;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Stringable;
use function sprintf;
use function strlen;
use function substr;
use function ucfirst;

abstract class AbstractRPC implements RPCInterface
{
/**
* RPC calls service prefix.
*
* @var non-empty-string|null
*/
protected ?string $service = null;

/**
* @var positive-int
*/
protected static int $seq = 1;

public function __construct(
protected CodecInterface $codec
) {
}

/**
* @psalm-pure
*/
public function withServicePrefix(string $service): self
{
/** @psalm-suppress ImpureVariable */
$rpc = clone $this;
$rpc->service = $service;

return $rpc;
}

/**
* @psalm-pure
*/
public function withCodec(CodecInterface $codec): self
{
/** @psalm-suppress ImpureVariable */
$rpc = clone $this;
$rpc->codec = $codec;

return $rpc;
}

/**
* @throws Exception\ServiceException
*/
protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $options = null): mixed
{
// exclude method name
$body = substr((string)$frame->payload, $frame->options[1]);

if ($frame->hasFlag(Frame::ERROR)) {
$name = $relay instanceof Stringable
? (string)$relay
: $relay::class;

throw new ServiceException(sprintf("Error '%s' on %s", $body, $name));
}

return $this->codec->decode($body, $options);
}

/**
* @param non-empty-string $method
*/
protected function packFrame(string $method, mixed $payload): Frame
{
if ($this->service !== null) {
$method = $this->service . '.' . ucfirst($method);
}

$body = $method . $this->codec->encode($payload);
return new Frame($body, [self::$seq, strlen($method)], $this->codec->getIndex());
}
}
69 changes: 69 additions & 0 deletions src/RPC/AsyncRPCInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

namespace Spiral\Goridge\RPC;

use Spiral\Goridge\Exception\GoridgeException;
use Spiral\Goridge\Exception\RelayException;
use Spiral\Goridge\RPC\Exception\RPCException;
use Spiral\Goridge\RPC\Exception\ServiceException;

interface AsyncRPCInterface extends RPCInterface
{
/**
* Invoke remote RoadRunner service method using given payload (free form) non-blockingly and ignore the response.
*
* @param non-empty-string $method
*
* @throws GoridgeException
*/
public function callIgnoreResponse(string $method, mixed $payload): void;

/**
* Invoke remote RoadRunner service method using given payload (free form) non-blockingly but accept a response.
*
* @param non-empty-string $method
* @return positive-int An "ID" to check whether a response has been received and to fetch said response.
*
* @throws GoridgeException
*/
public function callAsync(string $method, mixed $payload): int;

/**
* Check whether a response has been received using the "ID" obtained through @see AsyncRPCInterface::callAsync() .
*
* @param positive-int $seq
* @return bool
*/
public function hasResponse(int $seq): bool;

/**
* Checks the "ID"s obtained through @see AsyncRPCInterface::callAsync() if they've got a response yet.
* Returns an array of "ID"s that do.
*
* @param positive-int[] $seqs
* @return positive-int[]
*/
public function hasResponses(array $seqs): array;

/**
* Fetch the response for the "ID" obtained through @see AsyncRPCInterface::callAsync() .
* @param positive-int $seq
* @throws RPCException
* @throws ServiceException
* @throws RelayException
*/
public function getResponse(int $seq, mixed $options = null): mixed;

/**
* Fetches the responses for the "ID"s obtained through @see AsyncRPCInterface::callAsync()
* and returns a map of "ID" => Response.
* @throws RelayException
* @throws ServiceException
* @throws RPCException
*
* @param array<array-key, positive-int> $seqs
* @return iterable<positive-int, mixed>
*
*/
public function getResponses(array $seqs, mixed $options = null): iterable;
}
Loading

0 comments on commit c8fe156

Please sign in to comment.