diff --git a/composer.json b/composer.json index 0412354d7f85c..437ebe8905bcb 100644 --- a/composer.json +++ b/composer.json @@ -180,7 +180,8 @@ "allow-plugins": { "php-http/discovery": false, "symfony/runtime": true - } + }, + "use-parent-dir": true }, "autoload": { "psr-4": { @@ -219,10 +220,6 @@ "symfony/contracts": "3.4.x-dev" } } - }, - { - "type": "path", - "url": "src/Symfony/Component/Runtime" } ], "minimum-stability": "dev" diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php index cfa93698bacf5..1d8ed24384593 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php @@ -185,9 +185,9 @@ public function testItReceivesSignals() $sender->send(new Envelope(new DummyMessage('Hello'))); $amqpReadTimeout = 30; - $dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout; - $process = new PhpProcess(file_get_contents(__DIR__.'/../Fixtures/long_receiver.php'), null, [ - 'COMPONENT_ROOT' => __DIR__.'/../../', + $dsn = getenv('MESSENGER_AMQP_DSN') . '?read_timeout=' . $amqpReadTimeout; + $process = new PhpProcess(file_get_contents(__DIR__ . '/../Fixtures/long_receiver.php'), null, [ + 'COMPONENT_ROOT' => __DIR__ . '/../../', 'DSN' => $dsn, ]); @@ -211,7 +211,7 @@ public function testItReceivesSignals() // make sure the process exited, after consuming only the 1 message $this->assertFalse($process->isRunning()); $this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime); - $this->assertSame($expectedOutput.<<<'TXT' + $this->assertSame($expectedOutput . <<<'TXT' Get envelope with message: Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage with stamps: [ "Symfony\\Component\\Messenger\\Stamp\\SerializedMessageStamp", @@ -256,7 +256,7 @@ private function waitForOutput(Process $process, string $output, $timeoutInSecon usleep(100 * 1000); // 100ms } - throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.'); + throw new \RuntimeException('Expected output never arrived. Got "' . $process->getOutput() . '" instead.'); } private function createSerializer(): SerializerInterface diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php index 9dd86dcd07b42..4065d6f505b12 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php @@ -46,6 +46,33 @@ public function testItReturnsTheDecodedMessageToTheHandler() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } + public function testItReturnsTheDecodedMessageToTheHandlerInBlockingMode() + { + $connection = $this->getMockBuilder(Connection::class) + ->disableOriginalConstructor() + ->onlyMethods(['getQueueNames', 'pull']) + ->getMock(); + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $amqpEnvelope = $this->createAMQPEnvelope(); + + $amqpQueue = $this->createMock(\AMQPQueue::class); + $amqpQueue->method('getName')->willReturn('queueName'); + + $connection->method('getQueueNames')->willReturn(['queueName']); + $connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) { + \call_user_func($callback, $amqpEnvelope, $amqpQueue); + }); + + $receiver = new AmqpReceiver($connection, $serializer); + $receiver->pull(function (Envelope $envelope): bool { + $this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage()); + return true; + }); + } + public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() { $this->expectException(TransportException::class); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php index 21857d42b94d2..3ca2f0ecfa551 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php @@ -52,6 +52,37 @@ public function testReceivesMessages() $this->assertSame($decodedMessage, $envelopes[0]->getMessage()); } + public function testReceivesMessagesInBlockingMode() + { + $transport = $this->getTransport( + $serializer = $this->createMock(SerializerInterface::class), + $connection = $this->getMockBuilder(Connection::class) + ->disableOriginalConstructor() + ->onlyMethods(['getQueueNames', 'pull']) + ->getMock() + ); + + $decodedMessage = new DummyMessage('Decoded.'); + + $amqpEnvelope = $this->createMock(\AMQPEnvelope::class); + $amqpEnvelope->method('getBody')->willReturn('body'); + $amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']); + + $amqpQueue = $this->createMock(\AMQPQueue::class); + $amqpQueue->method('getName')->willReturn('queueName'); + + $serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage)); + $connection->method('getQueueNames')->willReturn(['queueName']); + $connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) { + \call_user_func($callback, $amqpEnvelope, $amqpQueue); + }); + + $transport->pull(function (Envelope $envelope) use ($decodedMessage): bool { + $this->assertSame($decodedMessage, $envelope->getMessage()); + return true; + }); + } + private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): AmqpTransport { $serializer ??= $this->createMock(SerializerInterface::class); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php index 631f79da9dcb9..ae7b03d4b432f 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php @@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; +use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -25,7 +26,7 @@ * * @author Samuel Roze */ -class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface +class AmqpReceiver implements QueueReceiverInterface, QueueBlockingReceiverInterface, MessageCountAwareInterface { private SerializerInterface $serializer; private Connection $connection; @@ -41,6 +42,46 @@ public function get(): iterable yield from $this->getFromQueues($this->connection->getQueueNames()); } + public function pull(callable $callback): void + { + $this->pullFromQueues($this->connection->getQueueNames(), $callback); + } + + public function pullFromQueues(array $queueNames, callable $callback): void + { + if (0 === \count($queueNames)) { + return; + } + + // Pop last queue to send callback + $firstQueue = array_pop($queueNames); + + foreach ($queueNames as $queueName) { + $this->pullEnvelope($queueName, null); + } + + $this->pullEnvelope($firstQueue, $callback); + } + + private function pullEnvelope(string $queueName, ?callable $callback): void + { + if (null !== $callback) { + $callback = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $queue) use ($callback) { + $queueName = $queue->getName(); + $body = $amqpEnvelope->getBody(); + $envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName); + + return $callback($envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName))); + }; + } + + try { + $this->connection->pull($queueName, $callback); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } + public function getFromQueues(array $queueNames): iterable { foreach ($queueNames as $queueName) { @@ -61,9 +102,15 @@ private function getEnvelope(string $queueName): iterable } $body = $amqpEnvelope->getBody(); + $envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName); + + yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName)); + } + private function decodeAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, $body, string $queueName): Envelope + { try { - $envelope = $this->serializer->decode([ + return $this->serializer->decode([ 'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351 'headers' => $amqpEnvelope->getHeaders(), ]); @@ -73,8 +120,6 @@ private function getEnvelope(string $queueName): iterable throw $exception; } - - yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName)); } public function ack(Envelope $envelope): void diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php index edb09614c6597..c34928d362193 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php @@ -13,6 +13,7 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; +use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -22,7 +23,7 @@ /** * @author Nicolas Grekas */ -class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface +class AmqpTransport implements QueueReceiverInterface, QueueBlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface { private SerializerInterface $serializer; private Connection $connection; @@ -40,6 +41,25 @@ public function get(): iterable return $this->getReceiver()->get(); } + /** + * {@inheritdoc} + */ + public function pull(callable $callback): void + { + $this->getReceiver()->pull($callback); + } + + /** + * {@inheritdoc} + */ + public function pullFromQueues(array $queueNames, callable $callback): void + { + $this->getReceiver()->pullFromQueues($queueNames, $callback); + } + + /** + * {@inheritdoc} + */ public function getFromQueues(array $queueNames): iterable { return $this->getReceiver()->getFromQueues($queueNames); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 0ae1bff21d7c8..15092b0c3f694 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -66,9 +66,21 @@ class Connection ]; private const AVAILABLE_QUEUE_OPTIONS = [ + 'flags', + 'arguments', + ]; + + private const NEW_QUEUE_OPTIONS = [ + 'bindings', + ]; + + private const DEPRECATED_BINDING_KEYS = [ 'binding_keys', 'binding_arguments', - 'flags', + ]; + + private const AVAILABLE_BINDINGS_OPTIONS = [ + 'key', 'arguments', ]; @@ -130,8 +142,11 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar * * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional. * * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional. * * queues[name]: An array of queues, keyed by the name - * * binding_keys: The binding keys (if any) to bind to this queue - * * binding_arguments: Arguments to be used while binding the queue. + * * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings') + * * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings') + * * bindings[name]: An array of bindings for this queue, keyed by the name + * * key: The binding key (if any) to bind to this queue + * * arguments: An array of arguments to be used while binding the queue. * * flags: Queue flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * exchange: @@ -242,9 +257,23 @@ private static function validateOptions(array $options): void continue; } - if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { + if (0 < \count($deprecatedQueueOptions = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) { + if (0 < \count($newQueueOptions = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) { + throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $newQueueOptions), implode('", "', $deprecatedQueueOptions))); + } + } + + if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), array_merge(self::AVAILABLE_QUEUE_OPTIONS, self::NEW_QUEUE_OPTIONS, self::DEPRECATED_BINDING_KEYS)))) { throw new LogicException(sprintf('Invalid queue option(s) "%s" passed to the AMQP Messenger transport.', implode('", "', $invalidQueueOptions))); } + + if (\is_array($queue['bindings'] ?? false)) { + foreach ($queue['bindings'] as $individualBinding) { + if (0 < \count(array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) { + throw new LogicException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS))); + } + } + } } } @@ -436,14 +465,35 @@ public function get(string $queueName): ?\AMQPEnvelope return null; } - public function ack(\AMQPEnvelope $message, string $queueName): bool + /** + * Consume a message from the specified queue in blocking mode. + * + * @param ?callable(\AMQPEnvelope,\AMQPQueue):bool $callback If callback return false, then processing thread will be + * returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client + * will be made available to the first real callback registered. That allows one to have a single callback + * consuming from multiple queues. + * + * @throws \AMQPException + */ + public function pull(string $queueName, ?callable $callback): void { - return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true; + $this->clearWhenDisconnected(); + + if ($this->autoSetupExchange) { + $this->setupExchangeAndQueues(); + } + + $this->queue($queueName)->consume($callback); + } + + public function ack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): void + { + $this->queue($queueName)->ack($message->getDeliveryTag(), $flags); } - public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool + public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): void { - return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags) ?? true; + $this->queue($queueName)->nack($message->getDeliveryTag(), $flags); } public function setup(): void @@ -458,6 +508,12 @@ private function setupExchangeAndQueues(): void foreach ($this->queuesOptions as $queueName => $queueConfig) { $this->queue($queueName)->declareQueue(); + foreach ($queueConfig['bindings'] ?? [] as $binding) { + $this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []); + } + if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) { + continue; + } foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) { $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []); } diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 43affdabf7223..49982be74a4b7 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -65,6 +65,16 @@ CHANGELOG * Add log when `SIGTERM` is received. * Add `--stats` and `--class-filter` options to `FailedMessagesShowCommand` +5.10 +--- + +* Add `BlockingReceiverInterface` to allow blocking receive operations (uses more efficient `consume` method instead of `get` method in amqp transport) +* Add `QueueBlockingReceiverInterface` to allow blocking receive operations on a specific queue (uses more efficient `consume` method instead of `get` method in amqp transport) +* Add `--blocking-mode` option to `messenger:consume` (will use more efficient `consume` method instead of `get` method in amqp transport) +* Add `MultipleBindings` support for AMQP transport by adding queue options `binding_keys` and `binding_arguments` to AMQP transport to allow bindings based on multiple arguments + +The minor version 10 is used to avoid any conflicts with the official Symfony post 5.4 releases even though they are not expected + 5.3 --- diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index a959c2baee911..47c0fcc2b05ff 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -83,6 +83,7 @@ protected function configure(): void new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'), new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'), new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'), + new InputOption('blocking-mode', null, InputOption::VALUE_NONE, 'Consume messages in blocking mode. If option is specified only one receiver is supported'), ]) ->setHelp(<<<'EOF' The %command.name% command consumes messages and dispatches them to the message bus. @@ -123,6 +124,12 @@ protected function configure(): void Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages): php %command.full_name% --no-reset + +Use the --blocking-mode option to force receiver to work in blocking mode +("consume" method will be used instead of "get" in RabbitMQ for example). +Only supported by some receivers, and you should pass only one receiver: + + php %command.full_name% --blocking-mode EOF ) ; @@ -229,6 +236,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $this->worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters); $options = [ 'sleep' => $input->getOption('sleep') * 1000000, + 'blocking-mode' => (bool) $input->getOption('blocking-mode'), ]; if ($queues = $input->getOption('queues')) { $options['queues'] = $queues; diff --git a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php index 4ff6b66d11f35..fe27225e711a8 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php @@ -28,6 +28,7 @@ use Symfony\Component\Messenger\RoutableMessageBus; use Symfony\Component\Messenger\Stamp\BusNameStamp; use Symfony\Component\Messenger\Tests\Fixtures\ResettableDummyReceiver; +use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; class ConsumeMessagesCommandTest extends TestCase @@ -72,6 +73,42 @@ public function testBasicRun() $this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay()); } + public function testRunWithBlockingModeOption() + { + $envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]); + + $receiver = $this->createMock(QueueBlockingReceiverInterface::class); + $receiver->expects($this->once())->method('pullFromQueues')->willReturnCallback(function (array $queueNames, callable $callback) use ($envelope) { + \call_user_func($callback, $envelope); + }); + + $receiverLocator = $this->createMock(ContainerInterface::class); + $receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true); + $receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver); + + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->once())->method('dispatch'); + + $busLocator = $this->createMock(ContainerInterface::class); + $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true); + $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus); + + $command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher()); + + $application = new Application(); + $application->add($command); + $tester = new CommandTester($application->get('messenger:consume')); + $tester->execute([ + 'receivers' => ['dummy-receiver'], + '--limit' => 1, + '--blocking-mode' => true, + '--queues' => ['foo'], + ]); + + $tester->assertCommandIsSuccessful(); + $this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay()); + } + public function testRunWithBusOption() { $envelope = new Envelope(new \stdClass()); diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index cb36ce93555b1..b7b2afb1ced47 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -42,8 +42,9 @@ use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Tests\Fixtures\DummyReceiver; use Symfony\Component\Messenger\Tests\Fixtures\ResettableDummyReceiver; +use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface; +use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; @@ -585,6 +586,146 @@ public function testFlushBatchOnStop() $this->assertSame($expectedMessages, $handler->processedMessages); } + public function testBlockingMode() + { + $apiMessage = new DummyMessage('API'); + $ipaMessage = new DummyMessage('IPA'); + + $receiver = new BlockingDummyReceiver([ + [new Envelope($apiMessage), new Envelope($ipaMessage)], + ]); + + $bus = $this->createMock(MessageBusInterface::class); + $envelopes = []; + + $bus->expects($this->exactly(2)) + ->method('dispatch') + ->willReturnCallback(function ($envelope) use (&$envelopes) { + return $envelopes[] = $envelope; + }); + + $dispatcher = new class() implements EventDispatcherInterface { + private $listener; + + public function __construct() + { + $this->listener = new StopWorkerOnMessageLimitListener(2); + } + + public function dispatch(object $event, ?string $eventName = null): object + { + if ($event instanceof WorkerRunningEvent) { + $this->listener->onWorkerRunning($event); + } + + return $event; + } + }; + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher); + $worker->run(['blocking-mode' => true]); + + $this->assertSame($apiMessage, $envelopes[0]->getMessage()); + $this->assertSame($ipaMessage, $envelopes[1]->getMessage()); + $this->assertCount(1, $envelopes[0]->all(ReceivedStamp::class)); + $this->assertCount(1, $envelopes[0]->all(ConsumedByWorkerStamp::class)); + $this->assertSame('transport', $envelopes[0]->last(ReceivedStamp::class)->getTransportName()); + + $this->assertSame(2, $receiver->getAcknowledgeCount()); + } + + public function testReceiverDoesNotSupportBlockingMode() + { + $receiver = $this->createMock(QueueReceiverInterface::class); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $worker = new Worker(['transport' => $receiver], $bus); + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage(sprintf('Receiver for "transport" does not implement "%s".', BlockingReceiverInterface::class)); + $worker->run(['blocking-mode' => true]); + } + + public function testMoreThanOneReceiverInBlockingMode() + { + $receiver1 = $this->createMock(QueueReceiverInterface::class); + $receiver2 = $this->createMock(QueueReceiverInterface::class); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus); + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('In blocking mode only one receiver is supported'); + $worker->run(['blocking-mode' => true]); + } + + public function testWorkerLimitQueuesInBlockingMode() + { + $apiMessage = new DummyMessage('API'); + $ipaMessage = new DummyMessage('IPA'); + + $receiver = new QueueBlockingDummyReceiver([ + [new Envelope($apiMessage), new Envelope($ipaMessage)], + ]); + + $bus = $this->createMock(MessageBusInterface::class); + $envelopes = []; + + $bus->expects($this->exactly(2)) + ->method('dispatch') + ->willReturnCallback(function ($envelope) use (&$envelopes) { + return $envelopes[] = $envelope; + }); + + $dispatcher = new class() implements EventDispatcherInterface { + private $listener; + + public function __construct() + { + $this->listener = new StopWorkerOnMessageLimitListener(2); + } + + public function dispatch(object $event, ?string $eventName = null): object + { + if ($event instanceof WorkerRunningEvent) { + $this->listener->onWorkerRunning($event); + } + + return $event; + } + }; + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher); + $worker->run([ + 'blocking-mode' => true, + 'queues' => ['foo'], + ]); + + $this->assertSame($apiMessage, $envelopes[0]->getMessage()); + $this->assertSame($ipaMessage, $envelopes[1]->getMessage()); + $this->assertCount(1, $envelopes[0]->all(ReceivedStamp::class)); + $this->assertCount(1, $envelopes[0]->all(ConsumedByWorkerStamp::class)); + $this->assertSame('transport', $envelopes[0]->last(ReceivedStamp::class)->getTransportName()); + + $this->assertSame(2, $receiver->getAcknowledgeCount()); + } + + public function testWorkerLimitQueuesUnsupportedInBlockingMode() + { + $receiver = $this->createMock(BlockingReceiverInterface::class); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $worker = new Worker(['transport' => $receiver], $bus); + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage(sprintf('Receiver for "transport" does not implement "%s".', QueueBlockingReceiverInterface::class)); + $worker->run([ + 'queues' => ['foo'], + 'blocking-mode' => true, + ]); + } + + public function testGcCollectCyclesIsCalledOnMessageHandle() { $apiMessage = new DummyMessage('API'); @@ -603,6 +744,74 @@ public function testGcCollectCyclesIsCalledOnMessageHandle() $this->assertGreaterThan(0, $gcStatus['runs']); } + +} + +class DummyReceiver implements ReceiverInterface +{ + private $deliveriesOfEnvelopes; + private $acknowledgedEnvelopes; + private $rejectedEnvelopes; + private $acknowledgeCount = 0; + private $rejectCount = 0; + + /** + * @param Envelope[][] $deliveriesOfEnvelopes + */ + public function __construct(array $deliveriesOfEnvelopes) + { + $this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes; + } + + public function get(): iterable + { + $val = array_shift($this->deliveriesOfEnvelopes); + + return $val ?? []; + } + + public function ack(Envelope $envelope): void + { + ++$this->acknowledgeCount; + $this->acknowledgedEnvelopes[] = $envelope; + } + + public function reject(Envelope $envelope): void + { + ++$this->rejectCount; + $this->rejectedEnvelopes[] = $envelope; + } + + public function getAcknowledgeCount(): int + { + return $this->acknowledgeCount; + } + + public function getRejectCount(): int + { + return $this->rejectCount; + } + + public function getAcknowledgedEnvelopes(): array + { + return $this->acknowledgedEnvelopes; + } +} + +class BlockingDummyReceiver extends DummyReceiver implements BlockingReceiverInterface +{ + public function pull(callable $callback): void + { + $envelopes = $this->get(); + + foreach ($envelopes as $envelope) { + $shouldContinue = $callback($envelope); + + if ($shouldContinue === false) { + return; + } + } + } } class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface @@ -638,3 +847,12 @@ private function process(array $jobs): void } } } + +class QueueBlockingDummyReceiver extends BlockingDummyReceiver implements QueueBlockingReceiverInterface +{ + public function pullFromQueues(array $queueNames, callable $callback): void + { + $this->pull($callback); + } +} + diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/BlockingReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/BlockingReceiverInterface.php new file mode 100644 index 0000000000000..9f9a5a2fc68e9 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Receiver/BlockingReceiverInterface.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Receiver; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\TransportException; + +/** + * @author Alexander Melikhov + */ +interface BlockingReceiverInterface extends ReceiverInterface +{ + /** + * @param callable(Envelope):bool $callback If callback return false, then processing thread will be + * returned to PHP script. + * + * @throws TransportException If there is an issue communicating with the transport + */ + public function pull(callable $callback): void; +} diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/QueueBlockingReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/QueueBlockingReceiverInterface.php new file mode 100644 index 0000000000000..da22c638d0f4f --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Receiver/QueueBlockingReceiverInterface.php @@ -0,0 +1,31 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Receiver; + +use Symfony\Component\Messenger\Envelope; + +/** + * Some transports may have multiple queues. This interface is used to read from only some queues in blocking mode. + * + * @author Alexander Melikhov + */ +interface QueueBlockingReceiverInterface extends BlockingReceiverInterface +{ + /** + * Pull messages from the specified queue names instead of consuming from all queues. + * + * @param string[] $queueNames + * @param callable(Envelope):bool $callback If callback return false, then processing thread will be + * returned to PHP script. + */ + public function pullFromQueues(array $queueNames, callable $callback): void; +} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index e8811228e7563..9f3bc6282a7a2 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -30,6 +30,8 @@ use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp; use Symfony\Component\Messenger\Stamp\NoAutoAckStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface; +use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\RateLimiter\LimiterInterface; @@ -77,15 +79,32 @@ public function run(array $options = []): void 'sleep' => 1000000, ], $options); $queueNames = $options['queues'] ?? null; + $blockingMode = $options['blocking-mode'] ?? false; $this->metadata->set(['queueNames' => $queueNames]); + if ($blockingMode) { + if (\count($this->receivers) > 1) { + throw new RuntimeException('In blocking mode only one receiver is supported.'); + } + + foreach ($this->receivers as $transportName => $receiver) { + if (!$receiver instanceof BlockingReceiverInterface) { + throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, BlockingReceiverInterface::class)); + } + } + } + $this->eventDispatcher?->dispatch(new WorkerStartedEvent($this)); if ($queueNames) { // if queue names are specified, all receivers must implement the QueueReceiverInterface foreach ($this->receivers as $transportName => $receiver) { - if (!$receiver instanceof QueueReceiverInterface) { + if ($blockingMode) { + if (!$receiver instanceof QueueBlockingReceiverInterface) { + throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueBlockingReceiverInterface::class)); + } + } elseif (!$receiver instanceof QueueReceiverInterface) { throw new RuntimeException(\sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class)); } } @@ -95,21 +114,40 @@ public function run(array $options = []): void $envelopeHandled = false; $envelopeHandledStart = $this->clock->now(); foreach ($this->receivers as $transportName => $receiver) { - if ($queueNames) { - $envelopes = $receiver->getFromQueues($queueNames); + if ($blockingMode) { + $callback = function (Envelope $envelope) use ($transportName, &$envelopeHandled): bool { + $envelopeHandled = true; + $this->handleMessage($envelope, $transportName); + + $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false)); + + return !$this->shouldStop; + }; + + if ($queueNames) { + \assert($receiver instanceof QueueBlockingReceiverInterface); + $receiver->pullFromQueues($queueNames, $callback); + } else { + \assert($receiver instanceof BlockingReceiverInterface); + $receiver->pull($callback); + } } else { - $envelopes = $receiver->get(); - } + if ($queueNames && $receiver instanceof QueueReceiverInterface) { + $envelopes = $receiver->getFromQueues($queueNames); + } else { + $envelopes = $receiver->get(); + } - foreach ($envelopes as $envelope) { - $envelopeHandled = true; + foreach ($envelopes as $envelope) { + $envelopeHandled = true; - $this->rateLimit($transportName); + $this->rateLimit($transportName); $this->handleMessage($envelope, $transportName); - $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false)); + $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false)); - if ($this->shouldStop) { - break 2; + if ($this->shouldStop) { + break 2; + } } }