diff --git a/banzai/main.py b/banzai/main.py index 115123da..e0fc4e66 100755 --- a/banzai/main.py +++ b/banzai/main.py @@ -48,13 +48,12 @@ def get_consumers(self, Consumer, channel): def on_message(self, body, message): instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address) - queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME - try: - if instrument.nx * instrument.ny > self.runtime_context.LARGE_WORKER_THRESHOLD: - queue_name = self.runtime_context.LARGE_WORKER_QUEUE - except Exception as e: - logger.warning(f"Instrument not found in DB, or instrument size not defined: {e}", extra_tags={'instrument': body.get('INSTRUME')}) - + if instrument is None or instrument.nx is None: + queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME + elif instrument.nx * instrument.ny > self.runtime_context.LARGE_WORKER_THRESHOLD: + queue_name = self.runtime_context.LARGE_WORKER_QUEUE + else: + queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME process_image.apply_async(args=(body, vars(self.runtime_context)), queue=queue_name) message.ack() # acknowledge to the sender we got this message (it can be popped)