From 1bb774bb0216cf558b8ed64f88e259577ed3ed8c Mon Sep 17 00:00:00 2001 From: yuvrajjain2003 Date: Fri, 1 Dec 2023 14:01:28 -0500 Subject: [PATCH 1/4] Move config files into legacy data section in test --- .../data/legacy_batch_configs}/pittWorkstation.json | 0 .../data/legacy_batch_configs}/sgeDukeBIAC.json | 0 .../data/legacy_batch_configs}/slurmUNCConfig.json | 0 .../data/legacy_batch_configs}/slurmUNCConfigHeudiconv.json | 0 .../data/legacy_batch_configs}/slurmUNCConfigSnakeCase.json | 0 .../data/legacy_batch_configs}/slurmUVAConfig.json | 0 6 files changed, 0 insertions(+), 0 deletions(-) rename {clpipe/batchConfigs => tests/data/legacy_batch_configs}/pittWorkstation.json (100%) rename {clpipe/batchConfigs => tests/data/legacy_batch_configs}/sgeDukeBIAC.json (100%) rename {clpipe/batchConfigs => tests/data/legacy_batch_configs}/slurmUNCConfig.json (100%) rename {clpipe/batchConfigs => tests/data/legacy_batch_configs}/slurmUNCConfigHeudiconv.json (100%) rename {clpipe/batchConfigs => tests/data/legacy_batch_configs}/slurmUNCConfigSnakeCase.json (100%) rename {clpipe/batchConfigs => tests/data/legacy_batch_configs}/slurmUVAConfig.json (100%) diff --git a/clpipe/batchConfigs/pittWorkstation.json b/tests/data/legacy_batch_configs/pittWorkstation.json similarity index 100% rename from clpipe/batchConfigs/pittWorkstation.json rename to tests/data/legacy_batch_configs/pittWorkstation.json diff --git a/clpipe/batchConfigs/sgeDukeBIAC.json b/tests/data/legacy_batch_configs/sgeDukeBIAC.json similarity index 100% rename from clpipe/batchConfigs/sgeDukeBIAC.json rename to tests/data/legacy_batch_configs/sgeDukeBIAC.json diff --git a/clpipe/batchConfigs/slurmUNCConfig.json b/tests/data/legacy_batch_configs/slurmUNCConfig.json similarity index 100% rename from clpipe/batchConfigs/slurmUNCConfig.json rename to tests/data/legacy_batch_configs/slurmUNCConfig.json diff --git a/clpipe/batchConfigs/slurmUNCConfigHeudiconv.json b/tests/data/legacy_batch_configs/slurmUNCConfigHeudiconv.json similarity index 100% rename from clpipe/batchConfigs/slurmUNCConfigHeudiconv.json rename to tests/data/legacy_batch_configs/slurmUNCConfigHeudiconv.json diff --git a/clpipe/batchConfigs/slurmUNCConfigSnakeCase.json b/tests/data/legacy_batch_configs/slurmUNCConfigSnakeCase.json similarity index 100% rename from clpipe/batchConfigs/slurmUNCConfigSnakeCase.json rename to tests/data/legacy_batch_configs/slurmUNCConfigSnakeCase.json diff --git a/clpipe/batchConfigs/slurmUVAConfig.json b/tests/data/legacy_batch_configs/slurmUVAConfig.json similarity index 100% rename from clpipe/batchConfigs/slurmUVAConfig.json rename to tests/data/legacy_batch_configs/slurmUVAConfig.json From cf86245123aadce5d78d3388021bd3fd78606633 Mon Sep 17 00:00:00 2001 From: yuvrajjain2003 Date: Fri, 1 Dec 2023 14:16:50 -0500 Subject: [PATCH 2/4] Remove old batch manager, so tests not needed --- tests/test_batch_manager.py | 38 ------------------------------------- 1 file changed, 38 deletions(-) delete mode 100644 tests/test_batch_manager.py diff --git a/tests/test_batch_manager.py b/tests/test_batch_manager.py deleted file mode 100644 index 59888998..00000000 --- a/tests/test_batch_manager.py +++ /dev/null @@ -1,38 +0,0 @@ -import pytest -from clpipe.job_manager import * - -SLURMUNCCONFIG: str = "clpipe/batchConfigs/slurmUNCConfig.json" - - -def test_batch_manager_instantiation(): - batch_manager = BatchManager(SLURMUNCCONFIG) - assert isinstance(batch_manager, BatchManager) - - -def test_adding_jobs(): - batch_manager = BatchManager(SLURMUNCCONFIG) - job1 = Job(1, "echo hi") - batch_manager.add_job(job1) - - assert len(batch_manager.jobs) == 1 - assert batch_manager.jobs[0].job_name == 1 - assert batch_manager.jobs[0].job_string == "echo hi" - - job2 = Job(2, "additional_job") - batch_manager.add_job(job2) - - assert len(batch_manager.jobs) == 2 - assert batch_manager.jobs[1].job_name == 2 - assert batch_manager.jobs[1].job_string == "additional_job" - - -def test_job_string_creation(): - batch_manager = BatchManager(SLURMUNCCONFIG) - job = Job(1, "test_job_string") - batch_manager.add_job(job) - batch_manager.compile_job_strings() - assert len(batch_manager.submission_list) == 1 - - -def test_parallel_manager_creation(): - parallel_manager = JobManager() From 309acb899458884105b241b9fca7a58838b08dda Mon Sep 17 00:00:00 2001 From: yuvrajjain2003 Date: Fri, 1 Dec 2023 14:17:57 -0500 Subject: [PATCH 3/4] Refactor GLM to use the new job manager factory GLM no longer uses the BatchConfig inside the BatchConfigOptions to set up the config file. Instead it uses the ProjectOptions to load in the batchconfigs --- clpipe/data/defaultGLMConfig.json | 6 ++-- clpipe/glm_launch.py | 52 +++++++------------------------ 2 files changed, 14 insertions(+), 44 deletions(-) diff --git a/clpipe/data/defaultGLMConfig.json b/clpipe/data/defaultGLMConfig.json index 03ed53ed..0720134b 100644 --- a/clpipe/data/defaultGLMConfig.json +++ b/clpipe/data/defaultGLMConfig.json @@ -37,8 +37,7 @@ "MemoryUsage": "10G", "TimeUsage": "10:0:0", "NThreads": "4", - "Email": "", - "BatchConfig": "slurmUNCConfig.json" + "Email": "" } } ], @@ -54,8 +53,7 @@ "MemoryUsage": "5G", "TimeUsage": "5:0:0", "NThreads": "4", - "Email": "", - "BatchConfig": "slurmUNCConfig.json" + "Email": "" } } ] diff --git a/clpipe/glm_launch.py b/clpipe/glm_launch.py index 3122a456..5acac5c1 100644 --- a/clpipe/glm_launch.py +++ b/clpipe/glm_launch.py @@ -3,7 +3,7 @@ from pathlib import Path from .config.glm import * -from .job_manager import BatchManager, Job, DEFAULT_BATCH_CONFIG_PATH +from .job_manager import JobManagerFactory, DEFAULT_BATCH_CONFIG_PATH from .utils import get_logger DEFAULT_L1_MEMORY_USAGE = "10G" @@ -60,7 +60,7 @@ def glm_launch( memory_usage = batch_options["MemoryUsage"] time_usage = batch_options["TimeUsage"] n_threads = int(batch_options["NThreads"]) - batch_config_path = batch_options["BatchConfig"] + batch_config_path = glm_config.parent_options.batch_config_path email = batch_options["Email"] except KeyError: if level == L1: @@ -88,20 +88,22 @@ def glm_launch( log_dir = out_dir logger.info(f"Using log dir: {log_dir}") - batch_manager = _setup_batch_manager( - batch_config_path, - log_dir, - memory_usage=memory_usage, - time_usage=time_usage, - n_threads=n_threads, - email=email, + batch_manager = JobManagerFactory.get( + batch_config=batch_config_path, + output_directory=log_dir, + mem_use=memory_usage, + time=time_usage, + threads=n_threads, + email=email ) submission_strings = _create_submission_strings(fsf_dir, test_one=test_one) num_jobs = len(submission_strings) - _populate_batch_manager(batch_manager, submission_strings) + for key in submission_strings.keys(): + batch_manager.addjob(key, submission_strings[key]) + if submit: logger.info(f"Running {num_jobs} job(s) in batch mode") batch_manager.submit_jobs() @@ -109,28 +111,6 @@ def glm_launch( batch_manager.print_jobs() sys.exit(0) - -def _setup_batch_manager( - batch_config_path: str, - log_dir: str, - memory_usage: str = None, - time_usage: str = None, - n_threads: int = None, - email: str = None, -): - batch_manager = BatchManager(batch_config_path, log_dir) - if memory_usage: - batch_manager.update_mem_usage(memory_usage) - if time_usage: - batch_manager.update_time(time_usage) - if n_threads: - batch_manager.update_nthreads(n_threads) - if email: - batch_manager.update_email(email) - - return batch_manager - - def _create_submission_strings(fsf_files: os.PathLike, test_one: bool = False): submission_strings = {} @@ -147,11 +127,3 @@ def _create_submission_strings(fsf_files: os.PathLike, test_one: bool = False): if test_one: break return submission_strings - - -def _populate_batch_manager(batch_manager: BatchManager, submission_strings: dict): - for key in submission_strings.keys(): - batch_manager.addjob(Job(key, submission_strings[key])) - - batch_manager.createsubmissionhead() - batch_manager.compilejobstrings() From cfe6f21ac649d18ce8e537e98a3bd908e75499da Mon Sep 17 00:00:00 2001 From: yuvrajjain2003 Date: Fri, 1 Dec 2023 14:18:27 -0500 Subject: [PATCH 4/4] Update job manager to take file paths or dataclass --- clpipe/config/options.py | 2 +- clpipe/job_manager.py | 146 ++------------------------------------ tests/test_job_manager.py | 20 +++++- 3 files changed, 26 insertions(+), 142 deletions(-) diff --git a/clpipe/config/options.py b/clpipe/config/options.py index 60889180..36215da7 100644 --- a/clpipe/config/options.py +++ b/clpipe/config/options.py @@ -7,7 +7,7 @@ from pathlib import Path from typing import Union from .package import VERSION -from utils import get_logger +from ..utils import get_logger DEFAULT_CONFIG_FILE_NAME = "clpipe_config.json" DEFAULT_PROCESSING_STREAM = "default" diff --git a/clpipe/job_manager.py b/clpipe/job_manager.py index aa4d9dc3..c7d90d18 100644 --- a/clpipe/job_manager.py +++ b/clpipe/job_manager.py @@ -67,7 +67,7 @@ def submit_jobs(self): class BatchJobManager(JobManager): def __init__( self, - batch_system_config: os.PathLike, + batch_system_config: BatchManagerConfig, output_directory=None, debug=False, mem_use=None, @@ -76,7 +76,7 @@ def __init__( email=None, ): super().__init__(output_directory, debug) - self.config = BatchManagerConfig.load(os.path.abspath(batch_system_config)) + self.config = batch_system_config self.config.mem_use = mem_use if mem_use else self.config.memory_default self.config.time = time if time else self.config.time_default @@ -177,6 +177,9 @@ def get( The method to be used for running the job. """ if batch_config: + if not isinstance(batch_config, BatchManagerConfig): + batch_config = BatchManagerConfig.load(os.path.abspath(batch_config)) + return BatchJobManager( batch_config, output_directory, debug, mem_use, time, threads, email ) @@ -187,141 +190,4 @@ def get( class Job: def __init__(self, job_name, job_string): self.job_name = job_name - self.job_string = job_string - - -# OLD BATCH MANAGER - - -class BatchManager: - """ - Handles the creation and submission of batch jobs. - """ - - def __init__( - self, batch_system_config: os.PathLike, output_directory=None, debug=False - ): - self.jobs = [] - self.debug = debug - self.logger = get_logger(LOGGER_NAME, debug=debug) - - if os.path.exists(os.path.abspath(batch_system_config)): - self.logger.debug(f"Using batch config at: {batch_system_config}") - with open(os.path.abspath(batch_system_config)) as bat_config: - self.config = json.load(bat_config) - else: - with resource_stream( - __name__, "batchConfigs/" + batch_system_config - ) as bat_config: - self.config = json.load(bat_config) - - self.submission_list = [] - if output_directory is None: - self.logger.warning( - ("No output directory provided " "- defauling to current directory") - ) - output_directory = "." - self.logger.info(f"Batch job output path: {output_directory}") - self.output_dir = os.path.abspath(output_directory) - if not os.path.isdir(output_directory): - os.makedirs(output_directory) - self.logger.debug(f"Created batch output directory at: {output_directory}") - - def update_mem_usage(self, mem_use): - self.config["MemoryDefault"] = mem_use - - def update_time(self, time): - self.config["TimeDefault"] = time - - def update_nthreads(self, threads): - self.config["NThreads"] = threads - - def update_email(self, email): - self.config["EmailAddress"] = email - - def addjob(self, job): - self.jobs.append(job) - - def add_job(self, job): - return self.addjob(job) - - def compilejobstrings(self): - header = self.createsubmissionhead() - for job in self.jobs: - temp = header.format(jobid=job.job_name, cmdwrap=job.job_string) - self.submission_list.append(temp) - - def compile_job_strings(self): - return self.compilejobstrings() - - def createsubmissionhead(self): - head = [self.config["SubmissionHead"]] - for e in self.config["SubmissionOptions"]: - temp = e["command"] + " " + e["args"] - head.append(temp) - for e in self.config["SubOptionsEqual"]: - temp = e["command"] + "=" + e["args"] - head.append(temp) - - head.append( - self.config["MemoryCommand"].format(mem=self.config["MemoryDefault"]) - ) - if self.config["TimeCommandActive"]: - head.append( - self.config["TimeCommand"].format(time=self.config["TimeDefault"]) - ) - if self.config["ThreadCommandActive"]: - head.append( - self.config["NThreadsCommand"].format(nthreads=self.config["NThreads"]) - ) - if self.config["JobIDCommandActive"]: - head.append(self.config["JobIDCommand"].format(jobid=JOB_ID_FORMAT_STR)) - if self.config["OutputCommandActive"]: - head.append( - self.config["OutputCommand"].format( - output=os.path.abspath( - os.path.join(self.output_dir, OUTPUT_FORMAT_STR) - ) - ) - ) - if self.config["EmailAddress"]: - head.append( - self.config["EmailCommand"].format(email=self.config["EmailAddress"]) - ) - head.append(self.config["CommandWrapper"]) - - return " ".join(head) - - def create_submission_head(self): - return self.createsubmissionhead() - - def submit_jobs(self): - self.logger.info(f"Submitting {len(self.submission_list)} job(s).") - self.logger.debug(f"Memory usage: {self.config['MemoryDefault']}") - self.logger.debug(f"Time usage: {self.config['TimeDefault']}") - self.logger.debug(f"Number of threads: {self.config['NThreads']}") - self.logger.debug(f"Email: {self.config['EmailAddress']}") - for job in self.submission_list: - os.system(job) - - def print_jobs(self): - job_count = len(self.submission_list) - - if job_count == 0: - output = "No jobs to run." - else: - output = "Jobs to run:\n\n" - for index, job in enumerate(self.submission_list): - output += "\t" + job + "\n\n" - if ( - index == MAX_JOB_DISPLAY - 1 - and job_count > MAX_JOB_DISPLAY - and not self.debug - ): - output += f"\t...and {job_count - 5} more job(s).\n" - break - output += "Re-run with the '-submit' flag to launch these jobs." - self.logger.info(output) - - def get_threads_command(self): - return [self.config["NThreadsCommand"], self.config["NThreads"]] + self.job_string = job_string \ No newline at end of file diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index f6a657eb..38e9152b 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -1,7 +1,7 @@ import pytest from clpipe.job_manager import * -SLURMUNCCONFIG: str = "clpipe/batchConfigs/slurmUNCConfigSnakeCase.json" +SLURMUNCCONFIG: str = "tests/data/legacy_batch_configs/slurmUNCConfigSnakeCase.json" def test_batch_manager_instantiation(scatch_dir): @@ -21,6 +21,24 @@ def test_batch_manager_instantiation(scatch_dir): assert len(batch_manager.job_queue) == 0 +def test_batch_manager_instantiation_dataclass(scatch_dir): + batch_config = BatchManagerConfig.from_default("unc") + batch_manager = JobManagerFactory.get( + batch_config=batch_config, output_directory=scatch_dir + ) + assert isinstance(batch_manager, BatchJobManager) + + batch_manager.add_job(1, "echo hi") + assert len(batch_manager.job_queue) == 1 + + batch_manager.add_job(2, "echo test") + assert len(batch_manager.job_queue) == 2 + + batch_manager.print_jobs() + batch_manager.submit_jobs() + + assert len(batch_manager.job_queue) == 0 + def test_local_manager_instantiation(scatch_dir): local_manager = JobManagerFactory.get(output_directory=scatch_dir)