Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arch 414 blocking consumer multiple bindings 6.4.15 #4

Open
wants to merge 21 commits into
base: 6.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@
"allow-plugins": {
"php-http/discovery": false,
"symfony/runtime": true
}
},
"use-parent-dir": true
},
"autoload": {
"psr-4": {
Expand Down Expand Up @@ -219,10 +220,6 @@
"symfony/contracts": "3.4.x-dev"
}
}
},
{
"type": "path",
"url": "src/Symfony/Component/Runtime"
}
],
"minimum-stability": "dev"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ public function testItReceivesSignals()
$sender->send(new Envelope(new DummyMessage('Hello')));

$amqpReadTimeout = 30;
$dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;
$process = new PhpProcess(file_get_contents(__DIR__.'/../Fixtures/long_receiver.php'), null, [
'COMPONENT_ROOT' => __DIR__.'/../../',
$dsn = getenv('MESSENGER_AMQP_DSN') . '?read_timeout=' . $amqpReadTimeout;
$process = new PhpProcess(file_get_contents(__DIR__ . '/../Fixtures/long_receiver.php'), null, [
'COMPONENT_ROOT' => __DIR__ . '/../../',
'DSN' => $dsn,
]);

Expand All @@ -211,7 +211,7 @@ public function testItReceivesSignals()
// make sure the process exited, after consuming only the 1 message
$this->assertFalse($process->isRunning());
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
$this->assertSame($expectedOutput.<<<'TXT'
$this->assertSame($expectedOutput . <<<'TXT'
Get envelope with message: Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage
with stamps: [
"Symfony\\Component\\Messenger\\Stamp\\SerializedMessageStamp",
Expand Down Expand Up @@ -256,7 +256,7 @@ private function waitForOutput(Process $process, string $output, $timeoutInSecon
usleep(100 * 1000); // 100ms
}

throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
throw new \RuntimeException('Expected output never arrived. Got "' . $process->getOutput() . '" instead.');
}

private function createSerializer(): SerializerInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,33 @@ public function testItReturnsTheDecodedMessageToTheHandler()
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}

public function testItReturnsTheDecodedMessageToTheHandlerInBlockingMode()
{
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->onlyMethods(['getQueueNames', 'pull'])
->getMock();
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$amqpEnvelope = $this->createAMQPEnvelope();

$amqpQueue = $this->createMock(\AMQPQueue::class);
$amqpQueue->method('getName')->willReturn('queueName');

$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
});

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->pull(function (Envelope $envelope): bool {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
return true;
});
}

public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$this->expectException(TransportException::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,37 @@ public function testReceivesMessages()
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
}

public function testReceivesMessagesInBlockingMode()
{
$transport = $this->getTransport(
$serializer = $this->createMock(SerializerInterface::class),
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->onlyMethods(['getQueueNames', 'pull'])
->getMock()
);

$decodedMessage = new DummyMessage('Decoded.');

$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpEnvelope->method('getBody')->willReturn('body');
$amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']);

$amqpQueue = $this->createMock(\AMQPQueue::class);
$amqpQueue->method('getName')->willReturn('queueName');

$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
});

$transport->pull(function (Envelope $envelope) use ($decodedMessage): bool {
$this->assertSame($decodedMessage, $envelope->getMessage());
return true;
});
}

private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): AmqpTransport
{
$serializer ??= $this->createMock(SerializerInterface::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -25,7 +26,7 @@
*
* @author Samuel Roze <[email protected]>
*/
class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
class AmqpReceiver implements QueueReceiverInterface, QueueBlockingReceiverInterface, MessageCountAwareInterface
{
private SerializerInterface $serializer;
private Connection $connection;
Expand All @@ -41,6 +42,46 @@ public function get(): iterable
yield from $this->getFromQueues($this->connection->getQueueNames());
}

public function pull(callable $callback): void
{
$this->pullFromQueues($this->connection->getQueueNames(), $callback);
}

public function pullFromQueues(array $queueNames, callable $callback): void
{
if (0 === \count($queueNames)) {
return;
}

// Pop last queue to send callback
$firstQueue = array_pop($queueNames);

foreach ($queueNames as $queueName) {
$this->pullEnvelope($queueName, null);
}

$this->pullEnvelope($firstQueue, $callback);
}

private function pullEnvelope(string $queueName, ?callable $callback): void
{
if (null !== $callback) {
$callback = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $queue) use ($callback) {
$queueName = $queue->getName();
$body = $amqpEnvelope->getBody();
$envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName);

return $callback($envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName)));
};
}

try {
$this->connection->pull($queueName, $callback);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}

public function getFromQueues(array $queueNames): iterable
{
foreach ($queueNames as $queueName) {
Expand All @@ -61,9 +102,15 @@ private function getEnvelope(string $queueName): iterable
}

$body = $amqpEnvelope->getBody();
$envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName);

yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
}

