From a64b17466029f9771e65cefd7f730634efa5136f Mon Sep 17 00:00:00 2001 From: rythorpe Date: Mon, 27 Jun 2022 18:18:37 -0400 Subject: [PATCH 01/13] allow MPIBackend to spawn from existing MPI process --- hnn_core/parallel_backends.py | 47 ++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index bb7dc63d4..2507f8e4b 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -572,27 +572,37 @@ class MPIBackend(object): attempt to detect number of cores (including hyperthreads) and start parallel simulation over all of them. mpi_cmd : str - The name of the mpi launcher executable. Will use 'mpiexec' - (openmpi) by default. + Applicable only when mpi_comm_spawn is False (default): The name of the + mpi launcher executable. Will use 'mpiexec' (openmpi) by default. + mpi_comm_spawn : bool + Spawns new MPI subprocesses from an existing MPI process instead of + calling mpi_cmd. This allows for the MPI processes and associated + hardware resources to be pre-instantiated e.g. on a computing cluster. + mpi_comm_spawn_info : mpi4py.MPI.Info | None + Appliable only when mpi_comm_spawn is True: an mpi4py.MPI.Info instance + that grants the user control over how MPI configures spawned processes. Attributes ---------- - n_procs : int The number of processes MPI will actually use (spread over cores). If 1 is specified or mpi4py could not be loaded, the simulation will be run with the JoblibBackend mpi_cmd : list of str - The mpi command with number of procs and options to be passed to Popen + The mpi command with number of procs and options to be passed to Popen. + spawn_intercomm : mpi4py.MPI.Intercomm | None + The spawned intercommunicator instance if a new MPI subprocess was + spawned. Otherwise, None. expected_data_length : int Used to check consistency between data that was sent and what MPIBackend received. proc_queue : threading.Queue A Queue object to hold process handles from Popen in a thread-safe way. - There will be a valid process handle present the queue when a MPI - åsimulation is running. + There will be a valid process handle present in the queue when an MPI + simulation is running. """ - def __init__(self, n_procs=None, mpi_cmd='mpiexec'): + def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, + mpi_comm_spawn_info=None): self.expected_data_length = 0 self.proc = None self.proc_queue = Queue() @@ -629,23 +639,26 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec'): warn(f'{packages} not installed. Will run on single processor') self.n_procs = 1 - self.mpi_cmd = mpi_cmd - if self.n_procs == 1: print("Backend will use 1 core. Running simulation without MPI") return else: print("MPI will run over %d processes" % (self.n_procs)) - if hyperthreading: - self.mpi_cmd += ' --use-hwthread-cpus' + if mpi_comm_spawn: + self.mpi_cmd = '' + else: + self.mpi_cmd = mpi_cmd - if oversubscribe: - self.mpi_cmd += ' --oversubscribe' + if hyperthreading: + self.mpi_cmd += ' --use-hwthread-cpus' - self.mpi_cmd += ' -np ' + str(self.n_procs) + if oversubscribe: + self.mpi_cmd += ' --oversubscribe' - self.mpi_cmd += ' nrniv -python -mpi -nobanner ' + \ + self.mpi_cmd += ' -np ' + str(self.n_procs) + ' ' + + self.mpi_cmd += 'nrniv -python -mpi -nobanner ' + \ sys.executable + ' ' + \ os.path.join(os.path.dirname(sys.modules[__name__].__file__), 'mpi_child.py') @@ -657,6 +670,10 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec'): use_posix = False self.mpi_cmd = shlex.split(self.mpi_cmd, posix=use_posix) + self.mpi_comm_spawn = mpi_comm_spawn + self.mpi_comm_spawn_info = mpi_comm_spawn_info + self.spawn_intercomm = None + def __enter__(self): global _BACKEND From 8c4861440b448c25892b56f4bb49b6e8a52e59f3 Mon Sep 17 00:00:00 2001 From: rythorpe Date: Mon, 27 Jun 2022 18:20:41 -0400 Subject: [PATCH 02/13] move n_procs print statements earlier --- hnn_core/parallel_backends.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index 2507f8e4b..5ab99a778 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -610,8 +610,12 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, n_logical_cores = multiprocessing.cpu_count() if n_procs is None: self.n_procs = n_logical_cores + elif n_procs == 1: + print("Backend will use 1 core. Running simulation without MPI") + return else: self.n_procs = n_procs + print("MPI will run over %d processes" % (self.n_procs)) # did user try to force running on more cores than available? oversubscribe = False @@ -639,12 +643,6 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, warn(f'{packages} not installed. Will run on single processor') self.n_procs = 1 - if self.n_procs == 1: - print("Backend will use 1 core. Running simulation without MPI") - return - else: - print("MPI will run over %d processes" % (self.n_procs)) - if mpi_comm_spawn: self.mpi_cmd = '' else: From 70f33fa50b93d9b61411ccfdeb56412814637fa5 Mon Sep 17 00:00:00 2001 From: rythorpe Date: Mon, 27 Jun 2022 19:48:50 -0400 Subject: [PATCH 03/13] attr mpi_cmd -> command --- hnn_core/parallel_backends.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index 5ab99a778..af9450566 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -588,8 +588,8 @@ class MPIBackend(object): The number of processes MPI will actually use (spread over cores). If 1 is specified or mpi4py could not be loaded, the simulation will be run with the JoblibBackend - mpi_cmd : list of str - The mpi command with number of procs and options to be passed to Popen. + command : list of str | str + Command to run as subprocess (see subprocess.Popen documentation). spawn_intercomm : mpi4py.MPI.Intercomm | None The spawned intercommunicator instance if a new MPI subprocess was spawned. Otherwise, None. @@ -644,19 +644,19 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, self.n_procs = 1 if mpi_comm_spawn: - self.mpi_cmd = '' + self.command = '' else: - self.mpi_cmd = mpi_cmd + self.command = mpi_cmd if hyperthreading: - self.mpi_cmd += ' --use-hwthread-cpus' + self.command += ' --use-hwthread-cpus' if oversubscribe: - self.mpi_cmd += ' --oversubscribe' + self.command += ' --oversubscribe' - self.mpi_cmd += ' -np ' + str(self.n_procs) + ' ' + self.command += ' -np ' + str(self.n_procs) + ' ' - self.mpi_cmd += 'nrniv -python -mpi -nobanner ' + \ + self.command += 'nrniv -python -mpi -nobanner ' + \ sys.executable + ' ' + \ os.path.join(os.path.dirname(sys.modules[__name__].__file__), 'mpi_child.py') @@ -666,7 +666,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, use_posix = True else: use_posix = False - self.mpi_cmd = shlex.split(self.mpi_cmd, posix=use_posix) + self.command = shlex.split(self.command, posix=use_posix) self.mpi_comm_spawn = mpi_comm_spawn self.mpi_comm_spawn_info = mpi_comm_spawn_info @@ -725,7 +725,7 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): env = _get_mpi_env() self.proc, sim_data = run_subprocess( - command=self.mpi_cmd, obj=[net, tstop, dt, n_trials], timeout=30, + command=self.command, obj=[net, tstop, dt, n_trials], timeout=30, proc_queue=self.proc_queue, env=env, cwd=os.getcwd(), universal_newlines=True) From 5c0a861f9c34c4741fc6ef9d2eb9a61bc4ca6fad Mon Sep 17 00:00:00 2001 From: rythorpe Date: Tue, 28 Jun 2022 00:47:43 -0400 Subject: [PATCH 04/13] spawn MPI processes in MPIBackend.simulate --- hnn_core/parallel_backends.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index af9450566..2e257652b 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -588,7 +588,7 @@ class MPIBackend(object): The number of processes MPI will actually use (spread over cores). If 1 is specified or mpi4py could not be loaded, the simulation will be run with the JoblibBackend - command : list of str | str + command : list of str Command to run as subprocess (see subprocess.Popen documentation). spawn_intercomm : mpi4py.MPI.Intercomm | None The spawned intercommunicator instance if a new MPI subprocess was @@ -670,7 +670,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, self.mpi_comm_spawn = mpi_comm_spawn self.mpi_comm_spawn_info = mpi_comm_spawn_info - self.spawn_intercomm = None + self.spawn_intercomm = None # updated in self.simulate def __enter__(self): global _BACKEND @@ -719,10 +719,18 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): n_trials=n_trials, postproc=postproc) + from mpi4py import MPI + + env = _get_mpi_env() + print("Running %d trials..." % (n_trials)) dpls = [] - env = _get_mpi_env() + if self.mpi_comm_spawn: + subcomm = MPI.COMM_SELF.Spawn('nrniv', args=self.command, + info=self.mpi_comm_spawn_info, + maxprocs=self.n_procs) + self.spawn_intercomm = subcomm self.proc, sim_data = run_subprocess( command=self.command, obj=[net, tstop, dt, n_trials], timeout=30, From 4adb015cefc8c8794fe85e0b4947762129d3a6ab Mon Sep 17 00:00:00 2001 From: rythorpe Date: Thu, 30 Jun 2022 18:00:32 -0400 Subject: [PATCH 05/13] try spawning MPI process in mpi_child.py --- hnn_core/mpi_child.py | 26 ++++++++++++++++++++------ hnn_core/parallel_backends.py | 31 +++++++++++++++++++++---------- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/hnn_core/mpi_child.py b/hnn_core/mpi_child.py index fc0da385a..51caf0267 100644 --- a/hnn_core/mpi_child.py +++ b/hnn_core/mpi_child.py @@ -8,6 +8,7 @@ import pickle import base64 import re +from os import environ from hnn_core.parallel_backends import _extract_data, _extract_data_length @@ -147,12 +148,25 @@ def run(self, net, tstop, dt, n_trials): rc = 0 try: - with MPISimulation() as mpi_sim: - # XXX: _read_net -> _read_obj, fix later - net, tstop, dt, n_trials = mpi_sim._read_net() - sim_data = mpi_sim.run(net, tstop, dt, n_trials) - mpi_sim._write_data_stderr(sim_data) - mpi_sim._wait_for_exit_signal() + if 'HNN_CORE_MPI_COMM_SPAWN' in environ: + spawn = bool(environ['HNN_CORE_MPI_COMM_SPAWN']) + cmd = environ['HNN_CORE_SPAWN_CMD'] + n_procs = int(environ['HNN_CORE_SPAWN_N_PROCS']) + if spawn: + from mpi4py import MPI + + environ['HNN_CORE_MPI_COMM_SPAWN'] = '0' + subcomm = MPI.COMM_SELF.Spawn('nrniv', args=cmd, + info=self.mpi_comm_spawn_info, + maxprocs=n_procs) + else: + + with MPISimulation() as mpi_sim: + # XXX: _read_net -> _read_obj, fix later + net, tstop, dt, n_trials = mpi_sim._read_net() + sim_data = mpi_sim.run(net, tstop, dt, n_trials) + mpi_sim._write_data_stderr(sim_data) + mpi_sim._wait_for_exit_signal() except Exception: # This can be useful to indicate the problem to the # caller (in parallel_backends.py) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index 2e257652b..a0ece72a1 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -711,6 +711,14 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): dpl: list of Dipole The Dipole results from each simulation trial """ + from mpi4py import MPI + + # allow only the root rank to run a simulation if the user created it + # in parallel via a parent MPI process + # XXX this is a failsafe: ideally, the user will control for this by + # only running the simulation on a single rank + if self.mpi_comm_spawn and MPI.COMM_WORLD.Get_rank() != 0: + return list() # just use the joblib backend for a single core if self.n_procs == 1: @@ -719,21 +727,24 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): n_trials=n_trials, postproc=postproc) - from mpi4py import MPI - env = _get_mpi_env() + command = self.command + if self.mpi_comm_spawn: + # add new env variable that can be accessed by mpi_child.py once + # started by Popen + mpi_child_fname = os.path.join( + os.path.dirname(sys.modules[__name__].__file__), + 'mpi_child.py') + command = [sys.executable, mpi_child_fname] + env['HNN_CORE_MPI_COMM_SPAWN'] = '1' + env['HNN_CORE_SPAWN_CMD'] = self.command + env['HNN_CORE_SPAWN_INFO'] = print("Running %d trials..." % (n_trials)) - dpls = [] - - if self.mpi_comm_spawn: - subcomm = MPI.COMM_SELF.Spawn('nrniv', args=self.command, - info=self.mpi_comm_spawn_info, - maxprocs=self.n_procs) - self.spawn_intercomm = subcomm + dpls = list() self.proc, sim_data = run_subprocess( - command=self.command, obj=[net, tstop, dt, n_trials], timeout=30, + command=command, obj=[net, tstop, dt, n_trials], timeout=30, proc_queue=self.proc_queue, env=env, cwd=os.getcwd(), universal_newlines=True) From 20b7223c0d5d048c1baa06d265cb6933950187a4 Mon Sep 17 00:00:00 2001 From: Ryan Thorpe Date: Fri, 1 Jul 2022 15:40:53 -0400 Subject: [PATCH 06/13] pass spawn params to mpi_child.py via env vars --- hnn_core/mpi_child.py | 46 ++++++++++++++++++-------- hnn_core/parallel_backends.py | 61 +++++++++++++++++------------------ 2 files changed, 62 insertions(+), 45 deletions(-) diff --git a/hnn_core/mpi_child.py b/hnn_core/mpi_child.py index 51caf0267..98ba9be0c 100644 --- a/hnn_core/mpi_child.py +++ b/hnn_core/mpi_child.py @@ -3,12 +3,15 @@ """ # Authors: Blake Caldwell +# Ryan Thorpe import sys import pickle import base64 import re +import shlex from os import environ +from mpi4py import MPI from hnn_core.parallel_backends import _extract_data, _extract_data_length @@ -73,7 +76,7 @@ def __exit__(self, type, value, traceback): MPI.Finalize() def _read_net(self): - """Read net broadcasted to all ranks on stdin""" + """Read net and associated objects broadcasted to all ranks on stdin""" # read Network from stdin if self.rank == 0: @@ -148,25 +151,40 @@ def run(self, net, tstop, dt, n_trials): rc = 0 try: - if 'HNN_CORE_MPI_COMM_SPAWN' in environ: - spawn = bool(environ['HNN_CORE_MPI_COMM_SPAWN']) - cmd = environ['HNN_CORE_SPAWN_CMD'] - n_procs = int(environ['HNN_CORE_SPAWN_N_PROCS']) - if spawn: - from mpi4py import MPI - + try: + if bool(environ['HNN_CORE_MPI_COMM_SPAWN']): + cmd = environ['HNN_CORE_SPAWN_CMD'] + # Split the command into shell arguments for passing to Popen + if 'win' in sys.platform: + use_posix = True + else: + use_posix = False + cmd = shlex.split(cmd, posix=use_posix) + + n_procs = int(environ['HNN_CORE_SPAWN_N_PROCS']) + info = environ['HNN_CORE_SPAWN_INFO'] + + # important: update MPI_COMM_SPAWN env var so that it can call + # mpi_child.py again without spawning its own child MPI process environ['HNN_CORE_MPI_COMM_SPAWN'] = '0' - subcomm = MPI.COMM_SELF.Spawn('nrniv', args=cmd, - info=self.mpi_comm_spawn_info, - maxprocs=n_procs) - else: - + + if not info: + subcomm = MPI.COMM_SELF.Spawn('nrniv', args=cmd, + maxprocs=n_procs) + else: + subcomm = MPI.COMM_SELF.Spawn('nrniv', args=cmd, + info=info, + maxprocs=n_procs) + else: + raise KeyError # trigger exception where the simulation is run + + except KeyError: with MPISimulation() as mpi_sim: - # XXX: _read_net -> _read_obj, fix later net, tstop, dt, n_trials = mpi_sim._read_net() sim_data = mpi_sim.run(net, tstop, dt, n_trials) mpi_sim._write_data_stderr(sim_data) mpi_sim._wait_for_exit_signal() + except Exception: # This can be useful to indicate the problem to the # caller (in parallel_backends.py) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index a0ece72a1..b44d0ae59 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -590,9 +590,6 @@ class MPIBackend(object): with the JoblibBackend command : list of str Command to run as subprocess (see subprocess.Popen documentation). - spawn_intercomm : mpi4py.MPI.Intercomm | None - The spawned intercommunicator instance if a new MPI subprocess was - spawned. Otherwise, None. expected_data_length : int Used to check consistency between data that was sent and what MPIBackend received. @@ -606,6 +603,8 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, self.expected_data_length = 0 self.proc = None self.proc_queue = Queue() + self._intracomm = None + self._child_intercomm = None n_logical_cores = multiprocessing.cpu_count() if n_procs is None: @@ -643,34 +642,42 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, warn(f'{packages} not installed. Will run on single processor') self.n_procs = 1 - if mpi_comm_spawn: - self.command = '' - else: - self.command = mpi_cmd + self.command = 'nrniv -python -mpi -nobanner ' + \ + sys.executable + ' ' + \ + os.path.join(os.path.dirname(sys.modules[__name__].__file__), + 'mpi_child.py') + + if mpi_comm_spawn and _has_mpi4py(): + from mpi4py import MPI + + self._intracomm = MPI.COMM_WORLD + #self._child_intercomm = MPI.COMM_SELF + if self._intracomm.Get_rank() != 0: + raise RuntimeError('MPI is attempting to spawn multiple ' + 'child subprocesses. Make sure only one ' + 'parent MPI process is running when ' + '"mpi_comm_spawn" is True.') + else: if hyperthreading: - self.command += ' --use-hwthread-cpus' + self.command = '--use-hwthread-cpus ' + self.command if oversubscribe: - self.command += ' --oversubscribe' - - self.command += ' -np ' + str(self.n_procs) + ' ' + self.command = '--oversubscribe ' + self.command - self.command += 'nrniv -python -mpi -nobanner ' + \ - sys.executable + ' ' + \ - os.path.join(os.path.dirname(sys.modules[__name__].__file__), - 'mpi_child.py') + self.command = mpi_cmd \ + + ' -np ' + str(self.n_procs) + ' ' \ + + self.command - # Split the command into shell arguments for passing to Popen - if 'win' in sys.platform: - use_posix = True - else: - use_posix = False - self.command = shlex.split(self.command, posix=use_posix) + # Split the command into shell arguments for passing to Popen + if 'win' in sys.platform: + use_posix = True + else: + use_posix = False + self.command = shlex.split(self.command, posix=use_posix) self.mpi_comm_spawn = mpi_comm_spawn self.mpi_comm_spawn_info = mpi_comm_spawn_info - self.spawn_intercomm = None # updated in self.simulate def __enter__(self): global _BACKEND @@ -711,14 +718,6 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): dpl: list of Dipole The Dipole results from each simulation trial """ - from mpi4py import MPI - - # allow only the root rank to run a simulation if the user created it - # in parallel via a parent MPI process - # XXX this is a failsafe: ideally, the user will control for this by - # only running the simulation on a single rank - if self.mpi_comm_spawn and MPI.COMM_WORLD.Get_rank() != 0: - return list() # just use the joblib backend for a single core if self.n_procs == 1: @@ -738,7 +737,7 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): command = [sys.executable, mpi_child_fname] env['HNN_CORE_MPI_COMM_SPAWN'] = '1' env['HNN_CORE_SPAWN_CMD'] = self.command - env['HNN_CORE_SPAWN_INFO'] = + env['HNN_CORE_SPAWN_INFO'] = '' print("Running %d trials..." % (n_trials)) dpls = list() From 27636654be66f587d596cbd324ef4f0b5bdf0196 Mon Sep 17 00:00:00 2001 From: Ryan Thorpe Date: Fri, 1 Jul 2022 16:57:34 -0400 Subject: [PATCH 07/13] new direction: create whole separate method for generating a subprocess via spawned MPI process --- hnn_core/mpi_child.py | 38 ++------------ hnn_core/mpi_comm_spawn_child.py | 35 +++++++++++++ hnn_core/parallel_backends.py | 88 +++++++++++++++++++------------- 3 files changed, 93 insertions(+), 68 deletions(-) create mode 100644 hnn_core/mpi_comm_spawn_child.py diff --git a/hnn_core/mpi_child.py b/hnn_core/mpi_child.py index 98ba9be0c..e87e4ced4 100644 --- a/hnn_core/mpi_child.py +++ b/hnn_core/mpi_child.py @@ -151,39 +151,11 @@ def run(self, net, tstop, dt, n_trials): rc = 0 try: - try: - if bool(environ['HNN_CORE_MPI_COMM_SPAWN']): - cmd = environ['HNN_CORE_SPAWN_CMD'] - # Split the command into shell arguments for passing to Popen - if 'win' in sys.platform: - use_posix = True - else: - use_posix = False - cmd = shlex.split(cmd, posix=use_posix) - - n_procs = int(environ['HNN_CORE_SPAWN_N_PROCS']) - info = environ['HNN_CORE_SPAWN_INFO'] - - # important: update MPI_COMM_SPAWN env var so that it can call - # mpi_child.py again without spawning its own child MPI process - environ['HNN_CORE_MPI_COMM_SPAWN'] = '0' - - if not info: - subcomm = MPI.COMM_SELF.Spawn('nrniv', args=cmd, - maxprocs=n_procs) - else: - subcomm = MPI.COMM_SELF.Spawn('nrniv', args=cmd, - info=info, - maxprocs=n_procs) - else: - raise KeyError # trigger exception where the simulation is run - - except KeyError: - with MPISimulation() as mpi_sim: - net, tstop, dt, n_trials = mpi_sim._read_net() - sim_data = mpi_sim.run(net, tstop, dt, n_trials) - mpi_sim._write_data_stderr(sim_data) - mpi_sim._wait_for_exit_signal() + with MPISimulation() as mpi_sim: + net, tstop, dt, n_trials = mpi_sim._read_net() + sim_data = mpi_sim.run(net, tstop, dt, n_trials) + mpi_sim._write_data_stderr(sim_data) + mpi_sim._wait_for_exit_signal() except Exception: # This can be useful to indicate the problem to the diff --git a/hnn_core/mpi_comm_spawn_child.py b/hnn_core/mpi_comm_spawn_child.py new file mode 100644 index 000000000..95e245b5a --- /dev/null +++ b/hnn_core/mpi_comm_spawn_child.py @@ -0,0 +1,35 @@ + + + +class MPISimulation(object): + """The MPISimulation class. + Parameters + ---------- + skip_mpi_import : bool | None + Skip importing MPI. Only useful for testing with pytest. + + Attributes + ---------- + comm : mpi4py.Comm object + The handle used for communicating among MPI processes + rank : int + The rank for each processor part of the MPI communicator + """ + def __init__(self, skip_mpi_import=False): + self.skip_mpi_import = skip_mpi_import + if skip_mpi_import: + self.rank = 0 + else: + from mpi4py import MPI + + self.comm = MPI.COMM_WORLD + self.rank = self.comm.Get_rank() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + # skip Finalize() if we didn't import MPI on __init__ + if hasattr(self, 'comm'): + from mpi4py import MPI + MPI.Finalize() \ No newline at end of file diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index b44d0ae59..06f9edfce 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -37,7 +37,7 @@ def _gather_trial_data(sim_data, net, n_trials, postproc): To be called after simulate(). Returns list of Dipoles, one for each trial, and saves spiking info in net (instance of Network). """ - dpls = [] + dpls = list() # Create array of equally sampled time points for simulating currents cell_type_names = list(net.cell_types.keys()) @@ -92,6 +92,20 @@ def _get_mpi_env(): return my_env +def spawn_subprocess(parent_comm, command, n_procs, info=None): + """Spawn child simulation processes from an existing MPI process""" + + if info is None: + subcomm = parent_comm.Spawn('nrniv', args=command, + maxprocs=n_procs) + else: + subcomm = parent_comm.Spawn('nrniv', args=command, + info=info, + maxprocs=n_procs) + + return subcomm + + def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs): """Run process and communicate with it. Parameters @@ -603,7 +617,10 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, self.expected_data_length = 0 self.proc = None self.proc_queue = Queue() + self.mpi_comm_spawn = mpi_comm_spawn + self.mpi_comm_spawn_info = mpi_comm_spawn_info self._intracomm = None + self._selfcomm = None self._child_intercomm = None n_logical_cores = multiprocessing.cpu_count() @@ -647,37 +664,43 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, os.path.join(os.path.dirname(sys.modules[__name__].__file__), 'mpi_child.py') - if mpi_comm_spawn and _has_mpi4py(): + if self.mpi_comm_spawn and _has_mpi4py(): from mpi4py import MPI self._intracomm = MPI.COMM_WORLD - #self._child_intercomm = MPI.COMM_SELF + self._selfcomm = MPI.COMM_SELF if self._intracomm.Get_rank() != 0: raise RuntimeError('MPI is attempting to spawn multiple ' 'child subprocesses. Make sure only one ' 'parent MPI process is running when ' '"mpi_comm_spawn" is True.') + self.command = 'nrniv -python -mpi -nobanner ' + \ + sys.executable + ' ' + \ + os.path.join(os.path.dirname(sys.modules[__name__].__file__), + 'mpi_comm_spawn_child.py') + else: + self.command = mpi_cmd + \ + ' -np ' + str(self.n_procs) + if hyperthreading: - self.command = '--use-hwthread-cpus ' + self.command + self.command += ' --use-hwthread-cpus' if oversubscribe: - self.command = '--oversubscribe ' + self.command - - self.command = mpi_cmd \ - + ' -np ' + str(self.n_procs) + ' ' \ - + self.command + self.command += ' --oversubscribe' - # Split the command into shell arguments for passing to Popen - if 'win' in sys.platform: - use_posix = True - else: - use_posix = False - self.command = shlex.split(self.command, posix=use_posix) + self.command += ' nrniv -python -mpi -nobanner ' + \ + sys.executable + ' ' + \ + os.path.join(os.path.dirname(sys.modules[__name__].__file__), + 'mpi_child.py') - self.mpi_comm_spawn = mpi_comm_spawn - self.mpi_comm_spawn_info = mpi_comm_spawn_info + # Split the command into shell arguments for passing to Popen + if 'win' in sys.platform: + use_posix = True + else: + use_posix = False + self.command = shlex.split(self.command, posix=use_posix) def __enter__(self): global _BACKEND @@ -726,26 +749,21 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): n_trials=n_trials, postproc=postproc) - env = _get_mpi_env() - command = self.command - if self.mpi_comm_spawn: - # add new env variable that can be accessed by mpi_child.py once - # started by Popen - mpi_child_fname = os.path.join( - os.path.dirname(sys.modules[__name__].__file__), - 'mpi_child.py') - command = [sys.executable, mpi_child_fname] - env['HNN_CORE_MPI_COMM_SPAWN'] = '1' - env['HNN_CORE_SPAWN_CMD'] = self.command - env['HNN_CORE_SPAWN_INFO'] = '' - print("Running %d trials..." % (n_trials)) - dpls = list() - self.proc, sim_data = run_subprocess( - command=command, obj=[net, tstop, dt, n_trials], timeout=30, - proc_queue=self.proc_queue, env=env, cwd=os.getcwd(), - universal_newlines=True) + if self.mpi_comm_spawn: + self._child_intercomm = spawn_subprocess( + parent_comm=self._selfcomm, + command=self.command, + n_procs=self.n_procs, + info=self.mpi_comm_spawn_info) + + else: + env = _get_mpi_env() + self.proc, sim_data = run_subprocess( + command=self.command, obj=[net, tstop, dt, n_trials], timeout=30, + proc_queue=self.proc_queue, env=env, cwd=os.getcwd(), + universal_newlines=True) dpls = _gather_trial_data(sim_data, net, n_trials, postproc) return dpls From 18a6563aecca58bcb8054bfdfe1c62877971e0b0 Mon Sep 17 00:00:00 2001 From: rythorpe Date: Sat, 16 Jul 2022 23:21:38 -0400 Subject: [PATCH 08/13] outline API logic in MPIBackend tutorial --- examples/howto/plot_simulate_mpi_backend.py | 7 +++++-- hnn_core/parallel_backends.py | 7 +++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/examples/howto/plot_simulate_mpi_backend.py b/examples/howto/plot_simulate_mpi_backend.py index bb7320e65..bb60608f5 100644 --- a/examples/howto/plot_simulate_mpi_backend.py +++ b/examples/howto/plot_simulate_mpi_backend.py @@ -47,8 +47,11 @@ # ``openmpi``, which must be installed on the system from hnn_core import MPIBackend -with MPIBackend(n_procs=2, mpi_cmd='mpiexec'): - dpls = simulate_dipole(net, tstop=310., n_trials=1) +# to create a parent MPI job that uses MPIBackend to spawn child jobs, set +# mpi_comm_spawn=True and call this script from terminal using +# $ mpiexec -np 1 --oversubscribe python -m mpi4py /home/ryan/hnn-core/examples/howto/plot_simulate_mpi_backend.py +with MPIBackend(n_procs=5, mpi_cmd='mpiexec', mpi_comm_spawn=False): + dpls = simulate_dipole(net, tstop=200., n_trials=1) trial_idx = 0 dpls[trial_idx].plot() diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index 06f9edfce..d92f0b041 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -670,10 +670,9 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, self._intracomm = MPI.COMM_WORLD self._selfcomm = MPI.COMM_SELF if self._intracomm.Get_rank() != 0: - raise RuntimeError('MPI is attempting to spawn multiple ' - 'child subprocesses. Make sure only one ' - 'parent MPI process is running when ' - '"mpi_comm_spawn" is True.') + raise RuntimeError('MPI is attempting to spawn multiple child ' + 'jobs. Make sure only one parent MPI job ' + 'is running when "mpi_comm_spawn" is True.') self.command = 'nrniv -python -mpi -nobanner ' + \ sys.executable + ' ' + \ From ec728b4ea7f223a362461673d35e172bab3d0c55 Mon Sep 17 00:00:00 2001 From: rythorpe Date: Sat, 16 Jul 2022 23:33:03 -0400 Subject: [PATCH 09/13] adjust MPI job terminology in function name and docstrings --- hnn_core/parallel_backends.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index d92f0b041..99318ba79 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -92,8 +92,8 @@ def _get_mpi_env(): return my_env -def spawn_subprocess(parent_comm, command, n_procs, info=None): - """Spawn child simulation processes from an existing MPI process""" +def spawn_job(parent_comm, command, n_procs, info=None): + """Spawn child simulation job from an existing MPI communicator""" if info is None: subcomm = parent_comm.Spawn('nrniv', args=command, @@ -589,12 +589,12 @@ class MPIBackend(object): Applicable only when mpi_comm_spawn is False (default): The name of the mpi launcher executable. Will use 'mpiexec' (openmpi) by default. mpi_comm_spawn : bool - Spawns new MPI subprocesses from an existing MPI process instead of - calling mpi_cmd. This allows for the MPI processes and associated + Spawns new MPI jobs from an existing MPI communicator instead of + calling mpi_cmd. This allows for the parent MPI job and associated hardware resources to be pre-instantiated e.g. on a computing cluster. mpi_comm_spawn_info : mpi4py.MPI.Info | None Appliable only when mpi_comm_spawn is True: an mpi4py.MPI.Info instance - that grants the user control over how MPI configures spawned processes. + that grants the user control over how openMPI configures spawned jobs. Attributes ---------- @@ -751,7 +751,7 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): print("Running %d trials..." % (n_trials)) if self.mpi_comm_spawn: - self._child_intercomm = spawn_subprocess( + self._child_intercomm = spawn_job( parent_comm=self._selfcomm, command=self.command, n_procs=self.n_procs, From 283847d1dc754a10993d7df117168b3bb228b95d Mon Sep 17 00:00:00 2001 From: rythorpe Date: Sun, 17 Jul 2022 02:06:51 -0400 Subject: [PATCH 10/13] work on spawn_job --- hnn_core/parallel_backends.py | 71 ++++++++++++++++++++++++----------- 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index 99318ba79..3f1394efd 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -92,22 +92,54 @@ def _get_mpi_env(): return my_env -def spawn_job(parent_comm, command, n_procs, info=None): - """Spawn child simulation job from an existing MPI communicator""" +def spawn_job(command, obj, n_procs, info=None): + """Spawn child simulation jobs from an existing MPI communicator + + Parameters + ---------- + command : list of str | str + Command to spawn job from new MPI communicator. + obj : object + The object containing network and other simulation-specific information + to broadcast to each child process via MPI. + n_procs : int + The number of MPI processes to parallelize (by cell, see Neuron + documentation) the simulation over. + info: mpi4py.MPI.Info | None + Appliable only when mpi_comm_spawn is True: an mpi4py.MPI.Info instance + that grants the user control over how openMPI configures spawned jobs. + + Returns + ------- + child_data : object + The data returned by the child process. + """ + + from mpi4py import MPI if info is None: - subcomm = parent_comm.Spawn('nrniv', args=command, - maxprocs=n_procs) + intercomm = MPI.COMM_SELF.Spawn('nrniv', args=command, + maxprocs=n_procs) else: - subcomm = parent_comm.Spawn('nrniv', args=command, - info=info, - maxprocs=n_procs) + intercomm = MPI.COMM_SELF.Spawn('nrniv', args=command, + info=info, + maxprocs=n_procs) - return subcomm + # send Network + sim param objects to each child process + intercomm.bcast(obj, root=MPI.ROOT) + + # receive data + child_data = intercomm.recv(source=0) + + # close spawned communicator + intercomm.Disconnect() + + return child_data def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs): - """Run process and communicate with it. + """Run simulation process and communicate with it. + Parameters ---------- command : list of str | str @@ -119,6 +151,7 @@ def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs): The number of seconds to wait for a process without output. *args, **kwargs : arguments Additional arguments to pass to subprocess.Popen. + Returns ------- child_data : object @@ -621,7 +654,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, self.mpi_comm_spawn_info = mpi_comm_spawn_info self._intracomm = None self._selfcomm = None - self._child_intercomm = None + self._intercomm = None n_logical_cores = multiprocessing.cpu_count() if n_procs is None: @@ -641,6 +674,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, hyperthreading = False if _has_mpi4py() and _has_psutil(): + from mpi4py import MPI import psutil n_physical_cores = psutil.cpu_count(logical=False) @@ -659,14 +693,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, warn(f'{packages} not installed. Will run on single processor') self.n_procs = 1 - self.command = 'nrniv -python -mpi -nobanner ' + \ - sys.executable + ' ' + \ - os.path.join(os.path.dirname(sys.modules[__name__].__file__), - 'mpi_child.py') - - if self.mpi_comm_spawn and _has_mpi4py(): - from mpi4py import MPI - + if self.mpi_comm_spawn: self._intracomm = MPI.COMM_WORLD self._selfcomm = MPI.COMM_SELF if self._intracomm.Get_rank() != 0: @@ -750,17 +777,19 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): print("Running %d trials..." % (n_trials)) + sim_objs = [net, tstop, dt, n_trials] + if self.mpi_comm_spawn: - self._child_intercomm = spawn_job( + sim_data = spawn_job( parent_comm=self._selfcomm, command=self.command, + obj=sim_objs, n_procs=self.n_procs, info=self.mpi_comm_spawn_info) - else: env = _get_mpi_env() self.proc, sim_data = run_subprocess( - command=self.command, obj=[net, tstop, dt, n_trials], timeout=30, + command=self.command, obj=sim_objs, timeout=30, proc_queue=self.proc_queue, env=env, cwd=os.getcwd(), universal_newlines=True) From c616558470963e90253d72b349cbc0dc5cc8e2eb Mon Sep 17 00:00:00 2001 From: rythorpe Date: Sun, 17 Jul 2022 17:36:58 -0400 Subject: [PATCH 11/13] communicate with mpi_comm_spawn_child --- examples/howto/plot_simulate_mpi_backend.py | 4 +-- hnn_core/mpi_child.py | 3 -- hnn_core/mpi_comm_spawn_child.py | 40 +++++++++++++++++++-- hnn_core/parallel_backends.py | 11 +++--- 4 files changed, 45 insertions(+), 13 deletions(-) diff --git a/examples/howto/plot_simulate_mpi_backend.py b/examples/howto/plot_simulate_mpi_backend.py index bb60608f5..84a3d3cbb 100644 --- a/examples/howto/plot_simulate_mpi_backend.py +++ b/examples/howto/plot_simulate_mpi_backend.py @@ -49,8 +49,8 @@ # to create a parent MPI job that uses MPIBackend to spawn child jobs, set # mpi_comm_spawn=True and call this script from terminal using -# $ mpiexec -np 1 --oversubscribe python -m mpi4py /home/ryan/hnn-core/examples/howto/plot_simulate_mpi_backend.py -with MPIBackend(n_procs=5, mpi_cmd='mpiexec', mpi_comm_spawn=False): +# ``$ mpiexec -np 1 --oversubscribe python -m mpi4py /path/to/hnn-core/examples/howto/plot_simulate_mpi_backend.py`` +with MPIBackend(n_procs=5, mpi_cmd='mpiexec', mpi_comm_spawn=True): dpls = simulate_dipole(net, tstop=200., n_trials=1) trial_idx = 0 diff --git a/hnn_core/mpi_child.py b/hnn_core/mpi_child.py index e87e4ced4..3e3f99413 100644 --- a/hnn_core/mpi_child.py +++ b/hnn_core/mpi_child.py @@ -9,9 +9,6 @@ import pickle import base64 import re -import shlex -from os import environ -from mpi4py import MPI from hnn_core.parallel_backends import _extract_data, _extract_data_length diff --git a/hnn_core/mpi_comm_spawn_child.py b/hnn_core/mpi_comm_spawn_child.py index 95e245b5a..3444de894 100644 --- a/hnn_core/mpi_comm_spawn_child.py +++ b/hnn_core/mpi_comm_spawn_child.py @@ -1,8 +1,16 @@ +"""Script for running parallel simulations with MPI when called with mpiexec. +This script is called directly from MPIBackend.simulate() +""" +# Authors: Blake Caldwell +# Ryan Thorpe + +from hnn_core.network_builder import _simulate_single_trial class MPISimulation(object): """The MPISimulation class. + Parameters ---------- skip_mpi_import : bool | None @@ -22,6 +30,7 @@ def __init__(self, skip_mpi_import=False): else: from mpi4py import MPI + self.intercomm = MPI.Comm.Get_parent() self.comm = MPI.COMM_WORLD self.rank = self.comm.Get_rank() @@ -31,5 +40,32 @@ def __enter__(self): def __exit__(self, type, value, traceback): # skip Finalize() if we didn't import MPI on __init__ if hasattr(self, 'comm'): - from mpi4py import MPI - MPI.Finalize() \ No newline at end of file + self.intercomm.Disconnect() + + def _read_net(self): + """Read net and associated objects broadcasted to all ranks on stdin""" + + return self.intercomm.bcast(None, root=0) + + def run(self, net, tstop, dt, n_trials): + """Run MPI simulation(s) and write results to stderr""" + + sim_data = [] + for trial_idx in range(n_trials): + single_sim_data = _simulate_single_trial(net, tstop, dt, trial_idx) + + # go ahead and append trial data for each rank, though + # only rank 0 has data that should be sent back to MPIBackend + sim_data.append(single_sim_data) + + return sim_data + + +if __name__ == '__main__': + """This file is called on command-line from nrniv""" + + with MPISimulation() as mpi_sim: + net, tstop, dt, n_trials = mpi_sim._read_net() + sim_data = mpi_sim.run(net, tstop, dt, n_trials) + if mpi_sim.rank == 0: + mpi_sim.intercomm.send(sim_data, dest=0) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index 3f1394efd..fab6e1581 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -118,18 +118,18 @@ def spawn_job(command, obj, n_procs, info=None): from mpi4py import MPI if info is None: - intercomm = MPI.COMM_SELF.Spawn('nrniv', args=command, + intercomm = MPI.COMM_SELF.Spawn(command[0], args=command[1:], maxprocs=n_procs) else: - intercomm = MPI.COMM_SELF.Spawn('nrniv', args=command, + intercomm = MPI.COMM_SELF.Spawn(command[0], args=command[1:], info=info, maxprocs=n_procs) - + # send Network + sim param objects to each child process intercomm.bcast(obj, root=MPI.ROOT) # receive data - child_data = intercomm.recv(source=0) + child_data = intercomm.recv(source=MPI.ANY_SOURCE) # close spawned communicator intercomm.Disconnect() @@ -742,7 +742,7 @@ def __exit__(self, type, value, traceback): _BACKEND = self._old_backend # always kill nrniv processes for good measure - if self.n_procs > 1: + if (not self.mpi_comm_spawn) and self.n_procs > 1: kill_proc_name('nrniv') def simulate(self, net, tstop, dt, n_trials, postproc=False): @@ -781,7 +781,6 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): if self.mpi_comm_spawn: sim_data = spawn_job( - parent_comm=self._selfcomm, command=self.command, obj=sim_objs, n_procs=self.n_procs, From f1b5e3f6851bc9027beaa80cbfc36d043c43ec65 Mon Sep 17 00:00:00 2001 From: rythorpe Date: Mon, 25 Jul 2022 02:41:54 -0400 Subject: [PATCH 12/13] catch any error emerging from child process to prevent deadlock --- hnn_core/mpi_comm_spawn_child.py | 29 ++++++++++++++++------------- hnn_core/parallel_backends.py | 10 ++++++++++ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/hnn_core/mpi_comm_spawn_child.py b/hnn_core/mpi_comm_spawn_child.py index 3444de894..e98b41fb6 100644 --- a/hnn_core/mpi_comm_spawn_child.py +++ b/hnn_core/mpi_comm_spawn_child.py @@ -5,6 +5,7 @@ # Authors: Blake Caldwell # Ryan Thorpe +from mpi4py import MPI from hnn_core.network_builder import _simulate_single_trial @@ -23,16 +24,10 @@ class MPISimulation(object): rank : int The rank for each processor part of the MPI communicator """ - def __init__(self, skip_mpi_import=False): - self.skip_mpi_import = skip_mpi_import - if skip_mpi_import: - self.rank = 0 - else: - from mpi4py import MPI - - self.intercomm = MPI.Comm.Get_parent() - self.comm = MPI.COMM_WORLD - self.rank = self.comm.Get_rank() + def __init__(self): + self.intercomm = MPI.Comm.Get_parent() + self.comm = MPI.COMM_WORLD + self.rank = self.comm.Get_rank() def __enter__(self): return self @@ -66,6 +61,14 @@ def run(self, net, tstop, dt, n_trials): with MPISimulation() as mpi_sim: net, tstop, dt, n_trials = mpi_sim._read_net() - sim_data = mpi_sim.run(net, tstop, dt, n_trials) - if mpi_sim.rank == 0: - mpi_sim.intercomm.send(sim_data, dest=0) + + try: + sim_data = mpi_sim.run(net, tstop, dt, n_trials) + except: + err_occured = True + else: + err_occured = False + finally: + mpi_sim.intercomm.allreduce(err_occured, op=MPI.LAND) + if mpi_sim.rank == 0: + mpi_sim.intercomm.send(sim_data, dest=0) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index fab6e1581..d8d5d46ec 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -128,6 +128,16 @@ def spawn_job(command, obj, n_procs, info=None): # send Network + sim param objects to each child process intercomm.bcast(obj, root=MPI.ROOT) + # all child processes must complete without an error; if not, prevent + # the parent from deadlocking with a blocking recv call + # (note that the traceback cannot be accessed from here or passed to + # the parent) + err_in_children = intercomm.allreduce(None, op=MPI.LAND) + if err_in_children: + raise RuntimeError('An error occured in a spawned MPI child process. ' + 'Consider debugging with MPIBackend when ' + 'mpi_comm_spawn=False.') + # receive data child_data = intercomm.recv(source=MPI.ANY_SOURCE) From 99c7bafe30c489a060246ecee66e2e37f30ab33f Mon Sep 17 00:00:00 2001 From: rythorpe Date: Mon, 25 Jul 2022 03:01:40 -0400 Subject: [PATCH 13/13] clean up parallel_backend.py --- hnn_core/parallel_backends.py | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index d8d5d46ec..e0bacf782 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -93,12 +93,12 @@ def _get_mpi_env(): def spawn_job(command, obj, n_procs, info=None): - """Spawn child simulation jobs from an existing MPI communicator + """Spawn child simulation jobs from an existing MPI communicator. Parameters ---------- - command : list of str | str - Command to spawn job from new MPI communicator. + command : list of str + Command and program arguments to spawn job from new MPI communicator. obj : object The object containing network and other simulation-specific information to broadcast to each child process via MPI. @@ -129,9 +129,8 @@ def spawn_job(command, obj, n_procs, info=None): intercomm.bcast(obj, root=MPI.ROOT) # all child processes must complete without an error; if not, prevent - # the parent from deadlocking with a blocking recv call - # (note that the traceback cannot be accessed from here or passed to - # the parent) + # the parent from deadlocking with a blocking recv call (note that the + # traceback cannot be accessed from here or passed to the parent) err_in_children = intercomm.allreduce(None, op=MPI.LAND) if err_in_children: raise RuntimeError('An error occured in a spawned MPI child process. ' @@ -152,7 +151,7 @@ def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs): Parameters ---------- - command : list of str | str + command : list of str Command to run as subprocess (see subprocess.Popen documentation). obj : object The object to write to stdin after starting child process @@ -660,11 +659,8 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, self.expected_data_length = 0 self.proc = None self.proc_queue = Queue() - self.mpi_comm_spawn = mpi_comm_spawn - self.mpi_comm_spawn_info = mpi_comm_spawn_info - self._intracomm = None - self._selfcomm = None - self._intercomm = None + self._mpi_comm_spawn = mpi_comm_spawn + self._mpi_comm_spawn_info = mpi_comm_spawn_info n_logical_cores = multiprocessing.cpu_count() if n_procs is None: @@ -703,10 +699,8 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, warn(f'{packages} not installed. Will run on single processor') self.n_procs = 1 - if self.mpi_comm_spawn: - self._intracomm = MPI.COMM_WORLD - self._selfcomm = MPI.COMM_SELF - if self._intracomm.Get_rank() != 0: + if self._mpi_comm_spawn: + if MPI.COMM_WORLD.Get_rank() != 0: raise RuntimeError('MPI is attempting to spawn multiple child ' 'jobs. Make sure only one parent MPI job ' 'is running when "mpi_comm_spawn" is True.') @@ -752,7 +746,7 @@ def __exit__(self, type, value, traceback): _BACKEND = self._old_backend # always kill nrniv processes for good measure - if (not self.mpi_comm_spawn) and self.n_procs > 1: + if (not self._mpi_comm_spawn) and self.n_procs > 1: kill_proc_name('nrniv') def simulate(self, net, tstop, dt, n_trials, postproc=False): @@ -789,12 +783,12 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): sim_objs = [net, tstop, dt, n_trials] - if self.mpi_comm_spawn: + if self._mpi_comm_spawn: sim_data = spawn_job( command=self.command, obj=sim_objs, n_procs=self.n_procs, - info=self.mpi_comm_spawn_info) + info=self._mpi_comm_spawn_info) else: env = _get_mpi_env() self.proc, sim_data = run_subprocess(