diff --git a/src/Service/QueueManager.php b/src/Service/QueueManager.php index f4206ec..02244c7 100644 --- a/src/Service/QueueManager.php +++ b/src/Service/QueueManager.php @@ -16,6 +16,9 @@ final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueManagerInterface { + private const JOB_NOT_FOUND_MAX_RETRIES = 3; + private const JOB_NOT_FOUND_RETRY_DELAY_SECONDS = 3; + /** * @var BeanstalkClient */ @@ -52,6 +55,11 @@ final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueMa */ private $batchSize; + /** + * @var array + */ + private static $beanstalkIdToNotFoundCountMap = []; + public function __construct( FlowConfig $flowConfig, BeanstalkClient $beanstalkClient, @@ -128,21 +136,38 @@ public function flush(): Promise public function getNextJob(): Promise { return call(function () { - try { - $rawJob = yield $this->beanstalkClient->reserve(); - } catch (\Exception $ex) { - throw new FatalQueueException($ex->getMessage(), $ex->getCode(), $ex); - } - - list($jobBeanstalkId, $jobUuid) = $rawJob; - - try { - /** @var Job $job */ - $job = yield $this->elasticSearch->fetchJob($jobUuid, $this->flowConfig->getTube()); - } catch (\Throwable $exception) { - yield $this->beanstalkClient->bury($jobBeanstalkId); - - throw new JobNotFoundException($jobUuid, 0, $exception); + while (true) { + try { + $rawJob = yield $this->beanstalkClient->reserve(); + } catch (\Exception $ex) { + throw new FatalQueueException($ex->getMessage(), $ex->getCode(), $ex); + } + + list($jobBeanstalkId, $jobUuid) = $rawJob; + + try { + /** @var Job $job */ + $job = yield $this->elasticSearch->fetchJob($jobUuid, $this->flowConfig->getTube()); + } catch (\Throwable $exception) { + $notFoundCount = self::$beanstalkIdToNotFoundCountMap[$jobBeanstalkId] ?? 1; + if ($notFoundCount <= self::JOB_NOT_FOUND_MAX_RETRIES) { + self::$beanstalkIdToNotFoundCountMap[$jobBeanstalkId] = $notFoundCount + 1; + $this->logger->warning( + "Job with UUID {$jobUuid} not found in Elasticsearch. Retrying...", + ['beanstalk_id' => $jobBeanstalkId, 'not_found_count' => $notFoundCount] + ); + yield $this->beanstalkClient->release( + $jobBeanstalkId, + self::JOB_NOT_FOUND_RETRY_DELAY_SECONDS ** $notFoundCount + ); + continue; + } + yield $this->beanstalkClient->bury($jobBeanstalkId); + + throw new JobNotFoundException($jobUuid, 0, $exception); + } + + break; } $this->saveJobBeanstalkId($job, $jobBeanstalkId);