From bd98004511b8b93a20e77f453c849806b939174a Mon Sep 17 00:00:00 2001
From: Daniel Weindl
Date: Wed, 27 Nov 2024 11:20:09 +0100
Subject: [PATCH 1/3] Fix `NLoptOptimizer.__repr__` (#1518)

An incorrect value was used for `local_options`.
---
 pypesto/optimize/optimizer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pypesto/optimize/optimizer.py b/pypesto/optimize/optimizer.py
index b05570e73..c445a2776 100644
--- a/pypesto/optimize/optimizer.py
+++ b/pypesto/optimize/optimizer.py
@@ -1169,7 +1169,7 @@ def __repr__(self) -> str:
         if self.options is not None:
             rep += f" options={self.options}"
         if self.local_options is not None:
-            rep += f" local_options={self.local_methods}"
+            rep += f" local_options={self.local_options}"
         return rep + ">"

     @minimize_decorator_collection

From 8f1bf803fd00a504be1a5979d3679059108d7c9b Mon Sep 17 00:00:00 2001
From: Daniel Weindl
Date: Wed, 27 Nov 2024 11:28:04 +0100
Subject: [PATCH 2/3] Improve exception-handling in SacessOptimizer (#1517)

Previously, uncaught exceptions in SacessOptimizer worker processes
resulted in deadlocks in `SacessOptimizer.minimize`.

These changes will (in most cases) prevent deadlocks and other errors
caused by missing results from individual workers. Furthermore,
`minimize()` will now terminate relatively soon after an error occurs
on some worker, not only after some other exit criterion is met.

Closes #1512.
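The core of the fix is visible in `_finalize` and `_run_worker` below: a worker must put exactly one `SacessWorkerResult` on the manager's result queue, no matter what happens, because the parent `SacessOptimizer.minimize` blocks on that queue. A minimal, self-contained sketch of this sentinel pattern (`do_work` and `worker_entry` are hypothetical names for illustration, not pypesto API):

```python
import multiprocessing as mp


def do_work(task):
    """Stand-in for the actual optimization run; may raise."""
    if task == "bad":
        raise RuntimeError("worker failed")
    return {"exit_flag": "SUCCESS", "task": task}


def worker_entry(task, result_queue):
    """Ensure exactly one result lands on the queue, even on failure."""
    result = None
    try:
        result = do_work(task)
    except Exception:
        pass  # the real code additionally logs and signals the manager
    finally:
        if result is None:
            # dummy sentinel, analogous to the NaN-filled
            # SacessWorkerResult with exit_flag=ERROR in this patch
            result = {"exit_flag": "ERROR", "task": task}
        result_queue.put(result)


if __name__ == "__main__":
    queue = mp.Queue()
    procs = [
        mp.Process(target=worker_entry, args=(task, queue))
        for task in ("good", "bad")
    ]
    for p in procs:
        p.start()
    # the parent's blocking get() returns even though one worker failed
    for _ in procs:
        print(queue.get())
    for p in procs:
        p.join()
```

Without the sentinel in `finally`, the parent's blocking `get()` would wait forever for a crashed worker; that is exactly the deadlock this patch addresses.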
""" # gather results from workers and delete temporary result files - result = None + result = pypesto.Result() + retry_after_sleep = True for worker_idx in range(self.num_workers): tmp_result_filename = SacessWorker.get_temp_result_filename( worker_idx, self._tmpdir ) + tmp_result = None try: tmp_result = read_result( tmp_result_filename, problem=False, optimize=True ) except FileNotFoundError: # wait and retry, maybe the file wasn't found due to some filesystem latency issues - time.sleep(5) - tmp_result = read_result( - tmp_result_filename, problem=False, optimize=True - ) + if retry_after_sleep: + time.sleep(5) + # waiting once is enough - don't wait for every worker + retry_after_sleep = False + + try: + tmp_result = read_result( + tmp_result_filename, problem=False, optimize=True + ) + except FileNotFoundError: + logger.error( + f"Worker {worker_idx} did not produce a result." + ) + continue + else: + logger.error( + f"Worker {worker_idx} did not produce a result." + ) + continue - if result is None: - result = tmp_result - else: + if tmp_result: result.optimize_result.append( tmp_result.optimize_result, sort=False, @@ -375,7 +398,8 @@ def _create_result(self, problem: Problem) -> pypesto.Result: filename = SacessWorker.get_temp_result_filename( worker_idx, self._tmpdir ) - os.remove(filename) + with suppress(FileNotFoundError): + os.remove(filename) # delete tmpdir if empty try: self._tmpdir.rmdir() @@ -397,6 +421,7 @@ class SacessManager: Attributes ---------- + _dim: Dimension of the optimization problem _num_workers: Number of workers _ess_options: ESS options for each worker _best_known_fx: Best objective value encountered so far @@ -410,6 +435,7 @@ class SacessManager: _rejection_threshold: Threshold for relative objective improvements that incoming solutions have to pass to be accepted _lock: Lock for accessing shared state. + _terminate: Flag to signal termination of the SACESS run to workers _logger: A logger instance _options: Further optimizer hyperparameters. """ @@ -421,6 +447,7 @@ def __init__( dim: int, options: SacessOptions = None, ): + self._dim = dim self._options = options or SacessOptions() self._num_workers = len(ess_options) self._ess_options = [shmem_manager.dict(o) for o in ess_options] @@ -440,6 +467,7 @@ def __init__( self._worker_scores = shmem_manager.Array( "d", range(self._num_workers) ) + self._terminate = shmem_manager.Value("b", False) self._worker_comms = shmem_manager.Array("i", [0] * self._num_workers) self._lock = shmem_manager.RLock() self._logger = logging.getLogger() @@ -550,6 +578,16 @@ def submit_solution( ) self._rejections.value = 0 + def abort(self): + """Abort the SACESS run.""" + with self._lock: + self._terminate.value = True + + def aborted(self) -> bool: + """Whether this run was aborted.""" + with self._lock: + return self._terminate.value + class SacessWorker: """A SACESS worker. @@ -641,7 +679,7 @@ def run( ess = self._setup_ess(startpoint_method) # run ESS until exit criteria are met, but start at least one iteration - while self._keep_going() or ess.n_iter == 0: + while self._keep_going(ess) or ess.n_iter == 0: # perform one ESS iteration ess._do_iteration() @@ -667,19 +705,42 @@ def run( f"(best: {self._best_known_fx}, " f"n_eval: {ess.evaluator.n_eval})." 
             )
-
-        ess.history.finalize(exitflag=ess.exit_flag.name)
-        worker_result = SacessWorkerResult(
-            x=ess.x_best,
-            fx=ess.fx_best,
-            history=ess.history,
-            n_eval=ess.evaluator.n_eval,
-            n_iter=ess.n_iter,
-            exit_flag=ess.exit_flag,
-        )
+        self._finalize(ess)
+
+    def _finalize(self, ess: ESSOptimizer = None):
+        """Finalize the worker."""
+        # Whatever happens here, we need to put something on the queue before
+        # returning to avoid deadlocks.
+        worker_result = None
+        if ess is not None:
+            try:
+                ess.history.finalize(exitflag=ess.exit_flag.name)
+                ess._report_final()
+                worker_result = SacessWorkerResult(
+                    x=ess.x_best,
+                    fx=ess.fx_best,
+                    history=ess.history,
+                    n_eval=ess.evaluator.n_eval,
+                    n_iter=ess.n_iter,
+                    exit_flag=ess.exit_flag,
+                )
+            except Exception as e:
+                self._logger.exception(
+                    f"Worker {self._worker_idx} failed to finalize: {e}"
+                )
+        if worker_result is None:
+            # Create some dummy result
+            worker_result = SacessWorkerResult(
+                x=np.full(self._manager._dim, np.nan),
+                fx=np.nan,
+                history=MemoryHistory(),
+                n_eval=0,
+                n_iter=0,
+                exit_flag=ESSExitFlag.ERROR,
+            )
         self._manager._result_queue.put(worker_result)
+        self._logger.debug(f"Final configuration: {self._ess_kwargs}")

-        ess._report_final()

     def _setup_ess(self, startpoint_method: StartpointMethod) -> ESSOptimizer:
         """Run ESS."""
@@ -821,7 +882,7 @@ def replace_solution(refset: RefSet, x: np.ndarray, fx: float):
             fx=fx,
         )

-    def _keep_going(self):
+    def _keep_going(self, ess: ESSOptimizer) -> bool:
         """Check exit criteria.

         Returns
         -------
         ``True`` if none of the exit criteria is met, ``False`` otherwise.
         """
         # elapsed time
         if time.time() - self._start_time >= self._max_walltime_s:
-            self.exit_flag = ESSExitFlag.MAX_TIME
+            ess.exit_flag = ESSExitFlag.MAX_TIME
             self._logger.debug(
                 f"Max walltime ({self._max_walltime_s}s) exceeded."
             )
             return False
-
+        # other reasons for termination (some worker failed, ...)
+        if self._manager.aborted():
+            ess.exit_flag = ESSExitFlag.ERROR
+            self._logger.debug("Manager requested termination.")
+            return False
         return True

+    def abort(self):
+        """Send signal to abort."""
+        self._logger.error(f"Worker {self._worker_idx} aborting.")
+        # signal to manager
+        self._manager.abort()
+
+        self._finalize(None)
+
     @staticmethod
     def get_temp_result_filename(worker_idx: int, tmpdir: str | Path) -> str:
         return str(Path(tmpdir, f"sacess-{worker_idx:02d}_tmp.h5").absolute())
@@ -853,15 +926,24 @@ def _run_worker(

     Helper function as entrypoint for sacess worker processes.
""" - # different random seeds per process - np.random.seed((os.getpid() * int(time.time() * 1000)) % 2**32) - - # Forward log messages to the logging process - h = logging.handlers.QueueHandler(log_process_queue) - worker._logger = logging.getLogger(multiprocessing.current_process().name) - worker._logger.addHandler(h) + try: + # different random seeds per process + np.random.seed((os.getpid() * int(time.time() * 1000)) % 2**32) + + # Forward log messages to the logging process + h = logging.handlers.QueueHandler(log_process_queue) + worker._logger = logging.getLogger( + multiprocessing.current_process().name + ) + worker._logger.addHandler(h) - return worker.run(problem=problem, startpoint_method=startpoint_method) + return worker.run(problem=problem, startpoint_method=startpoint_method) + except Exception as e: + with suppress(Exception): + worker._logger.exception( + f"Worker {worker._worker_idx} failed: {e}" + ) + worker.abort() def get_default_ess_options( diff --git a/test/optimize/test_optimize.py b/test/optimize/test_optimize.py index 48ebdea55..3e6a5b4ed 100644 --- a/test/optimize/test_optimize.py +++ b/test/optimize/test_optimize.py @@ -18,6 +18,7 @@ import pypesto import pypesto.optimize as optimize +from pypesto import Objective from pypesto.optimize.ess import ( ESSOptimizer, FunctionEvaluatorMP, @@ -577,6 +578,40 @@ def test_ess_refset_repr(): ) +class FunctionOrError: + """Callable that raises an error every nth invocation.""" + + def __init__(self, fun, error_frequency=100): + self.counter = 0 + self.error_frequency = error_frequency + self.fun = fun + + def __call__(self, *args, **kwargs): + self.counter += 1 + if self.counter % self.error_frequency == 0: + raise RuntimeError("Intentional error.") + return self.fun(*args, **kwargs) + + +def test_sacess_worker_error(capsys): + """Check that SacessOptimizer does not hang if an error occurs on a worker.""" + objective = Objective( + fun=FunctionOrError(sp.optimize.rosen), grad=sp.optimize.rosen_der + ) + problem = pypesto.Problem( + objective=objective, lb=0 * np.ones((1, 2)), ub=1 * np.ones((1, 2)) + ) + sacess = SacessOptimizer( + num_workers=2, + max_walltime_s=2, + sacess_loglevel=logging.DEBUG, + ess_loglevel=logging.DEBUG, + ) + res = sacess.minimize(problem) + assert isinstance(res, pypesto.Result) + assert "Intentional error." in capsys.readouterr().err + + def test_scipy_integrated_grad(): integrated = True obj = rosen_for_sensi(max_sensi_order=2, integrated=integrated)["obj"] From d39b959fef3a08746b9718ef4de1a7a37a471b09 Mon Sep 17 00:00:00 2001 From: Daniel Weindl Date: Wed, 27 Nov 2024 11:28:25 +0100 Subject: [PATCH 3/3] CI: don't cache amici; pytest ignore amici_models; passenv BNGPATH (#1523) * Don't cache the amici installation. We are using amici/develop on purpose to detect problems early on. E.g., #1520 should have been detected much earlier. 
From d39b959fef3a08746b9718ef4de1a7a37a471b09 Mon Sep 17 00:00:00 2001
From: Daniel Weindl
Date: Wed, 27 Nov 2024 11:28:25 +0100
Subject: [PATCH 3/3] CI: don't cache amici; pytest ignore amici_models;
 passenv BNGPATH (#1523)

* Don't cache the amici installation.

  We are using amici/develop on purpose to detect problems early on.
  E.g., #1520 should have been detected much earlier.

* Ignore `amici_models` when collecting tests

* Use external BNGPATH if set
---
 .github/workflows/ci.yml |  2 +-
 pytest.ini               |  1 +
 tox.ini                  | 10 +++++++---
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 4db56ab55..f55f8f6b5 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -157,7 +157,7 @@ jobs:
       - name: Run tests
         timeout-minutes: 35
-        run: tox -e petab
+        run: tox -e petab && tox e -e petab -- pip uninstall -y amici
         env:
           CC: clang
           CXX: clang++
diff --git a/pytest.ini b/pytest.ini
index c852c729e..82ae58578 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -2,3 +2,4 @@
 addopts = "--doctest-modules"
 filterwarnings =
     ignore:.*inspect.getargspec\(\) is deprecated.*:DeprecationWarning
+norecursedirs = amici_models
diff --git a/tox.ini b/tox.ini
index e9193e0ad..d1937e4ee 100644
--- a/tox.ini
+++ b/tox.ini
@@ -29,7 +29,7 @@ envlist =

 # Base-environment
 [testenv]
-passenv = AMICI_PARALLEL_COMPILE,CC,CXX,MPLBACKEND
+passenv = AMICI_PARALLEL_COMPILE,CC,CXX,MPLBACKEND,BNGPATH

 # Sub-environments
 #  inherit settings defined in the base
@@ -75,10 +75,14 @@ description =
     Test basic functionality on Windows

 [testenv:petab]
-extras = test,amici,petab,pyswarm,roadrunner
+extras = test,petab,pyswarm,roadrunner
 deps =
     git+https://github.com/Benchmarking-Initiative/Benchmark-Models-PEtab.git@master\#subdirectory=src/python
-    git+https://github.com/AMICI-dev/amici.git@develop\#egg=amici&subdirectory=python/sdist
+# always install amici from develop branch, avoid caching
+# to skip re-installation, run `tox -e petab --override testenv:petab.commands_pre=`
+commands_pre =
+    python3 -m pip uninstall -y amici
+    python3 -m pip install git+https://github.com/AMICI-dev/amici.git@develop\#egg=amici&subdirectory=python/sdist
 commands =
     python3 -m pip install git+https://github.com/PEtab-dev/petab_test_suite@main
     python3 -m pip install git+https://github.com/pysb/pysb@master