From 7ff83bf7364c62294329ff0e8e91e51d9b9cd5fa Mon Sep 17 00:00:00 2001 From: Jon Date: Wed, 11 Dec 2024 07:14:05 -0100 Subject: [PATCH] Add triggering from data_processor because bulk_create doesn't trigger post_save. --- tom_dataproducts/alertstreams/hermes.py | 8 ++-- tom_dataproducts/data_processor.py | 10 +++- tom_dataproducts/sharing.py | 46 ++++++++++--------- tom_targets/signals/handlers.py | 8 +--- .../partials/create_persistent_share.html | 2 +- .../partials/persistent_share_table.html | 2 +- 6 files changed, 42 insertions(+), 34 deletions(-) diff --git a/tom_dataproducts/alertstreams/hermes.py b/tom_dataproducts/alertstreams/hermes.py index cdfba7ac2..d3cd8b2c7 100644 --- a/tom_dataproducts/alertstreams/hermes.py +++ b/tom_dataproducts/alertstreams/hermes.py @@ -149,7 +149,7 @@ def get_hermes_spectroscopy(self, datum): def convert_astropy_brightness_to_hermes(brightness_unit): if not brightness_unit: return brightness_unit - elif brightness_unit.uppercase() == 'AB' or brightness_unit.uppercase() == 'ABFLUX': + elif brightness_unit.upper() == 'AB' or brightness_unit.upper() == 'ABFLUX': return 'AB mag' else: return brightness_unit @@ -158,11 +158,11 @@ def convert_astropy_brightness_to_hermes(brightness_unit): def convert_astropy_wavelength_to_hermes(wavelength_unit): if not wavelength_unit: return wavelength_unit - elif wavelength_unit.lowercase() == 'angstrom' or wavelength_unit == 'AA': + elif wavelength_unit.lower() == 'angstrom' or wavelength_unit == 'AA': return 'Å' - elif wavelength_unit.lowercase() == 'micron': + elif wavelength_unit.lower() == 'micron': return 'µm' - elif wavelength_unit.lowercase() == 'hertz': + elif wavelength_unit.lower() == 'hertz': return 'Hz' else: return wavelength_unit diff --git a/tom_dataproducts/data_processor.py b/tom_dataproducts/data_processor.py index 9438b54cf..b27a7781c 100644 --- a/tom_dataproducts/data_processor.py +++ b/tom_dataproducts/data_processor.py @@ -6,6 +6,7 @@ from importlib import import_module from tom_dataproducts.models import ReducedDatum +from tom_dataproducts.sharing import continuous_share_data logger = logging.getLogger(__name__) @@ -70,7 +71,14 @@ def run_data_processor(dp): # timestamp=datum[0], value=datum[1], source_name=datum[2]) for datum in data] # 3. Finally, insert the new ReducedDatum objects into the database - ReducedDatum.objects.bulk_create(new_reduced_datums) + reduced_datums = ReducedDatum.objects.bulk_create(new_reduced_datums) + + # 4. Trigger any sharing you may have set to occur when new data comes in + # Encapsulate this in a try/catch so sharing failure doesn't prevent dataproduct ingestion + try: + continuous_share_data(dp.target, reduced_datums) + except Exception as e: + logger.warning(f"Failed to share new dataproduct {dp.product_id}: {repr(e)}") # log what happened if skipped_data: diff --git a/tom_dataproducts/sharing.py b/tom_dataproducts/sharing.py index 231d948c3..1b102d575 100644 --- a/tom_dataproducts/sharing.py +++ b/tom_dataproducts/sharing.py @@ -11,34 +11,38 @@ from django.http import StreamingHttpResponse from django.utils.text import slugify -from tom_targets.models import Target +from tom_targets.models import Target, PersistentShare from tom_dataproducts.models import DataProduct, ReducedDatum from tom_dataproducts.alertstreams.hermes import publish_to_hermes, BuildHermesMessage, get_hermes_topics from tom_dataproducts.serializers import DataProductSerializer, ReducedDatumSerializer -def share_data_with_destination(share_destination, reduced_datum): +def continuous_share_data(target, reduced_datums): """ - Triggered by PersistentShare when new ReducedDatums are created. - Shares that ReducedDatum to the sharing destination. - :param share_destination: Topic or location to share data to from `DATA_SHARING` settings - :param reduced_datum: ReducedDatum instance to share + Triggered when new ReducedDatums are created. + Shares those ReducedDatums to the sharing destination of any PersistentShares on the target. + :param target: Target instance that these reduced_datums belong to + :param reduced_datums: list of ReducedDatum instances to share """ - if 'HERMES' in share_destination.upper(): - hermes_topic = share_destination.split(':')[1] - destination = share_destination.split(':')[0] - filtered_reduced_datums = check_for_share_safe_datums( - destination, ReducedDatum.objects.filter(pk=reduced_datum.pk), topic=hermes_topic) - sharing = getattr(settings, "DATA_SHARING", {}) - message = BuildHermesMessage(title=f"Updated data for {reduced_datum.target.name} from " - f"{getattr(settings, 'TOM_NAME', 'TOM Toolkit')}.", - authors=sharing.get('hermes', {}).get('DEFAULT_AUTHORS', None), - message=None, - topic=hermes_topic - ) - publish_to_hermes(message, filtered_reduced_datums) - else: - share_data_with_tom(share_destination, None, None, None, selected_data=[reduced_datum.pk]) + persistentshares = PersistentShare.objects.filter(target=target) + for persistentshare in persistentshares: + share_destination = persistentshare.destination + reduced_datum_pks = [rd.pk for rd in reduced_datums] + if 'HERMES' in share_destination.upper(): + hermes_topic = share_destination.split(':')[1] + destination = share_destination.split(':')[0] + filtered_reduced_datums = check_for_share_safe_datums( + destination, ReducedDatum.objects.filter(pk__in=reduced_datum_pks), topic=hermes_topic) + sharing = getattr(settings, "DATA_SHARING", {}) + message = BuildHermesMessage(title=f"Updated data for {target.name} from " + f"{getattr(settings, 'TOM_NAME', 'TOM Toolkit')}.", + authors=sharing.get('hermes', {}).get('DEFAULT_AUTHORS', None), + message=None, + topic=hermes_topic + ) + publish_to_hermes(message, filtered_reduced_datums) + else: + share_data_with_tom(share_destination, None, None, None, selected_data=reduced_datum_pks) def share_target_list_with_hermes(share_destination, form_data, selected_targets=None, include_all_data=False): diff --git a/tom_targets/signals/handlers.py b/tom_targets/signals/handlers.py index ca24f2a18..1a55da254 100644 --- a/tom_targets/signals/handlers.py +++ b/tom_targets/signals/handlers.py @@ -2,8 +2,7 @@ from django.db.models.signals import post_save from tom_dataproducts.models import ReducedDatum -from tom_dataproducts.sharing import share_data_with_destination -from tom_targets.models import PersistentShare +from tom_dataproducts.sharing import continuous_share_data @receiver(post_save, sender=ReducedDatum) @@ -11,7 +10,4 @@ def cb_dataproduct_post_save(sender, instance, *args, **kwargs): # When a new dataproduct is created or updated, check for any persistentshare instances on that target # and if they exist, attempt to share the new data target = instance.target - persistentshares = PersistentShare.objects.filter(target=target) - for persistentshare in persistentshares: - share_destination = persistentshare.destination - share_data_with_destination(share_destination, instance) + continuous_share_data(target, reduced_datums=[instance]) diff --git a/tom_targets/templates/tom_targets/partials/create_persistent_share.html b/tom_targets/templates/tom_targets/partials/create_persistent_share.html index 7f025b56f..e7aa06e90 100644 --- a/tom_targets/templates/tom_targets/partials/create_persistent_share.html +++ b/tom_targets/templates/tom_targets/partials/create_persistent_share.html @@ -70,4 +70,4 @@ var error_msg_alert = document.getElementById('create_persistent_share_error'); error_msg_alert.style.display = "none"; } - \ No newline at end of file + diff --git a/tom_targets/templates/tom_targets/partials/persistent_share_table.html b/tom_targets/templates/tom_targets/partials/persistent_share_table.html index b2ee43789..f97301a24 100644 --- a/tom_targets/templates/tom_targets/partials/persistent_share_table.html +++ b/tom_targets/templates/tom_targets/partials/persistent_share_table.html @@ -49,4 +49,4 @@ table.innerHTML = html; }) } - \ No newline at end of file +