
[WIP] ENH: add ability to spawn MPI jobs from parent job #506

Open
wants to merge 13 commits into master
7 changes: 5 additions & 2 deletions examples/howto/plot_simulate_mpi_backend.py
@@ -47,8 +47,11 @@
# ``openmpi``, which must be installed on the system
from hnn_core import MPIBackend

with MPIBackend(n_procs=2, mpi_cmd='mpiexec'):
    dpls = simulate_dipole(net, tstop=310., n_trials=1)
# To create a parent MPI job that uses MPIBackend to spawn child jobs, set
# ``mpi_comm_spawn=True`` and call this script from the terminal with
# ``$ mpiexec -np 1 --oversubscribe python -m mpi4py /path/to/hnn-core/examples/howto/plot_simulate_mpi_backend.py``
with MPIBackend(n_procs=5, mpi_cmd='mpiexec', mpi_comm_spawn=True):
    dpls = simulate_dipole(net, tstop=200., n_trials=1)

trial_idx = 0
dpls[trial_idx].plot()
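Because MPIBackend raises a RuntimeError when mpi_comm_spawn=True is used from a multi-rank parent job (see the check in parallel_backends.py below), a guarded launch can be sketched as follows. This is an illustration, not part of the PR, and assumes mpi4py is installed.

# Sketch, not part of the PR: only enable mpi_comm_spawn from a
# single-rank parent job; MPIBackend raises a RuntimeError otherwise.
from mpi4py import MPI

from hnn_core import MPIBackend, jones_2009_model, simulate_dipole

net = jones_2009_model()
if MPI.COMM_WORLD.Get_size() == 1:
    # the single parent rank spawns n_procs child ranks for the simulation
    with MPIBackend(n_procs=5, mpi_cmd='mpiexec', mpi_comm_spawn=True):
        dpls = simulate_dipole(net, tstop=200., n_trials=1)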
5 changes: 3 additions & 2 deletions hnn_core/mpi_child.py
@@ -3,6 +3,7 @@
"""

# Authors: Blake Caldwell <[email protected]>
# Ryan Thorpe <[email protected]>

import sys
import pickle
@@ -72,7 +73,7 @@ def __exit__(self, type, value, traceback):
            MPI.Finalize()

    def _read_net(self):
        """Read net broadcasted to all ranks on stdin"""
        """Read net and associated objects broadcasted to all ranks on stdin"""

        # read Network from stdin
        if self.rank == 0:
@@ -148,11 +149,11 @@ def run(self, net, tstop, dt, n_trials):

    try:
        with MPISimulation() as mpi_sim:
            # XXX: _read_net -> _read_obj, fix later
            net, tstop, dt, n_trials = mpi_sim._read_net()
            sim_data = mpi_sim.run(net, tstop, dt, n_trials)
            mpi_sim._write_data_stderr(sim_data)
            mpi_sim._wait_for_exit_signal()

    except Exception:
        # This can be useful to indicate the problem to the
        # caller (in parallel_backends.py)
74 changes: 74 additions & 0 deletions hnn_core/mpi_comm_spawn_child.py
@@ -0,0 +1,74 @@
"""Script for running parallel simulations with MPI when called with mpiexec.
This script is called directly from MPIBackend.simulate()
"""

# Authors: Blake Caldwell <[email protected]>
# Ryan Thorpe <[email protected]>

from mpi4py import MPI
from hnn_core.network_builder import _simulate_single_trial


class MPISimulation(object):
    """The MPISimulation class.

    Attributes
    ----------
    intercomm : mpi4py.MPI.Intercomm object
        The handle used for communicating with the parent MPI job
    comm : mpi4py.MPI.Comm object
        The handle used for communicating among the child MPI processes
    rank : int
        The rank of each processor that is part of the MPI communicator
    """
Review comment from the Contributor Author on lines +13 to +26: Update docstring
    def __init__(self):
        self.intercomm = MPI.Comm.Get_parent()
        self.comm = MPI.COMM_WORLD
        self.rank = self.comm.Get_rank()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # only disconnect from the parent if the communicator was created
        # in __init__
        if hasattr(self, 'comm'):
            self.intercomm.Disconnect()

    def _read_net(self):
        """Read net and associated objects broadcast to all ranks from the
        parent via the intercommunicator."""

        return self.intercomm.bcast(None, root=0)

    def run(self, net, tstop, dt, n_trials):
        """Run MPI simulation(s) and return the simulation data."""

        sim_data = []
        for trial_idx in range(n_trials):
            single_sim_data = _simulate_single_trial(net, tstop, dt,
                                                     trial_idx)

            # append trial data on each rank, though only rank 0 has data
            # that should be sent back to MPIBackend
            sim_data.append(single_sim_data)

        return sim_data


if __name__ == '__main__':
    """This file is called on the command line from nrniv"""

    with MPISimulation() as mpi_sim:
        net, tstop, dt, n_trials = mpi_sim._read_net()

        try:
            sim_data = mpi_sim.run(net, tstop, dt, n_trials)
        except Exception:
            err_occurred = True
        else:
            err_occurred = False
        finally:
            # let the parent know whether any child rank failed (MPI.LOR
            # yields True if an error occurred on at least one rank) so
            # that it does not block on a recv that will never arrive
            mpi_sim.intercomm.allreduce(err_occurred, op=MPI.LOR)
            if mpi_sim.rank == 0 and not err_occurred:
                mpi_sim.intercomm.send(sim_data, dest=0)
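For orientation, the parent side of this handshake (implemented by spawn_job() in parallel_backends.py below) reduces to the following sketch. The payload and script path are illustrative stand-ins, not part of the PR.

# Parent-side counterpart to the child script above (illustrative sketch);
# `payload` stands in for [net, tstop, dt, n_trials].
import sys
from mpi4py import MPI

payload = ['<net placeholder>', 170., 0.025, 1]
child_script = '/path/to/hnn_core/mpi_comm_spawn_child.py'

intercomm = MPI.COMM_SELF.Spawn(sys.executable, args=[child_script],
                                maxprocs=2)
intercomm.bcast(payload, root=MPI.ROOT)      # pairs with _read_net()
err = intercomm.allreduce(None, op=MPI.LOR)  # pairs with the child's finally block
if not err:
    sim_data = intercomm.recv(source=MPI.ANY_SOURCE)  # pairs with intercomm.send()
intercomm.Disconnect()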
150 changes: 116 additions & 34 deletions hnn_core/parallel_backends.py
@@ -37,7 +37,7 @@ def _gather_trial_data(sim_data, net, n_trials, postproc):
To be called after simulate(). Returns list of Dipoles, one for each trial,
and saves spiking info in net (instance of Network).
"""
dpls = []
dpls = list()

# Create array of equally sampled time points for simulating currents
cell_type_names = list(net.cell_types.keys())
@@ -92,11 +92,66 @@ def _get_mpi_env():
return my_env


def spawn_job(command, obj, n_procs, info=None):
    """Spawn child simulation jobs from an existing MPI communicator.

    Parameters
    ----------
    command : list of str
        Command and program arguments used to spawn the job from a new MPI
        communicator.
    obj : object
        The object containing the network and other simulation-specific
        information to broadcast to each child process via MPI.
    n_procs : int
        The number of MPI processes over which to parallelize the simulation
        (by cell; see the NEURON documentation).
    info : mpi4py.MPI.Info | None
        An mpi4py.MPI.Info instance that grants the user control over how
        Open MPI configures spawned jobs.

    Returns
    -------
    child_data : object
        The data returned by the child process.
    """

    from mpi4py import MPI

    if info is None:
        intercomm = MPI.COMM_SELF.Spawn(command[0], args=command[1:],
                                        maxprocs=n_procs)
    else:
        intercomm = MPI.COMM_SELF.Spawn(command[0], args=command[1:],
                                        info=info,
                                        maxprocs=n_procs)

    # send Network + simulation parameter objects to each child process
    intercomm.bcast(obj, root=MPI.ROOT)

    # all child processes must complete without an error; if any fails,
    # prevent the parent from deadlocking in the blocking recv call below
    # (note that the traceback cannot be accessed from here or passed to
    # the parent)
    err_in_children = intercomm.allreduce(None, op=MPI.LOR)
    if err_in_children:
        raise RuntimeError('An error occurred in a spawned MPI child '
                           'process. Consider debugging with MPIBackend '
                           'when mpi_comm_spawn=False.')

    # receive data
    child_data = intercomm.recv(source=MPI.ANY_SOURCE)

    # close the spawned communicator
    intercomm.Disconnect()

    return child_data
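A hypothetical direct call to spawn_job() looks like the following; MPIBackend builds the same command in its __init__ below, so the paths and parameters here are illustrative only.

# Hypothetical usage of spawn_job; the command mirrors what MPIBackend builds.
import os
import sys

import hnn_core
from hnn_core import jones_2009_model
from hnn_core.parallel_backends import spawn_job

child_script = os.path.join(os.path.dirname(hnn_core.__file__),
                            'mpi_comm_spawn_child.py')
command = ['nrniv', '-python', '-mpi', '-nobanner',
           sys.executable, child_script]

net = jones_2009_model()
sim_data = spawn_job(command=command,
                     obj=[net, 170., 0.025, 1],  # net, tstop, dt, n_trials
                     n_procs=2)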


def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs):
"""Run process and communicate with it.
"""Run simulation process and communicate with it.

Parameters
----------
command : list of str | str
command : list of str
Command to run as subprocess (see subprocess.Popen documentation).
obj : object
The object to write to stdin after starting child process
@@ -105,6 +160,7 @@ def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs):
The number of seconds to wait for a process without output.
*args, **kwargs : arguments
Additional arguments to pass to subprocess.Popen.

Returns
-------
child_data : object
@@ -572,36 +628,49 @@ class MPIBackend(object):
attempt to detect number of cores (including hyperthreads) and start
parallel simulation over all of them.
mpi_cmd : str
The name of the mpi launcher executable. Will use 'mpiexec'
(openmpi) by default.
Applicable only when mpi_comm_spawn is False (default): the name of the
mpi launcher executable. Will use 'mpiexec' (Open MPI) by default.
mpi_comm_spawn : bool
If True, spawn new MPI jobs from an existing MPI communicator instead of
calling mpi_cmd. This allows the parent MPI job and its associated
hardware resources to be pre-instantiated, e.g., on a computing cluster.
mpi_comm_spawn_info : mpi4py.MPI.Info | None
Applicable only when mpi_comm_spawn is True: an mpi4py.MPI.Info instance
that grants the user control over how Open MPI configures spawned jobs.

Attributes
----------

n_procs : int
The number of processes MPI will actually use (spread over cores). If 1
is specified or mpi4py could not be loaded, the simulation will be run
with the JoblibBackend
mpi_cmd : list of str
The mpi command with number of procs and options to be passed to Popen
command : list of str
Command to run as subprocess (see subprocess.Popen documentation).
expected_data_length : int
Used to check consistency between data that was sent and what
MPIBackend received.
proc_queue : threading.Queue
A Queue object to hold process handles from Popen in a thread-safe way.
There will be a valid process handle present the queue when a MPI
åsimulation is running.
There will be a valid process handle present in the queue when an MPI
simulation is running.
"""
def __init__(self, n_procs=None, mpi_cmd='mpiexec'):
def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False,
mpi_comm_spawn_info=None):
self.expected_data_length = 0
self.proc = None
self.proc_queue = Queue()
self._mpi_comm_spawn = mpi_comm_spawn
self._mpi_comm_spawn_info = mpi_comm_spawn_info

n_logical_cores = multiprocessing.cpu_count()
if n_procs is None:
self.n_procs = n_logical_cores
elif n_procs == 1:
print("Backend will use 1 core. Running simulation without MPI")
return
else:
self.n_procs = n_procs
print("MPI will run over %d processes" % (self.n_procs))

# did user try to force running on more cores than available?
oversubscribe = False
@@ -611,6 +680,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec'):
hyperthreading = False

if _has_mpi4py() and _has_psutil():
from mpi4py import MPI
import psutil

n_physical_cores = psutil.cpu_count(logical=False)
@@ -629,33 +699,38 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec'):
warn(f'{packages} not installed. Will run on single processor')
self.n_procs = 1

self.mpi_cmd = mpi_cmd
if self._mpi_comm_spawn:
if MPI.COMM_WORLD.Get_rank() != 0:
raise RuntimeError('MPI is attempting to spawn multiple child '
'jobs. Make sure only one parent MPI job '
'is running when "mpi_comm_spawn" is True.')

if self.n_procs == 1:
print("Backend will use 1 core. Running simulation without MPI")
return
else:
print("MPI will run over %d processes" % (self.n_procs))
self.command = 'nrniv -python -mpi -nobanner ' + \
sys.executable + ' ' + \
os.path.join(os.path.dirname(sys.modules[__name__].__file__),
'mpi_comm_spawn_child.py')

if hyperthreading:
self.mpi_cmd += ' --use-hwthread-cpus'
else:
self.command = mpi_cmd + \
' -np ' + str(self.n_procs)

if oversubscribe:
self.mpi_cmd += ' --oversubscribe'
if hyperthreading:
self.command += ' --use-hwthread-cpus'

self.mpi_cmd += ' -np ' + str(self.n_procs)
if oversubscribe:
self.command += ' --oversubscribe'

self.mpi_cmd += ' nrniv -python -mpi -nobanner ' + \
sys.executable + ' ' + \
os.path.join(os.path.dirname(sys.modules[__name__].__file__),
'mpi_child.py')
self.command += ' nrniv -python -mpi -nobanner ' + \
sys.executable + ' ' + \
os.path.join(os.path.dirname(sys.modules[__name__].__file__),
'mpi_child.py')
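For concreteness, the two command forms assembled above reduce to strings like the following before being split into argument lists (the Python path and process count are made-up examples):

# Illustrative reconstruction of the two command strings built in __init__;
# neither string is emitted verbatim by the PR.
import shlex

# mpi_comm_spawn=True: launched via MPI.COMM_SELF.Spawn in spawn_job()
spawn_cmd = ('nrniv -python -mpi -nobanner /usr/bin/python3 '
             '/path/to/hnn_core/mpi_comm_spawn_child.py')

# mpi_comm_spawn=False: launched as a subprocess via run_subprocess()
subproc_cmd = ('mpiexec -np 4 --oversubscribe nrniv -python -mpi -nobanner '
               '/usr/bin/python3 /path/to/hnn_core/mpi_child.py')

# as in __init__ below, shlex.split turns each string into an argument list
print(shlex.split(subproc_cmd))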

# Split the command into shell arguments for passing to Popen
if 'win' in sys.platform:
use_posix = True
else:
use_posix = False
self.mpi_cmd = shlex.split(self.mpi_cmd, posix=use_posix)
self.command = shlex.split(self.command, posix=use_posix)

def __enter__(self):
global _BACKEND
@@ -671,7 +746,7 @@ def __exit__(self, type, value, traceback):
_BACKEND = self._old_backend

# always kill nrniv processes for good measure
if self.n_procs > 1:
if (not self._mpi_comm_spawn) and self.n_procs > 1:
kill_proc_name('nrniv')

def simulate(self, net, tstop, dt, n_trials, postproc=False):
@@ -705,14 +780,21 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False):
postproc=postproc)

print("Running %d trials..." % (n_trials))
dpls = []

env = _get_mpi_env()
sim_objs = [net, tstop, dt, n_trials]

self.proc, sim_data = run_subprocess(
command=self.mpi_cmd, obj=[net, tstop, dt, n_trials], timeout=30,
proc_queue=self.proc_queue, env=env, cwd=os.getcwd(),
universal_newlines=True)
if self._mpi_comm_spawn:
sim_data = spawn_job(
command=self.command,
obj=sim_objs,
n_procs=self.n_procs,
info=self._mpi_comm_spawn_info)
else:
env = _get_mpi_env()
self.proc, sim_data = run_subprocess(
command=self.command, obj=sim_objs, timeout=30,
proc_queue=self.proc_queue, env=env, cwd=os.getcwd(),
universal_newlines=True)

dpls = _gather_trial_data(sim_data, net, n_trials, postproc)
return dpls