Skip to content

Commit

Permalink
Merge pull request #379 from LCOGT/add-large-workers
Browse files Browse the repository at this point in the history
Add large workers
  • Loading branch information
mgdaily authored Jan 22, 2024
2 parents 7855698 + 933df3f commit c1fc01b
Show file tree
Hide file tree
Showing 26 changed files with 944 additions and 336 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/docker_image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ on:
push:
tags:
- "*"
pull_request:
branches:
- master

jobs:
build-tag:
Expand Down Expand Up @@ -39,9 +42,11 @@ jobs:

- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@v4
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=sha,event=pr
- name: Build and also push Dockerimage
id: build-and-push
Expand Down
52 changes: 52 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# End-to-end test workflow: builds the banzai image, loads it into a Kind
# cluster, deploys the e2e stack from banzai/tests/e2e-k8s.yaml and runs the
# pytest tests marked `master_bias` inside the banzai-listener container.
name: E2E Tests

on:
  workflow_dispatch: {}
  push: {}

jobs:
  test:
    runs-on:
      - banzai-runner
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Install kubectl
        uses: azure/setup-kubectl@v3
        with:
          version: "v1.22.17"

      - name: Start a Kind cluster
        # NOTE(review): the action reference was obscured in the reviewed copy
        # (e-mail obfuscation replaced "<action>@<version>"); helm/kind-action
        # at v1.8.0 is assumed here - confirm the intended pin.
        uses: helm/kind-action@v1.8.0
        with:
          cluster_name: kind
          version: "v0.19.0"
          node_image: kindest/node:v1.22.17@sha256:9af784f45a584f6b28bce2af84c494d947a05bd709151466489008f80a9ce9d5
          wait: "600s"

      - name: Build docker image
        run: |
          docker build -t banzai:test-latest .

      - name: Copy docker image to nodes
        run: |
          kind load docker-image banzai:test-latest

      - name: Start banzai
        run: |
          cat banzai/tests/e2e-k8s.yaml
          # Deploy banzai stack
          kubectl apply -f banzai/tests/e2e-k8s.yaml
          # Wait for banzai to be ready
          kubectl wait --for=condition=Ready --timeout=60m pod/banzai-e2e-test

      - name: Test Master Bias Creation
        run: |
          kubectl exec banzai-e2e-test -c banzai-listener -- pytest -o log_cli=true -s --pyargs banzai.tests --durations=0 --junitxml=/archive/engineering/pytest-master-bias.xml -m master_bias

      - name: Cleanup
        # Run even when an earlier step failed so the test pod is not left behind;
        # --ignore-not-found keeps this step green if the pod was never created.
        if: always()
        run: |
          kubectl delete pod banzai-e2e-test --ignore-not-found
2 changes: 1 addition & 1 deletion banzai/calibrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def make_master_calibrations(instrument, frame_type, min_date, max_date, runtime
'max_date': max_date}
logger.info("Making master frames", extra_tags=extra_tags)
calibration_frames_info = dbs.get_individual_cal_frames(instrument, frame_type, min_date, max_date,
db_address=runtime_context.db_address)
db_address=runtime_context.db_address)
if len(calibration_frames_info) == 0:
logger.info("No calibration frames found to stack", extra_tags=extra_tags)
try:
Expand Down
27 changes: 19 additions & 8 deletions banzai/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@

from banzai import dbs, calibrations, logs
from banzai.utils import date_utils, realtime_utils, stage_utils
from celery.signals import setup_logging, worker_process_init
from celery.signals import worker_process_init
from banzai.context import Context
from banzai.utils.observation_utils import filter_calibration_blocks_for_type, get_calibration_blocks_for_time_range
from banzai.utils.date_utils import get_stacking_date_range
import logging


logger = logs.get_logger()
Expand Down Expand Up @@ -51,10 +52,14 @@ def configure_workers(**kwargs):

app.log.setup()
logs.set_log_level(os.getenv('BANZAI_WORKER_LOGLEVEL', 'INFO'))
logging.getLogger('amqp').setLevel(logging.WARNING)
logging.getLogger('kombu').setLevel(logging.WARNING)
logging.getLogger('celery.bootsteps').setLevel(logging.WARNING)


