Skip to content

Commit

Permalink
mostly removed the old checkpointing mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
Oskar Taubert committed Oct 25, 2023
1 parent 30894ff commit cb79041
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 104 deletions.
3 changes: 1 addition & 2 deletions propulate/_globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@
SYNCHRONIZATION_TAG = 2
INIT_TAG = 3
POPULATION_TAG = 4
DUMP_TAG = 5
MIGRATION_TAG = 6
MIGRATION_TAG = 5
136 changes: 34 additions & 102 deletions propulate/propulator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import copy
import logging
import os
import pickle
import random
import time
from operator import attrgetter
Expand All @@ -11,8 +10,9 @@
import deepdiff
import numpy as np
from mpi4py import MPI
import h5py

from ._globals import DUMP_TAG, INDIVIDUAL_TAG
from ._globals import INDIVIDUAL_TAG
from .population import Individual
from .propagators import Propagator, SelectMin

Expand All @@ -34,8 +34,8 @@ def __init__(
propagator: Propagator,
island_idx: int = 0,
comm: MPI.Comm = MPI.COMM_WORLD,
generations: int = -1,
checkpoint_path: Union[str, Path] = Path("./"),
generations: int = 100,
checkpoint_directory: Union[str, Path] = Path("./"),
migration_topology: np.ndarray = None,
migration_prob: float = 0.0,
emigration_propagator: Type[Propagator] = SelectMin,
Expand All @@ -58,7 +58,7 @@ def __init__(
intra-island communicator
generations: int
number of generations to run
checkpoint_path: Union[Path, str]
checkpoint_directory: Union[Path, str]
Path where checkpoints are loaded from and stored.
migration_topology: numpy.ndarray
2D matrix where entry (i,j) specifies how many
Expand Down Expand Up @@ -91,7 +91,7 @@ def __init__(
self.generation = 0 # current generation not yet evaluated
self.island_idx = island_idx # island index
self.comm = comm # intra-island communicator
self.checkpoint_path = Path(checkpoint_path) # checkpoint path
self.checkpoint_path = Path(checkpoint_directory) # checkpoint path
self.checkpoint_path.mkdir(parents=True, exist_ok=True)
self.migration_prob = migration_prob # per-rank migration probability
self.migration_topology = migration_topology # migration topology
Expand All @@ -102,43 +102,37 @@ def __init__(
self.emigration_propagator = emigration_propagator # emigration propagator
self.rng = rng

# Load initial population of evaluated individuals from checkpoint if exists.
load_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle"
if not os.path.isfile(load_ckpt_file): # If not exists, check for backup file.
load_ckpt_file = load_ckpt_file.with_suffix(".bkp")

if os.path.isfile(load_ckpt_file):
with open(load_ckpt_file, "rb") as f:
try:
self.population = pickle.load(f)
self.generation = (
max(
[
x.generation
for x in self.population
if x.rank == self.comm.rank
]
)
+ 1
)
if self.comm.rank == 0:
log.info(
"Valid checkpoint file found. "
f"Resuming from generation {self.generation} of loaded population..."
)
except OSError:
self.population = []
if self.comm.rank == 0:
log.info(
"No valid checkpoint file. Initializing population randomly..."
)
# Load initial population of individuals from checkpoint if exists.
self.checkpoint_path = self.checkpoint_path / "ckpt.hdf5"

if os.path.isfile(self.checkpoint_path):
self.population, self.generation = self.load_checkpoint(self.checkpoint_path)
if self.comm.rank == 0:
log.info(
"Valid checkpoint file found. "
f"Resuming from generation {self.generation} of loaded population..."
)
else:
self.population = []
if self.comm.rank == 0:
log.info(
"No valid checkpoint file given. Initializing population randomly..."
)

def load_checkpoint(self, ckpt_file):
    """
    Load checkpoint from HDF5 file.

    Since this is only a read, all workers can do this in read-only mode
    without the mpio driver.

    Parameters
    ----------
    ckpt_file: Union[str, Path]
        Path to the HDF5 checkpoint file to load.

    Returns
    -------
    tuple
        ``(population, generation)`` — the loaded list of individuals and
        the next generation to evaluate. Deserialization is not implemented
        yet, so this currently returns an empty population and generation 0.
    """
    # Open the file that was passed in — the previous version ignored the
    # `ckpt_file` argument and opened `self.checkpoint_path` instead.
    with h5py.File(ckpt_file, "r") as f:
        # TODO: reconstruct individuals/generation from the HDF5 layout
        # once the corresponding dump side is implemented.
        pass
    # Return a (population, generation) pair so the caller's tuple
    # unpacking (`self.population, self.generation = ...`) does not
    # fail on the previous implicit `None` return.
    return [], 0

def propulate(self, logging_interval: int = 10, debug: int = 1) -> None:
    """
    Run asynchronous evolutionary optimization routine.

    Parameters
    ----------
    logging_interval: int
        Emit a progress log line every `logging_interval` generations.
    debug: int
        verbosity/debug level; 0 - silent; 1 - moderate, 2 - noisy (debug mode)
    """
    # "" is not a valid h5py mode string; "a" opens the checkpoint
    # read/write and creates it if it does not exist yet.
    with h5py.File(self.checkpoint_path, "a") as f:
        # NOTE(review): `_work` as currently declared takes only
        # (logging_interval, debug); passing the open file handle assumes
        # its signature is being extended in this same change — confirm.
        self._work(f, logging_interval, debug)

def _get_active_individuals(self) -> Tuple[List[Individual], int]:
"""
Expand Down Expand Up @@ -325,7 +320,7 @@ def _check_intra_island_synchronization(

def _work(self, logging_interval: int, debug: int):
"""
Execute evolutionary algorithm in parallel.
Execute optimization algorithm in parallel.
Parameters
----------
Expand All @@ -344,11 +339,10 @@ def _work(self, logging_interval: int, debug: int):
if self.comm.rank == 0:
log.info(f"Island {self.island_idx} has {self.comm.size} workers.")

dump = True if self.comm.rank == 0 else False
MPI.COMM_WORLD.barrier()

# Loop over generations.
while self.generations <= -1 or self.generation < self.generations:
while self.generation < self.generations:
if self.generation % int(logging_interval) == 0:
log.info(
f"Island {self.island_idx} Worker {self.comm.rank}: In generation {self.generation}..."
Expand All @@ -360,13 +354,6 @@ def _work(self, logging_interval: int, debug: int):
# Check for and possibly receive incoming individuals from other intra-island workers.
self._receive_intra_island_individuals()

if dump: # Dump checkpoint.
self._dump_checkpoint()

dump = (
self._determine_worker_dumping_next()
) # Determine worker dumping checkpoint in the next generation.

# Go to next generation.
self.generation += 1

Expand All @@ -385,61 +372,6 @@ def _work(self, logging_interval: int, debug: int):
self._receive_intra_island_individuals()
MPI.COMM_WORLD.barrier()

# Final checkpointing on rank 0.
if self.comm.rank == 0:
self._dump_final_checkpoint() # Dump checkpoint.
MPI.COMM_WORLD.barrier()
_ = self._determine_worker_dumping_next()
MPI.COMM_WORLD.barrier()

def _dump_checkpoint(self):
"""
Dump checkpoint to file.
"""
log.debug(
f"Island {self.island_idx} Worker {self.comm.rank} Generation {self.generation}: "
f"Dumping checkpoint..."
)
save_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle"
if os.path.isfile(save_ckpt_file):
try:
os.replace(save_ckpt_file, save_ckpt_file.with_suffix(".bkp"))
except OSError as e:
log.warning(e)
with open(save_ckpt_file, "wb") as f:
pickle.dump(self.population, f)

dest = self.comm.rank + 1 if self.comm.rank + 1 < self.comm.size else 0
self.comm.send(True, dest=dest, tag=DUMP_TAG)

def _determine_worker_dumping_next(self):
"""
Determine the worker who dumps the checkpoint in the next generation.
"""
dump = False
stat = MPI.Status()
probe_dump = self.comm.iprobe(source=MPI.ANY_SOURCE, tag=DUMP_TAG, status=stat)
if probe_dump:
dump = self.comm.recv(source=stat.Get_source(), tag=DUMP_TAG)
log.debug(
f"Island {self.island_idx} Worker {self.comm.rank} Generation {self.generation}: "
f"Going to dump next: {dump}. Before: Worker {stat.Get_source()}"
)
return dump

def _dump_final_checkpoint(self):
"""
Dump final checkpoint.
"""
save_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle"
if os.path.isfile(save_ckpt_file):
try:
os.replace(save_ckpt_file, save_ckpt_file.with_suffix(".bkp"))
except OSError as e:
log.warning(e)
with open(save_ckpt_file, "wb") as f:
pickle.dump(self.population, f)

def _check_for_duplicates(
self, active: bool, debug: int
) -> Tuple[List[List[Union[Individual, int]]], List[Individual]]:
Expand Down

0 comments on commit cb79041

Please sign in to comment.