Skip to content

Commit

Permalink
removed pickle based checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
Oskar Taubert committed May 9, 2024
1 parent db9bc3c commit edf558f
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 99 deletions.
1 change: 0 additions & 1 deletion propulate/_globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@
SYNCHRONIZATION_TAG = 2
INIT_TAG = 3
POPULATION_TAG = 4
DUMP_TAG = 5
MIGRATION_TAG = 6
17 changes: 0 additions & 17 deletions propulate/migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,6 @@ def _work(self, logging_interval: int = 10, debug: int = 1):
if self.island_comm.rank == 0:
log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.")

dump = True if self.island_comm.rank == 0 else False
migration = True if self.migration_prob > 0 else False
self.propulate_comm.barrier()

Expand Down Expand Up @@ -455,13 +454,6 @@ def _work(self, logging_interval: int = 10, debug: int = 1):
if debug == 2:
check = self._check_emigrants_to_deactivate()
assert check is False

if dump: # Dump checkpoint.
self._dump_checkpoint()

dump = (
self._determine_worker_dumping_next()
) # Determine worker dumping checkpoint in the next generation.
self.generation += 1 # Go to next generation.

# Having completed all generations, the workers have to wait for each other.
Expand Down Expand Up @@ -502,12 +494,3 @@ def _work(self, logging_interval: int = 10, debug: int = 1):
raise ValueError(
"There should not be any individuals left that need to be deactivated."
)

self.propulate_comm.barrier()

# Final checkpointing on rank 0.
if self.island_comm.rank == 0:
self._dump_final_checkpoint() # Dump checkpoint.
self.propulate_comm.barrier()
_ = self._determine_worker_dumping_next()
self.propulate_comm.barrier()
15 changes: 0 additions & 15 deletions propulate/pollinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ def _work(self, logging_interval: int = 10, debug: int = 1):
if self.island_comm.rank == 0:
log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.")

dump = True if self.island_comm.rank == 0 else False
migration = True if self.migration_prob > 0 else False
self.propulate_comm.barrier()

Expand Down Expand Up @@ -437,12 +436,6 @@ def _work(self, logging_interval: int = 10, debug: int = 1):
# Immigration: Check for individuals replaced by other intra-island workers to be deactivated.
self._deactivate_replaced_individuals()

if dump: # Dump checkpoint.
self._dump_checkpoint()

dump = (
self._determine_worker_dumping_next()
) # Determine worker dumping checkpoint in the next generation.
self.generation += 1 # Go to next generation.

# Having completed all generations, the workers have to wait for each other.
Expand Down Expand Up @@ -474,11 +467,3 @@ def _work(self, logging_interval: int = 10, debug: int = 1):
f"Finally {len(self.replaced)} individual(s) in replaced: {self.replaced}:\n{self.population}"
)
self._deactivate_replaced_individuals()
self.propulate_comm.barrier()

# Final checkpointing on rank 0.
if self.island_comm.rank == 0:
self._dump_final_checkpoint() # Dump checkpoint.
self.propulate_comm.barrier()
_ = self._determine_worker_dumping_next()
self.propulate_comm.barrier()
67 changes: 1 addition & 66 deletions propulate/propulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import numpy as np
from mpi4py import MPI

from ._globals import DUMP_TAG, INDIVIDUAL_TAG
from ._globals import INDIVIDUAL_TAG
from .population import Individual
from .propagators import Propagator, SelectMin
from .surrogate import Surrogate
Expand Down Expand Up @@ -475,9 +475,6 @@ def _work(self, logging_interval: int = 10, debug: int = -1):
if self.island_comm.rank == 0:
log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.")

dump = True if self.island_comm.rank == 0 else False
self.propulate_comm.barrier()

# Loop over generations.
while self.generations <= -1 or self.generation < self.generations:
if self.generation % int(logging_interval) == 0:
Expand All @@ -491,13 +488,6 @@ def _work(self, logging_interval: int = 10, debug: int = -1):
# Check for and possibly receive incoming individuals from other intra-island workers.
self._receive_intra_island_individuals()

if dump: # Dump checkpoint.
self._dump_checkpoint()

dump = (
self._determine_worker_dumping_next()
) # Determine worker dumping checkpoint in the next generation.

# Go to next generation.
self.generation += 1

Expand All @@ -515,61 +505,6 @@ def _work(self, logging_interval: int = 10, debug: int = -1):
self._receive_intra_island_individuals()
self.propulate_comm.barrier()

# Final checkpointing on rank 0.
if self.island_comm.rank == 0:
self._dump_final_checkpoint() # Dump checkpoint.
self.propulate_comm.barrier()
_ = self._determine_worker_dumping_next()
self.propulate_comm.barrier()

def _dump_checkpoint(self):
"""Dump checkpoint to file."""
log.debug(
f"Island {self.island_idx} Worker {self.island_comm.rank} Generation {self.generation}: "
f"Dumping checkpoint..."
)
save_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle"
if os.path.isfile(save_ckpt_file):
try:
os.replace(save_ckpt_file, save_ckpt_file.with_suffix(".bkp"))
except OSError as e:
log.warning(e)
with open(save_ckpt_file, "wb") as f:
pickle.dump(self.population, f)

dest = (
self.island_comm.rank + 1
if self.island_comm.rank + 1 < self.island_comm.size
else 0
)
self.island_comm.send(True, dest=dest, tag=DUMP_TAG)

def _determine_worker_dumping_next(self):
"""Determine the worker who dumps the checkpoint in the next generation."""
dump = False
stat = MPI.Status()
probe_dump = self.island_comm.iprobe(
source=MPI.ANY_SOURCE, tag=DUMP_TAG, status=stat
)
if probe_dump:
dump = self.island_comm.recv(source=stat.Get_source(), tag=DUMP_TAG)
log.debug(
f"Island {self.island_idx} Worker {self.island_comm.rank} Generation {self.generation}: "
f"Going to dump next: {dump}. Before: Worker {stat.Get_source()}"
)
return dump

def _dump_final_checkpoint(self):
"""Dump final checkpoint."""
save_ckpt_file = self.checkpoint_path / f"island_{self.island_idx}_ckpt.pickle"
if os.path.isfile(save_ckpt_file):
try:
os.replace(save_ckpt_file, save_ckpt_file.with_suffix(".bkp"))
except OSError as e:
log.warning(e)
with open(save_ckpt_file, "wb") as f:
pickle.dump(self.population, f)

def _check_for_duplicates(
self, active: bool, debug: int = 1
) -> Tuple[List[List[Union[Individual, int]]], List[Individual]]:
Expand Down

0 comments on commit edf558f

Please sign in to comment.