@app.task(name='celery.schedule_calibration_stacking', reject_on_worker_lost=True, max_retries=5)
def schedule_calibration_stacking(site: str, runtime_context: dict, min_date=None, max_date=None, frame_types=None):
def schedule_calibration_stacking(site: str, runtime_context: dict,
min_date: str = None, max_date: str = None, frame_types=None):
logger.info('Scheduling when to stack frames.', extra_tags={'site': site})
try:
runtime_context = Context(runtime_context)
Expand Down Expand Up @@ -87,11 +92,12 @@ def schedule_calibration_stacking(site: str, runtime_context: dict, min_date=Non

instruments = dbs.get_instruments_at_site(site=site, db_address=runtime_context.db_address)
for instrument in instruments:
logger.info('Checking for scheduled calibration blocks', extra_tags={'site': site,
'min_date': stacking_min_date,
'max_date': stacking_max_date,
'instrument': instrument.camera,
'frame_type': frame_type})
logger.info('Checking for scheduled calibration blocks',
extra_tags={'site': site,
'min_date': stacking_min_date,
'max_date': stacking_max_date,
'instrument': instrument.camera,
'frame_type': frame_type})
blocks_for_calibration = filter_calibration_blocks_for_type(instrument, frame_type,
calibration_blocks, runtime_context,
stacking_min_date, stacking_max_date)
Expand All @@ -110,9 +116,14 @@ def schedule_calibration_stacking(site: str, runtime_context: dict, min_date=Non
logger.info('Scheduling stacking at {}'.format(schedule_time.strftime(date_utils.TIMESTAMP_FORMAT)),
extra_tags={'site': site, 'min_date': stacking_min_date, 'max_date': stacking_max_date,
'instrument': instrument.camera, 'frame_type': frame_type})
if instrument.nx * instrument.ny > runtime_context.LARGE_WORKER_THRESHOLD:
queue_name = runtime_context.LARGE_WORKER_QUEUE
else:
queue_name = runtime_context.CELERY_TASK_QUEUE_NAME

stack_calibrations.apply_async(args=(stacking_min_date, stacking_max_date, instrument.id, frame_type,
vars(runtime_context), blocks_for_calibration),
countdown=message_delay_in_seconds)
countdown=message_delay_in_seconds, queue=queue_name)
else:
logger.warning('No scheduled calibration blocks found.',
extra_tags={'site': site, 'min_date': min_date, 'max_date': max_date,
Expand Down
2 changes: 0 additions & 2 deletions banzai/dark.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from banzai.logs import get_logger
import numpy as np


logger = get_logger()


Expand Down Expand Up @@ -45,7 +44,6 @@ def apply_master_calibration(self, image, master_calibration_image):
temperature_scaling_factor = np.exp(master_calibration_image.dark_temperature_coefficient * \
(image.measured_ccd_temperature - master_calibration_image.measured_ccd_temperature))
master_calibration_image *= temperature_scaling_factor

image -= master_calibration_image
image.meta['L1IDDARK'] = master_calibration_image.filename, 'ID of dark frame'
image.meta['L1STATDA'] = 1, 'Status flag for dark frame correction'
Expand Down
23 changes: 18 additions & 5 deletions banzai/dbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class Instrument(Base):
camera = Column(String(50), index=True, nullable=False)
type = Column(String(100))
name = Column(String(100), index=True, nullable=False)
nx = Column(Integer)
ny = Column(Integer)
__table_args__ = (UniqueConstraint('site', 'camera', 'name', name='instrument_constraint'),)


Expand Down Expand Up @@ -146,10 +148,20 @@ def parse_configdb(configdb_address):
for ins in tel['instrument_set']:
for sci_cam in ins['science_cameras']:
if sci_cam is not None:
camera_size = sci_cam['camera_type']['size']
if camera_size == 'N/A':
nx = 25
ny = 25
else:
nx, ny = camera_size.split('x')
# Convert from arcminutes to arcseconds and then to pixels
nx = int(float(nx) * 60 / float(sci_cam['camera_type']['pscale']))
ny = int(float(ny) * 60 / float(sci_cam['camera_type']['pscale']))
instrument = {'site': site['code'],
'camera': sci_cam['code'],
'name': ins.get('code'),
'type': ins['instrument_type']['code']}
'type': ins['instrument_type']['code'],
'nx': nx, 'ny': ny}
# hotfix for configdb
if not instrument['name']:
instrument['name'] = instrument['camera']
Expand All @@ -167,7 +179,9 @@ def add_instrument(instrument, db_address):
record_attributes = {'site': instrument['site'],
'camera': instrument['camera'],
'name': instrument['name'],
'type': instrument['type']}
'type': instrument['type'],
'nx': instrument['nx'],
'ny': instrument['ny']}

instrument_record = add_or_update_record(db_session, Instrument, equivalence_criteria, record_attributes)
db_session.commit()
Expand Down Expand Up @@ -435,8 +449,7 @@ def populate_instrument_tables(db_address, configdb_address):
added to the network.
"""
sites, instruments = parse_configdb(configdb_address=configdb_address)
with get_session(db_address=db_address) as db_session:
for site in sites:
add_site(site, db_address)
for site in sites:
add_site(site, db_address)
for instrument in instruments:
add_instrument(instrument, db_address)
14 changes: 12 additions & 2 deletions banzai/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from types import ModuleType

from banzai.lco import LCOFrameFactory
from banzai import settings, dbs, logs, calibrations
from banzai.context import Context
from banzai.utils import date_utils, stage_utils, import_utils, image_utils, fits_utils, file_utils
Expand Down Expand Up @@ -46,8 +47,13 @@ def get_consumers(self, Consumer, channel):
return [consumer]

def on_message(self, body, message):
    """Queue an incoming message for processing and acknowledge it.

    The instrument is looked up from ``body``; when its detector area
    (``nx * ny`` pixels) exceeds ``runtime_context.LARGE_WORKER_THRESHOLD``
    the task is sent to ``runtime_context.LARGE_WORKER_QUEUE``, otherwise to
    ``runtime_context.CELERY_TASK_QUEUE_NAME``.

    Parameters
    ----------
    body : message payload, forwarded unchanged to ``process_image``
    message : broker message object; acknowledged once the task is queued
    """
    instrument = LCOFrameFactory.get_instrument_from_header(body, self.runtime_context.db_address)
    # The size columns may be unset (None) for an instrument record; fall back to
    # the default queue instead of raising on ``None * None`` before the ack.
    is_large = (instrument is not None
                and instrument.nx is not None
                and instrument.ny is not None
                and instrument.nx * instrument.ny > self.runtime_context.LARGE_WORKER_THRESHOLD)
    if is_large:
        queue_name = self.runtime_context.LARGE_WORKER_QUEUE
    else:
        queue_name = self.runtime_context.CELERY_TASK_QUEUE_NAME
    process_image.apply_async(args=(body, vars(self.runtime_context)), queue=queue_name)
    message.ack()  # acknowledge to the sender we got this message (it can be popped)


Expand Down Expand Up @@ -230,13 +236,17 @@ def add_instrument():
parser.add_argument("--name", help='Instrument name (e.g kb05, nres03)', required=True)
parser.add_argument("--instrument-type", dest='instrument_type',
help="Instrument type (e.g. 1m0-SciCam-Sinistro)", required=True)
parser.add_argument("--nx", help='Number of pixels in x direction', required=True)
parser.add_argument("--ny", help='Number of pixels in y direction', required=True)
parser.add_argument('--db-address', dest='db_address', default='sqlite:///test.db',
help='Database address: Should be in SQLAlchemy format')
args = parser.parse_args()
instrument = {'site': args.site,
'camera': args.camera,
'type': args.instrument_type,
'name': args.name}
'name': args.name,
'nx': args.nx,
'ny': args.ny}
dbs.add_instrument(instrument, args.db_address)


Expand Down
5 changes: 5 additions & 0 deletions banzai/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,9 @@

# Default Celery queue name; override with the CELERY_TASK_QUEUE_NAME environment variable
CELERY_TASK_QUEUE_NAME = os.environ.get('CELERY_TASK_QUEUE_NAME', 'celery')

# Pixel-count cutoff (compared against nx * ny) above which work is routed to the
# large-worker queue; chosen to sit a little above 4096 x 4096 frames
LARGE_WORKER_THRESHOLD = 5000 ** 2

# Queue name for large frames; override with the CELERY_LARGE_TASK_QUEUE_NAME environment variable
LARGE_WORKER_QUEUE = os.environ.get('CELERY_LARGE_TASK_QUEUE_NAME', 'celery_large')

REFERENCE_CATALOG_URL = os.environ.get('REFERENCE_CATALOG_URL', 'http://phot-catalog.lco.gtn/')
Loading

0 comments on commit c1fc01b

Please sign in to comment.