This package provides an easy way to put PHP tasks of any kind into a work queue such as Beanstalkd or Redis to execute them at a later time.
This is version 0.21.1.
$ composer require mle86/wq
It requires PHP 8.0+ and has no other dependencies (apart from PHPUnit/Coveralls for development and the PSR-3 interfaces).
You'll also want to install at least one other package which contains a WorkServerAdapter implementation, such as:
- mle86/wq-beanstalkd (Beanstalkd server adapter),
- mle86/wq-redis (Redis server adapter),
- mle86/wq-amqp (RabbitMQ server adapter).
- A Job is something which should be done exactly once. Maybe it's sending an e-mail, maybe it's an external API call like a webhook, maybe it's some slow clean-up process. In any case, we're talking about a unit of work that could be executed right away, but it would be better for the application's performance to put it in a Work Queue instead, so it can be done asynchronously.
- A Work Queue is a list of jobs that should be executed at some other time. They are stored in some kind of Work Server. One work server well-known in the PHP world is Beanstalkd. It can store any number of work queues, although it calls them “tubes”.
Different work queues, or tubes, are commonly used to separate job types.
For example, the same work server might have one “mail” queue for outgoing mails to be sent, one “cleanup” queue for all kinds of clean-up jobs, and one “webhook” queue for outgoing web-hook calls.
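The idea of several named queues on one work server can be illustrated with a toy in-memory stand-in. This is only a sketch: `ToyWorkServer` and its `store()`/`fetch()` methods are hypothetical names made up for this illustration and are not part of this package, and a real work server such as Beanstalkd keeps the jobs outside of the PHP process.

```php
<?php
// Hypothetical toy class for illustration only; not part of mle86/wq.
final class ToyWorkServer
{
    /** @var array<string, string[]> serialized jobs, keyed by queue name */
    private array $queues = [];

    public function store(string $queue, object $job): void
    {
        // Keep the job in serialized form, the way it would travel to an external server.
        $this->queues[$queue][] = serialize($job);
    }

    public function fetch(string $queue): ?object
    {
        if (empty($this->queues[$queue])) {
            return null;  // nothing is waiting in this queue
        }
        // First in, first out.
        return unserialize(array_shift($this->queues[$queue]));
    }
}

$server = new ToyWorkServer();
$server->store("mail",    (object)['to' => 'someone',      'subject' => 'Hello?']);
$server->store("webhook", (object)['url' => 'https://example.com/hook']);
$server->store("mail",    (object)['to' => 'someone else', 'subject' => 'Hi']);

// A worker reading the "mail" queue never sees the webhook job:
$first = $server->fetch("mail");
echo $first->subject, "\n";
```

Because each job type lives in its own queue, a worker can be written for exactly one kind of job and does not need to inspect what it receives.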
This package provides some helpful classes to set up a simple work queue system.
This is our Job implementation. It represents an e-mail that can be sent.
<?php
use mle86\WQ\Job\AbstractJob;

class EMail extends AbstractJob
{
    protected $recipient;
    protected $subject;
    protected $message;

    public function __construct(string $recipient, string $subject, string $message)
    {
        $this->recipient = $recipient;
        $this->subject   = $subject;
        $this->message   = $message;
    }

    public function send()
    {
        if (mail($this->recipient, $this->subject, $this->message)) {
            // ok, has been sent!
        } else {
            throw new \RuntimeException("mail() failed");
        }
    }
}
We have some code using that e-mail class:
<?php
use mle86\WQ\WorkServerAdapter\BeanstalkdWorkServer;
$mailJob = new EMail("recipient@example.com", "Hello?", "This is a test mail.");  // placeholder address
$workServer = BeanstalkdWorkServer::connect("localhost");
$workServer->storeJob("mail", $mailJob);
And finally, we have our background worker script which regularly checks the work server for new e-mail jobs:
<?php
use mle86\WQ\WorkServerAdapter\BeanstalkdWorkServer;
use mle86\WQ\WorkProcessor;

$queue = "mail";
printf("%s worker %d starting.\n", $queue, getmypid());

$processor = new WorkProcessor(BeanstalkdWorkServer::connect("localhost"));

$fn_handler = function(EMail $mailJob) {
    $mailJob->send();
    // don't catch exceptions here, or the WorkProcessor won't see them.
};

while (true) {
    try {
        $processor->processNextJob($queue, $fn_handler);
    } catch (\Throwable $e) {
        echo $e . "\n";  // TODO: add some real logging here
    }
}
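Why the handler should let exceptions propagate can be sketched with a toy processor. Everything here is hypothetical (the function `toyProcessNext`, the array-based queue, the `$failed` list) and is not the package's WorkProcessor; the sketch only assumes that a processor decides what happened to a job from whether the handler returned normally or threw.

```php
<?php
/**
 * Hypothetical stand-in for a job processor; not the WorkProcessor class.
 * Takes the next job off $queue and runs $handler on it.
 * If the handler throws, the job is recorded in $failed and the exception is re-thrown.
 */
function toyProcessNext(array &$queue, array &$failed, callable $handler): bool
{
    if ($queue === []) {
        return false;  // nothing to do
    }
    $job = array_shift($queue);
    try {
        $handler($job);
    } catch (\Throwable $e) {
        $failed[] = $job;  // remember the job instead of silently dropping it
        throw $e;          // let the caller log the error
    }
    return true;
}

$queue  = ["good job", "bad job"];
$failed = [];
$errors = [];

$handler = function (string $job): void {
    if ($job === "bad job") {
        throw new \RuntimeException("could not process: {$job}");
    }
};

while ($queue !== []) {
    try {
        toyProcessNext($queue, $failed, $handler);
    } catch (\Throwable $e) {
        $errors[] = $e->getMessage();  // the worker script does its logging here
    }
}

print_r($failed);
```

Had the handler caught the exception itself, this toy processor would have treated "bad job" as finished and the failure would have gone unnoticed.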
Class reference:
- Job interface
- AbstractJob base class
- QueueEntry wrapper class
- WorkServerAdapter interface
- AffixAdapter wrapper class
- BlackHoleWorkServer dummy class
- WorkProcessor class
- SignalSafeWorkProcessor class
- JobResult enum class
- JobContext DTO class
- Exceptions