-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathBunnyMessageDecoder.php
58 lines (47 loc) · 1.76 KB
/
BunnyMessageDecoder.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<?php
declare(strict_types=1);
namespace Telephantast\BunnyTransport;
use Bunny\Message as BunnyMessage;
use Telephantast\MessageBus\Async\Delay;
use Telephantast\MessageBus\Async\ObjectDenormalizer;
use Telephantast\MessageBus\CreatedAt\CreatedAt;
use Telephantast\MessageBus\Envelope;
use Telephantast\MessageBus\MessageId\CorrelationId;
use Telephantast\MessageBus\MessageId\MessageId;
/**
 * Rebuilds a message bus Envelope from a consumed Bunny message.
 *
 * The message content is JSON-decoded and denormalized using the `type` header
 * as the target type. Stamps are restored from the `stamps` header and, when
 * present, from the `message-id`, `correlation-id`, `timestamp` and `x-delay` headers.
 *
 * @internal
 * @psalm-internal Telephantast\BunnyTransport
 * @psalm-import-type BunnyHeaders from BunnyMessageEncoder
 */
final readonly class BunnyMessageDecoder
{
    public function __construct(
        private ObjectDenormalizer $objectDenormalizer,
    ) {}

    /**
     * Decodes a Bunny message into an Envelope wrapping the denormalized message and its stamps.
     *
     * Stamps are appended in a fixed order: entries of the `stamps` header first,
     * then MessageId, CorrelationId, CreatedAt and Delay for each header that is set.
     *
     * @throws \JsonException when the message content is not valid JSON
     */
    public function decode(BunnyMessage $bunnyMessage): Envelope
    {
        /** @var BunnyHeaders $amqpHeaders */
        $amqpHeaders = $bunnyMessage->headers;

        // Decode the raw body first, then hand it to the denormalizer together with the declared type.
        $payload = json_decode($bunnyMessage->content, associative: true, flags: JSON_THROW_ON_ERROR);
        $decodedMessage = $this->objectDenormalizer->denormalize($payload, $amqpHeaders['type']);

        // The `stamps` header maps a stamp class name to its normalized data.
        $restoredStamps = [];
        $serializedStamps = $amqpHeaders['stamps'] ?? [];

        foreach ($serializedStamps as $stampClass => $stampData) {
            $restoredStamps[] = $this->objectDenormalizer->denormalize($stampData, $stampClass);
        }

        $messageId = $amqpHeaders['message-id'] ?? null;

        if ($messageId !== null) {
            $restoredStamps[] = new MessageId($messageId);
        }

        $correlationId = $amqpHeaders['correlation-id'] ?? null;

        if ($correlationId !== null) {
            $restoredStamps[] = new CorrelationId($correlationId);
        }

        $timestamp = $amqpHeaders['timestamp'] ?? null;

        if ($timestamp !== null) {
            $restoredStamps[] = new CreatedAt(\DateTimeImmutable::createFromInterface($timestamp));
        }

        $delayHeader = $amqpHeaders['x-delay'] ?? null;

        if ($delayHeader !== null) {
            // The header value is divided by 1000 and truncated to an int before being wrapped in Delay;
            // presumably this converts milliseconds to seconds — confirm against BunnyMessageEncoder.
            $restoredStamps[] = new Delay((int) ($delayHeader / 1000));
        }

        return Envelope::wrap($decodedMessage, ...$restoredStamps);
    }
}