From 581196e2163f5c9fa21b1a2c19450923a0176142 Mon Sep 17 00:00:00 2001 From: Matt Daily Date: Mon, 4 Nov 2024 10:27:23 -0800 Subject: [PATCH] Additional logs to help narrow down issue --- banzai/celery.py | 1 + banzai/main.py | 9 ++++++++- banzai/utils/realtime_utils.py | 7 ++++++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/banzai/celery.py b/banzai/celery.py index c5647764..28637130 100644 --- a/banzai/celery.py +++ b/banzai/celery.py @@ -180,6 +180,7 @@ def process_image(file_info: dict, runtime_context: dict): :param file_info: Body of queue message: dict :param runtime_context: Context object with runtime environment info """ + logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')}) runtime_context = Context(runtime_context) try: if realtime_utils.need_to_process_image(file_info, runtime_context): diff --git a/banzai/main.py b/banzai/main.py index 34d1a827..bd1c1fd5 100755 --- a/banzai/main.py +++ b/banzai/main.py @@ -10,6 +10,7 @@ import argparse import os.path import logging +import traceback from kombu import Exchange, Connection, Queue from kombu.mixins import ConsumerMixin @@ -47,7 +48,13 @@ def get_consumers(self, Consumer, channel): return [consumer] def on_message(self, body, message): - instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address) + logger.info('Received message', extra_tags={'filename': body['filename']}) + try: + instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address) + except Exception: + logger.error(f'Could not get instrument from header. {traceback.format_exc()}', extra_tags={'filename': body['filename']}) + message.ack() + return if instrument is None or instrument.nx is None: queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME elif instrument.nx * instrument.ny > self.runtime_context.LARGE_WORKER_THRESHOLD: diff --git a/banzai/utils/realtime_utils.py b/banzai/utils/realtime_utils.py index e26fe28c..2f6ca001 100644 --- a/banzai/utils/realtime_utils.py +++ b/banzai/utils/realtime_utils.py @@ -1,4 +1,5 @@ import os + from banzai import dbs from banzai.utils import file_utils, import_utils, image_utils from banzai.data import HeaderOnly @@ -91,7 +92,11 @@ def need_to_process_image(file_info, context): factory = import_utils.import_attribute(context.FRAME_FACTORY)() test_image = factory.observation_frame_class(hdu_list=[HeaderOnly(file_info, name='')], file_path=file_info['filename']) - test_image.instrument = factory.get_instrument_from_header(file_info, db_address=context.db_address) + try: + test_image.instrument = factory.get_instrument_from_header(file_info, db_address=context.db_address) + except Exception: + logger.error(f'Issue getting instrument from header. {logs.format_exception()}', extra_tags={'filename': filename}) + need_to_process = False if image_utils.get_reduction_level(test_image.meta) != '00': logger.error('Image has nonzero reduction level. Aborting.', extra_tags={'filename': filename}) need_to_process = False