Skip to content

Commit

Permalink
Try another consumption of job, if no job found instead of increment …
Browse files Browse the repository at this point in the history
…job executed

New option `WorkerOptions::$sleepNoJob` (default: 1 second) to wait before retry consumption in case of no job
  • Loading branch information
ElGigi committed Jan 20, 2025
1 parent b74a5bf commit 049918a
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,16 @@ public function run(QueueInterface $queue, WorkerOptions $options = new WorkerOp

// Get a job to consume
$job = $queue->consume();
$nbJobsExecuted++;

if (null === $job) {
$this->logger?->debug('No job to consume');
usleep((int)($options->sleepNoJob * 1000 * 1000));
continue;
}

// Increment nb job to execute
$nbJobsExecuted++;

$this->logger?->info(
sprintf(
'Job %s consumed from queue %s',
Expand Down
2 changes: 2 additions & 0 deletions src/WorkerOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public function __construct(
public ?string $killFilePath = null,
public bool $stopNoJob = false,
public int|float $sleep = 0,
public int|float $sleepNoJob = 1,
) {
}

Expand All @@ -43,6 +44,7 @@ public function logOptions(?LoggerInterface $logger): void
$logger?->debug(sprintf('Kill file path: %s', $this->killFilePath ?? '--'));
$logger?->debug(sprintf('Stop on no jobs: %s', $this->stopNoJob ? 'yes' : 'no'));
$logger?->debug(sprintf('Sleep between job consumption: %s', $this->unit($this->sleep, 'second(s)')));
$logger?->debug(sprintf('Sleep if no job: %s', $this->unit($this->sleepNoJob, 'second(s)')));
}

private function unit(int|float $value, ?string $unit = null): string
Expand Down

0 comments on commit 049918a

Please sign in to comment.