diff --git a/CHANGES.md b/CHANGES.md index d3be4c8c..139b6472 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +1.20.0 (2024-12-11) +------------------- +- Added functionality to delay processing until after the end of the observing block +- Added the ability to only use public calibrations +- Added the ability to prefer calibrations taken within the same block or with the same proposal + 1.19.1 (2024-11-05) ------------------- - Added extra logging and try excepts to catch frames that bypass silently diff --git a/banzai/calibrations.py b/banzai/calibrations.py index 7578f34f..2909691a 100644 --- a/banzai/calibrations.py +++ b/banzai/calibrations.py @@ -137,9 +137,14 @@ def apply_master_calibration(self, image, master_calibration_image): pass def get_calibration_file_info(self, image): - return dbs.get_master_cal(image, self.calibration_type, self.master_selection_criteria, - use_only_older_calibrations=self.runtime_context.use_only_older_calibrations, - db_address=self.runtime_context.db_address) + return dbs.cal_record_to_file_info( + dbs.get_master_cal_record(image, self.calibration_type, self.master_selection_criteria, + self.runtime_context.db_address, + use_only_older_calibrations=self.runtime_context.use_only_older_calibrations, + prefer_same_block_cals=self.runtime_context.same_block_cals, + prefer_same_proposal_cals=self.runtime_context.prefer_same_proposal_cals, + check_public_cals=self.runtime_context.check_public_cals) + ) class CalibrationComparer(CalibrationUser): diff --git a/banzai/celery.py b/banzai/celery.py index 28637130..bdda5e06 100644 --- a/banzai/celery.py +++ b/banzai/celery.py @@ -4,7 +4,7 @@ from celery import Celery from kombu import Queue - +from celery.exceptions import Retry from banzai import dbs, calibrations, logs from banzai.utils import date_utils, realtime_utils, stage_utils from celery.signals import worker_process_init @@ -174,16 +174,16 @@ def stack_calibrations(self, min_date: str, max_date: str, instrument_id: int, f raise self.retry() -@app.task(name='celery.process_image', reject_on_worker_lost=True, max_retries=5) -def process_image(file_info: dict, runtime_context: dict): +@app.task(name='celery.process_image', bind=True, reject_on_worker_lost=True, max_retries=5) +def process_image(self, file_info: dict, runtime_context: dict): """ :param file_info: Body of queue message: dict :param runtime_context: Context object with runtime environment info """ - logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')}) - runtime_context = Context(runtime_context) try: - if realtime_utils.need_to_process_image(file_info, runtime_context): + logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')}) + runtime_context = Context(runtime_context) + if realtime_utils.need_to_process_image(file_info, runtime_context, self): if 'path' in file_info: filename = os.path.basename(file_info['path']) else: @@ -193,6 +193,8 @@ def process_image(file_info: dict, runtime_context: dict): realtime_utils.increment_try_number(filename, db_address=runtime_context.db_address) stage_utils.run_pipeline_stages([file_info], runtime_context) realtime_utils.set_file_as_processed(filename, db_address=runtime_context.db_address) + except Retry: + raise except Exception: logger.error("Exception processing frame: {error}".format(error=logs.format_exception()), extra_tags={'file_info': file_info}) diff --git a/banzai/dbs.py b/banzai/dbs.py index 6786aa56..ca8bbacd 100755 --- a/banzai/dbs.py +++ b/banzai/dbs.py @@ -10,9 +10,8 @@ import os.path import datetime from dateutil.parser import parse -import numpy as np import requests -from sqlalchemy import create_engine, pool, type_coerce, cast +from sqlalchemy import create_engine, pool, type_coerce, cast, func from sqlalchemy.orm import sessionmaker from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Boolean, CHAR, JSON, UniqueConstraint, Float from sqlalchemy.ext.declarative import declarative_base @@ -74,6 +73,9 @@ class CalibrationImage(Base): good_until = Column(DateTime, default=datetime.datetime(3000, 1, 1)) good_after = Column(DateTime, default=datetime.datetime(1000, 1, 1)) attributes = Column(JSON) + blockid = Column(Integer, nullable=True) + proposal = Column(String(50), nullable=True) + public_date = Column(DateTime, nullable=True) class Instrument(Base): @@ -336,7 +338,8 @@ def cal_record_to_file_info(record): def get_master_cal_record(image, calibration_type, master_selection_criteria, db_address, - use_only_older_calibrations=False): + use_only_older_calibrations=False, prefer_same_block_cals=False, check_public_cals=False, + prefer_same_proposal_cals=False): calibration_criteria = CalibrationImage.type == calibration_type.upper() calibration_criteria &= CalibrationImage.instrument_id == image.instrument.id calibration_criteria &= CalibrationImage.is_master.is_(True) @@ -356,24 +359,22 @@ def get_master_cal_record(image, calibration_type, master_selection_criteria, db calibration_criteria &= CalibrationImage.good_after <= image.dateobs calibration_criteria &= CalibrationImage.good_until >= image.dateobs + calibration_image = None with get_session(db_address=db_address) as db_session: - calibration_images = db_session.query(CalibrationImage).filter(calibration_criteria).all() - - # Exit if no calibration file found - if len(calibration_images) == 0: - return None - - # Find the closest date - date_deltas = np.abs(np.array([i.dateobs - image.dateobs for i in calibration_images])) - closest_calibration_image = calibration_images[np.argmin(date_deltas)] - - return closest_calibration_image - - -def get_master_cal(image, calibration_type, master_selection_criteria, db_address, - use_only_older_calibrations=False): - return cal_record_to_file_info(get_master_cal_record(image, calibration_type, master_selection_criteria, db_address, - use_only_older_calibrations=use_only_older_calibrations)) + if prefer_same_block_cals: + block_criteria = CalibrationImage.blockid == image.blockid + image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & block_criteria) + calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() + if calibration_image is None and prefer_same_proposal_cals: + proposal_criteria = CalibrationImage.proposal == image.proposal + image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & proposal_criteria) + calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() + if check_public_cals: + calibration_criteria &= CalibrationImage.public_date <= datetime.datetime.now(datetime.UTC) + if calibration_image is None: + image_filter = db_session.query(CalibrationImage).filter(calibration_criteria) + calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first() + return calibration_image def get_individual_cal_records(instrument, calibration_type, min_date: str, max_date: str, db_address: str, diff --git a/banzai/frames.py b/banzai/frames.py index 47543aa5..fe5b7929 100644 --- a/banzai/frames.py +++ b/banzai/frames.py @@ -165,6 +165,31 @@ def requested_ccd_temperature(self): def measured_ccd_temperature(self): pass + @property + @abc.abstractmethod + def block_end_date(self): + pass + + @property + @abc.abstractmethod + def proposal(self): + pass + + @property + @abc.abstractmethod + def blockid(self): + pass + + @property + @abc.abstractmethod + def public_date(self): + pass + + @public_date.setter + @abc.abstractmethod + def public_date(self, value): + pass + @property def data_type(self): # Convert bytes to bits diff --git a/banzai/lco.py b/banzai/lco.py index 67447d57..57a2418b 100644 --- a/banzai/lco.py +++ b/banzai/lco.py @@ -62,6 +62,30 @@ def dateobs(self): def datecreated(self): return Time(self.primary_hdu.meta.get('DATE'), scale='utc').datetime + @property + def block_end_date(self): + return Time(self.primary_hdu.meta.get('BLKEDATE'), scale='utc').datetime + + @property + def proposal(self): + return self.primary_hdu.meta.get('PROPID') + + @property + def blockid(self): + return self.primary_hdu.meta.get('BLKUID') + + @property + def public_date(self): + pubdat = self.primary_hdu.meta.get('L1PUBDAT') + if pubdat is None: + return pubdat + else: + return Time(pubdat).datetime + + @public_date.setter + def public_date(self, value: datetime.datetime): + self.primary_hdu.meta['L1PUBDAT'] = date_utils.date_obs_to_string(value), '[UTC] Date the frame becomes public' + @property def configuration_mode(self): mode = self.meta.get('CONFMODE', 'default') @@ -112,13 +136,14 @@ def save_processing_metadata(self, context): self.meta['PIPEVER'] = (context.PIPELINE_VERSION, 'Pipeline version') - if any(fnmatch(self.meta['PROPID'].lower(), public_proposal) for public_proposal in context.PUBLIC_PROPOSALS): - self.meta['L1PUBDAT'] = (self.meta['DATE-OBS'], '[UTC] Date the frame becomes public') - else: - # Wait to make public - date_observed = date_utils.parse_date_obs(self.meta['DATE-OBS']) - next_year = date_observed + datetime.timedelta(days=context.DATA_RELEASE_DELAY) - self.meta['L1PUBDAT'] = (date_utils.date_obs_to_string(next_year), '[UTC] Date the frame becomes public') + if self.public_date is None: + # Don't override the public date if it already exists + if any(fnmatch(self.meta['PROPID'].lower(), public_proposal) for public_proposal in context.PUBLIC_PROPOSALS): + self.public_date = self.dateobs + else: + # Wait to make public + next_year = self.dateobs + datetime.timedelta(days=context.DATA_RELEASE_DELAY) + self.public_date = next_year def get_output_filename(self, runtime_context): output_filename = self.filename.replace('00.fits', '{:02d}.fits'.format(int(runtime_context.reduction_level))) @@ -171,6 +196,9 @@ def to_db_record(self, output_product): 'is_master': self.is_master, 'is_bad': self.is_bad, 'frameid': output_product.frame_id, + 'blockid': self.blockid, + 'proposal': self.proposal, + 'public_date': self.public_date, 'attributes': {}} for attribute in self.grouping_criteria: record_attributes['attributes'][attribute] = str(getattr(self, attribute)) diff --git a/banzai/main.py b/banzai/main.py index bd1c1fd5..a1e12145 100755 --- a/banzai/main.py +++ b/banzai/main.py @@ -118,6 +118,15 @@ def parse_args(settings, extra_console_arguments=None, parser_description='Proce help='Maximum number of times to try to process a frame') parser.add_argument('--broker-url', dest='broker_url', help='URL for the FITS broker service.') + parser.add_argument('--delay-to-block-end', dest='delay_to_block_end', default=False, action='store_true', + help='Delay real-time processing until after the block has ended') + parser.add_argument('--same-block-cals', dest='same_block_cals', default=False, action='store_true', + help='Prefer calibrations taken in the same block') + parser.add_argument('--check-public-cals', dest='check_public_cals', default=False, action='store_true', + help='Check to see if calibration frames are public before using them?') + parser.add_argument('--prefer-same-proposal-cals', dest='prefer_same_proposal_cals', + default=False, action='store_true', + help='Prefer calibrations taken with the same proposal') if extra_console_arguments is None: extra_console_arguments = [] diff --git a/banzai/settings.py b/banzai/settings.py index 7123c5b9..7f3567bb 100644 --- a/banzai/settings.py +++ b/banzai/settings.py @@ -90,6 +90,8 @@ 'banzai.utils.file_utils.ccdsum_to_filename', 'banzai.utils.file_utils.filter_to_filename')} +OBSTYPES_TO_DELAY = [] + TELESCOPE_FILENAME_FUNCTION = 'banzai.utils.file_utils.telescope_to_filename' OBSERVATION_PORTAL_URL = os.getenv('OBSERVATION_PORTAL_URL', diff --git a/banzai/tests/test_frames.py b/banzai/tests/test_frames.py index e66ae92f..0a22d6cf 100644 --- a/banzai/tests/test_frames.py +++ b/banzai/tests/test_frames.py @@ -384,7 +384,7 @@ def test_data_to_detector_section_full(): def test_propid_public(): proposal_ids = ['standard', 'Photometric standards', 'NRES standards', 'FLOYDS standards'] - date_obs = '2021-09-01T00:00:00' + date_obs = '2021-09-01T00:00:00.000009' test_data = [CCDData(np.zeros((1024, 1024)), meta={'PROPID': propid, 'DATE-OBS': date_obs}) for propid in proposal_ids] diff --git a/banzai/tests/test_need_to_process_image.py b/banzai/tests/test_need_to_process_image.py index 3d56fc89..f65ea72e 100644 --- a/banzai/tests/test_need_to_process_image.py +++ b/banzai/tests/test_need_to_process_image.py @@ -3,6 +3,7 @@ from banzai.tests.utils import FakeContext from banzai.utils.realtime_utils import need_to_process_image +import datetime md5_hash1 = '49a6bb35cdd3859224c0214310b1d9b6' md5_hash2 = 'aec5ef355e7e43a59fedc88ac95caed6' @@ -11,7 +12,7 @@ class FakeRealtimeImage(object): - def __init__(self, success=False, checksum=md5_hash1, tries=0): + def __init__(self, success=False, checksum=md5_hash1, tries=0, block_end_date=None): self.success = success self.checksum = checksum self.tries = tries @@ -22,10 +23,11 @@ def __init__(self, success=False, checksum=md5_hash1, tries=0): @mock.patch('banzai.utils.fits_utils.get_primary_header') @mock.patch('banzai.utils.image_utils.image_can_be_processed') def test_no_processing_if_previous_success(mock_can_process, mock_header, mock_processed, mock_md5): + mock_task = mock.MagicMock() mock_can_process.return_value = True mock_processed.return_value = FakeRealtimeImage(success=True, checksum=md5_hash1) mock_md5.return_value = md5_hash1 - assert not need_to_process_image({'path':'test.fits'}, FakeContext()) + assert not need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task) @mock.patch('banzai.dbs.commit_processed_image') @@ -34,10 +36,11 @@ def test_no_processing_if_previous_success(mock_can_process, mock_header, mock_p @mock.patch('banzai.utils.fits_utils.get_primary_header') @mock.patch('banzai.utils.image_utils.image_can_be_processed') def test_do_process_if_never_tried(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit): + mock_task = mock.MagicMock() mock_can_process.return_value = True mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=0) mock_md5.return_value = md5_hash1 - assert need_to_process_image({'path':'test.fits'}, FakeContext()) + assert need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task) @mock.patch('banzai.dbs.commit_processed_image') @@ -46,12 +49,13 @@ def test_do_process_if_never_tried(mock_can_process, mock_header, mock_processed @mock.patch('banzai.utils.fits_utils.get_primary_header') @mock.patch('banzai.utils.image_utils.image_can_be_processed') def test_do_process_if_tries_less_than_max(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit): + mock_task = mock.MagicMock() mock_can_process.return_value = True mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=3) mock_md5.return_value = md5_hash1 context = FakeContext() context.max_tries = 5 - assert need_to_process_image({'path':'test.fits'}, context) + assert need_to_process_image({'path': 'test.fits'}, context, mock_task) @mock.patch('banzai.dbs.commit_processed_image') @@ -60,13 +64,14 @@ def test_do_process_if_tries_less_than_max(mock_can_process, mock_header, mock_p @mock.patch('banzai.utils.fits_utils.get_primary_header') @mock.patch('banzai.utils.image_utils.image_can_be_processed') def test_no_processing_if_tries_at_max(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit): + mock_task = mock.MagicMock() mock_can_process.return_value = True max_tries = 5 mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=max_tries) mock_md5.return_value = md5_hash1 context = FakeContext() context.max_tries = max_tries - assert not need_to_process_image({'path':'test.fits'}, context) + assert not need_to_process_image({'path': 'test.fits'}, context, mock_task) @mock.patch('banzai.dbs.commit_processed_image') @@ -76,11 +81,12 @@ def test_no_processing_if_tries_at_max(mock_can_process, mock_header, mock_proce @mock.patch('banzai.utils.image_utils.image_can_be_processed') def test_do_process_if_new_checksum(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit): # assert that tries and success are reset to 0 + mock_task = mock.MagicMock() image = FakeRealtimeImage(success=True, checksum=md5_hash1, tries=3) mock_can_process.return_value = True mock_processed.return_value = image mock_md5.return_value = md5_hash2 - assert need_to_process_image({'path': 'test.fits'}, FakeContext()) + assert need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task) assert not image.success assert image.tries == 0 assert image.checksum == md5_hash2 diff --git a/banzai/utils/realtime_utils.py b/banzai/utils/realtime_utils.py index 2f6ca001..f920c2dd 100644 --- a/banzai/utils/realtime_utils.py +++ b/banzai/utils/realtime_utils.py @@ -4,6 +4,8 @@ from banzai.utils import file_utils, import_utils, image_utils from banzai.data import HeaderOnly from banzai import logs +import datetime + logger = logs.get_logger() @@ -22,7 +24,7 @@ def increment_try_number(path, db_address): dbs.commit_processed_image(image, db_address=db_address) -def need_to_process_image(file_info, context): +def need_to_process_image(file_info, context, task): """ Figure out if we need to try to make a process a given file. @@ -108,6 +110,12 @@ def need_to_process_image(file_info, context): msg = 'The header in this queue message appears to not be complete enough to make a Frame object' logger.error(msg, extra_tags={'filename': filename}) need_to_process = False + if context.delay_to_block_end and test_image.obstype in context.OBSTYPES_TO_DELAY: + if datetime.datetime.now() < test_image.block_end_date: + logger.info('Observing Block in progress. Retrying 5 minutes after it completes', + extra_tags={'filename': filename}) + delay = test_image.block_end_date - datetime.datetime.now() + datetime.timedelta(minutes=5) + task.retry(countdown=delay.total_seconds()) except Exception: logger.error('Issue creating Image object with given queue message', extra_tags={"filename": filename}) logger.error(logs.format_exception())