Skip to content

Commit

Permalink
Issue/164/offline 1.0 (#165)
Browse files Browse the repository at this point in the history
* add first draft of the offline mode with 1.0

* pep8

* improve offline cli and refactor the way to launch spark applications

* pep8
  • Loading branch information
FusRoman authored Jan 17, 2024
1 parent f5abfd2 commit 7134532
Show file tree
Hide file tree
Showing 16 changed files with 478 additions and 187 deletions.
70 changes: 19 additions & 51 deletions fink_fat/command_line/association_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

from astropy.time import Time
import fink_fat
import subprocess
from pyspark.sql.functions import col
from fink_fat.command_line.utils_cli import string_to_bool
import configparser
import pathlib

import fink_fat.others.launch_spark as sp


def request_fink(
object_class,
Expand Down Expand Up @@ -413,58 +414,30 @@ def get_last_roid_streaming_alert(
if not os.path.isdir(output_path_spark):
pathlib.Path(output_path_spark).mkdir(parents=True)

# load alerts from spark
master_manager = config["SOLVE_ORBIT_PARAMS"]["manager"]
principal_group = config["SOLVE_ORBIT_PARAMS"]["principal"]
secret = config["SOLVE_ORBIT_PARAMS"]["secret"]
role = config["SOLVE_ORBIT_PARAMS"]["role"]
executor_env = config["SOLVE_ORBIT_PARAMS"]["exec_env"]
driver_mem = config["SOLVE_ORBIT_PARAMS"]["driver_memory"]
exec_mem = config["SOLVE_ORBIT_PARAMS"]["executor_memory"]
max_core = config["SOLVE_ORBIT_PARAMS"]["max_core"]
exec_core = config["SOLVE_ORBIT_PARAMS"]["executor_core"]

application = os.path.join(
os.path.dirname(fink_fat.__file__),
"command_line",
"association_cli.py prod",
)

application += " " + master_manager
application += " " + input_path
application += " " + output_path_spark
application += " " + str(is_mpc)
application += " " + year
application += " " + month
application += " " + day

# FIXME
# temporary dependencies (only during the performance test phase)
FINK_FAT = "/home/roman.le-montagner/home_big_storage/Doctorat/Asteroids/fink-fat/dist/fink_fat-1.0.0-py3.9.egg"
FINK_SCIENCE = "/home/roman.le-montagner/home_big_storage/Doctorat/fink-science/dist/fink_science-4.4-py3.7.egg"

spark_submit = f"spark-submit \
--master {master_manager} \
--conf spark.mesos.principal={principal_group} \
--conf spark.mesos.secret={secret} \
--conf spark.mesos.role={role} \
--conf spark.executorEnv.HOME={executor_env} \
--driver-memory {driver_mem}G \
--executor-memory {exec_mem}G \
--conf spark.cores.max={max_core} \
--conf spark.executor.cores={exec_core} \
--conf spark.driver.maxResultSize=6G\
--conf spark.sql.execution.arrow.pyspark.enabled=true\
--conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000\
--conf spark.kryoserializer.buffer.max=512m\
--py-files {FINK_FAT},{FINK_SCIENCE}\
{application}"
spark_submit = sp.build_spark_submit(config)
spark_app = sp.spark_submit_application(spark_submit, application)
process = sp.run_spark_submit(spark_app, verbose)

if verbose:
logger.info("run recovering of data with spark")
process = subprocess.run(spark_submit, shell=True)
if process.returncode != 0:
logger = init_logging()
logger.error(
f"""
Recovering of stream data with spark exited with a non-zero return code: {process.returncode}
"""
)
logger.info(process.stderr)
logger.info(process.stdout)
return pd.DataFrame(columns=cols_to_keep)
Expand Down Expand Up @@ -609,22 +582,17 @@ def get_data(tr_df_path, obs_df_path):

sys.exit(doctest.testmod()[0])
elif sys.argv[1] == "prod":
from pyspark.sql import SparkSession
from fink_utils.broker.sparkUtils import init_sparksession

logger = init_logging()
master_adress = str(sys.argv[2])
read_path = str(sys.argv[3])
output_path = str(sys.argv[4])
is_mpc = string_to_bool(str(sys.argv[5]))
year = sys.argv[6]
month = sys.argv[7]
day = sys.argv[8]

spark = (
SparkSession.builder.master(master_adress)
.appName(f"FINK-FAT_recover_stream_data_{year}{month}{day}")
.getOrCreate()
)
read_path = str(sys.argv[2])
output_path = str(sys.argv[3])
is_mpc = string_to_bool(str(sys.argv[4]))
year = sys.argv[5]
month = sys.argv[6]
day = sys.argv[7]

spark = init_sparksession(f"FINK-FAT_recover_stream_data_{year}{month}{day}")
df = spark.read.load(read_path)
df = df.select(
"objectId",
Expand Down
25 changes: 14 additions & 11 deletions fink_fat/command_line/cli_main/fitroid.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def fitroid_associations(
if arguments["--night"]:
last_night = arguments["--night"]

year, month, day = last_night.split("-")
# path to the orbits data
path_orbit = os.path.join(output_path, "orbital.parquet")
path_trajectory_orb = os.path.join(output_path, "trajectory_orb.parquet")
Expand All @@ -143,7 +144,6 @@ def fitroid_associations(
"No alerts in the current night, compute the ephemeries for the next night"
)
# even if no alerts for the current night, compute the ephemeries for the next night in any case
year, month, day = last_night.split("-")
launch_spark_ephem(
config,
path_orbit,
Expand Down Expand Up @@ -359,6 +359,7 @@ def fitroid_associations(
with pd.option_context("mode.chained_assignment", None):
trajectory_df["trajectory_id"] = trajectory_df["trajectory_id"].astype(int)
fit_roid_df["trajectory_id"] = fit_roid_df["trajectory_id"].astype(int)
fit_roid_df["last_assoc_date"] = f"{year}-{month}-{day}"

trajectory_df = trajectory_df.drop("updated", axis=1)
trajectory_df.to_parquet(path_trajectory_df, index=False)
Expand All @@ -369,19 +370,21 @@ def fitroid_associations(
trajectory_orb.to_parquet(path_trajectory_orb, index=False)
orbits.to_parquet(path_orbit, index=False)

# compute the ephemerides for the next observation night
if arguments["--verbose"]:
logger.info("start to compute ephemerides using spark")
t_before = time.time()
if len(orbits) > 0:
# compute the ephemerides for the next observation night
if arguments["--verbose"]:
logger.info("start to compute ephemerides using spark")
t_before = time.time()

year, month, day = last_night.split("-")
launch_spark_ephem(
config, path_orbit, os.path.join(output_path, "ephem.parquet"), year, month, day
)
launch_spark_ephem(
config, path_orbit, os.path.join(output_path, "ephem.parquet"), year, month, day
)

if arguments["--verbose"]:
logger.info(f"ephemeries computing time: {time.time() - t_before:.4f} seconds")
logger.newline()

if arguments["--verbose"]:
logger.info(f"ephemeries computing time: {time.time() - t_before:.4f} seconds")
logger.newline()
hours, minutes, secondes = seconds_to_hms(time.time() - start_assoc_time)
logger.info(
f"total execution time: {hours} hours, {minutes} minutes, {secondes} seconds"
Expand Down
69 changes: 57 additions & 12 deletions fink_fat/command_line/cli_main/offline.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
no_reset,
)
from fink_fat.others.utils import init_logging
from fink_fat.command_line.cli_main.offline_fitroid.offline_fitroid import (
offline_fitroid,
)


def cli_offline(arguments, config, output_path):
Expand All @@ -38,13 +41,13 @@ def cli_offline(arguments, config, output_path):
path where are located the fink-fat data
"""
logger = init_logging()
logger.info("offline mode")

output_path, object_class = get_class(arguments, output_path)

# path to the associations data
tr_df_path = os.path.join(output_path, "trajectory_df.parquet")
obs_df_path = os.path.join(output_path, "old_obs.parquet")
fitroid_path = os.path.join(output_path, "fit_roid.parquet")

# path to the orbit data
orb_res_path = os.path.join(output_path, "orbital.parquet")
Expand Down Expand Up @@ -88,7 +91,12 @@ def cli_offline(arguments, config, output_path):
current_date = datetime.datetime.now() - delta_day

# test if the trajectory_df and old_obs_df exists in the output directory.
if os.path.exists(tr_df_path) and os.path.exists(obs_df_path):
if object_class == "SSO fitroid":
test_path = os.path.exists(tr_df_path) and os.path.exists(fitroid_path)
else:
test_path = os.path.exists(tr_df_path) and os.path.exists(obs_df_path)

if test_path:
if arguments["<start>"] is not None:
logger.info("A save of trajectories candidates already exists.")
logger.info(
Expand All @@ -100,18 +108,34 @@ def cli_offline(arguments, config, output_path):
logger.info("Abort offline mode.")
exit()

trajectory_df = pd.read_parquet(tr_df_path)
old_obs_df = pd.read_parquet(obs_df_path)
# 1.0 case: load the fit_roid parquet
if object_class == "SSO fitroid":
fitroid_df = pd.read_parquet(fitroid_path)
current_date = max(
pd.to_datetime(fitroid_df["last_assoc_date"], format="%Y-%m-%d")
)
current_date += delta_day

# first case: trajectories already exists: begin the offline mode with the last associations date + 1
last_tr_date = pd.to_datetime(
trajectory_df["last_assoc_date"], format="%Y-%m-%d"
)
# version < 1.0: load trajectory_df and old_obs parquet
else:
trajectory_df = pd.read_parquet(tr_df_path)
old_obs_df = pd.read_parquet(obs_df_path)

last_obs_date = pd.to_datetime(old_obs_df["last_assoc_date"], format="%Y-%m-%d")
# first case: trajectories already exists: begin the offline mode with the last associations date + 1
last_tr_date = pd.to_datetime(
trajectory_df["last_assoc_date"], format="%Y-%m-%d"
)

current_date = max(last_tr_date.max(), last_obs_date.max())
current_date += delta_day
last_obs_date = pd.to_datetime(
old_obs_df["last_assoc_date"], format="%Y-%m-%d"
)

current_date = max(last_tr_date.max(), last_obs_date.max())
current_date += delta_day

logger.info(
f"Start offline mode with the previous reconstructed trajectory, start date = {current_date.date()}"
)

# last case: <start> options given by the user, start the offline mode from this date.
if arguments["<start>"] is not None:
Expand All @@ -124,7 +148,15 @@ def cli_offline(arguments, config, output_path):
today = datetime.datetime.now().date()

if current_date.date() > stop_date.date():
logger.info("Error !!! Start date is greater than stop date.")
logger.error(
f"Start date {current_date.date()} is greater than stop date {stop_date.date()}."
)
exit()

if current_date.date() == stop_date.date():
logger.warn(
f"Start date {current_date.date()} = stop date {stop_date.date()}.\n\tStart the offline mode with the argument <start> strictly greater than stop date."
)
exit()

orb_df = pd.DataFrame()
Expand All @@ -144,6 +176,19 @@ def cli_offline(arguments, config, output_path):

stats_dict = {}

if object_class == "SSO fitroid":
offline_fitroid(
config,
arguments["--config"],
current_date,
stop_date,
logger,
arguments["--verbose"],
)
return

if arguments["--verbose"]:
logger.info("start the offline mode")
while True:
if arguments["--verbose"]:
logger.info("current processing date: {}".format(current_date))
Expand Down
Empty file.
37 changes: 37 additions & 0 deletions fink_fat/command_line/cli_main/offline_fitroid/launch_roid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
if __name__ == "__main__":
    # Entry point that submits run_roid.py to spark for one night.
    # Expected command line: <year> <month> <day> <path_offline> <path_config> <verbose>
    import os
    import sys
    from fink_fat.others.utils import init_logging
    from fink_fat.command_line.utils_cli import init_cli, string_to_bool
    import fink_fat.others.launch_spark as spark

    logger = init_logging()

    year, month, day = sys.argv[1], sys.argv[2], sys.argv[3]
    path_offline = sys.argv[4]
    path_config = sys.argv[5]
    # argv entries are always strings: a raw "False" would be truthy,
    # so convert it to a real boolean before handing it to run_spark_submit.
    verbose = string_to_bool(str(sys.argv[6]))

    config, output_path = init_cli({"--config": path_config})

    # the spark application is run_roid.py (located in path_offline)
    # followed by its own positional arguments
    application = os.path.join(
        path_offline,
        "run_roid.py",
    )

    application += " " + year
    application += " " + month
    application += " " + day
    application += " " + path_config

    spark_submit = spark.build_spark_submit(config)
    spark_app = spark.spark_submit_application(spark_submit, application)
    process = spark.run_spark_submit(spark_app, verbose)

    if process.returncode != 0:
        # reuse the logger created above instead of initialising a second one
        logger.error(f"""
Offline launch roid spark_submit exited with a non-zero return code: {process.returncode}
""")
        logger.info(process.stderr)
        logger.info(process.stdout)
49 changes: 49 additions & 0 deletions fink_fat/command_line/cli_main/offline_fitroid/offline_fitroid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import datetime
import os
import subprocess
from fink_fat.others.utils import LoggerNewLine
import fink_fat
import configparser
import pathlib

from fink_fat.others.utils import init_logging


def offline_fitroid(
    config: configparser.ConfigParser,
    path_config: str,
    start_date: datetime,
    end_date: datetime,
    logger: LoggerNewLine,
    verbose: bool,
):
    """
    Run the fitroid offline mode by calling the run_offline_fitroid.sh script
    located in the offline_fitroid folder of the installed fink_fat package.

    Parameters
    ----------
    config : ConfigParser
        fink-fat configuration; config["OFFLINE"]["log_path"] is read and
        the directory is created if it does not exist
    path_config : str
        path of the configuration file, forwarded to the shell script
    start_date : datetime
        start date, forwarded to the shell script as str(start_date)
    end_date : datetime
        end date, forwarded to the shell script as str(end_date)
    logger : LoggerNewLine
        logger used to report progress and failures
    verbose : bool
        if True, log the start banner; also forwarded to the shell script

    Returns
    -------
    None
    """
    if verbose:
        logger.info(
            f"""
--- START FITROID OFFLINE ---
start date: {start_date}
end date: {end_date}
"""
        )
        logger.newline()

    ff_path = os.path.dirname(fink_fat.__file__)
    offline_path = os.path.join(ff_path, "command_line", "cli_main", "offline_fitroid")

    log_path = config["OFFLINE"]["log_path"]
    # exist_ok avoids the check-then-create race of a separate isdir() test
    pathlib.Path(log_path).mkdir(parents=True, exist_ok=True)

    # The command line is kept exactly as before: script path followed by
    # space separated arguments, interpreted by the shell.
    script = os.path.join(offline_path, "run_offline_fitroid.sh")
    command = (
        f"{script} {str(start_date)} {str(end_date)} "
        f"{offline_path} {path_config} {log_path} {verbose}"
    )
    proc = subprocess.run(command, shell=True)

    if proc.returncode != 0:
        logger.error(
            f"run_offline_fitroid.sh exited with a non-zero return code: {proc.returncode}"
        )
        # The output of the script is not captured (it goes straight to the
        # terminal), so stderr/stdout are None here; only log what exists.
        for stream in (proc.stderr, proc.stdout):
            if stream is not None:
                logger.info(stream)
Loading

0 comments on commit 7134532

Please sign in to comment.