Adding functionality to grab lightcurves from Cassandra. #167

Merged
merged 7 commits into from Sep 20, 2024
1 change: 1 addition & 0 deletions src/adler/__init__.py
@@ -1,3 +1,4 @@
from . import dataclasses
from . import science
from . import utilities
from . import lasair
85 changes: 71 additions & 14 deletions src/adler/dataclasses/AdlerPlanetoid.py
@@ -117,30 +117,87 @@ def construct_from_SQL(
return cls(ssObjectId, filter_list, date_range, observations_by_filter, mpcorb, ssobject, adler_data)

@classmethod
def construct_from_JSON(cls, json_filename):
with open(json_filename) as f:
json_dict = json.load(f)
def construct_from_cassandra(
cls,
ssObjectId,
filter_list=["u", "g", "r", "i", "z", "y"],
date_range=[60000.0, 67300.0],
cassandra_hosts=["10.21.3.123"],
): # pragma: no cover
"""Custom constructor which builds the AdlerPlanetoid object and the associated Observations, MPCORB and SSObject objects from
a Cassandra database. Used only for Lasair integration.

TODO: move method to its own class which inherits from AdlerPlanetoid and move to adler-lasair repo?

Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.

observations_dict = {**json_dict["SSSource"], **json_dict["DiaSource"]}
filter_list : list of str
A list of the filters of interest.

date_range : list of float
The minimum and maximum dates of the desired observations.

cassandra_hosts : list of str
Locations of the Cassandra database hosts, usually IP addresses. Default is ["10.21.3.123"].

filter_list = [observations_dict["band"]]
"""
# do not move this import! CassandraFetcher requires the non-mandatory
# cassandra-driver library - if not installed, and this import is at the top,
# test collection will break.
from adler.lasair.cassandra_fetcher import CassandraFetcher

fetcher = CassandraFetcher(cassandra_hosts=cassandra_hosts)

MPCORB_dict = fetcher.fetch_MPCORB(ssObjectId)
SSObject_dict = fetcher.fetch_SSObject(ssObjectId, filter_list)
observations_dict = fetcher.fetch_observations(ssObjectId)

MPCORB_dict = json_dict["MPCORB"]
SSObject_dict = json_dict["SSObject"]
# note that Cassandra doesn't allow filters/joins
# instead we pull all observations for this ID, then filter with Pandas later
observations_table = pd.DataFrame(observations_dict)
observations_table.rename(columns={"decl": "dec"}, inplace=True)

ssObjectId = observations_dict["ssObjectId"]
observations_by_filter = []
for filter_name in filter_list:
obs_slice = observations_table[
(observations_table["band"] == filter_name)
& (observations_table["midpointmjdtai"].between(date_range[0], date_range[1]))
]

if len(obs_slice) == 0:
logger.warning(
"No observations found in {} filter for this object. Skipping this filter.".format(
filter_name
)
)
else:
observations = Observations.construct_from_data_table(ssObjectId, filter_name, obs_slice)
observations_by_filter.append(observations)

if len(observations_by_filter) == 0:
logger.error(
"No observations found for this object in the given filter(s). Check SSOID and try again."
)
raise Exception(
"No observations found for this object in the given filter(s). Check SSOID and try again."
)

if len(filter_list) > len(observations_by_filter):
logger.info(
"Not all specified filters have observations. Recalculating filter list based on past observations."
)
filter_list = [obs_object.filter_name for obs_object in observations_by_filter]
logger.info("New filter list is: {}".format(filter_list))

observations_by_filter = [
Observations.construct_from_dictionary(ssObjectId, filter_list[0], observations_dict)
]
mpcorb = MPCORB.construct_from_dictionary(ssObjectId, MPCORB_dict)
ssobject = SSObject.construct_from_dictionary(ssObjectId, filter_list, SSObject_dict)

adler_data = AdlerData(ssObjectId, filter_list)

return cls(
ssObjectId, filter_list, [np.nan, np.nan], observations_by_filter, mpcorb, ssobject, adler_data
)
return cls(ssObjectId, filter_list, date_range, observations_by_filter, mpcorb, ssobject, adler_data)

@classmethod
def construct_from_RSP(
20 changes: 19 additions & 1 deletion src/adler/dataclasses/MPCORB.py
@@ -122,9 +122,27 @@ def construct_from_data_table(cls, ssObjectId, data_table):

@classmethod
def construct_from_dictionary(cls, ssObjectId, data_dict):
"""Initialises the MPCORB object from a dictionary of data.

Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.

data_dict : dict or dict-like object
Dictionary of data from which attributes should be populated.

Returns
-----------
MPCORB object
MPCORB object with class attributes populated from data_dict.

"""
mpcorb_dict = {"ssObjectId": ssObjectId}

for mpcorb_key, mpcorb_type in MPCORB_KEYS.items():
mpcorb_dict[mpcorb_key] = get_from_dictionary(data_dict, mpcorb_key, mpcorb_type, "MPCORB")
mpcorb_dict[mpcorb_key] = get_from_dictionary(
data_dict, mpcorb_key.casefold(), mpcorb_type, "MPCORB"
)

return cls(**mpcorb_dict)
28 changes: 26 additions & 2 deletions src/adler/dataclasses/Observations.py
@@ -115,7 +115,7 @@ class Observations:
num_obs: int = 0

@classmethod
def construct_from_data_table(cls, ssObjectId, filter_name, data_table):
def construct_from_data_table(cls, ssObjectId, filter_name, data_table, cassandra=False):
"""Initialises the Observations object from a table of data.

Parameters
@@ -139,7 +139,12 @@ def construct_from_data_table(cls, ssObjectId, filter_name, data_table):
obs_dict = {"ssObjectId": ssObjectId, "filter_name": filter_name, "num_obs": len(data_table)}

for obs_key, obs_type in OBSERVATIONS_KEYS.items():
obs_dict[obs_key] = get_from_table(data_table, obs_key, obs_type, "SSSource/DIASource")
try:
obs_dict[obs_key] = get_from_table(data_table, obs_key, obs_type, "SSSource/DIASource")
except KeyError:  # column names coming from Cassandra can differ in case, so retry with a casefolded key
obs_dict[obs_key] = get_from_table(
data_table, obs_key.casefold(), obs_type, "SSSource/DIASource"
)

obs_dict["reduced_mag"] = cls.calculate_reduced_mag(
cls, obs_dict["mag"], obs_dict["topocentricDist"], obs_dict["heliocentricDist"]
@@ -149,6 +154,25 @@ def construct_from_data_table(cls, ssObjectId, filter_name, data_table):

@classmethod
def construct_from_dictionary(cls, ssObjectId, filter_name, data_dict):
"""Initialises the Observations object from a dictionary of data.

Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.

filter_name : str
String of the filter the observations are taken in.

data_dict : dict or dict-like object
Dictionary of data from which attributes should be populated.

Returns
-----------
Observations object
Observations object with class attributes populated from data_dict.

"""
obs_dict = {"ssObjectId": ssObjectId, "filter_name": filter_name, "num_obs": 1}

for obs_key, obs_type in OBSERVATIONS_KEYS.items():
52 changes: 46 additions & 6 deletions src/adler/dataclasses/SSObject.py
@@ -67,6 +67,25 @@ class SSObject:

@classmethod
def construct_from_data_table(cls, ssObjectId, filter_list, data_table):
"""Initialises the SSObject object from a table of data.

Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.

filter_list : list of str
A list of the filters of interest.

data_table : table-like object
Table of data from which attributes should be populated.

Returns
-----------
SSObject object
SSObject object with class attributes populated from data_table.

"""
sso_dict = {"ssObjectId": ssObjectId, "filter_list": filter_list, "filter_dependent_values": []}

for sso_key, sso_type in SSO_KEYS.items():
@@ -88,19 +107,40 @@ def construct_from_data_table(cls, ssObjectId, filter_list, data_table):

@classmethod
def construct_from_dictionary(cls, ssObjectId, filter_list, data_dict):
"""Initialises the SSObject object from a dictionary of data.

Parameters
-----------
ssObjectId : str
ssObjectId of the object of interest.

filter_list : list of str
A list of the filters of interest.

data_dict : dict or dict-like object
Dictionary of data from which attributes should be populated.

Returns
-----------
SSObject object
SSObject object with class attributes populated from data_dict.

"""
sso_dict = {"ssObjectId": ssObjectId, "filter_list": filter_list, "filter_dependent_values": []}

for sso_key, sso_type in SSO_KEYS.items():
sso_dict[sso_key] = get_from_dictionary(data_dict, sso_key, sso_type, "SSObject")
sso_dict[sso_key] = get_from_dictionary(data_dict, sso_key.casefold(), sso_type, "SSObject")

for i, filter_name in enumerate(filter_list):
filter_dept_object = FilterDependentSSO(
filter_name=filter_name,
H=get_from_dictionary(data_dict, filter_name + "_H", float, "SSObject"),
G12=get_from_dictionary(data_dict, filter_name + "_G12", float, "SSObject"),
Herr=get_from_dictionary(data_dict, filter_name + "_HErr", float, "SSObject"),
G12err=get_from_dictionary(data_dict, filter_name + "_G12Err", float, "SSObject"),
nData=get_from_dictionary(data_dict, filter_name + "_Ndata", float, "SSObject"),
H=get_from_dictionary(data_dict, (filter_name + "_H").casefold(), float, "SSObject"),
G12=get_from_dictionary(data_dict, (filter_name + "_G12").casefold(), float, "SSObject"),
Herr=get_from_dictionary(data_dict, (filter_name + "_HErr").casefold(), float, "SSObject"),
G12err=get_from_dictionary(
data_dict, (filter_name + "_G12Err").casefold(), float, "SSObject"
),
nData=get_from_dictionary(data_dict, (filter_name + "_Ndata").casefold(), float, "SSObject"),
)

sso_dict["filter_dependent_values"].append(filter_dept_object)
28 changes: 26 additions & 2 deletions src/adler/dataclasses/dataclass_utilities.py
@@ -109,6 +109,29 @@ def get_from_table(data_table, column_name, data_type, table_name="default"):


def get_from_dictionary(data_dict, key_name, data_type, table_name="default"):
"""Retrieves information from a dictionary and forces it to be a specified type.

Parameters
-----------
data_dict : dict or dict-like object
Dictionary containing columns of interest.

key_name : str
Key name under which the data of interest is stored.

data_type : type
Data type. Should be int, float, str or np.ndarray.

table_name : str
Name of the table or dictionary. This is mostly for more informative error messages. Default="default".

Returns
-----------
data_val : str, float, int or np.ndarray
The data requested from the dictionary cast to the type required.

"""

try:
if data_type == str:
data_val = str(data_dict[key_name])
@@ -124,14 +147,15 @@ def get_from_dictionary(data_dict, key_name, data_type, table_name="default"):
except ValueError:
print("error message")

data_val = check_value_populated(data_val, data_type, key_name, "JSON")
data_val = check_value_populated(data_val, data_type, key_name, "dictionary")

return data_val


def check_value_populated(data_val, data_type, column_name, table_name):
"""Checks to see if data_val populated properly and prints a helpful warning if it didn't.
Usually this will trigger because the RSP hasn't populated that field for this particular object.
Usually this will trigger because the RSP or Cassandra database hasn't populated that
field for this particular object.

Parameters
-----------
Expand Down