Skip to content

Commit

Permalink
Implement job not found on ES retrial
Browse files Browse the repository at this point in the history
  • Loading branch information
mmenozzi committed Oct 23, 2024
1 parent b5dc7ae commit 6fbbba4
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions src/Service/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueManagerInterface
{
private const JOB_NOT_FOUND_MAX_RETRIES = 3;
private const JOB_NOT_FOUND_RETRY_DELAY_SECONDS = 3;

/**
* @var BeanstalkClient
*/
Expand Down Expand Up @@ -52,6 +55,11 @@ final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueMa
*/
private $batchSize;

/**
* @var array<int, int>
*/
private static $beanstalkIdToNotFoundCountMap = [];

public function __construct(
FlowConfig $flowConfig,
BeanstalkClient $beanstalkClient,
Expand Down Expand Up @@ -140,6 +148,19 @@ public function getNextJob(): Promise
/** @var Job $job */
$job = yield $this->elasticSearch->fetchJob($jobUuid, $this->flowConfig->getTube());
} catch (\Throwable $exception) {
$notFoundCount = self::$beanstalkIdToNotFoundCountMap[$jobBeanstalkId] ?? 1;
if ($notFoundCount <= self::JOB_NOT_FOUND_MAX_RETRIES) {
self::$beanstalkIdToNotFoundCountMap[$jobBeanstalkId] = $notFoundCount + 1;
$this->logger->warning(
"Job with UUID {$jobUuid} not found in Elasticsearch. Retrying...",
['beanstalk_id' => $jobBeanstalkId, 'not_found_count' => $notFoundCount]
);
yield $this->beanstalkClient->release(
$jobBeanstalkId,
self::JOB_NOT_FOUND_RETRY_DELAY_SECONDS ** $notFoundCount
);
return yield $this->getNextJob();
}
yield $this->beanstalkClient->bury($jobBeanstalkId);

throw new JobNotFoundException($jobUuid, 0, $exception);
Expand Down

0 comments on commit 6fbbba4

Please sign in to comment.