diff --git a/examples/howto/plot_simulate_mpi_backend.py b/examples/howto/plot_simulate_mpi_backend.py index bb7320e65..84a3d3cbb 100644 --- a/examples/howto/plot_simulate_mpi_backend.py +++ b/examples/howto/plot_simulate_mpi_backend.py @@ -47,8 +47,11 @@ # ``openmpi``, which must be installed on the system from hnn_core import MPIBackend -with MPIBackend(n_procs=2, mpi_cmd='mpiexec'): - dpls = simulate_dipole(net, tstop=310., n_trials=1) +# to create a parent MPI job that uses MPIBackend to spawn child jobs, set +# mpi_comm_spawn=True and call this script from terminal using +# ``$ mpiexec -np 1 --oversubscribe python -m mpi4py /path/to/hnn-core/examples/howto/plot_simulate_mpi_backend.py`` +with MPIBackend(n_procs=5, mpi_cmd='mpiexec', mpi_comm_spawn=True): + dpls = simulate_dipole(net, tstop=200., n_trials=1) trial_idx = 0 dpls[trial_idx].plot() diff --git a/hnn_core/mpi_child.py b/hnn_core/mpi_child.py index fc0da385a..3e3f99413 100644 --- a/hnn_core/mpi_child.py +++ b/hnn_core/mpi_child.py @@ -3,6 +3,7 @@ """ # Authors: Blake Caldwell +# Ryan Thorpe import sys import pickle @@ -72,7 +73,7 @@ def __exit__(self, type, value, traceback): MPI.Finalize() def _read_net(self): - """Read net broadcasted to all ranks on stdin""" + """Read net and associated objects broadcasted to all ranks on stdin""" # read Network from stdin if self.rank == 0: @@ -148,11 +149,11 @@ def run(self, net, tstop, dt, n_trials): try: with MPISimulation() as mpi_sim: - # XXX: _read_net -> _read_obj, fix later net, tstop, dt, n_trials = mpi_sim._read_net() sim_data = mpi_sim.run(net, tstop, dt, n_trials) mpi_sim._write_data_stderr(sim_data) mpi_sim._wait_for_exit_signal() + except Exception: # This can be useful to indicate the problem to the # caller (in parallel_backends.py) diff --git a/hnn_core/mpi_comm_spawn_child.py b/hnn_core/mpi_comm_spawn_child.py new file mode 100644 index 000000000..e98b41fb6 --- /dev/null +++ b/hnn_core/mpi_comm_spawn_child.py @@ -0,0 +1,74 @@ +"""Script for running parallel simulations with MPI when called with mpiexec. +This script is called directly from MPIBackend.simulate() +""" + +# Authors: Blake Caldwell +# Ryan Thorpe + +from mpi4py import MPI +from hnn_core.network_builder import _simulate_single_trial + + +class MPISimulation(object): + """The MPISimulation class. + + Parameters + ---------- + skip_mpi_import : bool | None + Skip importing MPI. Only useful for testing with pytest. + + Attributes + ---------- + comm : mpi4py.Comm object + The handle used for communicating among MPI processes + rank : int + The rank for each processor part of the MPI communicator + """ + def __init__(self): + self.intercomm = MPI.Comm.Get_parent() + self.comm = MPI.COMM_WORLD + self.rank = self.comm.Get_rank() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + # skip Finalize() if we didn't import MPI on __init__ + if hasattr(self, 'comm'): + self.intercomm.Disconnect() + + def _read_net(self): + """Read net and associated objects broadcasted to all ranks on stdin""" + + return self.intercomm.bcast(None, root=0) + + def run(self, net, tstop, dt, n_trials): + """Run MPI simulation(s) and write results to stderr""" + + sim_data = [] + for trial_idx in range(n_trials): + single_sim_data = _simulate_single_trial(net, tstop, dt, trial_idx) + + # go ahead and append trial data for each rank, though + # only rank 0 has data that should be sent back to MPIBackend + sim_data.append(single_sim_data) + + return sim_data + + +if __name__ == '__main__': + """This file is called on command-line from nrniv""" + + with MPISimulation() as mpi_sim: + net, tstop, dt, n_trials = mpi_sim._read_net() + + try: + sim_data = mpi_sim.run(net, tstop, dt, n_trials) + except: + err_occured = True + else: + err_occured = False + finally: + mpi_sim.intercomm.allreduce(err_occured, op=MPI.LAND) + if mpi_sim.rank == 0: + mpi_sim.intercomm.send(sim_data, dest=0) diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index bb7dc63d4..e0bacf782 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -37,7 +37,7 @@ def _gather_trial_data(sim_data, net, n_trials, postproc): To be called after simulate(). Returns list of Dipoles, one for each trial, and saves spiking info in net (instance of Network). """ - dpls = [] + dpls = list() # Create array of equally sampled time points for simulating currents cell_type_names = list(net.cell_types.keys()) @@ -92,11 +92,66 @@ def _get_mpi_env(): return my_env +def spawn_job(command, obj, n_procs, info=None): + """Spawn child simulation jobs from an existing MPI communicator. + + Parameters + ---------- + command : list of str + Command and program arguments to spawn job from new MPI communicator. + obj : object + The object containing network and other simulation-specific information + to broadcast to each child process via MPI. + n_procs : int + The number of MPI processes to parallelize (by cell, see Neuron + documentation) the simulation over. + info: mpi4py.MPI.Info | None + Appliable only when mpi_comm_spawn is True: an mpi4py.MPI.Info instance + that grants the user control over how openMPI configures spawned jobs. + + Returns + ------- + child_data : object + The data returned by the child process. + """ + + from mpi4py import MPI + + if info is None: + intercomm = MPI.COMM_SELF.Spawn(command[0], args=command[1:], + maxprocs=n_procs) + else: + intercomm = MPI.COMM_SELF.Spawn(command[0], args=command[1:], + info=info, + maxprocs=n_procs) + + # send Network + sim param objects to each child process + intercomm.bcast(obj, root=MPI.ROOT) + + # all child processes must complete without an error; if not, prevent + # the parent from deadlocking with a blocking recv call (note that the + # traceback cannot be accessed from here or passed to the parent) + err_in_children = intercomm.allreduce(None, op=MPI.LAND) + if err_in_children: + raise RuntimeError('An error occured in a spawned MPI child process. ' + 'Consider debugging with MPIBackend when ' + 'mpi_comm_spawn=False.') + + # receive data + child_data = intercomm.recv(source=MPI.ANY_SOURCE) + + # close spawned communicator + intercomm.Disconnect() + + return child_data + + def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs): - """Run process and communicate with it. + """Run simulation process and communicate with it. + Parameters ---------- - command : list of str | str + command : list of str Command to run as subprocess (see subprocess.Popen documentation). obj : object The object to write to stdin after starting child process @@ -105,6 +160,7 @@ def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs): The number of seconds to wait for a process without output. *args, **kwargs : arguments Additional arguments to pass to subprocess.Popen. + Returns ------- child_data : object @@ -572,36 +628,49 @@ class MPIBackend(object): attempt to detect number of cores (including hyperthreads) and start parallel simulation over all of them. mpi_cmd : str - The name of the mpi launcher executable. Will use 'mpiexec' - (openmpi) by default. + Applicable only when mpi_comm_spawn is False (default): The name of the + mpi launcher executable. Will use 'mpiexec' (openmpi) by default. + mpi_comm_spawn : bool + Spawns new MPI jobs from an existing MPI communicator instead of + calling mpi_cmd. This allows for the parent MPI job and associated + hardware resources to be pre-instantiated e.g. on a computing cluster. + mpi_comm_spawn_info : mpi4py.MPI.Info | None + Appliable only when mpi_comm_spawn is True: an mpi4py.MPI.Info instance + that grants the user control over how openMPI configures spawned jobs. Attributes ---------- - n_procs : int The number of processes MPI will actually use (spread over cores). If 1 is specified or mpi4py could not be loaded, the simulation will be run with the JoblibBackend - mpi_cmd : list of str - The mpi command with number of procs and options to be passed to Popen + command : list of str + Command to run as subprocess (see subprocess.Popen documentation). expected_data_length : int Used to check consistency between data that was sent and what MPIBackend received. proc_queue : threading.Queue A Queue object to hold process handles from Popen in a thread-safe way. - There will be a valid process handle present the queue when a MPI - åsimulation is running. + There will be a valid process handle present in the queue when an MPI + simulation is running. """ - def __init__(self, n_procs=None, mpi_cmd='mpiexec'): + def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, + mpi_comm_spawn_info=None): self.expected_data_length = 0 self.proc = None self.proc_queue = Queue() + self._mpi_comm_spawn = mpi_comm_spawn + self._mpi_comm_spawn_info = mpi_comm_spawn_info n_logical_cores = multiprocessing.cpu_count() if n_procs is None: self.n_procs = n_logical_cores + elif n_procs == 1: + print("Backend will use 1 core. Running simulation without MPI") + return else: self.n_procs = n_procs + print("MPI will run over %d processes" % (self.n_procs)) # did user try to force running on more cores than available? oversubscribe = False @@ -611,6 +680,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec'): hyperthreading = False if _has_mpi4py() and _has_psutil(): + from mpi4py import MPI import psutil n_physical_cores = psutil.cpu_count(logical=False) @@ -629,33 +699,38 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec'): warn(f'{packages} not installed. Will run on single processor') self.n_procs = 1 - self.mpi_cmd = mpi_cmd + if self._mpi_comm_spawn: + if MPI.COMM_WORLD.Get_rank() != 0: + raise RuntimeError('MPI is attempting to spawn multiple child ' + 'jobs. Make sure only one parent MPI job ' + 'is running when "mpi_comm_spawn" is True.') - if self.n_procs == 1: - print("Backend will use 1 core. Running simulation without MPI") - return - else: - print("MPI will run over %d processes" % (self.n_procs)) + self.command = 'nrniv -python -mpi -nobanner ' + \ + sys.executable + ' ' + \ + os.path.join(os.path.dirname(sys.modules[__name__].__file__), + 'mpi_comm_spawn_child.py') - if hyperthreading: - self.mpi_cmd += ' --use-hwthread-cpus' + else: + self.command = mpi_cmd + \ + ' -np ' + str(self.n_procs) - if oversubscribe: - self.mpi_cmd += ' --oversubscribe' + if hyperthreading: + self.command += ' --use-hwthread-cpus' - self.mpi_cmd += ' -np ' + str(self.n_procs) + if oversubscribe: + self.command += ' --oversubscribe' - self.mpi_cmd += ' nrniv -python -mpi -nobanner ' + \ - sys.executable + ' ' + \ - os.path.join(os.path.dirname(sys.modules[__name__].__file__), - 'mpi_child.py') + self.command += ' nrniv -python -mpi -nobanner ' + \ + sys.executable + ' ' + \ + os.path.join(os.path.dirname(sys.modules[__name__].__file__), + 'mpi_child.py') # Split the command into shell arguments for passing to Popen if 'win' in sys.platform: use_posix = True else: use_posix = False - self.mpi_cmd = shlex.split(self.mpi_cmd, posix=use_posix) + self.command = shlex.split(self.command, posix=use_posix) def __enter__(self): global _BACKEND @@ -671,7 +746,7 @@ def __exit__(self, type, value, traceback): _BACKEND = self._old_backend # always kill nrniv processes for good measure - if self.n_procs > 1: + if (not self._mpi_comm_spawn) and self.n_procs > 1: kill_proc_name('nrniv') def simulate(self, net, tstop, dt, n_trials, postproc=False): @@ -705,14 +780,21 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): postproc=postproc) print("Running %d trials..." % (n_trials)) - dpls = [] - env = _get_mpi_env() + sim_objs = [net, tstop, dt, n_trials] - self.proc, sim_data = run_subprocess( - command=self.mpi_cmd, obj=[net, tstop, dt, n_trials], timeout=30, - proc_queue=self.proc_queue, env=env, cwd=os.getcwd(), - universal_newlines=True) + if self._mpi_comm_spawn: + sim_data = spawn_job( + command=self.command, + obj=sim_objs, + n_procs=self.n_procs, + info=self._mpi_comm_spawn_info) + else: + env = _get_mpi_env() + self.proc, sim_data = run_subprocess( + command=self.command, obj=sim_objs, timeout=30, + proc_queue=self.proc_queue, env=env, cwd=os.getcwd(), + universal_newlines=True) dpls = _gather_trial_data(sim_data, net, n_trials, postproc) return dpls