diff --git a/hnn_core/gui/gui.py b/hnn_core/gui/gui.py
index a2705ec0a..51cc27481 100644
--- a/hnn_core/gui/gui.py
+++ b/hnn_core/gui/gui.py
@@ -8,8 +8,6 @@
 import logging
 import mimetypes
 import numpy as np
-import platform
-import psutil
 import sys
 import json
 import urllib.parse
@@ -36,7 +34,9 @@
                              get_L5Pyr_params_default)
 from hnn_core.hnn_io import dict_to_network, write_network_configuration
 from hnn_core.cells_default import _exp_g_at_dist
-from hnn_core.parallel_backends import _has_mpi4py, _has_psutil
+from hnn_core.parallel_backends import (_determine_cores_hwthreading,
+                                        _has_mpi4py,
+                                        _has_psutil)
 
 hnn_core_root = Path(hnn_core.__file__).parent
 default_network_configuration = (hnn_core_root / 'param' /
@@ -347,7 +347,10 @@ def __init__(self, theme_color="#802989",
         self.params = self.load_parameters(network_configuration)
 
         # Number of available cores
-        self.n_cores = self._available_cores()
+        [self.n_cores, _] = _determine_cores_hwthreading(
+            enable_hwthreading=False,
+            sensible_default_cores=True,
+        )
 
         # In-memory storage of all simulation and visualization related data
         self.simulation_data = defaultdict(lambda: dict(net=None,
                                                         dpls=list()))
@@ -407,7 +410,8 @@ def __init__(self, theme_color="#802989",
         self.widget_mpi_cmd = Text(value='mpiexec',
                                    placeholder='Fill if applies',
                                    description='MPI cmd:', disabled=False)
-        self.widget_n_jobs = BoundedIntText(value=1, min=1,
+        self.widget_n_jobs = BoundedIntText(value=1,
+                                            min=1,
                                             max=self.n_cores,
                                             description='Cores:',
                                             disabled=False)
@@ -496,22 +500,6 @@ def __init__(self, theme_color="#802989",
         self._init_ui_components()
         self.add_logging_window_logger()
 
-    @staticmethod
-    def _available_cores():
-        """Return the number of available cores to the process.
-
-        This is important for systems where the number of available cores is
-        partitioned such as on HPC systems. Linux and Windows can return cpu
-        affinity, which is the number of available cores. MacOS can only return
-        total system cores.
-        """
-        # For macos
-        if platform.system() == 'Darwin':
-            return psutil.cpu_count()
-        # For Linux and Windows
-        else:
-            return len(psutil.Process().cpu_affinity())
-
     @staticmethod
     def _check_backend():
         """Checks for MPI and returns the default backend name"""
@@ -2108,7 +2096,10 @@ def run_button_clicked(widget_simulation_name, log_out, drive_widgets,
         if backend_selection.value == "MPI":
             backend = MPIBackend(
                 n_procs=n_jobs.value,
-                mpi_cmd=mpi_cmd.value)
+                mpi_cmd=mpi_cmd.value,
+                hwthreading=False,
+                oversubscribe=False,
+            )
         else:
             backend = JoblibBackend(n_jobs=n_jobs.value)
             print(f"Using Joblib with {n_jobs.value} core(s).")
diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py
index 9757414d0..1ab9deb76 100644
--- a/hnn_core/parallel_backends.py
+++ b/hnn_core/parallel_backends.py
@@ -6,7 +6,6 @@
 import os
 import sys
 import re
-import multiprocessing
 import shlex
 import pickle
 import base64
@@ -16,6 +15,8 @@
 from queue import Queue, Empty
 from threading import Thread, Event
 
+from typing import Union
+
 from .cell_response import CellResponse
 from .dipole import Dipole
 from .network_builder import _simulate_single_trial
@@ -554,34 +555,237 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False):
             The Dipole results from each simulation trial
         """
-        print(f"Joblib will run {n_trials} trial(s) in parallel by "
-              f"distributing trials over {self.n_jobs} jobs.")
+        print(
+            f"Joblib will run {n_trials} trial(s) in parallel by "
+            f"distributing trials over {self.n_jobs} jobs."
+        )
 
         parallel, myfunc = self._parallel_func(_simulate_single_trial)
-        sim_data = parallel(myfunc(net, tstop, dt, trial_idx) for
-                            trial_idx in range(n_trials))
+        sim_data = parallel(
+            myfunc(net, tstop, dt, trial_idx) for trial_idx in range(n_trials)
+        )
 
-        dpls = _gather_trial_data(sim_data, net=net, n_trials=n_trials,
-                                  postproc=postproc)
+        dpls = _gather_trial_data(
+            sim_data, net=net, n_trials=n_trials, postproc=postproc
+        )
 
         return dpls
 
 
+def _determine_cores_hwthreading(
+    enable_hwthreading: Union[None, bool] = True,
+    sensible_default_cores: bool = False,
+) -> [int, bool]:
+    """Return the number of available cores and if hardware-threading is used.
+
+    This is important for systems where the number of available cores is
+    partitioned such as on HPC systems, but is also important for determining
+    hardware support for hardware-threading. Hardware-threading ("hwthread" in
+    OpenMPI parlance), simultaneous multi-threading (SMT,
+    https://en.wikipedia.org/wiki/Simultaneous_multithreading ), and [Intel]
+    Hyperthreading are all terms used interchangeably, and are essentially
+    equivalent.
+
+    Parameters
+    ----------
+    enable_hwthreading : bool
+        Whether to detect support for hardware-threading and, if the feature is
+        detected, return the available number of 'logical' hardware-threaded
+        cores. Defaults to True. If 'False', or the feature is not detected,
+        return the available number of 'physical' cores (excluding
+        double-counting of hardware-threaded cores).
+    sensible_default_cores : bool
+        Whether to decrease the number of cores returned in a reasonable
+        manner, such that it balances speed with the user experience (e.g.,
+        preventing the machine 'locking-up'). Defaults to 'False'.
+
+    Returns
+    -------
+    core_count : int
+        Number of logical CPU cores available for use by a process.
+    hwthreading_present : bool
+        Whether or not hardware-threading is present on some or all of the
+        logical CPU cores.
+    """
+    # Needs its own import checks since it may be called by the GUI before
+    # MPIBackend()
+    if _has_mpi4py() and _has_psutil():
+        if enable_hwthreading is None:
+            # This lets us pass the same arg to this function and MPIBackend()
+            # in case we want to use the default approaches.
+            enable_hwthreading = True
+        import platform
+        import psutil
+        if platform.system() == "Darwin":
+            # In Macos' case here, the situation is relatively simple, and we
+            # can determine all the information we need. We are never using
+            # Macos in an HPC environment, so even though we cannot use
+            # `psutil.Process().cpu_affinity()` in Macos, we do not need it.
+
+            # First, let's get both the "logical" and "physical" cores of the
+            # system: 1. In the case of older Macs with Intel CPUs, this will
+            # detect each *single* Intel-Hyperthreaded physical-CPU as *two*
+            # distinct logical-CPUs. Similarly, the number of physical-CPUs
+            # detected will be less than the number of logical-CPUs (though not
+            # necessarily half!). 2. In the case of newer Macs with Apple
+            # Silicon CPUs, the logical-CPUs will be equal to the
+            # physical-CPUs, since these CPUs do not tell the system that they
+            # have hardware-threading.
+            logical_core_count = psutil.cpu_count(logical=True)
+            physical_core_count = psutil.cpu_count(logical=False)
+
+            # We can compare these numbers to automatically determine which CPU
+            # architecture is present, and therefore if hardware-threading is
+            # present too:
+            hwthreading_detected = logical_core_count != physical_core_count
+
+            # By default, return logical core number and, if present,
+            # hardware-threading. If the user informs us that they don't want
+            # hardware-threading, return physical core number and no
+            # hwthreading flag.
+            if enable_hwthreading:
+                core_count = logical_core_count
+                hwthreading_present = hwthreading_detected
+            else:
+                core_count = physical_core_count
+                hwthreading_present = False
+
+        elif platform.system() == "Linux":
+            # In Linux's case here, the situation is more complex:
+            #
+            # 1. In the case of non-HPC Linux computers with Intel chips, the
+            #    situation will be similar to the above Macos Intel-CPU case:
+            #    if there are Intel-Hyperthreaded cores, then the number of
+            #    logical cores will be greater than the number of physical
+            #    cores (but not necessarily double!). Additionally, in this
+            #    case, the number of "affinity" cores (which are the cores
+            #    actually able to be used by new Processes) will be equal to
+            #    the number of logical cores. Pretty simple.
+            #
+            # 2. In the case of non-HPC Linux computers with AMD chips, I do
+            #    not know what will happen. I suspect that, for AMD chips with
+            #    their "SMT" feature ("Simultaneous Multi-Threading",
+            #    equivalent to Intel's trademarked Hyperthreading), they will
+            #    probably behave identically to the Intel Linux case, and
+            #    should work the same.
+            #
+            # 3. In the case of HPC Linux computers such as allocations on
+            #    OSCAR, however, it's different:
+            #
+            #    A. Hardware-Threading: The number of logical and physical
+            #    cores reported are always equal to each other. It is not clear
+            #    to me if you can enable true hardware-threading on OSCAR, nor
+            #    if you can even detect it. The closest that OSCAR has to
+            #    documentation about requesting multiple threads is here:
+            #    https://docs.ccv.brown.edu/oscar/submitting-jobs/
+            #    mpi-jobs#hybrid-mpiopenmp
+            #    which discusses using `--cpus-per-task` and then setting a
+            #    custom OpenMP environment variable. (Note that OpenMP is a
+            #    very, very different technology than OpenMPI!) Similarly, I
+            #    cannot get a successful allocation using the option
+            #    `--threads-per-core` (see
+            #    https://slurm.schedmd.com/sbatch.html ) when using any value
+            #    except one. Fortunately, since the logical CPU number appears
+            #    to always match the physical number, OSCAR should always fail
+            #    our hardware-threading detection "test".
+            #
+            #    B. Cores: Depending on your OSCAR allocation, the number of
+            #    cores you are allowed to use will change, but it appears it
+            #    will always be reflected by the affinity CPU count. In
+            #    contrast, the logical or physical CPU counts are those of the
+            #    node as a whole, which you do NOT necessarily have access to,
+            #    depending on your allocation. For a single node, the number of
+            #    physical and logical cores appears to always be equal. The
+            #    affinity core number will always be less than or equal to the
+            #    number of physical (or logical) cores.
+            logical_core_count = psutil.cpu_count(logical=True)
+            physical_core_count = psutil.cpu_count(logical=False)
+            affinity_core_count = len(psutil.Process().cpu_affinity())
+
+            hwthreading_detected = logical_core_count != physical_core_count
+
+            if enable_hwthreading:
+                # If we want to use hardware-threading if it's detected, then
+                # in all three of the above cases, we can simply use the CPU
+                # affinity count for our number of cores, and pass the result
+                # of our hardware-threading detection check.
+                core_count = affinity_core_count
+                hwthreading_present = hwthreading_detected
+            else:
+                # If the user informs us they don't want hardware-threading,
+                # then:
+                # 1. In the Linux-laptop case, affinity core number is the same
+                #    as logical core number (i.e. including hardware-threaded
+                #    cores). We should use the physical core number, which will
+                #    always be less than or equal to the affinity core number.
+                # 2. In the OSCAR/HPC case, physical core number is the same as
+                #    logical core number. We should use the affinity core
+                #    number, which, in a single node, will always be less than
+                #    or equal to the physical core number.
+                core_count = min(physical_core_count, affinity_core_count)
+                hwthreading_present = False
+
+        else:
+            # In Windows' case here, "all bets are off". We do not currently
+            # officially support MPIBackend() usage on Windows due to the
+            # difficulty of its install, and there are outstanding issues with
+            # trying to use hardware-threads in particular: see
+            # https://github.com/jonescompneurolab/hnn-core/issues/589 .
+            #
+            # Therefore, we also do not support hardware-threading in this
+            # case, and it is disabled by default here. The cores reported are
+            # the non-hardware-threaded physical cores.
+            physical_core_count = psutil.cpu_count(logical=False)
+            core_count = physical_core_count
+            hwthreading_present = False
+
+        default_threshold = 12
+        if sensible_default_cores:
+            if core_count > default_threshold:
+                core_count = default_threshold
+            elif core_count > 2:
+                # This way, sensible defaults still always returns multiple
+                # cores if multiple are available.
+                core_count = core_count - 1
+    else:
+        missing_packages = list()
+        if not _has_mpi4py():
+            missing_packages += ["mpi4py"]
+        if not _has_psutil():
+            missing_packages += ["psutil"]
+        missing_packages = " and ".join(missing_packages)
+        warn(f"{missing_packages} not installed. Will run on single "
+             "processor, with no hardware-threading.")
+        core_count = 1
+        hwthreading_present = False
+
+    return [core_count, hwthreading_present]
+
+
 class MPIBackend(object):
     """The MPIBackend class.
 
     Parameters
     ----------
-    n_procs : int | None
-        The number of MPI processes requested by the user. If None, then will
-        attempt to detect number of cores (including hyperthreads) and start
-        parallel simulation over all of them.
+    n_procs : None | int
+        The number of MPI processes requested by the user. Defaults to 'None',
+        in which case it will attempt to detect the number of cores (including
+        hardware-threads) and start parallel simulation over all of them.
     mpi_cmd : str
-        The name of the mpi launcher executable. Will use 'mpiexec'
-        (openmpi) by default.
+        The name of the mpi launcher executable. Will use 'mpiexec' (openmpi)
+        by default.
+    hwthreading : None | bool
+        Whether or not to tell MPI to use hardware-threading. Defaults to
+        'None', in which case it will use a heuristic for determining whether
+        to use it. If 'False', then hardware-threading is never used, and if
+        'True', then hardware-threading is always used.
+    oversubscribe : None | bool
+        Whether or not to tell MPI to use oversubscription. Defaults to 'None',
+        in which case it will use a heuristic for determining whether to use
+        it. If 'False', then oversubscription is never used, and if 'True',
+        then oversubscription is always used.
 
     Attributes
     ----------
-
     n_procs : int
         The number of processes MPI will actually use (spread over cores). If
         1 is specified or mpi4py could not be loaded, the simulation will be run
@@ -594,59 +798,59 @@ class MPIBackend(object):
     proc_queue : threading.Queue
         A Queue object to hold process handles from Popen in a thread-safe
         way. There will be a valid process handle present the queue when a MPI
-        åsimulation is running.
+        simulation is running.
     """
-    def __init__(self, n_procs=None, mpi_cmd='mpiexec'):
+
+    def __init__(
+        self,
+        n_procs: Union[None, int] = None,
+        mpi_cmd: str = "mpiexec",
+        hwthreading: Union[None, bool] = None,
+        oversubscribe: Union[None, bool] = None,
+    ) -> None:
         self.expected_data_length = 0
         self.proc = None
         self.proc_queue = Queue()
 
-        n_logical_cores = multiprocessing.cpu_count()
-        if n_procs is None:
-            self.n_procs = n_logical_cores
-        else:
-            self.n_procs = n_procs
-
-        # did user try to force running on more cores than available?
-        oversubscribe = False
-        if self.n_procs > n_logical_cores:
+        # The psutil and mpi4py import checks have been moved into
+        # _determine_cores_hwthreading(), since the GUI calls that function
+        # before MPIBackend() is instantiated.
+        [n_available_cores, hwthreading_available] = \
+            _determine_cores_hwthreading(
+                enable_hwthreading=(False if (hwthreading is False) else True))
+
+        self.n_procs = n_available_cores if (n_procs is None) else n_procs
+
+        # Heuristic: did user try to force running on more cores than
+        # available?
+        if (oversubscribe is None) and (self.n_procs > n_available_cores):
+            warn(
+                "Number of requested MPI processes exceeds available "
+                "cores. Enabling MPI oversubscription automatically."
+            )
             oversubscribe = True
 
-        hyperthreading = False
-
-        if _has_mpi4py() and _has_psutil():
-            import psutil
-
-            n_physical_cores = psutil.cpu_count(logical=False)
-
-            # detect if we need to use hwthread-cpus with mpiexec
-            if self.n_procs > n_physical_cores:
-                hyperthreading = True
-
-        else:
-            packages = list()
-            if not _has_mpi4py():
-                packages += ['mpi4py']
-            if not _has_psutil():
-                packages += ['psutil']
-            packages = ' and '.join(packages)
-            warn(f'{packages} not installed. Will run on single processor')
-            self.n_procs = 1
+        if (hwthreading is None) and hwthreading_available:
+            hwthreading = True
 
         self.mpi_cmd = mpi_cmd
 
-        if hyperthreading:
-            self.mpi_cmd += ' --use-hwthread-cpus'
+        if hwthreading:
+            self.mpi_cmd += " --use-hwthread-cpus"
 
         if oversubscribe:
-            self.mpi_cmd += ' --oversubscribe'
-
-        self.mpi_cmd += ' -np ' + str(self.n_procs)
-
-        self.mpi_cmd += ' nrniv -python -mpi -nobanner ' + \
-            sys.executable + ' ' + \
-            os.path.join(os.path.dirname(sys.modules[__name__].__file__),
-                         'mpi_child.py')
+            self.mpi_cmd += " --oversubscribe"
+
+        self.mpi_cmd += " -np " + str(self.n_procs)
+
+        self.mpi_cmd += (
+            " nrniv -python -mpi -nobanner "
+            + sys.executable
+            + " "
+            + os.path.join(
+                os.path.dirname(sys.modules[__name__].__file__), "mpi_child.py"
+            )
+        )
 
         # Split the command into shell arguments for passing to Popen
         use_posix = True if sys.platform != 'win32' else False
diff --git a/hnn_core/tests/test_parallel_backends.py b/hnn_core/tests/test_parallel_backends.py
index b6e9f7ed6..8db86276b 100644
--- a/hnn_core/tests/test_parallel_backends.py
+++ b/hnn_core/tests/test_parallel_backends.py
@@ -2,7 +2,6 @@
 from os import environ
 import io
 from contextlib import redirect_stdout
-from multiprocessing import cpu_count
 from threading import Thread, Event
 from time import sleep
 from urllib.request import urlretrieve
@@ -16,7 +15,11 @@
 import hnn_core
 from hnn_core import MPIBackend, jones_2009_model, read_params
 from hnn_core.dipole import simulate_dipole
-from hnn_core.parallel_backends import requires_mpi4py, requires_psutil
+from hnn_core.parallel_backends import (
+    requires_mpi4py,
+    requires_psutil,
+    _determine_cores_hwthreading,
+)
 from hnn_core.network_builder import NetworkBuilder
 
 
@@ -74,8 +77,6 @@ def test_gid_assignment():
 # simulation when there are failures in previous (faster) tests. When a test
 # in the sequence fails, all subsequent tests will be marked "xfailed" rather
 # than skipped.
-
-
 @pytest.mark.incremental
 class TestParallelBackends():
     dpls_reduced_mpi = None
@@ -102,6 +103,35 @@ def test_run_joblibbackend(self, run_hnn_core_fixture):
         assert_array_equal(dpls_reduced_default[trial_idx].data['agg'],
                            dpls_reduced_joblib[trial_idx].data['agg'])
 
+    @requires_mpi4py
+    @requires_psutil
+    @pytest.mark.parametrize("sensible_default", [False, True])
+    def test_detect_cores(self, sensible_default):
+        """Test that multiple cores can be detected"""
+        [detected_cores_nohw, detected_hwthreading] = \
+            _determine_cores_hwthreading(
+                enable_hwthreading=False,
+                sensible_default_cores=sensible_default)
+        assert detected_cores_nohw > 1
+        assert isinstance(detected_hwthreading, bool)
+
+        [detected_cores_yeshw, detected_hwthreading] = \
+            _determine_cores_hwthreading(
+                enable_hwthreading=True,
+                sensible_default_cores=sensible_default)
+        assert detected_cores_yeshw > 1
+        assert isinstance(detected_hwthreading, bool)
+
+        [detected_cores_maybehw, detected_hwthreading] = \
+            _determine_cores_hwthreading(
+                enable_hwthreading=None,
+                sensible_default_cores=sensible_default)
+        assert detected_cores_maybehw > 1
+        assert isinstance(detected_hwthreading, bool)
+
+        assert detected_cores_yeshw >= detected_cores_nohw
+        assert detected_cores_maybehw >= detected_cores_nohw
+
     @requires_mpi4py
     @requires_psutil
     def test_mpi_nprocs(self):
@@ -161,7 +191,9 @@ def test_terminate_mpibackend(self, run_hnn_core_fixture):
 
     @requires_mpi4py
     @requires_psutil
-    def test_run_mpibackend_oversubscribed(self, run_hnn_core_fixture):
+    @pytest.mark.parametrize("hwthreading_enabled", [None, False, True])
+    def test_run_mpibackend_oversubscribed(self, run_hnn_core_fixture,
+                                           hwthreading_enabled):
         """Test running MPIBackend with oversubscribed number of procs"""
         hnn_core_root = op.dirname(hnn_core.__file__)
         params_fname = op.join(hnn_core_root, 'param', 'default.json')
@@ -173,17 +205,24 @@ def test_run_mpibackend_oversubscribed(self, run_hnn_core_fixture):
         net = jones_2009_model(params, add_drives_from_params=True,
                                mesh_shape=(3, 3))
 
-        # try running with more procs than cells in the network (will probably
-        # oversubscribe)
+        # Fail state: try running with more procs than cells in the network
+        # (will probably oversubscribe too)
         too_many_procs = net._n_cells + 1
-        with pytest.raises(ValueError, match='More MPI processes were '
-                           'assigned than there are cells'):
+
+        with pytest.raises(ValueError,
                           match=('More MPI processes were '
+                                  'assigned than there are cells')):
             with MPIBackend(n_procs=too_many_procs) as backend:
                 simulate_dipole(net, tstop=40)
 
-        # force oversubscription + hyperthreading, but make sure there are
-        # always enough cells in the network
-        oversubscribed_procs = cpu_count() + 1
+        # Force oversubscription and make sure there are always enough cells in
+        # the network
+        [detected_cores, detected_hwthreading] = \
+            _determine_cores_hwthreading(
+                enable_hwthreading=hwthreading_enabled,
+                sensible_default_cores=False)
+
+        oversubscribed_procs = detected_cores + 1
         n_grid_1d = int(np.ceil(np.sqrt(oversubscribed_procs)))
         params.update({'t_evprox_1': 5, 't_evdist_1': 10,
@@ -191,8 +230,51 @@
                        'N_trials': 2})
         net = jones_2009_model(params, add_drives_from_params=True,
                                mesh_shape=(n_grid_1d, n_grid_1d))
-        with MPIBackend(n_procs=oversubscribed_procs) as backend:
-            assert backend.n_procs == oversubscribed_procs
+
+        # Check that oversubscription turns on if needed, and provides a
+        # warning
+        with pytest.warns(UserWarning,
+                          match=("Number of requested MPI processes exceeds "
+                                 "available cores. Enabling MPI "
+                                 "oversubscription automatically.")):
+            with MPIBackend(
+                    n_procs=oversubscribed_procs,
+                    hwthreading=hwthreading_enabled) as backend:
+                assert backend.n_procs == oversubscribed_procs
+                assert "--oversubscribe" in ' '.join(backend.mpi_cmd)
+                if detected_hwthreading:
+                    assert "--use-hwthread-cpus" in ' '.join(backend.mpi_cmd)
+                simulate_dipole(net, tstop=40)
+
+        # Check that the simulation fails if oversubscribe is forced off
+        with pytest.warns(UserWarning) as record:
+            with MPIBackend(
+                    n_procs=oversubscribed_procs,
+                    hwthreading=hwthreading_enabled,
+                    oversubscribe=False,
+            ) as backend:
+                assert "--oversubscribe" not in ' '.join(backend.mpi_cmd)
+                if detected_hwthreading:
+                    assert "--use-hwthread-cpus" in ' '.join(backend.mpi_cmd)
+                with pytest.raises(
+                        RuntimeError,
+                        match="MPI simulation failed. Return code: 1"):
+                    simulate_dipole(net, tstop=40)
+
+        expected_string = ('Received BrokenPipeError exception. '
+                           'Child process failed unexpectedly')
+        assert len(record) == 2
+        assert expected_string in record[0].message.args[0]
+
+        # Check that simulation succeeds if oversubscription is activated but
+        # unnecessary
+        with MPIBackend(
+                n_procs=2,
+                hwthreading=hwthreading_enabled,
+                oversubscribe=True) as backend:
+            assert "--oversubscribe" in ' '.join(backend.mpi_cmd)
+            if detected_hwthreading:
+                assert "--use-hwthread-cpus" in ' '.join(backend.mpi_cmd)
             simulate_dipole(net, tstop=40)
 
     @pytest.mark.parametrize("backend", ['mpi', 'joblib'])
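
Illustrative usage sketch (not part of the patch): the snippet below shows how the API added above might be exercised once the patch is applied and both mpi4py and psutil are installed. The names _determine_cores_hwthreading, hwthreading, and oversubscribe are taken from the diff; everything else is standard hnn_core usage.

    # Sketch only: assumes this patch is applied and mpi4py/psutil are present.
    from hnn_core import MPIBackend, jones_2009_model, simulate_dipole
    from hnn_core.parallel_backends import _determine_cores_hwthreading

    # Conservative core count: skip hardware-threads and cap the count with
    # the 'sensible default' heuristic, mirroring what the GUI now does.
    n_cores, hwthreading_detected = _determine_cores_hwthreading(
        enable_hwthreading=False, sensible_default_cores=True)

    net = jones_2009_model(add_drives_from_params=True)

    # Explicitly disable hardware-threading and oversubscription, as the GUI's
    # run_button_clicked() does in this patch.
    with MPIBackend(n_procs=n_cores, mpi_cmd='mpiexec',
                    hwthreading=False, oversubscribe=False):
        dpls = simulate_dipole(net, tstop=40., n_trials=1)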