Skip to content

Commit

Permalink
Added delay to trigger after block end and to prefer public/same bloc…
Browse files Browse the repository at this point in the history
…k/same proposal calibrations.
  • Loading branch information
cmccully committed Dec 11, 2024
1 parent 5e22043 commit 6400d35
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 43 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
1.20.0 (2024-12-11)
-------------------
- Added functionality to delay processing until after the end of the observing block
- Added the ability to only use public calibrations
- Added the ability to prefer calibrations taken within the same block or with the same proposal

1.19.1 (2024-11-05)
-------------------
- Added extra logging and try/except blocks to catch frames that fail silently
Expand Down
11 changes: 8 additions & 3 deletions banzai/calibrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,14 @@ def apply_master_calibration(self, image, master_calibration_image):
pass

def get_calibration_file_info(self, image):
return dbs.get_master_cal(image, self.calibration_type, self.master_selection_criteria,
use_only_older_calibrations=self.runtime_context.use_only_older_calibrations,
db_address=self.runtime_context.db_address)
return dbs.cal_record_to_file_info(
dbs.get_master_cal_record(image, self.calibration_type, self.master_selection_criteria,
self.runtime_context.db_address,
use_only_older_calibrations=self.runtime_context.use_only_older_calibrations,
prefer_same_block=self.runtime_context.same_block_cals,
prefer_same_proposal=self.runtime_context.prefer_same_proposal,
check_public=self.runtime_context.check_public_cals)
)


class CalibrationComparer(CalibrationUser):
Expand Down
12 changes: 7 additions & 5 deletions banzai/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from celery import Celery
from kombu import Queue

from celery.exceptions import Retry
from banzai import dbs, calibrations, logs
from banzai.utils import date_utils, realtime_utils, stage_utils
from celery.signals import worker_process_init
Expand Down Expand Up @@ -174,15 +174,15 @@ def stack_calibrations(self, min_date: str, max_date: str, instrument_id: int, f
raise self.retry()


@app.task(name='celery.process_image', reject_on_worker_lost=True, max_retries=5)
def process_image(file_info: dict, runtime_context: dict):
@app.task(name='celery.process_image', bind=True, reject_on_worker_lost=True, max_retries=5)
def process_image(self, file_info: dict, runtime_context: dict):
"""
:param file_info: Body of queue message: dict
:param runtime_context: Context object with runtime environment info
"""
logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')})
runtime_context = Context(runtime_context)
try:
logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')})
runtime_context = Context(runtime_context)
if realtime_utils.need_to_process_image(file_info, runtime_context):
if 'path' in file_info:
filename = os.path.basename(file_info['path'])
Expand All @@ -193,6 +193,8 @@ def process_image(file_info: dict, runtime_context: dict):
realtime_utils.increment_try_number(filename, db_address=runtime_context.db_address)
stage_utils.run_pipeline_stages([file_info], runtime_context)
realtime_utils.set_file_as_processed(filename, db_address=runtime_context.db_address)
except Retry:
raise
except Exception:
logger.error("Exception processing frame: {error}".format(error=logs.format_exception()),
extra_tags={'file_info': file_info})
41 changes: 21 additions & 20 deletions banzai/dbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
import os.path
import datetime
from dateutil.parser import parse
import numpy as np
import requests
from sqlalchemy import create_engine, pool, type_coerce, cast
from sqlalchemy import create_engine, pool, type_coerce, cast, func
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Boolean, CHAR, JSON, UniqueConstraint, Float
from sqlalchemy.ext.declarative import declarative_base
Expand Down Expand Up @@ -74,6 +73,9 @@ class CalibrationImage(Base):
good_until = Column(DateTime, default=datetime.datetime(3000, 1, 1))
good_after = Column(DateTime, default=datetime.datetime(1000, 1, 1))
attributes = Column(JSON)
blockid = Column(Integer, nullable=True)
proposal = Column(String(50), nullable=True)
public_date = Column(DateTime, nullable=True)


class Instrument(Base):
Expand Down Expand Up @@ -336,7 +338,8 @@ def cal_record_to_file_info(record):


def get_master_cal_record(image, calibration_type, master_selection_criteria, db_address,
use_only_older_calibrations=False):
use_only_older_calibrations=False, prefer_same_block=False, check_public=False,
prefer_same_proposal=False):
calibration_criteria = CalibrationImage.type == calibration_type.upper()
calibration_criteria &= CalibrationImage.instrument_id == image.instrument.id
calibration_criteria &= CalibrationImage.is_master.is_(True)
Expand All @@ -356,24 +359,22 @@ def get_master_cal_record(image, calibration_type, master_selection_criteria, db
calibration_criteria &= CalibrationImage.good_after <= image.dateobs
calibration_criteria &= CalibrationImage.good_until >= image.dateobs

calibration_image = None
with get_session(db_address=db_address) as db_session:
calibration_images = db_session.query(CalibrationImage).filter(calibration_criteria).all()

# Exit if no calibration file found
if len(calibration_images) == 0:
return None

# Find the closest date
date_deltas = np.abs(np.array([i.dateobs - image.dateobs for i in calibration_images]))
closest_calibration_image = calibration_images[np.argmin(date_deltas)]

return closest_calibration_image


def get_master_cal(image, calibration_type, master_selection_criteria, db_address,
use_only_older_calibrations=False):
return cal_record_to_file_info(get_master_cal_record(image, calibration_type, master_selection_criteria, db_address,
use_only_older_calibrations=use_only_older_calibrations))
if prefer_same_block:
block_criteria = CalibrationImage.blockid == image.blockid
image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & block_criteria)
calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first()
if calibration_image is None and prefer_same_proposal:
proposal_criteria = CalibrationImage.proposal == image.proposal
image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & proposal_criteria)
calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first()
if check_public:
calibration_criteria &= CalibrationImage.public_date <= datetime.datetime.utcnow()
if calibration_image is None:
image_filter = db_session.query(CalibrationImage).filter(calibration_criteria)
calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first()
return calibration_image


