diff --git a/.gitignore b/.gitignore
index 68bc17f..6a9390b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -158,3 +158,6 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
+
+# vscode
+.vscode
diff --git a/datalab/datalab_session/data_operations/median.py b/datalab/datalab_session/data_operations/median.py
index ade0ef1..88299e4 100644
--- a/datalab/datalab_session/data_operations/median.py
+++ b/datalab/datalab_session/data_operations/median.py
@@ -1,4 +1,16 @@
+from io import BytesIO
+import logging
+import os
+import tempfile
+
+import numpy as np
+from astropy.io import fits
+
 from datalab.datalab_session.data_operations.data_operation import BaseDataOperation
+from datalab.datalab_session.util import store_fits_output, get_archive_from_basename
+
+log = logging.getLogger()
+log.setLevel(logging.INFO)
 
 
 class Median(BaseDataOperation):
@@ -31,4 +43,62 @@ def wizard_description():
         }
 
     def operate(self):
-        pass
+        input_files = self.input_data.get('input_files', [])
+        file_count = len(input_files)
+
+        if file_count == 0:
+            return {'output_files': []}
+
+        log.info(f'Executing median operation on {file_count} files')
+
+        with tempfile.TemporaryDirectory() as temp_dir:
+            memmap_descriptors = []
+
+            for index, file_info in enumerate(input_files):
+                basename = file_info.get('basename', 'No basename found')
+                archive_record = get_archive_from_basename(basename)
+
+                try:
+                    fits_url = archive_record[0].get('url', 'No URL found')
+                except IndexError:
+                    continue
+
+                # Copy each image into a disk-backed memmap, recording its dtype and shape
+                with fits.open(fits_url, use_fsspec=True) as hdu_list:
+                    data = hdu_list['SCI'].data
+                    memmap_path = os.path.join(temp_dir, f'memmap_{index}.dat')
+                    memmap_array = np.memmap(memmap_path, dtype=data.dtype, mode='w+', shape=data.shape)
+                    memmap_array[:] = data[:]
+                    memmap_descriptors.append((memmap_path, data.dtype, data.shape))
+
+                self.set_percent_completion((index + 1) / file_count)
+
+            image_data_list = [
+                np.memmap(path, dtype=dtype, mode='r', shape=shape)
+                for path, dtype, shape in memmap_descriptors
+            ]
+
+            # Crop the FITS image data to the smallest common shape, then stack
+            min_shape = np.min([arr.shape for arr in image_data_list], axis=0)
+            cropped_data_list = [arr[:min_shape[0], :min_shape[1]] for arr in image_data_list]
+            stacked_data = np.stack(cropped_data_list, axis=2)
+
+            # Calculate the median along the z axis
+            median = np.median(stacked_data, axis=2)
+
+            cache_key = self.generate_cache_key()
+            header = fits.Header([('KEY', cache_key)])
+            primary_hdu = fits.PrimaryHDU(header=header)
+            image_hdu = fits.ImageHDU(median)
+            output_hdu_list = fits.HDUList([primary_hdu, image_hdu])
+
+            fits_buffer = BytesIO()
+            output_hdu_list.writeto(fits_buffer)
+            fits_buffer.seek(0)
+
+            # Write the HDU list to the output FITS file in the operation bucket
+            store_fits_output(cache_key, fits_buffer)
+
+        # TODO: No output yet, need to build a thumbnail service
+        output = {'output_files': []}
+        self.set_percent_completion(1.0)
+        self.set_output(output)
diff --git a/datalab/datalab_session/util.py b/datalab/datalab_session/util.py
new file mode 100644
index 0000000..de60796
--- /dev/null
+++ b/datalab/datalab_session/util.py
@@ -0,0 +1,43 @@
+import requests
+import logging
+
+import boto3
+
+from django.conf import settings
+
+log = logging.getLogger()
+log.setLevel(logging.INFO)
+
+def store_fits_output(item_key: str, fits_buffer: object) -> object:
+    """
+    Stores a FITS file in the operation bucket in S3
+
+    Keyword Arguments:
+    item_key -- name under which to store the FITS file
+    fits_buffer -- the FITS file to add to the bucket
+    """
+    log.info(f'Adding {item_key} to {settings.DATALAB_OPERATION_BUCKET}')
+
+    s3 = boto3.resource('s3')
+    response = s3.Bucket(settings.DATALAB_OPERATION_BUCKET).put_object(Key=item_key, Body=fits_buffer.getvalue())
+    return response
+
+def get_archive_from_basename(basename: str) -> list:
+    """
+    Queries the Archive for frames matching a basename and returns the list of results
+
+    Keyword Arguments:
+    basename -- name to query
+    """
+    query_params = {'basename_exact': basename}
+
+    response = requests.get(settings.ARCHIVE_API + '/frames/', params=query_params)
+
+    try:
+        image_data = response.json()
+        results = image_data.get('results', None)
+    except ValueError:
+        log.error(f'No image found with specified basename: {basename}')
+        raise FileNotFoundError
+
+    return results
diff --git a/datalab/settings.py b/datalab/settings.py
index 8a06384..37498c7 100644
--- a/datalab/settings.py
+++ b/datalab/settings.py
@@ -130,6 +130,11 @@ def get_list_from_env(variable, default=None):
 # AdminMiddleware is enabled. The default value is 'default'.
 DRAMATIQ_TASKS_DATABASE = 'default'
 
+# AWS S3 Bucket
+DATALAB_OPERATION_BUCKET = os.getenv('DATALAB_OPERATION_BUCKET', 'datalab-operation-output-bucket')
+
+# Datalab Archive
+ARCHIVE_API = os.getenv('ARCHIVE_API', 'https://datalab-archive.photonranch.org')
 
 # Database
 # https://docs.djangoproject.com/en/4.2/ref/settings/#databases