From 2f1ce63ce24ce240577adfd62cba66092306f60d Mon Sep 17 00:00:00 2001 From: Danil D Date: Wed, 8 Nov 2023 10:54:09 +0300 Subject: [PATCH 1/3] fix queue_index --- smart_kit/start_points/main_loop_kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index a0c2b925..b3c5d5a2 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -293,7 +293,7 @@ async def poll_kafka(self, kafka_key, queues): async def put_to_queue(self, mq_message, executable, kwargs): key = mq_message.key if key: - queue_index = int(hashlib.sha1(key).hexdigest(), 16) % (len(self.queues) - 1) + queue_index = int(hashlib.sha1(key).hexdigest(), 16) % len(self.queues) else: queue_index = (len(self.queues) - 1) From 3f77e1b3ac33d0ddef3a9eb7357f453577dde362 Mon Sep 17 00:00:00 2001 From: Danil D Date: Fri, 17 Nov 2023 12:13:48 +0300 Subject: [PATCH 2/3] fix queue_index --- smart_kit/start_points/main_loop_kafka.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index b3c5d5a2..b106f2ae 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -78,6 +78,8 @@ def __init__(self, *args, **kwargs): self.skip_delay = self.waiting_message_timeout.get('skip', 8000) self.worker_tasks = [] self.max_concurrent_messages = self.template_settings.get("max_concurrent_messages", 10) + if self.max_concurrent_messages < 1: + raise ValueError(f"max_concurrent_messages must be greater than 0 (actual: {self.max_concurrent_messages}") self.queues = [asyncio.Queue() for _ in range(self.max_concurrent_messages)] self.total_messages = 0 @@ -293,7 +295,7 @@ async def poll_kafka(self, kafka_key, queues): async def put_to_queue(self, mq_message, executable, kwargs): key = mq_message.key if key: - queue_index = int(hashlib.sha1(key).hexdigest(), 16) % len(self.queues) + queue_index = int(hashlib.sha256(key).hexdigest(), 16) % len(self.queues) else: queue_index = (len(self.queues) - 1) From 2e08005d9802afe0424ce3805b537cc2c597a4f7 Mon Sep 17 00:00:00 2001 From: Danil D Date: Fri, 17 Nov 2023 12:14:41 +0300 Subject: [PATCH 3/3] fix typo --- smart_kit/start_points/main_loop_kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index b106f2ae..b8359082 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -79,7 +79,7 @@ def __init__(self, *args, **kwargs): self.worker_tasks = [] self.max_concurrent_messages = self.template_settings.get("max_concurrent_messages", 10) if self.max_concurrent_messages < 1: - raise ValueError(f"max_concurrent_messages must be greater than 0 (actual: {self.max_concurrent_messages}") + raise ValueError(f"max_concurrent_messages must be greater than 0 (actual: {self.max_concurrent_messages})") self.queues = [asyncio.Queue() for _ in range(self.max_concurrent_messages)] self.total_messages = 0