def get_individual_cal_records(instrument, calibration_type, min_date: str, max_date: str, db_address: str,
Expand Down
25 changes: 25 additions & 0 deletions banzai/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,31 @@ def requested_ccd_temperature(self):
def measured_ccd_temperature(self):
pass

@property
@abc.abstractmethod
def block_end_date(self):
    """Return the end time of the observing block that produced this frame.

    Used to decide whether real-time processing should be delayed until the
    observing block has finished (see the --delay-to-block-end option).
    """
    pass

@property
@abc.abstractmethod
def proposal(self):
    """Return the id of the proposal under which this frame was observed.

    Stored on CalibrationImage records so that calibrations taken under the
    same proposal can be preferred (see the --prefer-same-proposal option).
    """
    pass

@property
@abc.abstractmethod
def blockid(self):
    """Return the id of the observing block this frame belongs to.

    Stored on CalibrationImage records so that calibrations taken within the
    same block can be preferred (see the --same-block-cals option).
    """
    pass

@property
@abc.abstractmethod
def public_date(self):
    """Return the datetime at which this frame becomes publicly available.

    Consulted when --check-public-cals is set so that only already-public
    calibration frames are selected.
    """
    pass

@public_date.setter
@abc.abstractmethod
def public_date(self, value):
    """Set the date at which the frame becomes public."""
    pass

@property
def data_type(self):
# Convert bytes to bits
Expand Down
42 changes: 35 additions & 7 deletions banzai/lco.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,30 @@ def dateobs(self):
def datecreated(self):
return Time(self.primary_hdu.meta.get('DATE'), scale='utc').datetime

@property
def block_end_date(self):
    """End of the observing block (BLKEDATE header keyword) as a naive UTC datetime.

    NOTE(review): assumes BLKEDATE is present and parseable by astropy Time --
    confirm how frames without an observing block are handled.
    """
    raw_end = self.primary_hdu.meta.get('BLKEDATE')
    return Time(raw_end, scale='utc').datetime

@property
def proposal(self):
    """Proposal id (PROPID header keyword) under which this frame was observed."""
    header = self.primary_hdu.meta
    return header.get('PROPID')

@property
def blockid(self):
    """Unique id (BLKUID header keyword) of the observing block for this frame."""
    header = self.primary_hdu.meta
    return header.get('BLKUID')

@property
def public_date(self):
    """Datetime at which this frame becomes public (L1PUBDAT header keyword).

    Returns None when the keyword is absent so callers can detect an unset
    public date (e.g. to avoid overriding it).
    """
    raw_date = self.primary_hdu.meta.get('L1PUBDAT')
    if raw_date is None:
        return None
    return Time(raw_date).datetime

@public_date.setter
def public_date(self, value: datetime.datetime):
    # FITS-style (value, comment) tuple assignment -- presumably
    # primary_hdu.meta is an astropy Header that accepts this form; confirm.
    formatted = date_utils.date_obs_to_string(value)
    self.primary_hdu.meta['L1PUBDAT'] = formatted, '[UTC] Date the frame becomes public'

@property
def configuration_mode(self):
mode = self.meta.get('CONFMODE', 'default')
Expand Down Expand Up @@ -112,13 +136,14 @@ def save_processing_metadata(self, context):

self.meta['PIPEVER'] = (context.PIPELINE_VERSION, 'Pipeline version')

if any(fnmatch(self.meta['PROPID'].lower(), public_proposal) for public_proposal in context.PUBLIC_PROPOSALS):
self.meta['L1PUBDAT'] = (self.meta['DATE-OBS'], '[UTC] Date the frame becomes public')
else:
# Wait to make public
date_observed = date_utils.parse_date_obs(self.meta['DATE-OBS'])
next_year = date_observed + datetime.timedelta(days=context.DATA_RELEASE_DELAY)
self.meta['L1PUBDAT'] = (date_utils.date_obs_to_string(next_year), '[UTC] Date the frame becomes public')
if self.public_date is None:
# Don't override the public date if it already exists
if any(fnmatch(self.meta['PROPID'].lower(), public_proposal) for public_proposal in context.PUBLIC_PROPOSALS):
self.public_date = self.dateobs
else:
# Wait to make public
next_year = self.dateobs + datetime.timedelta(days=context.DATA_RELEASE_DELAY)
self.public_date = next_year

def get_output_filename(self, runtime_context):
output_filename = self.filename.replace('00.fits', '{:02d}.fits'.format(int(runtime_context.reduction_level)))
Expand Down Expand Up @@ -171,6 +196,9 @@ def to_db_record(self, output_product):
'is_master': self.is_master,
'is_bad': self.is_bad,
'frameid': output_product.frame_id,
'blockid': self.blockid,
'proposal': self.proposal,
'public_date': self.public_date,
'attributes': {}}
for attribute in self.grouping_criteria:
record_attributes['attributes'][attribute] = str(getattr(self, attribute))
Expand Down
8 changes: 8 additions & 0 deletions banzai/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ def parse_args(settings, extra_console_arguments=None, parser_description='Proce
help='Maximum number of times to try to process a frame')
parser.add_argument('--broker-url', dest='broker_url',
help='URL for the FITS broker service.')
parser.add_argument('--delay-to-block-end', dest='delay_to_block_end', default=False, action='store_true',
help='Delay real-time processing until after the block has ended')
parser.add_argument('--same-block-cals', dest='same_block_cals', default=False, action='store_true',
help='Prefer calibrations taken in the same block')
parser.add_argument('--check-public-cals', dest='check_public_cals', default=False, action='store_true',
help='Check to see if calibration frames are public before using them?')
parser.add_argument('--prefer-same-proposal', dest='prefer_same_proposal', default=False, action='store_true',
help='Prefer calibrations taken with the same proposal')

