From 255fa3574f8c6286fdf5e4a2c1fc08e88a957b6a Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Fri, 10 May 2024 00:42:47 +0200 Subject: [PATCH 01/21] removed pickle based checkpointing --- propulate/_globals.py | 1 - propulate/migrator.py | 17 ----------- propulate/pollinator.py | 15 --------- propulate/propulator.py | 67 +---------------------------------------- 4 files changed, 1 insertion(+), 99 deletions(-) diff --git a/propulate/_globals.py b/propulate/_globals.py index 43e38374..ba04846b 100644 --- a/propulate/_globals.py +++ b/propulate/_globals.py @@ -2,5 +2,4 @@ SYNCHRONIZATION_TAG = 2 INIT_TAG = 3 POPULATION_TAG = 4 -DUMP_TAG = 5 MIGRATION_TAG = 6 diff --git a/propulate/migrator.py b/propulate/migrator.py index 0442a21b..b630b0a7 100644 --- a/propulate/migrator.py +++ b/propulate/migrator.py @@ -428,7 +428,6 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: if self.island_comm.rank == 0: log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.") - dump = True if self.island_comm.rank == 0 else False migration = True if self.migration_prob > 0 else False self.propulate_comm.barrier() @@ -460,13 +459,6 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: if debug == 2: check = self._check_emigrants_to_deactivate() assert check is False - - if dump: # Dump checkpoint. - self._dump_checkpoint() - - dump = ( - self._determine_worker_dumping_next() - ) # Determine worker dumping checkpoint in the next generation. self.generation += 1 # Go to next generation. # Having completed all generations, the workers have to wait for each other. @@ -507,12 +499,3 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: raise ValueError( "There should not be any individuals left that need to be deactivated." ) - - self.propulate_comm.barrier() - - # Final checkpointing on rank 0. - if self.island_comm.rank == 0: - self._dump_final_checkpoint() # Dump checkpoint. - self.propulate_comm.barrier() - _ = self._determine_worker_dumping_next() - self.propulate_comm.barrier() diff --git a/propulate/pollinator.py b/propulate/pollinator.py index 67e3ab27..2e338023 100644 --- a/propulate/pollinator.py +++ b/propulate/pollinator.py @@ -411,7 +411,6 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: if self.island_comm.rank == 0: log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.") - dump = True if self.island_comm.rank == 0 else False migration = True if self.migration_prob > 0 else False self.propulate_comm.barrier() @@ -440,12 +439,6 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: # Immigration: Check for individuals replaced by other intra-island workers to be deactivated. self._deactivate_replaced_individuals() - if dump: # Dump checkpoint. - self._dump_checkpoint() - - dump = ( - self._determine_worker_dumping_next() - ) # Determine worker dumping checkpoint in the next generation. self.generation += 1 # Go to next generation. # Having completed all generations, the workers have to wait for each other. @@ -477,11 +470,3 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: f"Finally {len(self.replaced)} individual(s) in replaced: {self.replaced}:\n{self.population}" ) self._deactivate_replaced_individuals() - self.propulate_comm.barrier() - - # Final checkpointing on rank 0. - if self.island_comm.rank == 0: - self._dump_final_checkpoint() # Dump checkpoint. - self.propulate_comm.barrier() - _ = self._determine_worker_dumping_next() - self.propulate_comm.barrier() diff --git a/propulate/propulator.py b/propulate/propulator.py index 1a9f0531..a30f938f 100644 --- a/propulate/propulator.py +++ b/propulate/propulator.py @@ -13,7 +13,7 @@ import numpy as np from mpi4py import MPI -from ._globals import DUMP_TAG, INDIVIDUAL_TAG +from ._globals import INDIVIDUAL_TAG from .population import Individual from .propagators import Propagator, SelectMin from .surrogate import Surrogate @@ -481,9 +481,6 @@ def propulate(self, logging_interval: int = 10, debug: int = -1) -> None: if self.island_comm.rank == 0: log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.") - dump = True if self.island_comm.rank == 0 else False - self.propulate_comm.barrier() - # Loop over generations. while self.generations <= -1 or self.generation < self.generations: if self.generation % int(logging_interval) == 0: @@ -497,13 +494,6 @@ def propulate(self, logging_interval: int = 10, debug: int = -1) -> None: # Check for and possibly receive incoming individuals from other intra-island workers. self._receive_intra_island_individuals() - if dump: # Dump checkpoint. - self._dump_checkpoint() - - dump = ( - self._determine_worker_dumping_next() - ) # Determine worker dumping checkpoint in the next generation. - # Go to next generation. self.generation += 1 @@ -520,61 +510,6 @@ def propulate(self, logging_interval: int = 10, debug: int = -1) -> None: self._receive_intra_island_individuals() self.propulate_comm.barrier() - # Final checkpointing on rank 0. - if self.island_comm.rank == 0: - self._dump_final_checkpoint() # Dump checkpoint. - self.propulate_comm.barrier() - _ = self._determine_worker_dumping_next() - self.propulate_comm.barrier() - - def _dump_checkpoint(self) -> None: - """Dump checkpoint to file.""" - log.debug( - f"Island {self.island_idx} Worker {self.island_comm.rank} Generation {self.generation}: " - f"Dumping checkpoint..." - ) - save_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle" - if os.path.isfile(save_ckpt_file): - try: - os.replace(save_ckpt_file, save_ckpt_file.with_suffix(".bkp")) - except OSError as e: - log.warning(e) - with open(save_ckpt_file, "wb") as f: - pickle.dump(self.population, f) - - dest = ( - self.island_comm.rank + 1 - if self.island_comm.rank + 1 < self.island_comm.size - else 0 - ) - self.island_comm.send(True, dest=dest, tag=DUMP_TAG) - - def _determine_worker_dumping_next(self) -> bool: - """Determine the worker who dumps the checkpoint in the next generation.""" - dump = False - stat = MPI.Status() - probe_dump = self.island_comm.iprobe( - source=MPI.ANY_SOURCE, tag=DUMP_TAG, status=stat - ) - if probe_dump: - dump = self.island_comm.recv(source=stat.Get_source(), tag=DUMP_TAG) - log.debug( - f"Island {self.island_idx} Worker {self.island_comm.rank} Generation {self.generation}: " - f"Going to dump next: {dump}. Before: Worker {stat.Get_source()}" - ) - return dump - - def _dump_final_checkpoint(self) -> None: - """Dump final checkpoint.""" - save_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle" - if os.path.isfile(save_ckpt_file): - try: - os.replace(save_ckpt_file, save_ckpt_file.with_suffix(".bkp")) - except OSError as e: - log.warning(e) - with open(save_ckpt_file, "wb") as f: - pickle.dump(self.population, f) - def _check_for_duplicates( self, active: bool, debug: int = 1 ) -> Tuple[List[List[Union[Individual, int]]], List[Individual]]: From ad2edaac2bc0b902da1bf8756c810406debe1508 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Fri, 10 May 2024 01:14:52 +0200 Subject: [PATCH 02/21] basic checkpointing for just the propulator --- propulate/propulator.py | 152 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 151 insertions(+), 1 deletion(-) diff --git a/propulate/propulator.py b/propulate/propulator.py index a30f938f..bc16689b 100644 --- a/propulate/propulator.py +++ b/propulate/propulator.py @@ -10,12 +10,13 @@ from typing import Callable, Final, Generator, List, Optional, Tuple, Type, Union import deepdiff +import h5py import numpy as np from mpi4py import MPI from ._globals import INDIVIDUAL_TAG from .population import Individual -from .propagators import Propagator, SelectMin +from .propagators import BasicPSO, Propagator, SelectMin from .surrogate import Surrogate log = logging.getLogger(__name__) # Get logger instance. @@ -208,6 +209,155 @@ def __init__( "No valid checkpoint file given. Initializing population randomly..." ) + def load_checkpoint(self): + """Load checkpoint from HDF5 file. Since this is only a read, all workers can do this in read-only mode without the mpio driver.""" + # TODO check that the island and worker setup is the same as in the checkpoint + # TODO load the migrated individuals from the other islands checkpoints + # NOTE each individual is only stored once at the position given by its origin island and worker, the modifications have to be put in the checkpoint file during migration TODO test if this works as intended reliably + # TODO get the started but not yet completed ones from the difference in start time and evaltime + + with h5py.File(self.checkpoint_path, "r", driver=None) as f: + group = f[f"{self.island_idx}"] + self.generation = group[f"{self.comm.Get_rank()}"].attrs["generation"] + for rank in range(self.comm.size): + # for generation in range(len(group[f"{rank}"])): + for generation in range(self.generation): + if group[f"{rank}"]["current"][generation] == self.island_idx: + ind = Individual( + group[f"{rank}"]["x"][generation, 0], + self.propagator.limits, + ) + ind.rank = rank + ind.island = self.island_idx + ind.current = group[f"{rank}"]["current"][generation] + # TODO velocity loading + # if len(group[f"{rank}"].shape) > 1: + # ind.velocity = group[f"{rank}"]["x"][generation, 1] + ind.loss = group[f"{rank}"]["loss"][generation] + ind.startime = group[f"{rank}"]["starttime"][generation] + ind.evaltime = group[f"{rank}"]["evaltime"][generation] + ind.evalperiod = group[f"{rank}"]["evalperiod"][generation] + ind.generation = generation + self.population.append(ind) + + def set_up_checkpoint(self): + """Initialize checkpoint file.""" + limit_dim = 0 + for key in self.propagator.limits: + if isinstance(self.propagator.limits[key][0], str): + limit_dim += len(self.propagator.limits[key]) + else: + limit_dim += 1 + + num_islands = 1 + if self.island_counts is not None: + num_islands = len(self.island_counts) + + with h5py.File( + self.checkpoint_path, "a", driver="mpio", comm=MPI.COMM_WORLD + ) as f: + # limits + limitsgroup = f.require_group("limits") + for key in self.propagator.limits: + if key not in limitsgroup.attrs: + limitsgroup.attrs[key] = str(self.propagator.limits[key]) + else: + if not str(self.propagator.limits[key]) == limitsgroup.attrs[key]: + raise RuntimeError("Limits inconsistent with checkpoint") + + xdim = 1 + if isinstance(self.propagator, BasicPSO): + xdim = 2 + + oldgenerations = self.generations + if "0" in f: + oldgenerations = f["0"]["0"]["x"].shape[0] + + # population + for i in range(num_islands): + f.require_group(f"{i}") + for worker_idx in range(self.comm.Get_size()): + group = f[f"{i}"].require_group(f"{worker_idx}") + if oldgenerations < self.generations: + group["x"].resize(self.generations, axis=0) + group["loss"].resize(self.generations, axis=0) + group["active"].resize(self.generations, axis=0) + group["current"].resize(self.generations, axis=0) + group["migration_steps"].resize(self.generations, axis=0) + group["starttime"].resize(self.generations, axis=0) + group["evaltime"].resize(self.generations, axis=0) + group["evalperiod"].resize(self.generations, axis=0) + + group.require_dataset( + "x", + (self.generations, xdim, limit_dim), + dtype=np.float32, + chunks=True, + maxshape=(None, xdim, limit_dim), + ) + group.require_dataset( + "loss", + (self.generations,), + np.float32, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "active", + (self.generations,), + np.bool_, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "current", + (self.generations,), + np.int16, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "migration_steps", + (self.generations,), + np.int32, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "starttime", + (self.generations,), + np.uint64, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "evaltime", + (self.generations,), + np.uint64, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "evalperiod", + (self.generations,), + np.uint64, + chunks=True, + maxshape=(None,), + ) + + def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: + """ + Run asynchronous evolutionary optimization routine. + + Parameters + ---------- + logging_interval : int, optional + Print each worker's progress every ``logging_interval``-th generation. Default is 10. + debug : int, optional + The debug level; 0 - silent; 1 - moderate, 2 - noisy (debug mode). Default is 1. + """ + self._work(logging_interval, debug) + def _get_active_individuals(self) -> Tuple[List[Individual], int]: """ Get active individuals in current population list. From 849b12627fc844914ced48a1913fc915b7069f23 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Sat, 11 May 2024 18:38:32 +0200 Subject: [PATCH 03/21] added limits member to the propagator at the top of the hierarchy as a workaround --- propulate/propagators/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/propulate/propagators/base.py b/propulate/propagators/base.py index f56b7796..e22903aa 100644 --- a/propulate/propagators/base.py +++ b/propulate/propagators/base.py @@ -177,6 +177,7 @@ class Conditional(Propagator): def __init__( self, + limits: Dict, pop_size: int, true_prop: Propagator, false_prop: Propagator, @@ -203,6 +204,7 @@ def __init__( self.pop_size = pop_size self.true_prop = true_prop self.false_prop = false_prop + self.limits = limits def __call__(self, inds: List[Individual]) -> Union[List[Individual], Individual]: """ From a91bfb1a2bd3161f3067330473c97990674edaa5 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Sat, 11 May 2024 18:39:03 +0200 Subject: [PATCH 04/21] added limits member to the cmaes propagator --- propulate/propagators/cmaes.py | 1 + 1 file changed, 1 insertion(+) diff --git a/propulate/propagators/cmaes.py b/propulate/propagators/cmaes.py index a81d76ea..3acbfd87 100644 --- a/propulate/propagators/cmaes.py +++ b/propulate/propagators/cmaes.py @@ -751,6 +751,7 @@ def __init__( The separate random number generator for the Propulate optimization. """ self.adapter = adapter + self.limits = limits problem_dimension = len(limits) # Number of individuals considered for each generation lambd = ( From 620f5da6ddd8da29816d27818679c1517ca46248 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Sat, 11 May 2024 18:41:36 +0200 Subject: [PATCH 05/21] updated the signature of the propagator in the propagator factory --- propulate/utils/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/propulate/utils/__init__.py b/propulate/utils/__init__.py index a37942a7..a911c5de 100644 --- a/propulate/utils/__init__.py +++ b/propulate/utils/__init__.py @@ -92,7 +92,7 @@ def get_default_propagator( init = InitUniform(limits, rng=rng) propagator = Conditional( - pop_size, propagator, init + limits, pop_size, propagator, init ) # Initialize random if population size < specified `pop_size`. return propagator From 804ee7136c1c3c795687ced8c5e37b21d8972040 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Sun, 12 May 2024 22:25:28 +0200 Subject: [PATCH 06/21] updated logging for tests and added pso checkpointing test --- tests/test_island.py | 6 ++-- tests/test_multi_rank_workers.py | 3 +- tests/test_propulator.py | 8 +++-- tests/test_pso.py | 61 ++++++++++++++++++++++++++++++-- 4 files changed, 68 insertions(+), 10 deletions(-) diff --git a/tests/test_island.py b/tests/test_island.py index 720c711e..6aeec7ff 100644 --- a/tests/test_island.py +++ b/tests/test_island.py @@ -13,6 +13,8 @@ from propulate.utils import get_default_propagator, set_logger_config from propulate.utils.benchmark_functions import get_function_search_space +set_logger_config() + @pytest.fixture(scope="module") def global_variables() -> Tuple[random.Random, Callable, Dict, Propagator]: @@ -63,7 +65,6 @@ def test_islands( The temporary checkpoint directory. """ rng, benchmark_function, limits, propagator = global_variables - set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( @@ -102,7 +103,6 @@ def test_checkpointing_isolated( The temporary checkpoint directory. """ rng, benchmark_function, limits, propagator = global_variables - set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( @@ -163,7 +163,6 @@ def test_checkpointing( The temporary checkpoint directory. """ rng, benchmark_function, limits, propagator = global_variables - set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( @@ -226,7 +225,6 @@ def test_checkpointing_unequal_populations( The temporary checkpoint directory. """ rng, benchmark_function, limits, propagator = global_variables - set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( diff --git a/tests/test_multi_rank_workers.py b/tests/test_multi_rank_workers.py index 7bb99aa0..677b7a82 100644 --- a/tests/test_multi_rank_workers.py +++ b/tests/test_multi_rank_workers.py @@ -9,6 +9,8 @@ from propulate import Islands from propulate.utils import get_default_propagator, set_logger_config +set_logger_config() + def parallel_sphere(params: Dict[str, float], comm: MPI.Comm = MPI.COMM_SELF) -> float: """ @@ -51,7 +53,6 @@ def test_multi_rank_workers(mpi_tmp_path: pathlib.Path) -> None: The temporary checkpoint directory. """ full_world_comm = MPI.COMM_WORLD # Get full world communicator. - set_logger_config(log_file=mpi_tmp_path / "log.log") rng = random.Random(42 + full_world_comm.rank) limits = { diff --git a/tests/test_propulator.py b/tests/test_propulator.py index fc9f6ec3..fc02f882 100644 --- a/tests/test_propulator.py +++ b/tests/test_propulator.py @@ -10,6 +10,8 @@ from propulate.utils import get_default_propagator, set_logger_config from propulate.utils.benchmark_functions import get_function_search_space +set_logger_config() + @pytest.fixture( params=[ @@ -51,7 +53,6 @@ def test_propulator(function_name: str, mpi_tmp_path: pathlib.Path) -> None: 42 + MPI.COMM_WORLD.rank ) # Random number generator for optimization benchmark_function, limits = get_function_search_space(function_name) - set_logger_config(log_file=mpi_tmp_path / "log.log") propagator = get_default_propagator( pop_size=4, limits=limits, @@ -91,7 +92,7 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: propulator = Propulator( loss_fn=benchmark_function, propagator=propagator, - generations=100, + generations=10, checkpoint_path=mpi_tmp_path, rng=rng, ) # Set up propulator performing actual optimization. @@ -118,3 +119,6 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: len(deepdiff.DeepDiff(old_population, propulator.population, ignore_order=True)) == 0 ) + + +# TODO test loading a checkpoint with an unevaluated individual diff --git a/tests/test_pso.py b/tests/test_pso.py index 6a10adec..081a0ab6 100644 --- a/tests/test_pso.py +++ b/tests/test_pso.py @@ -1,6 +1,8 @@ +import copy import pathlib import random +import deepdiff import pytest from mpi4py import MPI @@ -59,7 +61,7 @@ def pso_propagator(request: pytest.FixtureRequest) -> Propagator: @pytest.mark.mpi def test_pso(pso_propagator: Propagator, mpi_tmp_path: pathlib.Path) -> None: """ - Test single worker using Propulator to optimize a benchmark function using the default genetic propagator. + Test a pso propagator. Parameters ---------- @@ -68,9 +70,9 @@ def test_pso(pso_propagator: Propagator, mpi_tmp_path: pathlib.Path) -> None: mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ - # Set up evolutionary operator. + # Set up pso propagator. init = InitUniformPSO(limits, rng=rng, rank=rank) - propagator = Conditional(1, pso_propagator, init) + propagator = Conditional(limits, 1, pso_propagator, init) # Set up propulator performing actual optimization. propulator = Propulator( @@ -83,3 +85,56 @@ def test_pso(pso_propagator: Propagator, mpi_tmp_path: pathlib.Path) -> None: # Run optimization and print summary of results. propulator.propulate() + + +@pytest.mark.mpi +def test_pso_checkpointing(pso_propagator, mpi_tmp_path: pathlib.Path): + """ + Test velocity checkpointing when using a PSO propagator. + + Parameters + ---------- + pso_propagator : BasicPSO + The PSO propagator variant to test. + mpi_tmp_path : pathlib.Path + The temporary checkpoint directory. + """ + # Set up pso propagator. + init = InitUniformPSO(limits, rng=rng, rank=rank) + propagator = Conditional(limits, 1, pso_propagator, init) + + # Set up propulator performing actual optimization. + propulator = Propulator( + loss_fn=sphere, + propagator=propagator, + rng=rng, + generations=100, + checkpoint_path=mpi_tmp_path, + ) + + # Run optimization and print summary of results. + propulator.propulate() + + old_population = copy.deepcopy( + propulator.population + ) # Save population list from the last run. + del propulator # Delete propulator object. + MPI.COMM_WORLD.barrier() # Synchronize all processes. + + propulator = Propulator( + loss_fn=sphere, + propagator=propagator, + generations=20, + checkpoint_path=mpi_tmp_path, + rng=rng, + ) # Set up new propulator starting from checkpoint. + + # As the number of requested generations is smaller than the number of generations from the run before, + # no new evaluations are performed. Thus, the length of both Propulators' populations must be equal. + assert ( + len(deepdiff.DeepDiff(old_population, propulator.population, ignore_order=True)) + == 0 + ) + + +# TODO test resuming pso run from a non-pso checkpoint From 0e448fb9d8a5d0cd7111b33808c19470da6bf5ba Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Sun, 12 May 2024 23:34:19 +0200 Subject: [PATCH 07/21] fixed logging output during tests and added a surrogate checkpointing test stub --- tests/test_cmaes.py | 6 ++ tests/test_island.py | 11 +++- tests/test_multi_rank_workers.py | 5 +- tests/test_nm.py | 6 ++ tests/test_propulator.py | 11 +++- tests/test_pso.py | 7 ++ tests/test_surrogate.py | 108 +++++++++++++++++++++++++++++++ 7 files changed, 149 insertions(+), 5 deletions(-) diff --git a/tests/test_cmaes.py b/tests/test_cmaes.py index b227add3..63e85be0 100644 --- a/tests/test_cmaes.py +++ b/tests/test_cmaes.py @@ -1,3 +1,4 @@ +import logging import pathlib import random @@ -5,8 +6,11 @@ from propulate import Propulator from propulate.propagators import ActiveCMA, BasicCMA, CMAAdapter, CMAPropagator +from propulate.utils import set_logger_config from propulate.utils.benchmark_functions import get_function_search_space +log = logging.getLogger("propulate") # Get logger instance. + @pytest.fixture(params=[BasicCMA(), ActiveCMA()]) def cma_adapter(request: pytest.FixtureRequest) -> CMAAdapter: @@ -27,6 +31,7 @@ def test_cmaes_basic(cma_adapter: CMAAdapter, mpi_tmp_path: pathlib.Path) -> Non mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng = random.Random(42) # Separate random number generator for optimization. benchmark_function, limits = get_function_search_space("sphere") # Set up evolutionary operator. @@ -43,3 +48,4 @@ def test_cmaes_basic(cma_adapter: CMAAdapter, mpi_tmp_path: pathlib.Path) -> Non ) # Run optimization and print summary of results. propulator.propulate() + log.handlers.clear() diff --git a/tests/test_island.py b/tests/test_island.py index 6aeec7ff..62a03b20 100644 --- a/tests/test_island.py +++ b/tests/test_island.py @@ -1,4 +1,5 @@ import copy +import logging import pathlib import random from typing import Callable, Dict, Tuple @@ -13,7 +14,7 @@ from propulate.utils import get_default_propagator, set_logger_config from propulate.utils.benchmark_functions import get_function_search_space -set_logger_config() +log = logging.getLogger("propulate") # Get logger instance. @pytest.fixture(scope="module") @@ -64,6 +65,7 @@ def test_islands( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng, benchmark_function, limits, propagator = global_variables # Set up island model. @@ -83,6 +85,7 @@ def test_islands( debug=2, ) islands.summarize(debug=2) + log.handlers.clear() @pytest.mark.mpi(min_size=4) @@ -102,6 +105,7 @@ def test_checkpointing_isolated( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng, benchmark_function, limits, propagator = global_variables # Set up island model. @@ -140,6 +144,7 @@ def test_checkpointing_isolated( ) == 0 ) + log.handlers.clear() @pytest.mark.mpi(min_size=4) @@ -162,6 +167,7 @@ def test_checkpointing( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng, benchmark_function, limits, propagator = global_variables # Set up island model. @@ -202,6 +208,7 @@ def test_checkpointing( ) == 0 ) + log.handlers.clear() @pytest.mark.mpi(min_size=8) @@ -224,6 +231,7 @@ def test_checkpointing_unequal_populations( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng, benchmark_function, limits, propagator = global_variables # Set up island model. @@ -266,3 +274,4 @@ def test_checkpointing_unequal_populations( ) == 0 ) + log.handlers.clear() diff --git a/tests/test_multi_rank_workers.py b/tests/test_multi_rank_workers.py index 677b7a82..35083dfe 100644 --- a/tests/test_multi_rank_workers.py +++ b/tests/test_multi_rank_workers.py @@ -1,3 +1,4 @@ +import logging import pathlib import random from typing import Dict @@ -9,7 +10,7 @@ from propulate import Islands from propulate.utils import get_default_propagator, set_logger_config -set_logger_config() +log = logging.getLogger("propulate") # Get logger instance. def parallel_sphere(params: Dict[str, float], comm: MPI.Comm = MPI.COMM_SELF) -> float: @@ -52,6 +53,7 @@ def test_multi_rank_workers(mpi_tmp_path: pathlib.Path) -> None: mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() full_world_comm = MPI.COMM_WORLD # Get full world communicator. rng = random.Random(42 + full_world_comm.rank) @@ -85,3 +87,4 @@ def test_multi_rank_workers(mpi_tmp_path: pathlib.Path) -> None: debug=1, # Debug level ) islands.summarize(top_n=1, debug=1) + log.handlers.clear() diff --git a/tests/test_nm.py b/tests/test_nm.py index e8336912..555e0b75 100644 --- a/tests/test_nm.py +++ b/tests/test_nm.py @@ -1,3 +1,4 @@ +import logging import pathlib import random from typing import Tuple @@ -7,8 +8,11 @@ from propulate import Propulator from propulate.propagators import ParallelNelderMead +from propulate.utils import set_logger_config from propulate.utils.benchmark_functions import get_function_search_space +log = logging.getLogger("propulate") # Get logger instance. + @pytest.fixture( params=[ @@ -48,6 +52,7 @@ def test_cmaes( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng = random.Random(42) # Separate random number generator for optimization. function, limits = get_function_search_space(function_parameters[0]) # Set up evolutionary operator. @@ -67,3 +72,4 @@ def test_cmaes( # Run optimization and print summary of results. propulator.propulate() propulator.summarize() + log.handlers.clear() diff --git a/tests/test_propulator.py b/tests/test_propulator.py index fc02f882..e57a4ad2 100644 --- a/tests/test_propulator.py +++ b/tests/test_propulator.py @@ -1,4 +1,5 @@ import copy +import logging import pathlib import random @@ -10,7 +11,7 @@ from propulate.utils import get_default_propagator, set_logger_config from propulate.utils.benchmark_functions import get_function_search_space -set_logger_config() +log = logging.getLogger("propulate") # Get logger instance. @pytest.fixture( @@ -49,6 +50,7 @@ def test_propulator(function_name: str, mpi_tmp_path: pathlib.Path) -> None: mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng = random.Random( 42 + MPI.COMM_WORLD.rank ) # Random number generator for optimization @@ -66,6 +68,7 @@ def test_propulator(function_name: str, mpi_tmp_path: pathlib.Path) -> None: checkpoint_path=mpi_tmp_path, ) # Set up propulator performing actual optimization. propulator.propulate() # Run optimization and print summary of results. + log.handlers.clear() def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: @@ -79,6 +82,7 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng = random.Random( 42 + MPI.COMM_WORLD.rank ) # Separate random number generator for optimization @@ -92,7 +96,7 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: propulator = Propulator( loss_fn=benchmark_function, propagator=propagator, - generations=10, + generations=100, checkpoint_path=mpi_tmp_path, rng=rng, ) # Set up propulator performing actual optimization. @@ -108,7 +112,7 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: propulator = Propulator( loss_fn=benchmark_function, propagator=propagator, - generations=20, + generations=200, checkpoint_path=mpi_tmp_path, rng=rng, ) # Set up new propulator starting from checkpoint. @@ -119,6 +123,7 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: len(deepdiff.DeepDiff(old_population, propulator.population, ignore_order=True)) == 0 ) + log.handlers.clear() # TODO test loading a checkpoint with an unevaluated individual diff --git a/tests/test_pso.py b/tests/test_pso.py index 081a0ab6..4b156c6e 100644 --- a/tests/test_pso.py +++ b/tests/test_pso.py @@ -1,4 +1,5 @@ import copy +import logging import pathlib import random @@ -15,8 +16,10 @@ InitUniformPSO, VelocityClampingPSO, ) +from propulate.utils import set_logger_config from propulate.utils.benchmark_functions import get_function_search_space, sphere +log = logging.getLogger("propulate") # Get logger instance. limits = get_function_search_space("sphere")[1] rank = MPI.COMM_WORLD.rank rng = random.Random(42 + rank) @@ -70,6 +73,7 @@ def test_pso(pso_propagator: Propagator, mpi_tmp_path: pathlib.Path) -> None: mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() # Set up pso propagator. init = InitUniformPSO(limits, rng=rng, rank=rank) propagator = Conditional(limits, 1, pso_propagator, init) @@ -85,6 +89,7 @@ def test_pso(pso_propagator: Propagator, mpi_tmp_path: pathlib.Path) -> None: # Run optimization and print summary of results. propulator.propulate() + log.handlers.clear() @pytest.mark.mpi @@ -99,6 +104,7 @@ def test_pso_checkpointing(pso_propagator, mpi_tmp_path: pathlib.Path): mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() # Set up pso propagator. init = InitUniformPSO(limits, rng=rng, rank=rank) propagator = Conditional(limits, 1, pso_propagator, init) @@ -135,6 +141,7 @@ def test_pso_checkpointing(pso_propagator, mpi_tmp_path: pathlib.Path): len(deepdiff.DeepDiff(old_population, propulator.population, ignore_order=True)) == 0 ) + log.handlers.clear() # TODO test resuming pso run from a non-pso checkpoint diff --git a/tests/test_surrogate.py b/tests/test_surrogate.py index 7df0136d..4f79be81 100644 --- a/tests/test_surrogate.py +++ b/tests/test_surrogate.py @@ -41,8 +41,93 @@ def ind_loss(params: Dict[str, Union[int, float, str]]) -> Generator[float, None ) +<<<<<<< HEAD def test_static(mpi_tmp_path: Path) -> None: """Test static surrogate using a dummy function.""" +======= + activations = { + "relu": nn.ReLU, + "sigmoid": nn.Sigmoid, + "tanh": nn.Tanh, + } # Define activation function mapping. + activation = activations[activation] # Get activation function. + loss_fn = ( + torch.nn.CrossEntropyLoss() + ) # Use cross-entropy loss for multi-class classification. + + model = Net(conv_layers, activation, lr, loss_fn).to( + device + ) # Set up neural network with specified hyperparameters. + model.best_accuracy = 0.0 # Initialize the model's best validation accuracy. + + train_loader, val_loader = get_data_loaders( + batch_size=8, root=root + ) # Get training and validation data loaders. + + # Configure optimizer. + optimizer = model.configure_optimizers() + + for epoch in range(epochs): + model.train() + total_train_loss = 0 + # Training loop + for batch_idx, (data, target) in enumerate(train_loader): + data, target = data.to(device), target.to(device) + # Zero out gradients. + optimizer.zero_grad() + # Forward + backward pass and optimizer step to update parameters. + loss = model.training_step((data, target)) + loss.backward() + optimizer.step() + # Update loss. + total_train_loss += loss.item() + + avg_train_loss = total_train_loss / len(train_loader) + log.info(f"Epoch {epoch+1}: Avg Training Loss: {avg_train_loss}") + + # Validation loop + model.eval() + total_val_loss = 0 + with torch.no_grad(): + for batch_idx, (data, target) in enumerate(val_loader): + data, target = data.to(device), target.to(device) + # Forward pass + loss = model.validation_step((data, target)) + # Update loss. + total_val_loss += loss.item() + + avg_val_loss = total_val_loss / len(val_loader) + log.info(f"Epoch {epoch+1}: Avg Validation Loss: {avg_val_loss}") + + yield avg_val_loss + + +def set_seeds(seed_value: int = 42) -> None: + """ + Set seed for reproducibility. + + Parameters + ---------- + seed_value : int, optional + The seed to use. Default is 42. + """ + random.seed(seed_value) # Python random module + torch.manual_seed(seed_value) # PyTorch random number generator for CPU + torch.cuda.manual_seed(seed_value) # PyTorch random number generator for all GPUs + torch.cuda.manual_seed_all( + seed_value + ) # PyTorch random number generator for multi-GPU + torch.backends.cudnn.deterministic = True # Use deterministic algorithms. + torch.backends.cudnn.benchmark = False # Disable to be deterministic. + os.environ["PYTHONHASHSEED"] = str(seed_value) # Python hash seed + + +@pytest.mark.mpi(min_size=4) +def test_mnist_static(mpi_tmp_path): + """Test static surrogate using a torch convolutional network on the MNIST dataset.""" + set_logger_config() + num_generations = 3 # Number of generations +>>>>>>> fb69169 (fixed logging output during tests and added a surrogate checkpointing test stub) pop_size = 2 * MPI.COMM_WORLD.size # Breeding population size limits: Dict[str, Union[Tuple[int, int], Tuple[float, float], Tuple[str, ...]]] = { "start": (0.1, 7.0), @@ -109,15 +194,28 @@ def test_static_island(mpi_tmp_path: Path) -> None: ) islands.summarize(top_n=1, debug=2) MPI.COMM_WORLD.barrier() +<<<<<<< HEAD +======= + delattr(get_data_loaders, "barrier_called") + log.handlers.clear() +>>>>>>> fb69169 (fixed logging output during tests and added a surrogate checkpointing test stub) @pytest.mark.filterwarnings( "ignore::DeprecationWarning", match="Assigning the 'data' attribute is an inherently unsafe operation and will be removed in the future.", ) +<<<<<<< HEAD def test_dynamic(mpi_tmp_path: Path) -> None: """Test dynamic surrogate using a dummy function.""" num_generations = 10 # Number of generations +======= +@pytest.mark.mpi(min_size=4) +def test_mnist_dynamic(mpi_tmp_path): + """Test static surrogate using a torch convolutional network on the MNIST dataset.""" + set_logger_config() + num_generations = 3 # Number of generations +>>>>>>> fb69169 (fixed logging output during tests and added a surrogate checkpointing test stub) pop_size = 2 * MPI.COMM_WORLD.size # Breeding population size limits: Dict[str, Union[Tuple[int, int], Tuple[float, float], Tuple[str, ...]]] = { "start": (0.1, 7.0), @@ -145,4 +243,14 @@ def test_dynamic(mpi_tmp_path: Path) -> None: logging_interval=1, # Logging interval debug=2, # Verbosity level ) +<<<<<<< HEAD islands.summarize(top_n=1, debug=2) +======= + log.handlers.clear() + + +@pytest.mark.mpi(min_size=4) +def test_mnist_dynamic_checkpointing(mpi_tmp_path): + """Test whether the surrogate state for pruning is checkpointed correctly.""" + raise +>>>>>>> fb69169 (fixed logging output during tests and added a surrogate checkpointing test stub) From 4dd75d91dcc1fb72bbae1630ea31f2dcd2243214 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Mon, 13 May 2024 00:32:21 +0200 Subject: [PATCH 08/21] the checkpoint test also resumes the run from the checkpoint --- tests/test_propulator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_propulator.py b/tests/test_propulator.py index e57a4ad2..6178896d 100644 --- a/tests/test_propulator.py +++ b/tests/test_propulator.py @@ -123,6 +123,8 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: len(deepdiff.DeepDiff(old_population, propulator.population, ignore_order=True)) == 0 ) + + propulator.propulate() log.handlers.clear() From 63149d039cd37aa0762f3d0dca771d2d0fda30a8 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Mon, 13 May 2024 00:33:23 +0200 Subject: [PATCH 09/21] added hdf5 checkpointing, tested only for single sequential worker --- propulate/propulator.py | 175 ++++++++++++++++++++++++---------------- 1 file changed, 105 insertions(+), 70 deletions(-) diff --git a/propulate/propulator.py b/propulate/propulator.py index bc16689b..075fd8a6 100644 --- a/propulate/propulator.py +++ b/propulate/propulator.py @@ -2,7 +2,6 @@ import inspect import logging import os -import pickle import random import time from operator import attrgetter @@ -16,7 +15,7 @@ from ._globals import INDIVIDUAL_TAG from .population import Individual -from .propagators import BasicPSO, Propagator, SelectMin +from .propagators import BasicPSO, Conditional, Propagator, SelectMin from .surrogate import Surrogate log = logging.getLogger(__name__) # Get logger instance. @@ -173,38 +172,22 @@ def __init__( self.rng = rng # Load initial population of evaluated individuals from checkpoint if exists. - load_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle" - if not os.path.isfile(load_ckpt_file): # If not exists, check for backup file. - load_ckpt_file = load_ckpt_file.with_suffix(".bkp") - - if os.path.isfile(load_ckpt_file): - with open(load_ckpt_file, "rb") as f: - try: - self.population = pickle.load(f) - self.generation = ( - max( - [ - x.generation - for x in self.population - if x.rank == self.island_comm.rank - ] - ) - + 1 - ) # Determine generation to be evaluated next from population checkpoint. - if self.island_comm.rank == 0: - log.info( - "Valid checkpoint file found. " - f"Resuming from generation {self.generation} of loaded population..." - ) - except OSError: - self.population = [] - if self.island_comm.rank == 0: - log.info( - "No valid checkpoint file. Initializing population randomly..." - ) + self.checkpoint_path = self.checkpoint_path / "ckpt.hdf5" + + self.population = [] + # consistency check and ensure enough space is allocated + self.set_up_checkpoint() + if os.path.isfile(self.checkpoint_path): + self.load_checkpoint() + if self.propulate_comm.rank == 0: + log.info( + "Valid checkpoint file found. " + f"Resuming from generation {self.generation} of loaded population..." + ) + # TODO it says resuming from generation 0, so something is not right + # TODO also each worker might be on a different generation so this message probably does not make all of the sense else: - self.population = [] - if self.island_comm.rank == 0: + if self.propulate_comm.rank == 0: log.info( "No valid checkpoint file given. Initializing population randomly..." ) @@ -212,36 +195,50 @@ def __init__( def load_checkpoint(self): """Load checkpoint from HDF5 file. Since this is only a read, all workers can do this in read-only mode without the mpio driver.""" # TODO check that the island and worker setup is the same as in the checkpoint - # TODO load the migrated individuals from the other islands checkpoints # NOTE each individual is only stored once at the position given by its origin island and worker, the modifications have to be put in the checkpoint file during migration TODO test if this works as intended reliably # TODO get the started but not yet completed ones from the difference in start time and evaltime with h5py.File(self.checkpoint_path, "r", driver=None) as f: - group = f[f"{self.island_idx}"] - self.generation = group[f"{self.comm.Get_rank()}"].attrs["generation"] - for rank in range(self.comm.size): - # for generation in range(len(group[f"{rank}"])): - for generation in range(self.generation): - if group[f"{rank}"]["current"][generation] == self.island_idx: + islandgroup = f[f"{self.island_idx}"] + + # NOTE check limits are consistent + limitsgroup = f["limits"] + for key in self.propagator.limits: + if set(limitsgroup.attrs.keys()) != set(self.propagator.limits): + raise RuntimeError("Limits inconsistent with checkpoint") + # TODO check island sizes are consistent + + self.generation = int(f["generations"][self.propulate_comm.Get_rank()]) + print("AAAAAA", self.generation) + # NOTE load individuals, since they might have migrated, every worker has to check each dataset + for rank in range(self.propulate_comm.size): + # for generation in range(len(islandgroup[f"{rank}"])): + for generation in range(f["generations"][rank]): + if islandgroup[f"{rank}"]["current"][generation] == self.island_idx: ind = Individual( - group[f"{rank}"]["x"][generation, 0], + islandgroup[f"{rank}"]["x"][generation, 0], self.propagator.limits, ) ind.rank = rank ind.island = self.island_idx - ind.current = group[f"{rank}"]["current"][generation] + ind.current = islandgroup[f"{rank}"]["current"][generation] # TODO velocity loading # if len(group[f"{rank}"].shape) > 1: - # ind.velocity = group[f"{rank}"]["x"][generation, 1] - ind.loss = group[f"{rank}"]["loss"][generation] - ind.startime = group[f"{rank}"]["starttime"][generation] - ind.evaltime = group[f"{rank}"]["evaltime"][generation] - ind.evalperiod = group[f"{rank}"]["evalperiod"][generation] + # ind.velocity = islandgroup[f"{rank}"]["x"][generation, 1] + ind.loss = islandgroup[f"{rank}"]["loss"][generation] + ind.startime = islandgroup[f"{rank}"]["starttime"][generation] + ind.evaltime = islandgroup[f"{rank}"]["evaltime"][generation] + ind.evalperiod = islandgroup[f"{rank}"]["evalperiod"][ + generation + ] ind.generation = generation self.population.append(ind) + if ind.loss is None: + # TODO resume evaluation + raise def set_up_checkpoint(self): - """Initialize checkpoint file.""" + """Initialize checkpoint file or check consistenct with an existing one.""" limit_dim = 0 for key in self.propagator.limits: if isinstance(self.propagator.limits[key][0], str): @@ -254,29 +251,36 @@ def set_up_checkpoint(self): num_islands = len(self.island_counts) with h5py.File( - self.checkpoint_path, "a", driver="mpio", comm=MPI.COMM_WORLD + self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm ) as f: # limits limitsgroup = f.require_group("limits") for key in self.propagator.limits: - if key not in limitsgroup.attrs: - limitsgroup.attrs[key] = str(self.propagator.limits[key]) - else: - if not str(self.propagator.limits[key]) == limitsgroup.attrs[key]: - raise RuntimeError("Limits inconsistent with checkpoint") + limitsgroup.attrs[key] = str(self.propagator.limits[key]) xdim = 1 - if isinstance(self.propagator, BasicPSO): + # TODO clean this up when reorganizing propagators + if isinstance(self.propagator, BasicPSO) or ( + isinstance(self.propagator, Conditional) + and isinstance(self.propagator.true_prop, BasicPSO) + ): xdim = 2 oldgenerations = self.generations if "0" in f: oldgenerations = f["0"]["0"]["x"].shape[0] + # Store per worker what generation they are at, since islands can be different sizes, it's flat + f.require_dataset( + "generations", + (self.propulate_comm.Get_size(),), + dtype=np.int32, + data=np.zeros((self.propulate_comm.Get_size(),), dtype=np.int32), + ) # population for i in range(num_islands): f.require_group(f"{i}") - for worker_idx in range(self.comm.Get_size()): + for worker_idx in range(self.propulate_comm.Get_size()): group = f[f"{i}"].require_group(f"{worker_idx}") if oldgenerations < self.generations: group["x"].resize(self.generations, axis=0) @@ -356,6 +360,8 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: debug : int, optional The debug level; 0 - silent; 1 - moderate, 2 - noisy (debug mode). Default is 1. """ + # TODO note the propulating times in the checkpoint file for when the run is interrupted + self.start_time = time.time_ns() self._work(logging_interval, debug) def _get_active_individuals(self) -> Tuple[List[Individual], int]: @@ -409,11 +415,32 @@ def _breed(self) -> Individual: assert isinstance(ind, Individual) return ind # Return new individual. - def _evaluate_individual(self) -> None: + def _evaluate_individual(self, hdf5_checkpoint) -> None: """Breed and evaluate individual.""" ind = self._breed() # Breed new individual. - start_time = time.time() # Start evaluation timer. + start_time = time.time_ns() - self.start_time # Start evaluation timer. + ind.starttime = start_time + ckpt_idx = ind.generation + hdf5_checkpoint["generations"][self.propulate_comm.Get_rank()] = ind.generation + + group = hdf5_checkpoint[f"{self.island_idx}"][ + f"{self.propulate_comm.Get_rank()}" + ] + group.attrs["generation"] = ckpt_idx + 1 + # save candidate + group["x"][ckpt_idx, 0, :] = ind.position[:] + if ind.velocity is not None: + group["x"][ckpt_idx, 1, :] = ind.velocity[:] + group["starttime"][ckpt_idx] = start_time + + ind.loss = self.loss_fn(ind) # Evaluate its loss. + ind.evaltime = time.time_ns() - self.start_time # Stop evaluation timer. + ind.evalperiod = ind.evaltime - start_time # Calculate evaluation duration. + # save result for candidate + group["loss"][ckpt_idx] = ind.loss + group["evaltime"][ckpt_idx] = ind.evaltime + group["evalperiod"][ckpt_idx] = ind.evalperiod # Signal start of run to surrogate model. if self.surrogate is not None: self.surrogate.start_run(ind) @@ -622,7 +649,8 @@ def propulate(self, logging_interval: int = 10, debug: int = -1) -> None: if self.worker_sub_comm != MPI.COMM_SELF: self.generation = self.worker_sub_comm.bcast(self.generation, root=0) if self.propulate_comm is None: - while self.generations <= -1 or self.generation < self.generations: + while self.generation < self.generations: + raise # Breed and evaluate individual. self._evaluate_individual() self.generation += 1 @@ -632,20 +660,27 @@ def propulate(self, logging_interval: int = 10, debug: int = -1) -> None: log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.") # Loop over generations. - while self.generations <= -1 or self.generation < self.generations: - if self.generation % int(logging_interval) == 0: - log.info( - f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." - ) + with h5py.File( + self.checkpoint_path, "a", driver="mpio", comm=MPI.COMM_WORLD + ) as f: + while self.generation < self.generations: + if self.generation % int(logging_interval) == 0: + log.info( + f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." + ) - # Breed and evaluate individual. - self._evaluate_individual() + # Breed and evaluate individual. + self._evaluate_individual(f) - # Check for and possibly receive incoming individuals from other intra-island workers. - self._receive_intra_island_individuals() + # Check for and possibly receive incoming individuals from other intra-island workers. + self._receive_intra_island_individuals() - # Go to next generation. - self.generation += 1 + # Go to next generation. + self.generation += 1 + islandgroup = f[f"{self.island_idx}"] + islandgroup[f"{self.propulate_comm.Get_rank()}"].attrs[ + "generation" + ] = self.generation # Having completed all generations, the workers have to wait for each other. # Once all workers are done, they should check for incoming messages once again From 4bf06bb39aa2bb4d8159cfbf255a5e630b36a40e Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Mon, 13 May 2024 00:40:25 +0200 Subject: [PATCH 10/21] removed debug print --- propulate/propulator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/propulate/propulator.py b/propulate/propulator.py index 075fd8a6..bd707429 100644 --- a/propulate/propulator.py +++ b/propulate/propulator.py @@ -209,7 +209,6 @@ def load_checkpoint(self): # TODO check island sizes are consistent self.generation = int(f["generations"][self.propulate_comm.Get_rank()]) - print("AAAAAA", self.generation) # NOTE load individuals, since they might have migrated, every worker has to check each dataset for rank in range(self.propulate_comm.size): # for generation in range(len(islandgroup[f"{rank}"])): From 695d7f8a3c2c4b375c1ae84ae4c5b74538d2fa2e Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Mon, 13 May 2024 00:58:22 +0200 Subject: [PATCH 11/21] added checkpointing to island models --- propulate/migrator.py | 51 ++++++++++++++++++++++------------------- propulate/pollinator.py | 43 +++++++++++++++++++--------------- propulate/propulator.py | 2 +- 3 files changed, 53 insertions(+), 43 deletions(-) diff --git a/propulate/migrator.py b/propulate/migrator.py index b630b0a7..2cdb62ee 100644 --- a/propulate/migrator.py +++ b/propulate/migrator.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Callable, Generator, List, Optional, Type, Union +import h5py import numpy as np from mpi4py import MPI @@ -432,34 +433,38 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: self.propulate_comm.barrier() # Loop over generations. - while self.generations <= -1 or self.generation < self.generations: - if self.generation % int(logging_interval) == 0: - log.info( - f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." - ) + # TODO this should probably be refactored, checkpointing can probably be handled in one place + with h5py.File( + self.checkpoint_path, "a", driver="mpio", comm=MPI.COMM_WORLD + ) as f: + while self.generation < self.generations: + if self.generation % int(logging_interval) == 0: + log.info( + f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." + ) - # Breed and evaluate individual. - self._evaluate_individual() + # Breed and evaluate individual. + self._evaluate_individual(f) - # Check for and possibly receive incoming individuals from other intra-island workers. - self._receive_intra_island_individuals() + # Check for and possibly receive incoming individuals from other intra-island workers. + self._receive_intra_island_individuals() - # Migration. - if migration: - # Emigration: Island sends individuals out. - # Happens on per-worker basis with certain probability. - if self.rng.random() < self.migration_prob: - self._send_emigrants() + # Migration. + if migration: + # Emigration: Island sends individuals out. + # Happens on per-worker basis with certain probability. + if self.rng.random() < self.migration_prob: + self._send_emigrants() - # Immigration: Check for incoming individuals from other islands. - self._receive_immigrants() + # Immigration: Check for incoming individuals from other islands. + self._receive_immigrants() - # Emigration: Check for emigrants from other intra-island workers to be deactivated. - self._deactivate_emigrants() - if debug == 2: - check = self._check_emigrants_to_deactivate() - assert check is False - self.generation += 1 # Go to next generation. + # Emigration: Check for emigrants from other intra-island workers to be deactivated. + self._deactivate_emigrants() + if debug == 2: + check = self._check_emigrants_to_deactivate() + assert check is False + self.generation += 1 # Go to next generation. # Having completed all generations, the workers have to wait for each other. # Once all workers are done, they should check for incoming messages once again diff --git a/propulate/pollinator.py b/propulate/pollinator.py index 2e338023..4d019c8f 100644 --- a/propulate/pollinator.py +++ b/propulate/pollinator.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Callable, Generator, List, Optional, Tuple, Type, Union +import h5py import numpy as np from mpi4py import MPI @@ -415,31 +416,35 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: self.propulate_comm.barrier() # Loop over generations. - while self.generations <= -1 or self.generation < self.generations: - if debug == 1 and self.generation % int(logging_interval) == 0: - log.info( - f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." - ) + # TODO this should probably be refactored, checkpointing can probably be handled in one place + with h5py.File( + self.checkpoint_path, "a", driver="mpio", comm=MPI.COMM_WORLD + ) as f: + while self.generations <= -1 or self.generation < self.generations: + if debug == 1 and self.generation % int(logging_interval) == 0: + log.info( + f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." + ) - # Breed and evaluate individual. - self._evaluate_individual() + # Breed and evaluate individual. + self._evaluate_individual(f) - # Check for and possibly receive incoming individuals from other intra-island workers. - self._receive_intra_island_individuals() + # Check for and possibly receive incoming individuals from other intra-island workers. + self._receive_intra_island_individuals() - if migration: - # Emigration: Island sends individuals out. - # Happens on per-worker basis with certain probability. - if self.rng.random() < self.migration_prob: - self._send_emigrants() + if migration: + # Emigration: Island sends individuals out. + # Happens on per-worker basis with certain probability. + if self.rng.random() < self.migration_prob: + self._send_emigrants() - # Immigration: Island checks for incoming individuals from other islands. - self._receive_immigrants() + # Immigration: Island checks for incoming individuals from other islands. + self._receive_immigrants() - # Immigration: Check for individuals replaced by other intra-island workers to be deactivated. - self._deactivate_replaced_individuals() + # Immigration: Check for individuals replaced by other intra-island workers to be deactivated. + self._deactivate_replaced_individuals() - self.generation += 1 # Go to next generation. + self.generation += 1 # Go to next generation. # Having completed all generations, the workers have to wait for each other. # Once all workers are done, they should check for incoming messages once again diff --git a/propulate/propulator.py b/propulate/propulator.py index bd707429..b783f403 100644 --- a/propulate/propulator.py +++ b/propulate/propulator.py @@ -649,8 +649,8 @@ def propulate(self, logging_interval: int = 10, debug: int = -1) -> None: self.generation = self.worker_sub_comm.bcast(self.generation, root=0) if self.propulate_comm is None: while self.generation < self.generations: - raise # Breed and evaluate individual. + # TODO this should be refactored, the subworkers don't need the logfile self._evaluate_individual() self.generation += 1 return From 2a911c5e0f0638dfae4e991b16c57aa3deec5d54 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Tue, 14 May 2024 09:32:45 +0200 Subject: [PATCH 12/21] added some more asserts and mode verbose output --- tests/test_propulator.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/tests/test_propulator.py b/tests/test_propulator.py index 6178896d..93d9e716 100644 --- a/tests/test_propulator.py +++ b/tests/test_propulator.py @@ -82,7 +82,10 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ - set_logger_config() + first_generations = 20 + second_generations = 40 + set_logger_config(level=logging.DEBUG) + log.debug("test") rng = random.Random( 42 + MPI.COMM_WORLD.rank ) # Separate random number generator for optimization @@ -96,12 +99,16 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: propulator = Propulator( loss_fn=benchmark_function, propagator=propagator, - generations=100, + generations=first_generations, checkpoint_path=mpi_tmp_path, rng=rng, ) # Set up propulator performing actual optimization. propulator.propulate() # Run optimization and print summary of results. + assert ( + len(propulator.population) + == first_generations * propulator.propulate_comm.Get_size() + ) old_population = copy.deepcopy( propulator.population @@ -112,7 +119,7 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: propulator = Propulator( loss_fn=benchmark_function, propagator=propagator, - generations=200, + generations=second_generations, checkpoint_path=mpi_tmp_path, rng=rng, ) # Set up new propulator starting from checkpoint. @@ -125,6 +132,16 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: ) propulator.propulate() + # NOTE make sure nothing was overwritten + seniors = [ + ind for ind in propulator.population if ind.generation < first_generations + ] + assert len(deepdiff.DeepDiff(old_population, seniors, ignore_order=True)) == 0 + assert ( + len(propulator.population) + == second_generations * propulator.propulate_comm.Get_size() + ) + log.handlers.clear() From c914c98a0c0975451d0c00ab80c476a910da704e Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Tue, 14 May 2024 09:34:44 +0200 Subject: [PATCH 13/21] fixed checkpoint file communicator --- propulate/migrator.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/propulate/migrator.py b/propulate/migrator.py index 2cdb62ee..a85c6fd1 100644 --- a/propulate/migrator.py +++ b/propulate/migrator.py @@ -434,21 +434,26 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: # Loop over generations. # TODO this should probably be refactored, checkpointing can probably be handled in one place + log.debug("opening checkpoint file") with h5py.File( - self.checkpoint_path, "a", driver="mpio", comm=MPI.COMM_WORLD + self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm ) as f: + log.debug("opened checkpoint file") while self.generation < self.generations: if self.generation % int(logging_interval) == 0: log.info( f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." ) + log.debug(f"eval ind {self.generation}") # Breed and evaluate individual. self._evaluate_individual(f) + log.debug("immigration") # Check for and possibly receive incoming individuals from other intra-island workers. self._receive_intra_island_individuals() + log.debug("emigration") # Migration. if migration: # Emigration: Island sends individuals out. From 03bb0f02ef550fd2bc95fbb32b5244d63a74da4a Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Tue, 14 May 2024 09:38:57 +0200 Subject: [PATCH 14/21] fixed off by one error in generations when loading checkpoint, removed touching attrs in the parallel opened checkpoint file --- propulate/propulator.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/propulate/propulator.py b/propulate/propulator.py index b783f403..3a9a853b 100644 --- a/propulate/propulator.py +++ b/propulate/propulator.py @@ -176,7 +176,6 @@ def __init__( self.population = [] # consistency check and ensure enough space is allocated - self.set_up_checkpoint() if os.path.isfile(self.checkpoint_path): self.load_checkpoint() if self.propulate_comm.rank == 0: @@ -191,6 +190,7 @@ def __init__( log.info( "No valid checkpoint file given. Initializing population randomly..." ) + self.set_up_checkpoint() def load_checkpoint(self): """Load checkpoint from HDF5 file. Since this is only a read, all workers can do this in read-only mode without the mpio driver.""" @@ -209,10 +209,17 @@ def load_checkpoint(self): # TODO check island sizes are consistent self.generation = int(f["generations"][self.propulate_comm.Get_rank()]) + if ( + islandgroup[f"{self.island_comm.Get_rank()}"]["evalperiod"][ + self.generation + ] + > 0.0 + ): + self.generation += 1 # NOTE load individuals, since they might have migrated, every worker has to check each dataset for rank in range(self.propulate_comm.size): # for generation in range(len(islandgroup[f"{rank}"])): - for generation in range(f["generations"][rank]): + for generation in range(f["generations"][rank] + 1): if islandgroup[f"{rank}"]["current"][generation] == self.island_idx: ind = Individual( islandgroup[f"{rank}"]["x"][generation, 0], @@ -346,6 +353,7 @@ def set_up_checkpoint(self): np.uint64, chunks=True, maxshape=(None,), + data=-1 * np.ones((self.generations,)), ) def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: @@ -425,7 +433,6 @@ def _evaluate_individual(self, hdf5_checkpoint) -> None: group = hdf5_checkpoint[f"{self.island_idx}"][ f"{self.propulate_comm.Get_rank()}" ] - group.attrs["generation"] = ckpt_idx + 1 # save candidate group["x"][ckpt_idx, 0, :] = ind.position[:] if ind.velocity is not None: @@ -669,28 +676,28 @@ def propulate(self, logging_interval: int = 10, debug: int = -1) -> None: ) # Breed and evaluate individual. + log.debug(f"breeding and evaluating {self.generation}") self._evaluate_individual(f) # Check for and possibly receive incoming individuals from other intra-island workers. + log.debug(f"receive intra island {self.generation}") self._receive_intra_island_individuals() # Go to next generation. self.generation += 1 - islandgroup = f[f"{self.island_idx}"] - islandgroup[f"{self.propulate_comm.Get_rank()}"].attrs[ - "generation" - ] = self.generation # Having completed all generations, the workers have to wait for each other. # Once all workers are done, they should check for incoming messages once again # so that each of them holds the complete final population and the found optimum # irrespective of the order they finished. + log.debug("barrier") if self.propulate_comm.rank == 0: log.info("OPTIMIZATION DONE.\nNEXT: Final checks for incoming messages...") self.propulate_comm.barrier() # Final check for incoming individuals evaluated by other intra-island workers. + log.debug("final sync") self._receive_intra_island_individuals() self.propulate_comm.barrier() From 2070a5764214e01ebbe2a22f67a7ee14d334c2ff Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Tue, 14 May 2024 22:53:48 +0200 Subject: [PATCH 15/21] removed comment and debug output --- tests/test_propulator.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_propulator.py b/tests/test_propulator.py index 93d9e716..8f7416a0 100644 --- a/tests/test_propulator.py +++ b/tests/test_propulator.py @@ -85,7 +85,6 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: first_generations = 20 second_generations = 40 set_logger_config(level=logging.DEBUG) - log.debug("test") rng = random.Random( 42 + MPI.COMM_WORLD.rank ) # Separate random number generator for optimization @@ -124,8 +123,6 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: rng=rng, ) # Set up new propulator starting from checkpoint. - # As the number of requested generations is smaller than the number of generations from the run before, - # no new evaluations are performed. Thus, the length of both Propulators' populations must be equal. assert ( len(deepdiff.DeepDiff(old_population, propulator.population, ignore_order=True)) == 0 From cdc3e4d1a13d2edaa9e22d917254675353c7754b Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Wed, 15 May 2024 02:10:19 +0200 Subject: [PATCH 16/21] comments, generations, debugging stuff --- tests/test_island.py | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/tests/test_island.py b/tests/test_island.py index 62a03b20..e231360d 100644 --- a/tests/test_island.py +++ b/tests/test_island.py @@ -65,7 +65,7 @@ def test_islands( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ - set_logger_config() + set_logger_config(level=logging.DEBUG) rng, benchmark_function, limits, propagator = global_variables # Set up island model. @@ -105,7 +105,9 @@ def test_checkpointing_isolated( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ - set_logger_config() + first_generations = 20 + second_generations = 40 + set_logger_config(level=logging.DEBUG) rng, benchmark_function, limits, propagator = global_variables # Set up island model. @@ -113,28 +115,38 @@ def test_checkpointing_isolated( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=first_generations, num_islands=2, migration_probability=0.0, checkpoint_path=mpi_tmp_path, ) # Run actual optimization. +<<<<<<< HEAD islands.propulate(debug=2) islands.summarize(top_n=1, debug=2) +======= + islands.evolve(top_n=1, debug=2) + assert ( + len(islands.propulator.population) + == first_generations * islands.propulator.island_comm.Get_size() + ) +>>>>>>> c7403f4 (comments, generations, debugging stuff) old_population = copy.deepcopy(islands.propulator.population) del islands + MPI.COMM_WORLD.barrier() # Synchronize all processes. islands = Islands( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=second_generations, num_islands=2, migration_probability=0.0, checkpoint_path=mpi_tmp_path, ) + assert len(old_population) == len(islands.propulator.population) assert ( len( @@ -167,6 +179,8 @@ def test_checkpointing( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + first_generations = 20 + second_generations = 40 set_logger_config() rng, benchmark_function, limits, propagator = global_variables @@ -175,7 +189,7 @@ def test_checkpointing( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=first_generations, num_islands=2, migration_probability=0.9, pollination=pollination, @@ -193,7 +207,7 @@ def test_checkpointing( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=second_generations, num_islands=2, migration_probability=0.9, pollination=pollination, @@ -231,6 +245,8 @@ def test_checkpointing_unequal_populations( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + first_generations = 20 + second_generations = 40 set_logger_config() rng, benchmark_function, limits, propagator = global_variables @@ -239,7 +255,7 @@ def test_checkpointing_unequal_populations( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=first_generations, num_islands=2, island_sizes=np.array([3, 5]), migration_probability=0.9, @@ -258,7 +274,7 @@ def test_checkpointing_unequal_populations( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=second_generations, num_islands=2, island_sizes=np.array([3, 5]), migration_probability=0.9, @@ -266,6 +282,7 @@ def test_checkpointing_unequal_populations( checkpoint_path=mpi_tmp_path, ) + # TODO compare active only assert ( len( deepdiff.DeepDiff( @@ -275,3 +292,6 @@ def test_checkpointing_unequal_populations( == 0 ) log.handlers.clear() + + +# TODO start from checkpoint with unevaluated candidates From acd9de8ce4d9f5c1a7d55c5a093054a3a4714123 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Wed, 15 May 2024 02:11:17 +0200 Subject: [PATCH 17/21] removed some debug messages and added migration updates to the checkpoint --- propulate/migrator.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/propulate/migrator.py b/propulate/migrator.py index a85c6fd1..3b83c953 100644 --- a/propulate/migrator.py +++ b/propulate/migrator.py @@ -124,7 +124,7 @@ def __init__( Individual ] = [] # Emigrated individuals to be deactivated on sending island - def _send_emigrants(self) -> None: + def _send_emigrants(self, hdf5_checkpoint) -> None: """Perform migration, i.e. island sends individuals out to other islands.""" log_string = ( f"Island {self.island_idx} Worker {self.island_comm.rank} " @@ -189,6 +189,10 @@ def _send_emigrants(self) -> None: # Send emigrants to target island. departing = copy.deepcopy(emigrants) + for ind in departing: + hdf5_checkpoint[f"{ind.island}"][f"{ind.island_rank}"][ + "active_on_island" + ][ind.generation, self.island_idx] = False # Determine new responsible worker on target island. for ind in departing: ind.current = self.rng.randrange(0, count) @@ -237,7 +241,7 @@ def _send_emigrants(self) -> None: f"to select {num_emigrants} migrants." ) - def _receive_immigrants(self) -> None: + def _receive_immigrants(self, hdf5_checkpoint) -> None: """ Check for and possibly receive immigrants send by other islands. @@ -285,6 +289,9 @@ def _receive_immigrants(self) -> None: log_string + f"Identical immigrant {immigrant} already active on target island {self.island_idx}." ) + hdf5_checkpoint[f"{immigrant.island}"][f"{immigrant.island_rank}"][ + "active_on_island" + ][immigrant.generation] = True self.population.append( copy.deepcopy(immigrant) ) # Append immigrant to population. @@ -434,35 +441,30 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: # Loop over generations. # TODO this should probably be refactored, checkpointing can probably be handled in one place - log.debug("opening checkpoint file") with h5py.File( self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm ) as f: - log.debug("opened checkpoint file") while self.generation < self.generations: if self.generation % int(logging_interval) == 0: log.info( f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." ) - log.debug(f"eval ind {self.generation}") # Breed and evaluate individual. self._evaluate_individual(f) - log.debug("immigration") # Check for and possibly receive incoming individuals from other intra-island workers. self._receive_intra_island_individuals() - log.debug("emigration") # Migration. if migration: # Emigration: Island sends individuals out. # Happens on per-worker basis with certain probability. if self.rng.random() < self.migration_prob: - self._send_emigrants() + self._send_emigrants(f) # Immigration: Check for incoming individuals from other islands. - self._receive_immigrants() + self._receive_immigrants(f) # Emigration: Check for emigrants from other intra-island workers to be deactivated. self._deactivate_emigrants() @@ -488,7 +490,10 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: if migration: # Final check for incoming individuals from other islands. - self._receive_immigrants() + with h5py.File( + self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm + ) as f: + self._receive_immigrants(f) self.propulate_comm.barrier() # Emigration: Final check for emigrants from other intra-island workers to be deactivated. From e30b2f592d436997aa567426661b6ec8dd70e070 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Wed, 15 May 2024 02:12:33 +0200 Subject: [PATCH 18/21] fixed comm --- propulate/pollinator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/propulate/pollinator.py b/propulate/pollinator.py index 4d019c8f..6614f172 100644 --- a/propulate/pollinator.py +++ b/propulate/pollinator.py @@ -418,10 +418,10 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: # Loop over generations. # TODO this should probably be refactored, checkpointing can probably be handled in one place with h5py.File( - self.checkpoint_path, "a", driver="mpio", comm=MPI.COMM_WORLD + self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm ) as f: - while self.generations <= -1 or self.generation < self.generations: - if debug == 1 and self.generation % int(logging_interval) == 0: + while self.generation < self.generations: + if self.generation % int(logging_interval) == 0: log.info( f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." ) From d516b3fd306238edb53aadf58a26029ca6a7d5df Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar.taubert@kit.edu> Date: Wed, 15 May 2024 02:16:08 +0200 Subject: [PATCH 19/21] fixed mistake in checkpoint reading, fixed comm, used individual island origin, implemented active_on_island --- propulate/propulator.py | 110 ++++++++++++++++++++++------------------ 1 file changed, 61 insertions(+), 49 deletions(-) diff --git a/propulate/propulator.py b/propulate/propulator.py index 3a9a853b..49277656 100644 --- a/propulate/propulator.py +++ b/propulate/propulator.py @@ -167,7 +167,10 @@ def __init__( self.island_displs = ( island_displs # Propulate world rank of each island's worker ) - self.island_counts = island_counts # Number of workers on each island + if island_counts is None: + self.island_counts = np.array([self.island_comm.Get_size()]) + else: + self.island_counts = island_counts # Number of workers on each island self.emigration_propagator = emigration_propagator # Emigration propagator self.rng = rng @@ -197,51 +200,57 @@ def load_checkpoint(self): # TODO check that the island and worker setup is the same as in the checkpoint # NOTE each individual is only stored once at the position given by its origin island and worker, the modifications have to be put in the checkpoint file during migration TODO test if this works as intended reliably # TODO get the started but not yet completed ones from the difference in start time and evaltime + # TODO only load an incomplete one, if you're then going to evaluate it with h5py.File(self.checkpoint_path, "r", driver=None) as f: - islandgroup = f[f"{self.island_idx}"] - # NOTE check limits are consistent limitsgroup = f["limits"] - for key in self.propagator.limits: - if set(limitsgroup.attrs.keys()) != set(self.propagator.limits): - raise RuntimeError("Limits inconsistent with checkpoint") + if set(limitsgroup.attrs.keys()) != set(self.propagator.limits): + raise RuntimeError("Limits inconsistent with checkpoint") # TODO check island sizes are consistent self.generation = int(f["generations"][self.propulate_comm.Get_rank()]) if ( - islandgroup[f"{self.island_comm.Get_rank()}"]["evalperiod"][ + f[f"{self.island_idx}"][f"{self.island_comm.Get_rank()}"]["evalperiod"][ self.generation ] > 0.0 ): self.generation += 1 # NOTE load individuals, since they might have migrated, every worker has to check each dataset - for rank in range(self.propulate_comm.size): - # for generation in range(len(islandgroup[f"{rank}"])): - for generation in range(f["generations"][rank] + 1): - if islandgroup[f"{rank}"]["current"][generation] == self.island_idx: - ind = Individual( - islandgroup[f"{rank}"]["x"][generation, 0], - self.propagator.limits, - ) - ind.rank = rank - ind.island = self.island_idx - ind.current = islandgroup[f"{rank}"]["current"][generation] - # TODO velocity loading - # if len(group[f"{rank}"].shape) > 1: - # ind.velocity = islandgroup[f"{rank}"]["x"][generation, 1] - ind.loss = islandgroup[f"{rank}"]["loss"][generation] - ind.startime = islandgroup[f"{rank}"]["starttime"][generation] - ind.evaltime = islandgroup[f"{rank}"]["evaltime"][generation] - ind.evalperiod = islandgroup[f"{rank}"]["evalperiod"][ - generation - ] - ind.generation = generation - self.population.append(ind) - if ind.loss is None: - # TODO resume evaluation - raise + num_islands = len(self.island_counts) + for i in range(num_islands): + islandgroup = f[f"{i}"] + for rank in range(self.island_counts[i]): + for generation in range(f["generations"][rank] + 1): + if islandgroup[f"{rank}"]["active_on_island"][generation][ + self.island_idx + ]: + ind = Individual( + islandgroup[f"{rank}"]["x"][generation, 0], + self.propagator.limits, + ) + # TODO check what rank was used for, i think it was the rank in the propulator_comm + ind.rank = rank + ind.island = self.island_idx + ind.current = islandgroup[f"{rank}"]["current"][generation] + # TODO velocity loading + # if len(group[f"{rank}"].shape) > 1: + # ind.velocity = islandgroup[f"{rank}"]["x"][generation, 1] + ind.loss = islandgroup[f"{rank}"]["loss"][generation] + # ind.startime = islandgroup[f"{rank}"]["starttime"][generation] + ind.evaltime = islandgroup[f"{rank}"]["evaltime"][ + generation + ] + ind.evalperiod = islandgroup[f"{rank}"]["evalperiod"][ + generation + ] + ind.generation = generation + ind.island_rank = rank + self.population.append(ind) + if ind.loss is None: + # TODO resume evaluation on this individual + raise def set_up_checkpoint(self): """Initialize checkpoint file or check consistenct with an existing one.""" @@ -252,10 +261,9 @@ def set_up_checkpoint(self): else: limit_dim += 1 - num_islands = 1 - if self.island_counts is not None: - num_islands = len(self.island_counts) + num_islands = len(self.island_counts) + # TODO this can probably be done without mpi just on rank 0 with h5py.File( self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm ) as f: @@ -286,17 +294,17 @@ def set_up_checkpoint(self): # population for i in range(num_islands): f.require_group(f"{i}") - for worker_idx in range(self.propulate_comm.Get_size()): + for worker_idx in range(self.island_counts[i]): group = f[f"{i}"].require_group(f"{worker_idx}") if oldgenerations < self.generations: group["x"].resize(self.generations, axis=0) group["loss"].resize(self.generations, axis=0) - group["active"].resize(self.generations, axis=0) group["current"].resize(self.generations, axis=0) group["migration_steps"].resize(self.generations, axis=0) group["starttime"].resize(self.generations, axis=0) group["evaltime"].resize(self.generations, axis=0) group["evalperiod"].resize(self.generations, axis=0) + group["active_on_island"].resize(self.generations, axis=0) group.require_dataset( "x", @@ -312,13 +320,6 @@ def set_up_checkpoint(self): chunks=True, maxshape=(None,), ) - group.require_dataset( - "active", - (self.generations,), - np.bool_, - chunks=True, - maxshape=(None,), - ) group.require_dataset( "current", (self.generations,), @@ -355,6 +356,14 @@ def set_up_checkpoint(self): maxshape=(None,), data=-1 * np.ones((self.generations,)), ) + group.require_dataset( + "active_on_island", + (self.generations, num_islands), + dtype=bool, + chunks=True, + maxshape=(None, None), + data=np.zeros((self.generations, num_islands), dtype=bool), + ) def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: """ @@ -425,28 +434,31 @@ def _breed(self) -> Individual: def _evaluate_individual(self, hdf5_checkpoint) -> None: """Breed and evaluate individual.""" ind = self._breed() # Breed new individual. + ind.island_rank = self.island_comm.Get_rank() start_time = time.time_ns() - self.start_time # Start evaluation timer. ind.starttime = start_time ckpt_idx = ind.generation hdf5_checkpoint["generations"][self.propulate_comm.Get_rank()] = ind.generation - group = hdf5_checkpoint[f"{self.island_idx}"][ - f"{self.propulate_comm.Get_rank()}" - ] + group = hdf5_checkpoint[f"{self.island_idx}"][f"{self.island_comm.Get_rank()}"] # save candidate group["x"][ckpt_idx, 0, :] = ind.position[:] if ind.velocity is not None: group["x"][ckpt_idx, 1, :] = ind.velocity[:] group["starttime"][ckpt_idx] = start_time + group["current"][ckpt_idx] = ind.current ind.loss = self.loss_fn(ind) # Evaluate its loss. ind.evaltime = time.time_ns() - self.start_time # Stop evaluation timer. ind.evalperiod = ind.evaltime - start_time # Calculate evaluation duration. + # TODO fix evalperiod for resumed from checkpoint individuals + # TODO somehow store migration history, maybe just as islands_visited # save result for candidate group["loss"][ckpt_idx] = ind.loss group["evaltime"][ckpt_idx] = ind.evaltime group["evalperiod"][ckpt_idx] = ind.evalperiod + group["active_on_island"][ckpt_idx, self.island_idx] = True # Signal start of run to surrogate model. if self.surrogate is not None: self.surrogate.start_run(ind) @@ -491,7 +503,7 @@ def loss_fn(individual: Individual) -> float: self.surrogate.update(ind.loss) if self.propulate_comm is None: return - ind.evaltime = time.time() # Stop evaluation timer. + ind.evaltime = time.time_ns() - self.start_time # Stop evaluation timer. ind.evalperiod = ind.evaltime - start_time # Calculate evaluation duration. self.population.append( ind @@ -667,7 +679,7 @@ def propulate(self, logging_interval: int = 10, debug: int = -1) -> None: # Loop over generations. with h5py.File( - self.checkpoint_path, "a", driver="mpio", comm=MPI.COMM_WORLD + self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm ) as f: while self.generation < self.generations: if self.generation % int(logging_interval) == 0: From b9056e4e8f6d90f47bafb9a95e8eb15847d5393d Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar-t@gmx.de> Date: Sun, 4 Aug 2024 23:17:14 +0200 Subject: [PATCH 20/21] fixed pre-commit issues, updated installation instructions, excluded coverage file from pre-commit hooks, fixed a missed merge conflicts --- .pre-commit-config.yaml | 1 + README.md | 2 + propulate/migrator.py | 8 ++- propulate/pollinator.py | 15 +++-- propulate/population.py | 1 + propulate/propagators/base.py | 8 ++- propulate/propagators/pso.py | 2 +- propulate/propulator.py | 61 ++++++++----------- tests/test_island.py | 5 -- tests/test_pso.py | 4 +- tests/test_surrogate.py | 108 ---------------------------------- tutorials/pso_example.py | 2 +- 12 files changed, 56 insertions(+), 161 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 48440f6f..6477fe9c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,6 +5,7 @@ repos: hooks: - id: trailing-whitespace - id: end-of-file-fixer + exclude: 'coverage' - id: check-yaml - id: check-added-large-files - repo: https://github.com/astral-sh/ruff-pre-commit diff --git a/README.md b/README.md index 89ec736d..4fd5e10f 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,8 @@ discussions](https://github.com/Helmholtz-AI-Energy/propulate/discussions) :octo ## Installation +- Installations of MPI and hdf5 are required. +- ``Propulate`` depends on parallel hdf5. Try ``CC="mpicc" HDF5_MPI="ON" pip install --no-binary=h5py h5py`` to install the parallel enabled Python bindings or follow instructions [here](https://docs.h5py.org/en/stable/build.html#install). - You can install the **latest stable release** from PyPI: ``pip install propulate`` - If you need the **latest updates**, you can also install ``Propulate`` directly from the master branch. Pull and run ``pip install .``. diff --git a/propulate/migrator.py b/propulate/migrator.py index 3b83c953..003e6efa 100644 --- a/propulate/migrator.py +++ b/propulate/migrator.py @@ -124,7 +124,7 @@ def __init__( Individual ] = [] # Emigrated individuals to be deactivated on sending island - def _send_emigrants(self, hdf5_checkpoint) -> None: + def _send_emigrants(self, hdf5_checkpoint: h5py.File) -> None: """Perform migration, i.e. island sends individuals out to other islands.""" log_string = ( f"Island {self.island_idx} Worker {self.island_comm.rank} " @@ -241,7 +241,7 @@ def _send_emigrants(self, hdf5_checkpoint) -> None: f"to select {num_emigrants} migrants." ) - def _receive_immigrants(self, hdf5_checkpoint) -> None: + def _receive_immigrants(self, hdf5_checkpoint: h5py.File) -> None: """ Check for and possibly receive immigrants send by other islands. @@ -429,7 +429,9 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: if self.propulate_comm is None: while self.generations <= -1 or self.generation < self.generations: # Breed and evaluate individual. - self._evaluate_individual() + # TODO this should be refactored, the subworkers don't need the logfile + # TODO this needs to be addressed before merge, since multirank workers should fail with this + self._evaluate_individual(None) self.generation += 1 return diff --git a/propulate/pollinator.py b/propulate/pollinator.py index 6614f172..149cdedf 100644 --- a/propulate/pollinator.py +++ b/propulate/pollinator.py @@ -1,6 +1,7 @@ import copy import logging import random +import time from pathlib import Path from typing import Callable, Generator, List, Optional, Tuple, Type, Union @@ -205,7 +206,8 @@ def _send_emigrants(self) -> None: f"to select {num_emigrants} migrants." ) - def _receive_immigrants(self) -> None: + # TODO implement checkpoint update + def _receive_immigrants(self, hdf5_checkpoint: h5py.File) -> None: """Check for and possibly receive immigrants send by other islands.""" replace_num = 0 log_string = ( @@ -401,12 +403,15 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: debug : int, optional The debug level; 0 - silent; 1 - moderate, 2 - noisy (debug mode). Default is 1. """ + self.start_time = time.time_ns() if self.worker_sub_comm != MPI.COMM_SELF: self.generation = self.worker_sub_comm.bcast(self.generation, root=0) if self.propulate_comm is None: while self.generations <= -1 or self.generation < self.generations: # Breed and evaluate individual. - self._evaluate_individual() + # TODO this should be refactored, the subworkers don't need the logfile + # TODO this needs to be addressed before merge, since multirank workers should fail with this + self._evaluate_individual(None) self.generation += 1 return if self.island_comm.rank == 0: @@ -439,7 +444,8 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: self._send_emigrants() # Immigration: Island checks for incoming individuals from other islands. - self._receive_immigrants() + # TODO this should probably update the checkpoint so it needs to pass the handle + self._receive_immigrants(None) # Immigration: Check for individuals replaced by other intra-island workers to be deactivated. self._deactivate_replaced_individuals() @@ -462,7 +468,8 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: if migration: # Final check for incoming individuals from other islands. - self._receive_immigrants() + # TODO this needs to update the checkpoint + self._receive_immigrants(None) self.propulate_comm.barrier() # Immigration: Final check for individuals replaced by other intra-island workers to be deactivated. diff --git a/propulate/population.py b/propulate/population.py index 2f65361b..e16a12a5 100644 --- a/propulate/population.py +++ b/propulate/population.py @@ -85,6 +85,7 @@ def __init__( self.migration_history: str = "" # migration history self.evaltime = float("inf") # evaluation time self.evalperiod = 0.0 # evaluation duration + self.island_rank = 0 # rank in the island comm # NOTE needed for PSO type propagators self.velocity = velocity diff --git a/propulate/propagators/base.py b/propulate/propagators/base.py index e22903aa..27b9820a 100644 --- a/propulate/propagators/base.py +++ b/propulate/propagators/base.py @@ -73,6 +73,10 @@ def __init__( if rng is None: rng = random.Random() self.rng = rng # Random number generator + # TODO + self.limits: Mapping[ + str, Union[Tuple[float, float], Tuple[int, int], Tuple[str, ...]] + ] = dict() def __call__(self, inds: List[Individual]) -> Union[List[Individual], Individual]: """ @@ -177,7 +181,9 @@ class Conditional(Propagator): def __init__( self, - limits: Dict, + limits: Mapping[ + str, Union[Tuple[float, float], Tuple[int, int], Tuple[str, ...]] + ], pop_size: int, true_prop: Propagator, false_prop: Propagator, diff --git a/propulate/propagators/pso.py b/propulate/propagators/pso.py index 2748c482..b325bebb 100644 --- a/propulate/propagators/pso.py +++ b/propulate/propagators/pso.py @@ -270,7 +270,7 @@ def __call__(self, individuals: List[Individual]) -> Individual: individuals : List[propulate.population.Individual] The list of individuals that must at least contain one individual that belongs to the propagator. This list is used to calculate personal and global best of the particle and the swarm, - respectively, and then to update the particle based on the retrieved results. + respectively, and then to update the particle based on the retrieved results. cannot be used as ``Individual`` objects are converted to particles first. Returns diff --git a/propulate/propulator.py b/propulate/propulator.py index 49277656..4f8db028 100644 --- a/propulate/propulator.py +++ b/propulate/propulator.py @@ -6,7 +6,7 @@ import time from operator import attrgetter from pathlib import Path -from typing import Callable, Final, Generator, List, Optional, Tuple, Type, Union +from typing import Any, Callable, Final, Generator, List, Optional, Tuple, Type, Union import deepdiff import h5py @@ -177,7 +177,7 @@ def __init__( # Load initial population of evaluated individuals from checkpoint if exists. self.checkpoint_path = self.checkpoint_path / "ckpt.hdf5" - self.population = [] + self.population: list[Individual] = [] # consistency check and ensure enough space is allocated if os.path.isfile(self.checkpoint_path): self.load_checkpoint() @@ -195,7 +195,7 @@ def __init__( ) self.set_up_checkpoint() - def load_checkpoint(self): + def load_checkpoint(self) -> None: """Load checkpoint from HDF5 file. Since this is only a read, all workers can do this in read-only mode without the mpio driver.""" # TODO check that the island and worker setup is the same as in the checkpoint # NOTE each individual is only stored once at the position given by its origin island and worker, the modifications have to be put in the checkpoint file during migration TODO test if this works as intended reliably @@ -252,7 +252,7 @@ def load_checkpoint(self): # TODO resume evaluation on this individual raise - def set_up_checkpoint(self): + def set_up_checkpoint(self) -> None: """Initialize checkpoint file or check consistenct with an existing one.""" limit_dim = 0 for key in self.propagator.limits: @@ -365,21 +365,6 @@ def set_up_checkpoint(self): data=np.zeros((self.generations, num_islands), dtype=bool), ) - def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: - """ - Run asynchronous evolutionary optimization routine. - - Parameters - ---------- - logging_interval : int, optional - Print each worker's progress every ``logging_interval``-th generation. Default is 10. - debug : int, optional - The debug level; 0 - silent; 1 - moderate, 2 - noisy (debug mode). Default is 1. - """ - # TODO note the propulating times in the checkpoint file for when the run is interrupted - self.start_time = time.time_ns() - self._work(logging_interval, debug) - def _get_active_individuals(self) -> Tuple[List[Individual], int]: """ Get active individuals in current population list. @@ -431,12 +416,12 @@ def _breed(self) -> Individual: assert isinstance(ind, Individual) return ind # Return new individual. - def _evaluate_individual(self, hdf5_checkpoint) -> None: + def _evaluate_individual(self, hdf5_checkpoint: h5py.File) -> None: """Breed and evaluate individual.""" ind = self._breed() # Breed new individual. ind.island_rank = self.island_comm.Get_rank() start_time = time.time_ns() - self.start_time # Start evaluation timer. - ind.starttime = start_time + # ind.starttime = start_time ckpt_idx = ind.generation hdf5_checkpoint["generations"][self.propulate_comm.Get_rank()] = ind.generation @@ -448,17 +433,6 @@ def _evaluate_individual(self, hdf5_checkpoint) -> None: group["starttime"][ckpt_idx] = start_time group["current"][ckpt_idx] = ind.current - ind.loss = self.loss_fn(ind) # Evaluate its loss. - ind.evaltime = time.time_ns() - self.start_time # Stop evaluation timer. - ind.evalperiod = ind.evaltime - start_time # Calculate evaluation duration. - # TODO fix evalperiod for resumed from checkpoint individuals - # TODO somehow store migration history, maybe just as islands_visited - - # save result for candidate - group["loss"][ckpt_idx] = ind.loss - group["evaltime"][ckpt_idx] = ind.evaltime - group["evalperiod"][ckpt_idx] = ind.evalperiod - group["active_on_island"][ckpt_idx, self.island_idx] = True # Signal start of run to surrogate model. if self.surrogate is not None: self.surrogate.start_run(ind) @@ -480,6 +454,7 @@ def loss_gen(individual: Individual) -> Generator[float, None, None]: for last in loss_gen(ind): if self.surrogate is not None: if self.surrogate.cancel(last): # Check cancel for each yield. + assert ind.loss == float("inf") log.debug( f"Island {self.island_idx} Worker {self.island_comm.rank} Generation {self.generation}: PRUNING\n" f"{ind}" @@ -498,6 +473,16 @@ def loss_fn(individual: Individual) -> float: ind.loss = float(loss_fn(ind)) # Evaluate its loss. + ind.evaltime = time.time_ns() - self.start_time # Stop evaluation timer. + ind.evalperiod = ind.evaltime - start_time # Calculate evaluation duration. + # save result for candidate + group["evaltime"][ckpt_idx] = ind.evaltime + group["evalperiod"][ckpt_idx] = ind.evalperiod + group["active_on_island"][ckpt_idx, self.island_idx] = True + # TODO fix evalperiod for resumed from checkpoint individuals + # TODO somehow store migration history, maybe just as islands_visited + + group["loss"][ckpt_idx] = ind.loss # Add final value to surrogate. if self.surrogate is not None: self.surrogate.update(ind.loss) @@ -571,7 +556,7 @@ def _receive_intra_island_individuals(self) -> None: ) log.debug(log_string) - def _send_emigrants(self) -> None: + def _send_emigrants(self, *args: Any, **kwargs: Any) -> None: """ Perform migration, i.e., island sends individuals out to other islands. @@ -583,7 +568,7 @@ def _send_emigrants(self) -> None: """ raise NotImplementedError - def _receive_immigrants(self) -> None: + def _receive_immigrants(self, *args: Any, **kwargs: Any) -> None: """ Check for and possibly receive immigrants send by other islands. @@ -664,13 +649,15 @@ def propulate(self, logging_interval: int = 10, debug: int = -1) -> None: If any individuals are left that should have been deactivated before (only for debug > 0). """ + self.start_time = time.time_ns() if self.worker_sub_comm != MPI.COMM_SELF: self.generation = self.worker_sub_comm.bcast(self.generation, root=0) if self.propulate_comm is None: while self.generation < self.generations: # Breed and evaluate individual. # TODO this should be refactored, the subworkers don't need the logfile - self._evaluate_individual() + # TODO this needs to be addressed before merge, since multirank workers should fail with this + self._evaluate_individual(None) self.generation += 1 return @@ -815,9 +802,9 @@ def summarize( ) self.propulate_comm.barrier() if debug == 0: - best = min(self.population, key=attrgetter("loss")) + best = [min(self.population, key=attrgetter("loss"))] if self.island_comm.rank == 0: - log.info(f"Top result on island {self.island_idx}: {best}") + log.info(f"Top result on island {self.island_idx}: {best[0]}") else: unique_pop = self._get_unique_individuals() unique_pop.sort(key=lambda x: x.loss) diff --git a/tests/test_island.py b/tests/test_island.py index e231360d..d2c940bc 100644 --- a/tests/test_island.py +++ b/tests/test_island.py @@ -122,16 +122,11 @@ def test_checkpointing_isolated( ) # Run actual optimization. -<<<<<<< HEAD islands.propulate(debug=2) - islands.summarize(top_n=1, debug=2) -======= - islands.evolve(top_n=1, debug=2) assert ( len(islands.propulator.population) == first_generations * islands.propulator.island_comm.Get_size() ) ->>>>>>> c7403f4 (comments, generations, debugging stuff) old_population = copy.deepcopy(islands.propulator.population) del islands diff --git a/tests/test_pso.py b/tests/test_pso.py index 4b156c6e..87235546 100644 --- a/tests/test_pso.py +++ b/tests/test_pso.py @@ -93,7 +93,9 @@ def test_pso(pso_propagator: Propagator, mpi_tmp_path: pathlib.Path) -> None: @pytest.mark.mpi -def test_pso_checkpointing(pso_propagator, mpi_tmp_path: pathlib.Path): +def test_pso_checkpointing( + pso_propagator: BasicPSO, mpi_tmp_path: pathlib.Path +) -> None: """ Test velocity checkpointing when using a PSO propagator. diff --git a/tests/test_surrogate.py b/tests/test_surrogate.py index 4f79be81..7df0136d 100644 --- a/tests/test_surrogate.py +++ b/tests/test_surrogate.py @@ -41,93 +41,8 @@ def ind_loss(params: Dict[str, Union[int, float, str]]) -> Generator[float, None ) -<<<<<<< HEAD def test_static(mpi_tmp_path: Path) -> None: """Test static surrogate using a dummy function.""" -======= - activations = { - "relu": nn.ReLU, - "sigmoid": nn.Sigmoid, - "tanh": nn.Tanh, - } # Define activation function mapping. - activation = activations[activation] # Get activation function. - loss_fn = ( - torch.nn.CrossEntropyLoss() - ) # Use cross-entropy loss for multi-class classification. - - model = Net(conv_layers, activation, lr, loss_fn).to( - device - ) # Set up neural network with specified hyperparameters. - model.best_accuracy = 0.0 # Initialize the model's best validation accuracy. - - train_loader, val_loader = get_data_loaders( - batch_size=8, root=root - ) # Get training and validation data loaders. - - # Configure optimizer. - optimizer = model.configure_optimizers() - - for epoch in range(epochs): - model.train() - total_train_loss = 0 - # Training loop - for batch_idx, (data, target) in enumerate(train_loader): - data, target = data.to(device), target.to(device) - # Zero out gradients. - optimizer.zero_grad() - # Forward + backward pass and optimizer step to update parameters. - loss = model.training_step((data, target)) - loss.backward() - optimizer.step() - # Update loss. - total_train_loss += loss.item() - - avg_train_loss = total_train_loss / len(train_loader) - log.info(f"Epoch {epoch+1}: Avg Training Loss: {avg_train_loss}") - - # Validation loop - model.eval() - total_val_loss = 0 - with torch.no_grad(): - for batch_idx, (data, target) in enumerate(val_loader): - data, target = data.to(device), target.to(device) - # Forward pass - loss = model.validation_step((data, target)) - # Update loss. - total_val_loss += loss.item() - - avg_val_loss = total_val_loss / len(val_loader) - log.info(f"Epoch {epoch+1}: Avg Validation Loss: {avg_val_loss}") - - yield avg_val_loss - - -def set_seeds(seed_value: int = 42) -> None: - """ - Set seed for reproducibility. - - Parameters - ---------- - seed_value : int, optional - The seed to use. Default is 42. - """ - random.seed(seed_value) # Python random module - torch.manual_seed(seed_value) # PyTorch random number generator for CPU - torch.cuda.manual_seed(seed_value) # PyTorch random number generator for all GPUs - torch.cuda.manual_seed_all( - seed_value - ) # PyTorch random number generator for multi-GPU - torch.backends.cudnn.deterministic = True # Use deterministic algorithms. - torch.backends.cudnn.benchmark = False # Disable to be deterministic. - os.environ["PYTHONHASHSEED"] = str(seed_value) # Python hash seed - - -@pytest.mark.mpi(min_size=4) -def test_mnist_static(mpi_tmp_path): - """Test static surrogate using a torch convolutional network on the MNIST dataset.""" - set_logger_config() - num_generations = 3 # Number of generations ->>>>>>> fb69169 (fixed logging output during tests and added a surrogate checkpointing test stub) pop_size = 2 * MPI.COMM_WORLD.size # Breeding population size limits: Dict[str, Union[Tuple[int, int], Tuple[float, float], Tuple[str, ...]]] = { "start": (0.1, 7.0), @@ -194,28 +109,15 @@ def test_static_island(mpi_tmp_path: Path) -> None: ) islands.summarize(top_n=1, debug=2) MPI.COMM_WORLD.barrier() -<<<<<<< HEAD -======= - delattr(get_data_loaders, "barrier_called") - log.handlers.clear() ->>>>>>> fb69169 (fixed logging output during tests and added a surrogate checkpointing test stub) @pytest.mark.filterwarnings( "ignore::DeprecationWarning", match="Assigning the 'data' attribute is an inherently unsafe operation and will be removed in the future.", ) -<<<<<<< HEAD def test_dynamic(mpi_tmp_path: Path) -> None: """Test dynamic surrogate using a dummy function.""" num_generations = 10 # Number of generations -======= -@pytest.mark.mpi(min_size=4) -def test_mnist_dynamic(mpi_tmp_path): - """Test static surrogate using a torch convolutional network on the MNIST dataset.""" - set_logger_config() - num_generations = 3 # Number of generations ->>>>>>> fb69169 (fixed logging output during tests and added a surrogate checkpointing test stub) pop_size = 2 * MPI.COMM_WORLD.size # Breeding population size limits: Dict[str, Union[Tuple[int, int], Tuple[float, float], Tuple[str, ...]]] = { "start": (0.1, 7.0), @@ -243,14 +145,4 @@ def test_mnist_dynamic(mpi_tmp_path): logging_interval=1, # Logging interval debug=2, # Verbosity level ) -<<<<<<< HEAD islands.summarize(top_n=1, debug=2) -======= - log.handlers.clear() - - -@pytest.mark.mpi(min_size=4) -def test_mnist_dynamic_checkpointing(mpi_tmp_path): - """Test whether the surrogate state for pruning is checkpointed correctly.""" - raise ->>>>>>> fb69169 (fixed logging output during tests and added a surrogate checkpointing test stub) diff --git a/tutorials/pso_example.py b/tutorials/pso_example.py index dc14df60..8665e041 100644 --- a/tutorials/pso_example.py +++ b/tutorials/pso_example.py @@ -89,7 +89,7 @@ else: raise ValueError("Invalid PSO propagator name given.") init = InitUniformPSO(limits, rng=rng, rank=MPI.COMM_WORLD.rank) - propagator = Conditional(config.pop_size, pso_propagator, init) + propagator = Conditional(limits, config.pop_size, pso_propagator, init) propulator = Propulator( benchmark_function, From 03289f3faa0a41d3ab93d562ecd3e19d50f685e6 Mon Sep 17 00:00:00 2001 From: Oskar Taubert <oskar-t@gmx.de> Date: Mon, 5 Aug 2024 00:00:17 +0200 Subject: [PATCH 21/21] fixed ruff errors, that were not found in the pre-commit hook for some reason --- propulate/population.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/propulate/population.py b/propulate/population.py index e16a12a5..5ffe86e2 100644 --- a/propulate/population.py +++ b/propulate/population.py @@ -100,11 +100,11 @@ def __getitem__(self, key: str) -> Union[float, int, str]: return self.mapping[key] else: # continuous variable - if self.types[key] == float: + if self.types[key] is float: return float(self.position[self.offsets[key]].item()) - elif self.types[key] == int: + elif self.types[key] is int: return int(np.rint(self.position[self.offsets[key]]).item()) - elif self.types[key] == str: + elif self.types[key] is str: offset = self.offsets[key] upper = self.offsets[key] + len(self.limits[key]) return str( @@ -121,13 +121,13 @@ def __setitem__(self, key: str, newvalue: Union[float, int, str, Any]) -> None: else: if key not in self.limits: raise ValueError("Unknown gene.") - if self.types[key] == float: + if self.types[key] is float: assert isinstance(newvalue, float) self.position[self.offsets[key]] = newvalue - elif self.types[key] == int: + elif self.types[key] is int: assert isinstance(newvalue, int) self.position[self.offsets[key]] = float(newvalue) - elif self.types[key] == str: + elif self.types[key] is str: assert newvalue in self.limits[key] offset = self.offsets[key] upper = len(self.limits[key])