Skip to content

Commit

Permalink
Merge pull request #162 from salute-developers/fix/issue98
Browse files Browse the repository at this point in the history
fix queue_index
  • Loading branch information
SyrexMinus authored Nov 20, 2023
2 parents 45b368e + 2e08005 commit a7478f2
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion smart_kit/start_points/main_loop_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def __init__(self, *args, **kwargs):
self.skip_delay = self.waiting_message_timeout.get('skip', 8000)
self.worker_tasks = []
self.max_concurrent_messages = self.template_settings.get("max_concurrent_messages", 10)
if self.max_concurrent_messages < 1:
raise ValueError(f"max_concurrent_messages must be greater than 0 (actual: {self.max_concurrent_messages})")
self.queues = [asyncio.Queue() for _ in range(self.max_concurrent_messages)]
self.total_messages = 0

Expand Down Expand Up @@ -299,7 +301,7 @@ async def poll_kafka(self, kafka_key, queues):
async def put_to_queue(self, mq_message, executable, kwargs):
key = mq_message.key
if key:
queue_index = int(hashlib.sha1(key).hexdigest(), 16) % (len(self.queues) - 1)
queue_index = int(hashlib.sha256(key).hexdigest(), 16) % len(self.queues)
else:
queue_index = (len(self.queues) - 1)

Expand Down

0 comments on commit a7478f2

Please sign in to comment.