Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added delay to trigger after block end and to prefer public/same block/same proposal calibrations. #403

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
1.20.0 (2024-12-11)
-------------------
- Added functionality to delay processing until after the end of the observing block
- Added the ability to only use public calibrations
- Added the ability to prefer calibrations taken within the same block or with the same proposal

1.19.1 (2024-11-05)
-------------------
- Added extra logging and try/except blocks to catch frames that previously failed silently
Expand Down
11 changes: 8 additions & 3 deletions banzai/calibrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,14 @@ def apply_master_calibration(self, image, master_calibration_image):
pass

def get_calibration_file_info(self, image):
return dbs.get_master_cal(image, self.calibration_type, self.master_selection_criteria,
use_only_older_calibrations=self.runtime_context.use_only_older_calibrations,
db_address=self.runtime_context.db_address)
return dbs.cal_record_to_file_info(
dbs.get_master_cal_record(image, self.calibration_type, self.master_selection_criteria,
self.runtime_context.db_address,
use_only_older_calibrations=self.runtime_context.use_only_older_calibrations,
prefer_same_block=self.runtime_context.same_block_cals,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep the names of the kwargs consistent with the runtime context attributes?

prefer_same_proposal=self.runtime_context.prefer_same_proposal,
check_public=self.runtime_context.check_public_cals)
)


class CalibrationComparer(CalibrationUser):
Expand Down
12 changes: 7 additions & 5 deletions banzai/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from celery import Celery
from kombu import Queue

