Skip to content

Commit

Permalink
fix queue_index
Browse files Browse the repository at this point in the history
(cherry picked from commit 3f77e1b)
  • Loading branch information
Dan1lD committed Dec 1, 2023
1 parent 47e3d29 commit dbd50c6
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion smart_kit/start_points/main_loop_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ def __init__(self, *args, **kwargs):
self.skip_delay = self.waiting_message_timeout.get('skip', 8000)
self.worker_tasks = []
self.max_concurrent_messages = self.template_settings.get("max_concurrent_messages", 10)
if self.max_concurrent_messages < 1:
raise ValueError(f"max_concurrent_messages must be greater than 0 (actual: {self.max_concurrent_messages}")
self.queues = [asyncio.Queue() for _ in range(self.max_concurrent_messages)]
self.total_messages = 0

Expand Down Expand Up @@ -291,7 +293,7 @@ async def poll_kafka(self, kafka_key, queues):
async def put_to_queue(self, mq_message, executable, kwargs):
key = mq_message.key()
if key:
queue_index = int(hashlib.sha1(key).hexdigest(), 16) % len(self.queues)
queue_index = int(hashlib.sha256(key).hexdigest(), 16) % len(self.queues)
else:
queue_index = (len(self.queues) - 1)

Expand Down

0 comments on commit dbd50c6

Please sign in to comment.