From 85fa1c016b87d14d15c64223810a2b4f0c32723c Mon Sep 17 00:00:00 2001 From: Oskar Taubert Date: Tue, 24 May 2022 16:02:46 +0200 Subject: [PATCH 01/15] added moving notice --- README.rst | 17 +---------------- propulate/__init__.py | 3 +++ 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/README.rst b/README.rst index b83639df..d2df9eb2 100644 --- a/README.rst +++ b/README.rst @@ -3,6 +3,7 @@ propulate ========= +!!! NOTE propulate has moved to [https://github.com/Helmholtz-AI-Energy/propulate](https://github.com/Helmholtz-AI-Energy/propulate) Parallel propagator of populations. @@ -25,19 +26,3 @@ Installation Pull and run ``pip install -e .`` or ``python setup.py develop`` Requires a MPI implementation (currently only tested with OpenMPI) and ``mpi4py`` -TODO -==== - -- soft reproducibility + tests -- weight/parameter succession from parents or hall of fame -- fix ignite -- fix example output -- fix checkpoint example -- simplify extracting best result/enesemble -- include more algorithms and operators, covariance matric adaptation - -- move worker communication into member functions -- implement pollination -- check checkpointing + seed -- fix book-keeping of populations (number of active individuals should equal number of evaluations) - diff --git a/propulate/__init__.py b/propulate/__init__.py index 082081a5..37017141 100644 --- a/propulate/__init__.py +++ b/propulate/__init__.py @@ -10,7 +10,10 @@ finally: del get_distribution, DistributionNotFound +print("propulate has moved to https://github.com/Helmholtz-AI-Energy/propulate \n Find support and up to date versions there.") + from .propulator import Propulator from .wrapper import Islands from . import propagators from . import population + From 833c5d18ab274e9ae8e8277e1b654abc5fd0e0d6 Mon Sep 17 00:00:00 2001 From: mcw92 Date: Wed, 29 Jun 2022 09:04:12 +0200 Subject: [PATCH 02/15] fixed minor things like links --- README.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index d2df9eb2..d6944131 100644 --- a/README.rst +++ b/README.rst @@ -3,7 +3,7 @@ propulate ========= -!!! NOTE propulate has moved to [https://github.com/Helmholtz-AI-Energy/propulate](https://github.com/Helmholtz-AI-Energy/propulate) +!!! NOTE: Propulate has moved to `https://github.com/Helmholtz-AI-Energy/propulate `. Parallel propagator of populations. @@ -23,6 +23,6 @@ For usage example see scripts. Installation ============ -Pull and run ``pip install -e .`` or ``python setup.py develop`` -Requires a MPI implementation (currently only tested with OpenMPI) and ``mpi4py`` +Pull and run ``pip install -e .`` or ``python setup.py develop``. +Requires an MPI implementation (currently only tested with OpenMPI) and ``mpi4py``. From 462c754fb7d8faffcd5a0aed1d9efc748fc9a209 Mon Sep 17 00:00:00 2001 From: mcw92 Date: Wed, 29 Jun 2022 09:05:49 +0200 Subject: [PATCH 03/15] fixed minor things like links --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index d6944131..1e997435 100644 --- a/README.rst +++ b/README.rst @@ -3,7 +3,7 @@ propulate ========= -!!! NOTE: Propulate has moved to `https://github.com/Helmholtz-AI-Energy/propulate `. +!!! NOTE: Propulate has moved to https://github.com/Helmholtz-AI-Energy/propulate . Parallel propagator of populations. From 6ca1c0b4446d29e35c877448d1f4229edea5f54e Mon Sep 17 00:00:00 2001 From: mcw92 Date: Wed, 29 Jun 2022 09:07:01 +0200 Subject: [PATCH 04/15] fixed minor things like links --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 1e997435..e4107414 100644 --- a/README.rst +++ b/README.rst @@ -3,7 +3,7 @@ propulate ========= -!!! NOTE: Propulate has moved to https://github.com/Helmholtz-AI-Energy/propulate . +!!! NOTE: Propulate has moved to https://github.com/Helmholtz-AI-Energy/propulate . Parallel propagator of populations. From 91ea32527d582e37bd3b9034ed6f72cae7d15fce Mon Sep 17 00:00:00 2001 From: mcw92 Date: Wed, 29 Jun 2022 09:08:56 +0200 Subject: [PATCH 05/15] fixed minor things like links --- README.rst | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/README.rst b/README.rst index e4107414..fec297a4 100644 --- a/README.rst +++ b/README.rst @@ -3,26 +3,26 @@ propulate ========= -!!! NOTE: Propulate has moved to https://github.com/Helmholtz-AI-Energy/propulate . -Parallel propagator of populations. +| !!! NOTE: Propulate has moved to https://github.com/Helmholtz-AI-Energy/propulate . +| Parallel propagator of populations. Description =========== -Evolution-inspired hyperparameter-optimization in MPI-parallelized fashion. -In order to be more efficient generations are less well separated than they often are in evolutionary algorithms. +| Evolution-inspired hyperparameter-optimization in MPI-parallelized fashion. +| In order to be more efficient generations are less well separated than they often are in evolutionary algorithms. Instead a new individual is generated from a pool of currently active already evaluated individuals that may be from any generation. Individuals may be removed from the breeding population based on different criteria. Documentation ============= -For usage example see scripts. +| For usage example see scripts. Installation ============ -Pull and run ``pip install -e .`` or ``python setup.py develop``. -Requires an MPI implementation (currently only tested with OpenMPI) and ``mpi4py``. +| Pull and run ``pip install -e .`` or ``python setup.py develop``. +| Requires an MPI implementation (currently only tested with OpenMPI) and ``mpi4py``. From 1a9733d2c581afcbe66fa96e408e12da65d518ba Mon Sep 17 00:00:00 2001 From: coquelin77 Date: Tue, 7 May 2024 10:54:05 +0200 Subject: [PATCH 06/15] added ddp tutorial --- tutorials/torch_ddp_example.py | 384 +++++++++++++++++++++++++++++++++ 1 file changed, 384 insertions(+) create mode 100644 tutorials/torch_ddp_example.py diff --git a/tutorials/torch_ddp_example.py b/tutorials/torch_ddp_example.py new file mode 100644 index 00000000..db39f0f9 --- /dev/null +++ b/tutorials/torch_ddp_example.py @@ -0,0 +1,384 @@ +""" +Toy example for HP optimization / NAS in Propulate, using a simple CNN trained on the MNIST dataset. + +This script was tested on a single compute node with 4 GPUs. Note that you need to adapt ``GPUS_PER_NODE`` (see ll. 25). +""" +import logging +import pathlib +import random +from typing import Dict, Tuple, Union + +import torch +import os +import socket +from mpi4py import MPI +from torch import nn +import time +from torch.utils.data import DataLoader +import torch.utils.data.distributed as datadist +from torch.optim.lr_scheduler import StepLR +from torch import optim +import torch.distributed as dist +import datetime as dt +from torchvision.datasets import MNIST +from torchvision.transforms import Compose, Normalize, ToTensor +import torch.nn.functional as F + +from propulate import Islands +from propulate.utils import get_default_propagator, set_logger_config +from propulate.utils.benchmark_functions import parse_arguments + + +GPUS_PER_NODE: int = 4 # This example script was tested on a single node with 4 GPUs. +NUM_WORKERS: int = ( + 2 # Set this to the recommended number of workers in the PyTorch dataloader. +) +SUBGROUP_COMM_METHOD = "nccl-slurm" +log_path = "torch_ckpts" +log = logging.getLogger(__name__) # Get logger instance. + + +class Net(nn.Module): + def __init__( + self, + conv_layers: int, + activation: torch.nn.modules.activation, + ): + super().__init__() + layers = [] # Set up the model architecture (depending on number of convolutional layers specified). + layers += [ + nn.Sequential( + nn.Conv2d(in_channels=1, out_channels=10, kernel_size=3, padding=1), + activation(), + ), + ] + layers += [ + nn.Sequential( + nn.Conv2d(in_channels=10, out_channels=10, kernel_size=3, padding=1), + activation(), + ) + for _ in range(conv_layers - 1) + ] + + self.fc = nn.Linear(in_features=7840, out_features=10) # MNIST has 10 classes. + self.conv_layers = nn.Sequential(*layers) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + """ + Forward pass. + + Parameters + ---------- + x : torch.Tensor + The data sample. + + Returns + ------- + torch.Tensor + The model's predictions for input data sample. + """ + b, c, w, h = x.size() + x = self.conv_layers(x) + x = x.view(b, 10 * 28 * 28) + x = self.fc(x) + output = F.log_softmax(x, dim=1) + return output + + +def get_data_loaders(batch_size: int, subgroup_comm: MPI.Comm) -> Tuple[DataLoader, DataLoader]: + """ + Get MNIST train and validation dataloaders. + + Parameters + ---------- + batch_size : int + The batch size. + subgroup_comm: MPI.Comm + The MPI communicator object for the local class + + Returns + ------- + torch.utils.data.DataLoader + The training dataloader. + torch.utils.data.DataLoader + The validation dataloader. + """ + data_transform = Compose([ToTensor(), Normalize((0.1307,), (0.3081,))]) + train_dataset = MNIST(download=False, root=".", transform=data_transform, train=True) + val_dataset = MNIST(download=False, root=".", transform=data_transform, train=False) + if subgroup_comm.size > 1: # need to make the samplers use the torch world to distributed data + train_sampler = datadist.DistributedSampler(train_dataset) + val_sampler = datadist.DistributedSampler(val_dataset) + else: + train_sampler = None + val_sampler = None + num_workers = NUM_WORKERS + log.info(f"Use {num_workers} workers in dataloader.") + + train_loader = DataLoader( + dataset=train_dataset, # Use MNIST training dataset. + batch_size=batch_size, # Batch size + num_workers=num_workers, + pin_memory=True, + persistent_workers=True, + shuffle=(train_sampler is None), # Shuffle data. + sampler=train_sampler, + ) + val_loader = DataLoader( + dataset=val_dataset, + num_workers=num_workers, + pin_memory=True, + persistent_workers=True, + batch_size=1, # Batch size + shuffle=False, # Do not shuffle data. + sampler=val_sampler, + ) + return train_loader, val_loader + + +def torch_process_group_init(subgroup_comm: MPI.Comm, method) -> None: + global _DATA_PARALLEL_GROUP + global _DATA_PARALLEL_ROOT + # done want different groups to use the same port + subgroup_id = MPI.COMM_WORLD.rank // subgroup_comm.size + port = 29500 + subgroup_id + # get master address and port + # os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "0" + + comm_size = subgroup_comm.Get_size() + if comm_size == 1: + return + master_address = socket.gethostname() + # each subgroup needs to get the hostname of rank 0 of that group + master_address = subgroup_comm.bcast(str(master_address), root=0) + + # save env vars + os.environ["MASTER_ADDR"] = master_address + # use the default pytorch port + os.environ["MASTER_PORT"] = str(port) + + comm_rank = subgroup_comm.Get_rank() + + nccl_world_size = comm_size + nccl_world_rank = comm_rank + # print(subgroup_comm.rank, subgroup_comm.size, master_address, port) + if not torch.cuda.is_available(): + method = "gloo" + log.info("No CUDA devices found: falling back to gloo") + else: + log.info(f"CUDA_VISIBLE_DEVICES: {os.environ['CUDA_VISIBLE_DEVICES']}") + num_cuda_devices = torch.cuda.device_count() + log.info(f"device count: {num_cuda_devices}, device number: {comm_rank % num_cuda_devices}") + torch.cuda.set_device(comm_rank % num_cuda_devices) + + + time.sleep(0.001 * comm_rank) # avoid DDOS'ing rank 0 + if method == "nccl-openmpi": + dist.init_process_group( + backend="nccl", + rank=subgroup_comm.rank, + world_size=subgroup_comm.size, + ) + + elif method == "nccl-slurm": + wireup_store = dist.TCPStore( + host_name=master_address, + port=port, + world_size=nccl_world_size, + is_master=(nccl_world_rank == 0), + timeout=dt.timedelta(seconds=60), + ) + dist.init_process_group( + backend="nccl", + store=wireup_store, + world_size=nccl_world_size, + rank=nccl_world_rank, + ) + elif method == "gloo": + wireup_store = dist.TCPStore( + host_name=master_address, + port=port, + world_size=nccl_world_size, + is_master=(nccl_world_rank == 0), + timeout=dt.timedelta(seconds=60), + ) + dist.init_process_group( + backend="gloo", + store=wireup_store, + world_size=nccl_world_size, + rank=nccl_world_rank, + ) + else: + raise NotImplementedError(f"Given 'method' ({method}) not in [nccl-openmpi, nccl-slurm, gloo]") + + # make sure to call a barrier here in order for sharp to use the default comm: + if dist.is_initialized(): + dist.barrier() + disttest = torch.ones(1) + if method != "gloo": + disttest = disttest.cuda() + + dist.all_reduce(disttest) + assert disttest[0] == nccl_world_size, "failed test of dist!" + else: + disttest = None + + +def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) -> float: + """ + Loss function for evolutionary optimization with Propulate. Minimize the model's negative validation accuracy. + + Parameters + ---------- + params : Dict[str, int | float | str] + The hyperparameters to be optimized evolutionarily. + + Returns + ------- + float + The trained model's negative validation accuracy. + """ + torch_process_group_init(subgroup_comm, method=SUBGROUP_COMM_METHOD) + # Extract hyperparameter combination to test from input dictionary. + conv_layers = params["conv_layers"] # Number of convolutional layers + activation = params["activation"] # Activation function + lr = params["lr"] # Learning rate + gamma = params["gamma"] + + epochs = 100 + + activations = { + "relu": nn.ReLU, + "sigmoid": nn.Sigmoid, + "tanh": nn.Tanh, + } # Define activation function mapping. + activation = activations[activation] # Get activation function. + loss_fn = torch.nn.NLLLoss() + + model = Net(conv_layers, activation) + # Set up neural network with specified hyperparameters. + # model.best_accuracy = 0.0 # Initialize the model's best validation accuracy. + + train_loader, val_loader = get_data_loaders( + batch_size=8, subgroup_comm=subgroup_comm + ) # Get training and validation data loaders. + + device = "cpu" if not torch.cuda.is_available() else MPI.COMM_WORLD.rank % GPUS_PER_NODE + optimizer = optim.Adadelta(model.parameters(), lr=lr) + scheduler = StepLR(optimizer, step_size=1, gamma=gamma) + log_interval = 100 + best_val_loss = 1000000 + early_stopping_count, early_stopping_limit = 0, 5 + set_new_best = False + model.train() + for epoch in range(epochs): + # train loop ==================================================== + for batch_idx, (data, target) in enumerate(train_loader): + data, target = data.to(device), target.to(device) + optimizer.zero_grad() + output = model(data) + loss = loss_fn(output, target) + loss.backward() + optimizer.step() + if batch_idx == len(train_loader) - 1: + log.info('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( + epoch, batch_idx * len(data), len(train_loader.dataset), + 100. * batch_idx / len(train_loader), loss.item())) + # val loop ====================================================== + model.eval() + val_loss = 0 + correct = 0 + with torch.no_grad(): + for data, target in val_loader: + data, target = data.to(device), target.to(device) + output = model(data) + val_loss += loss_fn(output, target, reduction='sum').item() # sum up batch loss + pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability + correct += pred.eq(target.view_as(pred)).sum().item() + + val_loss /= len(val_loader.dataset) + if val_loss < best_val_loss: + best_val_loss = val_loss + set_new_best = True + + log.info(f'\nTest set: Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} ({100. * correct / len(val_loader.dataset):.0f}%)\n') + + if not set_new_best: + early_stopping_count += 1 + if early_stopping_count >= early_stopping_limit: + log.info(f"hit early stopping count, breaking") + break + + # scheduler step ================================================ + scheduler.step() + set_new_best = False + + # Return best validation loss as an individual's loss (trained so lower is better) + return best_val_loss + + +if __name__ == "__main__": + config, _ = parse_arguments() + + comm = MPI.COMM_WORLD + if comm.rank == 0: # Download data at the top, then we dont need to later + train_loader = DataLoader( + dataset=MNIST( + download=True, root=".", transform=None, train=True + ), # Use MNIST training dataset. + batch_size=2, # Batch size + num_workers=1, + pin_memory=True, + persistent_workers=True, + shuffle=True, # Shuffle data. + ) + del train_loader + comm.Barrier() + pop_size = 2 * comm.size # Breeding population size + limits = { + "conv_layers": (2, 10), + "activation": ("relu", "sigmoid", "tanh"), + "lr": (0.01, 0.0001), + "gamma": (0.5, 0.999), + } # Define search space. + rng = random.Random( + comm.rank + ) # Set up separate random number generator for evolutionary optimizer. + propagator = get_default_propagator( # Get default evolutionary operator. + pop_size=pop_size, # Breeding population size + limits=limits, # Search space + crossover_prob=0.7, # Crossover probability + mutation_prob=0.4, # Mutation probability + random_init_prob=0.1, # Random-initialization probability + rng=rng, # Separate random number generator for Propulate optimization + ) + + # Set up separate logger for Propulate optimization. + set_logger_config( + level=logging.INFO, # Logging level + log_file=f"{log_path}/{pathlib.Path(__file__).stem}.log", # Logging path + log_to_stdout=True, # Print log on stdout. + log_rank=False, # Do not prepend MPI rank to logging messages. + colors=True, # Use colors. + ) + + # Set up island model. + islands = Islands( + loss_fn=ind_loss, # Loss function to be minimized + propagator=propagator, # Propagator, i.e., evolutionary operator to be used + rng=rng, # Separate random number generator for Propulate optimization + generations=config.generations, # Overall number of generations + num_islands=config.num_islands, # Number of islands + migration_probability=config.migration_probability, # Migration probability + pollination=config.pollination, # Whether to use pollination or migration + checkpoint_path=config.checkpoint, # Checkpoint path + # ----- SPECIFIC FOR MULTI-RANK UCS ---- + ranks_per_worker=2, # Number of ranks per (multi rank) worker + ) + + # Run actual optimization. + islands.evolve( + top_n=config.top_n, # Print top-n best individuals on each island in summary. + logging_interval=config.logging_interval, # Logging interval + debug=config.verbosity, # Debug level + ) From dcb384446ea9a041d607c93d6994a2da3d9bff7c Mon Sep 17 00:00:00 2001 From: coquelin77 Date: Tue, 7 May 2024 11:11:30 +0200 Subject: [PATCH 07/15] updates locally, wrong window --- tutorials/torch_ddp_example.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tutorials/torch_ddp_example.py b/tutorials/torch_ddp_example.py index db39f0f9..8723a215 100644 --- a/tutorials/torch_ddp_example.py +++ b/tutorials/torch_ddp_example.py @@ -222,6 +222,7 @@ def torch_process_group_init(subgroup_comm: MPI.Comm, method) -> None: assert disttest[0] == nccl_world_size, "failed test of dist!" else: disttest = None + log.info(f"Finish subgroup torch.dist init: world size: {dist.get_world_size()}, rank: {dist.get_rank()}") def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) -> float: @@ -263,7 +264,11 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) batch_size=8, subgroup_comm=subgroup_comm ) # Get training and validation data loaders. - device = "cpu" if not torch.cuda.is_available() else MPI.COMM_WORLD.rank % GPUS_PER_NODE + if torch.cuda.is_available(): + device = MPI.COMM_WORLD.rank % GPUS_PER_NODE + model = model.to(device) + else: + device = "cpu" optimizer = optim.Adadelta(model.parameters(), lr=lr) scheduler = StepLR(optimizer, step_size=1, gamma=gamma) log_interval = 100 @@ -280,7 +285,7 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) loss = loss_fn(output, target) loss.backward() optimizer.step() - if batch_idx == len(train_loader) - 1: + if batch_idx % log_interval == 0 or batch_idx == len(train_loader) - 1: log.info('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( epoch, batch_idx * len(data), len(train_loader.dataset), 100. * batch_idx / len(train_loader), loss.item())) @@ -292,7 +297,7 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) for data, target in val_loader: data, target = data.to(device), target.to(device) output = model(data) - val_loss += loss_fn(output, target, reduction='sum').item() # sum up batch loss + val_loss += loss_fn(output, target).item() # sum up batch loss pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability correct += pred.eq(target.view_as(pred)).sum().item() From f4bd0e3b51ba6628dbbab7f0acb193dcc6e118f8 Mon Sep 17 00:00:00 2001 From: coquelin77 Date: Tue, 7 May 2024 11:43:59 +0200 Subject: [PATCH 08/15] update to running DDP example with subworkers --- tutorials/torch_ddp_example.py | 129 ++++++++++++++++++++++----------- 1 file changed, 88 insertions(+), 41 deletions(-) diff --git a/tutorials/torch_ddp_example.py b/tutorials/torch_ddp_example.py index 8723a215..67ae1a49 100644 --- a/tutorials/torch_ddp_example.py +++ b/tutorials/torch_ddp_example.py @@ -3,42 +3,56 @@ This script was tested on a single compute node with 4 GPUs. Note that you need to adapt ``GPUS_PER_NODE`` (see ll. 25). """ +import datetime as dt import logging +import os import pathlib import random +import socket +import time from typing import Dict, Tuple, Union import torch -import os -import socket -from mpi4py import MPI -from torch import nn -import time -from torch.utils.data import DataLoader +import torch.distributed as dist +import torch.nn.functional as F # noqa: N812 import torch.utils.data.distributed as datadist +from mpi4py import MPI +from torch import nn, optim from torch.optim.lr_scheduler import StepLR -from torch import optim -import torch.distributed as dist -import datetime as dt +from torch.utils.data import DataLoader from torchvision.datasets import MNIST from torchvision.transforms import Compose, Normalize, ToTensor -import torch.nn.functional as F from propulate import Islands from propulate.utils import get_default_propagator, set_logger_config from propulate.utils.benchmark_functions import parse_arguments - GPUS_PER_NODE: int = 4 # This example script was tested on a single node with 4 GPUs. NUM_WORKERS: int = ( 2 # Set this to the recommended number of workers in the PyTorch dataloader. ) SUBGROUP_COMM_METHOD = "nccl-slurm" log_path = "torch_ckpts" -log = logging.getLogger(__name__) # Get logger instance. +log = logging.getLogger("propulate") # Get logger instance. class Net(nn.Module): + """ + Toy Neural network class. + + Attributes + ---------- + conv_layers : torch.nn.modules.container.Sequential + The model's convolutional layers. + fc : nn.Linear + fully connected output layer + + Methods + ------- + forward() + The forward pass. + """ + def __init__( self, conv_layers: int, @@ -85,7 +99,9 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: return output -def get_data_loaders(batch_size: int, subgroup_comm: MPI.Comm) -> Tuple[DataLoader, DataLoader]: +def get_data_loaders( + batch_size: int, subgroup_comm: MPI.Comm +) -> Tuple[DataLoader, DataLoader]: """ Get MNIST train and validation dataloaders. @@ -104,9 +120,13 @@ def get_data_loaders(batch_size: int, subgroup_comm: MPI.Comm) -> Tuple[DataLoad The validation dataloader. """ data_transform = Compose([ToTensor(), Normalize((0.1307,), (0.3081,))]) - train_dataset = MNIST(download=False, root=".", transform=data_transform, train=True) + train_dataset = MNIST( + download=False, root=".", transform=data_transform, train=True + ) val_dataset = MNIST(download=False, root=".", transform=data_transform, train=False) - if subgroup_comm.size > 1: # need to make the samplers use the torch world to distributed data + if ( + subgroup_comm.size > 1 + ): # need to make the samplers use the torch world to distributed data train_sampler = datadist.DistributedSampler(train_dataset) val_sampler = datadist.DistributedSampler(val_dataset) else: @@ -137,6 +157,19 @@ def get_data_loaders(batch_size: int, subgroup_comm: MPI.Comm) -> Tuple[DataLoad def torch_process_group_init(subgroup_comm: MPI.Comm, method) -> None: + """ + Create the torch process group on the subgroup of the MPI world. + + Parameters + ---------- + subgroup_comm : MPI.Comm + the split communicator for the subgroup. This is provided to the individual's loss function + by the Islands class if there are multiple ranks per worker. + method : str + method to use to initialize the process group. + options: [nccl-slurm, nccl-openmpi, gloo] + if CUDA is not available, then gloo is automatically chosen for the method + """ global _DATA_PARALLEL_GROUP global _DATA_PARALLEL_ROOT # done want different groups to use the same port @@ -154,7 +187,7 @@ def torch_process_group_init(subgroup_comm: MPI.Comm, method) -> None: # save env vars os.environ["MASTER_ADDR"] = master_address - # use the default pytorch port + # use the default pytorch port os.environ["MASTER_PORT"] = str(port) comm_rank = subgroup_comm.Get_rank() @@ -168,9 +201,9 @@ def torch_process_group_init(subgroup_comm: MPI.Comm, method) -> None: else: log.info(f"CUDA_VISIBLE_DEVICES: {os.environ['CUDA_VISIBLE_DEVICES']}") num_cuda_devices = torch.cuda.device_count() - log.info(f"device count: {num_cuda_devices}, device number: {comm_rank % num_cuda_devices}") - torch.cuda.set_device(comm_rank % num_cuda_devices) - + dev_number = MPI.COMM_WORLD.rank % num_cuda_devices + log.info(f"device count: {num_cuda_devices}, device number: {dev_number}") + torch.cuda.set_device(dev_number) time.sleep(0.001 * comm_rank) # avoid DDOS'ing rank 0 if method == "nccl-openmpi": @@ -209,7 +242,9 @@ def torch_process_group_init(subgroup_comm: MPI.Comm, method) -> None: rank=nccl_world_rank, ) else: - raise NotImplementedError(f"Given 'method' ({method}) not in [nccl-openmpi, nccl-slurm, gloo]") + raise NotImplementedError( + f"Given 'method' ({method}) not in [nccl-openmpi, nccl-slurm, gloo]" + ) # make sure to call a barrier here in order for sharp to use the default comm: if dist.is_initialized(): @@ -222,10 +257,14 @@ def torch_process_group_init(subgroup_comm: MPI.Comm, method) -> None: assert disttest[0] == nccl_world_size, "failed test of dist!" else: disttest = None - log.info(f"Finish subgroup torch.dist init: world size: {dist.get_world_size()}, rank: {dist.get_rank()}") + log.info( + f"Finish subgroup torch.dist init: world size: {dist.get_world_size()}, rank: {dist.get_rank()}" + ) -def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) -> float: +def ind_loss( + params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm +) -> float: """ Loss function for evolutionary optimization with Propulate. Minimize the model's negative validation accuracy. @@ -246,7 +285,7 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) lr = params["lr"] # Learning rate gamma = params["gamma"] - epochs = 100 + epochs = 20 activations = { "relu": nn.ReLU, @@ -256,7 +295,7 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) activation = activations[activation] # Get activation function. loss_fn = torch.nn.NLLLoss() - model = Net(conv_layers, activation) + model = Net(conv_layers, activation) # Set up neural network with specified hyperparameters. # model.best_accuracy = 0.0 # Initialize the model's best validation accuracy. @@ -271,7 +310,7 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) device = "cpu" optimizer = optim.Adadelta(model.parameters(), lr=lr) scheduler = StepLR(optimizer, step_size=1, gamma=gamma) - log_interval = 100 + log_interval = 10000 best_val_loss = 1000000 early_stopping_count, early_stopping_limit = 0, 5 set_new_best = False @@ -286,9 +325,9 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) loss.backward() optimizer.step() if batch_idx % log_interval == 0 or batch_idx == len(train_loader) - 1: - log.info('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( - epoch, batch_idx * len(data), len(train_loader.dataset), - 100. * batch_idx / len(train_loader), loss.item())) + log.info( + f"Train Epoch: {epoch} [{batch_idx}/{len(train_loader)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}" + ) # val loop ====================================================== model.eval() val_loss = 0 @@ -298,7 +337,9 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) data, target = data.to(device), target.to(device) output = model(data) val_loss += loss_fn(output, target).item() # sum up batch loss - pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability + pred = output.argmax( + dim=1, keepdim=True + ) # get the index of the max log-probability correct += pred.eq(target.view_as(pred)).sum().item() val_loss /= len(val_loader.dataset) @@ -306,19 +347,22 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) best_val_loss = val_loss set_new_best = True - log.info(f'\nTest set: Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} ({100. * correct / len(val_loader.dataset):.0f}%)\n') + log.info( + f"\nTest set: Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} ({100. * correct / len(val_loader.dataset):.0f}%)\n" + ) if not set_new_best: early_stopping_count += 1 if early_stopping_count >= early_stopping_limit: - log.info(f"hit early stopping count, breaking") + log.info("hit early stopping count, breaking") break - + # scheduler step ================================================ scheduler.step() set_new_best = False - + # Return best validation loss as an individual's loss (trained so lower is better) + dist.destroy_process_group() return best_val_loss @@ -349,14 +393,6 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) rng = random.Random( comm.rank ) # Set up separate random number generator for evolutionary optimizer. - propagator = get_default_propagator( # Get default evolutionary operator. - pop_size=pop_size, # Breeding population size - limits=limits, # Search space - crossover_prob=0.7, # Crossover probability - mutation_prob=0.4, # Mutation probability - random_init_prob=0.1, # Random-initialization probability - rng=rng, # Separate random number generator for Propulate optimization - ) # Set up separate logger for Propulate optimization. set_logger_config( @@ -366,6 +402,17 @@ def ind_loss(params: Dict[str, Union[int, float, str]], subgroup_comm: MPI.Comm) log_rank=False, # Do not prepend MPI rank to logging messages. colors=True, # Use colors. ) + if comm.rank == 0: + log.info("Starting Torch DDP tutorial!") + + propagator = get_default_propagator( # Get default evolutionary operator. + pop_size=pop_size, # Breeding population size + limits=limits, # Search space + crossover_prob=0.7, # Crossover probability + mutation_prob=0.4, # Mutation probability + random_init_prob=0.1, # Random-initialization probability + rng=rng, # Separate random number generator for Propulate optimization + ) # Set up island model. islands = Islands( From f202328ea6fff8523798c2300b7424db3f58a1c7 Mon Sep 17 00:00:00 2001 From: coquelin77 Date: Tue, 7 May 2024 11:52:19 +0200 Subject: [PATCH 09/15] weird extra old readme???????? --- README.rst | 28 ---------------------------- 1 file changed, 28 deletions(-) delete mode 100644 README.rst diff --git a/README.rst b/README.rst deleted file mode 100644 index fec297a4..00000000 --- a/README.rst +++ /dev/null @@ -1,28 +0,0 @@ -========= -propulate -========= - - -| !!! NOTE: Propulate has moved to https://github.com/Helmholtz-AI-Energy/propulate . -| Parallel propagator of populations. - - -Description -=========== - -| Evolution-inspired hyperparameter-optimization in MPI-parallelized fashion. -| In order to be more efficient generations are less well separated than they often are in evolutionary algorithms. -Instead a new individual is generated from a pool of currently active already evaluated individuals that may be from any generation. -Individuals may be removed from the breeding population based on different criteria. - -Documentation -============= - -| For usage example see scripts. - -Installation -============ - -| Pull and run ``pip install -e .`` or ``python setup.py develop``. -| Requires an MPI implementation (currently only tested with OpenMPI) and ``mpi4py``. - From 0a3a9b22c18a44d9c2f75e901aab46cf74f10f0d Mon Sep 17 00:00:00 2001 From: Marie Weiel Date: Wed, 8 May 2024 08:48:43 +0200 Subject: [PATCH 10/15] rename misleading variable --- tests/test_cmaes.py | 4 ++-- tests/test_island.py | 26 +++++++++++++------------- tests/test_propulator.py | 10 +++++----- tutorials/cmaes_example.py | 4 ++-- tutorials/islands_example.py | 4 ++-- tutorials/propulator_example.py | 4 ++-- tutorials/pso_example.py | 4 ++-- 7 files changed, 28 insertions(+), 28 deletions(-) diff --git a/tests/test_cmaes.py b/tests/test_cmaes.py index 76c6a87b..3f8fbc41 100644 --- a/tests/test_cmaes.py +++ b/tests/test_cmaes.py @@ -28,14 +28,14 @@ def test_cmaes_basic(cma_adapter, mpi_tmp_path: pathlib.Path) -> None: The temporary checkpoint directory. """ rng = random.Random(42) # Separate random number generator for optimization. - function, limits = get_function_search_space("sphere") + benchmark_function, limits = get_function_search_space("sphere") # Set up evolutionary operator. adapter = cma_adapter propagator = CMAPropagator(adapter, limits, rng=rng) # Set up Propulator performing actual optimization. propulator = Propulator( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, generations=100, diff --git a/tests/test_island.py b/tests/test_island.py index 368f9907..95781a19 100644 --- a/tests/test_island.py +++ b/tests/test_island.py @@ -20,7 +20,7 @@ def global_variables(): rng = random.Random( 42 + MPI.COMM_WORLD.rank ) # Set up separate random number generator for optimization. - function, limits = get_function_search_space( + benchmark_function, limits = get_function_search_space( "sphere" ) # Get function and search space to optimize. propagator = get_default_propagator( @@ -28,7 +28,7 @@ def global_variables(): limits=limits, rng=rng, ) # Set up evolutionary operator. - yield rng, function, limits, propagator + yield rng, benchmark_function, limits, propagator @pytest.fixture( @@ -62,12 +62,12 @@ def test_islands( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ - rng, function, limits, propagator = global_variables + rng, benchmark_function, limits, propagator = global_variables set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, generations=100, @@ -100,12 +100,12 @@ def test_checkpointing_isolated( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ - rng, function, limits, propagator = global_variables + rng, benchmark_function, limits, propagator = global_variables set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, generations=100, @@ -121,7 +121,7 @@ def test_checkpointing_isolated( del islands islands = Islands( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, generations=100, @@ -160,12 +160,12 @@ def test_checkpointing( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ - rng, function, limits, propagator = global_variables + rng, benchmark_function, limits, propagator = global_variables set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, generations=100, @@ -185,7 +185,7 @@ def test_checkpointing( del islands islands = Islands( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, generations=100, @@ -225,12 +225,12 @@ def test_checkpointing_unequal_populations( mpi_tmp_path : pathlib.Path The temporary checkpoint directory. """ - rng, function, limits, propagator = global_variables + rng, benchmark_function, limits, propagator = global_variables set_logger_config(log_file=mpi_tmp_path / "log.log") # Set up island model. islands = Islands( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, generations=100, @@ -251,7 +251,7 @@ def test_checkpointing_unequal_populations( del islands islands = Islands( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, generations=100, diff --git a/tests/test_propulator.py b/tests/test_propulator.py index 3b04db3e..e401648a 100644 --- a/tests/test_propulator.py +++ b/tests/test_propulator.py @@ -50,7 +50,7 @@ def test_propulator(function_name: str, mpi_tmp_path: pathlib.Path) -> None: rng = random.Random( 42 + MPI.COMM_WORLD.rank ) # Random number generator for optimization - function, limits = get_function_search_space(function_name) + benchmark_function, limits = get_function_search_space(function_name) set_logger_config(log_file=mpi_tmp_path / "log.log") propagator = get_default_propagator( pop_size=4, @@ -58,7 +58,7 @@ def test_propulator(function_name: str, mpi_tmp_path: pathlib.Path) -> None: rng=rng, ) # Set up evolutionary operator. propulator = Propulator( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, generations=100, @@ -81,7 +81,7 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: rng = random.Random( 42 + MPI.COMM_WORLD.rank ) # Separate random number generator for optimization - function, limits = get_function_search_space("sphere") + benchmark_function, limits = get_function_search_space("sphere") propagator = get_default_propagator( # Get default evolutionary operator. pop_size=4, # Breeding pool size @@ -89,7 +89,7 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: rng=rng, # Random number generator ) propulator = Propulator( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, generations=100, checkpoint_path=mpi_tmp_path, @@ -105,7 +105,7 @@ def test_propulator_checkpointing(mpi_tmp_path: pathlib.Path) -> None: MPI.COMM_WORLD.barrier() # Synchronize all processes. propulator = Propulator( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, generations=20, checkpoint_path=mpi_tmp_path, diff --git a/tutorials/cmaes_example.py b/tutorials/cmaes_example.py index 62d42a3d..20b79503 100644 --- a/tutorials/cmaes_example.py +++ b/tutorials/cmaes_example.py @@ -36,7 +36,7 @@ rng = random.Random( config.seed + comm.rank ) # Separate random number generator for optimization. - function, limits = get_function_search_space( + benchmark_function, limits = get_function_search_space( config.function ) # Get callable function + search-space limits. @@ -52,7 +52,7 @@ # Set up propulator performing actual optimization. propulator = Propulator( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, island_comm=comm, diff --git a/tutorials/islands_example.py b/tutorials/islands_example.py index b23df370..e2db49ae 100755 --- a/tutorials/islands_example.py +++ b/tutorials/islands_example.py @@ -31,7 +31,7 @@ rng = random.Random( config.seed + comm.rank ) # Separate random number generator for optimization. - function, limits = get_function_search_space( + benchmark_function, limits = get_function_search_space( config.function ) # Get callable function + search-space limits. @@ -58,7 +58,7 @@ # Set up island model. islands = Islands( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, generations=config.generations, diff --git a/tutorials/propulator_example.py b/tutorials/propulator_example.py index d9437981..a4d169af 100755 --- a/tutorials/propulator_example.py +++ b/tutorials/propulator_example.py @@ -35,7 +35,7 @@ rng = random.Random( config.seed + comm.rank ) # Separate random number generator for optimization. - function, limits = get_function_search_space( + benchmark_function, limits = get_function_search_space( config.function ) # Get callable function + search-space limits. # Set up evolutionary operator. @@ -50,7 +50,7 @@ # Set up propulator performing actual optimization. propulator = Propulator( - loss_fn=function, + loss_fn=benchmark_function, propagator=propagator, rng=rng, island_comm=comm, diff --git a/tutorials/pso_example.py b/tutorials/pso_example.py index e32cde25..3491cf4a 100644 --- a/tutorials/pso_example.py +++ b/tutorials/pso_example.py @@ -47,7 +47,7 @@ rng = random.Random( config.seed + comm.rank ) # Separate random number generator for optimization. - function, limits = get_function_search_space( + benchmark_function, limits = get_function_search_space( config.function ) # Get callable function + search-space limits. @@ -91,7 +91,7 @@ propagator = Conditional(config.pop_size, pso_propagator, init) propulator = Propulator( - function, + benchmark_function, propagator, rng=rng, island_comm=comm, From 1bc03ed9cc128d88f7c541891887c49dc242480c Mon Sep 17 00:00:00 2001 From: Marie Weiel Date: Wed, 8 May 2024 09:23:04 +0200 Subject: [PATCH 11/15] clean up comments and code --- tutorials/torch_ddp_example.py | 134 ++++++++++++++++++--------------- 1 file changed, 73 insertions(+), 61 deletions(-) diff --git a/tutorials/torch_ddp_example.py b/tutorials/torch_ddp_example.py index 67ae1a49..15c3df6b 100644 --- a/tutorials/torch_ddp_example.py +++ b/tutorials/torch_ddp_example.py @@ -1,8 +1,9 @@ """ -Toy example for HP optimization / NAS in Propulate, using a simple CNN trained on the MNIST dataset. +Toy example for HP optimization / NAS in Propulate, using a simple CNN trained on MNIST in a data-parallel fashion. -This script was tested on a single compute node with 4 GPUs. Note that you need to adapt ``GPUS_PER_NODE`` (see ll. 25). +This script was tested on two compute nodes with 4 GPUs each. Note that you need to adapt ``GPUS_PER_NODE`` in l. 25. """ + import datetime as dt import logging import os @@ -14,11 +15,9 @@ import torch import torch.distributed as dist -import torch.nn.functional as F # noqa: N812 import torch.utils.data.distributed as datadist from mpi4py import MPI from torch import nn, optim -from torch.optim.lr_scheduler import StepLR from torch.utils.data import DataLoader from torchvision.datasets import MNIST from torchvision.transforms import Compose, Normalize, ToTensor @@ -38,14 +37,14 @@ class Net(nn.Module): """ - Toy Neural network class. + Toy neural network class. Attributes ---------- conv_layers : torch.nn.modules.container.Sequential The model's convolutional layers. fc : nn.Linear - fully connected output layer + The fully connected output layer. Methods ------- @@ -57,7 +56,17 @@ def __init__( self, conv_layers: int, activation: torch.nn.modules.activation, - ): + ) -> None: + """ + Initialize the neural network. + + Parameters + ---------- + conv_layers : int + The number of convolutional layers to use. + activation : torch.nn.modules.activation + The activation function to use. + """ super().__init__() layers = [] # Set up the model architecture (depending on number of convolutional layers specified). layers += [ @@ -95,7 +104,7 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: x = self.conv_layers(x) x = x.view(b, 10 * 28 * 28) x = self.fc(x) - output = F.log_softmax(x, dim=1) + output = nn.functional.log_softmax(x, dim=1) return output @@ -158,95 +167,91 @@ def get_data_loaders( def torch_process_group_init(subgroup_comm: MPI.Comm, method) -> None: """ - Create the torch process group on the subgroup of the MPI world. + Create the torch process group of each multi-rank worker from a subgroup of the MPI world. Parameters ---------- subgroup_comm : MPI.Comm - the split communicator for the subgroup. This is provided to the individual's loss function - by the Islands class if there are multiple ranks per worker. + The split communicator for the multi-rank worker's subgroup. This is provided to the individual's loss function + by the ``Islands`` class if there are multiple ranks per worker. method : str - method to use to initialize the process group. - options: [nccl-slurm, nccl-openmpi, gloo] - if CUDA is not available, then gloo is automatically chosen for the method + The method to use to initialize the process group. + Options: [``nccl-slurm``, ``nccl-openmpi``, ``gloo``] + If CUDA is not available, ``gloo`` is automatically chosen for the method. """ global _DATA_PARALLEL_GROUP global _DATA_PARALLEL_ROOT - # done want different groups to use the same port - subgroup_id = MPI.COMM_WORLD.rank // subgroup_comm.size + + comm_rank, comm_size = subgroup_comm.rank, subgroup_comm.size + + # Get master address and port + # Don't want different groups to use the same port. + subgroup_id = MPI.COMM_WORLD.rank // comm_size port = 29500 + subgroup_id - # get master address and port - # os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "0" - comm_size = subgroup_comm.Get_size() if comm_size == 1: return master_address = socket.gethostname() - # each subgroup needs to get the hostname of rank 0 of that group + # Each multi-rank worker rank needs to get the hostname of rank 0 of its subgroup. master_address = subgroup_comm.bcast(str(master_address), root=0) - # save env vars + # Save environment variables. os.environ["MASTER_ADDR"] = master_address - # use the default pytorch port + # Use the default PyTorch port. os.environ["MASTER_PORT"] = str(port) - comm_rank = subgroup_comm.Get_rank() - - nccl_world_size = comm_size - nccl_world_rank = comm_rank - # print(subgroup_comm.rank, subgroup_comm.size, master_address, port) if not torch.cuda.is_available(): method = "gloo" - log.info("No CUDA devices found: falling back to gloo") + log.info("No CUDA devices found: Falling back to gloo.") else: log.info(f"CUDA_VISIBLE_DEVICES: {os.environ['CUDA_VISIBLE_DEVICES']}") num_cuda_devices = torch.cuda.device_count() - dev_number = MPI.COMM_WORLD.rank % num_cuda_devices - log.info(f"device count: {num_cuda_devices}, device number: {dev_number}") - torch.cuda.set_device(dev_number) + device_number = MPI.COMM_WORLD.rank % num_cuda_devices + log.info(f"device count: {num_cuda_devices}, device number: {device_number}") + torch.cuda.set_device(device_number) - time.sleep(0.001 * comm_rank) # avoid DDOS'ing rank 0 - if method == "nccl-openmpi": + time.sleep(0.001 * comm_rank) # Avoid DDOS'ing rank 0. + if method == "nccl-openmpi": # Use NCCL with OpenMPI. dist.init_process_group( backend="nccl", - rank=subgroup_comm.rank, - world_size=subgroup_comm.size, + rank=comm_rank, + world_size=comm_size, ) - elif method == "nccl-slurm": + elif method == "nccl-slurm": # Use NCCL with a TCP store. wireup_store = dist.TCPStore( host_name=master_address, port=port, - world_size=nccl_world_size, - is_master=(nccl_world_rank == 0), + world_size=comm_size, + is_master=(comm_rank == 0), timeout=dt.timedelta(seconds=60), ) dist.init_process_group( backend="nccl", store=wireup_store, - world_size=nccl_world_size, - rank=nccl_world_rank, + world_size=comm_size, + rank=comm_rank, ) - elif method == "gloo": + elif method == "gloo": # Use gloo. wireup_store = dist.TCPStore( host_name=master_address, port=port, - world_size=nccl_world_size, - is_master=(nccl_world_rank == 0), + world_size=comm_size, + is_master=(comm_rank == 0), timeout=dt.timedelta(seconds=60), ) dist.init_process_group( backend="gloo", store=wireup_store, - world_size=nccl_world_size, - rank=nccl_world_rank, + world_size=comm_size, + rank=comm_rank, ) else: raise NotImplementedError( - f"Given 'method' ({method}) not in [nccl-openmpi, nccl-slurm, gloo]" + f"Given 'method' ({method}) not in [nccl-openmpi, nccl-slurm, gloo]!" ) - # make sure to call a barrier here in order for sharp to use the default comm: + # Call a barrier here in order for sharp to use the default comm. if dist.is_initialized(): dist.barrier() disttest = torch.ones(1) @@ -254,7 +259,7 @@ def torch_process_group_init(subgroup_comm: MPI.Comm, method) -> None: disttest = disttest.cuda() dist.all_reduce(disttest) - assert disttest[0] == nccl_world_size, "failed test of dist!" + assert disttest[0] == comm_size, "Failed test of dist!" else: disttest = None log.info( @@ -272,11 +277,13 @@ def ind_loss( ---------- params : Dict[str, int | float | str] The hyperparameters to be optimized evolutionarily. + subgroup_comm : MPI.Comm + Each multi-rank worker's subgroup communicator. Returns ------- float - The trained model's negative validation accuracy. + The trained model's validation loss. """ torch_process_group_init(subgroup_comm, method=SUBGROUP_COMM_METHOD) # Extract hyperparameter combination to test from input dictionary. @@ -308,16 +315,19 @@ def ind_loss( model = model.to(device) else: device = "cpu" + optimizer = optim.Adadelta(model.parameters(), lr=lr) - scheduler = StepLR(optimizer, step_size=1, gamma=gamma) + scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=gamma) log_interval = 10000 best_val_loss = 1000000 early_stopping_count, early_stopping_limit = 0, 5 set_new_best = False model.train() - for epoch in range(epochs): - # train loop ==================================================== - for batch_idx, (data, target) in enumerate(train_loader): + for epoch in range(epochs): # Loop over epochs. + # ------------ Train loop ------------ + for batch_idx, (data, target) in enumerate( + train_loader + ): # Loop over training batches. data, target = data.to(device), target.to(device) optimizer.zero_grad() output = model(data) @@ -326,9 +336,10 @@ def ind_loss( optimizer.step() if batch_idx % log_interval == 0 or batch_idx == len(train_loader) - 1: log.info( - f"Train Epoch: {epoch} [{batch_idx}/{len(train_loader)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}" + f"Train Epoch: {epoch} [{batch_idx}/{len(train_loader)} " + f"({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}" ) - # val loop ====================================================== + # ------------ Validation loop ------------ model.eval() val_loss = 0 correct = 0 @@ -336,10 +347,10 @@ def ind_loss( for data, target in val_loader: data, target = data.to(device), target.to(device) output = model(data) - val_loss += loss_fn(output, target).item() # sum up batch loss + val_loss += loss_fn(output, target).item() # Sum up batch loss. pred = output.argmax( dim=1, keepdim=True - ) # get the index of the max log-probability + ) # Get the index of the max log-probability. correct += pred.eq(target.view_as(pred)).sum().item() val_loss /= len(val_loader.dataset) @@ -348,7 +359,8 @@ def ind_loss( set_new_best = True log.info( - f"\nTest set: Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} ({100. * correct / len(val_loader.dataset):.0f}%)\n" + f"\nTest set: Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} " + f"({100. * correct / len(val_loader.dataset):.0f}%)\n" ) if not set_new_best: @@ -357,11 +369,11 @@ def ind_loss( log.info("hit early stopping count, breaking") break - # scheduler step ================================================ + # ------------ Scheduler step ------------ scheduler.step() set_new_best = False - # Return best validation loss as an individual's loss (trained so lower is better) + # Return best validation loss as an individual's loss (trained so lower is better). dist.destroy_process_group() return best_val_loss @@ -370,7 +382,7 @@ def ind_loss( config, _ = parse_arguments() comm = MPI.COMM_WORLD - if comm.rank == 0: # Download data at the top, then we dont need to later + if comm.rank == 0: # Download data at the top, then we don't need to later. train_loader = DataLoader( dataset=MNIST( download=True, root=".", transform=None, train=True From 8eab486566be119eeda684399d59772b28017ea8 Mon Sep 17 00:00:00 2001 From: Marie Weiel Date: Wed, 8 May 2024 09:23:27 +0200 Subject: [PATCH 12/15] appease ruff --- tutorials/cmaes_example.py | 1 + tutorials/islands_example.py | 1 + tutorials/propulator_example.py | 1 + tutorials/pso_example.py | 1 + 4 files changed, 4 insertions(+) diff --git a/tutorials/cmaes_example.py b/tutorials/cmaes_example.py index 20b79503..46b47b9f 100644 --- a/tutorials/cmaes_example.py +++ b/tutorials/cmaes_example.py @@ -1,4 +1,5 @@ """Simple example script using CMA-ES.""" + import pathlib import random diff --git a/tutorials/islands_example.py b/tutorials/islands_example.py index e2db49ae..ce1db5fd 100755 --- a/tutorials/islands_example.py +++ b/tutorials/islands_example.py @@ -1,4 +1,5 @@ """Simple island model example script.""" + import pathlib import random diff --git a/tutorials/propulator_example.py b/tutorials/propulator_example.py index a4d169af..5f0813ca 100755 --- a/tutorials/propulator_example.py +++ b/tutorials/propulator_example.py @@ -1,4 +1,5 @@ """Simple Propulator example script using the default genetic propagator.""" + import pathlib import random diff --git a/tutorials/pso_example.py b/tutorials/pso_example.py index 3491cf4a..dc14df60 100644 --- a/tutorials/pso_example.py +++ b/tutorials/pso_example.py @@ -4,6 +4,7 @@ You can choose between benchmark functions and optimize them. The example shows how to set up Propulate in order to use it with PSO. """ + import pathlib import random From 5fa49b215557a15dc5a9d7be5055c6659ff33e8a Mon Sep 17 00:00:00 2001 From: coquelin77 Date: Wed, 8 May 2024 09:52:25 +0200 Subject: [PATCH 13/15] cleaned up dataset downloads --- tutorials/torch_ddp_example.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/tutorials/torch_ddp_example.py b/tutorials/torch_ddp_example.py index 15c3df6b..58b66d3d 100644 --- a/tutorials/torch_ddp_example.py +++ b/tutorials/torch_ddp_example.py @@ -383,17 +383,9 @@ def ind_loss( comm = MPI.COMM_WORLD if comm.rank == 0: # Download data at the top, then we don't need to later. - train_loader = DataLoader( - dataset=MNIST( - download=True, root=".", transform=None, train=True - ), # Use MNIST training dataset. - batch_size=2, # Batch size - num_workers=1, - pin_memory=True, - persistent_workers=True, - shuffle=True, # Shuffle data. - ) - del train_loader + dataset = MNIST(download=True, root=".", transform=None, train=True) + dataset = MNIST(download=True, root=".", transform=None, train=False) + del dataset comm.Barrier() pop_size = 2 * comm.size # Breeding population size limits = { From cb642d09b84a200633a5deff510dcb883a1385a0 Mon Sep 17 00:00:00 2001 From: Marie Weiel Date: Wed, 8 May 2024 10:03:39 +0200 Subject: [PATCH 14/15] clean up comments --- tutorials/torch_ddp_example.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tutorials/torch_ddp_example.py b/tutorials/torch_ddp_example.py index 58b66d3d..69ed3407 100644 --- a/tutorials/torch_ddp_example.py +++ b/tutorials/torch_ddp_example.py @@ -1,7 +1,7 @@ """ Toy example for HP optimization / NAS in Propulate, using a simple CNN trained on MNIST in a data-parallel fashion. -This script was tested on two compute nodes with 4 GPUs each. Note that you need to adapt ``GPUS_PER_NODE`` in l. 25. +This script was tested on two compute nodes with 4 GPUs each. Note that you need to adapt ``GPUS_PER_NODE`` in l. 29. """ import datetime as dt @@ -135,7 +135,7 @@ def get_data_loaders( val_dataset = MNIST(download=False, root=".", transform=data_transform, train=False) if ( subgroup_comm.size > 1 - ): # need to make the samplers use the torch world to distributed data + ): # Make the samplers use the torch world to distribute data train_sampler = datadist.DistributedSampler(train_dataset) val_sampler = datadist.DistributedSampler(val_dataset) else: @@ -150,7 +150,7 @@ def get_data_loaders( num_workers=num_workers, pin_memory=True, persistent_workers=True, - shuffle=(train_sampler is None), # Shuffle data. + shuffle=(train_sampler is None), # Shuffle data only if no sampler is provided. sampler=train_sampler, ) val_loader = DataLoader( @@ -290,7 +290,7 @@ def ind_loss( conv_layers = params["conv_layers"] # Number of convolutional layers activation = params["activation"] # Activation function lr = params["lr"] # Learning rate - gamma = params["gamma"] + gamma = params["gamma"] # Learning rate reduction factor epochs = 20 @@ -302,9 +302,8 @@ def ind_loss( activation = activations[activation] # Get activation function. loss_fn = torch.nn.NLLLoss() - model = Net(conv_layers, activation) # Set up neural network with specified hyperparameters. - # model.best_accuracy = 0.0 # Initialize the model's best validation accuracy. + model = Net(conv_layers, activation) train_loader, val_loader = get_data_loaders( batch_size=8, subgroup_comm=subgroup_comm @@ -428,7 +427,7 @@ def ind_loss( migration_probability=config.migration_probability, # Migration probability pollination=config.pollination, # Whether to use pollination or migration checkpoint_path=config.checkpoint, # Checkpoint path - # ----- SPECIFIC FOR MULTI-RANK UCS ---- + # ----- SPECIFIC FOR MULTI-RANK UCS ----- ranks_per_worker=2, # Number of ranks per (multi rank) worker ) From d27069c8aa4f21233812b85626d894849995e250 Mon Sep 17 00:00:00 2001 From: Marie Weiel Date: Wed, 8 May 2024 10:08:24 +0200 Subject: [PATCH 15/15] introduce global initial dataloading in main --- tutorials/torch_example.py | 45 ++++++++++++++------------------------ 1 file changed, 16 insertions(+), 29 deletions(-) diff --git a/tutorials/torch_example.py b/tutorials/torch_example.py index 49a27b60..e671a52a 100755 --- a/tutorials/torch_example.py +++ b/tutorials/torch_example.py @@ -221,35 +221,17 @@ def get_data_loaders(batch_size: int) -> Tuple[DataLoader, DataLoader]: num_workers = NUM_WORKERS log.info(f"Use {num_workers} workers in dataloader.") - if MPI.COMM_WORLD.rank == 0: # Only root downloads data. - train_loader = DataLoader( - dataset=MNIST( - download=True, root=".", transform=data_transform, train=True - ), # Use MNIST training dataset. - batch_size=batch_size, # Batch size - num_workers=num_workers, - pin_memory=True, - persistent_workers=True, - shuffle=True, # Shuffle data. - ) - - # NOTE barrier only called, when dataset has not been downloaded yet - if not hasattr(get_data_loaders, "barrier_called"): - MPI.COMM_WORLD.Barrier() - - setattr(get_data_loaders, "barrier_called", True) - - if MPI.COMM_WORLD.rank != 0: - train_loader = DataLoader( - dataset=MNIST( - download=False, root=".", transform=data_transform, train=True - ), # Use MNIST training dataset. - batch_size=batch_size, # Batch size - num_workers=num_workers, - pin_memory=True, - persistent_workers=True, - shuffle=True, # Shuffle data. - ) + # Note that the MNIST dataset has already been downloaded before globally by rank 0 in the main part. + train_loader = DataLoader( + dataset=MNIST( + download=False, root=".", transform=data_transform, train=True + ), # Use MNIST training dataset. + batch_size=batch_size, # Batch size + num_workers=num_workers, + pin_memory=True, + persistent_workers=True, + shuffle=True, # Shuffle data. + ) val_loader = DataLoader( dataset=MNIST( download=False, root=".", transform=data_transform, train=False @@ -327,6 +309,11 @@ def ind_loss(params: Dict[str, Union[int, float, str]]) -> float: if __name__ == "__main__": comm = MPI.COMM_WORLD + if comm.rank == 0: # Download data at the top, then we don't need to later. + dataset = MNIST(download=True, root=".", transform=None, train=True) + dataset = MNIST(download=True, root=".", transform=None, train=False) + del dataset + comm.Barrier() num_generations = 10 # Number of generations pop_size = 2 * comm.size # Breeding population size limits = {