from celery.exceptions import Retry
from banzai import dbs, calibrations, logs
from banzai.utils import date_utils, realtime_utils, stage_utils
from celery.signals import worker_process_init
Expand Down Expand Up @@ -174,15 +174,15 @@ def stack_calibrations(self, min_date: str, max_date: str, instrument_id: int, f
raise self.retry()


@app.task(name='celery.process_image', reject_on_worker_lost=True, max_retries=5)
def process_image(file_info: dict, runtime_context: dict):
@app.task(name='celery.process_image', bind=True, reject_on_worker_lost=True, max_retries=5)
def process_image(self, file_info: dict, runtime_context: dict):
"""
:param file_info: Body of queue message: dict
:param runtime_context: Context object with runtime environment info
"""
logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')})
runtime_context = Context(runtime_context)
try:
logger.info('Processing frame', extra_tags={'filename': file_info.get('filename')})
runtime_context = Context(runtime_context)
if realtime_utils.need_to_process_image(file_info, runtime_context):
if 'path' in file_info:
filename = os.path.basename(file_info['path'])
Expand All @@ -193,6 +193,8 @@ def process_image(file_info: dict, runtime_context: dict):
realtime_utils.increment_try_number(filename, db_address=runtime_context.db_address)
stage_utils.run_pipeline_stages([file_info], runtime_context)
realtime_utils.set_file_as_processed(filename, db_address=runtime_context.db_address)
except Retry:
raise
except Exception:
logger.error("Exception processing frame: {error}".format(error=logs.format_exception()),
extra_tags={'file_info': file_info})
41 changes: 21 additions & 20 deletions banzai/dbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
import os.path
import datetime
from dateutil.parser import parse
import numpy as np
import requests
from sqlalchemy import create_engine, pool, type_coerce, cast
from sqlalchemy import create_engine, pool, type_coerce, cast, func
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Boolean, CHAR, JSON, UniqueConstraint, Float
from sqlalchemy.ext.declarative import declarative_base
Expand Down Expand Up @@ -74,6 +73,9 @@ class CalibrationImage(Base):
good_until = Column(DateTime, default=datetime.datetime(3000, 1, 1))
good_after = Column(DateTime, default=datetime.datetime(1000, 1, 1))
attributes = Column(JSON)
# Observing-block identifier this calibration was taken in; used by
# get_master_cal_record to prefer calibrations from the same block.
# Nullable so legacy rows without this information remain valid.
blockid = Column(Integer, nullable=True)
# Proposal ID this calibration was taken under; used by
# get_master_cal_record to prefer calibrations from the same proposal.
proposal = Column(String(50), nullable=True)
# Date after which the calibration becomes publicly available; consulted
# when only public calibrations may be applied (check_public).
public_date = Column(DateTime, nullable=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to do 3 DB migrations here - one for each DB - BANZAI, B-NRES, B-Floyds. Just need to keep that in mind.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the future: alembic is a good solution for this sort of thing.

https://alembic.sqlalchemy.org/en/latest/



class Instrument(Base):
Expand Down Expand Up @@ -336,7 +338,8 @@ def cal_record_to_file_info(record):


def get_master_cal_record(image, calibration_type, master_selection_criteria, db_address,
use_only_older_calibrations=False):
use_only_older_calibrations=False, prefer_same_block=False, check_public=False,
prefer_same_proposal=False):
calibration_criteria = CalibrationImage.type == calibration_type.upper()
calibration_criteria &= CalibrationImage.instrument_id == image.instrument.id
calibration_criteria &= CalibrationImage.is_master.is_(True)
Expand All @@ -356,24 +359,22 @@ def get_master_cal_record(image, calibration_type, master_selection_criteria, db
calibration_criteria &= CalibrationImage.good_after <= image.dateobs
calibration_criteria &= CalibrationImage.good_until >= image.dateobs

calibration_image = None
with get_session(db_address=db_address) as db_session:
calibration_images = db_session.query(CalibrationImage).filter(calibration_criteria).all()

# Exit if no calibration file found
if len(calibration_images) == 0:
return None

# Find the closest date
date_deltas = np.abs(np.array([i.dateobs - image.dateobs for i in calibration_images]))
closest_calibration_image = calibration_images[np.argmin(date_deltas)]

return closest_calibration_image


def get_master_cal(image, calibration_type, master_selection_criteria, db_address,
use_only_older_calibrations=False):
return cal_record_to_file_info(get_master_cal_record(image, calibration_type, master_selection_criteria, db_address,
use_only_older_calibrations=use_only_older_calibrations))
if prefer_same_block:
block_criteria = CalibrationImage.blockid == image.blockid
image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & block_criteria)
calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first()
if calibration_image is None and prefer_same_proposal:
proposal_criteria = CalibrationImage.proposal == image.proposal
image_filter = db_session.query(CalibrationImage).filter(calibration_criteria & proposal_criteria)
calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first()
if check_public:
calibration_criteria &= CalibrationImage.public_date <= datetime.datetime.utcnow()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

utcnow() is deprecated - suggested replacement is now datetime.datetime.now(datetime.UTC)

if calibration_image is None:
image_filter = db_session.query(CalibrationImage).filter(calibration_criteria)
calibration_image = image_filter.order_by(func.abs(CalibrationImage.dateobs - image.dateobs)).first()
return calibration_image


def get_individual_cal_records(instrument, calibration_type, min_date: str, max_date: str, db_address: str,
Expand Down
25 changes: 25 additions & 0 deletions banzai/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,31 @@ def requested_ccd_temperature(self):
def measured_ccd_temperature(self):
pass

@property
@abc.abstractmethod
def block_end_date(self):
    """Return the end time of the observing block this frame belongs to."""
    pass

@property
@abc.abstractmethod
def proposal(self):
    """Return the proposal ID under which this frame was taken."""
    pass

@property
@abc.abstractmethod
def blockid(self):
    """Return the identifier of the observing block for this frame."""
    pass

@property
@abc.abstractmethod
def public_date(self):
    """Return the date on which this frame becomes public, or None if not set."""
    pass

@public_date.setter
@abc.abstractmethod
def public_date(self, value):
    """Set the date on which this frame becomes public."""
    pass

@property
def data_type(self):
# Convert bytes to bits
Expand Down
42 changes: 35 additions & 7 deletions banzai/lco.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,30 @@ def dateobs(self):
def datecreated(self):
return Time(self.primary_hdu.meta.get('DATE'), scale='utc').datetime

@property
def block_end_date(self):
    """End time of the observing block (BLKEDATE header keyword) as a datetime.

    Returns None when the BLKEDATE keyword is absent, mirroring the None
    handling of the public_date property, instead of letting
    astropy.time.Time raise on a missing value. Callers that gate on
    block end (e.g. the delay-to-block-end logic) should treat None as
    "no block information available".
    """
    block_end = self.primary_hdu.meta.get('BLKEDATE')
    if block_end is None:
        return None
    return Time(block_end, scale='utc').datetime

@property
def proposal(self):
    """Proposal ID under which this frame was observed (PROPID header keyword)."""
    header = self.primary_hdu.meta
    return header.get('PROPID')

@property
def blockid(self):
    """Identifier of the observing block this frame belongs to (BLKUID header keyword)."""
    header = self.primary_hdu.meta
    return header.get('BLKUID')

@property
def public_date(self):
pubdat = self.primary_hdu.meta.get('L1PUBDAT')
if pubdat is None:
return pubdat
else:
return Time(pubdat).datetime

@public_date.setter
def public_date(self, value: datetime.datetime):
    """Store the public date in the L1PUBDAT header keyword.

    NOTE: the right-hand side is a (value, comment) tuple built without
    parentheses — assigning a 2-tuple to a FITS header key sets both the
    keyword value and its comment. This is intentional, not a stray comma.
    """
    self.primary_hdu.meta['L1PUBDAT'] = date_utils.date_obs_to_string(value), '[UTC] Date the frame becomes public'

@property
def configuration_mode(self):
mode = self.meta.get('CONFMODE', 'default')
Expand Down Expand Up @@ -112,13 +136,14 @@ def save_processing_metadata(self, context):

self.meta['PIPEVER'] = (context.PIPELINE_VERSION, 'Pipeline version')

if any(fnmatch(self.meta['PROPID'].lower(), public_proposal) for public_proposal in context.PUBLIC_PROPOSALS):
self.meta['L1PUBDAT'] = (self.meta['DATE-OBS'], '[UTC] Date the frame becomes public')
else:
# Wait to make public
date_observed = date_utils.parse_date_obs(self.meta['DATE-OBS'])
next_year = date_observed + datetime.timedelta(days=context.DATA_RELEASE_DELAY)
self.meta['L1PUBDAT'] = (date_utils.date_obs_to_string(next_year), '[UTC] Date the frame becomes public')
if self.public_date is None:
# Don't override the public date if it already exists
if any(fnmatch(self.meta['PROPID'].lower(), public_proposal) for public_proposal in context.PUBLIC_PROPOSALS):
self.public_date = self.dateobs
else:
# Wait to make public
next_year = self.dateobs + datetime.timedelta(days=context.DATA_RELEASE_DELAY)
self.public_date = next_year

def get_output_filename(self, runtime_context):
output_filename = self.filename.replace('00.fits', '{:02d}.fits'.format(int(runtime_context.reduction_level)))
Expand Down Expand Up @@ -171,6 +196,9 @@ def to_db_record(self, output_product):
'is_master': self.is_master,
'is_bad': self.is_bad,
'frameid': output_product.frame_id,
'blockid': self.blockid,
'proposal': self.proposal,
'public_date': self.public_date,
'attributes': {}}
for attribute in self.grouping_criteria:
record_attributes['attributes'][attribute] = str(getattr(self, attribute))
Expand Down
8 changes: 8 additions & 0 deletions banzai/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ def parse_args(settings, extra_console_arguments=None, parser_description='Proce
help='Maximum number of times to try to process a frame')
parser.add_argument('--broker-url', dest='broker_url',
help='URL for the FITS broker service.')
parser.add_argument('--delay-to-block-end', dest='delay_to_block_end', default=False, action='store_true',
help='Delay real-time processing until after the block has ended')
parser.add_argument('--same-block-cals', dest='same_block_cals', default=False, action='store_true',
help='Prefer calibrations taken in the same block')
parser.add_argument('--check-public-cals', dest='check_public_cals', default=False, action='store_true',
help='Check to see if calibration frames are public before using them?')
parser.add_argument('--prefer-same-proposal', dest='prefer_same_proposal', default=False, action='store_true',
help='Prefer calibrations taken with the same proposal')

if extra_console_arguments is None:
extra_console_arguments = []
Expand Down
2 changes: 2 additions & 0 deletions banzai/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
'banzai.utils.file_utils.ccdsum_to_filename',
'banzai.utils.file_utils.filter_to_filename')}

