From 95af52f6dac9c15aec11fd26ebafd9ce20d03732 Mon Sep 17 00:00:00 2001
From: Lloyd Dakin
Date: Thu, 7 Mar 2024 15:16:41 -0800
Subject: [PATCH 1/6] pseudo code

---
 .../datalab_session/data_operations/median.py | 24 ++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/datalab/datalab_session/data_operations/median.py b/datalab/datalab_session/data_operations/median.py
index ade0ef1..a220149 100644
--- a/datalab/datalab_session/data_operations/median.py
+++ b/datalab/datalab_session/data_operations/median.py
@@ -31,4 +31,26 @@ def wizard_description():
         }
 
     def operate(self):
-        pass
+        num_files = len(self.input_data.get('input_files', []))
+
+        # fetch files and store on disk
+        for i, file in enumerate(self.input_data.get('input_files', [])):
+            print(f"Processing median operation on file {file.get('basename', 'No basename found')}")
+
+        # Create median FITS result file based on median of all input files
+
+        # Loop on pixel n of each file
+        # find median of pixel n
+        # store median of pixel n at pixel n of new FITS
+
+        # Generate a basename for result using a helper function
+
+        # Store median FITS result file in S3 bucket
+
+        # Get S3 bucket URL
+
+        # Return the output
+        output = {
+            'output_files': []
+        }
+        self.set_output(output)
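
The per-pixel median these comments describe maps directly onto numpy, which is where patch 2 ends up. A minimal sketch of the idea, with small hypothetical arrays standing in for FITS image data:

    import numpy as np

    # Three same-shape "images"; each output pixel n is the median of
    # pixel n across all inputs, taken through a new stacking axis.
    images = [np.full((2, 2), value, dtype=np.float32) for value in (1.0, 2.0, 9.0)]
    stacked = np.stack(images, axis=2)         # shape (2, 2, 3)
    median_image = np.median(stacked, axis=2)  # shape (2, 2), every pixel 2.0
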
From 40f9ac07e373da7a9a7aa80e3949157e5050904c Mon Sep 17 00:00:00 2001
From: Lloyd Dakin
Date: Fri, 15 Mar 2024 16:12:48 -0700
Subject: [PATCH 2/6] median operation implementation: pulls FITS files from archive, performs a median operation, adds output to the S3 bucket, returns blank to frontend

---
 .gitignore                                    |  3 +
 .../datalab_session/data_operations/median.py | 62 ++++++++++++++-----
 datalab/datalab_session/util.py               | 28 +++++++++
 datalab/settings.py                           |  4 ++
 4 files changed, 81 insertions(+), 16 deletions(-)
 create mode 100644 datalab/datalab_session/util.py

diff --git a/.gitignore b/.gitignore
index 68bc17f..6a9390b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -158,3 +158,6 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
+
+# vscode
+.vscode

diff --git a/datalab/datalab_session/data_operations/median.py b/datalab/datalab_session/data_operations/median.py
index a220149..80c00a1 100644
--- a/datalab/datalab_session/data_operations/median.py
+++ b/datalab/datalab_session/data_operations/median.py
@@ -1,5 +1,11 @@
+from io import BytesIO
 from datalab.datalab_session.data_operations.data_operation import BaseDataOperation
-
+from datalab.datalab_session.util import store_fits_output, find_fits
+import numpy as np
+from astropy.io import fits
+import logging
+log = logging.getLogger()
+log.setLevel(logging.INFO)
 
 class Median(BaseDataOperation):
 
@@ -31,26 +37,50 @@ def wizard_description():
         }
 
     def operate(self):
-        num_files = len(self.input_data.get('input_files', []))
+        input_files = self.input_data.get('input_files', [])
+        completion_total = len(input_files)
+        image_data_list = []
+
+        log.info(f'Executing median operation on {completion_total} files')
+
+        # fetch fits for all input data
+        for index, file_info in enumerate(input_files):
+            basename = file_info.get('basename', 'No basename found')
+
+            fits_file = find_fits(basename)
+            fits_url = fits_file[0].get('url', 'No URL found')
+
+            with fits.open(fits_url, use_fsspec=True) as hdu_list:
+                data = hdu_list['SCI'].data
+                image_data_list.append(data)
+            self.set_percent_completion((index) / completion_total)
+
+        # Crop fits image data to be the same shape then stack
+        min_shape = min(arr.shape for arr in image_data_list)
+        cropped_data_list = [arr[:min_shape[0], :min_shape[1]] for arr in image_data_list]
+        stacked_data = np.stack(cropped_data_list, axis=2)
+
+        # Calculate a Median along the z axis
+        median = np.median(stacked_data, axis=2)
 
-        # fetch files and store on disk
-        for i, file in enumerate(self.input_data.get('input_files', [])):
-            print(f"Processing median operation on file {file.get('basename', 'No basename found')}")
-
-        # Create median FITS result file based on median of all input files
+        # Create a new Fits File
+        cache_key = self.generate_cache_key()
+        hdr = fits.Header([('KEY', cache_key)])
+        primary_hdu = fits.PrimaryHDU(header=hdr)
+        image_hdu = fits.ImageHDU(median)
+        hdu_list = fits.HDUList([primary_hdu, image_hdu])
 
-        # Loop on pixel n of each file
-        # find median of pixel n
-        # store median of pixel n at pixel n of new FITS
+        fits_buffer = BytesIO()
+        hdu_list.writeto(fits_buffer)
+        fits_buffer.seek(0)
 
-        # Generate a basename for result using a helper function
+        # Write the HDU List to the output FITS file in bitbucket
+        response = store_fits_output(cache_key, fits_buffer)
+        log.info(f'AWS response: {response}')
 
-        # Store median FITS result file in S3 bucket
-
-        # Get S3 bucket URL
-
-        # Return the output
+        # No output yet, need to build a thumbnail service
         output = {
             'output_files': []
         }
+        self.set_percent_completion(completion_total / completion_total)
         self.set_output(output)

diff --git a/datalab/datalab_session/util.py b/datalab/datalab_session/util.py
new file mode 100644
index 0000000..62638f3
--- /dev/null
+++ b/datalab/datalab_session/util.py
@@ -0,0 +1,28 @@
+import boto3
+import requests
+import logging
+log = logging.getLogger()
+log.setLevel(logging.INFO)
+
+bucket_name = 'datalab-operation-output-bucket'
+archive_frames_url = 'https://datalab-archive.photonranch.org/frames/'
+
+def store_fits_output(item_key, fits_buffer):
+    log.info(f'Adding {item_key} to {bucket_name}')
+
+    s3 = boto3.resource('s3')
+    response = s3.Bucket(bucket_name).put_object(Key = item_key, Body = fits_buffer.getvalue())
+    return response
+
+def find_fits(basename):
+    query_params = {'basename_exact': basename }
+
+    response = requests.get(archive_frames_url, params=query_params)
+    response.raise_for_status()
+
+    if response.status_code != 204:
+        image_data = response.json()
+        results = image_data.get('results', [])
+
+        if results:
+            return results

diff --git a/datalab/settings.py b/datalab/settings.py
index 8a06384..91c3d03 100644
--- a/datalab/settings.py
+++ b/datalab/settings.py
@@ -130,6 +130,10 @@ def get_list_from_env(variable, default=None):
 # AdminMiddleware is enabled. The default value is 'default'.
 DRAMATIQ_TASKS_DATABASE = 'default'
 
+# AWS S3 bucket
+DATALAB_OPERATION_OUTPUT_BUCKET = os.getenv('DATALAB_OPERATION_OUTPUT_BUCKET', 'datalab-operation-output-bucket')
+AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY', '')
+AWS_PRIVATE_KEY = os.getenv('AWS_PRIVATE_KEY', '')
 
 # Database
 # https://docs.djangoproject.com/en/4.2/ref/settings/#databases
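
One detail in this commit worth flagging: min() on shape tuples compares lexicographically, so for mixed orientations such as (100, 50) and (50, 100) the chosen min_shape is (50, 100), the cropped arrays still disagree, and np.stack would raise. A sketch of a per-axis minimum that avoids this, assuming 2D image arrays:

    import numpy as np

    # Element-wise minimum across shapes guarantees one common crop shape.
    shapes = [(100, 50), (50, 100)]
    min_shape = tuple(np.min(np.array(shapes), axis=0))
    assert min_shape == (50, 50)  # min(shapes) alone would give (50, 100)
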
From c093f7eaf959603b5295f725ec3b0547844962ae Mon Sep 17 00:00:00 2001
From: Lloyd Dakin
Date: Mon, 18 Mar 2024 15:42:12 -0700
Subject: [PATCH 3/6] return None if no result and rename keys to match credentials file for consistency

---
 datalab/datalab_session/util.py | 7 +++----
 datalab/settings.py             | 4 ++--
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/datalab/datalab_session/util.py b/datalab/datalab_session/util.py
index 62638f3..2885f67 100644
--- a/datalab/datalab_session/util.py
+++ b/datalab/datalab_session/util.py
@@ -22,7 +22,6 @@ def find_fits(basename):
 
     if response.status_code != 204:
         image_data = response.json()
-        results = image_data.get('results', [])
-
-        if results:
-            return results
+        results = image_data.get('results', None)
+
+    return results

diff --git a/datalab/settings.py b/datalab/settings.py
index 91c3d03..20eafaa 100644
--- a/datalab/settings.py
+++ b/datalab/settings.py
@@ -132,8 +132,8 @@ def get_list_from_env(variable, default=None):
 # AWS S3 bucket
 DATALAB_OPERATION_OUTPUT_BUCKET = os.getenv('DATALAB_OPERATION_OUTPUT_BUCKET', 'datalab-operation-output-bucket')
-AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY', '')
-AWS_PRIVATE_KEY = os.getenv('AWS_PRIVATE_KEY', '')
+AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID', '')
+AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY', '')
 
 # Database
 # https://docs.djangoproject.com/en/4.2/ref/settings/#databases
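
The renamed settings match the environment variables that boto3's default credential chain already reads, which is presumably the point of aligning them with the credentials file. A sketch of the equivalent explicit wiring, in case the keys are only available through Django settings (values here are placeholders, not real keys):

    import os

    import boto3

    # boto3 picks up AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the
    # environment on its own; an explicit Session makes the same wiring
    # visible and testable.
    session = boto3.session.Session(
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID', ''),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY', ''),
    )
    s3 = session.resource('s3')
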
From ab49450bd53b24bb9f9c3664d27b8d8abefde8ab Mon Sep 17 00:00:00 2001
From: Lloyd Dakin
Date: Wed, 20 Mar 2024 15:52:23 -0700
Subject: [PATCH 4/6] exception handling, formatting, settings, and spacing changes

---
 .../datalab_session/data_operations/median.py | 38 ++++++++++------
 datalab/datalab_session/util.py               | 44 +++++++++++++------
 datalab/settings.py                           |  7 +--
 3 files changed, 58 insertions(+), 31 deletions(-)

diff --git a/datalab/datalab_session/data_operations/median.py b/datalab/datalab_session/data_operations/median.py
index 80c00a1..60f40b1 100644
--- a/datalab/datalab_session/data_operations/median.py
+++ b/datalab/datalab_session/data_operations/median.py
@@ -1,12 +1,16 @@
 from io import BytesIO
-from datalab.datalab_session.data_operations.data_operation import BaseDataOperation
-from datalab.datalab_session.util import store_fits_output, find_fits
+import logging
+
 import numpy as np
 from astropy.io import fits
-import logging
+
+from datalab.datalab_session.data_operations.data_operation import BaseDataOperation
+from datalab.datalab_session.util import store_fits_output, get_archive_from_basename
+
 log = logging.getLogger()
 log.setLevel(logging.INFO)
 
+
 class Median(BaseDataOperation):
 
     @staticmethod
@@ -38,24 +42,31 @@ def wizard_description():
 
     def operate(self):
         input_files = self.input_data.get('input_files', [])
-        completion_total = len(input_files)
-        image_data_list = []
+        file_count = len(input_files)
 
-        log.info(f'Executing median operation on {completion_total} files')
+        if file_count == 0:
+            return { 'output_files': [] }
+
+        log.info(f'Executing median operation on {file_count} files')
+
+        # fetch fits for all input data
+        image_data_list = []
 
-        # fetch fits for all input data
         for index, file_info in enumerate(input_files):
             basename = file_info.get('basename', 'No basename found')
 
-            fits_file = find_fits(basename)
-            fits_url = fits_file[0].get('url', 'No URL found')
+            archive_record = get_archive_from_basename(basename)
+            try:
+                fits_url = archive_record[0].get('url', 'No URL found')
+            except IndexError:
+                continue
 
             with fits.open(fits_url, use_fsspec=True) as hdu_list:
                 data = hdu_list['SCI'].data
                 image_data_list.append(data)
-            self.set_percent_completion((index) / completion_total)
+            self.set_percent_completion((index) / file_count)
 
-        # Crop fits image data to be the same shape then stack 
+        # Crop fits image data to be the same shape then stack
         min_shape = min(arr.shape for arr in image_data_list)
         cropped_data_list = [arr[:min_shape[0], :min_shape[1]] for arr in image_data_list]
         stacked_data = np.stack(cropped_data_list, axis=2)
@@ -76,11 +87,10 @@ def operate(self):
 
         # Write the HDU List to the output FITS file in bitbucket
         response = store_fits_output(cache_key, fits_buffer)
-        log.info(f'AWS response: {response}')
 
-        # No output yet, need to build a thumbnail service
+        # TODO: No output yet, need to build a thumbnail service
         output = {
             'output_files': []
         }
-        self.set_percent_completion(completion_total / completion_total)
+        self.set_percent_completion(file_count / file_count)
         self.set_output(output)

diff --git a/datalab/datalab_session/util.py b/datalab/datalab_session/util.py
index 2885f67..8516eef 100644
--- a/datalab/datalab_session/util.py
+++ b/datalab/datalab_session/util.py
@@ -1,27 +1,43 @@
-import boto3
 import requests
 import logging
+
+import boto3
+
+from django.conf import settings
+
 log = logging.getLogger()
 log.setLevel(logging.INFO)
 
-bucket_name = 'datalab-operation-output-bucket'
-archive_frames_url = 'https://datalab-archive.photonranch.org/frames/'
-
 def store_fits_output(item_key, fits_buffer):
+    """
+    Stores a FITS file into the operation bucket in S3
+
+    Keyword Arguments:
+    item_key -- name under which to store the FITS file
+    fits_buffer -- the FITS file to add to the bucket
+    """
-    log.info(f'Adding {item_key} to {bucket_name}')
+    log.info(f'Adding {item_key} to {settings.DATALAB_OPERATION_BUCKET}')
 
     s3 = boto3.resource('s3')
-    response = s3.Bucket(bucket_name).put_object(Key = item_key, Body = fits_buffer.getvalue())
+    response = s3.Bucket(settings.DATALAB_OPERATION_BUCKET).put_object(Key = item_key, Body = fits_buffer.getvalue())
     return response
 
-def find_fits(basename):
+def get_archive_from_basename(basename: str) -> dict:
+    """
+    Querys and returns an archive file from the Archive
+
+    Keyword Arguments:
+    basename -- name to query
+    """
     query_params = {'basename_exact': basename }
 
-    response = requests.get(archive_frames_url, params=query_params)
-    response.raise_for_status()
+    response = requests.get(settings.ARCHIVE_API + '/frames/', params=query_params)
+
+    try:
+        image_data = response.json()
+        results = image_data.get('results', None)
+    except IndexError:
+        log.error(f"No image found with specified basename: {basename}")
+        raise FileNotFoundError
 
     return results

diff --git a/datalab/settings.py b/datalab/settings.py
index 20eafaa..37498c7 100644
--- a/datalab/settings.py
+++ b/datalab/settings.py
@@ -131,9 +131,10 @@ def get_list_from_env(variable, default=None):
 DRAMATIQ_TASKS_DATABASE = 'default'
 
 # AWS S3 bucket
-DATALAB_OPERATION_OUTPUT_BUCKET = os.getenv('DATALAB_OPERATION_OUTPUT_BUCKET', 'datalab-operation-output-bucket')
-AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID', '')
-AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY', '')
+DATALAB_OPERATION_BUCKET = os.getenv('DATALAB_OPERATION_BUCKET', 'datalab-operation-output-bucket')
+
+# Datalab Archive
+ARCHIVE_API = os.getenv('ARCHIVE_API', 'https://datalab-archive.photonranch.org')
 
 # Database
 # https://docs.djangoproject.com/en/4.2/ref/settings/#databases
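
A note on the new exception handling in util.py: response.json() raises requests' JSONDecodeError (a ValueError subclass) on an unparseable body, and a missing 'results' key simply yields None, so the except IndexError branch may never fire for the case its log message describes. A hedged sketch of the apparent intent, with get_archive_record as a hypothetical stand-in for get_archive_from_basename:

    import logging

    import requests

    log = logging.getLogger(__name__)

    def get_archive_record(archive_api: str, basename: str) -> list:
        # Fail loudly both when the body is not JSON and when the archive
        # has no matching frames.
        response = requests.get(archive_api + '/frames/', params={'basename_exact': basename})
        response.raise_for_status()

        try:
            results = response.json().get('results', [])
        except ValueError:  # requests' JSONDecodeError subclasses ValueError
            log.error(f'Archive returned a non-JSON body for {basename}')
            raise FileNotFoundError(basename)

        if not results:
            log.error(f'No image found with specified basename: {basename}')
            raise FileNotFoundError(basename)

        return results
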
{basename}") + raise FileNotFoundError - if response.status_code != 204: - image_data = response.json() - results = image_data.get('results', None) - - return results + return results diff --git a/datalab/settings.py b/datalab/settings.py index 20eafaa..37498c7 100644 --- a/datalab/settings.py +++ b/datalab/settings.py @@ -131,9 +131,10 @@ def get_list_from_env(variable, default=None): DRAMATIQ_TASKS_DATABASE = 'default' # AWS S3 Bitbucket -DATALAB_OPERATION_OUTPUT_BUCKET = os.getenv('DATALAB_OPERATION_OUTPUT_BUCKET' , 'datalab-operation-output-bucket') -AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID', '') -AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY', '') +DATALAB_OPERATION_BUCKET = os.getenv('DATALAB_OPERATION_BUCKET', 'datalab-operation-output-bucket') + +# Datalab Archive +ARCHIVE_API = os.getenv('ARCHIVE_API', 'https://datalab-archive.photonranch.org') # Database # https://docs.djangoproject.com/en/4.2/ref/settings/#databases From 4846322d399c6dcb966e9cf99749a5d8b0410e79 Mon Sep 17 00:00:00 2001 From: Lloyd Dakin Date: Thu, 21 Mar 2024 10:26:30 -0700 Subject: [PATCH 5/6] using tempfile and memap to work on arrays in memory, first stores each array individually and then combines them --- .../datalab_session/data_operations/median.py | 90 ++++++++++--------- datalab/datalab_session/util.py | 2 +- 2 files changed, 50 insertions(+), 42 deletions(-) diff --git a/datalab/datalab_session/data_operations/median.py b/datalab/datalab_session/data_operations/median.py index 60f40b1..49ef942 100644 --- a/datalab/datalab_session/data_operations/median.py +++ b/datalab/datalab_session/data_operations/median.py @@ -1,5 +1,7 @@ from io import BytesIO import logging +import os +import tempfile import numpy as np from astropy.io import fits @@ -49,48 +51,54 @@ def operate(self): log.info(f'Executing median operation on {file_count} files') - # fetch fits for all input data - image_data_list = [] - - for index, file_info in enumerate(input_files): - basename = file_info.get('basename', 'No basename found') - - archive_record = get_archive_from_basename(basename) - try: - fits_url = archive_record[0].get('url', 'No URL found') - except IndexError: - continue - - with fits.open(fits_url, use_fsspec=True) as hdu_list: - data = hdu_list['SCI'].data - image_data_list.append(data) - self.set_percent_completion((index) / file_count) - - # Crop fits image data to be the same shape then stack - min_shape = min(arr.shape for arr in image_data_list) - cropped_data_list = [arr[:min_shape[0], :min_shape[1]] for arr in image_data_list] - stacked_data = np.stack(cropped_data_list, axis=2) - - # Calculate a Median along the z axis - median = np.median(stacked_data, axis=2) - - # Create a new Fits File - cache_key = self.generate_cache_key() - hdr = fits.Header([('KEY', cache_key)]) - primary_hdu = fits.PrimaryHDU(header=hdr) - image_hdu = fits.ImageHDU(median) - hdu_list = fits.HDUList([primary_hdu, image_hdu]) - - fits_buffer = BytesIO() - hdu_list.writeto(fits_buffer) - fits_buffer.seek(0) - - # Write the HDU List to the output FITS file in bitbucket - response = store_fits_output(cache_key, fits_buffer) + with tempfile.TemporaryDirectory() as temp_dir: + memmap_paths = [] + + for index, file_info in enumerate(input_files): + basename = file_info.get('basename', 'No basename found') + archive_record = get_archive_from_basename(basename) + + try: + fits_url = archive_record[0].get('url', 'No URL found') + except IndexError: + continue + + with fits.open(fits_url, use_fsspec=True) as hdu_list: + data = 
From 2cdf3f6ac09ecba58b4933cdd887758692bec8d1 Mon Sep 17 00:00:00 2001
From: Lloyd Dakin
Date: Thu, 21 Mar 2024 13:55:35 -0700
Subject: [PATCH 6/6] type hints, comment fix

---
 datalab/datalab_session/data_operations/median.py | 2 +-
 datalab/datalab_session/util.py                   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/datalab/datalab_session/data_operations/median.py b/datalab/datalab_session/data_operations/median.py
index 49ef942..88299e4 100644
--- a/datalab/datalab_session/data_operations/median.py
+++ b/datalab/datalab_session/data_operations/median.py
@@ -95,7 +95,7 @@ def operate(self):
             hdu_list.writeto(fits_buffer)
             fits_buffer.seek(0)
 
-            # Write the HDU List to the output FITS file in bitbucket
+            # Write the HDU List to the output FITS file in the bucket
             response = store_fits_output(cache_key, fits_buffer)
 
         # TODO: No output yet, need to build a thumbnail service

diff --git a/datalab/datalab_session/util.py b/datalab/datalab_session/util.py
index d4ca246..de60796 100644
--- a/datalab/datalab_session/util.py
+++ b/datalab/datalab_session/util.py
@@ -8,7 +8,7 @@
 log = logging.getLogger()
 log.setLevel(logging.INFO)
 
-def store_fits_output(item_key, fits_buffer):
+def store_fits_output(item_key: str, fits_buffer: object) -> object:
     """
     Stores a FITS file into the operation bucket in S3