From fd86623e38529b97006f58641199e458328f84ef Mon Sep 17 00:00:00 2001 From: Roman Date: Wed, 17 Jan 2024 12:18:01 +0100 Subject: [PATCH] improve offline cli and refactor the way to launch spark applications --- fink_fat/command_line/association_cli.py | 67 ++++-------- fink_fat/command_line/cli_main/fitroid.py | 25 +++-- fink_fat/command_line/cli_main/offline.py | 48 ++++++--- .../cli_main/offline_fitroid/launch_roid.py | 63 ++--------- fink_fat/command_line/orbit_cli.py | 52 +++------ fink_fat/data/fink_fat.conf | 11 +- fink_fat/data/fink_fat_euclid.conf | 7 +- fink_fat/orbit_fitting/orbfit_cluster.py | 26 +++-- fink_fat/others/launch_spark.py | 100 ++++++++++++++++++ .../streaming_associations/spark_ephem.py | 37 +------ fink_fat/test/cli_test/fitroid_test.conf | 6 +- 11 files changed, 221 insertions(+), 221 deletions(-) create mode 100644 fink_fat/others/launch_spark.py diff --git a/fink_fat/command_line/association_cli.py b/fink_fat/command_line/association_cli.py index 056bb248..2f3f569b 100644 --- a/fink_fat/command_line/association_cli.py +++ b/fink_fat/command_line/association_cli.py @@ -15,6 +15,8 @@ import configparser import pathlib +import fink_fat.others.launch_spark as spark + def request_fink( object_class, @@ -413,24 +415,12 @@ def get_last_roid_streaming_alert( if not os.path.isdir(output_path_spark): pathlib.Path(output_path_spark).mkdir(parents=True) - # load alerts from spark - master_manager = config["SOLVE_ORBIT_PARAMS"]["manager"] - principal_group = config["SOLVE_ORBIT_PARAMS"]["principal"] - secret = config["SOLVE_ORBIT_PARAMS"]["secret"] - role = config["SOLVE_ORBIT_PARAMS"]["role"] - executor_env = config["SOLVE_ORBIT_PARAMS"]["exec_env"] - driver_mem = config["SOLVE_ORBIT_PARAMS"]["driver_memory"] - exec_mem = config["SOLVE_ORBIT_PARAMS"]["executor_memory"] - max_core = config["SOLVE_ORBIT_PARAMS"]["max_core"] - exec_core = config["SOLVE_ORBIT_PARAMS"]["executor_core"] - application = os.path.join( os.path.dirname(fink_fat.__file__), "command_line", "association_cli.py prod", ) - application += " " + master_manager application += " " + input_path application += " " + output_path_spark application += " " + str(is_mpc) @@ -438,33 +428,15 @@ def get_last_roid_streaming_alert( application += " " + month application += " " + day - # FIXME - # temporary dependencies (only during the performance test phase) - FINK_FAT = "/home/roman.le-montagner/home_big_storage/Doctorat/Asteroids/fink-fat/dist/fink_fat-1.0.0-py3.9.egg" - FINK_SCIENCE = "/home/roman.le-montagner/home_big_storage/Doctorat/fink-science/dist/fink_science-4.4-py3.7.egg" - - spark_submit = f"spark-submit \ - --master {master_manager} \ - --conf spark.mesos.principal={principal_group} \ - --conf spark.mesos.secret={secret} \ - --conf spark.mesos.role={role} \ - --conf spark.executorEnv.HOME={executor_env} \ - --driver-memory {driver_mem}G \ - --executor-memory {exec_mem}G \ - --conf spark.cores.max={max_core} \ - --conf spark.executor.cores={exec_core} \ - --conf spark.driver.maxResultSize=6G\ - --conf spark.sql.execution.arrow.pyspark.enabled=true\ - --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000\ - --conf spark.kryoserializer.buffer.max=512m\ - --py-files {FINK_FAT},{FINK_SCIENCE}\ - {application}" + spark_submit = spark.build_spark_submit(config) + spark_app = spark.spark_submit_application(spark_submit, application) + process = spark.run_spark_submit(spark_app, verbose) - if verbose: - logger.info("run recovering of data with spark") - process = subprocess.run(spark_submit, shell=True) if process.returncode != 0: logger = init_logging() + logger.error(f""" + Recovering of stream data with spark exited with a non-zero return code: {process.returncode} +""") logger.info(process.stderr) logger.info(process.stdout) return pd.DataFrame(columns=cols_to_keep) @@ -609,21 +581,18 @@ def get_data(tr_df_path, obs_df_path): sys.exit(doctest.testmod()[0]) elif sys.argv[1] == "prod": - from pyspark.sql import SparkSession + from fink_utils.broker.sparkUtils import init_sparksession logger = init_logging() - master_adress = str(sys.argv[2]) - read_path = str(sys.argv[3]) - output_path = str(sys.argv[4]) - is_mpc = string_to_bool(str(sys.argv[5])) - year = sys.argv[6] - month = sys.argv[7] - day = sys.argv[8] - - spark = ( - SparkSession.builder.master(master_adress) - .appName(f"FINK-FAT_recover_stream_data_{year}{month}{day}") - .getOrCreate() + read_path = str(sys.argv[2]) + output_path = str(sys.argv[3]) + is_mpc = string_to_bool(str(sys.argv[4])) + year = sys.argv[5] + month = sys.argv[6] + day = sys.argv[7] + + spark = init_sparksession( + f"FINK-FAT_recover_stream_data_{year}{month}{day}" ) df = spark.read.load(read_path) df = df.select( diff --git a/fink_fat/command_line/cli_main/fitroid.py b/fink_fat/command_line/cli_main/fitroid.py index c567a79d..08544a03 100644 --- a/fink_fat/command_line/cli_main/fitroid.py +++ b/fink_fat/command_line/cli_main/fitroid.py @@ -122,6 +122,7 @@ def fitroid_associations( if arguments["--night"]: last_night = arguments["--night"] + year, month, day = last_night.split("-") # path to the orbits data path_orbit = os.path.join(output_path, "orbital.parquet") path_trajectory_orb = os.path.join(output_path, "trajectory_orb.parquet") @@ -143,7 +144,6 @@ def fitroid_associations( "No alerts in the current night, compute the ephemeries for the next night" ) # even if no alerts for the current night, compute the ephemeries for the next night in any case - year, month, day = last_night.split("-") launch_spark_ephem( config, path_orbit, @@ -359,6 +359,7 @@ def fitroid_associations( with pd.option_context("mode.chained_assignment", None): trajectory_df["trajectory_id"] = trajectory_df["trajectory_id"].astype(int) fit_roid_df["trajectory_id"] = fit_roid_df["trajectory_id"].astype(int) + fit_roid_df["last_assoc_date"] = f"{year}-{month}-{day}" trajectory_df = trajectory_df.drop("updated", axis=1) trajectory_df.to_parquet(path_trajectory_df, index=False) @@ -369,19 +370,21 @@ def fitroid_associations( trajectory_orb.to_parquet(path_trajectory_orb, index=False) orbits.to_parquet(path_orbit, index=False) - # compute the ephemerides for the next observation night - if arguments["--verbose"]: - logger.info("start to compute ephemerides using spark") - t_before = time.time() + if len(orbits) > 0: + # compute the ephemerides for the next observation night + if arguments["--verbose"]: + logger.info("start to compute ephemerides using spark") + t_before = time.time() - year, month, day = last_night.split("-") - launch_spark_ephem( - config, path_orbit, os.path.join(output_path, "ephem.parquet"), year, month, day - ) + launch_spark_ephem( + config, path_orbit, os.path.join(output_path, "ephem.parquet"), year, month, day + ) + + if arguments["--verbose"]: + logger.info(f"ephemeries computing time: {time.time() - t_before:.4f} seconds") + logger.newline() if arguments["--verbose"]: - logger.info(f"ephemeries computing time: {time.time() - t_before:.4f} seconds") - logger.newline() hours, minutes, secondes = seconds_to_hms(time.time() - start_assoc_time) logger.info( f"total execution time: {hours} hours, {minutes} minutes, {secondes} seconds" diff --git a/fink_fat/command_line/cli_main/offline.py b/fink_fat/command_line/cli_main/offline.py index d829defc..652d0e3c 100644 --- a/fink_fat/command_line/cli_main/offline.py +++ b/fink_fat/command_line/cli_main/offline.py @@ -47,6 +47,7 @@ def cli_offline(arguments, config, output_path): # path to the associations data tr_df_path = os.path.join(output_path, "trajectory_df.parquet") obs_df_path = os.path.join(output_path, "old_obs.parquet") + fitroid_path = os.path.join(output_path, "fit_roid.parquet") # path to the orbit data orb_res_path = os.path.join(output_path, "orbital.parquet") @@ -90,7 +91,12 @@ def cli_offline(arguments, config, output_path): current_date = datetime.datetime.now() - delta_day # test if the trajectory_df and old_obs_df exists in the output directory. - if os.path.exists(tr_df_path) and os.path.exists(obs_df_path): + if object_class == "SSO fitroid": + test_path = os.path.exists(tr_df_path) and os.path.exists(fitroid_path) + else: + test_path = os.path.exists(tr_df_path) and os.path.exists(obs_df_path) + + if test_path: if arguments[""] is not None: logger.info("A save of trajectories candidates already exists.") logger.info( @@ -102,18 +108,30 @@ def cli_offline(arguments, config, output_path): logger.info("Abort offline mode.") exit() - trajectory_df = pd.read_parquet(tr_df_path) - old_obs_df = pd.read_parquet(obs_df_path) + # 1.0 case: load the fit_roid parquet + if object_class == "SSO fitroid": + fitroid_df = pd.read_parquet(fitroid_path) + current_date = max(pd.to_datetime( + fitroid_df["last_assoc_date"], format="%Y-%m-%d" + )) + current_date += delta_day - # first case: trajectories already exists: begin the offline mode with the last associations date + 1 - last_tr_date = pd.to_datetime( - trajectory_df["last_assoc_date"], format="%Y-%m-%d" - ) + # version < 1.0: load trajectory_df and old_obs parquet + else: + trajectory_df = pd.read_parquet(tr_df_path) + old_obs_df = pd.read_parquet(obs_df_path) - last_obs_date = pd.to_datetime(old_obs_df["last_assoc_date"], format="%Y-%m-%d") + # first case: trajectories already exists: begin the offline mode with the last associations date + 1 + last_tr_date = pd.to_datetime( + trajectory_df["last_assoc_date"], format="%Y-%m-%d" + ) - current_date = max(last_tr_date.max(), last_obs_date.max()) - current_date += delta_day + last_obs_date = pd.to_datetime(old_obs_df["last_assoc_date"], format="%Y-%m-%d") + + current_date = max(last_tr_date.max(), last_obs_date.max()) + current_date += delta_day + + logger.info(f"Start offline mode with the previous reconstructed trajectory, start date = {current_date.date()}") # last case: options given by the user, start the offline mode from this date. if arguments[""] is not None: @@ -126,8 +144,14 @@ def cli_offline(arguments, config, output_path): today = datetime.datetime.now().date() if current_date.date() > stop_date.date(): - logger.info( - f"Error !!! Start date {current_date.date()} is greater than stop date {stop_date.date()}." + logger.error( + f"Start date {current_date.date()} is greater than stop date {stop_date.date()}." + ) + exit() + + if current_date.date() == stop_date.date(): + logger.warn( + f"Start date {current_date.date()} = stop date {stop_date.date()}.\n\tStart the offline mode with the argument strictly greater than stop date." ) exit() diff --git a/fink_fat/command_line/cli_main/offline_fitroid/launch_roid.py b/fink_fat/command_line/cli_main/offline_fitroid/launch_roid.py index eaeccc6d..b8d82908 100755 --- a/fink_fat/command_line/cli_main/offline_fitroid/launch_roid.py +++ b/fink_fat/command_line/cli_main/offline_fitroid/launch_roid.py @@ -1,28 +1,9 @@ -# YEAR=$1 -# MONTH=$2 -# DAY=$3 - -# spark-submit \ -# --master mesos://vm-75063.lal.in2p3.fr:5050 \ -# --conf spark.mesos.principal=lsst \ -# --conf spark.mesos.secret=secret \ -# --conf spark.mesos.role=lsst \ -# --conf spark.executorEnv.HOME='/home/roman.le-montagner'\ -# --conf spark.driver.maxResultSize=6G\ -# --conf spark.sql.execution.arrow.pyspark.enabled=true\ -# --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000\ -# --conf spark.kryoserializer.buffer.max=512m\ -# --driver-memory 6G --executor-memory 4G --conf spark.cores.max=16 --conf spark.executor.cores=2\ -# --py-files ${FINK_FAT},${FINK_SCIENCE}\ -# $4/run_roid.py $YEAR $MONTH $DAY - - if __name__ == "__main__": import os import sys - import subprocess from fink_fat.others.utils import init_logging from fink_fat.command_line.utils_cli import init_cli + import fink_fat.others.launch_spark as spark logger = init_logging() @@ -33,17 +14,6 @@ config, output_path = init_cli({"--config": path_config}) - # load alerts from spark - master_manager = config["SOLVE_ORBIT_PARAMS"]["manager"] - principal_group = config["SOLVE_ORBIT_PARAMS"]["principal"] - secret = config["SOLVE_ORBIT_PARAMS"]["secret"] - role = config["SOLVE_ORBIT_PARAMS"]["role"] - executor_env = config["SOLVE_ORBIT_PARAMS"]["exec_env"] - driver_mem = config["SOLVE_ORBIT_PARAMS"]["driver_memory"] - exec_mem = config["SOLVE_ORBIT_PARAMS"]["executor_memory"] - max_core = config["SOLVE_ORBIT_PARAMS"]["max_core"] - exec_core = config["SOLVE_ORBIT_PARAMS"]["executor_core"] - application = os.path.join( path_offline, "run_roid.py", @@ -54,33 +24,14 @@ application += " " + day application += " " + path_config - # FIXME - # temporary dependencies (only during the performance test phase) - FINK_FAT = "/home/roman.le-montagner/home_big_storage/Doctorat/Asteroids/fink-fat/dist/fink_fat-1.0.0-py3.9.egg" - FINK_SCIENCE = "/home/roman.le-montagner/home_big_storage/Doctorat/fink-science/dist/fink_science-4.4-py3.7.egg" - - spark_submit = f"spark-submit \ - --master {master_manager} \ - --conf spark.mesos.principal={principal_group} \ - --conf spark.mesos.secret={secret} \ - --conf spark.mesos.role={role} \ - --conf spark.executorEnv.HOME={executor_env} \ - --driver-memory {driver_mem}G \ - --executor-memory {exec_mem}G \ - --conf spark.cores.max={max_core} \ - --conf spark.executor.cores={exec_core} \ - --conf spark.driver.maxResultSize=6G\ - --conf spark.sql.execution.arrow.pyspark.enabled=true\ - --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000\ - --conf spark.kryoserializer.buffer.max=512m\ - {application}" - - # --py-files {FINK_FAT},{FINK_SCIENCE}\ + spark_submit = spark.build_spark_submit(config) + spark_app = spark.spark_submit_application(spark_submit, application) + process = spark.run_spark_submit(spark_app, verbose) - if verbose: - logger.info("run recovering of data with spark") - process = subprocess.run(spark_submit, shell=True) if process.returncode != 0: logger = init_logging() + logger.error(f""" + Offline launch roid spark_submit exited with a non-zero return code: {process.returncode} +""") logger.info(process.stderr) logger.info(process.stdout) diff --git a/fink_fat/command_line/orbit_cli.py b/fink_fat/command_line/orbit_cli.py index 83a27f4a..4c8fcf79 100644 --- a/fink_fat/command_line/orbit_cli.py +++ b/fink_fat/command_line/orbit_cli.py @@ -12,6 +12,7 @@ compute_df_orbit_param, ) from fink_fat.command_line.utils_cli import assig_tags +import fink_fat.others.launch_spark as spark def intro_reset_orbit(): # pragma: no cover @@ -61,7 +62,12 @@ def get_orbital_data(config, tr_df_path): # pragma: no cover def cluster_mode( - config: dict, traj_to_orbital: pd.DataFrame, year: str, month: str, day: str + config: dict, + traj_to_orbital: pd.DataFrame, + year: str, + month: str, + day: str, + verbose: bool = False, ) -> pd.DataFrame: # pragma: no cover """ Compute orbits using the cluster mode @@ -101,24 +107,12 @@ def cluster_mode( prop_epoch = config["SOLVE_ORBIT_PARAMS"]["prop_epoch"] orbfit_verbose = config["SOLVE_ORBIT_PARAMS"]["orbfit_verbose"] - master_manager = config["SOLVE_ORBIT_PARAMS"]["manager"] - principal_group = config["SOLVE_ORBIT_PARAMS"]["principal"] - secret = config["SOLVE_ORBIT_PARAMS"]["secret"] - role = config["SOLVE_ORBIT_PARAMS"]["role"] - executor_env = config["SOLVE_ORBIT_PARAMS"]["exec_env"] - driver_mem = config["SOLVE_ORBIT_PARAMS"]["driver_memory"] - exec_mem = config["SOLVE_ORBIT_PARAMS"]["executor_memory"] - max_core = config["SOLVE_ORBIT_PARAMS"]["max_core"] - exec_core = config["SOLVE_ORBIT_PARAMS"]["executor_core"] - orbfit_home = config["SOLVE_ORBIT_PARAMS"]["orbfit_path"] - application = os.path.join( os.path.dirname(fink_fat.__file__), "orbit_fitting", "orbfit_cluster.py prod", ) - application += " " + master_manager application += " " + ram_dir application += " " + n_triplets application += " " + noise_ntrials @@ -128,33 +122,15 @@ def cluster_mode( application += " " + month application += " " + day - # FIXME - # temporary dependencies (only during the performance test phase) - FINK_FAT = "/home/roman.le-montagner/home_big_storage/Doctorat/Asteroids/fink-fat/dist/fink_fat-1.0.0-py3.9.egg" - FINK_SCIENCE = "/home/roman.le-montagner/home_big_storage/Doctorat/fink-science/dist/fink_science-4.4-py3.7.egg" - - spark_submit = f"spark-submit \ - --master {master_manager} \ - --conf spark.mesos.principal={principal_group} \ - --conf spark.mesos.secret={secret} \ - --conf spark.mesos.role={role} \ - --conf spark.executorEnv.HOME={executor_env} \ - --driver-memory {driver_mem}G \ - --executor-memory {exec_mem}G \ - --conf spark.cores.max={max_core} \ - --conf spark.executor.cores={exec_core} \ - --conf spark.driver.maxResultSize=6G\ - --conf spark.sql.execution.arrow.pyspark.enabled=true\ - --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000\ - --conf spark.kryoserializer.buffer.max=512m\ - --conf spark.executorEnv.ORBFIT_HOME={orbfit_home} \ - --py-files {FINK_FAT},{FINK_SCIENCE}\ - {application}" - - process = subprocess.run(spark_submit, shell=True) + spark_submit = spark.build_spark_submit(config) + spark_app = spark.spark_submit_application(spark_submit, application) + process = spark.run_spark_submit(spark_app, verbose) if process.returncode != 0: logger = init_logging() + logger.error(f""" + Spark orbit fitting exited with a non-zero return code: {process.returncode} +""") logger.info(process.stderr) logger.info(process.stdout) exit() @@ -218,7 +194,7 @@ def switch_local_cluster( traj_orb = traj_orb.drop("estimator_id", axis=1) if "ffdistnr" in traj_cols: traj_orb = traj_orb.drop("ffdistnr", axis=1) - new_orbit_pdf = cluster_mode(config, traj_orb, year, month, day) + new_orbit_pdf = cluster_mode(config, traj_orb, year, month, day, verbose) else: config_epoch = config["SOLVE_ORBIT_PARAMS"]["prop_epoch"] prop_epoch = None if config_epoch == "None" else float(config_epoch) diff --git a/fink_fat/data/fink_fat.conf b/fink_fat/data/fink_fat.conf index 31aa4214..70c596b5 100644 --- a/fink_fat/data/fink_fat.conf +++ b/fink_fat/data/fink_fat.conf @@ -45,20 +45,23 @@ n_triplets=30 noise_ntrials=20 prop_epoch=None orbfit_verbose=3 - +local_mode_limit=1000 orbfit_limit=6 cpu_count=1 -ram_dir="" +ram_dir=/media/virtuelram +orbfit_path=$ORBFIT_HOME + +[SPARK] manager=local[2] principal= secret= role= -exec_env=$HOME +exec_env='/home' driver_memory=4 executor_memory=16 max_core=64 executor_core=8 -orbfit_path=$ORBFIT_HOME +py_files= [ASSOC_SYSTEM] diff --git a/fink_fat/data/fink_fat_euclid.conf b/fink_fat/data/fink_fat_euclid.conf index 708cce28..0a050791 100644 --- a/fink_fat/data/fink_fat_euclid.conf +++ b/fink_fat/data/fink_fat_euclid.conf @@ -23,10 +23,13 @@ n_triplets=30 noise_ntrials=20 prop_epoch=2459769.02144 orbfit_verbose=3 - +local_mode_limit=1000 orbfit_limit=6 cpu_count=1 ram_dir=/media/peloton/ +orbfit_path=/home + +[SPARK] manager=local[2] principal= secret= @@ -36,7 +39,7 @@ driver_memory=4 executor_memory=16 max_core=64 executor_core=8 -orbfit_path=/home +py_files= [ASSOC_SYSTEM] diff --git a/fink_fat/orbit_fitting/orbfit_cluster.py b/fink_fat/orbit_fitting/orbfit_cluster.py index 9cef8180..98ffb2e8 100644 --- a/fink_fat/orbit_fitting/orbfit_cluster.py +++ b/fink_fat/orbit_fitting/orbfit_cluster.py @@ -209,16 +209,17 @@ def get_orbit_element(ra, dec, magpsf, band, date, traj_id): # pragma: no cover # Run the test suite spark_unit_tests(globs) elif sys.argv[1] == "prod": + from fink_utils.broker.sparkUtils import init_sparksession + logger = init_logging() - master_adress = str(sys.argv[2]) - ram_dir = str(sys.argv[3]) - n_triplets = int(sys.argv[4]) - noise_ntrials = int(sys.argv[5]) - prop_epoch = None if sys.argv[6] == "None" else float(sys.argv[6]) - orbfit_verbose = int(sys.argv[7]) - year = sys.argv[8] - month = sys.argv[9] - day = sys.argv[10] + ram_dir = str(sys.argv[2]) + n_triplets = int(sys.argv[3]) + noise_ntrials = int(sys.argv[4]) + prop_epoch = None if sys.argv[5] == "None" else float(sys.argv[5]) + orbfit_verbose = int(sys.argv[6]) + year = sys.argv[7] + month = sys.argv[8] + day = sys.argv[9] msg_info = """ master: {} @@ -227,7 +228,6 @@ def get_orbit_element(ra, dec, magpsf, band, date, traj_id): # pragma: no cover noise_ntrials: {} prop_epoch: {} """.format( - master_adress, ram_dir, n_triplets, noise_ntrials, @@ -235,10 +235,8 @@ def get_orbit_element(ra, dec, magpsf, band, date, traj_id): # pragma: no cover ) logger.info(msg_info) - spark = spark = ( - SparkSession.builder.master(master_adress) - .appName(f"Fink-FAT_solve_orbit_{year}{month}{day}") - .getOrCreate() + spark = init_sparksession( + f"Fink-FAT_solve_orbit_{year}{month}{day}" ) # read the input from local parquet file diff --git a/fink_fat/others/launch_spark.py b/fink_fat/others/launch_spark.py new file mode 100644 index 00000000..da12f634 --- /dev/null +++ b/fink_fat/others/launch_spark.py @@ -0,0 +1,100 @@ +import configparser +import subprocess + +from fink_fat.others.utils import init_logging + + +def build_spark_submit(config: configparser.ConfigParser) -> str: + """ + Build the spark-submit application command + + Parameters + ---------- + config : configparser.ConfigParser + the config entries from the config file + + Returns + ------- + str + spark-submit command without the application + """ + master_manager = config["SPARK"]["manager"] + principal_group = config["SPARK"]["principal"] + secret = config["SPARK"]["secret"] + role = config["SPARK"]["role"] + executor_env = config["SPARK"]["exec_env"] + driver_mem = config["SPARK"]["driver_memory"] + exec_mem = config["SPARK"]["executor_memory"] + max_core = config["SPARK"]["max_core"] + exec_core = config["SPARK"]["executor_core"] + py_files = config["SPARK"]["py_files"] + + spark_submit = f"spark-submit \ + --master {master_manager} \ + --conf spark.mesos.principal={principal_group} \ + --conf spark.mesos.secret={secret} \ + --conf spark.mesos.role={role} \ + --conf spark.executorEnv.HOME={executor_env} \ + --driver-memory {driver_mem}G \ + --executor-memory {exec_mem}G \ + --conf spark.cores.max={max_core} \ + --conf spark.executor.cores={exec_core} \ + --conf spark.driver.maxResultSize=6G\ + --conf spark.sql.execution.arrow.pyspark.enabled=true\ + --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000\ + --conf spark.kryoserializer.buffer.max=512m" + + if py_files != "": + spark_submit += f" --py-files {py_files}" + + return spark_submit + + +def spark_submit_application(spark_cmd: str, application: str) -> str: + """ + Merge the spark submit command with the application + + Parameters + ---------- + spark_cmd : str + spark submit command + application : str + application with the parameters + + Returns + ------- + str + spark submit command concatenate with the application + """ + return f"{spark_cmd} {application}" + + +def run_spark_submit(spark_cmd: str, verbose: bool) -> subprocess.CompletedProcess: + """ + Run the spark_submit command within a subprocess + + Parameters + ---------- + spark_cmd : str + spark submit command + verbose : bool + if true, print logs + + Returns + ------- + subprocess.CompletedProcess + the process running the spark command + """ + logger = init_logging() + if verbose: + logger.info( + f""" + run a spark_submit command + + cmd: + {spark_cmd} + """ + ) + process = subprocess.run(spark_cmd, shell=True) + + return process diff --git a/fink_fat/streaming_associations/spark_ephem.py b/fink_fat/streaming_associations/spark_ephem.py index d7b2f29c..d7452c6b 100644 --- a/fink_fat/streaming_associations/spark_ephem.py +++ b/fink_fat/streaming_associations/spark_ephem.py @@ -1,9 +1,9 @@ import configparser import os -import subprocess import fink_fat from fink_fat.others.utils import init_logging +import fink_fat.others.launch_spark as spark def launch_spark_ephem( config: configparser.ConfigParser, @@ -63,16 +63,6 @@ def launch_spark_ephem( >>> os.remove("ephem.parquet") """ - master_manager = config["SOLVE_ORBIT_PARAMS"]["manager"] - principal_group = config["SOLVE_ORBIT_PARAMS"]["principal"] - secret = config["SOLVE_ORBIT_PARAMS"]["secret"] - role = config["SOLVE_ORBIT_PARAMS"]["role"] - executor_env = config["SOLVE_ORBIT_PARAMS"]["exec_env"] - driver_mem = config["SOLVE_ORBIT_PARAMS"]["driver_memory"] - exec_mem = config["SOLVE_ORBIT_PARAMS"]["executor_memory"] - max_core = config["SOLVE_ORBIT_PARAMS"]["max_core"] - exec_core = config["SOLVE_ORBIT_PARAMS"]["executor_core"] - application = os.path.join( os.path.dirname(fink_fat.__file__), "others", @@ -87,29 +77,10 @@ def launch_spark_ephem( application += " " + month application += " " + day - # FIXME - # temporary dependencies (only during the performance test phase) - FINK_FAT = "/home/roman.le-montagner/home_big_storage/Doctorat/Asteroids/fink-fat/dist/fink_fat-1.0.0-py3.9.egg" - FINK_SCIENCE = "/home/roman.le-montagner/home_big_storage/Doctorat/fink-science/dist/fink_science-4.4-py3.7.egg" - - spark_submit = f"spark-submit \ - --master {master_manager} \ - --conf spark.mesos.principal={principal_group} \ - --conf spark.mesos.secret={secret} \ - --conf spark.mesos.role={role} \ - --conf spark.executorEnv.HOME={executor_env} \ - --driver-memory {driver_mem}G \ - --executor-memory {exec_mem}G \ - --conf spark.cores.max={max_core} \ - --conf spark.executor.cores={exec_core} \ - --conf spark.driver.maxResultSize=6G\ - --conf spark.sql.execution.arrow.pyspark.enabled=true\ - --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000\ - --conf spark.kryoserializer.buffer.max=512m\ - --py-files {FINK_FAT},{FINK_SCIENCE}\ - {application}" + spark_submit = spark.build_spark_submit(config) + spark_app = spark.spark_submit_application(spark_submit, application) + process = spark.run_spark_submit(spark_app, verbose) - process = subprocess.run(spark_submit, shell=True, capture_output=True) logger = init_logging() if verbose: logger.info( diff --git a/fink_fat/test/cli_test/fitroid_test.conf b/fink_fat/test/cli_test/fitroid_test.conf index 4acdde69..eabf60bf 100644 --- a/fink_fat/test/cli_test/fitroid_test.conf +++ b/fink_fat/test/cli_test/fitroid_test.conf @@ -45,11 +45,13 @@ n_triplets=30 noise_ntrials=20 prop_epoch=None orbfit_verbose=3 - local_mode_limit=1000 orbfit_limit=10 cpu_count=1 ram_dir=fink_fat/test/cli_test +orbfit_path=$ORBFIT_HOME + +[SPARK] manager=local[8] principal=lsst secret=secret @@ -59,7 +61,7 @@ driver_memory=4 executor_memory=16 max_core=64 executor_core=8 -orbfit_path=$ORBFIT_HOME +py_files= [ASSOC_SYSTEM]