diff --git a/README.md b/README.md index a777d33..7c37830 100644 --- a/README.md +++ b/README.md @@ -24,8 +24,8 @@ The three main components of this framework are the `Communicator`, `Compressor` ## Supported frameworks Currently the supported frameworks are: -- Horovod 0.21.0 ([TensorFlow 1.15](grace_dl/tensorflow)) -- Horovod 0.21.0 ([PyTorch 1.7.0](grace_dl/torch)) +- Horovod 0.23.0 ([TensorFlow 1.15](grace_dl/tensorflow)) +- Horovod 0.23.0 ([PyTorch 1.7.1](grace_dl/torch)) - DistributedDataParallel (DDP) ([PyTorch >= 1.4.0](grace_dl/dist)) ## Usage diff --git a/examples/torch/pytorch_imagenet_resnet50-grace.py b/examples/torch/pytorch_imagenet_resnet50-grace.py new file mode 100644 index 0000000..be2fc40 --- /dev/null +++ b/examples/torch/pytorch_imagenet_resnet50-grace.py @@ -0,0 +1,307 @@ +import torch +import argparse +import torch.backends.cudnn as cudnn +import torch.multiprocessing as mp +import torch.nn.functional as F +import torch.optim as optim +import torch.utils.data.distributed +from torch.utils.tensorboard import SummaryWriter +from torchvision import datasets, transforms, models +import horovod.torch as hvd +import os +import math +from tqdm import tqdm +# Import GRACE +from grace_dl.torch.communicator.allgather import Allgather +from grace_dl.torch.compressor.topk import TopKCompressor +from grace_dl.torch.memory.residual import ResidualMemory +from grace_dl.torch.helper import grace_from_params + +# Training settings +parser = argparse.ArgumentParser(description='PyTorch ImageNet Example', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) +parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'), + help='path to training data') +parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'), + help='path to validation data') +parser.add_argument('--log-dir', default='./logs', + help='tensorboard log directory') +parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar', + help='checkpoint file format') +parser.add_argument('--batches-per-allreduce', type=int, default=1, + help='number of batches processed locally before ' + 'executing allreduce across workers; it multiplies ' + 'total batch size.') +parser.add_argument('--use-adasum', action='store_true', default=False, + help='use adasum algorithm to do reduction') +parser.add_argument('--gradient-predivide-factor', type=float, default=1.0, + help='apply gradient predivide factor in optimizer (default: 1.0)') + +# Default settings from https://arxiv.org/abs/1706.02677. 
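+# (Goyal et al., "Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour".)
+# --base-lr is the per-GPU rate; it is multiplied by hvd.size() in adjust_learning_rate()
+# and when constructing the optimizer below.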
+parser.add_argument('--batch-size', type=int, default=32, + help='input batch size for training') +parser.add_argument('--val-batch-size', type=int, default=32, + help='input batch size for validation') +parser.add_argument('--epochs', type=int, default=90, + help='number of epochs to train') +parser.add_argument('--base-lr', type=float, default=0.0125, + help='learning rate for a single GPU') +parser.add_argument('--warmup-epochs', type=float, default=5, + help='number of warmup epochs') +parser.add_argument('--momentum', type=float, default=0.9, + help='SGD momentum') +parser.add_argument('--wd', type=float, default=0.00005, + help='weight decay') + +# GRACE args +parser.add_argument('--grace-compressor', type=str, default='topk', + help='GRACE compressor to use') +parser.add_argument('--grace-mem', type=str, default='residual', + help='GRACE memory to use') +parser.add_argument('--grace-comm', type=str, default='allgather', + help='GRACE communicator to use') + + +parser.add_argument('--no-cuda', action='store_true', default=False, + help='disables CUDA training') +parser.add_argument('--seed', type=int, default=42, + help='random seed') + + +def train(epoch): + model.train() + train_sampler.set_epoch(epoch) + train_loss = Metric('train_loss') + train_accuracy = Metric('train_accuracy') + + with tqdm(total=len(train_loader), + desc='Train Epoch #{}'.format(epoch + 1), + disable=not verbose) as t: + for batch_idx, (data, target) in enumerate(train_loader): + adjust_learning_rate(epoch, batch_idx) + + if args.cuda: + data, target = data.cuda(), target.cuda() + optimizer.zero_grad() + # Split data into sub-batches of size batch_size + for i in range(0, len(data), args.batch_size): + data_batch = data[i:i + args.batch_size] + target_batch = target[i:i + args.batch_size] + output = model(data_batch) + train_accuracy.update(accuracy(output, target_batch)) + loss = F.cross_entropy(output, target_batch) + train_loss.update(loss) + # Average gradients among sub-batches + loss.div_(math.ceil(float(len(data)) / args.batch_size)) + loss.backward() + # Gradient is applied across all ranks + optimizer.step() + t.set_postfix({'loss': train_loss.avg.item(), + 'accuracy': 100. * train_accuracy.avg.item()}) + t.update(1) + + if log_writer: + log_writer.add_scalar('train/loss', train_loss.avg, epoch) + log_writer.add_scalar('train/accuracy', train_accuracy.avg, epoch) + + +def validate(epoch): + model.eval() + val_loss = Metric('val_loss') + val_accuracy = Metric('val_accuracy') + + with tqdm(total=len(val_loader), + desc='Validate Epoch #{}'.format(epoch + 1), + disable=not verbose) as t: + with torch.no_grad(): + for data, target in val_loader: + if args.cuda: + data, target = data.cuda(), target.cuda() + output = model(data) + + val_loss.update(F.cross_entropy(output, target)) + val_accuracy.update(accuracy(output, target)) + t.set_postfix({'loss': val_loss.avg.item(), + 'accuracy': 100. * val_accuracy.avg.item()}) + t.update(1) + + if log_writer: + log_writer.add_scalar('val/loss', val_loss.avg, epoch) + log_writer.add_scalar('val/accuracy', val_accuracy.avg, epoch) + + +# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final +# accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during +# the first five epochs. See https://arxiv.org/abs/1706.02677 for details. +# After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs. 
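+# For example, with 8 workers, base_lr=0.0125 and batches_per_allreduce=1, the learning rate
+# ramps roughly linearly from 0.0125 to 0.1 over the first 5 epochs, then drops by 10x at
+# epochs 30, 60 and 80.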
+def adjust_learning_rate(epoch, batch_idx): + if epoch < args.warmup_epochs: + epoch += float(batch_idx + 1) / len(train_loader) + lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1) + elif epoch < 30: + lr_adj = 1. + elif epoch < 60: + lr_adj = 1e-1 + elif epoch < 80: + lr_adj = 1e-2 + else: + lr_adj = 1e-3 + for param_group in optimizer.param_groups: + param_group['lr'] = args.base_lr * hvd.size() * args.batches_per_allreduce * lr_adj + + +def accuracy(output, target): + # get the index of the max log-probability + pred = output.max(1, keepdim=True)[1] + return pred.eq(target.view_as(pred)).cpu().float().mean() + + +def save_checkpoint(epoch): + if hvd.rank() == 0: + filepath = args.checkpoint_format.format(epoch=epoch + 1) + state = { + 'model': model.state_dict(), + 'optimizer': optimizer.state_dict(), + } + torch.save(state, filepath) + + +# Horovod: average metrics from distributed training. +class Metric(object): + def __init__(self, name): + self.name = name + self.sum = torch.tensor(0.) + self.n = torch.tensor(0.) + + def update(self, val): + self.sum += hvd.allreduce(val.detach().cpu(), name=self.name) + self.n += 1 + + @property + def avg(self): + return self.sum / self.n + + +if __name__ == '__main__': + args = parser.parse_args() + args.cuda = not args.no_cuda and torch.cuda.is_available() + + allreduce_batch_size = args.batch_size * args.batches_per_allreduce + + hvd.init() + torch.manual_seed(args.seed) + + if args.cuda: + # Horovod: pin GPU to local rank. + torch.cuda.set_device(hvd.local_rank()) + torch.cuda.manual_seed(args.seed) + + cudnn.benchmark = True + + # If set > 0, will resume training from a given checkpoint. + resume_from_epoch = 0 + for try_epoch in range(args.epochs, 0, -1): + if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)): + resume_from_epoch = try_epoch + break + + # Horovod: broadcast resume_from_epoch from rank 0 (which will have + # checkpoints) to other ranks. + resume_from_epoch = hvd.broadcast(torch.tensor(resume_from_epoch), root_rank=0, + name='resume_from_epoch').item() + + # Horovod: print logs on the first worker. + verbose = 1 if hvd.rank() == 0 else 0 + + # Horovod: write TensorBoard logs on first worker. + log_writer = SummaryWriter(args.log_dir) if hvd.rank() == 0 else None + + # Horovod: limit # of CPU threads to be used per worker. + torch.set_num_threads(4) + + kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {} + # When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent + # issues with Infiniband implementations that are not fork-safe + if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and + mp._supports_context and 'forkserver' in mp.get_all_start_methods()): + kwargs['multiprocessing_context'] = 'forkserver' + + train_dataset = \ + datasets.ImageFolder(args.train_dir, + transform=transforms.Compose([ + transforms.RandomResizedCrop(224), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]) + ])) + # Horovod: use DistributedSampler to partition data among workers. Manually specify + # `num_replicas=hvd.size()` and `rank=hvd.rank()`. 
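+    # Each worker draws from its own shard of the dataset; train() calls
+    # train_sampler.set_epoch() so the shuffle (and hence each shard) changes every epoch.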
+ train_sampler = torch.utils.data.distributed.DistributedSampler( + train_dataset, num_replicas=hvd.size(), rank=hvd.rank()) + train_loader = torch.utils.data.DataLoader( + train_dataset, batch_size=allreduce_batch_size, + sampler=train_sampler, **kwargs) + + val_dataset = \ + datasets.ImageFolder(args.val_dir, + transform=transforms.Compose([ + transforms.Resize(256), + transforms.CenterCrop(224), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]) + ])) + val_sampler = torch.utils.data.distributed.DistributedSampler( + val_dataset, num_replicas=hvd.size(), rank=hvd.rank()) + val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=args.val_batch_size, + sampler=val_sampler, **kwargs) + + + # Set up standard ResNet-50 model. + model = models.resnet50() + + # By default, Adasum doesn't need scaling up learning rate. + # For sum/average with gradient Accumulation: scale learning rate by batches_per_allreduce + lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1 + + if args.cuda: + # Move model to GPU. + model.cuda() + # If using GPU Adasum allreduce, scale learning rate by local_size. + if args.use_adasum and hvd.nccl_built(): + lr_scaler = args.batches_per_allreduce * hvd.local_size() + + # Horovod: scale learning rate by the number of GPUs. + optimizer = optim.SGD(model.parameters(), + lr=(args.base_lr * + lr_scaler), + momentum=args.momentum, weight_decay=args.wd) + + # Horovod: compression with GRACE + params = {'compressor': args.grace_compressor, 'memory': args.grace_mem, 'communicator': args.grace_comm} + grc = grace_from_params(params) + + # Horovod: wrap optimizer with DistributedOptimizer. + optimizer = hvd.DistributedOptimizer( + optimizer, grc, named_parameters=model.named_parameters(), + backward_passes_per_step=args.batches_per_allreduce, + op=hvd.Adasum if args.use_adasum else hvd.Average, + gradient_predivide_factor=args.gradient_predivide_factor) + + # Restore from a previous checkpoint, if initial_epoch is specified. + # Horovod: restore on the first worker which will broadcast weights to other workers. + if resume_from_epoch > 0 and hvd.rank() == 0: + filepath = args.checkpoint_format.format(epoch=resume_from_epoch) + checkpoint = torch.load(filepath) + model.load_state_dict(checkpoint['model']) + optimizer.load_state_dict(checkpoint['optimizer']) + + # Horovod: broadcast parameters & optimizer state. 
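+    # Rank 0 is the source of truth: every worker starts from its weights and optimizer
+    # state, including anything restored from a checkpoint above.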
+ hvd.broadcast_parameters(model.state_dict(), root_rank=0) + hvd.broadcast_optimizer_state(optimizer, root_rank=0) + + for epoch in range(resume_from_epoch, args.epochs): + train(epoch) + validate(epoch) + save_checkpoint(epoch) diff --git a/patch_files/horovod/torch/optimizer.py b/patch_files/horovod/torch/optimizer.py index ffe7d42..9e28767 100644 --- a/patch_files/horovod/torch/optimizer.py +++ b/patch_files/horovod/torch/optimizer.py @@ -25,18 +25,21 @@ from horovod.torch.compression import Compression from horovod.torch.functions import broadcast_object -from horovod.torch.mpi_ops import allreduce_async_, grouped_allreduce_async_ +from horovod.torch.mpi_ops import allreduce_async_, grouped_allreduce_async_, sparse_allreduce_async from horovod.torch.mpi_ops import synchronize from horovod.torch.mpi_ops import size from horovod.torch.mpi_ops import Average, Adasum, Sum from horovod.torch.mpi_ops import rocm_built +from horovod.torch.mpi_ops import ProcessSet, global_process_set class _DistributedOptimizer(torch.optim.Optimizer): def __init__(self, params, named_parameters, grace, compression, backward_passes_per_step=1, op=Average, gradient_predivide_factor=1.0, - num_groups=0): + groups=None, + sparse_as_dense=False, + process_set=global_process_set): super(self.__class__, self).__init__(params) self._compression = compression self._grace = grace @@ -44,9 +47,9 @@ def __init__(self, params, named_parameters, grace, compression, if named_parameters is not None: named_parameters = list(named_parameters) else: - named_parameters = [('allreduce.noname.%s' % i, v) - for param_group in self.param_groups - for i, v in enumerate(param_group['params'])] + named_parameters = [(f'allreduce.noname.{i}.{j}', v) + for i, param_group in enumerate(self.param_groups) + for j, v in enumerate(param_group['params'])] # make sure that named_parameters are tuples if any([not isinstance(p, tuple) for p in named_parameters]): raise ValueError('named_parameters should be a sequence of ' @@ -74,15 +77,33 @@ def __init__(self, params, named_parameters, grace, compression, for _, v in sorted(named_parameters)} self.op = op self.gradient_predivide_factor = gradient_predivide_factor + self.sparse_as_dense = sparse_as_dense + self.process_set = process_set + self._handles = {} self._grad_accs = [] self._requires_update = set() self._synchronized = False self._should_synchronize = True - self._num_groups = num_groups + + if groups is not None: + if not (isinstance(groups, list) or groups > 0): + raise ValueError('groups should be a non-negative integer or ' + 'a list of list of torch.Tensor.') + if isinstance(groups, list): + grouped_parameter_ids = set() + for l in groups: + for p in l: + if not isinstance(p, torch.Tensor): + raise ValueError('groups must consist of torch.Tensor.') + if id(p) in grouped_parameter_ids: + raise ValueError('A parameter can only appear once in groups.') + grouped_parameter_ids.add(id(p)) + self._groups = groups self._p_to_group = {} self._group_counts = {} - if size() > 1 or os.environ.get('HOROVOD_ELASTIC') == '1': + + if self.process_set.included() and (size() > 1 or os.environ.get('HOROVOD_ELASTIC') == '1'): self._register_hooks() def load_state_dict(self, *args, **kwargs): @@ -109,8 +130,7 @@ def set_backward_passes_per_step(self, passes): self._allreduce_delay[p] = self.backward_passes_per_step def _register_hooks(self): - - if self._num_groups > 0: + if self._groups is not None: p_list = [] # Get list of parameters with grads for param_group in self.param_groups: @@ -121,21 
+141,33 @@ def _register_hooks(self): # To ensure parameter order and group formation is consistent, broadcast p_list order # from rank 0 and use for every worker p_list_names = [self._parameter_names.get(p) for p in p_list] - p_list_names = broadcast_object(p_list_names, root_rank=0) - p_list = sorted(p_list, key=lambda p : p_list_names.index(self._parameter_names.get(p))) + p_list_names = broadcast_object(p_list_names, root_rank=0, process_set=self.process_set) + p_list = sorted(p_list, key=lambda p: p_list_names.index(self._parameter_names.get(p))) # Form groups - p_groups = split_list(p_list, self._num_groups) + if isinstance(self._groups, list): + p_groups = [] + grouped_id = set() + p_list_ids = [id(p) for p in p_list] + for group in self._groups: + p_groups.append([p for p in group if id(p) in p_list_ids]) + for p in p_groups[-1]: + grouped_id.add(id(p)) + for p in p_list: + if id(p) not in grouped_id: + p_groups.append([p]) + else: + p_groups = split_list(p_list, self._groups) + p_groups = [tuple(p) for p in p_groups] for group in p_groups: - for p in group: - self._p_to_group[p] = group - self._group_counts[group] = 0 + for p in group: + self._p_to_group[p] = group + self._group_counts[group] = 0 for param_group in self.param_groups: for p in param_group['params']: if p.requires_grad: - p.grad = p.data.new(p.size()).zero_() self._requires_update.add(p) p_tmp = p.expand_as(p) grad_acc = p_tmp.grad_fn.next_functions[0][0] @@ -143,16 +175,32 @@ def _register_hooks(self): self._grad_accs.append(grad_acc) def _allreduce_grad_async(self, p): + if p.grad is None: + # Gradient was not computed, but we still need to submit a tensor to allreduce + # as one of the other ranks may have computed it (due to dynamic forward functions). + # + # NOTE: this will not work if the gradient is sparse and we perform an allgather. + # Unfrotunately, there doesn't appear to be a good way to detect that the parameter will + # produce sparse gradients before computing the gradient. + p.grad = p.data.new(p.size()).zero_() + name = self._parameter_names.get(p) tensor = p.grad - if self._grace and self._num_groups == 0 and self.op == Average: + + if p.grad.is_sparse: + if self.sparse_as_dense: + tensor = tensor.to_dense() + else: + return self._sparse_allreduce_grad_async(p, name) + + if self._grace and self._groups is None and self.op == Average: handle, ctx = self._grace.send_step(tensor, name) else: tensor_compressed, ctx = self._compression.compress(tensor) if self.op == Average: - # Split average operation across pre/postscale factors - # C++ backend will apply additional 1 / size() factor to postscale_factor for op == Average. + # Split average operation across pre/postscale factors + # C++ backend will apply additional 1 / size() factor to postscale_factor for op == Average. 
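+                # e.g. gradient_predivide_factor=2 on 4 workers: tensors are scaled by 1/2
+                # before the sum and by 2/4 afterwards, which still yields the average.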
prescale_factor = 1.0 / self.gradient_predivide_factor postscale_factor = self.gradient_predivide_factor else: @@ -161,16 +209,23 @@ def _allreduce_grad_async(self, p): handle = allreduce_async_(tensor_compressed, name=name, op=self.op, prescale_factor=prescale_factor, - postscale_factor=postscale_factor) + postscale_factor=postscale_factor, + process_set=self.process_set) return handle, ctx def _grouped_allreduce_grad_async(self, ps): name = self._parameter_names.get(ps[0]) tensors_compressed, ctxs = zip(*[self._compression.compress(p.grad) for p in ps]) - handle = grouped_allreduce_async_(tensors_compressed, name=name, op=self.op) + handle = grouped_allreduce_async_(tensors_compressed, name=name, op=self.op, + process_set=self.process_set) return handle, ctxs + def _sparse_allreduce_grad_async(self, p, name): + handle = sparse_allreduce_async(p.grad, name=name, op=self.op, + process_set=self.process_set) + return handle, None + def _make_hook(self, p): def hook(*ignore): if p in self._handles and self._handles[p][0] is not None: @@ -185,7 +240,7 @@ def hook(*ignore): handle, ctx = None, None self._allreduce_delay[p] -= 1 if self._allreduce_delay[p] == 0: - if self._num_groups > 0: + if self._groups is not None: group = self._p_to_group[p] self._group_counts[group] += 1 if self._group_counts[group] == len(group): @@ -202,6 +257,10 @@ def hook(*ignore): return hook def synchronize(self): + if not self.process_set.included(): + self._synchronized = True + return + completed = set() for x in self._handles.keys(): completed.update(x) if isinstance(x, tuple) else completed.add(x) @@ -214,24 +273,40 @@ def synchronize(self): if handle is None: handle, ctx = self._allreduce_grad_async(p) self._handles[p] = (handle, ctx) - for p, (handle, ctx) in self._handles.items(): + if isinstance(p, tuple): # This was a grouped result, need to unpack outputs = synchronize(handle) for gp, output, gctx in zip(p, outputs, ctx): self._allreduce_delay[gp] = self.backward_passes_per_step gp.grad.set_(self._compression.decompress(output, gctx)) + if self._groups is not None and self._group_counts[p] != 0: + self._group_counts[p] = 0 else: - if self._grace and self._num_groups == 0 and self.op == Average: + if self._grace and self._groups is None and self.op == Average: # in GRACE, p is not tuple, but handle is. 
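+                    # receive_step() waits on the communication handle and returns the
+                    # decompressed, aggregated gradient for this parameter.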
output = self._grace.receive_step(handle, ctx) self._allreduce_delay[p] = self.backward_passes_per_step p.grad.set_(output) else: - output = synchronize(handle) + # When handle is a callable function, it returns the aggregated tensor result + output = synchronize(handle) if not callable(handle) else handle() self._allreduce_delay[p] = self.backward_passes_per_step - p.grad.set_(self._compression.decompress(output, ctx)) + if self._groups is not None: + group = self._p_to_group[p] + if self._group_counts[group] != 0: + self._group_counts[group] = 0 + if p.grad.is_sparse: + aggregated = self._compression.decompress(output, ctx) + if not aggregated.is_sparse: + # When sparse_as_dense=True we need to convert the grad back to sparse before update + aggregated = aggregated.to_sparse() + + # Sparse grads do not support set_ for some reason, so we do this as an equivalent + p.grad.zero_().add_(aggregated) + else: + p.grad.set_(self._compression.decompress(output, ctx)) self._handles.clear() self._synchronized = True @@ -448,13 +523,15 @@ def zero_grad(self): return super(self.__class__, self).zero_grad() -def DistributedOptimizer(optimizer, +def DistributedOptimizer(optimizer, grace=None, named_parameters=None, compression=Compression.none, backward_passes_per_step=1, op=Average, gradient_predivide_factor=1.0, - num_groups=0): + num_groups=0, groups=None, + sparse_as_dense=False, + process_set=global_process_set): """ An optimizer that wraps another torch.optim.Optimizer, using an allreduce to combine gradient values before applying gradients to model weights. @@ -499,6 +576,18 @@ def DistributedOptimizer(optimizer, gradient_predivide_factor / size after the sum. num_groups: Number of groups to assign gradient allreduce ops to for explicit grouping. Defaults to no explicit groups. + groups: The parameter to group the gradient allreduce ops. Accept values is a + non-negative integer or a list of list of torch.Tensor. + If groups is a non-negative integer, it is the number of groups to assign + gradient allreduce ops to for explicit grouping. + If groups is a list of list of torch.Tensor. Tensors in the same + inner list will be assigned to the same group, while parameter that does + not appear in any list will form a group itself. + Defaults as None, which is no explicit groups. + sparse_as_dense: If set True, convert all sparse gradients to dense and perform allreduce, then + convert back to sparse before applying the update. + process_set: Gradients will only be reduced over Horovod processes belonging + to this process set. Defaults to the global process set. """ # We dynamically create a new class that inherits from the optimizer that was passed in. # The goal is to override the `step()` method with an allreduce implementation. 
@@ -508,12 +597,20 @@ def DistributedOptimizer(optimizer, if op != Average: raise ValueError('gradient_predivide_factor not supported with op != Average') + if num_groups != 0: + warnings.warn('Parameter `num_groups` has been replaced by `groups` ' + 'and will be removed in v0.23.0.', DeprecationWarning) + if groups is None: + groups = num_groups + if op != Adasum or size() == 1: cls = type(optimizer.__class__.__name__, (optimizer.__class__,), dict(_DistributedOptimizer.__dict__)) return cls(optimizer.param_groups, named_parameters, grace, compression, backward_passes_per_step, op, - gradient_predivide_factor, num_groups) + gradient_predivide_factor, groups, sparse_as_dense, process_set) else: + if process_set != global_process_set: + raise NotImplementedError("Adasum does not support non-global process sets yet.") cls = type(optimizer.__class__.__name__, (optimizer.__class__,), dict(_DistributedAdasumOptimizer.__dict__)) return cls(optimizer.param_groups, named_parameters, compression, backward_passes_per_step)
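
Taken together, the patch keeps GRACE as the default reduction path (it is used whenever a `grace` object is passed, no explicit `groups` are set, and `op == hvd.Average`) while exposing the newer Horovod 0.23.0 options. Below is a minimal sketch of wiring GRACE into an arbitrary training script with the patched optimizer; it assumes Horovod 0.23.0 with the patched `optimizer.py` installed and `grace_dl` importable, and the model and learning rate are placeholders only:

```python
import torch
import horovod.torch as hvd
from grace_dl.torch.helper import grace_from_params

hvd.init()
if torch.cuda.is_available():
    torch.cuda.set_device(hvd.local_rank())

# Placeholder model; any torch.nn.Module works.
model = torch.nn.Linear(1024, 10)
if torch.cuda.is_available():
    model.cuda()

optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size(), momentum=0.9)

# Same helper the example script uses: build communicator/compressor/memory by name.
grc = grace_from_params({'compressor': 'topk',
                         'memory': 'residual',
                         'communicator': 'allgather'})

# GRACE compression is applied only when groups is None and op == hvd.Average;
# explicit groups or op=hvd.Adasum fall back to the regular (compressed) allreduce path.
optimizer = hvd.DistributedOptimizer(
    optimizer, grc,
    named_parameters=model.named_parameters(),
    op=hvd.Average)

# Start all workers from rank 0's state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
```

The `groups`, `sparse_as_dense`, and `process_set` keyword arguments introduced by the patch can be passed through `hvd.DistributedOptimizer` unchanged when they are needed.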