diff --git a/docs/managing_data/continuous_sharing.rst b/docs/managing_data/continuous_sharing.rst new file mode 100644 index 000000000..851d1a952 --- /dev/null +++ b/docs/managing_data/continuous_sharing.rst @@ -0,0 +1,76 @@ +Setting up Continuous Sharing +--------------------------------------- + +After setting up your TOM's `DATA_SHARING` destinations in your settings, you can set up individual Targets to share +their data automatically with a sharing destination as the data arrives in the TOM. Continuous Sharing is handled through +the `PersistentShare` model in the `tom_targets` module. + + +Permissions: +############################# + +In order to setup continuous sharing, your user account must have the proper permissions, which means permissions to +add, view, and delete `PersistentShare` objects. A superuser account will have all permissions by default, but to give +permissions to another user, you can use code like this one time in the console: + +.. code:: python + + from guardian.shortcuts import assign_perm + + # To assign the permission to a single user + user = User.objects.get(username='myusername') + assign_perm('tom_targets.view_persistentshare', user) + assign_perm('tom_targets.add_persistentshare', user) + assign_perm('tom_targets.delete_persistentshare', user) + + # To assign the permission to all users of a group + group = Group.objects.get(name='mygroupname') + assign_perm('tom_targets.view_persistentshare', group) + assign_perm('tom_targets.add_persistentshare', group) + assign_perm('tom_targets.delete_persistentshare', group) + + +The user must also have `change_target` permissions on the specific Target they are attempting to continuously share. + + +Managing Continuous Sharing: +************************************************* + +There are a few ways to manage continuous sharing. First, you can navigate to any Target's share page `/targets//share` +and you should see a tab for creating and viewing continuous sharing for that Target. You can also navigate to +`/targets/persistentshare/manage` to create and view all persistentshare objects you have permissions to see. There is also +a REST API for persistentshare objects that can be accessed at `/targets/persistentshare/`, which is used internally from the +manage pages. + +If you have a custom target details page, you can integrate the controls for creating or managing continuous sharing using the +template partials below: + +.. code:: html + +

Continously Share data for Target {{ target.name }}

+
+ {% create_persistent_share target %} +
+

Manage Continuous Sharing for Target {{ target.name }}

