From 8f6cf5dc1d3e524c65e683fd450ce821dd73d1a3 Mon Sep 17 00:00:00 2001 From: Jon Date: Thu, 5 Oct 2023 23:35:33 +0000 Subject: [PATCH 1/5] Add forced photometry service architecture and atlas implementation --- setup.py | 2 + tom_dataproducts/data_processor.py | 10 +- .../forced_photometry/__init__.py | 0 tom_dataproducts/forced_photometry/atlas.py | 126 +++++++++++++++++ .../forced_photometry_service.py | 127 ++++++++++++++++++ .../processors/atlas_processor.py | 71 ++++++++++ tom_dataproducts/tasks.py | 102 ++++++++++++++ .../forced_photometry_form.html | 14 ++ .../partials/query_forced_photometry.html | 9 ++ .../templatetags/dataproduct_extras.py | 8 ++ tom_dataproducts/urls.py | 3 +- tom_dataproducts/views.py | 82 +++++++++++ .../templates/tom_targets/target_detail.html | 5 + 13 files changed, 557 insertions(+), 2 deletions(-) create mode 100644 tom_dataproducts/forced_photometry/__init__.py create mode 100644 tom_dataproducts/forced_photometry/atlas.py create mode 100644 tom_dataproducts/forced_photometry/forced_photometry_service.py create mode 100644 tom_dataproducts/processors/atlas_processor.py create mode 100644 tom_dataproducts/tasks.py create mode 100644 tom_dataproducts/templates/tom_dataproducts/forced_photometry_form.html create mode 100644 tom_dataproducts/templates/tom_dataproducts/partials/query_forced_photometry.html diff --git a/setup.py b/setup.py index 7f382cfb0..f546ee590 100644 --- a/setup.py +++ b/setup.py @@ -32,12 +32,14 @@ 'astroplan~=0.8', 'astropy>=5.0', 'beautifulsoup4~=4.9', + 'dramatiq[redis, watch]<2.0.0', 'django>=3.1,<5', # TOM Toolkit requires db math functions 'djangorestframework~=3.12', 'django-bootstrap4>=3,<24', 'django-contrib-comments~=2.0', # Earlier version are incompatible with Django >= 3.0 'django-crispy-forms~=2.0', 'crispy-bootstrap4~=2022.0', + 'django-dramatiq<1.0.0', 'django-extensions~=3.1', 'django-filter>=21,<24', 'django-gravatar2~=1.4', diff --git a/tom_dataproducts/data_processor.py b/tom_dataproducts/data_processor.py index 73854c19a..1ca859ed5 100644 --- a/tom_dataproducts/data_processor.py +++ b/tom_dataproducts/data_processor.py @@ -35,8 +35,9 @@ def run_data_processor(dp): data_processor = clazz() data = data_processor.process_data(dp) + data_type = data_processor.data_type_override() or dp.data_product_type - reduced_datums = [ReducedDatum(target=dp.target, data_product=dp, data_type=dp.data_product_type, + reduced_datums = [ReducedDatum(target=dp.target, data_product=dp, data_type=data_type, timestamp=datum[0], value=datum[1], source_name=datum[2]) for datum in data] ReducedDatum.objects.bulk_create(reduced_datums) @@ -65,3 +66,10 @@ def process_data(self, data_product): :rtype: list of 2-tuples """ return [] + + def data_type_override(self): + """ + Override for the ReducedDatum data type, if you want it to be different from the + DataProduct data_type. + """ + return '' diff --git a/tom_dataproducts/forced_photometry/__init__.py b/tom_dataproducts/forced_photometry/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tom_dataproducts/forced_photometry/atlas.py b/tom_dataproducts/forced_photometry/atlas.py new file mode 100644 index 000000000..b0b3e6676 --- /dev/null +++ b/tom_dataproducts/forced_photometry/atlas.py @@ -0,0 +1,126 @@ +from django import forms +from django.conf import settings +from django.utils import timezone +from django.core.files.base import ContentFile +from crispy_forms.layout import Div, HTML +from astropy.time import Time +from tom_dataproducts.forced_photometry.forced_photometry_service import BaseForcedPhotometryQueryForm, BaseForcedPhotometryService, ForcedPhotometryServiceException +from tom_dataproducts.models import ReducedDatum, DataProduct +from tom_dataproducts.data_processor import run_data_processor +from tom_dataproducts.exceptions import InvalidFileFormatException +from tom_dataproducts.tasks import atlas_query +from tom_targets.models import Target + +class AtlasForcedPhotometryQueryForm(BaseForcedPhotometryQueryForm): + min_date = forms.CharField(label='Min date:', required=False, widget=forms.TextInput(attrs={'class': 'ml-2', 'type': 'datetime-local'})) + max_date = forms.CharField(label='Max date:', required=False, widget=forms.TextInput(attrs={'class': 'ml-2', 'type': 'datetime-local'})) + min_date_mjd = forms.FloatField(label='Min date (mjd):', required=False, widget=forms.NumberInput(attrs={'class': 'ml-2'})) + max_date_mjd = forms.FloatField(label='Max date (mjd):', required=False, widget=forms.NumberInput(attrs={'class': 'ml-2'})) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def layout(self): + return Div( + Div( + Div( + 'min_date', + css_class='col-md-4', + ), + Div( + HTML('OR'), + css_class='col-md-1' + ), + Div( + 'min_date_mjd', + css_class='col-md-5' + ), + css_class='form-row form-inline mb-2' + ), + Div( + Div( + 'max_date', + css_class='col-md-4', + ), + Div( + HTML('OR'), + css_class='col-md-1' + ), + Div( + 'max_date_mjd', + css_class='col-md-5' + ), + css_class='form-row form-inline mb-4' + ), + ) + + def clean(self): + cleaned_data = super().clean() + if not (cleaned_data.get('min_date') or cleaned_data.get('min_date_mjd')): + raise forms.ValidationError("Must supply a minimum date in either datetime or mjd format") + if cleaned_data.get('min_date') and cleaned_data.get('min_date_mjd'): + raise forms.ValidationError("Please specify the minimum date in either datetime or mjd format") + if cleaned_data.get('max_date') and cleaned_data.get('max_date_mjd'): + raise forms.ValidationError("Please specify the maximum date in either datetime or mjd format") + return cleaned_data + + +class AtlasForcedPhotometryService(BaseForcedPhotometryService): + name = 'Atlas' + + def __init__(self): + super().__init__ + self.success_message = 'Asynchronous Atlas query is processing. Refresh the page once complete it will show up as a dataproduct in the "Manage Data" tab.' + + def get_form(self): + """ + This method returns the form for querying this service. + """ + return AtlasForcedPhotometryQueryForm + + def query_service(self, query_parameters): + """ + This method takes in the serialized data from the query form and actually + submits the query to the service + """ + print(f"Querying Atlas service with params: {query_parameters}") + min_date_mjd = query_parameters.get('min_date_mjd') + if not min_date_mjd: + min_date_mjd = Time(query_parameters.get('min_date')).mjd + max_date_mjd = query_parameters.get('max_date_mjd') + if not max_date_mjd and query_parameters.get('max_date'): + max_date_mjd = Time(query_parameters.get('max_date')).mjd + if not Target.objects.filter(pk=query_parameters.get('target_id')).exists(): + raise ForcedPhotometryServiceException(f"Target {query_parameters.get('target_id')} does not exist") + + if 'atlas' not in settings.FORCED_PHOTOMETRY_SERVICES: + raise ForcedPhotometryServiceException("Must specify 'atlas' settings in FORCED_PHOTOMETRY_SERVICES") + if not settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('url'): + raise ForcedPhotometryServiceException("Must specify a 'url' under atlas settings in FORCED_PHOTOMETRY_SERVICES") + if not settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('api_key'): + raise ForcedPhotometryServiceException("Must specify an 'api_key' under atlas settings in FORCED_PHOTOMETRY_SERVICES") + + if 'django_dramatiq' in settings.INSTALLED_APPS: + atlas_query.send(min_date_mjd, max_date_mjd, query_parameters.get('target_id'), self.get_data_product_type()) + else: + query_succeeded = atlas_query(min_date_mjd, max_date_mjd, query_parameters.get('target_id'), self.get_data_product_type()) + if not query_succeeded: + raise ForcedPhotometryServiceException("Atlas query failed, check the server logs for more information") + self.success_message = "Atlas query completed. View its data product in the 'Manage Data' tab" + + return True + + def validate_form(self, query_parameters): + """ + Same thing as query_service, but a dry run. You can + skip this in different modules by just using "pass" + + Typically called by the .is_valid() method. + """ + print(f"Validating Atlas service with params: {query_parameters}") + + def get_success_message(self): + return self.success_message + + def get_data_product_type(self): + return 'atlas_photometry' diff --git a/tom_dataproducts/forced_photometry/forced_photometry_service.py b/tom_dataproducts/forced_photometry/forced_photometry_service.py new file mode 100644 index 000000000..8ab2b2a9a --- /dev/null +++ b/tom_dataproducts/forced_photometry/forced_photometry_service.py @@ -0,0 +1,127 @@ +from abc import ABC, abstractmethod +import copy +import logging +import requests + +from crispy_forms.helper import FormHelper +from crispy_forms.layout import ButtonHolder, Layout, Submit, Div, HTML +from django import forms +from django.conf import settings +from django.contrib.auth.models import Group +from django.core.files.base import ContentFile +from django.utils.module_loading import import_string + +from tom_targets.models import Target + +logger = logging.getLogger(__name__) + + +def get_service_classes(): + try: + forced_photometry_services = settings.FORCED_PHOTOMETRY_SERVICES + except AttributeError: + return {} + + service_choices = {} + for service in forced_photometry_services.values(): + try: + clazz = import_string(service.get('class')) + except (ImportError, AttributeError): + raise ImportError(f'Could not import {service}. Did you provide the correct path?') + service_choices[clazz.name] = clazz + return service_choices + + +def get_service_class(name): + available_classes = get_service_classes() + try: + return available_classes[name] + except KeyError: + raise ImportError(f'Could not a find a forced photometry service with the name {name}. Did you add it to TOM_FORCED_PHOTOMETRY_CLASSES?') + + +class ForcedPhotometryServiceException(Exception): + pass + + +class BaseForcedPhotometryQueryForm(forms.Form): + """ + This is the class that is responsible for displaying the forced photometry request form. + This form is meant to be subclassed by more specific classes that represent a + form for a specific forced photometry service, including the query parameters it supports. + + For an implementation example please see + https://github.com/TOMToolkit/tom_base/blob/main/tom_dataproducts/forced_photometry/atlas.py + """ + service = forms.CharField(required=True, max_length=50, widget=forms.HiddenInput()) + target_id = forms.IntegerField(required=True, widget=forms.HiddenInput()) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.helper = FormHelper() + self.common_layout = Layout('service', 'target_id') + self.helper.layout = Layout( + self.common_layout, + self.layout(), + self.button_layout() + ) + + def layout(self): + return + + def button_layout(self): + return ButtonHolder( + Submit('submit', 'Submit'), + ) + + +class BaseForcedPhotometryService(ABC): + """ + This is the class that is responsible for defining the base forced photometry service class. + This form is meant to be subclassed by more specific classes that represent a + form for a particular forced photometry service. + """ + name = 'BaseForcedPhotometryService' + + @abstractmethod + def get_form(self): + """ + This method returns the form for querying this service. + """ + pass + + @abstractmethod + def query_service(self, query_parameters): + """ + This method takes in the serialized data from the query form and actually + submits the query to the service + """ + pass + + @abstractmethod + def validate_form(self, query_parameters): + """ + Same thing as query_service, but a dry run. You can + skip this in different modules by just using "pass" + + Typically called by the .is_valid() method. + """ + pass + + @abstractmethod + def get_success_message(self): + """ + This should return a message that shows up in the UI after making the query. + It should explain what is happening / next steps, i.e. if the results will be + emailed to you it should say that and that you must upload them once received. + """ + pass + + @abstractmethod + def get_data_product_type(self): + """ + This should return the data_product_type for data products produced by this service + Make sure to also add this type in your settings to DATA_PRODUCT_TYPES and + DATA_PROCESSORS. + """ + pass diff --git a/tom_dataproducts/processors/atlas_processor.py b/tom_dataproducts/processors/atlas_processor.py new file mode 100644 index 000000000..9e90ed1c9 --- /dev/null +++ b/tom_dataproducts/processors/atlas_processor.py @@ -0,0 +1,71 @@ +import mimetypes + +from astropy import units +import astropy.io.ascii +from astropy.time import Time, TimezoneInfo + +from tom_dataproducts.data_processor import DataProcessor +from tom_dataproducts.exceptions import InvalidFileFormatException + + +class AtlasProcessor(DataProcessor): + + def data_type_override(self): + return 'photometry' + + def process_data(self, data_product): + """ + Routes a atlas processing call to a method specific to a file-format. + + :param data_product: Photometric DataProduct which will be processed into the specified format for database + ingestion + :type data_product: DataProduct + + :returns: python list of 2-tuples, each with a timestamp and corresponding data + :rtype: list + """ + + mimetype = mimetypes.guess_type(data_product.data.path)[0] + if mimetype in self.PLAINTEXT_MIMETYPES: + photometry = self._process_photometry_from_plaintext(data_product) + return [(datum.pop('timestamp'), datum, datum.pop('source', 'ATLAS')) for datum in photometry] + else: + raise InvalidFileFormatException('Unsupported file type') + + def _process_photometry_from_plaintext(self, data_product): + """ + Processes the photometric data from a plaintext file into a list of dicts. File is read using astropy as + specified in the below documentation. The file is expected to be a multi-column delimited space delimited + text file, as produced by the ATLAS forced photometry service at https://fallingstar-data.com/forcedphot + + The header looks like this: + ###MJD m dm uJy duJy F err chi/N RA Dec x y maj min phi apfit mag5sig Sky Obs + + :param data_product: ATLAS Photometric DataProduct which will be processed into a list of dicts + :type data_product: DataProduct + + :returns: python list containing the photometric data from the DataProduct + :rtype: list + """ + photometry = [] + + data = astropy.io.ascii.read(data_product.data.path) + if len(data) < 1: + raise InvalidFileFormatException('Empty table or invalid file type') + + try: + for datum in data: + time = Time(float(datum['##MJD']), format='mjd') + utc = TimezoneInfo(utc_offset=0*units.hour) + time.format = 'datetime' + value = { + 'timestamp': time.to_datetime(timezone=utc), + 'magnitude': float(datum['m']), + 'magnitude_error': float(datum['dm']), + 'filter': str(datum['F']) + } + photometry.append(value) + except Exception as e: + raise InvalidFileFormatException(e) + + return photometry diff --git a/tom_dataproducts/tasks.py b/tom_dataproducts/tasks.py new file mode 100644 index 000000000..d29ac0b3b --- /dev/null +++ b/tom_dataproducts/tasks.py @@ -0,0 +1,102 @@ +# Place dramatiq asynchronous tasks here - they are auto-discovered + +import dramatiq +import requests +import time +import logging +import re +from astropy.time import Time +from urllib.parse import urlparse +from django.conf import settings +from django.utils import timezone +from django.core.files.base import ContentFile + +from tom_targets.models import Target +from tom_dataproducts.models import DataProduct +from tom_dataproducts.exceptions import InvalidFileFormatException +from tom_dataproducts.data_processor import run_data_processor + +logger = logging.getLogger(__name__) + + +@dramatiq.actor(max_retries=0) +def atlas_query(min_date_mjd, max_date_mjd, target_id, data_product_type): + print("Calling atlas query!") + target = Target.objects.get(pk=target_id) + headers = {"Authorization": f"Token {settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('api_key')}", "Accept": "application/json"} + base_url = settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('url') + task_url = None + while not task_url: + with requests.Session() as s: + task_data = {"ra":target.ra, "dec": target.dec, "mjd_min": min_date_mjd, "send_email": False} + if max_date_mjd: + task_data['mjd_max'] = max_date_mjd + resp = s.post( + f"{base_url}/queue/", headers=headers, + data=task_data) + + if resp.status_code == 201: + task_url = resp.json()["url"] + print(f"The task url is {task_url}") + elif resp.status_code == 429: + message = resp.json()["detail"] + print(f"{resp.status_code} {message}") + t_sec = re.findall(r"available in (\d+) seconds", message) + t_min = re.findall(r"available in (\d+) minutes", message) + if t_sec: + waittime = int(t_sec[0]) + elif t_min: + waittime = int(t_min[0]) * 60 + else: + waittime = 10 + print(f"Waiting {waittime} seconds") + time.sleep(waittime) + else: + logger.error(f"Failed to queue Atlas task: HTTP Error {resp.status_code} - {resp.text}") + return False + + result_url = None + taskstarted_printed = False + while not result_url: + with requests.Session() as s: + resp = s.get(task_url, headers=headers) + + if resp.status_code == 200: + if resp.json()["finishtimestamp"]: + result_url = resp.json()["result_url"] # PART WHEN QUERY IS COMPLETE + print(f"Task is complete with results available at {result_url}") + elif resp.json()["starttimestamp"]: + if not taskstarted_printed: + print(f"Task is running (started at {resp.json()['starttimestamp']})") + taskstarted_printed = True + time.sleep(2) + else: + print(f"Waiting for job to start (queued at {resp.json()['timestamp']})") + time.sleep(4) + else: + logger.error(f"Failed to retrieve Atlas task status: HTTP Error {resp.status_code} - {resp.text}") + return False + + results = requests.get(result_url, headers=headers) + dp_name = f"atlas_{Time(min_date_mjd, format='mjd').strftime('%Y_%m_%d')}" + if max_date_mjd: + dp_name += f"-{Time(max_date_mjd, format='mjd').strftime('%Y_%m_%d')}" + dp_name += f"_{urlparse(result_url)[2].rpartition('/')[2]}" + file = ContentFile(results.content, name=dp_name) + + dp = DataProduct.objects.create( + product_id = dp_name, + target=target, + data=file, + data_product_type=data_product_type, + extra_data=f'Queried from Atlas within the TOM on {timezone.now().isoformat()}' + ) + logger.info(f"Created dataproduct {dp_name} from atlas query") + + try: + run_data_processor(dp) + except InvalidFileFormatException as e: + logger.error(f"Error processing returned Atlas data into ReducedDatums: {repr(e)}") + return False + + return True \ No newline at end of file diff --git a/tom_dataproducts/templates/tom_dataproducts/forced_photometry_form.html b/tom_dataproducts/templates/tom_dataproducts/forced_photometry_form.html new file mode 100644 index 000000000..212d926b7 --- /dev/null +++ b/tom_dataproducts/templates/tom_dataproducts/forced_photometry_form.html @@ -0,0 +1,14 @@ +{% extends 'tom_common/base.html' %} +{% load bootstrap4 static crispy_forms_tags %} +{% block title %}Query Forced Photometry{% endblock %} +{% block additional_css %} + +{% endblock %} +{% block content %} +{{ form|as_crispy_errors }} +

