Skip to content

Commit

Permalink
feat(python): support reusable redis connections
Browse files Browse the repository at this point in the history
Like `BullMQ` for typescript, support reusing the same redis connection. This is helpful in
environments with caps on connection count.
  • Loading branch information
mful authored and manast committed Apr 8, 2024
1 parent b9d6dd6 commit 29ad8c8
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 4 deletions.
6 changes: 4 additions & 2 deletions python/bullmq/redis_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ class RedisConnection:
"canDoubleTimeout": False
}

def __init__(self, redisOpts: dict | str = {}):
def __init__(self, redisOpts: dict | str | redis.Redis = {}):
self.version = None
retry = Retry(ExponentialBackoff(), 3)
retry_errors = [BusyLoadingError, ConnectionError, TimeoutError]

if isinstance(redisOpts, dict):
if isinstance(redisOpts, redis.Redis):
self.conn = redisOpts
elif isinstance(redisOpts, dict):
defaultOpts = {
"host": "localhost",
"port": 6379,
Expand Down
3 changes: 2 additions & 1 deletion python/bullmq/types/queue_options.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

from typing import TypedDict, Any
import redis.asyncio as redis


class QueueBaseOptions(TypedDict, total=False):
Expand All @@ -8,7 +9,7 @@ class QueueBaseOptions(TypedDict, total=False):
"""

prefix: str
connection: dict[str, Any]
connection: dict[str, Any] | redis.Redis
"""
Prefix for all queue keys.
"""
Expand Down
3 changes: 2 additions & 1 deletion python/bullmq/types/worker_options.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

from typing import TypedDict, Any
import redis.asyncio as redis


class WorkerOptions(TypedDict, total=False):
Expand Down Expand Up @@ -50,7 +51,7 @@ class WorkerOptions(TypedDict, total=False):
Prefix for all queue keys.
"""

connection: dict[str, Any]
connection: dict[str, Any] | redis.Redis
"""
Options for connecting to a Redis instance.
"""
9 changes: 9 additions & 0 deletions python/tests/queue_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

from asyncio import Future
import redis.asyncio as redis
from bullmq import Queue, Worker, Job
from uuid import uuid4

Expand Down Expand Up @@ -421,5 +422,13 @@ async def test_remove_job(self):

await queue.close()

async def test_reusable_redis(self):
conn = redis.Redis(decode_responses=True, host="localhost", port="6379", db=0)
queue = Queue(queueName, {"connection": conn})
job = await queue.add("test-job", {"foo": "bar"}, {})

self.assertEqual(job.id, "1")
await queue.close()

if __name__ == '__main__':
unittest.main()
29 changes: 29 additions & 0 deletions python/tests/worker_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

from asyncio import Future
import redis.asyncio as redis
from bullmq import Queue, Worker, Job, WaitingChildrenError
from uuid import uuid4
from enum import Enum
Expand Down Expand Up @@ -395,5 +396,33 @@ def completing(job: Job, result):
await queue.close()
await worker.close()

async def test_reusable_redis(self):
conn = redis.Redis(decode_responses=True, host="localhost", port="6379", db=0)
queue = Queue(queueName, {"connection": conn})
data = {"foo": "bar"}
job = await queue.add("test-job", data, {"removeOnComplete": False})

async def process(job: Job, token: str):
print("Processing job", job)
return "done"

worker = Worker(queueName, process, {"connection": conn})

processing = Future()
worker.on("completed", lambda job, result: processing.set_result(None))

await processing

completedJob = await Job.fromId(queue, job.id)

self.assertEqual(completedJob.id, job.id)
self.assertEqual(completedJob.attemptsMade, 1)
self.assertEqual(completedJob.data, data)
self.assertEqual(completedJob.returnvalue, "done")
self.assertNotEqual(completedJob.finishedOn, None)

await worker.close(force=True)
await queue.close()

if __name__ == '__main__':
unittest.main()

0 comments on commit 29ad8c8

Please sign in to comment.