+
+ {% persistent_share_table target %} +
+ +Note that setting up Continuous Sharing stores the destination from your `DATA_SHARING` settings. If you later change or remove that +destination then continuous shares using it will fail. + +Also note that by default, continuous sharing will occur when a ReducedDatum is saved, or when the default `tom_base` `DataProcessor` is used +to load in a `DataProduct`. If you create your own `DataProcessor` subclass in your TOM, you must add the following lines to trigger continuous +sharing after you have bulk created the `ReducedDatums`: + +.. code:: python + + from tom_dataproducts.sharing import continuous_share_data + # After all your logic to bulk_create ReducedDatums + # Trigger any sharing you may have set to occur when new data comes in + # Encapsulate this in a try/catch so sharing failure doesn't prevent dataproduct ingestion + try: + continuous_share_data(dp.target, reduced_datums) + except Exception as e: + logger.warning(f"Failed to share new dataproduct {dp.product_id}: {repr(e)}") diff --git a/docs/managing_data/index.rst b/docs/managing_data/index.rst index 75f94ba31..ad3b4ebdf 100644 --- a/docs/managing_data/index.rst +++ b/docs/managing_data/index.rst @@ -11,6 +11,7 @@ Managing Data customizing_data_processing tom_direct_sharing stream_pub_sub + continuous_sharing single_target_data_service @@ -24,5 +25,7 @@ TOM from uploaded data products. :doc:`Publish and Subscribe to a Kafka Stream ` - Learn how to publish and subscribe to a Kafka stream topic. +:doc:`Setting up Continuous Sharing of a target's data to a TOM or Kafka stream ` - Learn how to set up continuous sharing of a Target's data products. + :doc:`Integrating Single-Target Data Service Queries ` - Learn how to integrate the existing Atlas, panSTARRS, and ZTF single-target data services into your TOM, and learn how to add new services. diff --git a/tom_dataproducts/alertstreams/hermes.py b/tom_dataproducts/alertstreams/hermes.py index c20b107f8..d3cd8b2c7 100644 --- a/tom_dataproducts/alertstreams/hermes.py +++ b/tom_dataproducts/alertstreams/hermes.py @@ -149,7 +149,7 @@ def get_hermes_spectroscopy(self, datum): def convert_astropy_brightness_to_hermes(brightness_unit): if not brightness_unit: return brightness_unit - elif brightness_unit.uppercase() == 'AB' or brightness_unit.uppercase() == 'ABFLUX': + elif brightness_unit.upper() == 'AB' or brightness_unit.upper() == 'ABFLUX': return 'AB mag' else: return brightness_unit @@ -158,11 +158,11 @@ def convert_astropy_brightness_to_hermes(brightness_unit): def convert_astropy_wavelength_to_hermes(wavelength_unit): if not wavelength_unit: return wavelength_unit - elif wavelength_unit.lowercase() == 'angstrom' or wavelength_unit == 'AA': + elif wavelength_unit.lower() == 'angstrom' or wavelength_unit == 'AA': return 'Å' - elif wavelength_unit.lowercase() == 'micron': + elif wavelength_unit.lower() == 'micron': return 'µm' - elif wavelength_unit.lowercase() == 'hertz': + elif wavelength_unit.lower() == 'hertz': return 'Hz' else: return wavelength_unit @@ -209,7 +209,8 @@ def publish_to_hermes(message_info, datums, targets=Target.objects.none(), **kwa response = requests.post(url=submit_url, json=alert, headers=headers) response.raise_for_status() # Only mark the datums as shared if the sharing was successful - hermes_alert = AlertStreamMessage(topic=message_info.topic, exchange_status='published') + hermes_alert = AlertStreamMessage( + topic=message_info.topic, message_id=response.json().get('uuid'), exchange_status='published') hermes_alert.save() for tomtoolkit_photometry in datums: tomtoolkit_photometry.message.add(hermes_alert) diff --git a/tom_dataproducts/data_processor.py b/tom_dataproducts/data_processor.py index 9438b54cf..b27a7781c 100644 --- a/tom_dataproducts/data_processor.py +++ b/tom_dataproducts/data_processor.py @@ -6,6 +6,7 @@ from importlib import import_module from tom_dataproducts.models import ReducedDatum +from tom_dataproducts.sharing import continuous_share_data logger = logging.getLogger(__name__) @@ -70,7 +71,14 @@ def run_data_processor(dp): # timestamp=datum[0], value=datum[1], source_name=datum[2]) for datum in data] # 3. Finally, insert the new ReducedDatum objects into the database - ReducedDatum.objects.bulk_create(new_reduced_datums) + reduced_datums = ReducedDatum.objects.bulk_create(new_reduced_datums) + + # 4. Trigger any sharing you may have set to occur when new data comes in + # Encapsulate this in a try/catch so sharing failure doesn't prevent dataproduct ingestion + try: + continuous_share_data(dp.target, reduced_datums) + except Exception as e: + logger.warning(f"Failed to share new dataproduct {dp.product_id}: {repr(e)}") # log what happened if skipped_data: diff --git a/tom_dataproducts/serializers.py b/tom_dataproducts/serializers.py index 7d6ba9993..7c1d92c0f 100644 --- a/tom_dataproducts/serializers.py +++ b/tom_dataproducts/serializers.py @@ -8,7 +8,7 @@ from tom_observations.models import ObservationRecord from tom_observations.serializers import ObservationRecordFilteredPrimaryKeyRelatedField from tom_targets.models import Target -from tom_targets.serializers import TargetFilteredPrimaryKeyRelatedField +from tom_targets.fields import TargetFilteredPrimaryKeyRelatedField class DataProductGroupSerializer(serializers.ModelSerializer): diff --git a/tom_dataproducts/sharing.py b/tom_dataproducts/sharing.py index b9be43fd6..ae815ef01 100644 --- a/tom_dataproducts/sharing.py +++ b/tom_dataproducts/sharing.py @@ -11,12 +11,40 @@ from django.http import StreamingHttpResponse from django.utils.text import slugify -from tom_targets.models import Target +from tom_targets.models import Target, PersistentShare from tom_dataproducts.models import DataProduct, ReducedDatum from tom_dataproducts.alertstreams.hermes import publish_to_hermes, BuildHermesMessage, get_hermes_topics from tom_dataproducts.serializers import DataProductSerializer, ReducedDatumSerializer +def continuous_share_data(target, reduced_datums): + """ + Triggered when new ReducedDatums are created. + Shares those ReducedDatums to the sharing destination of any PersistentShares on the target. + :param target: Target instance that these reduced_datums belong to + :param reduced_datums: list of ReducedDatum instances to share + """ + persistentshares = PersistentShare.objects.filter(target=target) + for persistentshare in persistentshares: + share_destination = persistentshare.destination + reduced_datum_pks = [rd.pk for rd in reduced_datums] + if 'HERMES' in share_destination.upper(): + hermes_topic = share_destination.split(':')[1] + destination = share_destination.split(':')[0] + filtered_reduced_datums = check_for_share_safe_datums( + destination, ReducedDatum.objects.filter(pk__in=reduced_datum_pks), topic=hermes_topic) + sharing = getattr(settings, "DATA_SHARING", {}) + message = BuildHermesMessage(title=f"Updated data for {target.name} from " + f"{getattr(settings, 'TOM_NAME', 'TOM Toolkit')}.", + authors=sharing.get('hermes', {}).get('DEFAULT_AUTHORS', None), + message=None, + topic=hermes_topic + ) + publish_to_hermes(message, filtered_reduced_datums) + else: + share_data_with_tom(share_destination, None, None, None, selected_data=reduced_datum_pks) + + def share_target_list_with_hermes(share_destination, form_data, selected_targets=None, include_all_data=False): """ Serialize and share a set of selected targets and their data with Hermes diff --git a/tom_targets/admin.py b/tom_targets/admin.py index ac5362edc..baf4b41a3 100644 --- a/tom_targets/admin.py +++ b/tom_targets/admin.py @@ -1,5 +1,6 @@ from django.contrib import admin -from .models import Target, TargetList, TargetExtra +from .models import Target, TargetList, TargetExtra, PersistentShare +from .forms import AdminPersistentShareForm class TargetExtraInline(admin.TabularInline): @@ -17,6 +18,17 @@ class TargetListAdmin(admin.ModelAdmin): model = TargetList +class PersistentShareAdmin(admin.ModelAdmin): + model = PersistentShare + form = AdminPersistentShareForm + raw_id_fields = ( + 'target', + 'user' + ) + + admin.site.register(Target, TargetAdmin) admin.site.register(TargetList, TargetListAdmin) + +admin.site.register(PersistentShare, PersistentShareAdmin) diff --git a/tom_targets/apps.py b/tom_targets/apps.py index b858f7340..0b79ef450 100644 --- a/tom_targets/apps.py +++ b/tom_targets/apps.py @@ -3,3 +3,7 @@ class TomTargetsConfig(AppConfig): name = 'tom_targets' + + def ready(self): + import tom_targets.signals.handlers # noqa + super().ready() diff --git a/tom_targets/fields.py b/tom_targets/fields.py new file mode 100644 index 000000000..2f2b674b0 --- /dev/null +++ b/tom_targets/fields.py @@ -0,0 +1,14 @@ +from guardian.shortcuts import get_objects_for_user +from rest_framework import serializers + + +class TargetFilteredPrimaryKeyRelatedField(serializers.PrimaryKeyRelatedField): + # This PrimaryKeyRelatedField subclass is used to implement get_queryset based on the permissions of the user + # submitting the request. The pattern was taken from this StackOverflow answer: https://stackoverflow.com/a/32683066 + + def get_queryset(self): + request = self.context.get('request', None) + queryset = super().get_queryset() + if not (request and queryset): + return None + return get_objects_for_user(request.user, 'tom_targets.change_target') diff --git a/tom_targets/forms.py b/tom_targets/forms.py index 559d87457..0f6f22839 100644 --- a/tom_targets/forms.py +++ b/tom_targets/forms.py @@ -7,7 +7,7 @@ from guardian.shortcuts import assign_perm, get_groups_with_perms, remove_perm from tom_dataproducts.sharing import get_sharing_destination_options -from .models import Target, TargetExtra, TargetName, TargetList +from .models import Target, TargetExtra, TargetName, TargetList, PersistentShare from tom_targets.base_models import (SIDEREAL_FIELDS, NON_SIDEREAL_FIELDS, REQUIRED_SIDEREAL_FIELDS, REQUIRED_NON_SIDEREAL_FIELDS, REQUIRED_NON_SIDEREAL_FIELDS_PER_SCHEME, IGNORE_FIELDS) @@ -241,3 +241,29 @@ class TargetMergeForm(forms.Form): 'hx-target': '#id_target_merge_fields', # replace name_select element }) ) + + +class AdminPersistentShareForm(forms.ModelForm): + destination = forms.ChoiceField(choices=[], label='Share Destination', required=True) + + class Meta: + model = PersistentShare + fields = '__all__' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fields['destination'].choices = get_sharing_destination_options() + + +class PersistentShareForm(AdminPersistentShareForm): + target = forms.IntegerField(label='Target ID', initial=0, required=True) + + def __init__(self, *args, **kwargs): + try: + self.target_id = kwargs.pop('target_id') + except KeyError: + self.target_id = None + super().__init__(*args, **kwargs) + if self.target_id: + self.fields['target'].initial = self.target_id + self.fields['target'].disabled = True diff --git a/tom_targets/migrations/0022_persistentshare.py b/tom_targets/migrations/0022_persistentshare.py new file mode 100644 index 000000000..98ee3cecc --- /dev/null +++ b/tom_targets/migrations/0022_persistentshare.py @@ -0,0 +1,30 @@ +# Generated by Django 4.2.13 on 2024-11-22 22:29 + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('tom_targets', '0021_rename_target_basetarget_alter_basetarget_options'), + ] + + operations = [ + migrations.CreateModel( + name='PersistentShare', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('destination', models.CharField(help_text='The sharing destination, as it appears in your DATA_SHARING settings dict', max_length=200)), + ('created', models.DateTimeField(auto_now_add=True, help_text='The time which this PersistentShare was created in the TOM database.')), + ('target', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='tom_targets.basetarget')), + ('user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL)), + ], + options={ + 'ordering': ('-created',), + 'unique_together': {('target', 'destination')}, + }, + ), + ] diff --git a/tom_targets/models.py b/tom_targets/models.py index 49fa4404e..78191a967 100644 --- a/tom_targets/models.py +++ b/tom_targets/models.py @@ -3,6 +3,7 @@ import logging from django.conf import settings +from django.contrib.auth.models import User from django.core.exceptions import ValidationError from django.db import models from django.utils.module_loading import import_string @@ -193,3 +194,33 @@ class Meta: def __str__(self): return self.name + + +class PersistentShare(models.Model): + """ + Class representing a persistent share setup between a sharing destination and a Target + + :param target: The ``Target`` you want to share + + :param user: The ``User`` that created this PersistentShare, for accountability purposes. + + :param destination: The sharing destination, as it appears in your TOM's DATA_SHARING settings dict + :type destination: str + + :param created: The time at which this PersistentShare was created + :type created: datetime + """ + target = models.ForeignKey(BaseTarget, on_delete=models.CASCADE) + user = models.ForeignKey(User, null=True, on_delete=models.SET_NULL) + destination = models.CharField( + max_length=200, help_text='The sharing destination, as it appears in your DATA_SHARING settings dict') + created = models.DateTimeField( + auto_now_add=True, help_text='The time which this PersistentShare was created in the TOM database.' + ) + + class Meta: + ordering = ('-created',) + unique_together = ['target', 'destination'] + + def __str__(self): + return f'{self.target}-{self.destination}' diff --git a/tom_targets/serializers.py b/tom_targets/serializers.py index a589a7bc9..c6dc4cf43 100644 --- a/tom_targets/serializers.py +++ b/tom_targets/serializers.py @@ -1,10 +1,12 @@ from django.contrib.auth.models import Group -from guardian.shortcuts import assign_perm, get_groups_with_perms, get_objects_for_user +from guardian.shortcuts import assign_perm, get_groups_with_perms from rest_framework import serializers from tom_common.serializers import GroupSerializer -from tom_targets.models import Target, TargetExtra, TargetName, TargetList +from tom_targets.models import Target, TargetExtra, TargetName, TargetList, PersistentShare from tom_targets.validators import RequiredFieldsTogetherValidator +from tom_targets.fields import TargetFilteredPrimaryKeyRelatedField +from tom_dataproducts.sharing import get_sharing_destination_options class TargetNameSerializer(serializers.ModelSerializer): @@ -181,13 +183,10 @@ def update(self, instance, validated_data): return instance -class TargetFilteredPrimaryKeyRelatedField(serializers.PrimaryKeyRelatedField): - # This PrimaryKeyRelatedField subclass is used to implement get_queryset based on the permissions of the user - # submitting the request. The pattern was taken from this StackOverflow answer: https://stackoverflow.com/a/32683066 +class PersistentShareSerializer(serializers.ModelSerializer): + destination = serializers.ChoiceField(choices=get_sharing_destination_options(), required=True) + target = TargetFilteredPrimaryKeyRelatedField(queryset=Target.objects.all(), required=True) - def get_queryset(self): - request = self.context.get('request', None) - queryset = super().get_queryset() - if not (request and queryset): - return None - return get_objects_for_user(request.user, 'tom_targets.change_target') + class Meta: + model = PersistentShare + fields = ('id', 'target', 'destination', 'user', 'created') diff --git a/tom_targets/signals/__init__.py b/tom_targets/signals/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tom_targets/signals/handlers.py b/tom_targets/signals/handlers.py new file mode 100644 index 000000000..659305c17 --- /dev/null +++ b/tom_targets/signals/handlers.py @@ -0,0 +1,13 @@ +from django.dispatch import receiver +from django.db.models.signals import post_save + +from tom_dataproducts.models import ReducedDatum +from tom_dataproducts.sharing import continuous_share_data + + +@receiver(post_save, sender=ReducedDatum) +def cb_reduceddatum_post_save(sender, instance, *args, **kwargs): + # When a new ReducedDatum is created or updated, check for any persistentshare instances on that target + # and if they exist, attempt to share the new data + target = instance.target + continuous_share_data(target, reduced_datums=[instance]) diff --git a/tom_targets/templates/tom_targets/partials/create_persistent_share.html b/tom_targets/templates/tom_targets/partials/create_persistent_share.html new file mode 100644 index 000000000..215a15852 --- /dev/null +++ b/tom_targets/templates/tom_targets/partials/create_persistent_share.html @@ -0,0 +1,83 @@ +{% load bootstrap4 targets_extras static %} +{% if form %} +
+ {% csrf_token %} +
+
+ {% bootstrap_field form.destination %} +
+
+ {% bootstrap_field form.target %} +
+
+ {% if target %} + + {% else %} + + {% endif %} +
+
+
+ +
+ {% buttons %} + {% endbuttons %} +
+{% else %} +
+