# OBSTYPE values whose real-time processing should be delayed until after the
# observing block ends (consulted by realtime_utils.need_to_process_image when
# delay_to_block_end is set). Empty by default so no frame types are delayed
# unless a deployment overrides this setting.
OBSTYPES_TO_DELAY = []

TELESCOPE_FILENAME_FUNCTION = 'banzai.utils.file_utils.telescope_to_filename'

OBSERVATION_PORTAL_URL = os.getenv('OBSERVATION_PORTAL_URL',
Expand Down
2 changes: 1 addition & 1 deletion banzai/tests/test_frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def test_data_to_detector_section_full():

def test_propid_public():
proposal_ids = ['standard', 'Photometric standards', 'NRES standards', 'FLOYDS standards']
date_obs = '2021-09-01T00:00:00'
date_obs = '2021-09-01T00:00:00.000009'
test_data = [CCDData(np.zeros((1024, 1024)), meta={'PROPID': propid,
'DATE-OBS': date_obs}) for propid in proposal_ids]

Expand Down
18 changes: 12 additions & 6 deletions banzai/tests/test_need_to_process_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from banzai.tests.utils import FakeContext
from banzai.utils.realtime_utils import need_to_process_image
import datetime

md5_hash1 = '49a6bb35cdd3859224c0214310b1d9b6'
md5_hash2 = 'aec5ef355e7e43a59fedc88ac95caed6'
Expand All @@ -11,7 +12,7 @@


class FakeRealtimeImage(object):
def __init__(self, success=False, checksum=md5_hash1, tries=0):
def __init__(self, success=False, checksum=md5_hash1, tries=0, block_end_date=None):
self.success = success
self.checksum = checksum
self.tries = tries
Expand All @@ -22,10 +23,11 @@ def __init__(self, success=False, checksum=md5_hash1, tries=0):
@mock.patch('banzai.utils.fits_utils.get_primary_header')
@mock.patch('banzai.utils.image_utils.image_can_be_processed')
def test_no_processing_if_previous_success(mock_can_process, mock_header, mock_processed, mock_md5):
mock_task = mock.MagicMock()
mock_can_process.return_value = True
mock_processed.return_value = FakeRealtimeImage(success=True, checksum=md5_hash1)
mock_md5.return_value = md5_hash1
assert not need_to_process_image({'path':'test.fits'}, FakeContext())
assert not need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task)