if extra_console_arguments is None:
extra_console_arguments = []
Expand Down
2 changes: 2 additions & 0 deletions banzai/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
'banzai.utils.file_utils.ccdsum_to_filename',
'banzai.utils.file_utils.filter_to_filename')}

OBSTYPES_TO_DELAY = []

TELESCOPE_FILENAME_FUNCTION = 'banzai.utils.file_utils.telescope_to_filename'

OBSERVATION_PORTAL_URL = os.getenv('OBSERVATION_PORTAL_URL',
Expand Down
2 changes: 1 addition & 1 deletion banzai/tests/test_frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def test_data_to_detector_section_full():

def test_propid_public():
proposal_ids = ['standard', 'Photometric standards', 'NRES standards', 'FLOYDS standards']
date_obs = '2021-09-01T00:00:00'
date_obs = '2021-09-01T00:00:00.000009'
test_data = [CCDData(np.zeros((1024, 1024)), meta={'PROPID': propid,
'DATE-OBS': date_obs}) for propid in proposal_ids]

Expand Down
18 changes: 12 additions & 6 deletions banzai/tests/test_need_to_process_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from banzai.tests.utils import FakeContext
from banzai.utils.realtime_utils import need_to_process_image
import datetime

md5_hash1 = '49a6bb35cdd3859224c0214310b1d9b6'
md5_hash2 = 'aec5ef355e7e43a59fedc88ac95caed6'
Expand All @@ -11,7 +12,7 @@


class FakeRealtimeImage(object):
def __init__(self, success=False, checksum=md5_hash1, tries=0):
def __init__(self, success=False, checksum=md5_hash1, tries=0, block_end_date=None):
self.success = success
self.checksum = checksum
self.tries = tries
Expand All @@ -22,10 +23,11 @@ def __init__(self, success=False, checksum=md5_hash1, tries=0):
@mock.patch('banzai.utils.fits_utils.get_primary_header')
@mock.patch('banzai.utils.image_utils.image_can_be_processed')
def test_no_processing_if_previous_success(mock_can_process, mock_header, mock_processed, mock_md5):
mock_task = mock.MagicMock()
mock_can_process.return_value = True
mock_processed.return_value = FakeRealtimeImage(success=True, checksum=md5_hash1)
mock_md5.return_value = md5_hash1
assert not need_to_process_image({'path':'test.fits'}, FakeContext())
assert not need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task)