You do not have permission to Continuously Share data. Please contact the TOM administrator.

+
+{% endif %} + diff --git a/tom_targets/templates/tom_targets/partials/persistent_share_table.html b/tom_targets/templates/tom_targets/partials/persistent_share_table.html new file mode 100644 index 000000000..d9e3da278 --- /dev/null +++ b/tom_targets/templates/tom_targets/partials/persistent_share_table.html @@ -0,0 +1,58 @@ + + + + + + + {% if can_delete %} + + {% endif %} + + + + {% for persistentshare in persistentshares %} + + + + + {% if can_delete %} + + {% endif %} + {% endfor %} + +
TargetShare DestinationCreatorDelete?
+ {{ persistentshare.target.names|join:", " }} + {{ persistentshare.destination }}{{ persistentshare.user.username }} + {% if target %} + + {% else %} + + {% endif %} +
+ diff --git a/tom_targets/templates/tom_targets/target_manage_persistent_shares.html b/tom_targets/templates/tom_targets/target_manage_persistent_shares.html new file mode 100644 index 000000000..c0adc58f2 --- /dev/null +++ b/tom_targets/templates/tom_targets/target_manage_persistent_shares.html @@ -0,0 +1,20 @@ +{% extends 'tom_common/base.html' %} +{% load bootstrap4 targets_extras static %} +{% block title %}Persistent Share Form{% endblock %} +{% block additional_css %} + + + +{% endblock %} +{% block content %} +

