diff --git a/fink_fat/command_line/association_cli.py b/fink_fat/command_line/association_cli.py index 0df2e918..37d0f5d4 100644 --- a/fink_fat/command_line/association_cli.py +++ b/fink_fat/command_line/association_cli.py @@ -452,6 +452,7 @@ def get_last_roid_streaming_alert( [sso_night, roid_pdf], axis=1, ) + sso_night = sso_night.explode(["estimator_id", "ffdistnr"]) cols_to_keep = [ "objectId", "candid", diff --git a/fink_fat/command_line/cli_main/fitroid.py b/fink_fat/command_line/cli_main/fitroid.py index 0bb8b3a4..911bb27c 100644 --- a/fink_fat/command_line/cli_main/fitroid.py +++ b/fink_fat/command_line/cli_main/fitroid.py @@ -220,6 +220,8 @@ def fitroid_associations( nb_new_trcand = len(fit_roid_df) trajectory_df["trajectory_id"] = trajectory_df["trajectory_id"].astype(int) + if "updated" not in trajectory_df: + trajectory_df["updated"] = "N" trajectory_df, fit_roid_df, trajectory_orb, orbits = trcand_to_orbit( config, trajectory_df, trajectory_orb, fit_roid_df, orbits, logger, True ) diff --git a/fink_fat/test/cli_test/fitroid_test.conf b/fink_fat/test/cli_test/fitroid_test.conf index 570c09f0..390d0531 100644 --- a/fink_fat/test/cli_test/fitroid_test.conf +++ b/fink_fat/test/cli_test/fitroid_test.conf @@ -44,10 +44,11 @@ noise_ntrials=20 prop_epoch=None orbfit_verbose=3 -orbfit_limit=6 +local_mode_limit=1000 +orbfit_limit=10 cpu_count=1 -ram_dir="" -manager=local[4] +ram_dir=fink_fat/test/cli_test +manager=local[8] principal=lsst secret=secret role=lsst diff --git a/fink_fat/test/cli_test/run_roid.py b/fink_fat/test/cli_test/run_roid.py index 9a42a0c2..34d85799 100644 --- a/fink_fat/test/cli_test/run_roid.py +++ b/fink_fat/test/cli_test/run_roid.py @@ -22,12 +22,16 @@ def addFileToSpark(spark: SparkSession, fitroid_path, orbit_path): datapath = "fink_fat/test/cli_test/small_sso_dataset" year, month, day = sys.argv[1:] - config, output_path = init_cli({"--config": "fink_fat/test/cli_test/fitroid_test.conf"}) + config, output_path = init_cli( + {"--config": "fink_fat/test/cli_test/fitroid_test.conf"} + ) - path_orbit = os.path.join(output_path, "orbital.parquet") - path_fit_roid = os.path.join(output_path, "fit_roid.parquet") + path_orbit = os.path.join(output_path, "fitroid", "orbital.parquet") + path_fit_roid = os.path.join(output_path, "fitroid", "fit_roid.parquet") - path_sso = os.path.join(datapath, f"year={int(year):04d}/month={int(month):02d}/day={int(day):02d}") + path_sso = os.path.join( + datapath, f"year={int(year):04d}/month={int(month):02d}/day={int(day):02d}" + ) spark = init_sparksession("fink_fat_roid") addFileToSpark(spark, path_fit_roid, path_orbit) @@ -62,8 +66,11 @@ def addFileToSpark(spark: SparkSession, fitroid_path, orbit_path): df = df.withColumn("ff_roid", roid_catcher(*args)) df = df.drop(*what_prefix) - df = df.withColumn("year", F.lit(f"{int(year):04d}")) - df = df.withColumn("month", F.lit(f"{int(month):02d}")) - df = df.withColumn("day", F.lit(f"{int(day):02d}")) - - df.write.partitionBy("year", "month", "day").parquet(config["OUTPUT"]["roid_module_output"]) + df.write.parquet( + os.path.join( + config["OUTPUT"]["roid_module_output"], + f"year={int(year):04d}", + f"month={int(month):02d}", + f"day={int(day):02d}", + ) + ) diff --git a/fink_fat/test/cli_test/test_fitroid_cli.py b/fink_fat/test/cli_test/test_fitroid_cli.py index 051b150b..10c589c7 100644 --- a/fink_fat/test/cli_test/test_fitroid_cli.py +++ b/fink_fat/test/cli_test/test_fitroid_cli.py @@ -4,13 +4,16 @@ from datetime import date, timedelta import os + def daterange(start_date, end_date): for n in range(int((end_date - start_date).days)): yield start_date + timedelta(n) + def run_roid(year, month, day): process = subprocess.run( - f"spark-submit fink_fat/test/cli_test/run_roid.py {year} {month} {day}", shell=True + f"spark-submit fink_fat/test/cli_test/run_roid.py {year} {month} {day}", + shell=True, ) if process.returncode != 0: @@ -29,19 +32,15 @@ def run_roid(year, month, day): end_date = date(2021, 6, 30) logger = init_logging() - i = 0 - max = 3 - for sso_date in daterange(start_date, end_date): - logger.info(f"start to process date {sso_date}") logger.newline() path_sso = os.path.join( - datapath, f"year={sso_date.year:04d}/month={sso_date.month:02d}/day={sso_date.day:02d}" + datapath, + f"year={sso_date.year:04d}/month={sso_date.month:02d}/day={sso_date.day:02d}", ) if os.path.exists(path_sso): - # --------------------------------------------------------------------------------# # roid runner (fink science side) logger.info("start roid processor") @@ -64,7 +63,3 @@ def run_roid(year, month, day): "--verbose", ] ) - - i += 1 - if i == max: - break