Commit e1fd32c: add date to the Spark job names
FusRoman committed Dec 7, 2023 (1 parent: bd7e3bc)
Showing 4 changed files with 30 additions and 15 deletions.
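All four files apply the same pattern: the processing night last_night (a "YYYY-MM-DD" string) is split into year, month, and day, the three parts are forwarded on the spark-submit argument string, and each Spark script appends them to its application name so that nightly jobs can be told apart in the Spark UI. A minimal sketch of the pattern; the helper name and example date are illustrative, not code from the commit:

from pyspark.sql import SparkSession

def build_session(master_adress: str, base_name: str, last_night: str) -> SparkSession:
    # Illustrative helper: the commit inlines this logic in each script.
    year, month, day = last_night.split("-")  # "2023-12-07" -> ("2023", "12", "07")
    return (
        SparkSession.builder.master(master_adress)
        # e.g. "FINK-FAT_recover_stream_data_20231207"
        .appName(f"{base_name}_{year}{month}{day}")
        .getOrCreate()
    )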
22 changes: 14 additions & 8 deletions fink_fat/command_line/association_cli.py
@@ -364,12 +364,12 @@ def get_last_roid_streaming_alert(
 ):
     assert not verbose or logger is not None, "logger is None while verbose is True"
     input_path = config["OUTPUT"]["roid_module_output"]
-    split_night = last_night.split("-")
+    year, month, day = last_night.split("-")
     input_path = os.path.join(
         input_path,
-        f"year={split_night[0]}",
-        f"month={split_night[1]}",
-        f"day={split_night[2]}",
+        f"year={year}",
+        f"month={month}",
+        f"day={day}",
     )
 
     mode = str(config["OUTPUT"]["roid_path_mode"])
@@ -392,9 +392,9 @@ def get_last_roid_streaming_alert(
         logger.info("start to get data in spark mode")
     output_path_spark = os.path.join(
         output_path,
-        f"year={split_night[0]}",
-        f"month={split_night[1]}",
-        f"day={split_night[2]}",
+        f"year={year}",
+        f"month={month}",
+        f"day={day}",
     )
     if not os.path.isdir(output_path_spark):
         pathlib.Path(output_path_spark).mkdir(parents=True)
@@ -420,6 +420,9 @@ def get_last_roid_streaming_alert(
     application += " " + input_path
     application += " " + output_path_spark
     application += " " + str(is_mpc)
+    application += " " + year
+    application += " " + month
+    application += " " + day
 
     # FIXME
     # temporary dependencies (only during the performance test phase)
@@ -613,10 +616,13 @@ def get_data(tr_df_path, obs_df_path):
     read_path = str(sys.argv[3])
     output_path = str(sys.argv[4])
     is_mpc = string_to_bool(str(sys.argv[5]))
+    year = sys.argv[6]
+    month = sys.argv[7]
+    day = sys.argv[8]
 
     spark = (
         SparkSession.builder.master(master_adress)
-        .appName("FINK-FAT_recover_stream_data")
+        .appName(f"FINK-FAT_recover_stream_data_{year}{month}{day}")
         .getOrCreate()
     )
     df = spark.read.load(read_path)
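The input and output paths assembled above follow the Hive-style partition layout year=YYYY/month=MM/day=DD, which Spark discovers automatically when reading the directory back. A runnable sketch with an illustrative date and base directory:

import os

last_night = "2023-12-07"  # illustrative night
year, month, day = last_night.split("-")

# Same layout as input_path and output_path_spark above; the base directory is illustrative.
path = os.path.join(
    "roid_module_output",
    f"year={year}",
    f"month={month}",
    f"day={day}",
)
print(path)  # roid_module_output/year=2023/month=12/day=07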
2 changes: 1 addition & 1 deletion fink_fat/command_line/cli_main/fitroid.py
@@ -226,7 +226,7 @@ def fitroid_associations(
     if "updated" not in trajectory_df:
         trajectory_df["updated"] = "N"
     trajectory_df, fit_roid_df, trajectory_orb, orbits = trcand_to_orbit(
-        config, trajectory_df, trajectory_orb, fit_roid_df, orbits, logger, True
+        config, trajectory_df, trajectory_orb, fit_roid_df, orbits, last_night, logger, True
     )
 
     nb_tr_before_tw = len(fit_roid_df)
13 changes: 9 additions & 4 deletions fink_fat/command_line/orbit_cli.py
@@ -61,7 +61,7 @@ def get_orbital_data(config, tr_df_path): # pragma: no cover
 
 
 def cluster_mode(
-    config: dict, traj_to_orbital: pd.DataFrame
+    config: dict, traj_to_orbital: pd.DataFrame, year: str, month: str, day: str
 ) -> pd.DataFrame:  # pragma: no cover
     """
     Compute orbits using the cluster mode
@@ -124,6 +124,9 @@ def cluster_mode(
     application += " " + noise_ntrials
     application += " " + prop_epoch
     application += " " + orbfit_verbose
+    application += " " + year
+    application += " " + month
+    application += " " + day
 
     # FIXME
     # temporary dependencies (only during the performance test phase)
@@ -190,7 +193,7 @@ def cluster_mode(
 
 
 def switch_local_cluster(
-    config: dict, traj_orb: pd.DataFrame, verbose=False
+    config: dict, traj_orb: pd.DataFrame, year: str, month: str, day: str, verbose=False
 ) -> pd.DataFrame:
     """
     Run the orbit fitting and choose cluster mode if the number of trajectories are above
@@ -215,7 +218,7 @@
             traj_orb = traj_orb.drop("estimator_id", axis=1)
         if "ffdistnr" in traj_cols:
             traj_orb = traj_orb.drop("ffdistnr", axis=1)
-        new_orbit_pdf = cluster_mode(config, traj_orb)
+        new_orbit_pdf = cluster_mode(config, traj_orb, year, month, day)
     else:
         config_epoch = config["SOLVE_ORBIT_PARAMS"]["prop_epoch"]
         prop_epoch = None if config_epoch == "None" else float(config_epoch)
@@ -241,6 +244,7 @@ def trcand_to_orbit(
     trajectory_orb: pd.DataFrame,
     trparams_df: pd.DataFrame,
     orbits: pd.DataFrame,
+    last_night: str,
     logger: LoggerNewLine,
     verbose: bool,
 ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
@@ -299,7 +303,8 @@
         # no trajectory updated during this night to send to orbit fitting
         return trajectory_df, trparams_df, trajectory_orb, orbits
 
-    new_orbits = switch_local_cluster(config, traj_to_orb, verbose)
+    year, month, day = last_night.split("-")
+    new_orbits = switch_local_cluster(config, traj_to_orb, year, month, day, verbose)
     new_orbits = new_orbits[new_orbits["a"] != -1.0]
     if verbose:
         logger.info(
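Note that three date parameters now sit in front of verbose in switch_local_cluster, so a purely positional call can silently shift arguments into the wrong slots. A small defensive sketch using keyword arguments; this is a style suggestion with stubbed inputs, not code from the commit:

import pandas as pd

# Signature as introduced by this commit; the body is elided in this sketch.
def switch_local_cluster(
    config: dict, traj_orb: pd.DataFrame, year: str, month: str, day: str, verbose=False
) -> pd.DataFrame:
    ...

year, month, day = "2023-12-07".split("-")  # illustrative night
# Keywords cannot be mistaken for the verbose flag:
switch_local_cluster({}, pd.DataFrame(), year=year, month=month, day=day, verbose=True)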
8 changes: 6 additions & 2 deletions fink_fat/orbit_fitting/orbfit_cluster.py
@@ -215,6 +215,10 @@ def get_orbit_element(ra, dec, magpsf, band, date, traj_id): # pragma: no cover
     n_triplets = int(sys.argv[4])
     noise_ntrials = int(sys.argv[5])
     prop_epoch = None if sys.argv[6] == "None" else float(sys.argv[6])
+    orbfit_verbose = int(sys.argv[7])
+    year = sys.argv[8]
+    month = sys.argv[9]
+    day = sys.argv[10]
 
     msg_info = """
     master: {}
@@ -233,7 +237,7 @@ def get_orbit_element(ra, dec, magpsf, band, date, traj_id): # pragma: no cover
 
     spark = (
         SparkSession.builder.master(master_adress)
-        .appName("Fink-FAT_solve_orbit")
+        .appName(f"Fink-FAT_solve_orbit_{year}{month}{day}")
         .getOrCreate()
     )
 
@@ -279,7 +283,7 @@ def get_orbit_element(ra, dec, magpsf, band, date, traj_id): # pragma: no cover
                 n_triplets,
                 noise_ntrials,
                 prop_epoch,
-                verbose=3,
+                verbose=orbfit_verbose,
             ),
         )
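Taken together, orbfit_cluster.py now reads ten positional arguments. The tail of the contract is visible in the hunks above; the first three slots are not shown in this diff, so only master_adress is inferred here, as an assumption:

import sys

# Argument contract after this commit (slots 2-3 are not visible in the diff).
master_adress = sys.argv[1]  # assumption: consumed by SparkSession.builder.master above
n_triplets = int(sys.argv[4])
noise_ntrials = int(sys.argv[5])
prop_epoch = None if sys.argv[6] == "None" else float(sys.argv[6])
orbfit_verbose = int(sys.argv[7])
year, month, day = sys.argv[8], sys.argv[9], sys.argv[10]

app_name = f"Fink-FAT_solve_orbit_{year}{month}{day}"  # date-stamped job name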
