Skip to content

Commit

Permalink
Add triggering from data_processor because bulk_create doesn't trigger post_save.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon committed Dec 11, 2024
1 parent 9f14ca8 commit 7ff83bf
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 34 deletions.
8 changes: 4 additions & 4 deletions tom_dataproducts/alertstreams/hermes.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def get_hermes_spectroscopy(self, datum):
def convert_astropy_brightness_to_hermes(brightness_unit):
    """Map an astropy brightness unit to the label Hermes expects.

    :param brightness_unit: astropy brightness unit name (may be None or '')
    :return: 'AB mag' for AB/ABFLUX units; otherwise the input unchanged
    """
    if not brightness_unit:
        # Falsy values (None, '') are passed straight through unconverted.
        return brightness_unit
    # str.upper(), not the non-existent str.uppercase(); compare once
    # against both spellings astropy uses for the AB magnitude system.
    if brightness_unit.upper() in ('AB', 'ABFLUX'):
        return 'AB mag'
    return brightness_unit
Expand All @@ -158,11 +158,11 @@ def convert_astropy_brightness_to_hermes(brightness_unit):
def convert_astropy_wavelength_to_hermes(wavelength_unit):
    """Map an astropy wavelength unit to the symbol Hermes expects.

    :param wavelength_unit: astropy wavelength unit name (may be None or '')
    :return: 'Å' for angstrom/'AA', 'µm' for micron, 'Hz' for hertz;
             otherwise the input unchanged
    """
    if not wavelength_unit:
        # Falsy values (None, '') are passed straight through unconverted.
        return wavelength_unit
    # str.lower(), not the non-existent str.lowercase(); computed once.
    unit = wavelength_unit.lower()
    # 'AA' (astropy's short form for angstrom) is matched case-sensitively,
    # as in the original implementation.
    if unit == 'angstrom' or wavelength_unit == 'AA':
        return 'Å'
    if unit == 'micron':
        return 'µm'
    if unit == 'hertz':
        return 'Hz'
    return wavelength_unit
Expand Down
10 changes: 9 additions & 1 deletion tom_dataproducts/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from importlib import import_module

from tom_dataproducts.models import ReducedDatum
from tom_dataproducts.sharing import continuous_share_data

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,7 +71,14 @@ def run_data_processor(dp):
# timestamp=datum[0], value=datum[1], source_name=datum[2]) for datum in data]

# 3. Finally, insert the new ReducedDatum objects into the database
ReducedDatum.objects.bulk_create(new_reduced_datums)
reduced_datums = ReducedDatum.objects.bulk_create(new_reduced_datums)

# 4. Trigger any sharing you may have set to occur when new data comes in
# Encapsulate this in a try/catch so sharing failure doesn't prevent dataproduct ingestion
try:
continuous_share_data(dp.target, reduced_datums)
except Exception as e:
logger.warning(f"Failed to share new dataproduct {dp.product_id}: {repr(e)}")

# log what happened
if skipped_data:
Expand Down
46 changes: 25 additions & 21 deletions tom_dataproducts/sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,38 @@
from django.http import StreamingHttpResponse
from django.utils.text import slugify

from tom_targets.models import Target
from tom_targets.models import Target, PersistentShare
from tom_dataproducts.models import DataProduct, ReducedDatum
from tom_dataproducts.alertstreams.hermes import publish_to_hermes, BuildHermesMessage, get_hermes_topics
from tom_dataproducts.serializers import DataProductSerializer, ReducedDatumSerializer


def continuous_share_data(target, reduced_datums):
    """
    Triggered when new ReducedDatums are created.
    Shares those ReducedDatums to the sharing destination of any PersistentShares on the target.
    :param target: Target instance that these reduced_datums belong to
    :param reduced_datums: list of ReducedDatum instances to share
    """
    persistentshares = PersistentShare.objects.filter(target=target)
    # Loop-invariant: the set of pks to share is identical for every
    # destination, so compute it once rather than per PersistentShare.
    reduced_datum_pks = [rd.pk for rd in reduced_datums]
    for persistentshare in persistentshares:
        share_destination = persistentshare.destination
        if 'HERMES' in share_destination.upper():
            # Destination string is of the form '<destination>:<topic>'.
            hermes_topic = share_destination.split(':')[1]
            destination = share_destination.split(':')[0]
            # Filter out datums that are not safe to share to this topic.
            filtered_reduced_datums = check_for_share_safe_datums(
                destination, ReducedDatum.objects.filter(pk__in=reduced_datum_pks), topic=hermes_topic)
            sharing = getattr(settings, "DATA_SHARING", {})
            message = BuildHermesMessage(title=f"Updated data for {target.name} from "
                                               f"{getattr(settings, 'TOM_NAME', 'TOM Toolkit')}.",
                                         authors=sharing.get('hermes', {}).get('DEFAULT_AUTHORS', None),
                                         message=None,
                                         topic=hermes_topic
                                         )
            publish_to_hermes(message, filtered_reduced_datums)
        else:
            # Non-Hermes destinations are other TOMs; share by pk list.
            share_data_with_tom(share_destination, None, None, None, selected_data=reduced_datum_pks)


def share_target_list_with_hermes(share_destination, form_data, selected_targets=None, include_all_data=False):
Expand Down
8 changes: 2 additions & 6 deletions tom_targets/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@
from django.db.models.signals import post_save

from tom_dataproducts.models import ReducedDatum
from tom_dataproducts.sharing import share_data_with_destination
from tom_targets.models import PersistentShare
from tom_dataproducts.sharing import continuous_share_data


@receiver(post_save, sender=ReducedDatum)
def cb_dataproduct_post_save(sender, instance, *args, **kwargs):
    """Post-save hook for ReducedDatum: attempt to share the newly saved
    datum via any PersistentShares configured on its target.

    NOTE: bulk_create does not fire post_save, so data_processor also calls
    continuous_share_data directly for bulk-ingested datums.
    """
    # continuous_share_data handles the PersistentShare lookup itself.
    continuous_share_data(instance.target, reduced_datums=[instance])
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,4 @@
var error_msg_alert = document.getElementById('create_persistent_share_error');
error_msg_alert.style.display = "none";
}
</script>
</script>
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@
table.innerHTML = html;
})
}
</script>
</script>

0 comments on commit 7ff83bf

Please sign in to comment.