Query {{ form.service.value }} Forced Photometry Service

+
+

Target {{ target.name }} at RA {{ target.ra }}, DEC {{ target.dec }}

+
+{% crispy query_form %} +{% endblock %} \ No newline at end of file diff --git a/tom_dataproducts/templates/tom_dataproducts/partials/query_forced_photometry.html b/tom_dataproducts/templates/tom_dataproducts/partials/query_forced_photometry.html new file mode 100644 index 000000000..c038c6f06 --- /dev/null +++ b/tom_dataproducts/templates/tom_dataproducts/partials/query_forced_photometry.html @@ -0,0 +1,9 @@ +{% load bootstrap4 static %} +{% bootstrap_css %} +{% bootstrap_javascript jquery='full' %} + + +

Query Forced Photometry Service

+{% for service in forced_photometry_services %} +{{ service }} +{% endfor %} diff --git a/tom_dataproducts/templatetags/dataproduct_extras.py b/tom_dataproducts/templatetags/dataproduct_extras.py index adbc76816..d57ce06d6 100644 --- a/tom_dataproducts/templatetags/dataproduct_extras.py +++ b/tom_dataproducts/templatetags/dataproduct_extras.py @@ -20,6 +20,7 @@ from tom_dataproducts.forms import DataProductUploadForm, DataShareForm from tom_dataproducts.models import DataProduct, ReducedDatum from tom_dataproducts.processors.data_serializers import SpectrumSerializer +from tom_dataproducts.forced_photometry.forced_photometry_service import get_service_classes from tom_observations.models import ObservationRecord from tom_targets.models import Target @@ -94,6 +95,13 @@ def dataproduct_list_all(context): 'products': products, } +@register.inclusion_tag('tom_dataproducts/partials/query_forced_photometry.html') +def query_forced_photometry(target): + services = get_service_classes().keys() + return {'forced_photometry_services': services, + 'target': target + } + @register.inclusion_tag('tom_dataproducts/partials/upload_dataproduct.html', takes_context=True) def upload_dataproduct(context, obj): diff --git a/tom_dataproducts/urls.py b/tom_dataproducts/urls.py index ebd4b450b..b47124344 100644 --- a/tom_dataproducts/urls.py +++ b/tom_dataproducts/urls.py @@ -4,7 +4,7 @@ from tom_dataproducts.views import DataProductDeleteView, DataProductGroupCreateView from tom_dataproducts.views import DataProductGroupDetailView, DataProductGroupDataView, DataProductGroupDeleteView from tom_dataproducts.views import DataProductUploadView, DataProductFeatureView, UpdateReducedDataView -from tom_dataproducts.views import DataShareView +from tom_dataproducts.views import DataShareView, ForcedPhotometryQueryView from tom_common.api_router import SharedAPIRootRouter from tom_dataproducts.api_views import DataProductViewSet @@ -23,6 +23,7 @@ path('data/group//delete/', DataProductGroupDeleteView.as_view(), name='group-delete'), path('data/upload/', DataProductUploadView.as_view(), name='upload'), path('data/reduced/update/', UpdateReducedDataView.as_view(), name='update-reduced-data'), + path('data/forced_photometry//query/', ForcedPhotometryQueryView.as_view(), name='forced-photometry-query'), path('data//delete/', DataProductDeleteView.as_view(), name='delete'), path('data//feature/', DataProductFeatureView.as_view(), name='feature'), path('data//share/', DataShareView.as_view(), name='share'), diff --git a/tom_dataproducts/views.py b/tom_dataproducts/views.py index 233437278..85ce0a2b5 100644 --- a/tom_dataproducts/views.py +++ b/tom_dataproducts/views.py @@ -1,6 +1,7 @@ from io import StringIO import logging import os +from typing import Any from urllib.parse import urlencode, urlparse from django.conf import settings @@ -36,6 +37,7 @@ from tom_observations.models import ObservationRecord from tom_observations.facility import get_service_class from tom_dataproducts.serializers import DataProductSerializer +from tom_dataproducts.forced_photometry.forced_photometry_service import ForcedPhotometryServiceException, get_service_class import requests @@ -86,6 +88,86 @@ def post(self, request, *args, **kwargs): ) +class ForcedPhotometryQueryView(LoginRequiredMixin, FormView): + """ + View that handles queries for forced photometry services + """ + template_name = 'tom_dataproducts/forced_photometry_form.html' + + def get_target_id(self): + """ + Parses the target id from the query parameters. + """ + if self.request.method == 'GET': + return self.request.GET.get('target_id') + elif self.request.method == 'POST': + return self.request.POST.get('target_id') + + def get_target(self): + """ + Gets the target for observing from the database + + :returns: target for observing + :rtype: Target + """ + return Target.objects.get(pk=self.get_target_id()) + + def get_service(self): + """ + Gets the forced photometry service that you want to query + """ + return self.kwargs['service'] + + def get_service_class(self): + """ + Gets the forced photometry service class + """ + return get_service_class(self.get_service()) + + def get_form_class(self): + """ + Gets the forced photometry service form class + """ + return self.get_service_class()().get_form() + + def get_context_data(self, **kwargs: Any) -> dict[str, Any]: + """ + Adds the target to the context object. + """ + context = super().get_context_data(**kwargs) + context['target'] = self.get_target() + context['query_form'] = self.get_form_class()(initial=self.get_initial()) + return context + + def get_initial(self): + """ + Populates the form with initial data including service name and target id + """ + initial = super().get_initial() + if not self.get_target_id(): + raise Exception('Must provide target_id') + initial['target_id'] = self.get_target_id() + initial['service'] = self.get_service() + initial.update(self.request.GET.dict()) + return initial + + def post(self, request, *args, **kwargs): + form = self.get_form() + if form.is_valid(): + service = self.get_service_class()() + try: + service.query_service(form.cleaned_data) + except ForcedPhotometryServiceException as e: + form.add_error(f"Problem querying forced photometry service: {repr(e)}") + return self.form_invalid(form) + messages.info(self.request, service.get_success_message()) + return redirect( + reverse('tom_targets:detail', kwargs={'pk': self.get_target_id()}) + ) + else: + return self.form_invalid(form) + + class DataProductUploadView(LoginRequiredMixin, FormView): """ View that handles manual upload of DataProducts. Requires authentication. diff --git a/tom_targets/templates/tom_targets/target_detail.html b/tom_targets/templates/tom_targets/target_detail.html index b58bc942f..5fdac4eac 100644 --- a/tom_targets/templates/tom_targets/target_detail.html +++ b/tom_targets/templates/tom_targets/target_detail.html @@ -85,6 +85,11 @@

Observations

{% observation_list object %}
+ {% if user.is_authenticated %} + {% query_forced_photometry object %} +
+
+ {% endif %} {% if user.is_authenticated %} {% upload_dataproduct object %} {% endif %} From 9a1fde4f08cdc42867abd2851c049e370b2582fe Mon Sep 17 00:00:00 2001 From: Jon Date: Fri, 6 Oct 2023 07:58:23 +0000 Subject: [PATCH 2/5] fix some flake8 issues --- tom_dataproducts/forced_photometry/atlas.py | 59 ++++++++++++------- .../forced_photometry_service.py | 12 ++-- .../processors/atlas_processor.py | 2 +- tom_dataproducts/tasks.py | 11 ++-- .../templatetags/dataproduct_extras.py | 8 ++- tom_dataproducts/urls.py | 3 +- tom_dataproducts/views.py | 6 +- 7 files changed, 60 insertions(+), 41 deletions(-) diff --git a/tom_dataproducts/forced_photometry/atlas.py b/tom_dataproducts/forced_photometry/atlas.py index b0b3e6676..5bb327f73 100644 --- a/tom_dataproducts/forced_photometry/atlas.py +++ b/tom_dataproducts/forced_photometry/atlas.py @@ -1,21 +1,28 @@ from django import forms from django.conf import settings -from django.utils import timezone -from django.core.files.base import ContentFile from crispy_forms.layout import Div, HTML from astropy.time import Time -from tom_dataproducts.forced_photometry.forced_photometry_service import BaseForcedPhotometryQueryForm, BaseForcedPhotometryService, ForcedPhotometryServiceException -from tom_dataproducts.models import ReducedDatum, DataProduct -from tom_dataproducts.data_processor import run_data_processor -from tom_dataproducts.exceptions import InvalidFileFormatException +import tom_dataproducts.forced_photometry.forced_photometry_service as fps from tom_dataproducts.tasks import atlas_query from tom_targets.models import Target -class AtlasForcedPhotometryQueryForm(BaseForcedPhotometryQueryForm): - min_date = forms.CharField(label='Min date:', required=False, widget=forms.TextInput(attrs={'class': 'ml-2', 'type': 'datetime-local'})) - max_date = forms.CharField(label='Max date:', required=False, widget=forms.TextInput(attrs={'class': 'ml-2', 'type': 'datetime-local'})) - min_date_mjd = forms.FloatField(label='Min date (mjd):', required=False, widget=forms.NumberInput(attrs={'class': 'ml-2'})) - max_date_mjd = forms.FloatField(label='Max date (mjd):', required=False, widget=forms.NumberInput(attrs={'class': 'ml-2'})) +class AtlasForcedPhotometryQueryForm(fps.BaseForcedPhotometryQueryForm): + min_date = forms.CharField( + label='Min date:', required=False, + widget=forms.TextInput(attrs={'class': 'ml-2', 'type': 'datetime-local'}) + ) + max_date = forms.CharField( + label='Max date:', required=False, + widget=forms.TextInput(attrs={'class': 'ml-2', 'type': 'datetime-local'}) + ) + min_date_mjd = forms.FloatField( + label='Min date (mjd):', required=False, + widget=forms.NumberInput(attrs={'class': 'ml-2'}) + ) + max_date_mjd = forms.FloatField( + label='Max date (mjd):', required=False, + widget=forms.NumberInput(attrs={'class': 'ml-2'}) + ) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -65,12 +72,14 @@ def clean(self): return cleaned_data -class AtlasForcedPhotometryService(BaseForcedPhotometryService): +class AtlasForcedPhotometryService(fps.BaseForcedPhotometryService): name = 'Atlas' def __init__(self): super().__init__ - self.success_message = 'Asynchronous Atlas query is processing. Refresh the page once complete it will show up as a dataproduct in the "Manage Data" tab.' + self.success_message = ('Asynchronous Atlas query is processing. ' + 'Refresh the page once complete it will show ' + 'up as a dataproduct in the "Manage Data" tab.') def get_form(self): """ @@ -91,21 +100,31 @@ def query_service(self, query_parameters): if not max_date_mjd and query_parameters.get('max_date'): max_date_mjd = Time(query_parameters.get('max_date')).mjd if not Target.objects.filter(pk=query_parameters.get('target_id')).exists(): - raise ForcedPhotometryServiceException(f"Target {query_parameters.get('target_id')} does not exist") + raise fps.ForcedPhotometryServiceException(f"Target {query_parameters.get('target_id')} does not exist") if 'atlas' not in settings.FORCED_PHOTOMETRY_SERVICES: - raise ForcedPhotometryServiceException("Must specify 'atlas' settings in FORCED_PHOTOMETRY_SERVICES") + raise fps.ForcedPhotometryServiceException("Must specify 'atlas' settings in FORCED_PHOTOMETRY_SERVICES") if not settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('url'): - raise ForcedPhotometryServiceException("Must specify a 'url' under atlas settings in FORCED_PHOTOMETRY_SERVICES") + raise fps.ForcedPhotometryServiceException( + "Must specify a 'url' under atlas settings in FORCED_PHOTOMETRY_SERVICES" + ) if not settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('api_key'): - raise ForcedPhotometryServiceException("Must specify an 'api_key' under atlas settings in FORCED_PHOTOMETRY_SERVICES") + raise fps.ForcedPhotometryServiceException( + "Must specify an 'api_key' under atlas settings in FORCED_PHOTOMETRY_SERVICES" + ) if 'django_dramatiq' in settings.INSTALLED_APPS: - atlas_query.send(min_date_mjd, max_date_mjd, query_parameters.get('target_id'), self.get_data_product_type()) + atlas_query.send(min_date_mjd, max_date_mjd, + query_parameters.get('target_id'), + self.get_data_product_type()) else: - query_succeeded = atlas_query(min_date_mjd, max_date_mjd, query_parameters.get('target_id'), self.get_data_product_type()) + query_succeeded = atlas_query(min_date_mjd, max_date_mjd, + query_parameters.get('target_id'), + self.get_data_product_type()) if not query_succeeded: - raise ForcedPhotometryServiceException("Atlas query failed, check the server logs for more information") + raise fps.ForcedPhotometryServiceException( + "Atlas query failed, check the server logs for more information" + ) self.success_message = "Atlas query completed. View its data product in the 'Manage Data' tab" return True diff --git a/tom_dataproducts/forced_photometry/forced_photometry_service.py b/tom_dataproducts/forced_photometry/forced_photometry_service.py index 8ab2b2a9a..806345d90 100644 --- a/tom_dataproducts/forced_photometry/forced_photometry_service.py +++ b/tom_dataproducts/forced_photometry/forced_photometry_service.py @@ -1,18 +1,12 @@ from abc import ABC, abstractmethod -import copy import logging -import requests from crispy_forms.helper import FormHelper -from crispy_forms.layout import ButtonHolder, Layout, Submit, Div, HTML +from crispy_forms.layout import ButtonHolder, Layout, Submit from django import forms from django.conf import settings -from django.contrib.auth.models import Group -from django.core.files.base import ContentFile from django.utils.module_loading import import_string -from tom_targets.models import Target - logger = logging.getLogger(__name__) @@ -37,7 +31,9 @@ def get_service_class(name): try: return available_classes[name] except KeyError: - raise ImportError(f'Could not a find a forced photometry service with the name {name}. Did you add it to TOM_FORCED_PHOTOMETRY_CLASSES?') + raise ImportError(( + f'Could not a find a forced photometry service with the name {name}. ' + 'Did you add it to TOM_FORCED_PHOTOMETRY_CLASSES?')) class ForcedPhotometryServiceException(Exception): diff --git a/tom_dataproducts/processors/atlas_processor.py b/tom_dataproducts/processors/atlas_processor.py index 9e90ed1c9..9d452c932 100644 --- a/tom_dataproducts/processors/atlas_processor.py +++ b/tom_dataproducts/processors/atlas_processor.py @@ -39,7 +39,7 @@ def _process_photometry_from_plaintext(self, data_product): text file, as produced by the ATLAS forced photometry service at https://fallingstar-data.com/forcedphot The header looks like this: - ###MJD m dm uJy duJy F err chi/N RA Dec x y maj min phi apfit mag5sig Sky Obs + ###MJD m dm uJy duJy F err chi/N RA Dec x y maj min phi apfit mag5sig Sky Obs :param data_product: ATLAS Photometric DataProduct which will be processed into a list of dicts :type data_product: DataProduct diff --git a/tom_dataproducts/tasks.py b/tom_dataproducts/tasks.py index d29ac0b3b..f6ce1b2d9 100644 --- a/tom_dataproducts/tasks.py +++ b/tom_dataproducts/tasks.py @@ -23,12 +23,13 @@ def atlas_query(min_date_mjd, max_date_mjd, target_id, data_product_type): print("Calling atlas query!") target = Target.objects.get(pk=target_id) - headers = {"Authorization": f"Token {settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('api_key')}", "Accept": "application/json"} + headers = {"Authorization": f"Token {settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('api_key')}", + "Accept": "application/json"} base_url = settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('url') task_url = None while not task_url: with requests.Session() as s: - task_data = {"ra":target.ra, "dec": target.dec, "mjd_min": min_date_mjd, "send_email": False} + task_data = {"ra": target.ra, "dec": target.dec, "mjd_min": min_date_mjd, "send_email": False} if max_date_mjd: task_data['mjd_max'] = max_date_mjd resp = s.post( @@ -81,11 +82,11 @@ def atlas_query(min_date_mjd, max_date_mjd, target_id, data_product_type): dp_name = f"atlas_{Time(min_date_mjd, format='mjd').strftime('%Y_%m_%d')}" if max_date_mjd: dp_name += f"-{Time(max_date_mjd, format='mjd').strftime('%Y_%m_%d')}" - dp_name += f"_{urlparse(result_url)[2].rpartition('/')[2]}" + dp_name += f"_{urlparse(result_url)[2].rpartition('/')[2]}" file = ContentFile(results.content, name=dp_name) dp = DataProduct.objects.create( - product_id = dp_name, + product_id=dp_name, target=target, data=file, data_product_type=data_product_type, @@ -99,4 +100,4 @@ def atlas_query(min_date_mjd, max_date_mjd, target_id, data_product_type): logger.error(f"Error processing returned Atlas data into ReducedDatums: {repr(e)}") return False - return True \ No newline at end of file + return True diff --git a/tom_dataproducts/templatetags/dataproduct_extras.py b/tom_dataproducts/templatetags/dataproduct_extras.py index d57ce06d6..107670f5b 100644 --- a/tom_dataproducts/templatetags/dataproduct_extras.py +++ b/tom_dataproducts/templatetags/dataproduct_extras.py @@ -95,12 +95,14 @@ def dataproduct_list_all(context): 'products': products, } + @register.inclusion_tag('tom_dataproducts/partials/query_forced_photometry.html') def query_forced_photometry(target): services = get_service_classes().keys() - return {'forced_photometry_services': services, - 'target': target - } + return { + 'forced_photometry_services': services, + 'target': target + } @register.inclusion_tag('tom_dataproducts/partials/upload_dataproduct.html', takes_context=True) diff --git a/tom_dataproducts/urls.py b/tom_dataproducts/urls.py index b47124344..1f9b7ae73 100644 --- a/tom_dataproducts/urls.py +++ b/tom_dataproducts/urls.py @@ -23,7 +23,8 @@ path('data/group//delete/', DataProductGroupDeleteView.as_view(), name='group-delete'), path('data/upload/', DataProductUploadView.as_view(), name='upload'), path('data/reduced/update/', UpdateReducedDataView.as_view(), name='update-reduced-data'), - path('data/forced_photometry//query/', ForcedPhotometryQueryView.as_view(), name='forced-photometry-query'), + path('data/forced_photometry//query/', ForcedPhotometryQueryView.as_view(), + name='forced-photometry-query'), path('data//delete/', DataProductDeleteView.as_view(), name='delete'), path('data//feature/', DataProductFeatureView.as_view(), name='feature'), path('data//share/', DataShareView.as_view(), name='share'), diff --git a/tom_dataproducts/views.py b/tom_dataproducts/views.py index 85ce0a2b5..0423103a8 100644 --- a/tom_dataproducts/views.py +++ b/tom_dataproducts/views.py @@ -37,7 +37,7 @@ from tom_observations.models import ObservationRecord from tom_observations.facility import get_service_class from tom_dataproducts.serializers import DataProductSerializer -from tom_dataproducts.forced_photometry.forced_photometry_service import ForcedPhotometryServiceException, get_service_class +import tom_dataproducts.forced_photometry.forced_photometry_service as fps import requests @@ -122,7 +122,7 @@ def get_service_class(self): """ Gets the forced photometry service class """ - return get_service_class(self.get_service()) + return fps.get_service_class(self.get_service()) def get_form_class(self): """ @@ -157,7 +157,7 @@ def post(self, request, *args, **kwargs): service = self.get_service_class()() try: service.query_service(form.cleaned_data) - except ForcedPhotometryServiceException as e: + except fps.ForcedPhotometryServiceException as e: form.add_error(f"Problem querying forced photometry service: {repr(e)}") return self.form_invalid(form) messages.info(self.request, service.get_success_message()) From 77427bfd8d42a4629dfed70d9fe2085384c674f5 Mon Sep 17 00:00:00 2001 From: Jon Date: Fri, 6 Oct 2023 17:00:28 +0000 Subject: [PATCH 3/5] Added documentation for forced photometry services --- docs/managing_data/forced_photometry.rst | 128 ++++++++++++++++++++ docs/managing_data/index.rst | 4 +- tom_dataproducts/forced_photometry/atlas.py | 1 + 3 files changed, 132 insertions(+), 1 deletion(-) create mode 100644 docs/managing_data/forced_photometry.rst diff --git a/docs/managing_data/forced_photometry.rst b/docs/managing_data/forced_photometry.rst new file mode 100644 index 000000000..a9e9f7665 --- /dev/null +++ b/docs/managing_data/forced_photometry.rst @@ -0,0 +1,128 @@ +Integrating Forced Photometry Service Queries +--------------------------------------- + +The base TOM Toolkit comes with Atlas, panSTARRS, and ZTF query services. More services +can be added by extending the base ForcedPhotometryService implementation. + + +Integrating existing Forced Photometry Services +############################################### + +You must add certain configuration to your TOM's ``settings.py`` to setup the existing forced +photometry services. This configuration will go in the ``FORCED_PHOTOMETRY_SERVICES`` section +shown below: + +.. code:: python + FORCED_PHOTOMETRY_SERVICES = { + 'atlas': { + 'class': 'tom_dataproducts.forced_photometry.atlas.AtlasForcedPhotometryService', + 'url': "https://fallingstar-data.com/forcedphot", + 'api_key': os.getenv('ATLAS_FORCED_PHOTOMETRY_API_KEY', 'your atlas account api token') + }, + 'panstarrs': { + #TODO + }, + 'ztf': { + #TODO + } + } + + DATA_PRODUCT_TYPES = { + ... + 'atlas_photometry': ('atlas_photometry', 'Atlas Photometry'), + ... + } + + DATA_PROCESSORS = { + ... + 'atlas_photometry': 'tom_dataproducts.processors.atlas_processor.AtlasProcessor', + ... + } + + +Configuring your TOM to serve tasks asynchronously: +*************************************************** + +Several of the services are best suited to be queried asynchronously, especially if you plan to make large +queries that would take a long time. The TOM Toolkit is setup to use `dramatiq `_ +as an asynchronous task manager, but doing so requires you to run either a `redis `_ +or `rabbitmq `_ server to act as the task queue. To use dramatiq with +a redis server, you would add the following to your ``settings.py``: + +.. code:: python + INSTALLED_APPS = [ + ... + 'django_dramatiq', + ... + ] + + DRAMATIQ_BROKER = { + "BROKER": "dramatiq.brokers.redis.RedisBroker", + "OPTIONS": { + "url": "redis://your-redis-service-url:your-redis-port" + }, + "MIDDLEWARE": [ + "dramatiq.middleware.AgeLimit", + "dramatiq.middleware.TimeLimit", + "dramatiq.middleware.Callbacks", + "dramatiq.middleware.Retries", + "django_dramatiq.middleware.DbConnectionsMiddleware", + ] + } + +After adding the ``django_dramatiq`` installed app, you will need to run ``./manage.py migrate`` once to setup +its DB tables. If this configuration is set in your TOM, the existing services which support asynchronous queries, +Atlas and ZTF, should start querying asynchronously. If you do not add these settings, those services will still +function but will fall back to synchronous queries. + + +Adding a new Forced Photometry Service +###################################### + +The Forced Photometry services fulfill an interface defined in +`BaseForcedPhotometryService `_. +To implement your own Forced Photometry service, you need to do 3 things: +1. Subclass BaseForcedPhotometryService +2. Subclass BaseForcedPhotometryQueryForm +3. Subclass DataProcessor +Once those are implemented, don't forget to update your settings for ``FORCED_PHOTOMETRY_SERVICES``, +``DATA_PRODUCT_TYPES``, and ``DATA_PROCESSORS`` for your new service and its associated data product type. + + +Subclass BaseForcedPhotometryService: +************************************* + +The most important method here is the ``query_service`` method which is where you put your service's business logic +for making the query, given the form parameters and target. This method is expected to create a DataProduct in the database +at the end of the query, storing the result file or files. If queries to your service are expected to take a long time and +you would like to make them asynchronously (not blocking the UI while calling), then follow the example in the +`atlas implementation `_ and place your +actual asynchronous query method in your module's ``tasks.py`` file so it can be found by dramatiq. Like in the atlas implementation, +your code should check to see if ``django_dramatiq`` is in the settings ``INSTALLED_APPS`` before trying to enqueue it with dramatiq. + +The ``get_data_product_type`` method should return the name of your new data product type you are going to define a +DataProcessor for. This must match the name you add to ``DATA_PROCESSORS`` and ``DATA_PRODUCT_TYPES`` in your ``settings.py``. +You will also need to define a `DataProcessor ` +for this data type. + + +Subclass BaseForcedPhotometryQueryForm: +*************************************** + +This class defines the form users will need to fill out to query the service. It uses +`django-crispy-forms `_ to define the layout +programmatically. You first will add whatever form fields you need to the base of your +subclass, and then just fill in the ``layout()`` method with a django-crispy-forms layout +for your fields, and optionally the ``clean()`` method if you want to perform any field validation. +The values of the fields from this form will be available to you in your service class in the +``query_service`` method. + + +Subclass DataProcessor: +*********************** + +You must create a custom DataProcessor that knows how to convert data returned from your service into +a series of either photometry or spectroscopy datums. Without defining this step, your queries will still +result in a DataProduct file being stored from the service's ``query_service`` method, but those files will +not be parsed into photometry or spectroscopy datums. You can read more about how to implement a custom +DataProcessor `here <../customizing_data_processing>`_. \ No newline at end of file diff --git a/docs/managing_data/index.rst b/docs/managing_data/index.rst index 78979a9f5..5b4bcb46a 100644 --- a/docs/managing_data/index.rst +++ b/docs/managing_data/index.rst @@ -11,6 +11,7 @@ Managing Data customizing_data_processing tom_direct_sharing stream_pub_sub + forced_photometry :doc:`Creating Plots from TOM Data ` - Learn how to create plots using plot.ly and your TOM @@ -23,4 +24,5 @@ TOM from uploaded data products. :doc:`Publish and Subscribe to a Kafka Stream ` - Learn how to publish and subscribe to a Kafka stream topic. - +:doc:`Integrating Forced Photometry Service Queries ` - Learn how to integrate the existing Atlas, panSTARRS, and ZTF +forced photometry services into your TOM, and learn how to add new services. diff --git a/tom_dataproducts/forced_photometry/atlas.py b/tom_dataproducts/forced_photometry/atlas.py index 5bb327f73..062fcdce4 100644 --- a/tom_dataproducts/forced_photometry/atlas.py +++ b/tom_dataproducts/forced_photometry/atlas.py @@ -6,6 +6,7 @@ from tom_dataproducts.tasks import atlas_query from tom_targets.models import Target + class AtlasForcedPhotometryQueryForm(fps.BaseForcedPhotometryQueryForm): min_date = forms.CharField( label='Min date:', required=False, From 199cecc86e76e6fb0a42cbb9255f5d9849f7fcd1 Mon Sep 17 00:00:00 2001 From: Jon Date: Fri, 6 Oct 2023 17:19:33 +0000 Subject: [PATCH 4/5] Remove typing since thats breaking tests --- tom_dataproducts/views.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tom_dataproducts/views.py b/tom_dataproducts/views.py index 0423103a8..0800908bb 100644 --- a/tom_dataproducts/views.py +++ b/tom_dataproducts/views.py @@ -1,7 +1,6 @@ from io import StringIO import logging import os -from typing import Any from urllib.parse import urlencode, urlparse from django.conf import settings @@ -130,11 +129,11 @@ def get_form_class(self): """ return self.get_service_class()().get_form() - def get_context_data(self, **kwargs: Any) -> dict[str, Any]: + def get_context_data(self, *args, **kwargs): """ Adds the target to the context object. """ - context = super().get_context_data(**kwargs) + context = super().get_context_data(*args, **kwargs) context['target'] = self.get_target() context['query_form'] = self.get_form_class()(initial=self.get_initial()) return context From d1b2b03945a1308f4701e1954b1bd55b3083ba1a Mon Sep 17 00:00:00 2001 From: Jon Date: Mon, 16 Oct 2023 20:44:02 +0000 Subject: [PATCH 5/5] Add newline at end of template --- .../templates/tom_dataproducts/forced_photometry_form.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tom_dataproducts/templates/tom_dataproducts/forced_photometry_form.html b/tom_dataproducts/templates/tom_dataproducts/forced_photometry_form.html index 212d926b7..bb78fd757 100644 --- a/tom_dataproducts/templates/tom_dataproducts/forced_photometry_form.html +++ b/tom_dataproducts/templates/tom_dataproducts/forced_photometry_form.html @@ -11,4 +11,4 @@

Query {{ form.service.value }} Forced Photometry Service

Target {{ target.name }} at RA {{ target.ra }}, DEC {{ target.dec }}


{% crispy query_form %} -{% endblock %} \ No newline at end of file +{% endblock %}