Feature/add forced photometry #696

Merged 5 commits on Oct 16, 2023
128 changes: 128 additions & 0 deletions docs/managing_data/forced_photometry.rst
@@ -0,0 +1,128 @@
Integrating Forced Photometry Service Queries
---------------------------------------------

The base TOM Toolkit comes with Atlas, panSTARRS, and ZTF query services. More services
can be added by extending the base ForcedPhotometryService implementation.


Integrating existing Forced Photometry Services
###############################################

You must add some configuration to your TOM's ``settings.py`` to set up the existing forced
photometry services. This configuration goes in the ``FORCED_PHOTOMETRY_SERVICES`` section
shown below:

.. code:: python

    FORCED_PHOTOMETRY_SERVICES = {
        'atlas': {
            'class': 'tom_dataproducts.forced_photometry.atlas.AtlasForcedPhotometryService',
            'url': "https://fallingstar-data.com/forcedphot",
            'api_key': os.getenv('ATLAS_FORCED_PHOTOMETRY_API_KEY', 'your atlas account api token')
        },
        'panstarrs': {
            # TODO
        },
        'ztf': {
            # TODO
        }
    }

    DATA_PRODUCT_TYPES = {
        ...
        'atlas_photometry': ('atlas_photometry', 'Atlas Photometry'),
        ...
    }

    DATA_PROCESSORS = {
        ...
        'atlas_photometry': 'tom_dataproducts.processors.atlas_processor.AtlasProcessor',
        ...
    }


Configuring your TOM to serve tasks asynchronously:
***************************************************

Several of the services are best queried asynchronously, especially if you plan to make large
queries that take a long time. The TOM Toolkit is set up to use `dramatiq <https://dramatiq.io/index.html>`_
as an asynchronous task manager, but doing so requires you to run either a `redis <https://github.com/redis/redis>`_
or `rabbitmq <https://github.com/rabbitmq/rabbitmq-server>`_ server to act as the task queue. To use dramatiq with
a redis server, add the following to your ``settings.py``:

.. code:: python

    INSTALLED_APPS = [
        ...
        'django_dramatiq',
        ...
    ]

    DRAMATIQ_BROKER = {
        "BROKER": "dramatiq.brokers.redis.RedisBroker",
        "OPTIONS": {
            "url": "redis://your-redis-service-url:your-redis-port"
        },
        "MIDDLEWARE": [
            "dramatiq.middleware.AgeLimit",
            "dramatiq.middleware.TimeLimit",
            "dramatiq.middleware.Callbacks",
            "dramatiq.middleware.Retries",
            "django_dramatiq.middleware.DbConnectionsMiddleware",
        ]
    }

After adding ``django_dramatiq`` to your installed apps, you will need to run ``./manage.py migrate`` once to set up
its DB tables. With this configuration in place, the existing services that support asynchronous queries,
Atlas and ZTF, will start querying asynchronously. If you do not add these settings, those services will still
function but will fall back to synchronous queries.
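With those settings in place, a broker and at least one worker process must be running before queued tasks are consumed. A minimal local workflow might look like the following sketch; the Docker invocation and port are illustrative assumptions, while ``rundramatiq`` is the worker command that ``django_dramatiq`` provides:

```shell
# Illustrative local setup; the image name and port are assumptions.
# Start a redis broker on the default port (6379), e.g. via Docker:
docker run -d -p 6379:6379 redis

# Create django_dramatiq's task-tracking tables (run once):
./manage.py migrate

# Start worker processes that consume tasks from the queue:
./manage.py rundramatiq
```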


Adding a new Forced Photometry Service
######################################

The forced photometry services fulfill an interface defined in
`BaseForcedPhotometryService <https://github.com/TOMToolkit/tom_base/blob/dev/tom_dataproducts/forced_photometry/forced_photometry_service.py>`_.
To implement your own forced photometry service, you need to do three things:

1. Subclass ``BaseForcedPhotometryService``
2. Subclass ``BaseForcedPhotometryQueryForm``
3. Subclass ``DataProcessor``

Once those are implemented, don't forget to update your ``FORCED_PHOTOMETRY_SERVICES``,
``DATA_PRODUCT_TYPES``, and ``DATA_PROCESSORS`` settings for your new service and its associated data product type.
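For concreteness, here is a sketch of the settings additions for a hypothetical service named ``myservice``; every dotted path, key name, and URL below is a placeholder you would replace with your own code:

```python
import os

# Hypothetical settings.py additions for a new service "myservice".
# All dotted paths, names, and URLs below are placeholders.
FORCED_PHOTOMETRY_SERVICES = {
    'myservice': {
        'class': 'mytom.forced_photometry.myservice.MyForcedPhotometryService',
        'url': 'https://example.com/forcedphot',
        'api_key': os.getenv('MYSERVICE_API_KEY', 'your api token'),
    },
}

DATA_PRODUCT_TYPES = {
    'my_photometry': ('my_photometry', 'My Photometry'),
}

DATA_PROCESSORS = {
    'my_photometry': 'mytom.processors.my_processor.MyProcessor',
}
```

The data product type string (``my_photometry`` here) is the glue: it must be the value your service's ``get_data_product_type()`` returns, and the key in both ``DATA_PRODUCT_TYPES`` and ``DATA_PROCESSORS``.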


Subclass BaseForcedPhotometryService:
*************************************

The most important method here is ``query_service``, which is where you put your service's business logic
for making the query, given the form parameters and target. This method is expected to create a DataProduct in the database
at the end of the query, storing the result file or files. If queries to your service are expected to take a long time and
you would like to run them asynchronously (without blocking the UI), then follow the example in the
`atlas implementation <https://github.com/TOMToolkit/tom_base/blob/dev/tom_dataproducts/forced_photometry/atlas.py>`_ and place your
actual asynchronous query method in your module's ``tasks.py`` file so it can be found by dramatiq. As in the atlas implementation,
your code should check whether ``django_dramatiq`` is in the settings ``INSTALLED_APPS`` before trying to enqueue the task with dramatiq.

The ``get_data_product_type`` method should return the name of the new data product type you are going to define a
DataProcessor for. This must match the name you add to ``DATA_PROCESSORS`` and ``DATA_PRODUCT_TYPES`` in your ``settings.py``.
You will also need to define a `DataProcessor <https://github.com/TOMToolkit/tom_base/blob/dev/tom_dataproducts/data_processor.py#L46>`_
for this data type.
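The ``django_dramatiq`` check mentioned above reduces to a small dispatch pattern. The sketch below is framework-free: a plain list stands in for ``settings.INSTALLED_APPS``, and ``task`` stands in for a dramatiq actor from your ``tasks.py`` (an object with a ``.send()`` method):

```python
def run_query(installed_apps, task, *args):
    """Dispatch a forced photometry query synchronously or asynchronously.

    `task` stands in for a dramatiq actor; in a real service this would
    be the function decorated with @dramatiq.actor in your tasks.py.
    """
    if 'django_dramatiq' in installed_apps:
        # A broker is configured: enqueue the task for a worker and
        # return immediately, without blocking the web request.
        return task.send(*args)
    # No broker configured: fall back to a blocking, synchronous call.
    return task(*args)
```

In the real atlas implementation, the synchronous fallback also inspects the task's return value and raises a ``ForcedPhotometryServiceException`` on failure.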


Subclass BaseForcedPhotometryQueryForm:
***************************************

This class defines the form users will fill out to query the service. It uses
`django-crispy-forms <https://django-crispy-forms.readthedocs.io/en/latest/>`_ to define the layout
programmatically. You will first add whatever form fields you need to the base of your
subclass, then fill in the ``layout()`` method with a django-crispy-forms layout
for your fields, and optionally the ``clean()`` method if you want to perform any field validation.
The values of these fields will be available to your service class in its
``query_service`` method.
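Validating mutually exclusive fields, such as a datetime/MJD date pair, is a common ``clean()`` pattern in these forms. The sketch below pulls that rule out into a plain function so it can be run without Django; in a real form subclass the same checks would live in ``clean()`` and raise ``forms.ValidationError``:

```python
def validate_date_pair(date_str, date_mjd, require_one=True):
    """Return an error message for an invalid date pair, else None.

    Mirrors a typical clean() rule: the user may give a datetime
    string OR an MJD float, but not both; optionally at least one
    of the two is required.
    """
    if date_str and date_mjd is not None:
        return 'Specify the date in either datetime or MJD format, not both'
    if require_one and not date_str and date_mjd is None:
        return 'A date is required, in either datetime or MJD format'
    return None
```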


Subclass DataProcessor:
***********************

You must create a custom DataProcessor that knows how to convert data returned from your service into
a series of either photometry or spectroscopy datums. Without defining this step, your queries will still
result in a DataProduct file being stored from the service's ``query_service`` method, but those files will
not be parsed into photometry or spectroscopy datums. You can read more about how to implement a custom
DataProcessor `here <../customizing_data_processing>`_.
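A DataProcessor's job is mostly file parsing. The sketch below shows that step in isolation, without Django: it is not a real subclass (a real one would subclass ``tom_dataproducts.data_processor.DataProcessor`` and read the file from the ``DataProduct``), and the CSV column layout and filter name are assumptions, but the returned ``(timestamp, value, source_name)`` tuples match what ``run_data_processor`` expects:

```python
from datetime import datetime, timezone

class MyPhotometryProcessor:
    """Parse 'mjd,magnitude,error' rows into ReducedDatum-style tuples.

    Illustrative only: a real processor subclasses
    tom_dataproducts.data_processor.DataProcessor and opens the
    DataProduct's file; here a CSV string is parsed directly.
    """

    def process_data(self, csv_text):
        datums = []
        for line in csv_text.strip().splitlines():
            mjd, mag, err = (float(x) for x in line.split(','))
            # Convert MJD to a timezone-aware datetime
            # (the Unix epoch 1970-01-01 is MJD 40587).
            timestamp = datetime.fromtimestamp(
                (mjd - 40587.0) * 86400.0, tz=timezone.utc)
            value = {'magnitude': mag, 'error': err, 'filter': 'r'}
            # run_data_processor unpacks (timestamp, value, source_name).
            datums.append((timestamp, value, 'MyService'))
        return datums
```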
4 changes: 3 additions & 1 deletion docs/managing_data/index.rst
@@ -11,6 +11,7 @@ Managing Data
customizing_data_processing
tom_direct_sharing
stream_pub_sub
forced_photometry


:doc:`Creating Plots from TOM Data <plotting_data>` - Learn how to create plots using plot.ly and your TOM
@@ -23,4 +24,5 @@ TOM from uploaded data products.

:doc:`Publish and Subscribe to a Kafka Stream <stream_pub_sub>` - Learn how to publish and subscribe to a Kafka stream topic.


:doc:`Integrating Forced Photometry Service Queries <forced_photometry>` - Learn how to integrate the existing Atlas, panSTARRS, and ZTF
forced photometry services into your TOM, and learn how to add new services.
2 changes: 2 additions & 0 deletions setup.py
@@ -32,12 +32,14 @@
'astroplan~=0.8',
'astropy>=5.0',
'beautifulsoup4~=4.9',
'dramatiq[redis, watch]<2.0.0',
'django>=3.1,<5', # TOM Toolkit requires db math functions
'djangorestframework~=3.12',
'django-bootstrap4>=3,<24',
'django-contrib-comments~=2.0',  # Earlier versions are incompatible with Django >= 3.0
'django-crispy-forms~=2.0',
'crispy-bootstrap4~=2022.0',
'django-dramatiq<1.0.0',
'django-extensions~=3.1',
'django-filter>=21,<24',
'django-gravatar2~=1.4',
10 changes: 9 additions & 1 deletion tom_dataproducts/data_processor.py
@@ -35,8 +35,9 @@ def run_data_processor(dp):

data_processor = clazz()
data = data_processor.process_data(dp)
data_type = data_processor.data_type_override() or dp.data_product_type

reduced_datums = [ReducedDatum(target=dp.target, data_product=dp, data_type=dp.data_product_type,
reduced_datums = [ReducedDatum(target=dp.target, data_product=dp, data_type=data_type,
timestamp=datum[0], value=datum[1], source_name=datum[2]) for datum in data]
ReducedDatum.objects.bulk_create(reduced_datums)

@@ -65,3 +66,10 @@ def process_data(self, data_product):
:rtype: list of 3-tuples
"""
return []

def data_type_override(self):
"""
Override for the ReducedDatum data type, if you want it to be different from the
DataProduct data_type.
"""
return ''
Empty file.
146 changes: 146 additions & 0 deletions tom_dataproducts/forced_photometry/atlas.py
@@ -0,0 +1,146 @@
from django import forms
from django.conf import settings
from crispy_forms.layout import Div, HTML
from astropy.time import Time
import tom_dataproducts.forced_photometry.forced_photometry_service as fps
from tom_dataproducts.tasks import atlas_query
from tom_targets.models import Target


class AtlasForcedPhotometryQueryForm(fps.BaseForcedPhotometryQueryForm):
min_date = forms.CharField(
label='Min date:', required=False,
widget=forms.TextInput(attrs={'class': 'ml-2', 'type': 'datetime-local'})
)
max_date = forms.CharField(
label='Max date:', required=False,
widget=forms.TextInput(attrs={'class': 'ml-2', 'type': 'datetime-local'})
)
min_date_mjd = forms.FloatField(
label='Min date (mjd):', required=False,
widget=forms.NumberInput(attrs={'class': 'ml-2'})
)
max_date_mjd = forms.FloatField(
label='Max date (mjd):', required=False,
widget=forms.NumberInput(attrs={'class': 'ml-2'})
)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def layout(self):
return Div(
Div(
Div(
'min_date',
css_class='col-md-4',
),
Div(
HTML('OR'),
css_class='col-md-1'
),
Div(
'min_date_mjd',
css_class='col-md-5'
),
css_class='form-row form-inline mb-2'
),
Div(
Div(
'max_date',
css_class='col-md-4',
),
Div(
HTML('OR'),
css_class='col-md-1'
),
Div(
'max_date_mjd',
css_class='col-md-5'
),
css_class='form-row form-inline mb-4'
),
)

def clean(self):
cleaned_data = super().clean()
if not (cleaned_data.get('min_date') or cleaned_data.get('min_date_mjd')):
raise forms.ValidationError("Must supply a minimum date in either datetime or mjd format")
if cleaned_data.get('min_date') and cleaned_data.get('min_date_mjd'):
raise forms.ValidationError("Please specify the minimum date in either datetime or mjd format")
if cleaned_data.get('max_date') and cleaned_data.get('max_date_mjd'):
raise forms.ValidationError("Please specify the maximum date in either datetime or mjd format")
return cleaned_data


class AtlasForcedPhotometryService(fps.BaseForcedPhotometryService):
name = 'Atlas'

def __init__(self):
        super().__init__()
        self.success_message = ('Asynchronous Atlas query is processing. '
                                'Refresh the page once it completes; the result will '
                                'show up as a data product in the "Manage Data" tab.')

def get_form(self):
"""
This method returns the form for querying this service.
"""
return AtlasForcedPhotometryQueryForm

def query_service(self, query_parameters):
"""
This method takes in the serialized data from the query form and actually
submits the query to the service
"""
print(f"Querying Atlas service with params: {query_parameters}")
min_date_mjd = query_parameters.get('min_date_mjd')
if not min_date_mjd:
min_date_mjd = Time(query_parameters.get('min_date')).mjd
max_date_mjd = query_parameters.get('max_date_mjd')
if not max_date_mjd and query_parameters.get('max_date'):
max_date_mjd = Time(query_parameters.get('max_date')).mjd
if not Target.objects.filter(pk=query_parameters.get('target_id')).exists():
raise fps.ForcedPhotometryServiceException(f"Target {query_parameters.get('target_id')} does not exist")

if 'atlas' not in settings.FORCED_PHOTOMETRY_SERVICES:
raise fps.ForcedPhotometryServiceException("Must specify 'atlas' settings in FORCED_PHOTOMETRY_SERVICES")
if not settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('url'):
raise fps.ForcedPhotometryServiceException(
"Must specify a 'url' under atlas settings in FORCED_PHOTOMETRY_SERVICES"
)
if not settings.FORCED_PHOTOMETRY_SERVICES.get('atlas', {}).get('api_key'):
raise fps.ForcedPhotometryServiceException(
"Must specify an 'api_key' under atlas settings in FORCED_PHOTOMETRY_SERVICES"
)

if 'django_dramatiq' in settings.INSTALLED_APPS:
atlas_query.send(min_date_mjd, max_date_mjd,
query_parameters.get('target_id'),
self.get_data_product_type())
else:
query_succeeded = atlas_query(min_date_mjd, max_date_mjd,
query_parameters.get('target_id'),
self.get_data_product_type())
if not query_succeeded:
raise fps.ForcedPhotometryServiceException(
"Atlas query failed, check the server logs for more information"
)
self.success_message = "Atlas query completed. View its data product in the 'Manage Data' tab"

return True

def validate_form(self, query_parameters):
"""
        Same inputs as query_service, but performs a dry run of validation only.
        Subclasses that need no extra validation can simply use "pass".

Typically called by the .is_valid() method.
"""
print(f"Validating Atlas service with params: {query_parameters}")

def get_success_message(self):
return self.success_message

def get_data_product_type(self):
return 'atlas_photometry'