Skip to content

Commit

Permalink
Merge pull request #399 from LCOGT/fix/dropped-messages
Browse files Browse the repository at this point in the history
Fix/dropped messages
  • Loading branch information
cmccully authored Nov 5, 2024
2 parents d9f041f + c8ec352 commit c8f13f6
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 4 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/build-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ concurrency: ${{ github.workflow }}-${{ github.ref }}
on:
push:
branches:
- "*"
- "**"
tags:
- "*"
pull_request:
Expand Down Expand Up @@ -47,6 +47,11 @@ jobs:
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=sha
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
- name: Build and also push Dockerimage
id: build-and-push
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
1.19.1 (2024-11-05)
-------------------
- Added extra logging and try excepts to catch frames that bypass silently

1.19.0 (2024-10-16)
-------------------
- Added the ability to read fits files that are pre downloaded and are already in memory
Expand Down
1 change: 1 addition & 0 deletions banzai/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def process_image(file_info: dict, runtime_context: dict):
:param file_info: Body of queue message: dict
:param runtime_context: Context object with runtime environment info
"""
logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')})
runtime_context = Context(runtime_context)
try:
if realtime_utils.need_to_process_image(file_info, runtime_context):
Expand Down
9 changes: 8 additions & 1 deletion banzai/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import argparse
import os.path
import logging
import traceback

from kombu import Exchange, Connection, Queue
from kombu.mixins import ConsumerMixin
Expand Down Expand Up @@ -47,7 +48,13 @@ def get_consumers(self, Consumer, channel):
return [consumer]

def on_message(self, body, message):
instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address)
logger.info('Received message', extra_tags={'filename': body['filename']})
try:
instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address)
except Exception:
logger.error(f'Could not get instrument from header. {traceback.format_exc()}', extra_tags={'filename': body['filename']})
message.ack()
return
if instrument is None or instrument.nx is None:
queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME
elif instrument.nx * instrument.ny > self.runtime_context.LARGE_WORKER_THRESHOLD:
Expand Down
12 changes: 10 additions & 2 deletions banzai/utils/realtime_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os

from banzai import dbs
from banzai.utils import file_utils, import_utils, image_utils
from banzai.data import HeaderOnly
Expand Down Expand Up @@ -48,6 +49,7 @@ def need_to_process_image(file_info, context):

if 'frameid' in file_info:
if 'version_set' not in file_info:
logger.info("Version set not available in file_info", extra_tags={"filename": file_info['filename']})
return True
checksum = file_info['version_set'][0].get('md5')
filename = file_info['filename']
Expand All @@ -57,7 +59,7 @@ def need_to_process_image(file_info, context):

logger.info("Checking if file needs to be processed", extra_tags={"filename": filename})
if not (filename.endswith('.fits') or filename.endswith('.fits.fz')):
logger.debug("Filename does not have a .fits extension, stopping reduction",
logger.error("Filename does not have a .fits extension, stopping reduction",
extra_tags={"filename": filename})
return False

Expand All @@ -70,6 +72,7 @@ def need_to_process_image(file_info, context):
# Check the md5.
# Reset the number of tries if the file has changed on disk/in s3
if image.checksum != checksum:
logger.info('File has changed on disk. Resetting success flags and tries', extra_tags={'filename': filename})
need_to_process = True
image.checksum = checksum
image.tries = 0
Expand All @@ -78,6 +81,7 @@ def need_to_process_image(file_info, context):

# Check if we need to try again
elif image.tries < context.max_tries and not image.success:
logger.info('File has not been successfully processed yet. Trying again.', extra_tags={'filename': filename})
need_to_process = True
dbs.commit_processed_image(image, context.db_address)

Expand All @@ -88,7 +92,11 @@ def need_to_process_image(file_info, context):
factory = import_utils.import_attribute(context.FRAME_FACTORY)()
test_image = factory.observation_frame_class(hdu_list=[HeaderOnly(file_info, name='')],
file_path=file_info['filename'])
test_image.instrument = factory.get_instrument_from_header(file_info, db_address=context.db_address)
try:
test_image.instrument = factory.get_instrument_from_header(file_info, db_address=context.db_address)
except Exception:
logger.error(f'Issue getting instrument from header. {logs.format_exception()}', extra_tags={'filename': filename})
need_to_process = False
if image_utils.get_reduction_level(test_image.meta) != '00':
logger.error('Image has nonzero reduction level. Aborting.', extra_tags={'filename': filename})
need_to_process = False
Expand Down

0 comments on commit c8f13f6

Please sign in to comment.