From 52e3a5dc5447a91aa86915475cae10f2ab92d67b Mon Sep 17 00:00:00 2001 From: "hubert.lenoir" Date: Mon, 4 Nov 2024 17:51:49 +0100 Subject: [PATCH] [Messenger][RateLimiter] fix additional message handled when using a rate limiter --- Tests/WorkerTest.php | 26 +++++++++++++++----------- Worker.php | 3 ++- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/Tests/WorkerTest.php b/Tests/WorkerTest.php index 55c28357..cb36ce93 100644 --- a/Tests/WorkerTest.php +++ b/Tests/WorkerTest.php @@ -14,6 +14,7 @@ use PHPUnit\Framework\TestCase; use Psr\EventDispatcher\EventDispatcherInterface; use Psr\Log\LoggerInterface; +use Symfony\Bridge\PhpUnit\ClockMock; use Symfony\Component\Clock\MockClock; use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter; @@ -47,8 +48,8 @@ use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; use Symfony\Component\RateLimiter\RateLimiterFactory; +use Symfony\Component\RateLimiter\Reservation; use Symfony\Component\RateLimiter\Storage\InMemoryStorage; -use Symfony\Contracts\Service\ResetInterface; /** * @group time-sensitive @@ -73,7 +74,7 @@ public function testWorkerDispatchTheReceivedMessage() return $envelopes[] = $envelope; }); - $dispatcher = new class() implements EventDispatcherInterface { + $dispatcher = new class implements EventDispatcherInterface { private StopWorkerOnMessageLimitListener $listener; public function __construct() @@ -403,7 +404,7 @@ public function testWorkerLimitQueuesUnsupported() $worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus, clock: new MockClock()); $this->expectException(RuntimeException::class); - $this->expectExceptionMessage(sprintf('Receiver for "transport2" does not implement "%s".', QueueReceiverInterface::class)); + $this->expectExceptionMessage(\sprintf('Receiver for "transport2" does not implement "%s".', QueueReceiverInterface::class)); $worker->run(['queues' => ['foo']]); } @@ -418,7 +419,7 @@ public function testWorkerMessageReceivedEventMutability() $eventDispatcher = new EventDispatcher(); $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); - $stamp = new class() implements StampInterface { + $stamp = new class implements StampInterface { }; $listener = function (WorkerMessageReceivedEvent $event) use ($stamp) { $event->addStamps($stamp); @@ -438,6 +439,8 @@ public function testWorkerRateLimitMessages() $envelope = [ new Envelope(new DummyMessage('message1')), new Envelope(new DummyMessage('message2')), + new Envelope(new DummyMessage('message3')), + new Envelope(new DummyMessage('message4')), ]; $receiver = new DummyReceiver([$envelope]); @@ -445,14 +448,12 @@ public function testWorkerRateLimitMessages() $bus->method('dispatch')->willReturnArgument(0); $eventDispatcher = new EventDispatcher(); - $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2)); + $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(4)); $rateLimitCount = 0; - $listener = function (WorkerRateLimitedEvent $event) use (&$rateLimitCount) { + $eventDispatcher->addListener(WorkerRateLimitedEvent::class, static function () use (&$rateLimitCount) { ++$rateLimitCount; - $event->getLimiter()->reset(); // Reset limiter to continue test - }; - $eventDispatcher->addListener(WorkerRateLimitedEvent::class, $listener); + }); $rateLimitFactory = new RateLimiterFactory([ 'id' => 'bus', @@ -461,11 +462,14 @@ public function testWorkerRateLimitMessages() 'interval' => '1 minute', ], new InMemoryStorage()); + ClockMock::register(Reservation::class); + ClockMock::register(InMemoryStorage::class); + $worker = new Worker(['bus' => $receiver], $bus, $eventDispatcher, null, ['bus' => $rateLimitFactory], new MockClock()); $worker->run(); - $this->assertCount(2, $receiver->getAcknowledgedEnvelopes()); - $this->assertEquals(1, $rateLimitCount); + $this->assertSame(4, $receiver->getAcknowledgeCount()); + $this->assertSame(3, $rateLimitCount); } public function testWorkerShouldLogOnStop() diff --git a/Worker.php b/Worker.php index 3d69dd61..e8811228 100644 --- a/Worker.php +++ b/Worker.php @@ -86,7 +86,7 @@ public function run(array $options = []): void // if queue names are specified, all receivers must implement the QueueReceiverInterface foreach ($this->receivers as $transportName => $receiver) { if (!$receiver instanceof QueueReceiverInterface) { - throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class)); + throw new RuntimeException(\sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class)); } } } @@ -242,6 +242,7 @@ private function rateLimit(string $transportName): void $this->eventDispatcher?->dispatch(new WorkerRateLimitedEvent($rateLimiter, $transportName)); $rateLimiter->reserve()->wait(); + $rateLimiter->consume(); } private function flush(bool $force): bool