@mock.patch('banzai.dbs.commit_processed_image')
Expand All @@ -34,10 +36,11 @@ def test_no_processing_if_previous_success(mock_can_process, mock_header, mock_p
@mock.patch('banzai.utils.fits_utils.get_primary_header')
@mock.patch('banzai.utils.image_utils.image_can_be_processed')
def test_do_process_if_never_tried(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit):
mock_task = mock.MagicMock()
mock_can_process.return_value = True
mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=0)
mock_md5.return_value = md5_hash1
assert need_to_process_image({'path':'test.fits'}, FakeContext())
assert need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task)


@mock.patch('banzai.dbs.commit_processed_image')
Expand All @@ -46,12 +49,13 @@ def test_do_process_if_never_tried(mock_can_process, mock_header, mock_processed
@mock.patch('banzai.utils.fits_utils.get_primary_header')
@mock.patch('banzai.utils.image_utils.image_can_be_processed')
def test_do_process_if_tries_less_than_max(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit):
mock_task = mock.MagicMock()
mock_can_process.return_value = True
mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=3)
mock_md5.return_value = md5_hash1
context = FakeContext()
context.max_tries = 5
assert need_to_process_image({'path':'test.fits'}, context)
assert need_to_process_image({'path': 'test.fits'}, context, mock_task)


@mock.patch('banzai.dbs.commit_processed_image')
Expand All @@ -60,13 +64,14 @@ def test_do_process_if_tries_less_than_max(mock_can_process, mock_header, mock_p
@mock.patch('banzai.utils.fits_utils.get_primary_header')
@mock.patch('banzai.utils.image_utils.image_can_be_processed')
def test_no_processing_if_tries_at_max(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit):
mock_task = mock.MagicMock()
mock_can_process.return_value = True
max_tries = 5
mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=max_tries)
mock_md5.return_value = md5_hash1
context = FakeContext()
context.max_tries = max_tries
assert not need_to_process_image({'path':'test.fits'}, context)
assert not need_to_process_image({'path': 'test.fits'}, context, mock_task)


@mock.patch('banzai.dbs.commit_processed_image')
Expand All @@ -76,11 +81,12 @@ def test_no_processing_if_tries_at_max(mock_can_process, mock_header, mock_proce
@mock.patch('banzai.utils.image_utils.image_can_be_processed')
def test_do_process_if_new_checksum(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit):
# assert that tries and success are reset to 0
mock_task = mock.MagicMock()
image = FakeRealtimeImage(success=True, checksum=md5_hash1, tries=3)
mock_can_process.return_value = True
mock_processed.return_value = image
mock_md5.return_value = md5_hash2
assert need_to_process_image({'path': 'test.fits'}, FakeContext())
assert need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task)
assert not image.success
assert image.tries == 0
assert image.checksum == md5_hash2
10 changes: 9 additions & 1 deletion banzai/utils/realtime_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from banzai.utils import file_utils, import_utils, image_utils
from banzai.data import HeaderOnly
from banzai import logs
import datetime


logger = logs.get_logger()

Expand All @@ -22,7 +24,7 @@ def increment_try_number(path, db_address):
dbs.commit_processed_image(image, db_address=db_address)


def need_to_process_image(file_info, context):
def need_to_process_image(file_info, context, task):
"""
Figure out if we need to try to make a process a given file.
Expand Down Expand Up @@ -108,6 +110,12 @@ def need_to_process_image(file_info, context):
msg = 'The header in this queue message appears to not be complete enough to make a Frame object'
logger.error(msg, extra_tags={'filename': filename})
need_to_process = False
if context.delay_to_block_end and test_image.obstype in context.OBSTYPES_TO_DELAY:
if datetime.datetime.now() < test_image.block_end_date:
logger.info('Observing Block in progress. Retrying 5 minutes after it completes',
extra_tags={'filename': filename})
delay = test_image.block_end_date - datetime.datetime.now() + datetime.timedelta(minutes=5)
task.retry(countdown=delay.total_seconds())
except Exception:
logger.error('Issue creating Image object with given queue message', extra_tags={"filename": filename})
logger.error(logs.format_exception())
Expand Down

0 comments on commit 6400d35

Please sign in to comment.