diff --git a/banzai/main.py b/banzai/main.py index e0fc4e66..115123da 100755 --- a/banzai/main.py +++ b/banzai/main.py @@ -48,12 +48,13 @@ def get_consumers(self, Consumer, channel): def on_message(self, body, message): instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address) - if instrument is None or instrument.nx is None: - queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME - elif instrument.nx * instrument.ny > self.runtime_context.LARGE_WORKER_THRESHOLD: - queue_name = self.runtime_context.LARGE_WORKER_QUEUE - else: - queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME + queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME + try: + if instrument.nx * instrument.ny > self.runtime_context.LARGE_WORKER_THRESHOLD: + queue_name = self.runtime_context.LARGE_WORKER_QUEUE + except Exception as e: + logger.warning(f"Instrument not found in DB, or instrument size not defined: {e}", extra_tags={'instrument': body.get('INSTRUME')}) + process_image.apply_async(args=(body, vars(self.runtime_context)), queue=queue_name) message.ack() # acknowledge to the sender we got this message (it can be popped)