Skip to content

Commit

Permalink
Got rid of some 'Unhandled promise rejections'
Browse files Browse the repository at this point in the history
  • Loading branch information
raoul committed Nov 30, 2023
1 parent 492fad5 commit 4bbccb9
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 25 deletions.
32 changes: 21 additions & 11 deletions src/AsyncConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use React\Promise\PromiseInterface;
use React\Promise\Timer\TimeoutException;
use Throwable;
use function React\Promise\reject;
use function React\Promise\resolve;

class AsyncConnectionManager
Expand Down Expand Up @@ -75,19 +76,23 @@ function (AsyncConnectionWriter $writer) {

return resolve(new AsyncConnectionResult($writer, true));
},
function (Throwable $e): void {
$this->logger->error('Connecting failed');
$this->connectionPromise->reject($e);
$this->connectionPromise = null;
)->catch(function (Throwable $e): PromiseInterface {
$this->logger->error('Connecting failed');

if ($e instanceof TimeoutException) {
throw new AsyncConnectionTimeoutException($e->getMessage(), $e->getCode(), $e);
}
// preventing falsely positive 'Unhandled promise rejection'
$this->connectionPromise->promise()->catch(static function (): void {
});

throw $e;
},
);
}, $doAfterFailedDisconnect);
$this->connectionPromise->reject($e);
$this->connectionPromise = null;

if ($e instanceof TimeoutException) {
return reject(new AsyncConnectionTimeoutException($e->getMessage(), $e->getCode(), $e));
}

return reject($e);
});
}, $doAfterFailedDisconnect)->catch(static fn (Throwable $e) => reject($e));
}

public function disconnect(): PromiseInterface
Expand Down Expand Up @@ -127,6 +132,11 @@ public function disconnect(): PromiseInterface
return resolve($value);
}, function (Throwable $e): void {
$this->logger->debug('Disconnection failed');

// preventing falsely positive 'Unhandled promise rejection'
$this->disconnectionPromise->promise()->catch(static function (): void {
});

$this->disconnectionPromise->reject($e);
$this->disconnectionPromise = null;

Expand Down
11 changes: 6 additions & 5 deletions src/AsyncMessageQueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use function array_slice;
use function array_values;
use function count;
use function React\Promise\reject;
use function React\Promise\resolve;
use function sprintf;
use function time;
Expand Down Expand Up @@ -125,11 +126,11 @@ function (AsyncConnectionResult $result) use ($requestsCounter) {

return $this->minIntervalPromise->then(static fn () => resolve($result));
},
function (Throwable $exception) use ($requestsCounter): void {
function (Throwable $exception) use ($requestsCounter): PromiseInterface {
$this->log('connection failed', $requestsCounter);
$this->finishWithError($exception, $requestsCounter);

throw $exception;
return reject($exception);
},
)->then(
fn (AsyncConnectionResult $result) => $this->asyncMessageSender->sendMessage($result->getWriter(), $message)->then(static fn () => resolve($result)),
Expand All @@ -144,14 +145,14 @@ function (AsyncConnectionResult $result) use ($requestsCounter): void {
$this->lastSentMessageTime = time();
$this->finishWithSuccess($requestsCounter);
},
function (Throwable $exception) use ($requestsCounter): void {
function (Throwable $exception) use ($requestsCounter): PromiseInterface {
$this->log('sending failed', $requestsCounter);
$this->forceReconnect = true;
$this->finishWithError($exception, $requestsCounter);

throw $exception;
return reject($exception);
},
);
)->catch(static fn (Throwable $e) => reject($e));
}

