diff --git a/python/bullmq/queue.py b/python/bullmq/queue.py index 32535cb870..44a54355ed 100644 --- a/python/bullmq/queue.py +++ b/python/bullmq/queue.py @@ -116,6 +116,31 @@ def getRateLimitTtl(self): """ return self.client.pttl(self.keys["limiter"]) + async def getJobLogs(self, job_id:str, start = 0, end = -1, asc = True): + """ + Returns the logs for a given Job. + + @param job_id: The id of the job to get the logs for. + @param start: Zero based index from where to start returning jobs. + @param end: Zero based index where to stop returning jobs. + @param asc: If true, the jobs will be returned in ascending order. + """ + + logs_key = self.toKey(job_id + ":logs") + pipe = self.redisConnection.conn.pipeline(transaction=True) + if asc: + pipe.lrange(logs_key, start, end) + else: + pipe.lrange(logs_key, -(end+1), -(start+1)) + pipe.llen(logs_key) + result = await pipe.execute() + if not asc: + result[0].reverse() + return { + "logs": result[0], + "count": result[1] + } + async def obliterate(self, force: bool = False): """ Completely destroys the queue and all of its contents irreversibly. diff --git a/python/bullmq/worker.py b/python/bullmq/worker.py index b97144ac24..957ed26c0a 100644 --- a/python/bullmq/worker.py +++ b/python/bullmq/worker.py @@ -154,7 +154,6 @@ def nextJobFromJobData(self, job_data = None, job_id: str = None, limit_until: i async def waitForJob(self): block_timeout = self.getBlockTimeout(self.blockUntil) block_timeout = block_timeout if self.blockingRedisConnection.capabilities.get("canDoubleTimeout", False) else math.ceil(block_timeout) - block_timeout = min(block_timeout, maximum_block_timeout) result = await self.bclient.bzpopmin(self.scripts.keys["marker"], block_timeout) if result: diff --git a/python/tests/job_tests.py b/python/tests/job_tests.py index ea5a4a9349..290e222348 100644 --- a/python/tests/job_tests.py +++ b/python/tests/job_tests.py @@ -58,6 +58,8 @@ async def test_job_log(self): self.assertEqual(log_count, 2) + logs = await queue.getJobLogs(job.id) + self.assertEqual(logs, {"logs": ["some log text 1", "some log text 2"], "count": 2}) await queue.close() async def test_update_job_data(self):