Skip to content

Commit

Permalink
Merge branch 'master' into queue-factory
Browse files Browse the repository at this point in the history
  • Loading branch information
vjik authored Nov 25, 2024
2 parents dc95ac7 + af0fb00 commit 3699e6f
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 24 deletions.
31 changes: 27 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ Each queue task consists of two parts:
`Yiisoft\Queue\Message\Message`. For more complex cases you should implement the interface by your own.
2. A message handler is a callable called by a `Yiisoft\Queue\Worker\Worker`. The handler handles each queue message.

For example, if you need to download and save a file, your message may look like the following:
For example, if you need to download and save a file, your message creation may look like the following:
- Message handler as the first parameter
- Message data as the second parameter

```php
$data = [
'url' => $url,
'destinationFile' => $filename,
];
$message = new \Yiisoft\Queue\Message\Message('file-download', $data);
$message = new \Yiisoft\Queue\Message\Message(FileDownloader::class, $data);
```

Then you should push it to the queue:
Expand Down Expand Up @@ -93,9 +95,8 @@ class FileDownloader
The last thing we should do is to create a configuration for the `Yiisoft\Queue\Worker\Worker`:

```php
$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']];
$worker = new \Yiisoft\Queue\Worker\Worker(
$handlers, // Here it is
[],
$logger,
$injector,
$container
Expand Down Expand Up @@ -134,6 +135,28 @@ $status->isReserved();
$status->isDone();
```

## Custom handler names
### Custom handler names

By default, when you push a message to the queue, the message handler name is the fully qualified class name of the handler.
This can be useful for most cases, but sometimes you may want to use a shorter name or arbitrary string as the handler name.
This can be useful when you want to reduce the amount of data being passed or when you communicate with external systems.

To use a custom handler name before message push, you can pass it as the first argument `Message` when creating it:
```php
new Message('handler-name', $data);
```

To use a custom handler name on message consume, you should configure handler mapping for the `Worker` class:
```php
$worker = new \Yiisoft\Queue\Worker\Worker(
['handler-name' => FooHandler::class],
$logger,
$injector,
$container
);
```

## Different queue channels

Often we need to push to different queue channels with an only application. There is the `QueueProviderInterface`
Expand Down
10 changes: 10 additions & 0 deletions src/Message/EnvelopeTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ trait EnvelopeTrait
{
private MessageInterface $message;

/**
* A mirror of {@see MessageInterface::fromData()}
*/
abstract public static function fromMessage(MessageInterface $message): self;

public static function fromData(string $handlerName, mixed $data, array $metadata = []): MessageInterface
{
return self::fromMessage(Message::fromData($handlerName, $data, $metadata));
}

public function getMessage(): MessageInterface
{
return $this->message;
Expand Down
42 changes: 30 additions & 12 deletions src/Message/JsonMessageSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ public function serialize(MessageInterface $message): string
'data' => $message->getData(),
'meta' => $message->getMetadata(),
];
if (!isset($payload['meta']['message-class'])) {
$payload['meta']['message-class'] = $message instanceof EnvelopeInterface
? $message->getMessage()::class
: $message::class;
}

return json_encode($payload, JSON_THROW_ON_ERROR);
}
Expand All @@ -34,25 +39,38 @@ public function unserialize(string $value): MessageInterface
throw new InvalidArgumentException('Payload must be array. Got ' . get_debug_type($payload) . '.');
}

$name = $payload['name'] ?? null;
if (!isset($name) || !is_string($name)) {
throw new InvalidArgumentException('Handler name must be a string. Got ' . get_debug_type($name) . '.');
}

$meta = $payload['meta'] ?? [];
if (!is_array($meta)) {
throw new InvalidArgumentException('Metadata must be array. Got ' . get_debug_type($meta) . '.');
throw new InvalidArgumentException('Metadata must be an array. Got ' . get_debug_type($meta) . '.');
}

// TODO: will be removed later
$message = new Message($payload['name'] ?? '$name', $payload['data'] ?? null, $meta);

$envelopes = [];
if (isset($meta[EnvelopeInterface::ENVELOPE_STACK_KEY]) && is_array($meta[EnvelopeInterface::ENVELOPE_STACK_KEY])) {
$message = $message->withMetadata(
array_merge($message->getMetadata(), [EnvelopeInterface::ENVELOPE_STACK_KEY => []]),
);
foreach ($meta[EnvelopeInterface::ENVELOPE_STACK_KEY] as $envelope) {
if (is_string($envelope) && class_exists($envelope) && is_subclass_of($envelope, EnvelopeInterface::class)) {
$message = $envelope::fromMessage($message);
}
}
$envelopes = $meta[EnvelopeInterface::ENVELOPE_STACK_KEY];
}
$meta[EnvelopeInterface::ENVELOPE_STACK_KEY] = [];

