From 165e4db0aeb4ae4750c58c648dc5a2cec146c76b Mon Sep 17 00:00:00 2001 From: Steph Merritt Date: Thu, 23 May 2024 15:31:06 +0100 Subject: [PATCH] Adding JSON constructor methods. --- src/adler/dataclasses/AdlerPlanetoid.py | 28 ++++++++++++++++++++ src/adler/dataclasses/MPCORB.py | 11 +++++++- src/adler/dataclasses/Observations.py | 15 ++++++++++- src/adler/dataclasses/SSObject.py | 23 +++++++++++++++- src/adler/dataclasses/dataclass_utilities.py | 23 +++++++++++++++- 5 files changed, 96 insertions(+), 4 deletions(-) diff --git a/src/adler/dataclasses/AdlerPlanetoid.py b/src/adler/dataclasses/AdlerPlanetoid.py index 18748bd..d6ceb40 100644 --- a/src/adler/dataclasses/AdlerPlanetoid.py +++ b/src/adler/dataclasses/AdlerPlanetoid.py @@ -1,6 +1,8 @@ from lsst.rsp import get_tap_service import pandas as pd +import numpy as np import logging +import json from adler.dataclasses.Observations import Observations from adler.dataclasses.MPCORB import MPCORB @@ -114,6 +116,32 @@ def construct_from_SQL( return cls(ssObjectId, filter_list, date_range, observations_by_filter, mpcorb, ssobject, adler_data) + @classmethod + def construct_from_JSON(cls, json_filename): + with open(json_filename) as f: + json_dict = json.load(f) + + observations_dict = {**json_dict["SSSource"], **json_dict["DiaSource"]} + + filter_list = [observations_dict["band"]] + + MPCORB_dict = json_dict["MPCORB"] + SSObject_dict = json_dict["SSObject"] + + ssObjectId = observations_dict["ssObjectId"] + + observations_by_filter = [ + Observations.construct_from_dictionary(ssObjectId, filter_list[0], observations_dict) + ] + mpcorb = MPCORB.construct_from_dictionary(ssObjectId, MPCORB_dict) + ssobject = SSObject.construct_from_dictionary(ssObjectId, filter_list, SSObject_dict) + + adler_data = AdlerData(ssObjectId, filter_list) + + return cls( + ssObjectId, filter_list, [np.nan, np.nan], observations_by_filter, mpcorb, ssobject, adler_data + ) + @classmethod def construct_from_RSP( cls, ssObjectId, filter_list=["u", "g", "r", "i", "z", "y"], date_range=[60000.0, 67300.0] diff --git a/src/adler/dataclasses/MPCORB.py b/src/adler/dataclasses/MPCORB.py index a17bd18..289084d 100644 --- a/src/adler/dataclasses/MPCORB.py +++ b/src/adler/dataclasses/MPCORB.py @@ -1,6 +1,6 @@ from dataclasses import dataclass -from adler.dataclasses.dataclass_utilities import get_from_table +from adler.dataclasses.dataclass_utilities import get_from_table, get_from_dictionary MPCORB_KEYS = { "mpcDesignation": str, @@ -109,3 +109,12 @@ def construct_from_data_table(cls, ssObjectId, data_table): mpcorb_dict[mpcorb_key] = get_from_table(data_table, mpcorb_key, mpcorb_type, "MPCORB") return cls(**mpcorb_dict) + + @classmethod + def construct_from_dictionary(cls, ssObjectId, data_dict): + mpcorb_dict = {"ssObjectId": ssObjectId} + + for mpcorb_key, mpcorb_type in MPCORB_KEYS.items(): + mpcorb_dict[mpcorb_key] = get_from_dictionary(data_dict, mpcorb_key, mpcorb_type, "MPCORB") + + return cls(**mpcorb_dict) diff --git a/src/adler/dataclasses/Observations.py b/src/adler/dataclasses/Observations.py index 64b3900..e99a758 100644 --- a/src/adler/dataclasses/Observations.py +++ b/src/adler/dataclasses/Observations.py @@ -1,7 +1,7 @@ from dataclasses import dataclass, field import numpy as np -from adler.dataclasses.dataclass_utilities import get_from_table +from adler.dataclasses.dataclass_utilities import get_from_table, get_from_dictionary OBSERVATIONS_KEYS = { "mag": np.ndarray, @@ -107,6 +107,19 @@ def construct_from_data_table(cls, ssObjectId, filter_name, data_table): return cls(**obs_dict) + @classmethod + def construct_from_dictionary(cls, ssObjectId, filter_name, data_dict): + obs_dict = {"ssObjectId": ssObjectId, "filter_name": filter_name, "num_obs": 1} + + for obs_key, obs_type in OBSERVATIONS_KEYS.items(): + obs_dict[obs_key] = get_from_dictionary(data_dict, obs_key, obs_type, "SSSource/DIASource") + + obs_dict["reduced_mag"] = cls.calculate_reduced_mag( + cls, obs_dict["mag"], obs_dict["topocentricDist"], obs_dict["heliocentricDist"] + ) + + return cls(**obs_dict) + def calculate_reduced_mag(self, mag, topocentric_dist, heliocentric_dist): """ Calculates the reduced magnitude column. diff --git a/src/adler/dataclasses/SSObject.py b/src/adler/dataclasses/SSObject.py index 9ec0443..eedc724 100644 --- a/src/adler/dataclasses/SSObject.py +++ b/src/adler/dataclasses/SSObject.py @@ -1,7 +1,7 @@ from dataclasses import dataclass, field import numpy as np -from adler.dataclasses.dataclass_utilities import get_from_table +from adler.dataclasses.dataclass_utilities import get_from_table, get_from_dictionary SSO_KEYS = { "discoverySubmissionDate": float, @@ -86,6 +86,27 @@ def construct_from_data_table(cls, ssObjectId, filter_list, data_table): return cls(**sso_dict) + @classmethod + def construct_from_dictionary(cls, ssObjectId, filter_list, data_dict): + sso_dict = {"ssObjectId": ssObjectId, "filter_list": filter_list, "filter_dependent_values": []} + + for sso_key, sso_type in SSO_KEYS.items(): + sso_dict[sso_key] = get_from_dictionary(data_dict, sso_key, sso_type, "SSObject") + + for i, filter_name in enumerate(filter_list): + filter_dept_object = FilterDependentSSO( + filter_name=filter_name, + H=get_from_dictionary(data_dict, filter_name + "_H", float, "SSObject"), + G12=get_from_dictionary(data_dict, filter_name + "_G12", float, "SSObject"), + Herr=get_from_dictionary(data_dict, filter_name + "_HErr", float, "SSObject"), + G12err=get_from_dictionary(data_dict, filter_name + "_G12Err", float, "SSObject"), + nData=get_from_dictionary(data_dict, filter_name + "_Ndata", float, "SSObject"), + ) + + sso_dict["filter_dependent_values"].append(filter_dept_object) + + return cls(**sso_dict) + @dataclass class FilterDependentSSO: diff --git a/src/adler/dataclasses/dataclass_utilities.py b/src/adler/dataclasses/dataclass_utilities.py index 40d5818..d82c031 100644 --- a/src/adler/dataclasses/dataclass_utilities.py +++ b/src/adler/dataclasses/dataclass_utilities.py @@ -86,7 +86,7 @@ def get_from_table(data_table, column_name, data_type, table_name="default"): elif data_type == int: data_val = int(data_table[column_name][0]) elif data_type == np.ndarray: - data_val = np.array(data_table[column_name]) + data_val = np.array(data_table[column_name], ndmin=1) else: logger.error( "TypeError: Type for argument data_type not recognised for column {} in table {}: must be str, float, int or np.ndarray.".format( @@ -108,6 +108,27 @@ def get_from_table(data_table, column_name, data_type, table_name="default"): return data_val +def get_from_dictionary(data_dict, key_name, data_type, table_name="default"): + try: + if data_type == str: + data_val = str(data_dict[key_name]) + elif data_type == float: + data_val = float(data_dict[key_name]) + elif data_type == int: + data_val = int(data_dict[key_name]) + elif data_type == np.ndarray: + data_val = np.array(data_dict[key_name], ndmin=1) + else: + print("type not recognised") + + except ValueError: + print("error message") + + data_val = check_value_populated(data_val, data_type, key_name, "JSON") + + return data_val + + def check_value_populated(data_val, data_type, column_name, table_name): """Checks to see if data_val populated properly and prints a helpful warning if it didn't. Usually this will trigger because the RSP hasn't populated that field for this particular object.