fitroid command line
FusRoman committed Dec 4, 2023
1 parent 4f8f8ad commit e9575c5
Showing 9 changed files with 257 additions and 62 deletions.
3 changes: 3 additions & 0 deletions .flake8
@@ -13,6 +13,9 @@ exclude =
fink_fat/others/night_report.py
../fink-fat/fink_fat_out_2/mpc/test_assoc_tag.py
../fink-fat/fink_fat_test/plot_perf_test.py
../fink-fat/test_assoc_orbit.py
../fink-fat/fink_fat/test/pipeline_test.py
../fink-fat/fink_fat/test/pipeline_analysis.py
per-file-ignores =
../fink-fat/fink_fat/orbit_fitting/orbfit_files.py:W503
../fink-fat/fink_fat/orbit_fitting/mpcobs_files.py:W503
173 changes: 154 additions & 19 deletions fink_fat/command_line/association_cli.py
@@ -8,6 +8,12 @@
from fink_fat.others.utils import init_logging

from astropy.time import Time
import fink_fat
import subprocess
from pyspark.sql.functions import col
from fink_fat.command_line.utils_cli import string_to_bool
import configparser


def request_fink(
object_class,
@@ -209,21 +215,23 @@ def get_last_sso_alert_from_file(filepath, verbose=False):
>>> assert len(pdf) == 2798
>>> assert 'objectId' in pdf.columns
"""
pdf = pd.read_csv(filepath, header=0, sep=r'\s+', index_col=False)
pdf = pd.read_csv(filepath, header=0, sep=r"\s+", index_col=False)

required_header = ['ra', 'dec', 'jd', 'magpsf', 'sigmapsf']
required_header = ["ra", "dec", "jd", "magpsf", "sigmapsf"]
msg = """
The header of {} must contain at least the following fields:
ra dec jd magpsf sigmapsf
""".format(filepath)
""".format(
filepath
)
assert set(required_header) - set(pdf.columns) == set(), AssertionError(msg)

if 'objectId' not in pdf.columns:
pdf['objectId'] = range(len(pdf))
if "objectId" not in pdf.columns:
pdf["objectId"] = range(len(pdf))

pdf['candid'] = range(10, len(pdf) + 10)
pdf['nid'] = 0
pdf['fid'] = 0
pdf["candid"] = range(10, len(pdf) + 10)
pdf["nid"] = 0
pdf["fid"] = 0

required_columns = [
"objectId",
@@ -240,14 +248,15 @@ def get_last_sso_alert_from_file(filepath, verbose=False):
]

if len(pdf) > 0:
date = Time(pdf['jd'].values[0], format='jd').iso.split(' ')[0]
date = Time(pdf["jd"].values[0], format="jd").iso.split(" ")[0]
pdf.insert(len(pdf.columns), "not_updated", np.ones(len(pdf), dtype=np.bool_))
pdf.insert(len(pdf.columns), "last_assoc_date", date)
else:
return pd.DataFrame(columns=required_columns)

return pdf[required_columns]


def get_last_sso_alert(object_class, date, verbose=False):
"""
Get the alerts from Fink corresponding to the object_class for the given date.
@@ -344,6 +353,100 @@ def get_last_sso_alert(object_class, date, verbose=False):
return pdf[required_columns]


def get_last_roid_streaming_alert(
    config: configparser.ConfigParser,
    is_mpc: bool,
    mode: str,
    read_path: str,
    output_path: str = None,
):
    """
    Load the alerts flagged by the fink-science roid module for the last night.

    In "local" mode, the parquet files under read_path are read directly.
    In "spark" mode, a spark job is submitted to read and filter the alerts;
    its output is written to output_path, read back, then removed.
    """
    if mode == "local":
        # load alerts from local parquet files
        sso_night = pd.read_parquet(read_path)

elif mode == "spark":
assert (
output_path is not None
), "The argument 'output_path' is None.\nYou must set an output_path when loading data with spark"

# load alerts from spark
master_manager = config["SOLVE_ORBIT_PARAMS"]["manager"]
principal_group = config["SOLVE_ORBIT_PARAMS"]["principal"]
secret = config["SOLVE_ORBIT_PARAMS"]["secret"]
role = config["SOLVE_ORBIT_PARAMS"]["role"]
executor_env = config["SOLVE_ORBIT_PARAMS"]["exec_env"]
driver_mem = config["SOLVE_ORBIT_PARAMS"]["driver_memory"]
exec_mem = config["SOLVE_ORBIT_PARAMS"]["executor_memory"]
max_core = config["SOLVE_ORBIT_PARAMS"]["max_core"]
exec_core = config["SOLVE_ORBIT_PARAMS"]["executor_core"]

application = os.path.join(
os.path.dirname(fink_fat.__file__),
"command_line",
"association_cli.py prod",
)

application += " " + master_manager
application += " " + read_path
application += " " + is_mpc

spark_submit = "spark-submit \
--master {} \
--conf spark.mesos.principal={} \
--conf spark.mesos.secret={} \
--conf spark.mesos.role={} \
--conf spark.executorEnv.HOME={} \
--driver-memory {}G \
--executor-memory {}G \
--conf spark.cores.max={} \
--conf spark.executor.cores={} \
{}".format(
master_manager,
principal_group,
secret,
role,
executor_env,
driver_mem,
exec_mem,
max_core,
exec_core,
application,
)

        # capture the streams, otherwise process.stderr and process.stdout are None
        process = subprocess.run(spark_submit, shell=True, capture_output=True)
        if process.returncode != 0:
            logger = init_logging()
            logger.error(process.stderr)
            logger.error(process.stdout)
            exit(1)

sso_night = pd.read_parquet(output_path)
os.remove(output_path)

else:
        raise ValueError(f"mode {mode} does not exist")

roid_pdf = pd.json_normalize(sso_night["ff_roid"])
sso_night = pd.concat(
[sso_night, roid_pdf],
axis=1,
)
cols_to_keep = [
"objectId",
"candid",
"ra",
"dec",
"jd",
"magpsf",
"sigmapsf",
"fid",
"ssnamenr",
"roid",
"estimator_id",
"ffdistnr",
]
return sso_night[cols_to_keep]
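
For reference, a minimal sketch of calling this function in local mode; the file names below are hypothetical, and the parquet file is assumed to hold alerts with the ff_roid column produced by the fink-science roid module (config is only read by the spark branch):

import configparser

from fink_fat.command_line.association_cli import get_last_roid_streaming_alert

# hypothetical configuration file; only the spark mode reads SOLVE_ORBIT_PARAMS
config = configparser.ConfigParser()
config.read("fink_fat.conf")

# read the night's alerts directly from a local parquet file
sso_night = get_last_roid_streaming_alert(
    config,
    is_mpc=False,
    mode="local",
    read_path="roid_alerts/night_2023-12-04.parquet",
)
print(sso_night[["objectId", "roid", "estimator_id"]].head())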

def intro_reset(): # pragma: no cover
logger = init_logging()
logger.info("WARNING !!!")
@@ -453,13 +556,45 @@ def get_data(tr_df_path, obs_df_path):

if __name__ == "__main__": # pragma: no cover
import sys
import doctest
from pandas.testing import assert_frame_equal # noqa: F401
import fink_fat.test.test_sample as ts # noqa: F401
from unittest import TestCase # noqa: F401

if "unittest.util" in __import__("sys").modules:
# Show full diff in self.assertEqual.
__import__("sys").modules["unittest.util"]._MAX_LENGTH = 999999999

sys.exit(doctest.testmod()[0])
if sys.argv[1] == "test":
import doctest
from pandas.testing import assert_frame_equal # noqa: F401
import fink_fat.test.test_sample as ts # noqa: F401
from unittest import TestCase # noqa: F401

if "unittest.util" in __import__("sys").modules:
# Show full diff in self.assertEqual.
__import__("sys").modules["unittest.util"]._MAX_LENGTH = 999999999

sys.exit(doctest.testmod()[0])
elif sys.argv[1] == "prod":
from pyspark.sql import SparkSession

logger = init_logging()
master_adress = str(sys.argv[2])
read_path = str(sys.argv[3])
output_path = str(sys.argv[4])
is_mpc = string_to_bool(str(sys.argv[5]))

spark = (
SparkSession.builder.master(master_adress)
.appName("Fink-FAT_solve_orbit")
.getOrCreate()
)
df = spark.read.load(read_path)
df = df.select(
"objectId",
"candid",
"candidate.ra",
"candidate.dec",
"candidate.jd",
"candidate.magpsf",
"candidate.sigmapsf",
"candidate.fid",
"candidate.ssnamenr",
"ff_roid",
)
roid_flag = [3, 4, 5] if is_mpc else [1, 2, 4, 5]
df = df.filter(col("ff_roid.roid").isin(roid_flag))
df_local = df.toPandas()
df_local.to_parquet(output_path, index=False)
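
The spark branch of get_last_roid_streaming_alert builds exactly this kind of submission; a minimal sketch of an equivalent hand-written call, where every path and value is a placeholder (a local master stands in for the mesos manager configured above):

import subprocess

import pandas as pd

# positional arguments must match the parsing of the "prod" branch:
# prod <master> <read_path> <output_path> <is_mpc>
subprocess.run(
    "spark-submit --master local[2] association_cli.py "
    "prod local[2] roid_alerts.parquet roid_night.parquet True",
    shell=True,
    check=True,
)

# the job writes the filtered alerts as parquet
sso_night = pd.read_parquet("roid_night.parquet")
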
25 changes: 24 additions & 1 deletion fink_fat/command_line/cli_main/associations.py
@@ -18,12 +18,14 @@
from fink_fat.command_line.association_cli import (
get_data,
get_last_sso_alert,
get_last_sso_alert_from_file,
intro_reset,
no_reset,
yes_reset,
)

from fink_fat.others.utils import cast_obs_data, init_logging
from fink_fat.command_line.cli_main.fitroid import fitroid_associations


def cli_associations(arguments, config, output_path):
@@ -44,6 +46,10 @@ def cli_associations(arguments, config, output_path):
# get the path according to the class mpc or candidates
output_path, object_class = get_class(arguments, output_path)

if object_class == "SSO fitroid":
fitroid_associations(arguments, config, logger, output_path)
exit()

tr_df_path = os.path.join(output_path, "trajectory_df.parquet")
obs_df_path = os.path.join(output_path, "old_obs.parquet")

@@ -92,7 +98,24 @@
exit()

t_before = t.time()
new_alerts = get_last_sso_alert(object_class, last_night, arguments["--verbose"])

if arguments["--filepath"]:
new_alerts = get_last_sso_alert_from_file(
arguments["--filepath"], arguments["--verbose"]
)
if arguments["--verbose"]:
print(
"Number of alerts measurements from {}: {}".format(
arguments["--filepath"], len(new_alerts)
)
)
else:
new_alerts = get_last_sso_alert(
object_class, last_night, arguments["--verbose"]
)
if arguments["--verbose"]:
print("Number of alerts retrieve from fink: {}".format(len(new_alerts)))

if len(new_alerts) == 0:
logger.info("no alerts available for the night of {}".format(last_night))
exit()
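
The --filepath branch above feeds get_last_sso_alert_from_file, which expects a whitespace-separated table holding at least the ra, dec, jd, magpsf and sigmapsf columns; a minimal sketch with made-up measurements (the file name and values are hypothetical):

from fink_fat.command_line.association_cli import get_last_sso_alert_from_file

# two made-up measurements; objectId, candid, nid and fid are
# filled in by the function when they are missing from the file
with open("ssopipe_output.txt", "w") as f:
    f.write("ra dec jd magpsf sigmapsf\n")
    f.write("10.1 -5.2 2460282.50 19.3 0.12\n")
    f.write("10.2 -5.3 2460282.60 19.4 0.15\n")

pdf = get_last_sso_alert_from_file("ssopipe_output.txt", verbose=True)
print(pdf.columns.tolist())
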
2 changes: 1 addition & 1 deletion fink_fat/command_line/cli_main/offline.py
@@ -274,7 +274,7 @@ def cli_offline(arguments, config, output_path):
int(config["SOLVE_ORBIT_PARAMS"]["noise_ntrials"]),
prop_epoch=float(prop_epoch) if prop_epoch != "None" else None,
verbose_orbfit=int(config["SOLVE_ORBIT_PARAMS"]["orbfit_verbose"]),
verbose=arguments["--verbose"]
verbose=arguments["--verbose"],
).drop("provisional designation", axis=1)
orbfit_time = t.time() - t_before

2 changes: 1 addition & 1 deletion fink_fat/command_line/cli_main/solve_orbit.py
@@ -70,7 +70,7 @@ def cli_solve_orbit(arguments, config, output_path):
int(config["SOLVE_ORBIT_PARAMS"]["noise_ntrials"]),
prop_epoch=float(prop_epoch) if prop_epoch != "None" else None,
verbose_orbfit=int(config["SOLVE_ORBIT_PARAMS"]["orbfit_verbose"]),
verbose=arguments["--verbose"]
verbose=arguments["--verbose"],
).drop("provisional designation", axis=1)
orbfit_time = t.time() - t_before

63 changes: 32 additions & 31 deletions fink_fat/command_line/fink_fat_cli.py
@@ -1,6 +1,6 @@
"""
Usage:
fink_fat associations (mpc | candidates) [--night <date>] [options]
fink_fat associations (mpc | candidates | fitroid) [--night <date>] [--filepath <filepath>] [options]
fink_fat kalman (mpc | candidates) [--night <date>] [options]
fink_fat solve_orbit (mpc | candidates) (local | cluster) [options]
fink_fat merge_orbit (mpc | candidates) [options]
@@ -10,36 +10,37 @@
fink_fat --version
Options:
    associations            Perform associations of alerts to return a set of trajectory candidates.
    kalman                  Perform associations of alerts with the kalman filters and the orbits
                            (must be used with the new fink-science asteroid module)
    solve_orbit             Resolve a dynamical inverse problem to return a set of orbital elements from
                            the set of trajectory candidates.
    merge_orbit             Merge the orbit candidates if both trajectories can belong to the same solar system object.
    offline                 Associate the alerts to form trajectory candidates, then solve the orbits
                            until the end parameters. Starts from saved data or from the start parameters
                            if provided.
    stats                   Print statistics about the trajectories detected by associations, the old observations
                            and, if they exist, the orbital elements of some trajectories.
    mpc                     Return the associations on the solar system mpc alerts (only for test purposes).
    candidates              Run the associations on the solar system candidate alerts.
    local                   Run the orbital solver in local mode. Use multiprocessing to speed up the computation.
    cluster                 Run the orbital solver in cluster mode. Use a Spark cluster to significantly speed up the computation.
                            The cluster mode needs to be launched on a system where pyspark is installed and a spark cluster manager is set up.
    -n <date> --night <date>  Specify the night for which to request sso alerts from the fink broker.
                            The format is yyyy-mm-dd with yyyy = year, mm = month, dd = day.
                            Example: 2022-03-04 for 2022 March 04.
                            [the night runs from midday of the given day until midday of the next day]
    -m <path> --mpc-data <path>  Compute statistics according to the minor planet center database.
                            <path> is the path of the mpc database file.
                            The mpc database can be downloaded by pasting this url in your browser: https://minorplanetcenter.net/Extended_Files/mpcorb_extended.json.gz
    -r --reset              Remove the files containing the trajectory candidates, the old observations and the orbits.
    -s --save               Save the alerts sent by Fink before the associations for statistics purposes.
                            Also save additional statistics: computation time, number of alerts for the current day, number of trajectory candidates, number of old observations.
    -h --help               Show help and quit.
    --version               Show version.
    --config FILE           Specify the config file
    --verbose               Print information and a progress bar during the process
    associations            Perform associations of alerts to return a set of trajectory candidates.
    solve_orbit             Resolve a dynamical inverse problem to return a set of orbital elements from
                            the set of trajectory candidates.
    merge_orbit             Merge the orbit candidates if both trajectories can belong to the same solar system object.
    offline                 Associate the alerts to form trajectory candidates, then solve the orbits
                            until the end parameters. Starts from saved data or from the start parameters
                            if provided.
    stats                   Print statistics about the trajectories detected by associations, the old observations
                            and, if they exist, the orbital elements of some trajectories.
    mpc                     Return the associations on the solar system mpc alerts (only for test purposes).
    candidates              Run the associations on the solar system candidate alerts.
    fitroid                 Merge the trajectories with the alerts associated to them by the roid science module of fink
                            (the roid science module from fink-science must have been run beforehand)
    local                   Run the orbital solver in local mode. Use multiprocessing to speed up the computation.
    cluster                 Run the orbital solver in cluster mode. Use a Spark cluster to significantly speed up the computation.
                            The cluster mode needs to be launched on a system where pyspark is installed and a spark cluster manager is set up.
    -n <date> --night <date>             Specify the night for which to request sso alerts from the fink broker.
                                         The format is yyyy-mm-dd with yyyy = year, mm = month, dd = day.
                                         Example: 2022-03-04 for 2022 March 04.
                                         [the night runs from midday of the given day until midday of the next day]
    -f <filepath> --filepath <filepath>  Path to the Euclid SSOPipe output file.
    -m <path> --mpc-data <path>          Compute statistics according to the minor planet center database.
                                         <path> is the path of the mpc database file.
                                         The mpc database can be downloaded by pasting this url in your browser: https://minorplanetcenter.net/Extended_Files/mpcorb_extended.json.gz
    -r --reset              Remove the files containing the trajectory candidates, the old observations and the orbits.
    -s --save               Save the alerts sent by Fink before the associations for statistics purposes.
                            Also save additional statistics: computation time, number of alerts for the current day, number of trajectory candidates, number of old observations.
    -h --help               Show help and quit.
    --version               Show version.
    --config FILE           Specify the config file
    --verbose               Print information and a progress bar during the process
"""

from fink_fat import __version__
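
The Usage and Options layout above follows the docopt format (an assumption, based on the dict-style access like arguments["--filepath"] used in the command line modules). A minimal sketch of how the new fitroid sub-command and --filepath option parse, using a trimmed-down copy of the docstring:

from docopt import docopt

doc = """
Usage:
    fink_fat associations (mpc | candidates | fitroid) [--night <date>] [--filepath <filepath>] [options]

Options:
    -n <date> --night <date>             Specify the night to process.
    -f <filepath> --filepath <filepath>  Path to the Euclid SSOPipe output file.
    --verbose                            Print information during the process.
"""

arguments = docopt(doc, argv=["associations", "fitroid", "--night", "2023-12-04"])
assert arguments["fitroid"] is True
assert arguments["--night"] == "2023-12-04"
assert arguments["--filepath"] is None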