$class = $payload['meta']['message-class'] ?? Message::class;
// Don't check subclasses when it's a default class: that's faster
if ($class !== Message::class && !is_subclass_of($class, MessageInterface::class)) {
$class = Message::class;
}

/**
* @var class-string<MessageInterface> $class
*/
$message = $class::fromData($name, $payload['data'] ?? null, $meta);

foreach ($envelopes as $envelope) {
if (is_string($envelope) && class_exists($envelope) && is_subclass_of($envelope, EnvelopeInterface::class)) {
$message = $envelope::fromMessage($message);
}
}

return $message;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public function __construct(
) {
}

public static function fromData(string $handlerName, mixed $data, array $metadata = []): MessageInterface
{
return new self($handlerName, $data, $metadata);
}

public function getHandlerName(): string
{
return $this->handlerName;
Expand Down
10 changes: 10 additions & 0 deletions src/Message/MessageHandlerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

interface MessageHandlerInterface
{
public function handle(MessageInterface $message): void;
}
2 changes: 2 additions & 0 deletions src/Message/MessageInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

interface MessageInterface
{
public static function fromData(string $handlerName, mixed $data, array $metadata = []): self;

/**
* Returns handler name.
*
Expand Down
13 changes: 13 additions & 0 deletions src/Worker/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Throwable;
use Yiisoft\Injector\Injector;
use Yiisoft\Queue\Exception\JobFailureException;
use Yiisoft\Queue\Message\MessageHandlerInterface;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Consume\ConsumeFinalHandler;
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
Expand Down Expand Up @@ -77,6 +78,18 @@ public function process(MessageInterface $message, QueueInterface $queue): Messa
private function getHandler(string $name): ?callable
{
if (!array_key_exists($name, $this->handlersCached)) {
$definition = $this->handlers[$name] ?? null;
if ($definition === null && $this->container->has($name)) {
$handler = $this->container->get($name);
if ($handler instanceof MessageHandlerInterface) {
$this->handlersCached[$name] = $handler->handle(...);

return $this->handlersCached[$name];
}

return null;
}

$this->handlersCached[$name] = $this->prepare($this->handlers[$name] ?? null);
}

Expand Down
24 changes: 24 additions & 0 deletions tests/Integration/MessageConsumingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFactoryFailureInterface;
use Yiisoft\Queue\Tests\Integration\Support\TestHandler;
use Yiisoft\Queue\Tests\TestCase;
use Yiisoft\Queue\Worker\Worker;

Expand Down Expand Up @@ -41,4 +42,27 @@ public function testMessagesConsumed(): void

$this->assertEquals($messages, $this->messagesProcessed);
}

public function testMessagesConsumedByHandlerClass(): void
{
$handler = new TestHandler();
$container = $this->createMock(ContainerInterface::class);
$container->method('get')->with(TestHandler::class)->willReturn($handler);
$container->method('has')->with(TestHandler::class)->willReturn(true);
$worker = new Worker(
[],
new NullLogger(),
new Injector($container),
$container,
new ConsumeMiddlewareDispatcher($this->createMock(MiddlewareFactoryConsumeInterface::class)),
new FailureMiddlewareDispatcher($this->createMock(MiddlewareFactoryFailureInterface::class), [])
);

$messages = [1, 'foo', 'bar-baz'];
foreach ($messages as $message) {
$worker->process(new Message(TestHandler::class, $message), $this->getQueue());
}

$this->assertEquals($messages, $handler->messagesProcessed);
}
}
20 changes: 20 additions & 0 deletions tests/Integration/Support/TestHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Tests\Integration\Support;

use Yiisoft\Queue\Message\MessageHandlerInterface;
use Yiisoft\Queue\Message\MessageInterface;

final class TestHandler implements MessageHandlerInterface
{
public function __construct(public array $messagesProcessed = [])
{
}

public function handle(MessageInterface $message): void
{
$this->messagesProcessed[] = $message->getData();
}
}
40 changes: 32 additions & 8 deletions tests/Unit/Message/JsonMessageSerializerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Yiisoft\Queue\Message\JsonMessageSerializer;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Tests\Unit\Support\TestMessage;

/**
* Testing message serialization options
Expand Down Expand Up @@ -42,10 +43,10 @@ public static function dataUnsupportedPayloadFormat(): iterable
*/
public function testMetadataFormat(mixed $meta): void
{
$payload = ['data' => 'test', 'meta' => $meta];
$payload = ['name' => 'handler', 'data' => 'test', 'meta' => $meta];
$serializer = $this->createSerializer();

$this->expectExceptionMessage(sprintf('Metadata must be array. Got %s.', get_debug_type($meta)));
$this->expectExceptionMessage(sprintf('Metadata must be an array. Got %s.', get_debug_type($meta)));
$this->expectException(InvalidArgumentException::class);
$serializer->unserialize(json_encode($payload));
}
Expand All @@ -59,31 +60,32 @@ public static function dataUnsupportedMetadataFormat(): iterable

public function testUnserializeFromData(): void
{
$payload = ['data' => 'test'];
$payload = ['name' => 'handler', 'data' => 'test'];
$serializer = $this->createSerializer();

$message = $serializer->unserialize(json_encode($payload));

$this->assertInstanceOf(MessageInterface::class, $message);
$this->assertEquals($payload['data'], $message->getData());
$this->assertEquals([], $message->getMetadata());
$this->assertEquals([EnvelopeInterface::ENVELOPE_STACK_KEY => []], $message->getMetadata());
}

public function testUnserializeWithMetadata(): void
{
$payload = ['data' => 'test', 'meta' => ['int' => 1, 'str' => 'string', 'bool' => true]];
$payload = ['name' => 'handler', 'data' => 'test', 'meta' => ['int' => 1, 'str' => 'string', 'bool' => true]];
$serializer = $this->createSerializer();

$message = $serializer->unserialize(json_encode($payload));

$this->assertInstanceOf(MessageInterface::class, $message);
$this->assertEquals($payload['data'], $message->getData());
$this->assertEquals(['int' => 1, 'str' => 'string', 'bool' => true], $message->getMetadata());
$this->assertEquals(['int' => 1, 'str' => 'string', 'bool' => true, EnvelopeInterface::ENVELOPE_STACK_KEY => []], $message->getMetadata());
}

public function testUnserializeEnvelopeStack(): void
{
$payload = [
'name' => 'handler',
'data' => 'test',
'meta' => [
EnvelopeInterface::ENVELOPE_STACK_KEY => [
Expand Down Expand Up @@ -113,7 +115,7 @@ public function testSerialize(): void
$json = $serializer->serialize($message);

$this->assertEquals(
'{"name":"handler","data":"test","meta":[]}',
'{"name":"handler","data":"test","meta":{"message-class":"Yiisoft\\\\Queue\\\\Message\\\\Message"}}',
$json,
);
}
Expand All @@ -129,9 +131,10 @@ public function testSerializeEnvelopeStack(): void

$this->assertEquals(
sprintf(
'{"name":"handler","data":"test","meta":{"envelopes":["%s"],"%s":"test-id"}}',
'{"name":"handler","data":"test","meta":{"envelopes":["%s"],"%s":"test-id","message-class":"%s"}}',
str_replace('\\', '\\\\', IdEnvelope::class),
IdEnvelope::MESSAGE_ID_KEY,
str_replace('\\', '\\\\', Message::class),
),
$json,
);
Expand All @@ -145,14 +148,35 @@ public function testSerializeEnvelopeStack(): void
IdEnvelope::class,
],
IdEnvelope::MESSAGE_ID_KEY => 'test-id',
'message-class' => Message::class,
], $message->getMetadata());

$this->assertEquals([
EnvelopeInterface::ENVELOPE_STACK_KEY => [],
IdEnvelope::MESSAGE_ID_KEY => 'test-id',
'message-class' => Message::class,
], $message->getMessage()->getMetadata());
}

public function testRestoreOriginalMessageClass(): void
{
$message = new TestMessage();
$serializer = $this->createSerializer();
$serializer->unserialize($serializer->serialize($message));

$this->assertInstanceOf(TestMessage::class, $message);
}

public function testRestoreOriginalMessageClassWithEnvelope(): void
{
$message = new IdEnvelope(new TestMessage());
$serializer = $this->createSerializer();
$serializer->unserialize($serializer->serialize($message));

$this->assertInstanceOf(IdEnvelope::class, $message);
$this->assertInstanceOf(TestMessage::class, $message->getMessage());
}

private function createSerializer(): JsonMessageSerializer
{
return new JsonMessageSerializer();
Expand Down
Loading

0 comments on commit 3699e6f

Please sign in to comment.