diff --git a/.flake8 b/.flake8
index 5aa1a7f7..48cf754c 100644
--- a/.flake8
+++ b/.flake8
@@ -13,6 +13,9 @@ exclude =
     fink_fat/others/night_report.py
     ../fink-fat/fink_fat_out_2/mpc/test_assoc_tag.py
     ../fink-fat/fink_fat_test/plot_perf_test.py
+    ../fink-fat/test_assoc_orbit.py
+    ../fink-fat/fink_fat/test/pipeline_test.py
+    ../fink-fat/fink_fat/test/pipeline_analysis.py
 per-file-ignores =
     ../fink-fat/fink_fat/orbit_fitting/orbfit_files.py:W503
     ../fink-fat/fink_fat/orbit_fitting/mpcobs_files.py:W503
diff --git a/fink_fat/command_line/association_cli.py b/fink_fat/command_line/association_cli.py
index 2283c8e5..7db642dd 100644
--- a/fink_fat/command_line/association_cli.py
+++ b/fink_fat/command_line/association_cli.py
@@ -8,6 +8,12 @@ from fink_fat.others.utils import init_logging
 
 from astropy.time import Time
 
+import fink_fat
+import subprocess
+from pyspark.sql.functions import col
+from fink_fat.command_line.utils_cli import string_to_bool
+import configparser
+
 
 def request_fink(
     object_class,
@@ -209,21 +215,23 @@ def get_last_sso_alert_from_file(filepath, verbose=False):
     >>> assert len(pdf) == 2798
     >>> assert 'objectId' in pdf.columns
     """
-    pdf = pd.read_csv(filepath, header=0, sep=r'\s+', index_col=False)
+    pdf = pd.read_csv(filepath, header=0, sep=r"\s+", index_col=False)
 
-    required_header = ['ra', 'dec', 'jd', 'magpsf', 'sigmapsf']
+    required_header = ["ra", "dec", "jd", "magpsf", "sigmapsf"]
     msg = """
     The header of {} must contain at least the following fields:
     ra dec jd magpsf sigmapsf
-    """.format(filepath)
+    """.format(
+        filepath
+    )
     assert set(required_header) - set(pdf.columns) == set(), AssertionError(msg)
 
-    if 'objectId' not in pdf.columns:
-        pdf['objectId'] = range(len(pdf))
+    if "objectId" not in pdf.columns:
+        pdf["objectId"] = range(len(pdf))
 
-    pdf['candid'] = range(10, len(pdf) + 10)
-    pdf['nid'] = 0
-    pdf['fid'] = 0
+    pdf["candid"] = range(10, len(pdf) + 10)
+    pdf["nid"] = 0
+    pdf["fid"] = 0
 
     required_columns = [
         "objectId",
@@ -240,7 +248,7 @@ def get_last_sso_alert_from_file(filepath, verbose=False):
     ]
 
     if len(pdf) > 0:
-        date = Time(pdf['jd'].values[0], format='jd').iso.split(' ')[0]
+        date = Time(pdf["jd"].values[0], format="jd").iso.split(" ")[0]
         pdf.insert(len(pdf.columns), "not_updated", np.ones(len(pdf), dtype=np.bool_))
         pdf.insert(len(pdf.columns), "last_assoc_date", date)
     else:
@@ -248,6 +256,7 @@ def get_last_sso_alert_from_file(filepath, verbose=False):
 
     return pdf[required_columns]
 
+
 def get_last_sso_alert(object_class, date, verbose=False):
     """
     Get the alerts from Fink corresponding to the object_class for the given date.
@@ -344,6 +353,100 @@ def get_last_sso_alert(object_class, date, verbose=False):
     return pdf[required_columns]
 
 
+def get_last_roid_streaming_alert(
+    config: configparser.ConfigParser,
+    is_mpc: bool,
+    mode: str,
+    read_path: str,
+    output_path: str = None,
+):
+    if mode == "local":
+        # load alerts from local
+        sso_night = pd.read_parquet(read_path)
+
+    elif mode == "spark":
+        assert (
+            output_path is not None
+        ), "The argument 'output_path' is None.\nYou must set an output_path when loading data with spark"
+
+        # load alerts from spark
+        master_manager = config["SOLVE_ORBIT_PARAMS"]["manager"]
+        principal_group = config["SOLVE_ORBIT_PARAMS"]["principal"]
+        secret = config["SOLVE_ORBIT_PARAMS"]["secret"]
+        role = config["SOLVE_ORBIT_PARAMS"]["role"]
+        executor_env = config["SOLVE_ORBIT_PARAMS"]["exec_env"]
+        driver_mem = config["SOLVE_ORBIT_PARAMS"]["driver_memory"]
+        exec_mem = config["SOLVE_ORBIT_PARAMS"]["executor_memory"]
+        max_core = config["SOLVE_ORBIT_PARAMS"]["max_core"]
+        exec_core = config["SOLVE_ORBIT_PARAMS"]["executor_core"]
+
+        application = os.path.join(
+            os.path.dirname(fink_fat.__file__),
+            "command_line",
+            "association_cli.py prod",
+        )
+
+        application += " " + master_manager
+        application += " " + read_path
+        application += " " + output_path + " " + str(is_mpc)
+
+        spark_submit = "spark-submit \
+            --master {} \
+            --conf spark.mesos.principal={} \
+            --conf spark.mesos.secret={} \
+            --conf spark.mesos.role={} \
+            --conf spark.executorEnv.HOME={} \
+            --driver-memory {}G \
+            --executor-memory {}G \
+            --conf spark.cores.max={} \
+            --conf spark.executor.cores={} \
+            {}".format(
+            master_manager,
+            principal_group,
+            secret,
+            role,
+            executor_env,
+            driver_mem,
+            exec_mem,
+            max_core,
+            exec_core,
+            application,
+        )
+
+        process = subprocess.run(spark_submit, shell=True)
+        if process.returncode != 0:
+            logger = init_logging()
+            logger.info(process.stderr)
+            logger.info(process.stdout)
+            exit()
+
+        sso_night = pd.read_parquet(output_path)
+        os.remove(output_path)
+
+    else:
+        raise ValueError(f"mode {mode} does not exist")
+
+    roid_pdf = pd.json_normalize(sso_night["ff_roid"])
+    sso_night = pd.concat(
+        [sso_night, roid_pdf],
+        axis=1,
+    )
+    cols_to_keep = [
+        "objectId",
+        "candid",
+        "ra",
+        "dec",
+        "jd",
+        "magpsf",
+        "sigmapsf",
+        "fid",
+        "ssnamenr",
+        "roid",
+        "estimator_id",
+        "ffdistnr",
+    ]
+    return sso_night[cols_to_keep]
+
 def intro_reset():  # pragma: no cover
     logger = init_logging()
     logger.info("WARNING !!!")
@@ -453,13 +556,45 @@ def get_data(tr_df_path, obs_df_path):
 
 if __name__ == "__main__":  # pragma: no cover
     import sys
-    import doctest
-    from pandas.testing import assert_frame_equal  # noqa: F401
-    import fink_fat.test.test_sample as ts  # noqa: F401
-    from unittest import TestCase  # noqa: F401
-
-    if "unittest.util" in __import__("sys").modules:
-        # Show full diff in self.assertEqual.
-        __import__("sys").modules["unittest.util"]._MAX_LENGTH = 999999999
-
-    sys.exit(doctest.testmod()[0])
+    if sys.argv[1] == "test":
+        import doctest
+        from pandas.testing import assert_frame_equal  # noqa: F401
+        import fink_fat.test.test_sample as ts  # noqa: F401
+        from unittest import TestCase  # noqa: F401
+
+        if "unittest.util" in __import__("sys").modules:
+            # Show full diff in self.assertEqual.
+            __import__("sys").modules["unittest.util"]._MAX_LENGTH = 999999999
+
+        sys.exit(doctest.testmod()[0])
+    elif sys.argv[1] == "prod":
+        from pyspark.sql import SparkSession
+
+        logger = init_logging()
+        master_address = str(sys.argv[2])
+        read_path = str(sys.argv[3])
+        output_path = str(sys.argv[4])
+        is_mpc = string_to_bool(str(sys.argv[5]))
+
+        spark = (
+            SparkSession.builder.master(master_address)
+            .appName("Fink-FAT_solve_orbit")
+            .getOrCreate()
+        )
+        df = spark.read.load(read_path)
+        df = df.select(
+            "objectId",
+            "candid",
+            "candidate.ra",
+            "candidate.dec",
+            "candidate.jd",
+            "candidate.magpsf",
+            "candidate.sigmapsf",
+            "candidate.fid",
+            "candidate.ssnamenr",
+            "ff_roid",
+        )
+        roid_flag = [3, 4, 5] if is_mpc else [1, 2, 4, 5]
+        df = df.filter(col("ff_roid.roid").isin(roid_flag))
+        df_local = df.toPandas()
+        df_local.to_parquet(output_path, index=False)
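
Note: in spark mode, get_last_roid_streaming_alert shells out to spark-submit and re-invokes this same module through its new "prod" entry point, which filters the night's alerts on the roid flags and writes them to output_path as parquet for the driver to read back (and delete). A minimal sketch of the call from the driver side; the config keys come from the SOLVE_ORBIT_PARAMS section used above, and both paths are illustrative:

    import configparser

    from fink_fat.command_line.association_cli import get_last_roid_streaming_alert

    config = configparser.ConfigParser()
    config.read("fink_fat.conf")  # must provide the SOLVE_ORBIT_PARAMS section

    # mode="spark" runs: association_cli.py prod <master> <read_path> <output_path> <is_mpc>
    alerts = get_last_roid_streaming_alert(
        config,
        is_mpc=True,                            # keep alerts with roid flags {3, 4, 5}
        mode="spark",
        read_path="/data/roid_night/20240304",  # illustrative input path
        output_path="/tmp/sso_night.parquet",   # illustrative scratch file
    )
    print(alerts[["objectId", "roid", "estimator_id"]].head())
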
diff --git a/fink_fat/command_line/cli_main/associations.py b/fink_fat/command_line/cli_main/associations.py
index 1b599a06..88e078c2 100644
--- a/fink_fat/command_line/cli_main/associations.py
+++ b/fink_fat/command_line/cli_main/associations.py
@@ -18,12 +18,14 @@
 from fink_fat.command_line.association_cli import (
     get_data,
     get_last_sso_alert,
+    get_last_sso_alert_from_file,
     intro_reset,
     no_reset,
     yes_reset,
 )
 from fink_fat.others.utils import cast_obs_data, init_logging
+from fink_fat.command_line.cli_main.fitroid import fitroid_associations
 
 
 def cli_associations(arguments, config, output_path):
@@ -44,6 +46,10 @@ def cli_associations(arguments, config, output_path):
     # get the path according to the class mpc or candidates
     output_path, object_class = get_class(arguments, output_path)
 
+    if object_class == "SSO fitroid":
+        fitroid_associations(arguments, config, logger, output_path)
+        exit()
+
     tr_df_path = os.path.join(output_path, "trajectory_df.parquet")
     obs_df_path = os.path.join(output_path, "old_obs.parquet")
 
@@ -92,7 +98,24 @@ def cli_associations(arguments, config, output_path):
         exit()
 
     t_before = t.time()
-    new_alerts = get_last_sso_alert(object_class, last_night, arguments["--verbose"])
+
+    if arguments["--filepath"]:
+        new_alerts = get_last_sso_alert_from_file(
+            arguments["--filepath"], arguments["--verbose"]
+        )
+        if arguments["--verbose"]:
+            print(
+                "Number of alert measurements from {}: {}".format(
+                    arguments["--filepath"], len(new_alerts)
+                )
+            )
+    else:
+        new_alerts = get_last_sso_alert(
+            object_class, last_night, arguments["--verbose"]
+        )
+        if arguments["--verbose"]:
+            print("Number of alerts retrieved from Fink: {}".format(len(new_alerts)))
+
     if len(new_alerts) == 0:
         logger.info("no alerts available for the night of {}".format(last_night))
         exit()
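
The new --filepath branch swaps the Fink query for get_last_sso_alert_from_file, which reads a whitespace-separated table (e.g. the Euclid SSOPipe output) whose header must contain at least ra, dec, jd, magpsf and sigmapsf; objectId, candid, nid and fid are synthesized when absent. A small sketch of an accepted input, with made-up values:

    import io

    import pandas as pd

    # same parsing as get_last_sso_alert_from_file: whitespace-separated, header on the first line
    sample = io.StringIO(
        "ra dec jd magpsf sigmapsf\n"
        "210.10 -12.50 2460373.75 19.2 0.08\n"
        "210.15 -12.48 2460373.79 19.3 0.09\n"
    )
    pdf = pd.read_csv(sample, header=0, sep=r"\s+", index_col=False)
    assert {"ra", "dec", "jd", "magpsf", "sigmapsf"} <= set(pdf.columns)
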
+ __import__("sys").modules["unittest.util"]._MAX_LENGTH = 999999999 + + sys.exit(doctest.testmod()[0]) + elif sys.argv[1] == "prod": + from pyspark.sql import SparkSession + + logger = init_logging() + master_adress = str(sys.argv[2]) + read_path = str(sys.argv[3]) + output_path = str(sys.argv[4]) + is_mpc = string_to_bool(str(sys.argv[5])) + + spark = ( + SparkSession.builder.master(master_adress) + .appName("Fink-FAT_solve_orbit") + .getOrCreate() + ) + df = spark.read.load(read_path) + df = df.select( + "objectId", + "candid", + "candidate.ra", + "candidate.dec", + "candidate.jd", + "candidate.magpsf", + "candidate.sigmapsf", + "candidate.fid", + "candidate.ssnamenr", + "ff_roid", + ) + roid_flag = [3, 4, 5] if is_mpc else [1, 2, 4, 5] + df = df.filter(col("ff_roid.roid").isin(roid_flag)) + df_local = df.toPandas() + df_local.to_parquet(output_path, index=False) diff --git a/fink_fat/command_line/cli_main/associations.py b/fink_fat/command_line/cli_main/associations.py index 1b599a06..88e078c2 100644 --- a/fink_fat/command_line/cli_main/associations.py +++ b/fink_fat/command_line/cli_main/associations.py @@ -18,12 +18,14 @@ from fink_fat.command_line.association_cli import ( get_data, get_last_sso_alert, + get_last_sso_alert_from_file, intro_reset, no_reset, yes_reset, ) from fink_fat.others.utils import cast_obs_data, init_logging +from fink_fat.command_line.cli_main.fitroid import fitroid_associations def cli_associations(arguments, config, output_path): @@ -44,6 +46,10 @@ def cli_associations(arguments, config, output_path): # get the path according to the class mpc or candidates output_path, object_class = get_class(arguments, output_path) + if object_class == "SSO fitroid": + fitroid_associations(arguments, config, logger, output_path) + exit() + tr_df_path = os.path.join(output_path, "trajectory_df.parquet") obs_df_path = os.path.join(output_path, "old_obs.parquet") @@ -92,7 +98,24 @@ def cli_associations(arguments, config, output_path): exit() t_before = t.time() - new_alerts = get_last_sso_alert(object_class, last_night, arguments["--verbose"]) + + if arguments["--filepath"]: + new_alerts = get_last_sso_alert_from_file( + arguments["--filepath"], arguments["--verbose"] + ) + if arguments["--verbose"]: + print( + "Number of alerts measurements from {}: {}".format( + arguments["--filepath"], len(new_alerts) + ) + ) + else: + new_alerts = get_last_sso_alert( + object_class, last_night, arguments["--verbose"] + ) + if arguments["--verbose"]: + print("Number of alerts retrieve from fink: {}".format(len(new_alerts))) + if len(new_alerts) == 0: logger.info("no alerts available for the night of {}".format(last_night)) exit() diff --git a/fink_fat/command_line/cli_main/offline.py b/fink_fat/command_line/cli_main/offline.py index 5040e0a8..420d618b 100644 --- a/fink_fat/command_line/cli_main/offline.py +++ b/fink_fat/command_line/cli_main/offline.py @@ -274,7 +274,7 @@ def cli_offline(arguments, config, output_path): int(config["SOLVE_ORBIT_PARAMS"]["noise_ntrials"]), prop_epoch=float(prop_epoch) if prop_epoch != "None" else None, verbose_orbfit=int(config["SOLVE_ORBIT_PARAMS"]["orbfit_verbose"]), - verbose=arguments["--verbose"] + verbose=arguments["--verbose"], ).drop("provisional designation", axis=1) orbfit_time = t.time() - t_before diff --git a/fink_fat/command_line/cli_main/solve_orbit.py b/fink_fat/command_line/cli_main/solve_orbit.py index 209cb504..894c0eb2 100644 --- a/fink_fat/command_line/cli_main/solve_orbit.py +++ b/fink_fat/command_line/cli_main/solve_orbit.py @@ -70,7 
diff --git a/fink_fat/command_line/fink_fat_cli.py b/fink_fat/command_line/fink_fat_cli.py
index 942b108b..06188caf 100755
--- a/fink_fat/command_line/fink_fat_cli.py
+++ b/fink_fat/command_line/fink_fat_cli.py
@@ -1,6 +1,6 @@
 """
 Usage:
-    fink_fat associations (mpc | candidates) [--night <date>] [options]
+    fink_fat associations (mpc | candidates | fitroid) [--night <date>] [--filepath <filepath>] [options]
     fink_fat kalman (mpc | candidates) [--night <date>] [options]
     fink_fat solve_orbit (mpc | candidates) (local | cluster) [options]
     fink_fat merge_orbit (mpc | candidates) [options]
@@ -10,36 +10,37 @@
     fink_fat --version
 
 Options:
-  associations                    Perform associations of alert to return a set of trajectories candidates.
-  kalman                          Perform associations of alerts with the kalman filters and the orbits
-                                  (Must be used with the new fink-science asteroid module)
-  solve_orbit                     Resolve a dynamical inverse problem to return a set of orbital elements from
-                                  the set of trajectories candidates.
-  merge_orbit                     Merge the orbit candidates if the both trajectories can belong to the same solar system objects.
-  offline                         Associate the alerts to form trajectories candidates then solve the orbit
-                                  until the end parameters. Starts from saved data or from the start parameters
-                                  if provided.
-  stats                           Print statistics about trajectories detected by assocations, the old observations
-                                  and, if exists, the orbital elements for some trajectories.
-  mpc                             Return the associations on the solar system mpc alerts (only for tests purpose).
-  candidates                      Run the associations on the solar system candidates alerts.
-  local                           Run the orbital solver in local mode. Use multiprocessing to speed-up the computation.
-  cluster                         Run the orbital solver in cluster mode. Use a Spark cluster to significantly speed-up the computation.
-                                  The cluster mode need to be launch on a system where pyspark are installed and a spark cluster manager are setup.
-  -n <date> --night <date>        Specify the night to request sso alerts from fink broker.
-                                  Format is yyyy-mm-dd as yyyy = year, mm = month, dd = day.
-                                  Example : 2022-03-04 for the 2022 march 04.
-                                  [intervall of day between the day starting at night midday until night midday + 1]
-  -m <path> --mpc-data <path>     Compute statistics according to the minor planet center database.
-                                  <path> is the path of the mpc database file.
-                                  The mpc database can be downloaded by pasting this url in your browser: https://minorplanetcenter.net/Extended_Files/mpcorb_extended.json.gz
-  -r --reset                      Remove the file containing the trajectories candidates, the old observations and the orbits.
-  -s --save                       Save the alerts sent by Fink before the associations for statistics purposes.
-                                  Save also additional statistics : computation time, number of alerts from the current days, number of candidates trajectories, number of old observations.
-  -h --help                       Show help and quit.
-  --version                       Show version.
-  --config FILE                   Specify the config file
-  --verbose                       Print information and progress bar during the process
+    associations                      Perform associations of alerts to return a set of trajectory candidates.
+    solve_orbit                       Resolve a dynamical inverse problem to return a set of orbital elements from
+                                      the set of trajectory candidates.
+    merge_orbit                       Merge the orbit candidates if both trajectories can belong to the same solar system object.
+    offline                           Associate the alerts to form trajectory candidates, then solve the orbit
+                                      until the end parameters are reached. Starts from saved data, or from the start parameters
+                                      if provided.
+    stats                             Print statistics about the trajectories detected by associations, the old observations
+                                      and, if they exist, the orbital elements for some trajectories.
+    mpc                               Return the associations on the solar system mpc alerts (for test purposes only).
+    candidates                        Run the associations on the solar system candidate alerts.
+    fitroid                           Merge the trajectories with the alerts associated to them by the roid science module of fink
+                                      (the roid science module from fink-science must have been run beforehand).
+    local                             Run the orbital solver in local mode. Uses multiprocessing to speed up the computation.
+    cluster                           Run the orbital solver in cluster mode. Uses a Spark cluster to significantly speed up the computation.
+                                      The cluster mode must be launched on a system where pyspark is installed and a spark cluster manager is set up.
+    -n <date> --night <date>          Specify the night for which to request sso alerts from the fink broker.
+                                      Format is yyyy-mm-dd, where yyyy = year, mm = month, dd = day.
+                                      Example: 2022-03-04 for March 4th, 2022.
+                                      [a night runs from midday of the given day until midday of the next day]
+    -f <filepath> --filepath <filepath>  Path to the Euclid SSOPipe output file.
+    -m <path> --mpc-data <path>       Compute statistics according to the minor planet center database.
+                                      <path> is the path of the mpc database file.
+                                      The mpc database can be downloaded by pasting this url in your browser: https://minorplanetcenter.net/Extended_Files/mpcorb_extended.json.gz
+    -r --reset                        Remove the files containing the trajectory candidates, the old observations and the orbits.
+    -s --save                         Save the alerts sent by Fink before the associations, for statistics purposes.
+                                      Also saves additional statistics: computation time, number of alerts for the current day, number of trajectory candidates, number of old observations.
+    -h --help                         Show help and quit.
+    --version                         Show version.
+    --config FILE                     Specify the config file.
+    --verbose                         Print information and a progress bar during the process.
 """
 
 from fink_fat import __version__
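
The docstring above is parsed docopt-style, so the new fitroid command and --filepath option simply appear as extra keys in the arguments dictionary consumed by get_class and cli_associations. A sketch of the dictionary for a fitroid run (illustrative values, only the keys used in this patch):

    # fink_fat associations fitroid --night 2022-03-04 --verbose
    arguments = {
        "associations": True,
        "mpc": False,
        "candidates": False,
        "fitroid": True,         # get_class() returns "SSO fitroid" -> fitroid_associations
        "--night": "2022-03-04",
        "--filepath": None,      # set to a Euclid SSOPipe file to skip the Fink query
        "--verbose": True,
    }
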
diff --git a/fink_fat/command_line/orbit_cli.py b/fink_fat/command_line/orbit_cli.py
index 60e9487b..e7c1f922 100644
--- a/fink_fat/command_line/orbit_cli.py
+++ b/fink_fat/command_line/orbit_cli.py
@@ -191,7 +191,9 @@ def cluster_mode(
     return orbit_results
 
 
-def switch_local_cluster(config: dict, traj_orb: pd.DataFrame, verbose=False) -> pd.DataFrame:
+def switch_local_cluster(
+    config: dict, traj_orb: pd.DataFrame, verbose=False
+) -> pd.DataFrame:
     """
     Run the orbit fitting and choose cluster mode if the number of trajectories
     are above the local limit set in the config file
@@ -230,7 +232,7 @@ def switch_local_cluster(config: dict, traj_orb: pd.DataFrame, verbose=False) ->
         int(config["SOLVE_ORBIT_PARAMS"]["noise_ntrials"]),
         prop_epoch=float(prop_epoch) if prop_epoch != "None" else None,
         verbose_orbfit=int(config["SOLVE_ORBIT_PARAMS"]["orbfit_verbose"]),
-        verbose=verbose
+        verbose=verbose,
     ).drop("provisional designation", axis=1)
 
     return new_orbit_pdf
@@ -319,9 +321,9 @@ def trcand_to_orbit(
         trajectory_df = trajectory_df[
             ~trajectory_df["trajectory_id"].isin(new_traj_id)
         ].reset_index(drop=True)
-        trparams_df = trparams_df[~trparams_df["trajectory_id"].isin(new_traj_id)].reset_index(
-            drop=True
-        )
+        trparams_df = trparams_df[
+            ~trparams_df["trajectory_id"].isin(new_traj_id)
+        ].reset_index(drop=True)
         failed_orbit = np.setdiff1d(large_traj.index.values, new_traj_id)
         mask_failed = trparams_df["trajectory_id"].isin(failed_orbit)
         with pd.option_context("mode.chained_assignment", None):
diff --git a/fink_fat/command_line/utils_cli.py b/fink_fat/command_line/utils_cli.py
index ad01f65d..f89c7946 100644
--- a/fink_fat/command_line/utils_cli.py
+++ b/fink_fat/command_line/utils_cli.py
@@ -55,7 +55,7 @@ def before_get(self, parser, section, option, value, defaults):
         return os.path.expandvars(value)
 
 
-def init_cli(arguments):
+def init_cli(arguments: dict) -> Tuple[configparser.ConfigParser, str]:
     """
     Read the fink_fat configuration file of fink_fat specified by the --config argument
 
@@ -181,10 +181,15 @@ def get_class(arguments, path):
         if not os.path.isdir(path):
             os.mkdir(path)
         object_class = "Solar System candidate"
+    elif arguments["fitroid"]:
+        path = os.path.join(path, "fitroid", "")
+        if not os.path.isdir(path):
+            os.mkdir(path)
+        object_class = "SSO fitroid"
     else:  # pragma: no cover
         raise ValueError(
-            "Class does not correspond to a sso class from fink, got {}".format(
-                arguments["mpc"]
+            "Class does not correspond to a sso class from fink\nargument keys: {}".format(
+                arguments
             )
         )
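
init_cli now advertises that it returns the parsed configuration together with the output path root, and the fitroid branch of get_class appends a fitroid/ subdirectory to that root. A sketch of the two calls chained as cli_associations does, assuming the returned pair is (config, output_path) as the new annotation suggests and that a None --config falls back to the packaged default:

    from fink_fat.command_line.utils_cli import get_class, init_cli

    arguments = {
        "--config": None,  # fall back to the default fink_fat.conf
        "mpc": False,
        "candidates": False,
        "fitroid": True,
    }
    config, output_path = init_cli(arguments)
    output_path, object_class = get_class(arguments, output_path)  # creates <output_path>/fitroid/
    assert object_class == "SSO fitroid"
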
diff --git a/fink_fat/data/fink_fat.conf b/fink_fat/data/fink_fat.conf
index 7a04bc1f..5c751fed 100644
--- a/fink_fat/data/fink_fat.conf
+++ b/fink_fat/data/fink_fat.conf
@@ -1,18 +1,40 @@
 [TW_PARAMS]
+# all params are in days
 trajectory_keep_limit=15
 old_observation_keep_limit=2
 trajectory_2_points_keep_limit=8
+orbit_keep_limit=30
+predict_function_keep_limit=10
 
 [ASSOC_PARAMS]
+# arcsecond
 intra_night_separation=120
+
 intra_night_magdiff_limit_same_fid=0.2
 intra_night_magdiff_limit_diff_fid=0.8
+
+# deg/day
 inter_night_separation=0.3
+
 inter_night_magdiff_limit_same_fid=0.1
 inter_night_magdiff_limit_diff_fid=0.5
+
+# degree
 maximum_angle=1
+
+# arcmin
+error_radius=15
+
+# arcsecond
+orbit_assoc_radius=600.0
+
+# if true, use dbscan for intra_night associations, use legacy fink_fat otherwise
 use_dbscan=False
+
+# roid_mpc = (True | False) if true, load mpc alerts for associations with the fink alert stream.
+# load sso candidates otherwise (for fink_fat version >= 1.0)
+roid_mpc=
+
 [ASSOC_PERF]
 store_kd_tree=false
@@ -44,4 +66,8 @@ tracklets_with_old_observations=true
 new_observations_with_old_observations=true
 
 [OUTPUT]
-association_output_file=fink_fat_out
\ No newline at end of file
+association_output_file=fink_fat_out
+
+# roid_path_mode = ('local' | 'spark')
+roid_path_mode=
+roid_module_output=
\ No newline at end of file
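
The new keys close the loop with the code changes above: roid_mpc decides which roid flags are kept (confirmed objects versus candidates), while roid_path_mode and roid_module_output tell the association step how and where to pick up the parquet output of the fink-science roid module. A sketch of how they would be consumed, reusing string_to_bool imported earlier in this patch (key names from the config above, values illustrative):

    import configparser

    from fink_fat.command_line.utils_cli import string_to_bool

    config = configparser.ConfigParser()
    config.read("fink_fat.conf")  # a filled-in copy of fink_fat/data/fink_fat.conf

    is_mpc = string_to_bool(config["ASSOC_PARAMS"]["roid_mpc"])  # True -> keep roid flags {3, 4, 5}
    mode = config["OUTPUT"]["roid_path_mode"]                    # "local" or "spark"
    read_path = config["OUTPUT"]["roid_module_output"]           # parquet written by the roid module

    # these three values feed get_last_roid_streaming_alert(config, is_mpc, mode, read_path, ...)
    print(is_mpc, mode, read_path)
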