private function decodeAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, $body, string $queueName): Envelope
{
try {
$envelope = $this->serializer->decode([
return $this->serializer->decode([
'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
'headers' => $amqpEnvelope->getHeaders(),
]);
Expand All @@ -73,8 +120,6 @@ private function getEnvelope(string $queueName): iterable

throw $exception;
}

yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
}

public function ack(Envelope $envelope): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -22,7 +23,7 @@
/**
* @author Nicolas Grekas <[email protected]>
*/
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
class AmqpTransport implements QueueReceiverInterface, QueueBlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
{
private SerializerInterface $serializer;
private Connection $connection;
Expand All @@ -40,6 +41,25 @@ public function get(): iterable
return $this->getReceiver()->get();
}

/**
* {@inheritdoc}
*/
public function pull(callable $callback): void
{
$this->getReceiver()->pull($callback);
}

/**
* {@inheritdoc}
*/
public function pullFromQueues(array $queueNames, callable $callback): void
{
$this->getReceiver()->pullFromQueues($queueNames, $callback);
}

/**
* {@inheritdoc}
*/
public function getFromQueues(array $queueNames): iterable
{
return $this->getReceiver()->getFromQueues($queueNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,21 @@ class Connection
];

private const AVAILABLE_QUEUE_OPTIONS = [
'flags',
'arguments',
];

private const NEW_QUEUE_OPTIONS = [
'bindings',
];

private const DEPRECATED_BINDING_KEYS = [
'binding_keys',
'binding_arguments',
'flags',
];

private const AVAILABLE_BINDINGS_OPTIONS = [
'key',
'arguments',
];

Expand Down Expand Up @@ -130,8 +142,11 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
* * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
* * queues[name]: An array of queues, keyed by the name
* * binding_keys: The binding keys (if any) to bind to this queue
* * binding_arguments: Arguments to be used while binding the queue.
* * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings')
* * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings')
* * bindings[name]: An array of bindings for this queue, keyed by the name
* * key: The binding key (if any) to bind to this queue
* * arguments: An array of arguments to be used while binding the queue.
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * exchange:
Expand Down Expand Up @@ -242,9 +257,23 @@ private static function validateOptions(array $options): void
continue;
}

if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
if (0 < \count($deprecatedQueueOptions = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) {
if (0 < \count($newQueueOptions = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) {
throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $newQueueOptions), implode('", "', $deprecatedQueueOptions)));
}
}

if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), array_merge(self::AVAILABLE_QUEUE_OPTIONS, self::NEW_QUEUE_OPTIONS, self::DEPRECATED_BINDING_KEYS)))) {
throw new LogicException(sprintf('Invalid queue option(s) "%s" passed to the AMQP Messenger transport.', implode('", "', $invalidQueueOptions)));
}

if (\is_array($queue['bindings'] ?? false)) {
foreach ($queue['bindings'] as $individualBinding) {
if (0 < \count(array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) {
throw new LogicException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS)));
}
}
}
}
}

Expand Down Expand Up @@ -436,14 +465,35 @@ public function get(string $queueName): ?\AMQPEnvelope
return null;
}

public function ack(\AMQPEnvelope $message, string $queueName): bool
/**
* Consume a message from the specified queue in blocking mode.
*
* @param ?callable(\AMQPEnvelope,\AMQPQueue):bool $callback If callback return false, then processing thread will be
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
* will be made available to the first real callback registered. That allows one to have a single callback
* consuming from multiple queues.
*
* @throws \AMQPException
*/
public function pull(string $queueName, ?callable $callback): void
{
return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true;
$this->clearWhenDisconnected();

if ($this->autoSetupExchange) {
$this->setupExchangeAndQueues();
}

$this->queue($queueName)->consume($callback);
}

public function ack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): void
{
$this->queue($queueName)->ack($message->getDeliveryTag(), $flags);
}

public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool
public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): void
{
return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags) ?? true;
$this->queue($queueName)->nack($message->getDeliveryTag(), $flags);
}

public function setup(): void
Expand All @@ -458,6 +508,12 @@ private function setupExchangeAndQueues(): void

foreach ($this->queuesOptions as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['bindings'] ?? [] as $binding) {
$this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []);
}
if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) {
continue;
}
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
}
Expand Down
10 changes: 10 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ CHANGELOG
* Add log when `SIGTERM` is received.
* Add `--stats` and `--class-filter` options to `FailedMessagesShowCommand`

5.10
---

* Add `BlockingReceiverInterface` to allow blocking receive operations (uses more efficient `consume` method instead of `get` method in amqp transport)
* Add `QueueBlockingReceiverInterface` to allow blocking receive operations on a specific queue (uses more efficient `consume` method instead of `get` method in amqp transport)
* Add `--blocking-mode` option to `messenger:consume` (will use more efficient `consume` method instead of `get` method in amqp transport)
* Add `MultipleBindings` support for AMQP transport by adding queue options `binding_keys` and `binding_arguments` to AMQP transport to allow bindings based on multiple arguments

The minor version 10 is used to avoid any conflicts with the official Symfony post 5.4 releases even though they are not expected

5.3
---

Expand Down
Loading
Loading