@mock.patch('banzai.dbs.commit_processed_image')
Expand All @@ -34,10 +36,11 @@ def test_no_processing_if_previous_success(mock_can_process, mock_header, mock_p
@mock.patch('banzai.utils.fits_utils.get_primary_header')
@mock.patch('banzai.utils.image_utils.image_can_be_processed')
def test_do_process_if_never_tried(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit):
mock_task = mock.MagicMock()
mock_can_process.return_value = True
mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=0)
mock_md5.return_value = md5_hash1
assert need_to_process_image({'path':'test.fits'}, FakeContext())
assert need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task)


@mock.patch('banzai.dbs.commit_processed_image')
Expand All @@ -46,12 +49,13 @@ def test_do_process_if_never_tried(mock_can_process, mock_header, mock_processed
@mock.patch('banzai.utils.fits_utils.get_primary_header')
@mock.patch('banzai.utils.image_utils.image_can_be_processed')
def test_do_process_if_tries_less_than_max(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit):
mock_task = mock.MagicMock()
mock_can_process.return_value = True
mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=3)
mock_md5.return_value = md5_hash1
context = FakeContext()
context.max_tries = 5
assert need_to_process_image({'path':'test.fits'}, context)
assert need_to_process_image({'path': 'test.fits'}, context, mock_task)


@mock.patch('banzai.dbs.commit_processed_image')
Expand All @@ -60,13 +64,14 @@ def test_do_process_if_tries_less_than_max(mock_can_process, mock_header, mock_p
@mock.patch('banzai.utils.fits_utils.get_primary_header')
@mock.patch('banzai.utils.image_utils.image_can_be_processed')
def test_no_processing_if_tries_at_max(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit):
mock_task = mock.MagicMock()
mock_can_process.return_value = True
max_tries = 5
mock_processed.return_value = FakeRealtimeImage(success=False, checksum=md5_hash1, tries=max_tries)
mock_md5.return_value = md5_hash1
context = FakeContext()
context.max_tries = max_tries
assert not need_to_process_image({'path':'test.fits'}, context)
assert not need_to_process_image({'path': 'test.fits'}, context, mock_task)


@mock.patch('banzai.dbs.commit_processed_image')
Expand All @@ -76,11 +81,12 @@ def test_no_processing_if_tries_at_max(mock_can_process, mock_header, mock_proce
@mock.patch('banzai.utils.image_utils.image_can_be_processed')
def test_do_process_if_new_checksum(mock_can_process, mock_header, mock_processed, mock_md5, mock_commit):
# assert that tries and success are reset to 0
mock_task = mock.MagicMock()
image = FakeRealtimeImage(success=True, checksum=md5_hash1, tries=3)
mock_can_process.return_value = True
mock_processed.return_value = image
mock_md5.return_value = md5_hash2
assert need_to_process_image({'path': 'test.fits'}, FakeContext())
assert need_to_process_image({'path': 'test.fits'}, FakeContext(), mock_task)
assert not image.success
assert image.tries == 0
assert image.checksum == md5_hash2
10 changes: 9 additions & 1 deletion banzai/utils/realtime_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from banzai.utils import file_utils, import_utils, image_utils
from banzai.data import HeaderOnly
from banzai import logs
import datetime


logger = logs.get_logger()

Expand All @@ -22,7 +24,7 @@ def increment_try_number(path, db_address):
dbs.commit_processed_image(image, db_address=db_address)


def need_to_process_image(file_info, context):
def need_to_process_image(file_info, context, task):
"""
Figure out if we need to try to process a given file.

Expand Down Expand Up @@ -108,6 +110,12 @@ def need_to_process_image(file_info, context):
msg = 'The header in this queue message appears to not be complete enough to make a Frame object'
logger.error(msg, extra_tags={'filename': filename})
need_to_process = False
if context.delay_to_block_end and test_image.obstype in context.OBSTYPES_TO_DELAY:
if datetime.datetime.now() < test_image.block_end_date:
logger.info('Observing Block in progress. Retrying 5 minutes after it completes',
extra_tags={'filename': filename})
delay = test_image.block_end_date - datetime.datetime.now() + datetime.timedelta(minutes=5)
task.retry(countdown=delay.total_seconds())
except Exception:
logger.error('Issue creating Image object with given queue message', extra_tags={"filename": filename})
logger.error(logs.format_exception())
Expand Down
Loading