diff --git a/propulate/_globals.py b/propulate/_globals.py
index 43e38374..2914bb26 100644
--- a/propulate/_globals.py
+++ b/propulate/_globals.py
@@ -2,5 +2,4 @@
 SYNCHRONIZATION_TAG = 2
 INIT_TAG = 3
 POPULATION_TAG = 4
-DUMP_TAG = 5
-MIGRATION_TAG = 6
+MIGRATION_TAG = 5
diff --git a/propulate/propulator.py b/propulate/propulator.py
index 63f0714c..9a374478 100644
--- a/propulate/propulator.py
+++ b/propulate/propulator.py
@@ -1,7 +1,6 @@
 import copy
 import logging
 import os
-import pickle
 import random
 import time
 from operator import attrgetter
@@ -11,8 +10,9 @@
 import deepdiff
 import numpy as np
 from mpi4py import MPI
+import h5py
 
-from ._globals import DUMP_TAG, INDIVIDUAL_TAG
+from ._globals import INDIVIDUAL_TAG
 from .population import Individual
 from .propagators import Propagator, SelectMin
 
@@ -34,8 +34,8 @@ def __init__(
         propagator: Propagator,
         island_idx: int = 0,
         comm: MPI.Comm = MPI.COMM_WORLD,
-        generations: int = -1,
-        checkpoint_path: Union[str, Path] = Path("./"),
+        generations: int = 100,
+        checkpoint_directory: Union[str, Path] = Path("./"),
         migration_topology: np.ndarray = None,
         migration_prob: float = 0.0,
         emigration_propagator: Type[Propagator] = SelectMin,
@@ -58,7 +58,7 @@ def __init__(
             intra-island communicator
         generations: int
             number of generations to run
-        checkpoint_path: Union[Path, str]
+        checkpoint_directory: Union[Path, str]
             Path where checkpoints are loaded from and stored.
         migration_topology: numpy.ndarray
             2D matrix where entry (i,j) specifies how many
@@ -91,7 +91,7 @@ def __init__(
         self.generation = 0  # current generation not yet evaluated
         self.island_idx = island_idx  # island index
         self.comm = comm  # intra-island communicator
-        self.checkpoint_path = Path(checkpoint_path)  # checkpoint path
+        self.checkpoint_path = Path(checkpoint_directory)  # checkpoint path
         self.checkpoint_path.mkdir(parents=True, exist_ok=True)
         self.migration_prob = migration_prob  # per-rank migration probability
         self.migration_topology = migration_topology  # migration topology
@@ -102,36 +102,16 @@ def __init__(
         self.emigration_propagator = emigration_propagator  # emigration propagator
         self.rng = rng
 
-        # Load initial population of evaluated individuals from checkpoint if exists.
-        load_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle"
-        if not os.path.isfile(load_ckpt_file):  # If not exists, check for backup file.
-            load_ckpt_file = load_ckpt_file.with_suffix(".bkp")
-
-        if os.path.isfile(load_ckpt_file):
-            with open(load_ckpt_file, "rb") as f:
-                try:
-                    self.population = pickle.load(f)
-                    self.generation = (
-                        max(
-                            [
-                                x.generation
-                                for x in self.population
-                                if x.rank == self.comm.rank
-                            ]
-                        )
-                        + 1
-                    )
-                    if self.comm.rank == 0:
-                        log.info(
-                            "Valid checkpoint file found. "
-                            f"Resuming from generation {self.generation} of loaded population..."
-                        )
-                except OSError:
-                    self.population = []
-                    if self.comm.rank == 0:
-                        log.info(
-                            "No valid checkpoint file. Initializing population randomly..."
-                        )
+        # Load initial population of individuals from checkpoint if exists.
+        self.checkpoint_path = self.checkpoint_path / "ckpt.hdf5"
+
+        if os.path.isfile(self.checkpoint_path):
+            self.population, self.generation = self.load_checkpoint(self.checkpoint_path)
+            if self.comm.rank == 0:
+                log.info(
+                    "Valid checkpoint file found. "
+                    f"Resuming from generation {self.generation} of loaded population..."
+                )
         else:
             self.population = []
             if self.comm.rank == 0:
@@ -139,6 +119,20 @@ def __init__(
                     "No valid checkpoint file given. Initializing population randomly..."
                 )
 
+    def load_checkpoint(self, ckpt_file):
+        """
+        Load checkpoint from HDF5 file.
+        Since this is only a read, all workers can do this in read-only mode without the mpio driver.
+
+        Parameters
+        ----------
+
+        ckpt_file: str
+            Path to the file to load
+        """
+        with h5py.File(ckpt_file, 'r') as f:
+            return [], 0  # TODO: reconstruct population and generation from `f`.
+
     def propulate(self, logging_interval: int = 10, debug: int = 1) -> None:
         """
         Run asynchronous evolutionary optimization routine.
@@ -150,7 +144,8 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None:
         debug: int
             verbosity/debug level; 0 - silent; 1 - moderate, 2 - noisy (debug mode)
         """
-        self._work(logging_interval, debug)
+        with h5py.File(self.checkpoint_path, 'a') as f:
+            self._work(f, logging_interval, debug)
 
     def _get_active_individuals(self) -> Tuple[List[Individual], int]:
         """
@@ -325,7 +320,7 @@ def _check_intra_island_synchronization(
 
-    def _work(self, logging_interval: int, debug: int):
+    def _work(self, f, logging_interval: int, debug: int):
         """
-        Execute evolutionary algorithm in parallel.
+        Execute optimization algorithm in parallel.
 
         Parameters
         ----------
@@ -344,11 +339,10 @@ def _work(self, logging_interval: int, debug: int):
         if self.comm.rank == 0:
             log.info(f"Island {self.island_idx} has {self.comm.size} workers.")
 
-        dump = True if self.comm.rank == 0 else False
         MPI.COMM_WORLD.barrier()
 
         # Loop over generations.
-        while self.generations <= -1 or self.generation < self.generations:
+        while self.generation < self.generations:
             if self.generation % int(logging_interval) == 0:
                 log.info(
                     f"Island {self.island_idx} Worker {self.comm.rank}: In generation {self.generation}..."
@@ -360,13 +354,6 @@ def _work(self, logging_interval: int, debug: int):
 
             # Check for and possibly receive incoming individuals from other intra-island workers.
             self._receive_intra_island_individuals()
 
-            if dump:  # Dump checkpoint.
-                self._dump_checkpoint()
-
-            dump = (
-                self._determine_worker_dumping_next()
-            )  # Determine worker dumping checkpoint in the next generation.
-
             # Go to next generation.
             self.generation += 1
@@ -385,61 +372,6 @@ def _work(self, logging_interval: int, debug: int):
             self._receive_intra_island_individuals()
         MPI.COMM_WORLD.barrier()
 
-        # Final checkpointing on rank 0.
-        if self.comm.rank == 0:
-            self._dump_final_checkpoint()  # Dump checkpoint.
-        MPI.COMM_WORLD.barrier()
-        _ = self._determine_worker_dumping_next()
-        MPI.COMM_WORLD.barrier()
-
-    def _dump_checkpoint(self):
-        """
-        Dump checkpoint to file.
-        """
-        log.debug(
-            f"Island {self.island_idx} Worker {self.comm.rank} Generation {self.generation}: "
-            f"Dumping checkpoint..."
-        )
-        save_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle"
-        if os.path.isfile(save_ckpt_file):
-            try:
-                os.replace(save_ckpt_file, save_ckpt_file.with_suffix(".bkp"))
-            except OSError as e:
-                log.warning(e)
-        with open(save_ckpt_file, "wb") as f:
-            pickle.dump(self.population, f)
-
-        dest = self.comm.rank + 1 if self.comm.rank + 1 < self.comm.size else 0
-        self.comm.send(True, dest=dest, tag=DUMP_TAG)
-
-    def _determine_worker_dumping_next(self):
-        """
-        Determine the worker who dumps the checkpoint in the next generation.
-        """
-        dump = False
-        stat = MPI.Status()
-        probe_dump = self.comm.iprobe(source=MPI.ANY_SOURCE, tag=DUMP_TAG, status=stat)
-        if probe_dump:
-            dump = self.comm.recv(source=stat.Get_source(), tag=DUMP_TAG)
-            log.debug(
-                f"Island {self.island_idx} Worker {self.comm.rank} Generation {self.generation}: "
-                f"Going to dump next: {dump}. Before: Worker {stat.Get_source()}"
-            )
-        return dump
-
-    def _dump_final_checkpoint(self):
-        """
-        Dump final checkpoint.
-        """
-        save_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle"
-        if os.path.isfile(save_ckpt_file):
-            try:
-                os.replace(save_ckpt_file, save_ckpt_file.with_suffix(".bkp"))
-            except OSError as e:
-                log.warning(e)
-        with open(save_ckpt_file, "wb") as f:
-            pickle.dump(self.population, f)
-
     def _check_for_duplicates(
         self, active: bool, debug: int
     ) -> Tuple[List[List[Union[Individual, int]]], List[Individual]]: