Skip to content

Commit

Permalink
Adding JSON constructor methods. (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
astronomerritt authored Jun 18, 2024
1 parent da12eac commit 6f067fc
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 4 deletions.
28 changes: 28 additions & 0 deletions src/adler/dataclasses/AdlerPlanetoid.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from lsst.rsp import get_tap_service
import pandas as pd
import numpy as np
import logging
import json

from adler.dataclasses.Observations import Observations
from adler.dataclasses.MPCORB import MPCORB
Expand Down Expand Up @@ -114,6 +116,32 @@ def construct_from_SQL(

return cls(ssObjectId, filter_list, date_range, observations_by_filter, mpcorb, ssobject, adler_data)

@classmethod
def construct_from_JSON(cls, json_filename):
    """Construct an AdlerPlanetoid from a JSON file on disk.

    The JSON file is expected to contain top-level "SSSource", "DiaSource",
    "MPCORB" and "SSObject" dictionaries; SSSource and DiaSource are merged
    into a single flat observation record.

    Parameters
    ----------
    json_filename : str
        Path to the JSON file to load.

    Returns
    -------
    AdlerPlanetoid
        The populated planetoid object.
    """
    with open(json_filename) as json_file:
        contents = json.load(json_file)

    # Merge the two source tables; DiaSource keys win on collision,
    # matching {**SSSource, **DiaSource} semantics.
    source_dict = dict(contents["SSSource"])
    source_dict.update(contents["DiaSource"])

    ssObjectId = source_dict["ssObjectId"]
    # A single JSON record carries one band only.
    filter_list = [source_dict["band"]]

    observations_by_filter = [
        Observations.construct_from_dictionary(ssObjectId, filter_list[0], source_dict)
    ]
    mpcorb = MPCORB.construct_from_dictionary(ssObjectId, contents["MPCORB"])
    ssobject = SSObject.construct_from_dictionary(ssObjectId, filter_list, contents["SSObject"])

    # No date range applies to a JSON snapshot, hence [nan, nan].
    return cls(
        ssObjectId,
        filter_list,
        [np.nan, np.nan],
        observations_by_filter,
        mpcorb,
        ssobject,
        AdlerData(ssObjectId, filter_list),
    )

@classmethod
def construct_from_RSP(
cls, ssObjectId, filter_list=["u", "g", "r", "i", "z", "y"], date_range=[60000.0, 67300.0]
Expand Down
11 changes: 10 additions & 1 deletion src/adler/dataclasses/MPCORB.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass

from adler.dataclasses.dataclass_utilities import get_from_table
from adler.dataclasses.dataclass_utilities import get_from_table, get_from_dictionary

MPCORB_KEYS = {
"mpcDesignation": str,
Expand Down Expand Up @@ -109,3 +109,12 @@ def construct_from_data_table(cls, ssObjectId, data_table):
mpcorb_dict[mpcorb_key] = get_from_table(data_table, mpcorb_key, mpcorb_type, "MPCORB")

return cls(**mpcorb_dict)

@classmethod
def construct_from_dictionary(cls, ssObjectId, data_dict):
    """Construct an MPCORB object from a dictionary of values (e.g. parsed JSON).

    Parameters
    ----------
    ssObjectId : str
        ssObjectId of the object of interest.
    data_dict : dict
        Dictionary holding one value per key in MPCORB_KEYS.

    Returns
    -------
    MPCORB
        The populated MPCORB object.
    """
    constructor_args = {
        key: get_from_dictionary(data_dict, key, value_type, "MPCORB")
        for key, value_type in MPCORB_KEYS.items()
    }
    constructor_args["ssObjectId"] = ssObjectId
    return cls(**constructor_args)
15 changes: 14 additions & 1 deletion src/adler/dataclasses/Observations.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dataclasses import dataclass, field
import numpy as np

from adler.dataclasses.dataclass_utilities import get_from_table
from adler.dataclasses.dataclass_utilities import get_from_table, get_from_dictionary

OBSERVATIONS_KEYS = {
"mag": np.ndarray,
Expand Down Expand Up @@ -107,6 +107,19 @@ def construct_from_data_table(cls, ssObjectId, filter_name, data_table):

return cls(**obs_dict)

@classmethod
def construct_from_dictionary(cls, ssObjectId, filter_name, data_dict):
    """Construct an Observations object from a dictionary of values
    (a merged SSSource/DIASource record, e.g. parsed from JSON).

    Parameters
    ----------
    ssObjectId : str
        ssObjectId of the object of interest.
    filter_name : str
        Name of the band the observation was taken in.
    data_dict : dict
        Dictionary holding one value per key in OBSERVATIONS_KEYS.

    Returns
    -------
    Observations
        The populated Observations object (num_obs is 1 for a single record).
    """
    obs_dict = {
        obs_key: get_from_dictionary(data_dict, obs_key, obs_type, "SSSource/DIASource")
        for obs_key, obs_type in OBSERVATIONS_KEYS.items()
    }
    obs_dict["ssObjectId"] = ssObjectId
    obs_dict["filter_name"] = filter_name
    obs_dict["num_obs"] = 1

    # NOTE(review): calculate_reduced_mag is an instance method; cls is passed
    # as the (apparently unused) self here, mirroring the original call —
    # consider making it a staticmethod.
    obs_dict["reduced_mag"] = cls.calculate_reduced_mag(
        cls, obs_dict["mag"], obs_dict["topocentricDist"], obs_dict["heliocentricDist"]
    )

    return cls(**obs_dict)

def calculate_reduced_mag(self, mag, topocentric_dist, heliocentric_dist):
"""
Calculates the reduced magnitude column.
Expand Down
23 changes: 22 additions & 1 deletion src/adler/dataclasses/SSObject.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dataclasses import dataclass, field
import numpy as np

from adler.dataclasses.dataclass_utilities import get_from_table
from adler.dataclasses.dataclass_utilities import get_from_table, get_from_dictionary

SSO_KEYS = {
"discoverySubmissionDate": float,
Expand Down Expand Up @@ -86,6 +86,27 @@ def construct_from_data_table(cls, ssObjectId, filter_list, data_table):

return cls(**sso_dict)

@classmethod
def construct_from_dictionary(cls, ssObjectId, filter_list, data_dict):
    """Construct an SSObject from a dictionary of values (e.g. parsed JSON).

    Parameters
    ----------
    ssObjectId : str
        ssObjectId of the object of interest.
    filter_list : list of str
        Bands for which filter-dependent values should be extracted.
    data_dict : dict
        Dictionary holding one value per key in SSO_KEYS, plus the
        per-band "<band>_H", "<band>_G12", "<band>_HErr", "<band>_G12Err"
        and "<band>_Ndata" entries.

    Returns
    -------
    SSObject
        The populated SSObject object.
    """
    sso_dict = {
        key: get_from_dictionary(data_dict, key, value_type, "SSObject")
        for key, value_type in SSO_KEYS.items()
    }
    sso_dict["ssObjectId"] = ssObjectId
    sso_dict["filter_list"] = filter_list

    filter_dependent_values = []
    for filter_name in filter_list:
        # All per-band quantities are stored under "<band>_<suffix>" keys.
        def band_value(suffix):
            return get_from_dictionary(data_dict, filter_name + suffix, float, "SSObject")

        filter_dependent_values.append(
            FilterDependentSSO(
                filter_name=filter_name,
                H=band_value("_H"),
                G12=band_value("_G12"),
                Herr=band_value("_HErr"),
                G12err=band_value("_G12Err"),
                nData=band_value("_Ndata"),
            )
        )

    sso_dict["filter_dependent_values"] = filter_dependent_values
    return cls(**sso_dict)


@dataclass
class FilterDependentSSO:
Expand Down
23 changes: 22 additions & 1 deletion src/adler/dataclasses/dataclass_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def get_from_table(data_table, column_name, data_type, table_name="default"):
elif data_type == int:
data_val = int(data_table[column_name][0])
elif data_type == np.ndarray:
data_val = np.array(data_table[column_name])
data_val = np.array(data_table[column_name], ndmin=1)
else:
logger.error(
"TypeError: Type for argument data_type not recognised for column {} in table {}: must be str, float, int or np.ndarray.".format(
Expand All @@ -108,6 +108,27 @@ def get_from_table(data_table, column_name, data_type, table_name="default"):
return data_val


def get_from_dictionary(data_dict, key_name, data_type, table_name="default"):
    """Retrieves a value from a dictionary and casts it to the expected type.

    Parameters
    ----------
    data_dict : dict
        Dictionary (e.g. parsed from JSON) to retrieve the value from.
    key_name : str
        Key under which the value of interest is stored.
    data_type : type
        Expected type of the value: str, float, int or np.ndarray.
    table_name : str, optional
        Name of the originating table, used in log messages. Default "default".

    Returns
    -------
    data_val : str, float, int or np.ndarray
        The retrieved value, post-checked by check_value_populated.

    Raises
    ------
    TypeError
        If data_type is not one of str, float, int or np.ndarray.
    ValueError
        If the stored value cannot be cast to data_type.
    """
    try:
        if data_type == str:
            data_val = str(data_dict[key_name])
        elif data_type == float:
            data_val = float(data_dict[key_name])
        elif data_type == int:
            data_val = int(data_dict[key_name])
        elif data_type == np.ndarray:
            # ndmin=1 so scalar values become 1-element arrays, matching get_from_table.
            data_val = np.array(data_dict[key_name], ndmin=1)
        else:
            # Previously this branch only printed and fell through, leaving
            # data_val unbound and crashing later with a NameError.
            logger.error(
                "TypeError: Type for argument data_type not recognised for key {} in table {}: must be str, float, int or np.ndarray.".format(
                    key_name, table_name
                )
            )
            raise TypeError(
                "Type for argument data_type not recognised for key {} in table {}: must be str, float, int or np.ndarray.".format(
                    key_name, table_name
                )
            )
    except ValueError:
        # Log with context and re-raise instead of printing and continuing
        # with data_val unbound.
        logger.error(
            "ValueError: Could not cast value for key {} in table {} to {}.".format(
                key_name, table_name, data_type
            )
        )
        raise

    data_val = check_value_populated(data_val, data_type, key_name, "JSON")

    return data_val


def check_value_populated(data_val, data_type, column_name, table_name):
"""Checks to see if data_val populated properly and prints a helpful warning if it didn't.
Usually this will trigger because the RSP hasn't populated that field for this particular object.
Expand Down

0 comments on commit 6f067fc

Please sign in to comment.