diff --git a/.flake8 b/.flake8 index 3d2e9844..5ee8d8f6 100644 --- a/.flake8 +++ b/.flake8 @@ -27,3 +27,4 @@ per-file-ignores = ../fink-fat/fink_fat/seeding/dbscan_seeding.py:W503 ../fink-fat/fink_fat/command_line/cli_main/stats.py:W503 ../fink-fat/fink_fat/kalman/asteroid_kalman.py:W503 + ../fink-fat/fink_fat/command_line/orbit_cli.py:W503 diff --git a/fink_fat/associations/association_kalman.py b/fink_fat/associations/association_kalman.py index 0b2fa655..25e0aecf 100644 --- a/fink_fat/associations/association_kalman.py +++ b/fink_fat/associations/association_kalman.py @@ -2,13 +2,22 @@ import pandas as pd import sys import doctest +from typing import Tuple from copy import deepcopy -from fink_fat.kalman.init_kalman import init_kalman +from fink_fat.others.utils import repeat_chunk -def update_kalman(kalman_copy, new_alert): +def update_kalman( + kalman_copy: pd.Series, + ra_alert: float, + dec_alert: float, + jd_alert: float, + mag_alert: float, + fid_alert: int, + tr_id: int, +) -> pd.Series: """ Update the kalman filter contains in the @@ -16,8 +25,18 @@ def update_kalman(kalman_copy, new_alert): ---------- kalman_copy : pd.Series a row of the kalman dataframe - new_alert : numpy array - a numpy array containing data of one alert + ra_alert : float + right ascencion + dec_alert : float + declination + jd_alert : float + julian date + mag_alert : float + magnitude + fid_alert : int + filter identifier + tr_id : int + new trajectory_id Returns ------- @@ -27,13 +46,13 @@ def update_kalman(kalman_copy, new_alert): Y = np.array( [ [ - new_alert[2], - new_alert[3], + ra_alert, + dec_alert, ] ] ) - dt = new_alert[4] - kalman_copy["jd_1"].values[0] + dt = jd_alert - kalman_copy["jd_1"].values[0] A = np.array( [ [1, 0, dt, 0], @@ -52,12 +71,12 @@ def update_kalman(kalman_copy, new_alert): kalman_copy["dec_0"] = kalman_copy["dec_1"] kalman_copy["jd_0"] = kalman_copy["jd_1"] - kalman_copy["ra_1"] = new_alert[2] - kalman_copy["dec_1"] = new_alert[3] - kalman_copy["jd_1"] = new_alert[4] + kalman_copy["ra_1"] = ra_alert + kalman_copy["dec_1"] = dec_alert + kalman_copy["jd_1"] = jd_alert - kalman_copy["mag_1"] = new_alert[9] - kalman_copy["fid_1"] = new_alert[6] + kalman_copy["mag_1"] = mag_alert + kalman_copy["fid_1"] = fid_alert kalman_copy["dt"] = kalman_copy["jd_1"] - kalman_copy["jd_0"] kalman_copy["vel_ra"] = ( @@ -66,6 +85,7 @@ def update_kalman(kalman_copy, new_alert): kalman_copy["vel_dec"] = ( kalman_copy["dec_1"] - kalman_copy["dec_0"] ) / kalman_copy["dt"] + kalman_copy["trajectory_id"] = tr_id return kalman_copy @@ -131,15 +151,194 @@ def kalman_rowcopy(kalman_row: pd.Series, new_traj_id: int) -> pd.Series: """ r = deepcopy(kalman_row) r["kalman"] = deepcopy(r["kalman"].values) - r["kalman"].values[0].kf_id = new_traj_id + r["kalman"].values[0].kf_id = int(new_traj_id) return r -def update_trajectories( - trajectory_df, - kalman_df, - new_alerts, -): +def trajectory_extension( + trajectory: pd.DataFrame, + cluster: pd.DataFrame, + cluster_id: int, + kalman: pd.DataFrame, + new_tr_id: int = None, +) -> Tuple[pd.DataFrame, pd.DataFrame]: + tmp_cluster = ( + cluster[cluster["trajectory_id"] == cluster_id] + .drop("estimator_id", axis=1) + .sort_values("jd") + .drop_duplicates("objectId") + ) + data_cluster = tmp_cluster[["ra", "dec", "jd", "magpsf", "fid"]].values + + # update the kalman with the new alerts from the clusters + for ra, dec, jd, magpsf, fid in data_cluster: + kalman = update_kalman(kalman, ra, dec, jd, magpsf, fid, new_tr_id) + + extended_traj = pd.concat([trajectory, tmp_cluster]) + + if new_tr_id is not None: + extended_traj["trajectory_id"] = new_tr_id + return extended_traj, kalman + + +def tracklets_associations( + trajectory_df: pd.DataFrame, kalman_df: pd.DataFrame, new_alerts: pd.DataFrame +) -> Tuple[pd.DataFrame, pd.DataFrame]: + """ + Associates the intra night trajectories with the kalman trajectories. + + Parameters + ---------- + trajectory_df : pd.DataFrame + trajectories estimated from kalman filters + kalman_df : pd.DataFrame + dataframe containing the kalman filters informations + new_alerts : pd.DataFrame + new associateds alerts from the new observing night. + + Returns + ------- + Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame] + * the updated trajectories + * the updated kalman filters + + """ + traj_id_to_update = np.sort(new_alerts["estimator_id"].unique()) + traj_id_to_update = traj_id_to_update[traj_id_to_update != -1] + + # get the trajectory to update + mask_tr_update = trajectory_df["trajectory_id"].isin(traj_id_to_update) + + # keep the trajectory with associations + tr_to_update = trajectory_df[mask_tr_update] + + # same for the kalman filters + mask_kalman_update = kalman_df["trajectory_id"].isin(traj_id_to_update) + kalman_to_update = kalman_df[mask_kalman_update] + + cluster_df = new_alerts[new_alerts["trajectory_id"] != -1] + + res_updated_traj = [] + res_updated_kalman = [] + + new_tr_id = np.max(trajectory_df["trajectory_id"].unique()) + 1 + for tr_id in traj_id_to_update: + current_tr = tr_to_update[tr_to_update["trajectory_id"] == tr_id] + current_kalman = kalman_to_update[kalman_to_update["trajectory_id"] == tr_id] + current_cluster = cluster_df[cluster_df["estimator_id"] == tr_id] + current_cluster_id = np.sort(current_cluster["trajectory_id"].unique()) + for cl_id in current_cluster_id: + next_extended_traj, next_updated_kalman = trajectory_extension( + current_tr, + cluster_df, + cl_id, + kalman_rowcopy(current_kalman, new_tr_id), + new_tr_id, + ) + + res_updated_traj.append(next_extended_traj) + res_updated_kalman.append(next_updated_kalman) + new_tr_id += 1 + + if len(res_updated_traj) == 0: + return ( + pd.DataFrame(columns=trajectory_df.columns), + pd.DataFrame(columns=kalman_df.columns), + new_tr_id, + ) + + # merge the extended trajectories + all_extended_traj = pd.concat(res_updated_traj) + all_new_kalman = pd.concat(res_updated_kalman) + + return all_extended_traj, all_new_kalman, new_tr_id + + +def single_alerts_associations( + trajectory_df: pd.DataFrame, + kalman_df: pd.DataFrame, + new_alerts: pd.DataFrame, + max_tr_id: int, +) -> Tuple[pd.DataFrame, pd.DataFrame]: + """ + Associates the single alerts with the kalman trajectories + + Parameters + ---------- + trajectory_df : pd.DataFrame + trajectories estimated from kalman filters + kalman_df : pd.DataFrame + dataframe containing the kalman filters informations + new_alerts : pd.DataFrame + new associateds alerts from the new observing night. + max_tr_id : int + maximum trajectory id to assign to the new kalman trajectories + + Returns + ------- + Tuple[pd.DataFrame, pd.DataFrame] + * new trajectories with the new added alerts + * new kalman filters + """ + cluster_df = new_alerts[ + (new_alerts["trajectory_id"] == -1) & (new_alerts["estimator_id"] != -1) + ] + traj_counts_duplicates = cluster_df["estimator_id"].value_counts().sort_index() + new_traj_id = np.arange(max_tr_id, max_tr_id + np.sum(traj_counts_duplicates)) + with pd.option_context("mode.chained_assignment", None): + cluster_df["trajectory_id"] = new_traj_id + cluster_df = cluster_df.sort_values("estimator_id") + + if len(cluster_df) == 0: + return pd.DataFrame(columns=trajectory_df.columns), pd.DataFrame( + columns=kalman_df.columns + ) + + new_kalman = pd.concat( + [ + update_kalman( + kalman_rowcopy(kalman_df[kalman_df["trajectory_id"] == est_id], tr_id), + ra, + dec, + jd, + magpsf, + fid, + tr_id, + ) + for ra, dec, jd, magpsf, fid, tr_id, est_id in cluster_df[ + ["ra", "dec", "jd", "magpsf", "fid", "trajectory_id", "estimator_id"] + ].values + ] + ) + traj_to_update = ( + trajectory_df[trajectory_df["trajectory_id"].isin(cluster_df["estimator_id"])] + .sort_values(["trajectory_id", "jd"]) + .reset_index(drop=True) + ) + traj_size = traj_to_update["trajectory_id"].value_counts().sort_index() + duplicate_id = repeat_chunk( + traj_to_update.index.values, traj_size.values, traj_counts_duplicates.values + ) + + traj_duplicate = traj_to_update.loc[duplicate_id] + nb_repeat = np.repeat(traj_size.values, traj_counts_duplicates.values) + tr_id_repeat = np.repeat(cluster_df["trajectory_id"].values, nb_repeat) + + traj_duplicate["trajectory_id"] = tr_id_repeat + nb_repeat = np.repeat(traj_size.values, traj_counts_duplicates.values) + tr_id_repeat = np.repeat(cluster_df["trajectory_id"].values, nb_repeat) + + traj_duplicate["trajectory_id"] = tr_id_repeat + new_traj = pd.concat([traj_duplicate, cluster_df.drop("estimator_id", axis=1)]) + return new_traj, new_kalman + + +def kalman_association( + trajectory_df: pd.DataFrame, + kalman_df: pd.DataFrame, + new_alerts: pd.DataFrame, + confirmed_sso: bool = False, +) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Update the kalman filters and the trajectories with the new associations found during the night. This function initialize also the new kalman filters based on the seeds found in the night. @@ -151,7 +350,10 @@ def update_trajectories( kalman_df : pd.DataFrame dataframe containing the kalman filters new_alerts : pd.DataFrame - dataframe containing the alerts from the current nigth to associates with the kalman filters + dataframe containing the alerts from the current nigth to associates with the kalman filters, + the dataframe must contains the trajectory_id corresponding to the seeds find by the seeding. + confirmed_sso : boolean + if true, used the confirmed sso (for test purpose) Returns ------- @@ -162,137 +364,42 @@ def update_trajectories( Examples -------- - # see fink_fat/test/kalman_test/update_kalman_test + #### see fink_fat/test/kalman_test/update_kalman_test """ - # get the alerts associated with at least one kalman filter - new_alerts["seeds_next_night"] = new_alerts["trajectory_id"] - new_alerts_explode = new_alerts.explode( - [ - "ffdistnr", - "estimator_id", - ] + if confirmed_sso: + roid_flag = [3, 4] + else: + roid_flag = [1, 2, 4] + new_alerts = new_alerts[new_alerts["roid"].isin(roid_flag)] + new_alerts = new_alerts.explode(["ffdistnr", "estimator_id"]) + new_alerts["estimator_id"] = new_alerts["estimator_id"].fillna(-1).astype(int) + traj_id_to_update = np.sort(new_alerts["estimator_id"].unique()) + + # get the trajectory to update + mask_tr_update = trajectory_df["trajectory_id"].isin(traj_id_to_update) + mask_kalman_update = kalman_df["trajectory_id"].isin(traj_id_to_update) + non_tr_update_df = trajectory_df[~mask_tr_update] + non_kalman_update = kalman_df[~mask_kalman_update] + + res_tr, res_kalman, max_tr_id = tracklets_associations( + trajectory_df, kalman_df, new_alerts + ) + new_traj, new_kalman = single_alerts_associations( + trajectory_df, kalman_df, new_alerts, max_tr_id ) - new_alerts_explode = new_alerts_explode[~new_alerts_explode["ffdistnr"].isna()] - - # initialize new kalman filters based on the seeds no associated with a known kalman filter - tracklets_no_assoc = new_alerts[ - ~new_alerts["trajectory_id"].isin(new_alerts_explode["estimator_id"]) - ] - new_seeds = tracklets_no_assoc[tracklets_no_assoc["trajectory_id"] != -1] - kalman_new_seeds = init_kalman(new_seeds) - - new_alerts = new_alerts.drop("trajectory_id", axis=1) - new_alerts_explode = new_alerts_explode.drop("trajectory_id", axis=1) - - new_traj = [] - new_kalman = [] - new_traj_id = kalman_df["trajectory_id"].max() + 1 - unique_traj_id_to_update = new_alerts_explode["estimator_id"].unique() - for trajectory_id in unique_traj_id_to_update: - current_assoc = new_alerts_explode[ - new_alerts_explode["estimator_id"] == trajectory_id - ] - for seeds in current_assoc["seeds_next_night"].unique(): - if seeds != -1.0: - # add an intra night tracklets to a trajectory - current_trajectory = deepcopy( - trajectory_df[trajectory_df["trajectory_id"] == trajectory_id] - ) - current_seeds = deepcopy( - new_alerts[new_alerts["seeds_next_night"] == seeds] - ) - current_kalman_pdf = kalman_rowcopy( - kalman_df[kalman_df["trajectory_id"] == trajectory_id], new_traj_id - ) - - assert len(current_kalman_pdf) == 1 - - for el in current_seeds.sort_values("jd").values: - current_kalman_pdf = update_kalman( - current_kalman_pdf, - el, - ) - - with pd.option_context("mode.chained_assignment", None): - current_trajectory["trajectory_id"] = new_traj_id - current_kalman_pdf["trajectory_id"] = new_traj_id - current_seeds["trajectory_id"] = new_traj_id - new_traj.append(current_trajectory) - new_traj.append(current_seeds[current_trajectory.columns]) - new_kalman.append(current_kalman_pdf) - - new_traj_id += 1 - - else: - current_seeds = deepcopy( - current_assoc[(current_assoc["seeds_next_night"] == seeds)] - ) - current_seeds["trajectory_id"] = current_seeds["estimator_id"] - cols_to_keep = list(current_seeds.columns[:-5]) + ["trajectory_id"] - for el in current_seeds[cols_to_keep].values: - current_trajectory = deepcopy( - trajectory_df[trajectory_df["trajectory_id"] == trajectory_id] - ) - current_kalman_pdf = kalman_rowcopy( - kalman_df[kalman_df["trajectory_id"] == trajectory_id], - new_traj_id, - ) - assert len(current_kalman_pdf) == 1 - current_kalman_pdf = update_kalman( - current_kalman_pdf, - el, - ) - - with pd.option_context("mode.chained_assignment", None): - el[-1] = new_traj_id - current_trajectory["trajectory_id"] = new_traj_id - current_kalman_pdf["trajectory_id"] = new_traj_id - current_seeds["trajectory_id"] = new_traj_id - - new_traj.append(current_trajectory) - new_traj.append( - pd.DataFrame( - [el], - columns=cols_to_keep, - ) - ) - new_kalman.append(current_kalman_pdf) - - new_traj_id += 1 - - previous_traj = trajectory_df[ - ~trajectory_df["trajectory_id"].isin(unique_traj_id_to_update) - ] - previous_kalman = kalman_df[ - ~kalman_df["trajectory_id"].isin(unique_traj_id_to_update) - ] + new_traj = pd.concat([res_tr, new_traj]) + new_kalman = pd.concat([res_kalman, new_kalman]) - new_traj = pd.concat([previous_traj, pd.concat(new_traj)]).sort_values( - [ - "trajectory_id", - "jd", - ] - ) - new_kalman = pd.concat([previous_kalman, pd.concat(new_kalman)]) - - max_traj_id = new_kalman["trajectory_id"].max() + 1 - map_new_traj_id = { - curr_traj_id: new_traj_id - for curr_traj_id, new_traj_id in zip( - kalman_new_seeds["trajectory_id"], - np.arange(max_traj_id, max_traj_id + len(kalman_new_seeds), dtype=int), - ) - } - new_seeds["trajectory_id"] = new_seeds["trajectory_id"].map(map_new_traj_id) - kalman_new_seeds["trajectory_id"] = kalman_new_seeds["trajectory_id"].map( - map_new_traj_id - ) + # add a column to know which trajectories has been updated this night (Y: yes, N: no) + with pd.option_context("mode.chained_assignment", None): + new_traj["updated"] = "Y" + non_tr_update_df["updated"] = "N" - new_traj = new_traj.append(new_seeds[new_traj.columns]).reset_index(drop=True) - new_kalman = new_kalman.append(kalman_new_seeds).reset_index(drop=True) + traj_results = pd.concat([non_tr_update_df, new_traj]) + kalman_results = pd.concat([non_kalman_update, new_kalman]) - return new_traj, new_kalman + return traj_results, kalman_results if __name__ == "__main__": # pragma: no cover diff --git a/fink_fat/associations/association_orbit.py b/fink_fat/associations/association_orbit.py new file mode 100644 index 00000000..1b961a23 --- /dev/null +++ b/fink_fat/associations/association_orbit.py @@ -0,0 +1,118 @@ +import numpy as np +import pandas as pd +from typing import Tuple + +from fink_fat.command_line.orbit_cli import switch_local_cluster +from fink_fat.others.utils import LoggerNewLine + + +def generate_fake_traj_id(orb_pdf: pd.DataFrame) -> Tuple[pd.DataFrame, dict]: + """ + Generate fake integer trajectory_id based on the ssoCandId for the orbit fitting. + + Parameters + ---------- + orb_pdf : pd.DataFrame + contains the orbital parameters + + Returns + ------- + Tuple[pd.DataFrame, dict] + * the input orbital dataframe with the trajectory_id column + * a dictionary to translate back to the ssoCandId + """ + sso_id = orb_pdf["ssoCandId"].unique() + map_tr = { + sso_name: tr_id for sso_name, tr_id in zip(sso_id, np.arange(len(sso_id))) + } + tr_map = { + tr_id: sso_name for sso_name, tr_id in zip(sso_id, np.arange(len(sso_id))) + } + orb_pdf["trajectory_id"] = orb_pdf["ssoCandId"].map(map_tr) + return orb_pdf, tr_map + + +def orbit_associations( + config: dict, + new_alerts: pd.DataFrame, + trajectory_df: pd.DataFrame, + orbits: pd.DataFrame, + logger: LoggerNewLine, + verbose: bool, +) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: + """ + Add the alerts 5-flagged to the trajectory_df and recompute the orbit with the new points. + + Parameters + ---------- + config : dict + the data from the configuration file + new_alerts : pd.DataFrame + alerts from the current nights with the roid column + trajectory_df : pd.DataFrame + contains the alerts of the trajectories + orbits : pd.DataFrame + contains the orbital parameters of the orbits + logger : LoggerNewLine + logger class used to print the logs + verbose : bool + if true, print the logs + + Returns + ------- + Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame] + the new orbits, the updated trajectories and the old orbits + """ + orbit_cols_to_keep = list(orbits.columns) + traj_cols_to_keep = list(trajectory_df.columns) + if "ffdistnr" not in traj_cols_to_keep: + traj_cols_to_keep.append("ffdistnr") + + orbit_alert_assoc = ( + new_alerts[new_alerts["roid"] == 5] + .rename({"estimator_id": "ssoCandId"}, axis=1) + .drop("roid", axis=1) + ) + if len(orbit_alert_assoc) == 0: + return orbits, trajectory_df, pd.DataFrame(columns=orbits.columns) + + updated_sso_id = orbit_alert_assoc["ssoCandId"].unique() + # get the trajectories to update + traj_to_update = trajectory_df[trajectory_df["ssoCandId"].isin(updated_sso_id)] + # add the new alerts + traj_to_new_orbit = pd.concat([traj_to_update, orbit_alert_assoc]) + traj_to_new_orbit, trid_to_ssoid = generate_fake_traj_id(traj_to_new_orbit) + + if verbose: + logger.info(f"number of orbits to update: {len(updated_sso_id)}") + + duplicated_id = orbit_alert_assoc[ + orbit_alert_assoc[["ssoCandId", "jd"]].duplicated() + ]["ssoCandId"] + assert len(duplicated_id) == 0 + + new_orbit_pdf = switch_local_cluster(config, traj_to_new_orbit) + + # remove the failed orbits + new_orbit_pdf = new_orbit_pdf[new_orbit_pdf["a"] != -1.0] + if verbose: + logger.info( + f"number of successfull updated orbits: {len(new_orbit_pdf)} ({(len(new_orbit_pdf) / len(updated_sso_id)) * 100} %)" + ) + new_orbit_pdf["ssoCandId"] = new_orbit_pdf["trajectory_id"].map(trid_to_ssoid) + updated_id = new_orbit_pdf["ssoCandId"] + + # update orbit + mask_orbit_update = orbits["ssoCandId"].isin(updated_id) + current_orbit = orbits[~mask_orbit_update] + old_orbit = orbits[mask_orbit_update] + old_orbit_len = len(orbits) + orbits = pd.concat([current_orbit, new_orbit_pdf])[orbit_cols_to_keep] + assert len(orbits) == old_orbit_len + + # update traj + new_traj = traj_to_new_orbit[traj_to_new_orbit["ssoCandId"].isin(updated_id)] + old_traj = trajectory_df[~trajectory_df["ssoCandId"].isin(updated_id)] + + trajectory_df = pd.concat([old_traj, new_traj])[traj_cols_to_keep] + return orbits, trajectory_df, old_orbit diff --git a/fink_fat/command_line/orbit_cli.py b/fink_fat/command_line/orbit_cli.py index 0322f74f..cfc93903 100644 --- a/fink_fat/command_line/orbit_cli.py +++ b/fink_fat/command_line/orbit_cli.py @@ -1,11 +1,17 @@ import os import subprocess +from typing import Tuple import pandas as pd +import numpy as np import fink_fat -from fink_fat.others.utils import init_logging -from fink_fat.orbit_fitting.orbfit_local import get_last_detection +from fink_fat.others.utils import init_logging, LoggerNewLine +from fink_fat.orbit_fitting.orbfit_local import ( + get_last_detection, + compute_df_orbit_param, +) +from fink_fat.command_line.utils_cli import assig_tags def intro_reset_orbit(): # pragma: no cover @@ -74,7 +80,7 @@ def cluster_mode( dataframe containing the orbital parameters of the input trajectories columns: ref_epoch, a, e, i, long. node, arg. peric, mean anomaly, rms_a, rms_e, rms_i, rms_long. node, rms_arg. peric, rms_mean anomaly, chi_reduced, - last_ra, last_dec, last_jd, last_mag, last_fid + last_ra, last_dec, last_jd, last_mag, last_fid, ssoCandId Examples -------- @@ -185,6 +191,132 @@ def cluster_mode( return orbit_results +def switch_local_cluster(config: dict, traj_orb: pd.DataFrame) -> pd.DataFrame: + """ + Run the orbit fitting and choose cluster mode if the number of trajectories are above + the local limit set in the config file + + Parameters + ---------- + config : dict + contains data from the config file + traj_orb : pd.DataFrame + contains the alerts of the trajectories + + Returns + ------- + new_orbit_pdf : pd.DataFrame + contains the orbital parameters estimated from the traj_orb parameters by the orbit fitting. + """ + nb_orb = len(traj_orb["trajectory_id"].unique()) + if nb_orb > int(config["SOLVE_ORBIT_PARAMS"]["local_mode_limit"]): + new_orbit_pdf = cluster_mode(config, traj_orb) + else: + config_epoch = config["SOLVE_ORBIT_PARAMS"]["prop_epoch"] + prop_epoch = None if config_epoch == "None" else float(config_epoch) + + new_orbit_pdf = compute_df_orbit_param( + traj_orb, + int(config["SOLVE_ORBIT_PARAMS"]["cpu_count"]), + config["SOLVE_ORBIT_PARAMS"]["ram_dir"], + int(config["SOLVE_ORBIT_PARAMS"]["n_triplets"]), + int(config["SOLVE_ORBIT_PARAMS"]["noise_ntrials"]), + prop_epoch, + int(config["SOLVE_ORBIT_PARAMS"]["orbfit_verbose"]), + ).drop("provisional designation", axis=1) + return new_orbit_pdf + + +def kalman_to_orbit( + config: dict, + trajectory_df: pd.DataFrame, + trajectory_orb: pd.DataFrame, + kalman_df: pd.DataFrame, + orbits: pd.DataFrame, + logger: LoggerNewLine, + verbose: bool, +) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: + """ + Compute and return the orbits for the trajectories with enough points found by the kalman filters + + + Parameters + ---------- + config : dict + the data from the config file + trajectory_df : pd.DataFrame + trajectories found by the kalman filters + trajectory_orb : pd.DataFrame + trajectories with orbits + kalman_df : pd.DataFrame + the kalman filters parameters + orbits : pd.DataFrame + the orbits parameters + logger : LoggerNewLine + the logging object used to print the logs + verbose : bool + if true, print the logs + + Returns + ------- + Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame,] + * trajectories found by the kalman filters without those sent to the orbit fitting + * kalman filters without those with the associated trajectories sent to the orbit fitting + * trajectories with the ones with orbits + * the orbits parameters with the new ones + """ + # get the trajectories large enough to go to the orbit fitting + orbit_limit = int(config["SOLVE_ORBIT_PARAMS"]["orbfit_limit"]) + traj_size = trajectory_df["trajectory_id"].value_counts() + large_traj = traj_size[traj_size >= orbit_limit] + + if len(large_traj) == 0: + return trajectory_df, kalman_df, trajectory_orb, orbits + + # to be send to the orbit fitting, + # a trajectory must have a number of point above the orbit point limit set in the config file + # and must have been updated during the current night + traj_to_orb = trajectory_df[ + (trajectory_df["trajectory_id"].isin(large_traj.index.values)) + & (trajectory_df["updated"] == "Y") + ].sort_values(["trajectory_id", "jd"]) + nb_traj_to_orb = len(traj_to_orb["trajectory_id"].unique()) + if verbose: + logger.info( + f"number of trajectories send to the orbit fitting: {nb_traj_to_orb}" + ) + new_orbits = switch_local_cluster(config, traj_to_orb) + new_orbits = new_orbits[new_orbits["a"] != -1.0] + if verbose: + logger.info( + f"number of orbit fitted: {len(new_orbits)} ({(len(new_orbits) / nb_traj_to_orb) * 100} %)" + ) + + # get the trajectories with orbit and assign the ssoCandId + new_traj_id = new_orbits["trajectory_id"].values + new_traj_orb = traj_to_orb[traj_to_orb["trajectory_id"].isin(new_traj_id)] + new_orbits, new_traj_orb = assig_tags(new_orbits, new_traj_orb, len(orbits) + 1) + + # add the new orbits and new trajectories with orbit + orbits = pd.concat([orbits, new_orbits]) + trajectory_orb = pd.concat([trajectory_orb, new_traj_orb]) + + # remove the new trajectories with orbit from trajectory_df and the associated kalman filters + trajectory_df = trajectory_df[ + ~trajectory_df["trajectory_id"].isin(new_traj_id) + ].reset_index(drop=True) + kalman_df = kalman_df[~kalman_df["trajectory_id"].isin(new_traj_id)].reset_index( + drop=True + ) + failed_orbit = np.setdiff1d(large_traj.index.values, new_traj_id) + mask_failed = kalman_df["trajectory_id"].isin(failed_orbit) + with pd.option_context("mode.chained_assignment", None): + kalman_df.loc[mask_failed, "orbfit_test"] = ( + kalman_df.loc[mask_failed, "orbfit_test"] + 1 + ) + return trajectory_df, kalman_df, trajectory_orb, orbits + + if __name__ == "__main__": from fink_science.tester import spark_unit_tests from pandas.testing import assert_frame_equal # noqa: F401 diff --git a/fink_fat/command_line/utils_cli.py b/fink_fat/command_line/utils_cli.py index 6d2f43aa..cf15b975 100644 --- a/fink_fat/command_line/utils_cli.py +++ b/fink_fat/command_line/utils_cli.py @@ -8,6 +8,7 @@ import fink_fat from fink_fat.others.id_tags import generate_tags from fink_fat.others.utils import init_logging +from typing import Tuple def string_to_bool(bool_str): @@ -419,7 +420,9 @@ def align_trajectory_id(trajectory_df, orbit_df, obs_orbit_df): return trajectory_df, orbit_df, obs_orbit_df -def assig_tags(orb_df, traj_orb_df, start_tags): +def assig_tags( + orb_df: pd.DataFrame, traj_orb_df: pd.DataFrame, start_tags: int +) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Assign the ssoCandId to the orbital and traj_orb dataframe @@ -467,10 +470,9 @@ def assig_tags(orb_df, traj_orb_df, start_tags): } assert len(np.unique(orb_df["trajectory_id"])) == len(int_id_to_tags) - - orb_df["ssoCandId"] = orb_df["trajectory_id"].map(int_id_to_tags) - - traj_orb_df["ssoCandId"] = traj_orb_df["trajectory_id"].map(int_id_to_tags) + with pd.option_context("mode.chained_assignment", None): + orb_df["ssoCandId"] = orb_df["trajectory_id"].map(int_id_to_tags) + traj_orb_df["ssoCandId"] = traj_orb_df["trajectory_id"].map(int_id_to_tags) orb_df = orb_df.drop("trajectory_id", axis=1) traj_orb_df = traj_orb_df.drop("trajectory_id", axis=1) diff --git a/fink_fat/kalman/init_kalman.py b/fink_fat/kalman/init_kalman.py index 078a07d8..5e3dde18 100644 --- a/fink_fat/kalman/init_kalman.py +++ b/fink_fat/kalman/init_kalman.py @@ -61,7 +61,7 @@ def make_A(dt): ] tr_id, ra, dec, jd, mag, fid = ( - gb_input["trajectory_id"].values[0], + int(gb_input["trajectory_id"].values[0]), gb_input["ra"].values, gb_input["dec"].values, gb_input["jd"].values, diff --git a/fink_fat/others/utils.py b/fink_fat/others/utils.py index 78ffee73..4b7f2387 100644 --- a/fink_fat/others/utils.py +++ b/fink_fat/others/utils.py @@ -1,7 +1,6 @@ import numpy as np import fink_fat import logging -import types import datetime import pytz @@ -84,7 +83,63 @@ def formatTime(self, record, datefmt=None): return s -def init_logging(logger_name=fink_fat.__name__) -> logging.Logger: +class LoggerNewLine(logging.Logger): + """ + A custom logger class adding only a method to print a newline. + + Examples + -------- + logger.newline() + """ + + def __init__(self, name: str, level: int = 0) -> None: + super().__init__(name, level) + ch = logging.StreamHandler() + + self.setLevel(logging.DEBUG) + + # create console handler and set level to debug + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + + # create formatter + formatter = CustomTZFormatter( + "%(asctime)s - %(name)s - %(levelname)s \n\t message: %(message)s" + ) + + # add formatter to ch + ch.setFormatter(formatter) + + # add ch to logger + self.addHandler(ch) + + blank_handler = logging.StreamHandler() + blank_handler.setLevel(logging.DEBUG) + blank_handler.setFormatter(logging.Formatter(fmt="")) + self.console_handler = ch + self.blank_handler = blank_handler + + def newline(self, how_many_lines=1): + """ + Print blank line using the logger class + + Parameters + ---------- + how_many_lines : int, optional + how many blank line to print, by default 1 + """ + # Switch handler, output a blank line + self.removeHandler(self.console_handler) + self.addHandler(self.blank_handler) + for _ in range(how_many_lines): + self.info("\n") + + # Switch back + self.removeHandler(self.blank_handler) + self.addHandler(self.console_handler) + + +def init_logging(logger_name=fink_fat.__name__) -> LoggerNewLine: """ Initialise a logger for the gcn stream @@ -105,41 +160,6 @@ def init_logging(logger_name=fink_fat.__name__) -> logging.Logger: """ # create logger - def log_newline(self, how_many_lines=1): - # Switch handler, output a blank line - self.removeHandler(self.console_handler) - self.addHandler(self.blank_handler) - for i in range(how_many_lines): - self.info("\n") - - # Switch back - self.removeHandler(self.blank_handler) - self.addHandler(self.console_handler) - + logging.setLoggerClass(LoggerNewLine) logger = logging.getLogger(logger_name) - logger.setLevel(logging.DEBUG) - - # create console handler and set level to debug - ch = logging.StreamHandler() - ch.setLevel(logging.DEBUG) - - # create formatter - formatter = CustomTZFormatter( - "%(asctime)s - %(name)s - %(levelname)s \n\t message: %(message)s" - ) - - # add formatter to ch - ch.setFormatter(formatter) - - # add ch to logger - logger.addHandler(ch) - - blank_handler = logging.StreamHandler() - blank_handler.setLevel(logging.DEBUG) - blank_handler.setFormatter(logging.Formatter(fmt="")) - - logger.console_handler = ch - logger.blank_handler = blank_handler - logger.newline = types.MethodType(log_newline, logger) - return logger diff --git a/fink_fat/test/kalman_test/update_kalman_test/data.py b/fink_fat/test/kalman_test/update_kalman_test/data.py index 295e1a5e..39bf0c42 100644 --- a/fink_fat/test/kalman_test/update_kalman_test/data.py +++ b/fink_fat/test/kalman_test/update_kalman_test/data.py @@ -70,6 +70,7 @@ def data_test_1(): "ssdistnr": [1, 1, 1, 1], "magpsf": [17.5, 21.2, 19.6, 14.3], "trajectory_id": [0, 0, 1, 1], + "roid": [4, 4, 4, 4], "ffdistnr": [[0.1], [0.3], [0.1], [0.5]], "estimator_id": [[0], [0], [1], [1]], } @@ -145,8 +146,7 @@ def data_test_2(): "ssdistnr": [1, 1, 1, 1], "magpsf": [17.5, 21.2, 19.6, 14.3], "trajectory_id": [-1, -1, -1, -1], - "roid": [3, 3, 3, 3], - "t_estimator": ["kalman", "kalman", "kalman", "kalman"], + "roid": [4, 4, 4, 4], "ffdistnr": [[0.1], [0.3], [0.1], [0.5]], "estimator_id": [[1], [0], [1], [0]], } @@ -222,8 +222,7 @@ def data_test_3(): "ssdistnr": [1, 1, 1, 1], "magpsf": [17.5, 21.2, 19.6, 14.3], "trajectory_id": [-1, -1, -1, -1], - "roid": [3, 3, 3, 3], - "t_estimator": ["kalman", "kalman", "kalman", "kalman"], + "roid": [4, 4, 4, 4], "ffdistnr": [[0.1, 0.3], [0.3], [0.1], [0.5, 0.4]], "estimator_id": [[1, 2], [0], [1], [0, 2]], } @@ -299,8 +298,7 @@ def data_test_4(): "ssdistnr": [1, 1, 1, 1, 1, 1], "magpsf": [17.5, 21.2, 19.6, 14.3, 17.5, 19.8], "trajectory_id": [-1, -1, -1, -1, 0, 0], - "roid": [3, 3, 3, 3, 3, 3], - "t_estimator": ["kalman", "kalman", "kalman", "kalman", "kalman", "kalman"], + "roid": [4, 3, 4, 3, 4, 3], "ffdistnr": [[0.1, 0.3], [0.3], [0.1], [0.5, 0.4], [0.2], [0.4]], "estimator_id": [[1, 2], [0], [1], [0, 2], [0], [0]], } @@ -376,8 +374,7 @@ def data_test_5(): "ssdistnr": [1, 1, 1, 1, 1, 1], "magpsf": [17.5, 21.2, 19.6, 14.3, 17.5, 19.8], "trajectory_id": [1, 1, 2, 2, 0, 0], - "roid": [3, 3, 3, 3, 3, 3], - "t_estimator": ["kalman", "kalman", "kalman", "kalman", "kalman", "kalman"], + "roid": [4, 4, 3, 3, 4, 3], "ffdistnr": [[0.1, 0.3], [0.3], [0.1], [0.5, 0.4], [0.2], [0.4]], "estimator_id": [[1, 2], [0], [1], [0, 2], [0], [0]], } @@ -454,7 +451,6 @@ def data_test_6(): "magpsf": [17.5, 21.2, 19.6, 14.3, 17.5, 19.8], "trajectory_id": [-1, -1, 2, 2, 0, 0], "roid": [3, 3, 3, 3, 3, 3], - "t_estimator": ["kalman", "kalman", "kalman", "kalman", "kalman", "kalman"], "ffdistnr": [[0.1, 0.3], [0.3, 0.2], [0.1], [0.5, 0.4], [0.2], [0.4]], "estimator_id": [[1, 2], [0, 1], [1], [0, 2], [0], [0]], } @@ -531,7 +527,6 @@ def data_test_7(): "magpsf": [17.5, 21.2, 19.6, 14.3, 17.5, 19.8], "trajectory_id": [0, 0, 2, 2, 1, 1], "roid": [3, 3, 3, 3, 3, 3], - "t_estimator": ["kalman", "kalman", "kalman", "kalman", "kalman", "kalman"], "ffdistnr": [[0.1, 0.3], [0.3, 0.2], [0.1], [0.5], [0.2], [0.4]], "estimator_id": [[1, 2], [1, 2], [0], [0], [0], [0]], } diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_1.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_1.parquet index d926ea84..8708708c 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_1.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_1.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_2.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_2.parquet index c0ac93c5..0d9663be 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_2.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_2.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_3.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_3.parquet index 0c1fb135..96bcd1c4 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_3.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_3.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_4.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_4.parquet index f3674c51..8a31d30f 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_4.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_4.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_5.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_5.parquet index 617c68d3..08b8906d 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_5.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_5.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_6.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_6.parquet index 8460c3ec..1701c2e6 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_6.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_6.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_7.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_7.parquet index ef4fd8ed..d209dde3 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_7.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_kalman_data_test_7.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_1.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_1.parquet index f412f774..94cbfb05 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_1.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_1.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_2.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_2.parquet index 23fe1e33..ac795378 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_2.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_2.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_3.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_3.parquet index 3722f426..20df1a3f 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_3.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_3.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_4.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_4.parquet index f0a25538..d3fbabab 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_4.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_4.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_5.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_5.parquet index d2dab27b..0f32d6f1 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_5.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_5.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_6.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_6.parquet index ed414a83..cf7535fb 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_6.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_6.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_7.parquet b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_7.parquet index d6278679..6b68fdaf 100644 Binary files a/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_7.parquet and b/fink_fat/test/kalman_test/update_kalman_test/data_test/new_traj_data_test_7.parquet differ diff --git a/fink_fat/test/kalman_test/update_kalman_test/test_runner.py b/fink_fat/test/kalman_test/update_kalman_test/test_runner.py index a93a4a83..3b610fd5 100644 --- a/fink_fat/test/kalman_test/update_kalman_test/test_runner.py +++ b/fink_fat/test/kalman_test/update_kalman_test/test_runner.py @@ -1,7 +1,7 @@ import pandas as pd import numpy as np from pandas.testing import assert_frame_equal -from fink_fat.associations.association_kalman import update_trajectories +from fink_fat.associations.association_kalman import kalman_association from fink_fat.test.kalman_test.update_kalman_test import data as d @@ -29,6 +29,7 @@ def assert_test( check_index_type=False, check_dtype=False, ) + assert_frame_equal( new_kalman.fillna(-1.0)[new_kalman.columns[:-1]].reset_index(drop=True), result_kalman.fillna(-1.0).reset_index(drop=True), @@ -39,7 +40,9 @@ def assert_test( def aux_test_runner(f): trajectory_df, kalman_pdf, new_alerts = f() - new_traj, new_kalman = update_trajectories(trajectory_df, kalman_pdf, new_alerts) + new_traj, new_kalman = kalman_association( + trajectory_df, kalman_pdf, new_alerts, True + ) # print(" FUNCTION RESULTS ") # print(new_traj.fillna(-1.0))