public function getQueuedMessagesCount(): int
Expand Down
26 changes: 21 additions & 5 deletions src/Smtp/AsyncSmtpConnectionWriter.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public function __construct(
)
{
if (!$connection->isReadable() || !$connection->isWritable()) {
throw new InvalidSmtpConnectionException();
throw new InvalidSmtpConnectionException('SMTP connection stream is not readable or/and not writable.');
}

$connection->on('data', function ($data): void {
Expand Down Expand Up @@ -73,7 +73,7 @@ public function write(AsyncMessage $message): PromiseInterface
if (!$this->isValid()) {
$this->logger->error('stream not valid');

return reject(new InvalidSmtpConnectionException());
return reject(new InvalidSmtpConnectionException('SMTP connection stream is not readable or/and not writable.'));
}

if ($message instanceof AsyncDoubleResponseMessage) {
Expand All @@ -92,14 +92,22 @@ public function write(AsyncMessage $message): PromiseInterface
$message->getTextReplacement(),
];

$this->connection->write(sprintf('%s%s', $message->getText(), Message::EOL));
$result = $this->connection->write(sprintf('%s%s', $message->getText(), Message::EOL));
if ($result === false) {
throw new InvalidSmtpConnectionException('Write failed.');
}

return $firstResponse->promise()
->then(static fn () => $secondResponse->promise());
->then(static fn () => $secondResponse->promise())
->catch(static fn (Throwable $e) => reject($e),
);
}

$deferred = new Deferred();
$this->connection->write(sprintf('%s%s', $message->getText(), Message::EOL));
$result = $this->connection->write(sprintf('%s%s', $message->getText(), Message::EOL));
if ($result === false) {
throw new InvalidSmtpConnectionException('Write failed.');
}

if ($message instanceof AsyncSingleResponseMessage) {
$this->expectedResponses[] = [
Expand Down Expand Up @@ -128,6 +136,10 @@ private function processDataResponse(string $data): void

[$deferred, $expectedCodes, $message, $messageToReplace] = array_shift($this->expectedResponses);

// preventing falsely positive 'Unhandled promise rejection'
$deferred->promise()->catch(static function (): void {
});

if (preg_match('~^[\d]{3}$~i', $data) !== 1
&& preg_match('~^[\d]{3}[^\d]+~i', $data) !== 1) {
$deferred->reject(new AsyncSmtpConnectionException(sprintf('Unexpected response format: %s.', $data)));
Expand Down Expand Up @@ -184,6 +196,10 @@ private function processNonDataResponse(
continue;
}

// preventing falsely positive 'Unhandled promise rejection'
$deferred->promise()->catch(static function (): void {
});

$deferred->reject(new AsyncSmtpConnectionException($exceptionMessage, 0, $previousException));
}
});
Expand Down
7 changes: 5 additions & 2 deletions src/Smtp/AsyncSmtpConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
use React\Promise\PromiseInterface;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;
use Throwable;
use function base64_encode;
use function React\Promise\reject;
use function React\Promise\resolve;
use function sprintf;

Expand Down Expand Up @@ -38,7 +40,8 @@ public function connect(): PromiseInterface
$writer = $this->asyncSmtpWriterFactory->create($connection);

return $this->greetServer($writer);
})->then(fn ($writer) => $this->loginToServer($writer))->then(static fn ($writer) => resolve($writer));
})->then(fn ($writer) => $this->loginToServer($writer))->then(static fn ($writer) => resolve($writer))
->catch(static fn (Throwable $e) => reject($e));
}

public function disconnect(AsyncConnectionWriter $writer): PromiseInterface
Expand All @@ -52,7 +55,7 @@ private function greetServer(AsyncSmtpConnectionWriter $writer): PromiseInterfac
$ehloMessage = new AsyncDoubleResponseMessage(sprintf('EHLO %s', $self), [SmtpCode::SERVICE_READY], [SmtpCode::OK]);

return $writer->write($ehloMessage)
->otherwise(static function (AsyncSmtpConnectionException $e) use ($self, $writer) {
->catch(static function (AsyncSmtpConnectionException $e) use ($self, $writer) {
$heloMessage = new AsyncDoubleResponseMessage(sprintf('HELO %s', $self), [SmtpCode::SERVICE_READY], [SmtpCode::OK]);

return $writer->write($heloMessage);
Expand Down
4 changes: 2 additions & 2 deletions src/Smtp/InvalidSmtpConnectionException.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
class InvalidSmtpConnectionException extends AsyncConnectionException
{

public function __construct()
public function __construct(string $message)
{
parent::__construct('SMTP connection stream is not readable or/and not writable.');
parent::__construct($message);
}

}
13 changes: 13 additions & 0 deletions tests/Smtp/AsyncSmtpConnectionWriterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ protected function setUp(): void
public function testInvalidStreamThrowsException(): void
{
$this->expectException(InvalidSmtpConnectionException::class);
$this->expectExceptionMessage('SMTP connection stream is not readable or/and not writable.');

$connectionMock = $this->createMock(ConnectionInterface::class);
$connectionMock->method('isReadable')->willReturn(false);
Expand All @@ -69,6 +70,18 @@ public function testInvalidStreamThrowsException(): void
new AsyncSmtpConnectionWriter($connectionMock, $this->logger);
}

public function testFailedWriteThrowsException(): void
{
$this->expectException(InvalidSmtpConnectionException::class);
$this->expectExceptionMessage('Write failed.');

$connectionMock = $this->createConnectionMock();
$connectionMock->method('write')->willReturn(false);

$writer = new AsyncSmtpConnectionWriter($connectionMock, $this->logger);
$writer->write(new AsyncSingleResponseMessage('AUTH LOGIN', [334]));
}

public function testUnexpectedConnectionEndFromServer(): void
{
$writer = new AsyncSmtpConnectionWriter($this->createConnectionMock(), $this->logger);
Expand Down

0 comments on commit 4bbccb9

Please sign in to comment.