-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathBunnyConsume.php
69 lines (59 loc) · 2.02 KB
/
BunnyConsume.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
<?php
declare(strict_types=1);
namespace Telephantast\BunnyTransport;
use Bunny\Message;
use Telephantast\MessageBus\Async\Consumer;
use Telephantast\MessageBus\Async\ObjectDenormalizer;
use Telephantast\MessageBus\Async\TransportConsume;
use function React\Async\await;
/**
 * Bunny-based implementation of {@see TransportConsume}.
 *
 * Every started consumer gets its own channel taken from the connection pool.
 * A closure that tears that channel down is remembered per consumer so that
 * {@see self::stopConsumer()} can release it later.
 *
 * @api
 */
final class BunnyConsume implements TransportConsume
{
    /** Prefetch count used for consumer channels when the constructor is given none. */
    private const DEFAULT_PREFETCH_COUNT = 50;

    private BunnyMessageDecoder $messageDecoder;

    /**
     * Teardown closure (cancel the subscription, close the channel) per started consumer.
     *
     * @var \WeakMap<Consumer, \Closure(): void>
     */
    private \WeakMap $consumerToCancel;

    /**
     * @param BunnyConnectionPool $connectionPool pool providing the connection on which consumer channels are opened
     * @param ObjectDenormalizer $objectDenormalizer handed to the message decoder
     * @param int $prefetchCount prefetch count passed to `qos` on every consumer channel
     */
    public function __construct(
        private readonly BunnyConnectionPool $connectionPool,
        ObjectDenormalizer $objectDenormalizer,
        private readonly int $prefetchCount = self::DEFAULT_PREFETCH_COUNT,
    ) {
        $this->messageDecoder = new BunnyMessageDecoder($objectDenormalizer);
        /** @var \WeakMap<Consumer, \Closure(): void> */
        $this->consumerToCancel = new \WeakMap();
    }

    /**
     * Opens a channel, applies the prefetch count and subscribes to `$consumer->queue`.
     *
     * Each delivered message is decoded, passed to `$consumer->handle()` and then
     * acknowledged; the ack is skipped when handling throws.
     *
     * If the channel cannot be configured or subscribed, it is closed again before
     * the failure is rethrown, so a failed start does not leak a channel.
     *
     * NOTE: starting the same consumer twice replaces the stored teardown closure,
     * so only the most recent subscription can be stopped via stopConsumer().
     *
     * @psalm-suppress MissingThrowsDocblock
     */
    public function runConsumer(Consumer $consumer): void
    {
        $channel = await($this->connectionPool->get()->channel());

        try {
            await($channel->qos(prefetchCount: $this->prefetchCount));
            $consumerTag = await($channel->consume(
                callback: function (Message $message) use ($channel, $consumer): void {
                    $consumer->handle($this->messageDecoder->decode($message));
                    await($channel->ack($message));
                },
                queue: $consumer->queue,
            ))->consumerTag;
        } catch (\Throwable $setupFailure) {
            try {
                await($channel->close());
            } catch (\Throwable) {
                // Best-effort cleanup: the setup failure is the error worth reporting.
            }

            throw $setupFailure;
        }

        $this->consumerToCancel[$consumer] = static function () use ($channel, $consumerTag): void {
            try {
                await($channel->cancel($consumerTag));
            } finally {
                // Close the channel even when the cancel request fails.
                await($channel->close());
            }
        };
    }

    /**
     * Cancels the subscription of a previously started consumer and closes its channel.
     *
     * Does nothing when the consumer was never started through this instance or has
     * already been stopped.
     *
     * @throws \Throwable
     */
    public function stopConsumer(Consumer $consumer): void
    {
        $cancel = $this->consumerToCancel[$consumer] ?? null;

        if ($cancel === null) {
            return;
        }

        // Forget the entry before tearing down: a failing teardown must not leave a
        // stale closure behind, and a repeated stop (including one issued while this
        // one is still awaiting) must not cancel/close the same channel twice.
        unset($this->consumerToCancel[$consumer]);

        $cancel();
    }
}