Issue/164/offline 1.0 #165

Merged
merged 4 commits on Jan 17, 2024
Changes from all commits
70 changes: 19 additions & 51 deletions fink_fat/command_line/association_cli.py
@@ -9,12 +9,13 @@

from astropy.time import Time
import fink_fat
import subprocess
from pyspark.sql.functions import col
from fink_fat.command_line.utils_cli import string_to_bool
import configparser
import pathlib

import fink_fat.others.launch_spark as sp


def request_fink(
object_class,
@@ -413,58 +414,30 @@ def get_last_roid_streaming_alert(
if not os.path.isdir(output_path_spark):
pathlib.Path(output_path_spark).mkdir(parents=True)

# load alerts from spark
master_manager = config["SOLVE_ORBIT_PARAMS"]["manager"]
principal_group = config["SOLVE_ORBIT_PARAMS"]["principal"]
secret = config["SOLVE_ORBIT_PARAMS"]["secret"]
role = config["SOLVE_ORBIT_PARAMS"]["role"]
executor_env = config["SOLVE_ORBIT_PARAMS"]["exec_env"]
driver_mem = config["SOLVE_ORBIT_PARAMS"]["driver_memory"]
exec_mem = config["SOLVE_ORBIT_PARAMS"]["executor_memory"]
max_core = config["SOLVE_ORBIT_PARAMS"]["max_core"]
exec_core = config["SOLVE_ORBIT_PARAMS"]["executor_core"]

application = os.path.join(
os.path.dirname(fink_fat.__file__),
"command_line",
"association_cli.py prod",
)

application += " " + master_manager
application += " " + input_path
application += " " + output_path_spark
application += " " + str(is_mpc)
application += " " + year
application += " " + month
application += " " + day

# FIXME
# temporary dependencies (only during the performance test phase)
FINK_FAT = "/home/roman.le-montagner/home_big_storage/Doctorat/Asteroids/fink-fat/dist/fink_fat-1.0.0-py3.9.egg"
FINK_SCIENCE = "/home/roman.le-montagner/home_big_storage/Doctorat/fink-science/dist/fink_science-4.4-py3.7.egg"

spark_submit = f"spark-submit \
--master {master_manager} \
--conf spark.mesos.principal={principal_group} \
--conf spark.mesos.secret={secret} \
--conf spark.mesos.role={role} \
--conf spark.executorEnv.HOME={executor_env} \
--driver-memory {driver_mem}G \
--executor-memory {exec_mem}G \
--conf spark.cores.max={max_core} \
--conf spark.executor.cores={exec_core} \
--conf spark.driver.maxResultSize=6G\
--conf spark.sql.execution.arrow.pyspark.enabled=true\
--conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000\
--conf spark.kryoserializer.buffer.max=512m\
--py-files {FINK_FAT},{FINK_SCIENCE}\
{application}"
spark_submit = sp.build_spark_submit(config)
spark_app = sp.spark_submit_application(spark_submit, application)
process = sp.run_spark_submit(spark_app, verbose)

if verbose:
logger.info("run recovering of data with spark")
process = subprocess.run(spark_submit, shell=True)
if process.returncode != 0:
logger = init_logging()
logger.error(
f"""
Recovery of stream data with spark exited with a non-zero return code: {process.returncode}
"""
)
logger.info(process.stderr)
logger.info(process.stdout)
return pd.DataFrame(columns=cols_to_keep)
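
The inline spark-submit assembly removed above now lives in fink_fat/others/launch_spark.py, which is not part of this diff. The following is therefore only a minimal sketch of what the three helpers plausibly look like: the signatures are inferred from the call sites, the option set is taken from the removed inline command, and every detail should be read as an assumption rather than the actual module.

import subprocess
import configparser


def build_spark_submit(config: configparser.ConfigParser) -> str:
    """Assemble a spark-submit command string from the config (sketch only)."""
    params = config["SOLVE_ORBIT_PARAMS"]
    return (
        f"spark-submit --master {params['manager']} "
        f"--conf spark.mesos.principal={params['principal']} "
        f"--conf spark.mesos.secret={params['secret']} "
        f"--conf spark.mesos.role={params['role']} "
        f"--conf spark.executorEnv.HOME={params['exec_env']} "
        f"--driver-memory {params['driver_memory']}G "
        f"--executor-memory {params['executor_memory']}G "
        f"--conf spark.cores.max={params['max_core']} "
        f"--conf spark.executor.cores={params['executor_core']}"
    )


def spark_submit_application(spark_submit: str, application: str) -> str:
    """Append the application script and its arguments to the command."""
    return f"{spark_submit} {application}"


def run_spark_submit(spark_app: str, verbose: bool) -> subprocess.CompletedProcess:
    """Run the command, capturing output so the caller can log stderr/stdout."""
    if verbose:
        print(f"running: {spark_app}")
    return subprocess.run(spark_app, shell=True, capture_output=True, text=True)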
@@ -609,22 +582,17 @@ def get_data(tr_df_path, obs_df_path):

sys.exit(doctest.testmod()[0])
elif sys.argv[1] == "prod":
from pyspark.sql import SparkSession
from fink_utils.broker.sparkUtils import init_sparksession

logger = init_logging()
master_adress = str(sys.argv[2])
read_path = str(sys.argv[3])
output_path = str(sys.argv[4])
is_mpc = string_to_bool(str(sys.argv[5]))
year = sys.argv[6]
month = sys.argv[7]
day = sys.argv[8]

spark = (
SparkSession.builder.master(master_adress)
.appName(f"FINK-FAT_recover_stream_data_{year}{month}{day}")
.getOrCreate()
)
read_path = str(sys.argv[2])
output_path = str(sys.argv[3])
is_mpc = string_to_bool(str(sys.argv[4]))
year = sys.argv[5]
month = sys.argv[6]
day = sys.argv[7]

spark = init_sparksession(f"FINK-FAT_recover_stream_data_{year}{month}{day}")
df = spark.read.load(read_path)
df = df.select(
"objectId",
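With the master address argument gone (the session is now created through init_sparksession, which picks up the configuration supplied by spark-submit), the prod entry point expects six positional arguments after the prod keyword. A hypothetical invocation, with placeholder paths:

spark-submit <spark options> association_cli.py prod /path/to/alerts /path/to/output_spark true 2024 01 17
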
25 changes: 14 additions & 11 deletions fink_fat/command_line/cli_main/fitroid.py
@@ -122,6 +122,7 @@ def fitroid_associations(
if arguments["--night"]:
last_night = arguments["--night"]

year, month, day = last_night.split("-")
# path to the orbits data
path_orbit = os.path.join(output_path, "orbital.parquet")
path_trajectory_orb = os.path.join(output_path, "trajectory_orb.parquet")
@@ -143,7 +144,6 @@ def fitroid_associations(
"No alerts in the current night, compute the ephemeries for the next night"
)
# even if no alerts for the current night, compute the ephemeries for the next night in any case
year, month, day = last_night.split("-")
launch_spark_ephem(
config,
path_orbit,
@@ -359,6 +359,7 @@ def fitroid_associations(
with pd.option_context("mode.chained_assignment", None):
trajectory_df["trajectory_id"] = trajectory_df["trajectory_id"].astype(int)
fit_roid_df["trajectory_id"] = fit_roid_df["trajectory_id"].astype(int)
fit_roid_df["last_assoc_date"] = f"{year}-{month}-{day}"

trajectory_df = trajectory_df.drop("updated", axis=1)
trajectory_df.to_parquet(path_trajectory_df, index=False)
Expand All @@ -369,19 +370,21 @@ def fitroid_associations(
trajectory_orb.to_parquet(path_trajectory_orb, index=False)
orbits.to_parquet(path_orbit, index=False)

# compute the ephemerides for the next observation night
if arguments["--verbose"]:
logger.info("start to compute ephemerides using spark")
t_before = time.time()
if len(orbits) > 0:
# compute the ephemerides for the next observation night
if arguments["--verbose"]:
logger.info("start to compute ephemerides using spark")
t_before = time.time()

year, month, day = last_night.split("-")
launch_spark_ephem(
config, path_orbit, os.path.join(output_path, "ephem.parquet"), year, month, day
)
launch_spark_ephem(
config, path_orbit, os.path.join(output_path, "ephem.parquet"), year, month, day
)

if arguments["--verbose"]:
logger.info(f"ephemeries computing time: {time.time() - t_before:.4f} seconds")
logger.newline()

if arguments["--verbose"]:
logger.info(f"ephemeries computing time: {time.time() - t_before:.4f} seconds")
logger.newline()
hours, minutes, seconds = seconds_to_hms(time.time() - start_assoc_time)
logger.info(
f"total execution time: {hours} hours, {minutes} minutes, {secondes} seconds"
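seconds_to_hms is an existing fink_fat utility that is not shown in this diff; assuming it is a plain divmod conversion, it behaves like the sketch below.

def seconds_to_hms(duration: float) -> tuple:
    """Split a duration in seconds into (hours, minutes, seconds), sketch only."""
    minutes, seconds = divmod(int(duration), 60)
    hours, minutes = divmod(minutes, 60)
    return hours, minutes, seconds

# seconds_to_hms(3725.4) -> (1, 2, 5)
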
69 changes: 57 additions & 12 deletions fink_fat/command_line/cli_main/offline.py
@@ -22,6 +22,9 @@
no_reset,
)
from fink_fat.others.utils import init_logging
from fink_fat.command_line.cli_main.offline_fitroid.offline_fitroid import (
offline_fitroid,
)


def cli_offline(arguments, config, output_path):
@@ -38,13 +41,13 @@ def cli_offline(arguments, config, output_path):
path where the fink-fat data are located
"""
logger = init_logging()
logger.info("offline mode")

output_path, object_class = get_class(arguments, output_path)

# path to the associations data
tr_df_path = os.path.join(output_path, "trajectory_df.parquet")
obs_df_path = os.path.join(output_path, "old_obs.parquet")
fitroid_path = os.path.join(output_path, "fit_roid.parquet")

# path to the orbit data
orb_res_path = os.path.join(output_path, "orbital.parquet")
@@ -88,7 +91,12 @@ def cli_offline(arguments, config, output_path):
current_date = datetime.datetime.now() - delta_day

# test if the required fink-fat data files exist in the output directory.
if os.path.exists(tr_df_path) and os.path.exists(obs_df_path):
if object_class == "SSO fitroid":
test_path = os.path.exists(tr_df_path) and os.path.exists(fitroid_path)
else:
test_path = os.path.exists(tr_df_path) and os.path.exists(obs_df_path)

if test_path:
if arguments["<start>"] is not None:
logger.info("A save of trajectories candidates already exists.")
logger.info(
@@ -100,18 +108,34 @@ def cli_offline(arguments, config, output_path):
logger.info("Abort offline mode.")
exit()

trajectory_df = pd.read_parquet(tr_df_path)
old_obs_df = pd.read_parquet(obs_df_path)
# 1.0 case: load the fit_roid parquet
if object_class == "SSO fitroid":
fitroid_df = pd.read_parquet(fitroid_path)
current_date = max(
pd.to_datetime(fitroid_df["last_assoc_date"], format="%Y-%m-%d")
)
current_date += delta_day

# first case: trajectories already exist: begin the offline mode at the last association date + 1
last_tr_date = pd.to_datetime(
trajectory_df["last_assoc_date"], format="%Y-%m-%d"
)
# version < 1.0: load trajectory_df and old_obs parquet
else:
trajectory_df = pd.read_parquet(tr_df_path)
old_obs_df = pd.read_parquet(obs_df_path)

last_obs_date = pd.to_datetime(old_obs_df["last_assoc_date"], format="%Y-%m-%d")
# first case: trajectories already exist: begin the offline mode at the last association date + 1
last_tr_date = pd.to_datetime(
trajectory_df["last_assoc_date"], format="%Y-%m-%d"
)

current_date = max(last_tr_date.max(), last_obs_date.max())
current_date += delta_day
last_obs_date = pd.to_datetime(
old_obs_df["last_assoc_date"], format="%Y-%m-%d"
)

current_date = max(last_tr_date.max(), last_obs_date.max())
current_date += delta_day

logger.info(
f"Start offline mode with the previous reconstructed trajectory, start date = {current_date.date()}"
)

# last case: <start> option given by the user, start the offline mode from this date.
if arguments["<start>"] is not None:
@@ -124,7 +148,15 @@ def cli_offline(arguments, config, output_path):
today = datetime.datetime.now().date()

if current_date.date() > stop_date.date():
logger.info("Error !!! Start date is greater than stop date.")
logger.error(
f"Start date {current_date.date()} is greater than stop date {stop_date.date()}."
)
exit()

if current_date.date() == stop_date.date():
logger.warning(
f"Start date {current_date.date()} equals stop date {stop_date.date()}.\n\tRun the offline mode with a stop date strictly greater than the start date."
)
exit()

orb_df = pd.DataFrame()
@@ -144,6 +176,19 @@

stats_dict = {}

if object_class == "SSO fitroid":
offline_fitroid(
config,
arguments["--config"],
current_date,
stop_date,
logger,
arguments["--verbose"],
)
return

if arguments["--verbose"]:
logger.info("start the offline mode")
while True:
if arguments["--verbose"]:
logger.info("current processing date: {}".format(current_date))
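Condensed, the resumption logic above restarts fink-fat one day after the most recent last_assoc_date found on disk (delta_day is presumably a one-day timedelta). A self-contained sketch with invented data:

import datetime
import pandas as pd

# Invented example frame; in fink-fat this column comes from fit_roid.parquet
# (1.0 case) or trajectory_df.parquet / old_obs.parquet (pre-1.0 case).
fitroid_df = pd.DataFrame({"last_assoc_date": ["2024-01-15", "2024-01-16"]})

last_dates = pd.to_datetime(fitroid_df["last_assoc_date"], format="%Y-%m-%d")
current_date = last_dates.max() + datetime.timedelta(days=1)
print(current_date.date())  # 2024-01-17
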
Empty file.
37 changes: 37 additions & 0 deletions fink_fat/command_line/cli_main/offline_fitroid/launch_roid.py
@@ -0,0 +1,37 @@
if __name__ == "__main__":
import os
import sys
from fink_fat.others.utils import init_logging
from fink_fat.command_line.utils_cli import init_cli
import fink_fat.others.launch_spark as spark

logger = init_logging()

year, month, day = sys.argv[1], sys.argv[2], sys.argv[3]
path_offline = sys.argv[4]
path_config = sys.argv[5]
verbose = sys.argv[6]

config, output_path = init_cli({"--config": path_config})

application = os.path.join(
path_offline,
"run_roid.py",
)

application += " " + year
application += " " + month
application += " " + day
application += " " + path_config

spark_submit = spark.build_spark_submit(config)
spark_app = spark.spark_submit_application(spark_submit, application)
process = spark.run_spark_submit(spark_app, verbose)

if process.returncode != 0:
logger = init_logging()
logger.error(f"""
Offline launch roid spark_submit exited with a non-zero return code: {process.returncode}
""")
logger.info(process.stderr)
logger.info(process.stdout)
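
launch_roid.py takes its parameters positionally, so it is presumably invoked once per night by the run_offline_fitroid.sh script referenced in the next file. A hypothetical call, with placeholder paths:

python launch_roid.py 2024 01 17 /path/to/fink_fat/command_line/cli_main/offline_fitroid /path/to/fink_fat.conf true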
49 changes: 49 additions & 0 deletions fink_fat/command_line/cli_main/offline_fitroid/offline_fitroid.py
@@ -0,0 +1,49 @@
import datetime
import os
import subprocess
from fink_fat.others.utils import LoggerNewLine
import fink_fat
import configparser
import pathlib

from fink_fat.others.utils import init_logging


def offline_fitroid(
config: configparser.ConfigParser,
path_config: str,
start_date: datetime.datetime,
end_date: datetime.datetime,
logger: LoggerNewLine,
verbose: bool,
):
if verbose:
logger.info(
f"""
--- START FITROID OFFLINE ---
start date: {start_date}
end date: {end_date}
"""
)
logger.newline()

ff_path = os.path.dirname(fink_fat.__file__)
offline_path = os.path.join(ff_path, "command_line", "cli_main", "offline_fitroid")

log_path = config["OFFLINE"]["log_path"]
if not os.path.isdir(log_path):
pathlib.Path(log_path).mkdir(parents=True)

proc = subprocess.run(
os.path.join(
offline_path,
f"run_offline_fitroid.sh {str(start_date)} {str(end_date)} {offline_path} {path_config} {log_path} {verbose}",
),
shell=True,
)

if proc.returncode != 0:
logger = init_logging()
logger.info(proc.stderr)
logger.info(proc.stdout)
return
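
For completeness, a hypothetical direct call to offline_fitroid, mirroring how cli_offline hands over to it in offline.py above; the config path is a placeholder, and init_cli is used exactly as in launch_roid.py:

import datetime

from fink_fat.command_line.cli_main.offline_fitroid.offline_fitroid import offline_fitroid
from fink_fat.command_line.utils_cli import init_cli
from fink_fat.others.utils import init_logging

path_config = "/path/to/fink_fat.conf"  # placeholder
config, _ = init_cli({"--config": path_config})
logger = init_logging()

offline_fitroid(
    config,
    path_config,
    datetime.datetime(2024, 1, 10),  # start_date
    datetime.datetime(2024, 1, 16),  # end_date
    logger,
    True,  # verbose
)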