Skip to content

Commit

Permalink
ENH Separate out submission command creation from to minimize repeati…
Browse files Browse the repository at this point in the history
…ng code between Executor classes
  • Loading branch information
gadorlhiac committed Apr 17, 2024
1 parent 52cbb94 commit 3795736
Showing 1 changed file with 34 additions and 44 deletions.
78 changes: 34 additions & 44 deletions lute/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,27 @@ def _finalize_task(self, proc: subprocess.Popen) -> None:
"""
...

def _submit_cmd(self, executable_path: str, params: str) -> str:
"""Return a formatted command for launching Task subprocess.
May be overridden by subclasses.
Args:
executable_path (str): Path to the LUTE subprocess script.
params (str): String of formatted command-line arguments.
Returns:
cmd (str): Appropriately formatted command for this Executor.
"""
cmd: str = ""
if __debug__:
cmd = f"python -B {executable_path} {params}"
else:
cmd = f"python -OB {executable_path} {params}"

return cmd

def execute_task(self) -> None:
"""Run the requested Task as a subprocess."""
lute_path: Optional[str] = os.getenv("LUTE_PATH")
Expand All @@ -287,12 +308,7 @@ def execute_task(self) -> None:
config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"]
params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}"

cmd: str = ""
if __debug__:
cmd = f"python -B {executable_path} {params}"
else:
cmd = f"python -OB {executable_path} {params}"

cmd: str = self._submit_cmd(executable_path, params)
proc: subprocess.Popen = self._submit_task(cmd)

while self._task_is_running(proc):
Expand Down Expand Up @@ -503,20 +519,20 @@ class MPIExecutor(Executor):
for the Executor itself.
Methods:
execute_task(): Run the task as a subprocess using `mpirun`.
_submit_cmd: Run the task as a subprocess using `mpirun`.
"""

def execute_task(self) -> None:
"""Run the requested Task as a subprocess."""
lute_path: Optional[str] = os.getenv("LUTE_PATH")
if lute_path is None:
logger.debug("Absolute path to subprocess.py not found.")
lute_path = os.path.abspath(f"{os.path.dirname(__file__)}/../..")
os.environ["LUTE_PATH"] = lute_path
executable_path: str = f"{lute_path}/subprocess_task.py"
config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"]
params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}"
def _submit_cmd(self, executable_path: str, params: str) -> str:
"""Override submission command to use `mpirun`
Args:
executable_path (str): Path to the LUTE subprocess script.
params (str): String of formatted command-line arguments.
Returns:
cmd (str): Appropriately formatted command for this Executor.
"""
py_cmd: str = ""
nprocs: int = max(
int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1
Expand All @@ -528,30 +544,4 @@ def execute_task(self) -> None:
py_cmd = f"python -OB -u -m mpi4py.run {executable_path} {params}"

cmd: str = f"{mpi_cmd} {py_cmd}"
proc: subprocess.Popen = self._submit_task(cmd)

while self._task_is_running(proc):
self._task_loop(proc)
time.sleep(self._analysis_desc.poll_interval)

os.set_blocking(proc.stdout.fileno(), True)
os.set_blocking(proc.stderr.fileno(), True)

self._finalize_task(proc)
proc.stdout.close()
proc.stderr.close()
proc.wait()
if ret := proc.returncode:
logger.info(f"Task failed with return code: {ret}")
self._analysis_desc.task_result.task_status = TaskStatus.FAILED
elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING:
# Ret code is 0, no exception was thrown, task forgot to set status
self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED
logger.debug(f"Task did not change from RUNNING status. Assume COMPLETED.")
self._store_configuration()
for comm in self._communicators:
comm.clear_communicator()

if self._analysis_desc.task_result.task_status == TaskStatus.FAILED:
logger.info("Exiting after Task failure. Result recorded.")
sys.exit(-1)
return cmd

0 comments on commit 3795736

Please sign in to comment.