Skip to content

Commit

Permalink
Additional logs to help narrow down issue
Browse files Browse the repository at this point in the history
  • Loading branch information
mgdaily committed Nov 4, 2024
1 parent b2bb999 commit 581196e
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 2 deletions.
1 change: 1 addition & 0 deletions banzai/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def process_image(file_info: dict, runtime_context: dict):
:param file_info: Body of queue message: dict
:param runtime_context: Context object with runtime environment info
"""
logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')})
runtime_context = Context(runtime_context)
try:
if realtime_utils.need_to_process_image(file_info, runtime_context):
Expand Down
9 changes: 8 additions & 1 deletion banzai/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import argparse
import os.path
import logging
import traceback

from kombu import Exchange, Connection, Queue
from kombu.mixins import ConsumerMixin
Expand Down Expand Up @@ -47,7 +48,13 @@ def get_consumers(self, Consumer, channel):
return [consumer]

def on_message(self, body, message):
instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address)
logger.info('Received message', extra_tags={'filename': body['filename']})
try:
instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address)
except Exception:
logger.error(f'Could not get instrument from header. {traceback.format_exc()}', extra_tags={'filename': body['filename']})
message.ack()
return
if instrument is None or instrument.nx is None:
queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME
elif instrument.nx * instrument.ny > self.runtime_context.LARGE_WORKER_THRESHOLD:
Expand Down
7 changes: 6 additions & 1 deletion banzai/utils/realtime_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os

from banzai import dbs
from banzai.utils import file_utils, import_utils, image_utils
from banzai.data import HeaderOnly
Expand Down Expand Up @@ -91,7 +92,11 @@ def need_to_process_image(file_info, context):
factory = import_utils.import_attribute(context.FRAME_FACTORY)()
test_image = factory.observation_frame_class(hdu_list=[HeaderOnly(file_info, name='')],
file_path=file_info['filename'])
test_image.instrument = factory.get_instrument_from_header(file_info, db_address=context.db_address)
try:
test_image.instrument = factory.get_instrument_from_header(file_info, db_address=context.db_address)
except Exception:
logger.error(f'Issue getting instrument from header. {logs.format_exception()}', extra_tags={'filename': filename})
need_to_process = False
if image_utils.get_reduction_level(test_image.meta) != '00':
logger.error('Image has nonzero reduction level. Aborting.', extra_tags={'filename': filename})
need_to_process = False
Expand Down

0 comments on commit 581196e

Please sign in to comment.