Continuously Share data {% if target %} for Target {{ target.name }}{% endif %}

+
+ {% create_persistent_share target %} +
+

Manage Continous Sharing {% if target %} for Target {{ target.name }}{% endif %}

+
+ {% persistent_share_table target %} +
+{% buttons %} +{% endbuttons %} +{% endblock %} diff --git a/tom_targets/templates/tom_targets/target_share.html b/tom_targets/templates/tom_targets/target_share.html index 1262eb892..95cf43d06 100644 --- a/tom_targets/templates/tom_targets/target_share.html +++ b/tom_targets/templates/tom_targets/target_share.html @@ -1,5 +1,5 @@ {% extends 'tom_common/base.html' %} -{% load bootstrap4 dataproduct_extras static %} +{% load bootstrap4 targets_extras dataproduct_extras static %} {% block title %}Share Target {{ target.name }}{% endblock %} {% block additional_css %} @@ -8,39 +8,72 @@ {% endblock %} {% block content %}

Share {{ target.name }}

-
-
-
+ +
+
+
+ +
+
{% bootstrap_field form.share_destination %} -
-
+
+
{% bootstrap_field form.share_title %} -
-
- -
- {% if hermes_sharing %} -
+
+
+ +
+ {% if hermes_sharing %} +
or +
+
+ +
+ {% endif %}
-
- -
- {% endif %} -
-
-
+
+
{% bootstrap_field form.share_message %} +
-
-

Include Data

- {% with target_share=True %} +

Include Data

+ {% with target_share=True %} {% get_photometry_data target target_share %} - {% endwith %} - {% buttons %} + {% endwith %} + {% buttons %} Back - {% endbuttons %} - + {% endbuttons %} + +
+
+
+
+

Continously Share data for Target {{ target.name }}

+
+ {% create_persistent_share target %} +
+

Manage Continuous Sharing for Target {{ target.name }}

+
+ {% persistent_share_table target %} +
+
+
+