diff --git a/src/WorkerInstance.php b/src/WorkerInstance.php index ba77b8b..46bd63d 100644 --- a/src/WorkerInstance.php +++ b/src/WorkerInstance.php @@ -80,6 +80,7 @@ public function boot(): Promise $payloadData = @unserialize($rawPayload, ['allowed_classes' => false]); if ($payloadData === false) { $logContext['raw_payload'] = NonUtf8Cleaner::cleanString($rawPayload); + yield $this->beanstalkClient->bury($jobId); $this->logger->critical('Cannot unserialize job payload so it has been buried.', $logContext); continue; } diff --git a/tests/BeanstalkTestCase.php b/tests/BeanstalkTestCase.php index a57bdf8..fa66cab 100644 --- a/tests/BeanstalkTestCase.php +++ b/tests/BeanstalkTestCase.php @@ -73,4 +73,14 @@ protected function assertReadyJobsCountInTube(int $expectedReadyJobsCount, strin $stats = $this->pheanstalk->statsTube($tube); $this->assertEquals($expectedReadyJobsCount, $stats['current-jobs-ready']); } + + /** + * @param int $expectedBuriedJobsCount + * @param string $tube + */ + protected function assertBuriedJobsCountInTube(int $expectedBuriedJobsCount, string $tube) + { + $stats = $this->pheanstalk->statsTube($tube); + $this->assertEquals($expectedBuriedJobsCount, $stats['current-jobs-buried']); + } } diff --git a/tests/Integration/JobUnserializationErrorHandlingTest.php b/tests/Integration/JobUnserializationErrorHandlingTest.php index 3b56cf3..b8c11b5 100644 --- a/tests/Integration/JobUnserializationErrorHandlingTest.php +++ b/tests/Integration/JobUnserializationErrorHandlingTest.php @@ -6,6 +6,7 @@ use Amp\Beanstalk\BeanstalkClient; use Amp\Loop; use org\bovigo\vfs\vfsStream; +use Webgriffe\Esb\BeanstalkTestCase; use Webgriffe\Esb\DummyFilesystemWorker; use Webgriffe\Esb\DummyRepeatProducer; use Webgriffe\Esb\KernelTestCase; @@ -38,7 +39,9 @@ public function testNotUnserializableJobShouldBeHandled() Loop::delay( 200, function () { - $beanstalkdClient = new BeanstalkClient('tcp://127.0.0.1:11300?tube=' . self::TUBE); + $beanstalkdClient = new BeanstalkClient( + BeanstalkTestCase::getBeanstalkdConnectionUri() . '?tube=' . self::TUBE + ); yield $beanstalkdClient->put('... not unserializable payload ...'); Loop::delay( 200, @@ -52,5 +55,7 @@ function () { self::$kernel->boot(); $this->assertContains('Cannot unserialize job payload so it has been buried.', $this->dumpLog()); + $this->assertReadyJobsCountInTube(0, self::TUBE); + $this->assertBuriedJobsCountInTube(1, self::TUBE); } }