diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 48440f6f..6477fe9c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,6 +5,7 @@ repos: hooks: - id: trailing-whitespace - id: end-of-file-fixer + exclude: 'coverage' - id: check-yaml - id: check-added-large-files - repo: https://github.com/astral-sh/ruff-pre-commit diff --git a/README.md b/README.md index 89ec736d..4fd5e10f 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,8 @@ discussions](https://github.com/Helmholtz-AI-Energy/propulate/discussions) :octo ## Installation +- Installations of MPI and hdf5 are required. +- ``Propulate`` depends on parallel hdf5. Try ``CC="mpicc" HDF5_MPI="ON" pip install --no-binary=h5py h5py`` to install the parallel enabled Python bindings or follow instructions [here](https://docs.h5py.org/en/stable/build.html#install). - You can install the **latest stable release** from PyPI: ``pip install propulate`` - If you need the **latest updates**, you can also install ``Propulate`` directly from the master branch. Pull and run ``pip install .``. diff --git a/propulate/_globals.py b/propulate/_globals.py index 43e38374..ba04846b 100644 --- a/propulate/_globals.py +++ b/propulate/_globals.py @@ -2,5 +2,4 @@ SYNCHRONIZATION_TAG = 2 INIT_TAG = 3 POPULATION_TAG = 4 -DUMP_TAG = 5 MIGRATION_TAG = 6 diff --git a/propulate/migrator.py b/propulate/migrator.py index 0442a21b..003e6efa 100644 --- a/propulate/migrator.py +++ b/propulate/migrator.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Callable, Generator, List, Optional, Type, Union +import h5py import numpy as np from mpi4py import MPI @@ -123,7 +124,7 @@ def __init__( Individual ] = [] # Emigrated individuals to be deactivated on sending island - def _send_emigrants(self) -> None: + def _send_emigrants(self, hdf5_checkpoint: h5py.File) -> None: """Perform migration, i.e. 
island sends individuals out to other islands.""" log_string = ( f"Island {self.island_idx} Worker {self.island_comm.rank} " @@ -188,6 +189,10 @@ def _send_emigrants(self) -> None: # Send emigrants to target island. departing = copy.deepcopy(emigrants) + for ind in departing: + hdf5_checkpoint[f"{ind.island}"][f"{ind.island_rank}"][ + "active_on_island" + ][ind.generation, self.island_idx] = False # Determine new responsible worker on target island. for ind in departing: ind.current = self.rng.randrange(0, count) @@ -236,7 +241,7 @@ def _send_emigrants(self) -> None: f"to select {num_emigrants} migrants." ) - def _receive_immigrants(self) -> None: + def _receive_immigrants(self, hdf5_checkpoint: h5py.File) -> None: """ Check for and possibly receive immigrants send by other islands. @@ -284,6 +289,9 @@ def _receive_immigrants(self) -> None: log_string + f"Identical immigrant {immigrant} already active on target island {self.island_idx}." ) + hdf5_checkpoint[f"{immigrant.island}"][f"{immigrant.island_rank}"][ + "active_on_island" + ][immigrant.generation] = True self.population.append( copy.deepcopy(immigrant) ) # Append immigrant to population. @@ -421,53 +429,51 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: if self.propulate_comm is None: while self.generations <= -1 or self.generation < self.generations: # Breed and evaluate individual. - self._evaluate_individual() + # TODO this should be refactored, the subworkers don't need the logfile + # TODO this needs to be addressed before merge, since multirank workers should fail with this + self._evaluate_individual(None) self.generation += 1 return if self.island_comm.rank == 0: log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.") - dump = True if self.island_comm.rank == 0 else False migration = True if self.migration_prob > 0 else False self.propulate_comm.barrier() # Loop over generations. 
- while self.generations <= -1 or self.generation < self.generations: - if self.generation % int(logging_interval) == 0: - log.info( - f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." - ) - - # Breed and evaluate individual. - self._evaluate_individual() - - # Check for and possibly receive incoming individuals from other intra-island workers. - self._receive_intra_island_individuals() + # TODO this should probably be refactored, checkpointing can probably be handled in one place + with h5py.File( + self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm + ) as f: + while self.generation < self.generations: + if self.generation % int(logging_interval) == 0: + log.info( + f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." + ) - # Migration. - if migration: - # Emigration: Island sends individuals out. - # Happens on per-worker basis with certain probability. - if self.rng.random() < self.migration_prob: - self._send_emigrants() + # Breed and evaluate individual. + self._evaluate_individual(f) - # Immigration: Check for incoming individuals from other islands. - self._receive_immigrants() + # Check for and possibly receive incoming individuals from other intra-island workers. + self._receive_intra_island_individuals() - # Emigration: Check for emigrants from other intra-island workers to be deactivated. - self._deactivate_emigrants() - if debug == 2: - check = self._check_emigrants_to_deactivate() - assert check is False + # Migration. + if migration: + # Emigration: Island sends individuals out. + # Happens on per-worker basis with certain probability. + if self.rng.random() < self.migration_prob: + self._send_emigrants(f) - if dump: # Dump checkpoint. - self._dump_checkpoint() + # Immigration: Check for incoming individuals from other islands. 
+ self._receive_immigrants(f) - dump = ( - self._determine_worker_dumping_next() - ) # Determine worker dumping checkpoint in the next generation. - self.generation += 1 # Go to next generation. + # Emigration: Check for emigrants from other intra-island workers to be deactivated. + self._deactivate_emigrants() + if debug == 2: + check = self._check_emigrants_to_deactivate() + assert check is False + self.generation += 1 # Go to next generation. # Having completed all generations, the workers have to wait for each other. # Once all workers are done, they should check for incoming messages once again @@ -486,7 +492,10 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: if migration: # Final check for incoming individuals from other islands. - self._receive_immigrants() + with h5py.File( + self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm + ) as f: + self._receive_immigrants(f) self.propulate_comm.barrier() # Emigration: Final check for emigrants from other intra-island workers to be deactivated. @@ -507,12 +516,3 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: raise ValueError( "There should not be any individuals left that need to be deactivated." ) - - self.propulate_comm.barrier() - - # Final checkpointing on rank 0. - if self.island_comm.rank == 0: - self._dump_final_checkpoint() # Dump checkpoint. - self.propulate_comm.barrier() - _ = self._determine_worker_dumping_next() - self.propulate_comm.barrier() diff --git a/propulate/pollinator.py b/propulate/pollinator.py index 67e3ab27..149cdedf 100644 --- a/propulate/pollinator.py +++ b/propulate/pollinator.py @@ -1,9 +1,11 @@ import copy import logging import random +import time from pathlib import Path from typing import Callable, Generator, List, Optional, Tuple, Type, Union +import h5py import numpy as np from mpi4py import MPI @@ -204,7 +206,8 @@ def _send_emigrants(self) -> None: f"to select {num_emigrants} migrants." 
) - def _receive_immigrants(self) -> None: + # TODO implement checkpoint update + def _receive_immigrants(self, hdf5_checkpoint: h5py.File) -> None: """Check for and possibly receive immigrants send by other islands.""" replace_num = 0 log_string = ( @@ -400,53 +403,54 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: debug : int, optional The debug level; 0 - silent; 1 - moderate, 2 - noisy (debug mode). Default is 1. """ + self.start_time = time.time_ns() if self.worker_sub_comm != MPI.COMM_SELF: self.generation = self.worker_sub_comm.bcast(self.generation, root=0) if self.propulate_comm is None: while self.generations <= -1 or self.generation < self.generations: # Breed and evaluate individual. - self._evaluate_individual() + # TODO this should be refactored, the subworkers don't need the logfile + # TODO this needs to be addressed before merge, since multirank workers should fail with this + self._evaluate_individual(None) self.generation += 1 return if self.island_comm.rank == 0: log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.") - dump = True if self.island_comm.rank == 0 else False migration = True if self.migration_prob > 0 else False self.propulate_comm.barrier() # Loop over generations. - while self.generations <= -1 or self.generation < self.generations: - if debug == 1 and self.generation % int(logging_interval) == 0: - log.info( - f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." - ) - - # Breed and evaluate individual. - self._evaluate_individual() + # TODO this should probably be refactored, checkpointing can probably be handled in one place + with h5py.File( + self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm + ) as f: + while self.generation < self.generations: + if self.generation % int(logging_interval) == 0: + log.info( + f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." 
+ ) - # Check for and possibly receive incoming individuals from other intra-island workers. - self._receive_intra_island_individuals() + # Breed and evaluate individual. + self._evaluate_individual(f) - if migration: - # Emigration: Island sends individuals out. - # Happens on per-worker basis with certain probability. - if self.rng.random() < self.migration_prob: - self._send_emigrants() + # Check for and possibly receive incoming individuals from other intra-island workers. + self._receive_intra_island_individuals() - # Immigration: Island checks for incoming individuals from other islands. - self._receive_immigrants() + if migration: + # Emigration: Island sends individuals out. + # Happens on per-worker basis with certain probability. + if self.rng.random() < self.migration_prob: + self._send_emigrants() - # Immigration: Check for individuals replaced by other intra-island workers to be deactivated. - self._deactivate_replaced_individuals() + # Immigration: Island checks for incoming individuals from other islands. + # TODO this should probably update the checkpoint so it needs to pass the handle + self._receive_immigrants(None) - if dump: # Dump checkpoint. - self._dump_checkpoint() + # Immigration: Check for individuals replaced by other intra-island workers to be deactivated. + self._deactivate_replaced_individuals() - dump = ( - self._determine_worker_dumping_next() - ) # Determine worker dumping checkpoint in the next generation. - self.generation += 1 # Go to next generation. + self.generation += 1 # Go to next generation. # Having completed all generations, the workers have to wait for each other. # Once all workers are done, they should check for incoming messages once again @@ -464,7 +468,8 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: if migration: # Final check for incoming individuals from other islands. 
- self._receive_immigrants() + # TODO this needs to update the checkpoint + self._receive_immigrants(None) self.propulate_comm.barrier() # Immigration: Final check for individuals replaced by other intra-island workers to be deactivated. @@ -477,11 +482,3 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None: f"Finally {len(self.replaced)} individual(s) in replaced: {self.replaced}:\n{self.population}" ) self._deactivate_replaced_individuals() - self.propulate_comm.barrier() - - # Final checkpointing on rank 0. - if self.island_comm.rank == 0: - self._dump_final_checkpoint() # Dump checkpoint. - self.propulate_comm.barrier() - _ = self._determine_worker_dumping_next() - self.propulate_comm.barrier() diff --git a/propulate/population.py b/propulate/population.py index 2f65361b..5ffe86e2 100644 --- a/propulate/population.py +++ b/propulate/population.py @@ -85,6 +85,7 @@ def __init__( self.migration_history: str = "" # migration history self.evaltime = float("inf") # evaluation time self.evalperiod = 0.0 # evaluation duration + self.island_rank = 0 # rank in the island comm # NOTE needed for PSO type propagators self.velocity = velocity @@ -99,11 +100,11 @@ def __getitem__(self, key: str) -> Union[float, int, str]: return self.mapping[key] else: # continuous variable - if self.types[key] == float: + if self.types[key] is float: return float(self.position[self.offsets[key]].item()) - elif self.types[key] == int: + elif self.types[key] is int: return int(np.rint(self.position[self.offsets[key]]).item()) - elif self.types[key] == str: + elif self.types[key] is str: offset = self.offsets[key] upper = self.offsets[key] + len(self.limits[key]) return str( @@ -120,13 +121,13 @@ def __setitem__(self, key: str, newvalue: Union[float, int, str, Any]) -> None: else: if key not in self.limits: raise ValueError("Unknown gene.") - if self.types[key] == float: + if self.types[key] is float: assert isinstance(newvalue, float) self.position[self.offsets[key]] = 
newvalue - elif self.types[key] == int: + elif self.types[key] is int: assert isinstance(newvalue, int) self.position[self.offsets[key]] = float(newvalue) - elif self.types[key] == str: + elif self.types[key] is str: assert newvalue in self.limits[key] offset = self.offsets[key] upper = len(self.limits[key]) diff --git a/propulate/propagators/base.py b/propulate/propagators/base.py index f56b7796..27b9820a 100644 --- a/propulate/propagators/base.py +++ b/propulate/propagators/base.py @@ -73,6 +73,10 @@ def __init__( if rng is None: rng = random.Random() self.rng = rng # Random number generator + # TODO + self.limits: Mapping[ + str, Union[Tuple[float, float], Tuple[int, int], Tuple[str, ...]] + ] = dict() def __call__(self, inds: List[Individual]) -> Union[List[Individual], Individual]: """ @@ -177,6 +181,9 @@ class Conditional(Propagator): def __init__( self, + limits: Mapping[ + str, Union[Tuple[float, float], Tuple[int, int], Tuple[str, ...]] + ], pop_size: int, true_prop: Propagator, false_prop: Propagator, @@ -203,6 +210,7 @@ def __init__( self.pop_size = pop_size self.true_prop = true_prop self.false_prop = false_prop + self.limits = limits def __call__(self, inds: List[Individual]) -> Union[List[Individual], Individual]: """ diff --git a/propulate/propagators/cmaes.py b/propulate/propagators/cmaes.py index a81d76ea..3acbfd87 100644 --- a/propulate/propagators/cmaes.py +++ b/propulate/propagators/cmaes.py @@ -751,6 +751,7 @@ def __init__( The separate random number generator for the Propulate optimization. 
""" self.adapter = adapter + self.limits = limits problem_dimension = len(limits) # Number of individuals considered for each generation lambd = ( diff --git a/propulate/propagators/pso.py b/propulate/propagators/pso.py index 2748c482..b325bebb 100644 --- a/propulate/propagators/pso.py +++ b/propulate/propagators/pso.py @@ -270,7 +270,7 @@ def __call__(self, individuals: List[Individual]) -> Individual: individuals : List[propulate.population.Individual] The list of individuals that must at least contain one individual that belongs to the propagator. This list is used to calculate personal and global best of the particle and the swarm, - respectively, and then to update the particle based on the retrieved results. + respectively, and then to update the particle based on the retrieved results. cannot be used as ``Individual`` objects are converted to particles first. Returns diff --git a/propulate/propulator.py b/propulate/propulator.py index 1a9f0531..4f8db028 100644 --- a/propulate/propulator.py +++ b/propulate/propulator.py @@ -2,20 +2,20 @@ import inspect import logging import os -import pickle import random import time from operator import attrgetter from pathlib import Path -from typing import Callable, Final, Generator, List, Optional, Tuple, Type, Union +from typing import Any, Callable, Final, Generator, List, Optional, Tuple, Type, Union import deepdiff +import h5py import numpy as np from mpi4py import MPI -from ._globals import DUMP_TAG, INDIVIDUAL_TAG +from ._globals import INDIVIDUAL_TAG from .population import Individual -from .propagators import Propagator, SelectMin +from .propagators import BasicPSO, Conditional, Propagator, SelectMin from .surrogate import Surrogate log = logging.getLogger(__name__) # Get logger instance. 
@@ -167,46 +167,203 @@ def __init__( self.island_displs = ( island_displs # Propulate world rank of each island's worker ) - self.island_counts = island_counts # Number of workers on each island + if island_counts is None: + self.island_counts = np.array([self.island_comm.Get_size()]) + else: + self.island_counts = island_counts # Number of workers on each island self.emigration_propagator = emigration_propagator # Emigration propagator self.rng = rng # Load initial population of evaluated individuals from checkpoint if exists. - load_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle" - if not os.path.isfile(load_ckpt_file): # If not exists, check for backup file. - load_ckpt_file = load_ckpt_file.with_suffix(".bkp") - - if os.path.isfile(load_ckpt_file): - with open(load_ckpt_file, "rb") as f: - try: - self.population = pickle.load(f) - self.generation = ( - max( - [ - x.generation - for x in self.population - if x.rank == self.island_comm.rank - ] - ) - + 1 - ) # Determine generation to be evaluated next from population checkpoint. - if self.island_comm.rank == 0: - log.info( - "Valid checkpoint file found. " - f"Resuming from generation {self.generation} of loaded population..." - ) - except OSError: - self.population = [] - if self.island_comm.rank == 0: - log.info( - "No valid checkpoint file. Initializing population randomly..." - ) + self.checkpoint_path = self.checkpoint_path / "ckpt.hdf5" + + self.population: list[Individual] = [] + # consistency check and ensure enough space is allocated + if os.path.isfile(self.checkpoint_path): + self.load_checkpoint() + if self.propulate_comm.rank == 0: + log.info( + "Valid checkpoint file found. " + f"Resuming from generation {self.generation} of loaded population..." 
+ ) + # TODO it says resuming from generation 0, so something is not right + # TODO also each worker might be on a different generation so this message probably does not make all of the sense else: - self.population = [] - if self.island_comm.rank == 0: + if self.propulate_comm.rank == 0: log.info( "No valid checkpoint file given. Initializing population randomly..." ) + self.set_up_checkpoint() + + def load_checkpoint(self) -> None: + """Load checkpoint from HDF5 file. Since this is only a read, all workers can do this in read-only mode without the mpio driver.""" + # TODO check that the island and worker setup is the same as in the checkpoint + # NOTE each individual is only stored once at the position given by its origin island and worker, the modifications have to be put in the checkpoint file during migration TODO test if this works as intended reliably + # TODO get the started but not yet completed ones from the difference in start time and evaltime + # TODO only load an incomplete one, if you're then going to evaluate it + + with h5py.File(self.checkpoint_path, "r", driver=None) as f: + # NOTE check limits are consistent + limitsgroup = f["limits"] + if set(limitsgroup.attrs.keys()) != set(self.propagator.limits): + raise RuntimeError("Limits inconsistent with checkpoint") + # TODO check island sizes are consistent + + self.generation = int(f["generations"][self.propulate_comm.Get_rank()]) + if ( + f[f"{self.island_idx}"][f"{self.island_comm.Get_rank()}"]["evalperiod"][ + self.generation + ] + > 0.0 + ): + self.generation += 1 + # NOTE load individuals, since they might have migrated, every worker has to check each dataset + num_islands = len(self.island_counts) + for i in range(num_islands): + islandgroup = f[f"{i}"] + for rank in range(self.island_counts[i]): + for generation in range(f["generations"][rank] + 1): + if islandgroup[f"{rank}"]["active_on_island"][generation][ + self.island_idx + ]: + ind = Individual( + islandgroup[f"{rank}"]["x"][generation, 
0], + self.propagator.limits, + ) + # TODO check what rank was used for, i think it was the rank in the propulator_comm + ind.rank = rank + ind.island = self.island_idx + ind.current = islandgroup[f"{rank}"]["current"][generation] + # TODO velocity loading + # if len(group[f"{rank}"].shape) > 1: + # ind.velocity = islandgroup[f"{rank}"]["x"][generation, 1] + ind.loss = islandgroup[f"{rank}"]["loss"][generation] + # ind.startime = islandgroup[f"{rank}"]["starttime"][generation] + ind.evaltime = islandgroup[f"{rank}"]["evaltime"][ + generation + ] + ind.evalperiod = islandgroup[f"{rank}"]["evalperiod"][ + generation + ] + ind.generation = generation + ind.island_rank = rank + self.population.append(ind) + if ind.loss is None: + # TODO resume evaluation on this individual + raise + + def set_up_checkpoint(self) -> None: + """Initialize checkpoint file or check consistency with an existing one.""" + limit_dim = 0 + for key in self.propagator.limits: + if isinstance(self.propagator.limits[key][0], str): + limit_dim += len(self.propagator.limits[key]) + else: + limit_dim += 1 + + num_islands = len(self.island_counts) + + # TODO this can probably be done without mpi just on rank 0 + with h5py.File( + self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm + ) as f: + # limits + limitsgroup = f.require_group("limits") + for key in self.propagator.limits: + limitsgroup.attrs[key] = str(self.propagator.limits[key]) + + xdim = 1 + # TODO clean this up when reorganizing propagators + if isinstance(self.propagator, BasicPSO) or ( + isinstance(self.propagator, Conditional) + and isinstance(self.propagator.true_prop, BasicPSO) + ): + xdim = 2 + + oldgenerations = self.generations + if "0" in f: + oldgenerations = f["0"]["0"]["x"].shape[0] + # Store per worker what generation they are at, since islands can be different sizes, it's flat + f.require_dataset( + "generations", + (self.propulate_comm.Get_size(),), + dtype=np.int32, + 
data=np.zeros((self.propulate_comm.Get_size(),), dtype=np.int32), + ) + + # population + for i in range(num_islands): + f.require_group(f"{i}") + for worker_idx in range(self.island_counts[i]): + group = f[f"{i}"].require_group(f"{worker_idx}") + if oldgenerations < self.generations: + group["x"].resize(self.generations, axis=0) + group["loss"].resize(self.generations, axis=0) + group["current"].resize(self.generations, axis=0) + group["migration_steps"].resize(self.generations, axis=0) + group["starttime"].resize(self.generations, axis=0) + group["evaltime"].resize(self.generations, axis=0) + group["evalperiod"].resize(self.generations, axis=0) + group["active_on_island"].resize(self.generations, axis=0) + + group.require_dataset( + "x", + (self.generations, xdim, limit_dim), + dtype=np.float32, + chunks=True, + maxshape=(None, xdim, limit_dim), + ) + group.require_dataset( + "loss", + (self.generations,), + np.float32, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "current", + (self.generations,), + np.int16, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "migration_steps", + (self.generations,), + np.int32, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "starttime", + (self.generations,), + np.uint64, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "evaltime", + (self.generations,), + np.uint64, + chunks=True, + maxshape=(None,), + ) + group.require_dataset( + "evalperiod", + (self.generations,), + np.uint64, + chunks=True, + maxshape=(None,), + data=-1 * np.ones((self.generations,)), + ) + group.require_dataset( + "active_on_island", + (self.generations, num_islands), + dtype=bool, + chunks=True, + maxshape=(None, None), + data=np.zeros((self.generations, num_islands), dtype=bool), + ) def _get_active_individuals(self) -> Tuple[List[Individual], int]: """ @@ -259,10 +416,22 @@ def _breed(self) -> Individual: assert isinstance(ind, Individual) return ind # Return new individual. 
- def _evaluate_individual(self) -> None: + def _evaluate_individual(self, hdf5_checkpoint: h5py.File) -> None: """Breed and evaluate individual.""" ind = self._breed() # Breed new individual. - start_time = time.time() # Start evaluation timer. + ind.island_rank = self.island_comm.Get_rank() + start_time = time.time_ns() - self.start_time # Start evaluation timer. + # ind.starttime = start_time + ckpt_idx = ind.generation + hdf5_checkpoint["generations"][self.propulate_comm.Get_rank()] = ind.generation + + group = hdf5_checkpoint[f"{self.island_idx}"][f"{self.island_comm.Get_rank()}"] + # save candidate + group["x"][ckpt_idx, 0, :] = ind.position[:] + if ind.velocity is not None: + group["x"][ckpt_idx, 1, :] = ind.velocity[:] + group["starttime"][ckpt_idx] = start_time + group["current"][ckpt_idx] = ind.current # Signal start of run to surrogate model. if self.surrogate is not None: @@ -285,6 +454,7 @@ def loss_gen(individual: Individual) -> Generator[float, None, None]: for last in loss_gen(ind): if self.surrogate is not None: if self.surrogate.cancel(last): # Check cancel for each yield. + assert ind.loss == float("inf") log.debug( f"Island {self.island_idx} Worker {self.island_comm.rank} Generation {self.generation}: PRUNING\n" f"{ind}" @@ -303,12 +473,22 @@ def loss_fn(individual: Individual) -> float: ind.loss = float(loss_fn(ind)) # Evaluate its loss. + ind.evaltime = time.time_ns() - self.start_time # Stop evaluation timer. + ind.evalperiod = ind.evaltime - start_time # Calculate evaluation duration. + # save result for candidate + group["evaltime"][ckpt_idx] = ind.evaltime + group["evalperiod"][ckpt_idx] = ind.evalperiod + group["active_on_island"][ckpt_idx, self.island_idx] = True + # TODO fix evalperiod for resumed from checkpoint individuals + # TODO somehow store migration history, maybe just as islands_visited + + group["loss"][ckpt_idx] = ind.loss # Add final value to surrogate. 
if self.surrogate is not None: self.surrogate.update(ind.loss) if self.propulate_comm is None: return - ind.evaltime = time.time() # Stop evaluation timer. + ind.evaltime = time.time_ns() - self.start_time # Stop evaluation timer. ind.evalperiod = ind.evaltime - start_time # Calculate evaluation duration. self.population.append( ind @@ -376,7 +556,7 @@ def _receive_intra_island_individuals(self) -> None: ) log.debug(log_string) - def _send_emigrants(self) -> None: + def _send_emigrants(self, *args: Any, **kwargs: Any) -> None: """ Perform migration, i.e., island sends individuals out to other islands. @@ -388,7 +568,7 @@ def _send_emigrants(self) -> None: """ raise NotImplementedError - def _receive_immigrants(self) -> None: + def _receive_immigrants(self, *args: Any, **kwargs: Any) -> None: """ Check for and possibly receive immigrants send by other islands. @@ -469,112 +649,57 @@ def propulate(self, logging_interval: int = 10, debug: int = -1) -> None: If any individuals are left that should have been deactivated before (only for debug > 0). """ + self.start_time = time.time_ns() if self.worker_sub_comm != MPI.COMM_SELF: self.generation = self.worker_sub_comm.bcast(self.generation, root=0) if self.propulate_comm is None: - while self.generations <= -1 or self.generation < self.generations: + while self.generation < self.generations: # Breed and evaluate individual. - self._evaluate_individual() + # TODO this should be refactored, the subworkers don't need the logfile + # TODO this needs to be addressed before merge, since multirank workers should fail with this + self._evaluate_individual(None) self.generation += 1 return if self.island_comm.rank == 0: log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.") - dump = True if self.island_comm.rank == 0 else False - self.propulate_comm.barrier() - # Loop over generations. 
- while self.generations <= -1 or self.generation < self.generations: - if self.generation % int(logging_interval) == 0: - log.info( - f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." - ) - - # Breed and evaluate individual. - self._evaluate_individual() - - # Check for and possibly receive incoming individuals from other intra-island workers. - self._receive_intra_island_individuals() + with h5py.File( + self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm + ) as f: + while self.generation < self.generations: + if self.generation % int(logging_interval) == 0: + log.info( + f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..." + ) - if dump: # Dump checkpoint. - self._dump_checkpoint() + # Breed and evaluate individual. + log.debug(f"breeding and evaluating {self.generation}") + self._evaluate_individual(f) - dump = ( - self._determine_worker_dumping_next() - ) # Determine worker dumping checkpoint in the next generation. + # Check for and possibly receive incoming individuals from other intra-island workers. + log.debug(f"receive intra island {self.generation}") + self._receive_intra_island_individuals() - # Go to next generation. - self.generation += 1 + # Go to next generation. + self.generation += 1 # Having completed all generations, the workers have to wait for each other. # Once all workers are done, they should check for incoming messages once again # so that each of them holds the complete final population and the found optimum # irrespective of the order they finished. + log.debug("barrier") if self.propulate_comm.rank == 0: log.info("OPTIMIZATION DONE.\nNEXT: Final checks for incoming messages...") self.propulate_comm.barrier() # Final check for incoming individuals evaluated by other intra-island workers. + log.debug("final sync") self._receive_intra_island_individuals() self.propulate_comm.barrier() - # Final checkpointing on rank 0. 
- if self.island_comm.rank == 0: - self._dump_final_checkpoint() # Dump checkpoint. - self.propulate_comm.barrier() - _ = self._determine_worker_dumping_next() - self.propulate_comm.barrier() - - def _dump_checkpoint(self) -> None: - """Dump checkpoint to file.""" - log.debug( - f"Island {self.island_idx} Worker {self.island_comm.rank} Generation {self.generation}: " - f"Dumping checkpoint..." - ) - save_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle" - if os.path.isfile(save_ckpt_file): - try: - os.replace(save_ckpt_file, save_ckpt_file.with_suffix(".bkp")) - except OSError as e: - log.warning(e) - with open(save_ckpt_file, "wb") as f: - pickle.dump(self.population, f) - - dest = ( - self.island_comm.rank + 1 - if self.island_comm.rank + 1 < self.island_comm.size - else 0 - ) - self.island_comm.send(True, dest=dest, tag=DUMP_TAG) - - def _determine_worker_dumping_next(self) -> bool: - """Determine the worker who dumps the checkpoint in the next generation.""" - dump = False - stat = MPI.Status() - probe_dump = self.island_comm.iprobe( - source=MPI.ANY_SOURCE, tag=DUMP_TAG, status=stat - ) - if probe_dump: - dump = self.island_comm.recv(source=stat.Get_source(), tag=DUMP_TAG) - log.debug( - f"Island {self.island_idx} Worker {self.island_comm.rank} Generation {self.generation}: " - f"Going to dump next: {dump}. 
Before: Worker {stat.Get_source()}" - ) - return dump - - def _dump_final_checkpoint(self) -> None: - """Dump final checkpoint.""" - save_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle" - if os.path.isfile(save_ckpt_file): - try: - os.replace(save_ckpt_file, save_ckpt_file.with_suffix(".bkp")) - except OSError as e: - log.warning(e) - with open(save_ckpt_file, "wb") as f: - pickle.dump(self.population, f) - def _check_for_duplicates( self, active: bool, debug: int = 1 ) -> Tuple[List[List[Union[Individual, int]]], List[Individual]]: @@ -677,9 +802,9 @@ def summarize( ) self.propulate_comm.barrier() if debug == 0: - best = min(self.population, key=attrgetter("loss")) + best = [min(self.population, key=attrgetter("loss"))] if self.island_comm.rank == 0: - log.info(f"Top result on island {self.island_idx}: {best}") + log.info(f"Top result on island {self.island_idx}: {best[0]}") else: unique_pop = self._get_unique_individuals() unique_pop.sort(key=lambda x: x.loss) diff --git a/propulate/utils/__init__.py b/propulate/utils/__init__.py index a37942a7..a911c5de 100644 --- a/propulate/utils/__init__.py +++ b/propulate/utils/__init__.py @@ -92,7 +92,7 @@ def get_default_propagator( init = InitUniform(limits, rng=rng) propagator = Conditional( - pop_size, propagator, init + limits, pop_size, propagator, init ) # Initialize random if population size < specified `pop_size`. return propagator diff --git a/tests/test_cmaes.py b/tests/test_cmaes.py index b227add3..63e85be0 100644 --- a/tests/test_cmaes.py +++ b/tests/test_cmaes.py @@ -1,3 +1,4 @@ +import logging import pathlib import random @@ -5,8 +6,11 @@ from propulate import Propulator from propulate.propagators import ActiveCMA, BasicCMA, CMAAdapter, CMAPropagator +from propulate.utils import set_logger_config from propulate.utils.benchmark_functions import get_function_search_space +log = logging.getLogger("propulate") # Get logger instance. 
+ @pytest.fixture(params=[BasicCMA(), ActiveCMA()]) def cma_adapter(request: pytest.FixtureRequest) -> CMAAdapter: @@ -27,6 +31,7 @@ def test_cmaes_basic(cma_adapter: CMAAdapter, mpi_tmp_path: pathlib.Path) -> Non mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng = random.Random(42) # Separate random number generator for optimization. benchmark_function, limits = get_function_search_space("sphere") # Set up evolutionary operator. @@ -43,3 +48,4 @@ def test_cmaes_basic(cma_adapter: CMAAdapter, mpi_tmp_path: pathlib.Path) -> Non ) # Run optimization and print summary of results. propulator.propulate() + log.handlers.clear() diff --git a/tests/test_island.py b/tests/test_island.py index 720c711e..d2c940bc 100644 --- a/tests/test_island.py +++ b/tests/test_island.py @@ -1,4 +1,5 @@ import copy +import logging import pathlib import random from typing import Callable, Dict, Tuple @@ -13,6 +14,8 @@ from propulate.utils import get_default_propagator, set_logger_config from propulate.utils.benchmark_functions import get_function_search_space +log = logging.getLogger("propulate") # Get logger instance. + @pytest.fixture(scope="module") def global_variables() -> Tuple[random.Random, Callable, Dict, Propagator]: @@ -62,8 +65,8 @@ def test_islands( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config(level=logging.DEBUG) rng, benchmark_function, limits, propagator = global_variables - set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( @@ -82,6 +85,7 @@ def test_islands( debug=2, ) islands.summarize(debug=2) + log.handlers.clear() @pytest.mark.mpi(min_size=4) @@ -101,15 +105,17 @@ def test_checkpointing_isolated( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. 
""" + first_generations = 20 + second_generations = 40 + set_logger_config(level=logging.DEBUG) rng, benchmark_function, limits, propagator = global_variables - set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=first_generations, num_islands=2, migration_probability=0.0, checkpoint_path=mpi_tmp_path, @@ -117,20 +123,25 @@ def test_checkpointing_isolated( # Run actual optimization. islands.propulate(debug=2) - islands.summarize(top_n=1, debug=2) + assert ( + len(islands.propulator.population) + == first_generations * islands.propulator.island_comm.Get_size() + ) old_population = copy.deepcopy(islands.propulator.population) del islands + MPI.COMM_WORLD.barrier() # Synchronize all processes. islands = Islands( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=second_generations, num_islands=2, migration_probability=0.0, checkpoint_path=mpi_tmp_path, ) + assert len(old_population) == len(islands.propulator.population) assert ( len( @@ -140,6 +151,7 @@ def test_checkpointing_isolated( ) == 0 ) + log.handlers.clear() @pytest.mark.mpi(min_size=4) @@ -162,15 +174,17 @@ def test_checkpointing( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + first_generations = 20 + second_generations = 40 + set_logger_config() rng, benchmark_function, limits, propagator = global_variables - set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. 
islands = Islands( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=first_generations, num_islands=2, migration_probability=0.9, pollination=pollination, @@ -188,7 +202,7 @@ def test_checkpointing( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=second_generations, num_islands=2, migration_probability=0.9, pollination=pollination, @@ -203,6 +217,7 @@ def test_checkpointing( ) == 0 ) + log.handlers.clear() @pytest.mark.mpi(min_size=8) @@ -225,15 +240,17 @@ def test_checkpointing_unequal_populations( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + first_generations = 20 + second_generations = 40 + set_logger_config() rng, benchmark_function, limits, propagator = global_variables - set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=first_generations, num_islands=2, island_sizes=np.array([3, 5]), migration_probability=0.9, @@ -252,7 +269,7 @@ def test_checkpointing_unequal_populations( loss_fn=benchmark_function, propagator=propagator, rng=rng, - generations=100, + generations=second_generations, num_islands=2, island_sizes=np.array([3, 5]), migration_probability=0.9, @@ -260,6 +277,7 @@ def test_checkpointing_unequal_populations( checkpoint_path=mpi_tmp_path, ) + # TODO compare active only assert ( len( deepdiff.DeepDiff( @@ -268,3 +286,7 @@ def test_checkpointing_unequal_populations( ) == 0 ) + log.handlers.clear() + + +# TODO start from checkpoint with unevaluated candidates diff --git a/tests/test_multi_rank_workers.py b/tests/test_multi_rank_workers.py index 7bb99aa0..35083dfe 100644 --- a/tests/test_multi_rank_workers.py +++ b/tests/test_multi_rank_workers.py @@ -1,3 +1,4 @@ +import logging import pathlib import random from typing import Dict @@ -9,6 +10,8 @@ from propulate import Islands from propulate.utils import 
get_default_propagator, set_logger_config +log = logging.getLogger("propulate") # Get logger instance. + def parallel_sphere(params: Dict[str, float], comm: MPI.Comm = MPI.COMM_SELF) -> float: """ @@ -50,8 +53,8 @@ def test_multi_rank_workers(mpi_tmp_path: pathlib.Path) -> None: mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() full_world_comm = MPI.COMM_WORLD # Get full world communicator. - set_logger_config(log_file=mpi_tmp_path / "log.log") rng = random.Random(42 + full_world_comm.rank) limits = { @@ -84,3 +87,4 @@ def test_multi_rank_workers(mpi_tmp_path: pathlib.Path) -> None: debug=1, # Debug level ) islands.summarize(top_n=1, debug=1) + log.handlers.clear() diff --git a/tests/test_nm.py b/tests/test_nm.py index e8336912..555e0b75 100644 --- a/tests/test_nm.py +++ b/tests/test_nm.py @@ -1,3 +1,4 @@ +import logging import pathlib import random from typing import Tuple @@ -7,8 +8,11 @@ from propulate import Propulator from propulate.propagators import ParallelNelderMead +from propulate.utils import set_logger_config from propulate.utils.benchmark_functions import get_function_search_space +log = logging.getLogger("propulate") # Get logger instance. + @pytest.fixture( params=[ @@ -48,6 +52,7 @@ def test_cmaes( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng = random.Random(42) # Separate random number generator for optimization. function, limits = get_function_search_space(function_parameters[0]) # Set up evolutionary operator. @@ -67,3 +72,4 @@ def test_cmaes( # Run optimization and print summary of results. 
propulator.propulate() propulator.summarize() + log.handlers.clear() diff --git a/tests/test_propulator.py b/tests/test_propulator.py index fc9f6ec3..8f7416a0 100644 --- a/tests/test_propulator.py +++ b/tests/test_propulator.py @@ -1,4 +1,5 @@ import copy +import logging import pathlib import random @@ -10,6 +11,8 @@ from propulate.utils import get_default_propagator, set_logger_config from propulate.utils.benchmark_functions import get_function_search_space +log = logging.getLogger("propulate") # Get logger instance. + @pytest.fixture( params=[ @@ -47,11 +50,11 @@ def test_propulator(function_name: str, mpi_tmp_path: pathlib.Path) -> None: mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ + set_logger_config() rng = random.Random( 42 + MPI.COMM_WORLD.rank ) # Random number generator for optimization benchmark_function, limits = get_function_search_space(function_name) - set_logger_config(log_file=mpi_tmp_path / "log.log") propagator = get_default_propagator( pop_size=4, limits=limits, @@ -65,6 +68,7 @@ def test_propulator(function_name: str, mpi_tmp_path: pathlib.Path) -> None: checkpoint_path=mpi_tmp_path, ) # Set up propulator performing actual optimization. propulator.propulate() # Run optimization and print summary of results. + log.handlers.clear() def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: @@ -78,6 +82,9 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: mpi_tmp_path : pathlib.Path The temporary checkpoint directory. 
""" + first_generations = 20 + second_generations = 40 + set_logger_config(level=logging.DEBUG) rng = random.Random( 42 + MPI.COMM_WORLD.rank ) # Separate random number generator for optimization @@ -91,12 +98,16 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: propulator = Propulator( loss_fn=benchmark_function, propagator=propagator, - generations=100, + generations=first_generations, checkpoint_path=mpi_tmp_path, rng=rng, ) # Set up propulator performing actual optimization. propulator.propulate() # Run optimization and print summary of results. + assert ( + len(propulator.population) + == first_generations * propulator.propulate_comm.Get_size() + ) old_population = copy.deepcopy( propulator.population @@ -107,14 +118,28 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: propulator = Propulator( loss_fn=benchmark_function, propagator=propagator, - generations=20, + generations=second_generations, checkpoint_path=mpi_tmp_path, rng=rng, ) # Set up new propulator starting from checkpoint. - # As the number of requested generations is smaller than the number of generations from the run before, - # no new evaluations are performed. Thus, the length of both Propulators' populations must be equal. 
assert ( len(deepdiff.DeepDiff(old_population, propulator.population, ignore_order=True)) == 0 ) + + propulator.propulate() + # NOTE make sure nothing was overwritten + seniors = [ + ind for ind in propulator.population if ind.generation < first_generations + ] + assert len(deepdiff.DeepDiff(old_population, seniors, ignore_order=True)) == 0 + assert ( + len(propulator.population) + == second_generations * propulator.propulate_comm.Get_size() + ) + + log.handlers.clear() + + +# TODO test loading a checkpoint with an unevaluated individual diff --git a/tests/test_pso.py b/tests/test_pso.py index 6a10adec..87235546 100644 --- a/tests/test_pso.py +++ b/tests/test_pso.py @@ -1,6 +1,9 @@ +import copy +import logging import pathlib import random +import deepdiff import pytest from mpi4py import MPI @@ -13,8 +16,10 @@ InitUniformPSO, VelocityClampingPSO, ) +from propulate.utils import set_logger_config from propulate.utils.benchmark_functions import get_function_search_space, sphere +log = logging.getLogger("propulate") # Get logger instance. limits = get_function_search_space("sphere")[1] rank = MPI.COMM_WORLD.rank rng = random.Random(42 + rank) @@ -59,7 +64,7 @@ def pso_propagator(request: pytest.FixtureRequest) -> Propagator: @pytest.mark.mpi def test_pso(pso_propagator: Propagator, mpi_tmp_path: pathlib.Path) -> None: """ - Test single worker using Propulator to optimize a benchmark function using the default genetic propagator. + Test a pso propagator. Parameters ---------- @@ -68,9 +73,10 @@ def test_pso(pso_propagator: Propagator, mpi_tmp_path: pathlib.Path) -> None: mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ - # Set up evolutionary operator. + set_logger_config() + # Set up pso propagator. init = InitUniformPSO(limits, rng=rng, rank=rank) - propagator = Conditional(1, pso_propagator, init) + propagator = Conditional(limits, 1, pso_propagator, init) # Set up propulator performing actual optimization. 
propulator = Propulator( @@ -83,3 +89,61 @@ def test_pso(pso_propagator: Propagator, mpi_tmp_path: pathlib.Path) -> None: # Run optimization and print summary of results. propulator.propulate() + log.handlers.clear() + + +@pytest.mark.mpi +def test_pso_checkpointing( + pso_propagator: BasicPSO, mpi_tmp_path: pathlib.Path +) -> None: + """ + Test velocity checkpointing when using a PSO propagator. + + Parameters + ---------- + pso_propagator : BasicPSO + The PSO propagator variant to test. + mpi_tmp_path : pathlib.Path + The temporary checkpoint directory. + """ + set_logger_config() + # Set up pso propagator. + init = InitUniformPSO(limits, rng=rng, rank=rank) + propagator = Conditional(limits, 1, pso_propagator, init) + + # Set up propulator performing actual optimization. + propulator = Propulator( + loss_fn=sphere, + propagator=propagator, + rng=rng, + generations=100, + checkpoint_path=mpi_tmp_path, + ) + + # Run optimization and print summary of results. + propulator.propulate() + + old_population = copy.deepcopy( + propulator.population + ) # Save population list from the last run. + del propulator # Delete propulator object. + MPI.COMM_WORLD.barrier() # Synchronize all processes. + + propulator = Propulator( + loss_fn=sphere, + propagator=propagator, + generations=20, + checkpoint_path=mpi_tmp_path, + rng=rng, + ) # Set up new propulator starting from checkpoint. + + # As the number of requested generations is smaller than the number of generations from the run before, + # no new evaluations are performed. Thus, the length of both Propulators' populations must be equal. 
+    assert (
+        len(deepdiff.DeepDiff(old_population, propulator.population, ignore_order=True))
+        == 0
+    )
+    log.handlers.clear()
+
+
+# TODO: Test resuming a PSO run from a non-PSO checkpoint.
diff --git a/tutorials/pso_example.py b/tutorials/pso_example.py
index dc14df60..8665e041 100644
--- a/tutorials/pso_example.py
+++ b/tutorials/pso_example.py
@@ -89,7 +89,7 @@
     else:
         raise ValueError("Invalid PSO propagator name given.")
     init = InitUniformPSO(limits, rng=rng, rank=MPI.COMM_WORLD.rank)
-    propagator = Conditional(config.pop_size, pso_propagator, init)
+    propagator = Conditional(limits, config.pop_size, pso_propagator, init)

     propulator = Propulator(
         benchmark_function,