Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:cohenlabUNC/clpipe into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
wasciutto committed Dec 1, 2023
2 parents 7af6d22 + 5be479f commit 3f6f71e
Show file tree
Hide file tree
Showing 11 changed files with 39 additions and 223 deletions.
6 changes: 2 additions & 4 deletions clpipe/data/defaultGLMConfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
"MemoryUsage": "10G",
"TimeUsage": "10:0:0",
"NThreads": "4",
"Email": "",
"BatchConfig": "slurmUNCConfig.json"
"Email": ""
}
}
],
Expand All @@ -54,8 +53,7 @@
"MemoryUsage": "5G",
"TimeUsage": "5:0:0",
"NThreads": "4",
"Email": "",
"BatchConfig": "slurmUNCConfig.json"
"Email": ""
}
}
]
Expand Down
52 changes: 12 additions & 40 deletions clpipe/glm_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path

from .config.glm import *
from .job_manager import BatchManager, Job, DEFAULT_BATCH_CONFIG_PATH
from .job_manager import JobManagerFactory, DEFAULT_BATCH_CONFIG_PATH
from .utils import get_logger

DEFAULT_L1_MEMORY_USAGE = "10G"
Expand Down Expand Up @@ -60,7 +60,7 @@ def glm_launch(
memory_usage = batch_options["MemoryUsage"]
time_usage = batch_options["TimeUsage"]
n_threads = int(batch_options["NThreads"])
batch_config_path = batch_options["BatchConfig"]
batch_config_path = glm_config.parent_options.batch_config_path
email = batch_options["Email"]
except KeyError:
if level == L1:
Expand Down Expand Up @@ -88,49 +88,29 @@ def glm_launch(
log_dir = out_dir
logger.info(f"Using log dir: {log_dir}")

batch_manager = _setup_batch_manager(
batch_config_path,
log_dir,
memory_usage=memory_usage,
time_usage=time_usage,
n_threads=n_threads,
email=email,
batch_manager = JobManagerFactory.get(
batch_config=batch_config_path,
output_directory=log_dir,
mem_use=memory_usage,
time=time_usage,
threads=n_threads,
email=email
)

submission_strings = _create_submission_strings(fsf_dir, test_one=test_one)

num_jobs = len(submission_strings)

_populate_batch_manager(batch_manager, submission_strings)
for key in submission_strings.keys():
batch_manager.addjob(key, submission_strings[key])

if submit:
logger.info(f"Running {num_jobs} job(s) in batch mode")
batch_manager.submit_jobs()
else:
batch_manager.print_jobs()
sys.exit(0)


def _setup_batch_manager(
batch_config_path: str,
log_dir: str,
memory_usage: str = None,
time_usage: str = None,
n_threads: int = None,
email: str = None,
):
batch_manager = BatchManager(batch_config_path, log_dir)
if memory_usage:
batch_manager.update_mem_usage(memory_usage)
if time_usage:
batch_manager.update_time(time_usage)
if n_threads:
batch_manager.update_nthreads(n_threads)
if email:
batch_manager.update_email(email)

return batch_manager


def _create_submission_strings(fsf_files: os.PathLike, test_one: bool = False):
submission_strings = {}

Expand All @@ -147,11 +127,3 @@ def _create_submission_strings(fsf_files: os.PathLike, test_one: bool = False):
if test_one:
break
return submission_strings


def _populate_batch_manager(batch_manager: BatchManager, submission_strings: dict):
for key in submission_strings.keys():
batch_manager.addjob(Job(key, submission_strings[key]))

batch_manager.createsubmissionhead()
batch_manager.compilejobstrings()
146 changes: 6 additions & 140 deletions clpipe/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def submit_jobs(self):
class BatchJobManager(JobManager):
def __init__(
self,
batch_system_config: os.PathLike,
batch_system_config: BatchManagerConfig,
output_directory=None,
debug=False,
mem_use=None,
Expand All @@ -76,7 +76,7 @@ def __init__(
email=None,
):
super().__init__(output_directory, debug)
self.config = BatchManagerConfig.load(os.path.abspath(batch_system_config))
self.config = batch_system_config

self.config.mem_use = mem_use if mem_use else self.config.memory_default
self.config.time = time if time else self.config.time_default
Expand Down Expand Up @@ -177,6 +177,9 @@ def get(
The method to be used for running the job.
"""
if batch_config:
if not isinstance(batch_config, BatchManagerConfig):
batch_config = BatchManagerConfig.load(os.path.abspath(batch_config))

return BatchJobManager(
batch_config, output_directory, debug, mem_use, time, threads, email
)
Expand All @@ -187,141 +190,4 @@ def get(
class Job:
def __init__(self, job_name, job_string):
self.job_name = job_name
self.job_string = job_string


# OLD BATCH MANAGER


class BatchManager:
"""
Handles the creation and submission of batch jobs.
"""

def __init__(
self, batch_system_config: os.PathLike, output_directory=None, debug=False
):
self.jobs = []
self.debug = debug
self.logger = get_logger(LOGGER_NAME, debug=debug)

if os.path.exists(os.path.abspath(batch_system_config)):
self.logger.debug(f"Using batch config at: {batch_system_config}")
with open(os.path.abspath(batch_system_config)) as bat_config:
self.config = json.load(bat_config)
else:
with resource_stream(
__name__, "batchConfigs/" + batch_system_config
) as bat_config:
self.config = json.load(bat_config)

self.submission_list = []
if output_directory is None:
self.logger.warning(
("No output directory provided " "- defauling to current directory")
)
output_directory = "."
self.logger.info(f"Batch job output path: {output_directory}")
self.output_dir = os.path.abspath(output_directory)
if not os.path.isdir(output_directory):
os.makedirs(output_directory)
self.logger.debug(f"Created batch output directory at: {output_directory}")

def update_mem_usage(self, mem_use):
self.config["MemoryDefault"] = mem_use

def update_time(self, time):
self.config["TimeDefault"] = time

def update_nthreads(self, threads):
self.config["NThreads"] = threads

def update_email(self, email):
self.config["EmailAddress"] = email

def addjob(self, job):
self.jobs.append(job)

def add_job(self, job):
return self.addjob(job)

def compilejobstrings(self):
header = self.createsubmissionhead()
for job in self.jobs:
temp = header.format(jobid=job.job_name, cmdwrap=job.job_string)
self.submission_list.append(temp)

def compile_job_strings(self):
return self.compilejobstrings()

def createsubmissionhead(self):
head = [self.config["SubmissionHead"]]
for e in self.config["SubmissionOptions"]:
temp = e["command"] + " " + e["args"]
head.append(temp)
for e in self.config["SubOptionsEqual"]:
temp = e["command"] + "=" + e["args"]
head.append(temp)

head.append(
self.config["MemoryCommand"].format(mem=self.config["MemoryDefault"])
)
if self.config["TimeCommandActive"]:
head.append(
self.config["TimeCommand"].format(time=self.config["TimeDefault"])
)
if self.config["ThreadCommandActive"]:
head.append(
self.config["NThreadsCommand"].format(nthreads=self.config["NThreads"])
)
if self.config["JobIDCommandActive"]:
head.append(self.config["JobIDCommand"].format(jobid=JOB_ID_FORMAT_STR))
if self.config["OutputCommandActive"]:
head.append(
self.config["OutputCommand"].format(
output=os.path.abspath(
os.path.join(self.output_dir, OUTPUT_FORMAT_STR)
)
)
)
if self.config["EmailAddress"]:
head.append(
self.config["EmailCommand"].format(email=self.config["EmailAddress"])
)
head.append(self.config["CommandWrapper"])

return " ".join(head)

def create_submission_head(self):
return self.createsubmissionhead()

def submit_jobs(self):
self.logger.info(f"Submitting {len(self.submission_list)} job(s).")
self.logger.debug(f"Memory usage: {self.config['MemoryDefault']}")
self.logger.debug(f"Time usage: {self.config['TimeDefault']}")
self.logger.debug(f"Number of threads: {self.config['NThreads']}")
self.logger.debug(f"Email: {self.config['EmailAddress']}")
for job in self.submission_list:
os.system(job)

def print_jobs(self):
job_count = len(self.submission_list)

if job_count == 0:
output = "No jobs to run."
else:
output = "Jobs to run:\n\n"
for index, job in enumerate(self.submission_list):
output += "\t" + job + "\n\n"
if (
index == MAX_JOB_DISPLAY - 1
and job_count > MAX_JOB_DISPLAY
and not self.debug
):
output += f"\t...and {job_count - 5} more job(s).\n"
break
output += "Re-run with the '-submit' flag to launch these jobs."
self.logger.info(output)

def get_threads_command(self):
return [self.config["NThreadsCommand"], self.config["NThreads"]]
self.job_string = job_string
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
38 changes: 0 additions & 38 deletions tests/test_batch_manager.py

This file was deleted.

20 changes: 19 additions & 1 deletion tests/test_job_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest
from clpipe.job_manager import *

SLURMUNCCONFIG: str = "clpipe/batchConfigs/slurmUNCConfigSnakeCase.json"
SLURMUNCCONFIG: str = "tests/data/legacy_batch_configs/slurmUNCConfigSnakeCase.json"


def test_batch_manager_instantiation(scatch_dir):
Expand All @@ -21,6 +21,24 @@ def test_batch_manager_instantiation(scatch_dir):

assert len(batch_manager.job_queue) == 0

def test_batch_manager_instantiation_dataclass(scatch_dir):
batch_config = BatchManagerConfig.from_default("unc")
batch_manager = JobManagerFactory.get(
batch_config=batch_config, output_directory=scatch_dir
)
assert isinstance(batch_manager, BatchJobManager)

batch_manager.add_job(1, "echo hi")
assert len(batch_manager.job_queue) == 1

batch_manager.add_job(2, "echo test")
assert len(batch_manager.job_queue) == 2

batch_manager.print_jobs()
batch_manager.submit_jobs()

assert len(batch_manager.job_queue) == 0


def test_local_manager_instantiation(scatch_dir):
local_manager = JobManagerFactory.get(output_directory=scatch_dir)
Expand Down

0 comments on commit 3f6f71e

Please sign in to comment.