Skip to content

Commit

Permalink
fink_fat 1.0 cli test run until the end
Browse files Browse the repository at this point in the history
  • Loading branch information
FusRoman committed Dec 5, 2023
1 parent 46cc5a1 commit db96d85
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 23 deletions.
1 change: 1 addition & 0 deletions fink_fat/command_line/association_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ def get_last_roid_streaming_alert(
[sso_night, roid_pdf],
axis=1,
)
sso_night = sso_night.explode(["estimator_id", "ffdistnr"])
cols_to_keep = [
"objectId",
"candid",
Expand Down
2 changes: 2 additions & 0 deletions fink_fat/command_line/cli_main/fitroid.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ def fitroid_associations(
nb_new_trcand = len(fit_roid_df)

trajectory_df["trajectory_id"] = trajectory_df["trajectory_id"].astype(int)
if "updated" not in trajectory_df:
trajectory_df["updated"] = "N"
trajectory_df, fit_roid_df, trajectory_orb, orbits = trcand_to_orbit(
config, trajectory_df, trajectory_orb, fit_roid_df, orbits, logger, True
)
Expand Down
7 changes: 4 additions & 3 deletions fink_fat/test/cli_test/fitroid_test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ noise_ntrials=20
prop_epoch=None
orbfit_verbose=3

orbfit_limit=6
local_mode_limit=1000
orbfit_limit=10
cpu_count=1
ram_dir=""
manager=local[4]
ram_dir=fink_fat/test/cli_test
manager=local[8]
principal=lsst
secret=secret
role=lsst
Expand Down
25 changes: 16 additions & 9 deletions fink_fat/test/cli_test/run_roid.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ def addFileToSpark(spark: SparkSession, fitroid_path, orbit_path):
datapath = "fink_fat/test/cli_test/small_sso_dataset"

year, month, day = sys.argv[1:]
config, output_path = init_cli({"--config": "fink_fat/test/cli_test/fitroid_test.conf"})
config, output_path = init_cli(
{"--config": "fink_fat/test/cli_test/fitroid_test.conf"}
)

path_orbit = os.path.join(output_path, "orbital.parquet")
path_fit_roid = os.path.join(output_path, "fit_roid.parquet")
path_orbit = os.path.join(output_path, "fitroid", "orbital.parquet")
path_fit_roid = os.path.join(output_path, "fitroid", "fit_roid.parquet")

path_sso = os.path.join(datapath, f"year={int(year):04d}/month={int(month):02d}/day={int(day):02d}")
path_sso = os.path.join(
datapath, f"year={int(year):04d}/month={int(month):02d}/day={int(day):02d}"
)

spark = init_sparksession("fink_fat_roid")
addFileToSpark(spark, path_fit_roid, path_orbit)
Expand Down Expand Up @@ -62,8 +66,11 @@ def addFileToSpark(spark: SparkSession, fitroid_path, orbit_path):
df = df.withColumn("ff_roid", roid_catcher(*args))
df = df.drop(*what_prefix)

df = df.withColumn("year", F.lit(f"{int(year):04d}"))
df = df.withColumn("month", F.lit(f"{int(month):02d}"))
df = df.withColumn("day", F.lit(f"{int(day):02d}"))

df.write.partitionBy("year", "month", "day").parquet(config["OUTPUT"]["roid_module_output"])
df.write.parquet(
os.path.join(
config["OUTPUT"]["roid_module_output"],
f"year={int(year):04d}",
f"month={int(month):02d}",
f"day={int(day):02d}",
)
)
17 changes: 6 additions & 11 deletions fink_fat/test/cli_test/test_fitroid_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
from datetime import date, timedelta
import os


def daterange(start_date, end_date):
for n in range(int((end_date - start_date).days)):
yield start_date + timedelta(n)


def run_roid(year, month, day):
process = subprocess.run(
f"spark-submit fink_fat/test/cli_test/run_roid.py {year} {month} {day}", shell=True
f"spark-submit fink_fat/test/cli_test/run_roid.py {year} {month} {day}",
shell=True,
)

if process.returncode != 0:
Expand All @@ -29,19 +32,15 @@ def run_roid(year, month, day):
end_date = date(2021, 6, 30)
logger = init_logging()

i = 0
max = 3

for sso_date in daterange(start_date, end_date):

logger.info(f"start to process date {sso_date}")
logger.newline()

path_sso = os.path.join(
datapath, f"year={sso_date.year:04d}/month={sso_date.month:02d}/day={sso_date.day:02d}"
datapath,
f"year={sso_date.year:04d}/month={sso_date.month:02d}/day={sso_date.day:02d}",
)
if os.path.exists(path_sso):

# --------------------------------------------------------------------------------#
# roid runner (fink science side)
logger.info("start roid processor")
Expand All @@ -64,7 +63,3 @@ def run_roid(year, month, day):
"--verbose",
]
)

i += 1
if i == max:
break

0 comments on commit db96d85

Please sign in to comment.