diff --git a/benchmarks/gems_model/benchmark_amoebanet_gems+spatial.py b/benchmarks/gems_model/benchmark_amoebanet_gems+spatial.py
index 6bd627bb..43817ccc 100644
--- a/benchmarks/gems_model/benchmark_amoebanet_gems+spatial.py
+++ b/benchmarks/gems_model/benchmark_amoebanet_gems+spatial.py
@@ -1,3 +1,21 @@
+# Copyright 2023, The Ohio State University. All rights reserved.
+# The MPI4DL software package is developed by the team members of
+# The Ohio State University's Network-Based Computing Laboratory (NBCL),
+# headed by Professor Dhabaleswar K. (DK) Panda.
+#
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 import torch
 import torch.distributed as dist
 import torchvision.transforms as transforms
@@ -6,6 +24,7 @@
 import time
 import sys
 import math
+import logging
 from torchgems import parser
 from torchgems.mp_pipeline import model_generator
 from torchgems.train_spatial import get_shapes_spatial, split_input
@@ -18,6 +37,9 @@
 parser_obj = parser.get_parser()
 args = parser_obj.parse_args()
 
+if args.verbose:
+    logging.basicConfig(level=logging.DEBUG)
+
 if args.halo_d2:
     from models import amoebanet
     from models import amoebanet_d2
@@ -63,46 +85,47 @@ def get_depth(version, n):
 # Example of GEMS + SPATIAL: split_size = 2, spatial_size = 1, num_spatial_parts = 4
 #
+# split_size = 5, spatial_size = 1, num_spatial_parts = 4 is not valid, as ranks 1, 2, and 3 are used by the spatial parts of both models.
# Model 1: -# _______________ ____ -# | 0(0)| 1(1) | | | -# |-------|-------| --------->|4(4)| -# | 2(2) | 3(3) | | | -# |_______|_______| |____| +# _______________ ____ ____ ____ ____ +# | 0(0) | 1(1) | | | | | | | | | +# |-------|-------|----->|4(4)|----->|5(5)|----->|6(6)|----->|7(7)| +# | 2(2) | 3(3) | | | | | | | | | +# |_______|_______| |____| |____| |____| |____| # # Model 2 (INVERSE GEMS): -# _______________ ____ -# | 0(4) | 1(3) | | | -# |-------|-------| --------->|4(0)| -# | 2(2) | 3(1) | | | -# |_______|_______| |____| +# _______________ ____ ____ ____ ____ +# | 0(7) | 1(6) | | | | | | | | | +# |-------|-------|----->|4(3)|----->|5(2)|----->|6(1)|----->|7(0)| +# | 2(5) | 3(4) | | | | | | | | | +# |_______|_______| |____| |____| |____| |____| # # Numbers inside the brackets () refer to World Rank # whereas outside numbers refer to local rank for each model -# torch.set_num_threads(1) np.random.seed(seed=1405) parts = args.parts batch_size = args.batch_size resnet_n = 12 -epoch = args.num_epochs +epochs = args.num_epochs ENABLE_ASYNC = True # APP # 1: Medical # 2: Cifar # 3: synthetic -APP = 3 -amoebanet_test = False +APP = args.app image_size = int(args.image_size) -print("image size", image_size) -steps = 100 num_layers = args.num_layers num_filters = args.num_filters balance = args.balance split_size = args.split_size spatial_size = args.spatial_size slice_method = args.slice_method +times = args.times +datapath = args.datapath +num_classes = args.num_classes +LOCAL_DP_LP = args.local_DP ENABLE_MASTER_OPT = args.enable_master_comm_opt temp_num_spatial_parts = args.num_spatial_parts.split(",") @@ -116,11 +139,6 @@ def get_depth(version, n): spatial_part_size = num_spatial_parts_list[0] # Partition size for spatial parallelism -times = args.times -num_classes = args.num_classes -LOCAL_DP_LP = args.local_DP - - mpi_comm_first = gems_comm.MPIComm( split_size=split_size, ENABLE_MASTER=False, @@ -150,12 +168,6 @@ def get_depth(version, n): gems_comm.sync_comms_for_master(mpi_comm_first, mpi_comm_second) comm_size = mpi_comm_first.size -# rank = mpi_comm.local_rank -# comm_size = mpi_comm.size -# local_rank = rank - -# split_rank = mpi_comm.split_rank - if args.balance != None: balance = args.balance.split(",") @@ -164,24 +176,38 @@ def get_depth(version, n): balance = None +##################### AmoebaNet model specific parameters ##################### + +""" +"image_size_seq" is required to determine the output shape after spatial partitioning of images. +The shape of the output will be determined for each model partition based on the values in "image_size_seq." +These values will then be used to calculate the output shape for a given input size and spatial partition. 
+""" image_size_seq = 512 +############################################################################### + +# Initialize AmoebaNet model model_seq = amoebanet.amoebanetd( num_layers=num_layers, num_filters=num_filters, num_classes=num_classes ) -print("length", len(model_seq), balance) + +# Initialize parameters for Model Parallelism model_gen_seq = model_generator( model=model_seq, split_size=split_size, input_size=(int(batch_size / parts), 3, image_size_seq, image_size_seq), balance=balance, ) +# Get the shape of model on each split rank for image_size_seq and move it to device +# Note : we take shape w.r.t image_size_seq as model w.r.t image_size may not be +# able to fit in memory model_gen_seq.ready_model( split_rank=mpi_comm_second.split_rank, GET_SHAPES_ON_CUDA=True ) +# Get the shape of model on each split rank for image_size and number of spatial parts image_size_times = int(image_size / image_size_seq) - resnet_shapes_list = get_shapes_spatial( shape_list=model_gen_seq.shape_list, slice_method=slice_method, @@ -190,13 +216,11 @@ def get_depth(version, n): image_size_times=image_size_times, ) -print(model_gen_seq.shape_list, resnet_shapes_list) - del model_seq del model_gen_seq torch.cuda.ipc_collect() - +# Initialize AmoebaNet model with Spatial and Model Parallelism support if args.halo_d2: model1 = amoebanet_d2.amoebanetd_spatial( local_rank=mpi_comm_first.local_rank % mpi_comm_first.total_spatial_processes, @@ -254,9 +278,12 @@ def get_depth(version, n): balance=balance, shape_list=resnet_shapes_list, ) -model_gen1.ready_model(split_rank=mpi_comm_first.split_rank) -# model_gen1.DDP_model(mpi_comm_first, num_spatial_parts, spatial_size, bucket_size=25, local_rank = mpi_comm_first.local_rank) +# Move model it it's repective devices +model_gen1.ready_model(split_rank=mpi_comm_first.split_rank) +logging.info( + f"Shape of model1 on local_rank {mpi_comm_first.local_rank } : {model_gen1.shape_list}" +) model_gen2 = model_generator( model=model2, @@ -265,22 +292,13 @@ def get_depth(version, n): balance=balance, shape_list=resnet_shapes_list, ) -model_gen2.ready_model(split_rank=mpi_comm_second.split_rank) -# model_gen2.DDP_model(mpi_comm_second, num_spatial_parts, spatial_size, bucket_size=25, local_rank = mpi_comm_second.local_rank) - - -# model_gen.mp_size = 5 -print("Shape list", resnet_shapes_list) - - -# t_s1 = train_model_spatial(model_gen1, mpi_comm_first.local_rank,batch_size,epochs=1, spatial_size=spatial_size, num_spatial_parts=num_spatial_parts ,criterion=None,optimizer=None,parts=parts,ASYNC=True,GEMS_INVERSE=False, slice_method = args.slice_method, -# LOCAL_DP_LP=LOCAL_DP_LP, -# mpi_comm = mpi_comm_first) +# Move model it it's repective devices +model_gen2.ready_model(split_rank=mpi_comm_second.split_rank) +logging.info( + f"Shape of model2 on local_rank {mpi_comm_first.local_rank } : {model_gen2.shape_list}" +) -# t_s2 = train_model_spatial(model_gen2, mpi_comm_second.local_rank,batch_size,epochs=1, spatial_size=spatial_size, num_spatial_parts=num_spatial_parts ,criterion=None,optimizer=None,parts=parts,ASYNC=True,GEMS_INVERSE=True, slice_method = args.slice_method, -# LOCAL_DP_LP=LOCAL_DP_LP, -# mpi_comm = mpi_comm_second) t_s_master = train_spatial_model_master( model_gen1, @@ -299,11 +317,7 @@ def get_depth(version, n): replications=int(args.times / 2), ) -x = torch.zeros( - (batch_size, 3, int(image_size / 2), int(image_size / 2)), device="cuda" -) -y = torch.zeros((batch_size,), dtype=torch.long, device="cuda") - +############################## Dataset Definition 
############################## transform = transforms.Compose( [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))] @@ -312,7 +326,7 @@ def get_depth(version, n): if APP == 1: trainset = torchvision.datasets.ImageFolder( - "/usr/workspace/jain8/project/cancer/1024_1024_5/train", + datapath, transform=transform, target_transform=None, ) @@ -325,8 +339,16 @@ def get_depth(version, n): ) size_dataset = 1030 elif APP == 2: + transform = transforms.Compose( + [ + transforms.Resize((512, 512)), + transforms.ToTensor(), + transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), + ] + ) + torch.manual_seed(0) trainset = torchvision.datasets.CIFAR10( - root="./data", train=True, download=True, transform=transform + root=datapath, train=True, download=True, transform=transform ) my_dataloader = torch.utils.data.DataLoader( trainset, @@ -335,7 +357,7 @@ def get_depth(version, n): num_workers=0, pin_memory=True, ) - size_dataset = 50000 + size_dataset = len(my_dataloader.dataset) else: my_dataset = torchvision.datasets.FakeData( size=10 * batch_size * args.times, @@ -354,37 +376,30 @@ def get_depth(version, n): ) size_dataset = 10 * batch_size - -# sync_allreduce.sync_model_spatial(model_gen) -perf = [] +################################################################################ sync_comm = gems_comm.SyncAllreduce(mpi_comm_first) +################################# Train Model ################################## -MASTER = args.times - -print("ENABLE_MASTER_OPT", ENABLE_MASTER_OPT) +perf = [] def run_epoch(): - for i_e in range(epoch): + for i_e in range(epochs): loss = 0 correct = 0 t = time.time() - for i, data in enumerate(my_dataloader, 0): + size = len(my_dataloader.dataset) + for batch, data in enumerate(my_dataloader, 0): start_event = torch.cuda.Event(enable_timing=True, blocking=True) end_event = torch.cuda.Event(enable_timing=True, blocking=True) start_event.record() - if i > math.floor(size_dataset / (times * batch_size)) - 1: + if batch > math.floor(size_dataset / (times * batch_size)) - 1: break - # inputs=data_x - # labels = data_y - inputs, labels = data - # inputs = inputs.to(device) - # labels = labels.to(device) + inputs, labels = data - # t= time.time() if mpi_comm_first.local_rank < num_spatial_parts_list[0]: x = split_input( inputs=inputs, @@ -404,20 +419,15 @@ def run_epoch(): else: x = inputs - # for j in range(MASTER): - - # temp_loss,temp_correct = t_s1.run_step(x,labels) - # temp_loss,temp_correct = t_s2.run_step(x,labels) - if ENABLE_MASTER_OPT: - temp_loss, temp_correct = t_s_master.run_step_allreduce( - x, labels, i % 2 == 1 + local_loss, local_correct = t_s_master.run_step_allreduce( + x, labels, batch % 2 == 1 ) else: - temp_loss, temp_correct = t_s_master.run_step(x, labels) + local_loss, local_correct = t_s_master.run_step(x, labels) - loss += temp_loss - correct += temp_correct + loss += local_loss + correct += local_correct start_event_allreduce = torch.cuda.Event(enable_timing=True, blocking=True) end_event_allreduce = torch.cuda.Event(enable_timing=True, blocking=True) @@ -425,21 +435,13 @@ def run_epoch(): t_allreduce_temp = time.time() if ENABLE_MASTER_OPT == False: - print("benchmark_amoebanet_gems+spatial : START ALL REDUCE OPERATION") sync_comm.apply_allreduce_master_master( model_gen1, model_gen2, mpi_comm_first, mpi_comm_second ) - - """ - if(local_rank < spatial_size * num_spatial_parts): - None - #No need for this as, DDP is now used - # sync_allreduce.apply_allreduce(model_gen,mpi_comm.spatial_allreduce_grp) - """ 
        torch.cuda.synchronize()
 
         if ENABLE_MASTER_OPT:
-            if i % 2 == 1:
+            if batch % 2 == 1:
                 t_s_master.train_model1.update()
             else:
                 t_s_master.train_model2.update()
@@ -453,8 +455,9 @@ def get_depth(version, n):
             t_allreduce = time.time() - t_allreduce_temp
 
         if mpi_comm_second.local_rank == comm_size - 1:
-            None
-            # print("Step",i," LOSS",temp_loss, " Global loss:",loss/(i+1), " Acc:",temp_correct)
+            logging.info(
+                f"Step: {batch}, LOSS: {local_loss}, Global loss: {loss/(batch+1)} Acc: {local_correct} [{batch * len(inputs):>5d}/{size:>5d}]"
+            )
 
         if ENABLE_MASTER_OPT:
             torch.distributed.barrier()
@@ -463,24 +466,20 @@ def get_depth(version, n):
         torch.cuda.synchronize()
         t = start_event.elapsed_time(end_event) / 1000
         if mpi_comm_second.local_rank == 0:
-            None
             print(
-                "images per sec:",
-                batch_size / t,
-                "Time:",
-                t,
-                " Time Allreduce:",
-                t_allreduce,
+                f"Epoch: {i_e} images per sec: {batch_size / t} Time: {t} Time Allreduce: {t_allreduce}"
             )
             perf.append(batch_size / t)
 
         t = time.time()
     if mpi_comm_second.local_rank == comm_size - 1:
-        print("epoch", i_e, " Global loss:", loss, " acc", correct / i)
+        print(f"Epoch {i_e} Global loss: {loss / batch} Acc: {correct / batch}")
 
 
 run_epoch()
 
+################################################################################
+
 if mpi_comm_second.local_rank == 0:
     print("Mean {} Median {}".format(sum(perf) / len(perf), np.median(perf)))
diff --git a/benchmarks/gems_model/benchmark_resnet_gems+spatial.py b/benchmarks/gems_model/benchmark_resnet_gems+spatial.py
index 743e2b61..039a66be 100644
--- a/benchmarks/gems_model/benchmark_resnet_gems+spatial.py
+++ b/benchmarks/gems_model/benchmark_resnet_gems+spatial.py
@@ -1,3 +1,21 @@
+# Copyright 2023, The Ohio State University. All rights reserved.
+# The MPI4DL software package is developed by the team members of
+# The Ohio State University's Network-Based Computing Laboratory (NBCL),
+# headed by Professor Dhabaleswar K. (DK) Panda.
+#
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 import torch
 import torch.distributed as dist
 import torchvision.transforms as transforms
@@ -6,6 +24,7 @@
 import time
 import sys
 import math
+import logging
 from torchgems import parser
 from torchgems.mp_pipeline import model_generator
 from torchgems.train_spatial import get_shapes_spatial, split_input
@@ -16,12 +35,33 @@
 import torchgems.comm as gems_comm
 
 from models import resnet
 
+# Example of GEMS + SPATIAL: split_size = 2, spatial_size = 1, num_spatial_parts = 4
+#
+# split_size = 5, spatial_size = 1, num_spatial_parts = 4 is not valid, as ranks 1, 2, and 3 are used by the spatial parts of both models.
+# Model 1: +# _______________ ____ ____ ____ ____ +# | 0(0) | 1(1) | | | | | | | | | +# |-------|-------|----->|4(4)|----->|5(5)|----->|6(6)|----->|7(7)| +# | 2(2) | 3(3) | | | | | | | | | +# |_______|_______| |____| |____| |____| |____| +# +# Model 2 (INVERSE GEMS): +# _______________ ____ ____ ____ ____ +# | 0(7) | 1(6) | | | | | | | | | +# |-------|-------|----->|4(3)|----->|5(2)|----->|6(1)|----->|7(0)| +# | 2(5) | 3(4) | | | | | | | | | +# |_______|_______| |____| |____| |____| |____| +# +# Numbers inside the brackets () refer to World Rank +# whereas outside numbers refer to local rank for each model parser_obj = parser.get_parser() args = parser_obj.parse_args() +if args.verbose: + logging.basicConfig(level=logging.DEBUG) + if args.halo_d2: - # from models import resnet from models import resnet_spatial_d2 as resnet_spatial else: from models import resnet_spatial @@ -53,57 +93,29 @@ def init_processes(backend="tcp"): return size, rank -def get_depth(version, n): - if version == 1: - return n * 6 + 2 - elif version == 2: - return n * 9 + 2 - - sys.stdout = Unbuffered(sys.stdout) -# Example of GEMS + SPATIAL split_size = 2, spatial_size = 1, num_spatial_parts = 4 -# -# Model 1: -# _______________ ____ -# | 0(0)| 1(1) | | | -# |-------|-------| --------->|4(4)| -# | 2(2) | 3(3) | | | -# |_______|_______| |____| -# -# Model 2 (INVERSE GEMS): -# _______________ ____ -# | 0(4) | 1(3) | | | -# |-------|-------| --------->|4(0)| -# | 2(2) | 3(1) | | | -# |_______|_______| |____| -# -# Numbers inside the brackets () refer to World Rank -# whereas outside numbers refer to local rank for each model - -# torch.set_num_threads(1) np.random.seed(seed=1405) + +ENABLE_ASYNC = True parts = args.parts batch_size = args.batch_size -resnet_n = 12 -epoch = args.num_epochs -ENABLE_ASYNC = True - -# APP -# 1: Medical -# 2: Cifar -# 3: synthetic -APP = 3 +epochs = args.num_epochs image_size = int(args.image_size) -print("image size", image_size) -steps = 100 -num_layers = args.num_layers -num_filters = args.num_filters balance = args.balance split_size = args.split_size spatial_size = args.spatial_size slice_method = args.slice_method +times = args.times +datapath = args.datapath +num_classes = args.num_classes +LOCAL_DP_LP = args.local_DP ENABLE_MASTER_OPT = args.enable_master_comm_opt +# APP +# 1: Medical +# 2: Cifar +# 3: synthetic +APP = args.app temp_num_spatial_parts = args.num_spatial_parts.split(",") @@ -114,10 +126,25 @@ def get_depth(version, n): num_spatial_parts = [int(i) for i in temp_num_spatial_parts] num_spatial_parts_list = num_spatial_parts -times = args.times -num_classes = args.num_classes -LOCAL_DP_LP = args.local_DP +################## ResNet model specific parameters/functions ################## +""" +"image_size_seq" is required to determine the output shape after spatial partitioning of images. +The shape of the output will be determined for each model partition based on the values in "image_size_seq." +These values will then be used to calculate the output shape for a given input size and spatial partition. 
+""" +image_size_seq = 32 +resnet_n = 12 + + +def get_depth(version, n): + if version == 1: + return n * 6 + 2 + elif version == 2: + return n * 9 + 2 + + +############################################################################### mpi_comm_first = gems_comm.MPIComm( split_size=split_size, @@ -148,12 +175,6 @@ def get_depth(version, n): gems_comm.sync_comms_for_master(mpi_comm_first, mpi_comm_second) comm_size = mpi_comm_first.size -# rank = mpi_comm.local_rank -# comm_size = mpi_comm.size -# local_rank = rank - -# split_rank = mpi_comm.split_rank - if args.balance != None: balance = args.balance.split(",") @@ -161,26 +182,29 @@ def get_depth(version, n): else: balance = None - -image_size_seq = 32 - +# Initialize ResNet model model_seq = resnet.get_resnet_v2( (int(batch_size / parts), 3, image_size_seq, image_size_seq), depth=get_depth(2, resnet_n), ) -print("length", len(model_seq), balance) + model_gen_seq = model_generator( model=model_seq, split_size=split_size, input_size=(int(batch_size / parts), 3, image_size_seq, image_size_seq), balance=balance, ) + +# Get the shape of model on each split rank for image_size_seq and move it to device +# Note : we take shape w.r.t image_size_seq as model w.r.t image_size may not be +# able to fit in memory model_gen_seq.ready_model( split_rank=mpi_comm_second.split_rank, GET_SHAPES_ON_CUDA=True ) image_size_times = int(image_size / image_size_seq) +# Get the shape of model on each split rank for image_size and number of spatial parts resnet_shapes_list = get_shapes_spatial( shape_list=model_gen_seq.shape_list, slice_method=slice_method, @@ -189,13 +213,11 @@ def get_depth(version, n): image_size_times=image_size_times, ) -print(model_gen_seq.shape_list, resnet_shapes_list) - del model_seq del model_gen_seq torch.cuda.ipc_collect() - +# Initialize ResNet model with Spatial and Model Parallelism support if args.halo_d2: model1, balance = resnet_spatial.get_resnet_v2( input_shape=(batch_size / parts, 3, image_size, image_size), @@ -257,9 +279,12 @@ def get_depth(version, n): balance=balance, shape_list=resnet_shapes_list, ) -model_gen1.ready_model(split_rank=mpi_comm_first.split_rank) -# model_gen1.DDP_model(mpi_comm_first, num_spatial_parts, spatial_size, bucket_size=25, local_rank = mpi_comm_first.local_rank) +# Move model it it's repective devices +model_gen1.ready_model(split_rank=mpi_comm_first.split_rank) +logging.info( + f"Shape of model1 on local_rank {mpi_comm_first.local_rank } : {model_gen1.shape_list}" +) model_gen2 = model_generator( model=model2, @@ -268,12 +293,12 @@ def get_depth(version, n): balance=balance, shape_list=resnet_shapes_list, ) -model_gen2.ready_model(split_rank=mpi_comm_second.split_rank) -# model_gen2.DDP_model(mpi_comm_second, num_spatial_parts, spatial_size, bucket_size=25, local_rank = mpi_comm_second.local_rank) - -# model_gen.mp_size = 5 -print("Shape list", resnet_shapes_list) +# Move model it it's repective devices +model_gen2.ready_model(split_rank=mpi_comm_second.split_rank) +logging.info( + f"Shape of model2 on local_rank {mpi_comm_first.local_rank } : {model_gen2.shape_list}" +) t_s_master = train_spatial_model_master( model_gen1, @@ -292,11 +317,7 @@ def get_depth(version, n): replications=int(args.times / 2), ) -x = torch.zeros( - (batch_size, 3, int(image_size / 2), int(image_size / 2)), device="cuda" -) -y = torch.zeros((batch_size,), dtype=torch.long, device="cuda") - +############################## Dataset Definition ############################## transform = transforms.Compose( 
[transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))] @@ -329,7 +350,7 @@ def get_depth(version, n): pin_memory=True, ) size_dataset = 50000 -else: +elif APP == 3: my_dataset = torchvision.datasets.FakeData( size=10 * batch_size * args.times, image_size=(3, image_size, image_size), @@ -346,38 +367,52 @@ def get_depth(version, n): pin_memory=True, ) size_dataset = 10 * batch_size +else: + transform = transforms.Compose( + [ + transforms.Resize((64, 64)), + transforms.ToTensor(), + transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), + ] + ) + trainset = torchvision.datasets.ImageFolder( + datapath, + transform=transform, + target_transform=None, + ) + my_dataloader = torch.utils.data.DataLoader( + trainset, + batch_size=times * batch_size, + shuffle=True, + num_workers=0, + pin_memory=True, + ) + size_dataset = len(my_dataloader.dataset) - -# sync_allreduce.sync_model_spatial(model_gen) -perf = [] +################################################################################ sync_comm = gems_comm.SyncAllreduce(mpi_comm_first) +################################# Train Model ################################## -MASTER = args.times - -print("ENABLE_MASTER_OPT", ENABLE_MASTER_OPT) +perf = [] def run_epoch(): - for i_e in range(epoch): + for i_e in range(epochs): loss = 0 correct = 0 t = time.time() - for i, data in enumerate(my_dataloader, 0): + size = len(my_dataloader.dataset) + for batch, data in enumerate(my_dataloader, 0): start_event = torch.cuda.Event(enable_timing=True, blocking=True) end_event = torch.cuda.Event(enable_timing=True, blocking=True) start_event.record() - if i > math.floor(size_dataset / (times * batch_size)) - 1: + if batch > math.floor(size_dataset / (times * batch_size)) - 1: break - # inputs=data_x - # labels = data_y - inputs, labels = data - # inputs = inputs.to(device) - # labels = labels.to(device) + inputs, labels = data - # t= time.time() if mpi_comm_first.local_rank < num_spatial_parts_list[0]: x = split_input( inputs=inputs, @@ -398,14 +433,14 @@ def run_epoch(): x = inputs if ENABLE_MASTER_OPT: - temp_loss, temp_correct = t_s_master.run_step_allreduce( - x, labels, i % 2 == 1 + local_loss, local_correct = t_s_master.run_step_allreduce( + x, labels, batch % 2 == 1 ) else: - temp_loss, temp_correct = t_s_master.run_step(x, labels) + local_loss, local_correct = t_s_master.run_step(x, labels) - loss += temp_loss - correct += temp_correct + loss += local_loss + correct += local_correct start_event_allreduce = torch.cuda.Event(enable_timing=True, blocking=True) end_event_allreduce = torch.cuda.Event(enable_timing=True, blocking=True) @@ -413,21 +448,13 @@ def run_epoch(): t_allreduce_temp = time.time() if ENABLE_MASTER_OPT == False: - print("benchmark_resnet_gems+spatial : START ALL REDUCE OPERATION") sync_comm.apply_allreduce_master_master( model_gen1, model_gen2, mpi_comm_first, mpi_comm_second ) - - """ - if(local_rank < spatial_size * num_spatial_parts): - None - #No need for this as, DDP is now used - # sync_allreduce.apply_allreduce(model_gen,mpi_comm.spatial_allreduce_grp) - """ torch.cuda.synchronize() if ENABLE_MASTER_OPT: - if i % 2 == 1: + if batch % 2 == 1: t_s_master.train_model1.update() else: t_s_master.train_model2.update() @@ -441,8 +468,9 @@ def run_epoch(): t_allreduce = time.time() - t_allreduce_temp if mpi_comm_second.local_rank == comm_size - 1: - None - # print("Step",i," LOSS",temp_loss, " Global loss:",loss/(i+1), " Acc:",temp_correct) + logging.info( + f"Step :{batch}, LOSS: {local_loss}, Global loss: 
{loss/(batch+1)} Acc: {local_correct} [{batch * len(inputs):>5d}/{size:>5d}]"
+            )
 
         if ENABLE_MASTER_OPT:
             torch.distributed.barrier()
@@ -451,24 +479,20 @@ def get_depth(version, n):
         torch.cuda.synchronize()
         t = start_event.elapsed_time(end_event) / 1000
         if mpi_comm_second.local_rank == 0:
-            None
             print(
-                "images per sec:",
-                batch_size / t,
-                "Time:",
-                t,
-                " Time Allreduce:",
-                t_allreduce,
+                f"Epoch: {i_e} images per sec: {batch_size / t} Time: {t} Time Allreduce: {t_allreduce}"
             )
             perf.append(batch_size / t)
 
         t = time.time()
     if mpi_comm_second.local_rank == comm_size - 1:
-        print("epoch", i_e, " Global loss:", loss, " acc", correct / i)
+        print(f"Epoch {i_e} Global loss: {loss / batch} Acc: {correct / batch}")
 
 
 run_epoch()
 
+################################################################################
+
 if mpi_comm_second.local_rank == 0:
     print("Mean {} Median {}".format(sum(perf) / len(perf), np.median(perf)))
diff --git a/benchmarks/spatial_parallelism/benchmark_resnet_sp.py b/benchmarks/spatial_parallelism/benchmark_resnet_sp.py
index e4459fc4..7029ade9 100644
--- a/benchmarks/spatial_parallelism/benchmark_resnet_sp.py
+++ b/benchmarks/spatial_parallelism/benchmark_resnet_sp.py
@@ -106,7 +106,7 @@ def init_processes(backend="mpi"):
 ################## ResNet model specific parameters/functions ##################
 
 """
-"image_size_seq" is required to determine the output shape after spatial partitioning of images. 
+"image_size_seq" is required to determine the output shape after spatial partitioning of images.
 The shape of the output will be determined for each model partition based on the values in "image_size_seq."
 These values will then be used to calculate the output shape for a given input size and spatial partition.
 """
@@ -129,7 +129,7 @@ def isPowerTwo(num):
 
 """
-For ResNet model, image size and image size after partitioning should be power of two. 
+For ResNet model, image size and image size after partitioning should be power of two.
 As, ResNet performs convolution operations at different layers, odd input size (i.e. image size which is not power of 2)
 will lead to truncation of input. Thus, other GPU devices will receive truncated input with unexpected input size.
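The power-of-two constraint described in the docstrings above can be checked before training starts. Below is a minimal sketch, not part of the patch: `is_power_two` mirrors the repository's `isPowerTwo` helper, while `partitioned_image_size` and `verify_spatial_config` are hypothetical names invented here to illustrate why an odd image size would be rejected.

```python
import math

def is_power_two(num):
    # A positive integer is a power of two iff exactly one bit is set.
    return num > 0 and (num & (num - 1)) == 0

def partitioned_image_size(image_size, num_spatial_parts, slice_method):
    # "square" slicing splits both height and width; "vertical" and
    # "horizontal" split a single dimension into num_spatial_parts strips.
    if slice_method == "square":
        return image_size // int(math.sqrt(num_spatial_parts))
    return image_size // num_spatial_parts

def verify_spatial_config(image_size, num_spatial_parts, slice_method="square"):
    part_size = partitioned_image_size(image_size, num_spatial_parts, slice_method)
    assert is_power_two(image_size), "image size must be a power of two"
    assert is_power_two(part_size), (
        "image size after partitioning must be a power of two; otherwise the "
        "convolution layers truncate the input on some spatial ranks"
    )

verify_spatial_config(1024, 4, "square")    # OK: each part is 512 x 512
# verify_spatial_config(1000, 4, "square")  # fails: 1000 is not a power of two
```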
diff --git a/src/torchgems/comm.py b/src/torchgems/comm.py index 6cc9be46..9a5fbf88 100644 --- a/src/torchgems/comm.py +++ b/src/torchgems/comm.py @@ -65,7 +65,6 @@ def __init__( - spatial_size + (split_size - spatial_size) * (LOCAL_DP_LP - 1) ) - print("MP_SIZE : ", self.mp_size) if DISABLE_INIT: self.rank = dist.get_rank() @@ -214,28 +213,19 @@ def create_allreduce_comm_spatial(self): if self.ENABLE_MASTER: for i in range(len(ranks)): - # ranks[i] = self.mp_size - 1 - ranks[i] ranks.append(self.mp_size - 1 - ranks[i]) - print("RANKS:", ranks) + temp_spatial_allreduce_grp = torch.distributed.new_group(ranks=ranks) if self.ENABLE_MASTER: if self.spatial_size == 1 and first_local_rank < self.num_spatial_parts: self.first_spatial_allreduce_grp = temp_spatial_allreduce_grp - print( - "first_spatial_allreduce_grp", self.rank, self.local_rank, ranks - ) + elif ( self.spatial_size == 1 and second_local_rank < self.num_spatial_parts ): self.second_spatial_allreduce_grp = temp_spatial_allreduce_grp - print( - "second_spatial_allreduce_grp", - self.rank, - self.local_rank, - ranks, - ) elif self.spatial_size > 1: if first_local_rank < np.sum( @@ -323,7 +313,6 @@ def sync_comms_for_master(comm1, comm2): # MASTER related communicators are in comm2 first_local_rank = comm1.local_rank second_local_rank = comm2.local_rank - print("sync_comms_for_master", first_local_rank, second_local_rank) if first_local_rank < comm1.total_spatial_processes: comm1.spatial_allreduce_grp = comm2.first_spatial_allreduce_grp diff --git a/src/torchgems/mp_pipeline.py b/src/torchgems/mp_pipeline.py index b0b5479c..2cdb16ab 100644 --- a/src/torchgems/mp_pipeline.py +++ b/src/torchgems/mp_pipeline.py @@ -437,7 +437,6 @@ def forward_pass(self, data_x, data_y, part_number=0): # part_number: part number between 0 and self.parts-1 used to find right input recv buffer # Receive inputs if local is not 0 - print("mp_pipeline:forward_pass: START", data_x.size(), data_y.size()) if self.split_rank == 0: input_x = data_x else: @@ -458,11 +457,6 @@ def forward_pass(self, data_x, data_y, part_number=0): torch.cuda.synchronize() - print( - "mp_pipeline:forward_pass: SEND_INPUT_OR_CAL_LOSS", - data_x.size(), - data_y.size(), - ) if self.split_rank != self.split_size - 1: if self.ENABLE_ASYNC == True: self.send_input_async(y) @@ -471,7 +465,7 @@ def forward_pass(self, data_x, data_y, part_number=0): else: loss = self.criterion(y, data_y) - print("mp_pipeline:forward_pass: END", data_x.size(), data_y.size()) + if self.split_rank == self.split_size - 1: corrects = (data_y.eq(torch.argmax(y, dim=-1).long())).sum().float() return loss, corrects / self.batch_size @@ -524,62 +518,19 @@ def run_step(self, data_x, data_y): for i in range(self.parts): start = i * parts_size end = (i + 1) * parts_size - print( - "mp_pipeline:train_model:run_step : START FORWARD PASS", - " rank :", - self.local_rank, - " inverse : ", - self.GEMS_INVERSE, - self.parts, - start, - end, - ) temp_y, temp_correct = self.forward_pass( data_x[start:end], data_y[start:end], part_number=i ) - - print( - "mp_pipeline:train_model:run_step : END FORWARD PASS", - " rank :", - self.local_rank, - " inverse : ", - self.GEMS_INVERSE, - self.parts, - start, - end, - ) - y_list.append(temp_y) if self.split_rank == self.split_size - 1: loss += temp_y.item() corrects += temp_correct.item() - print( - "mp_pipeline:train_model:run_step : START BACKWARD PASS", - " rank :", - self.local_rank, - " inverse : ", - self.GEMS_INVERSE, - self.parts, - start, - end, - ) for i in range(self.parts): 
None self.backward_pass(y_list[i], part_number=i) - print( - "mp_pipeline:train_model:run_step : END BACKWARD PASS", - " rank :", - self.local_rank, - " inverse : ", - self.GEMS_INVERSE, - self.parts, - start, - end, - ) - return loss, corrects def update(self): diff --git a/src/torchgems/train_spatial.py b/src/torchgems/train_spatial.py index 9f6c0a1e..7f09d18b 100644 --- a/src/torchgems/train_spatial.py +++ b/src/torchgems/train_spatial.py @@ -685,16 +685,10 @@ def receive_input_async_joint(self, part_number, ranks): ranks = [ self.local_rank - 1 - i for i in range(self.num_spatial_parts - 1, -1, -1) ] - print("receive_input_async_joint", " rank : ", self.local_rank, ranks) + if self.GEMS_INVERSE: for i in range(len(ranks)): ranks[i] = self.mp_size - 1 - ranks[i] - print( - "receive_input_async_joint if self.GEMS_INVERSE", - " rank : ", - self.local_rank, - ranks, - ) reqs = [] @@ -1257,54 +1251,27 @@ def forward_pass(self, data_x, data_y, part_number=0): # data_x: input data # data_y: labels # part_number: part number between 0 and self.parts-1 used to find right input recv buffer - print( - "train_spatial:forward_pass: START", - data_x.size(), - data_y.size(), - self.local_rank, - self.GEMS_INVERSE, - "self.ENABLE_ASYNC", - self.ENABLE_ASYNC, - "self.split_rank", - self.split_rank, - "self.spatial_size", - self.spatial_size, - " self.total_spatial_processes", - self.total_spatial_processes, - ) + # Receive inputs if local is not 0 if self.split_rank == 0: input_x = data_x else: if self.ENABLE_ASYNC == True: - print("self.ENABLE_ASYNC == True") if self.split_rank == self.spatial_size: if self.ENABLE_LOCAL_DP_LP: - print( - "Calling recv_input_MP_joint_LP_DP", - " rank : ", - self.local_rank, - ) self.recv_input_MP_joint_LP_DP(part_number) else: - print("Calling recv_inputs_joint", " rank : ", self.local_rank) self.recv_inputs_joint(part_number) elif self.SKEWED_RECV_SPATIAL: - print("Calling recv_input_spatial", " rank : ", self.local_rank) self.recv_input_spatial(part_number) else: - print("Calling receive_input_async", " rank : ", self.local_rank) self.receive_input_async(part_number) else: - print("self.ENABLE_ASYNC == False") if self.local_rank == self.total_spatial_processes: - print("Calling recv_inputs_joint", " rank : ", self.local_rank) self.recv_inputs_joint(part_number) elif self.SKEWED_RECV_SPATIAL: - print("Calling recv_input_spatial", " rank : ", self.local_rank) self.recv_input_spatial(part_number) else: - print("Calling receive_input_sync", " rank : ", self.local_rank) self.receive_input_sync(part_number) # join spatial inputs @@ -1318,13 +1285,6 @@ def forward_pass(self, data_x, data_y, part_number=0): else: input_x = self.input_x_list[part_number] - print( - "train_spatial:forward_pass: RECEIVED INPUTS", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - ) - # Apply forward pass torch.cuda.synchronize() @@ -1336,92 +1296,24 @@ def forward_pass(self, data_x, data_y, part_number=0): ) and part_number != self.parts - 1 ): - print( - "train_spatial:forward_pass: DP", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - part_number, - self.parts, - ) with self.models.no_sync(): - # print("MODEL :", " rank ", self.local_rank, self.models) y = self.models(input_x) else: - print( - "train_spatial:forward_pass: no_DP", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - part_number, - self.parts, - ) - # print("MODEL :", " rank ", self.local_rank, self.models) - num_gpus = torch.cuda.device_count() - - # Print information about each GPU device - for i in 
range(num_gpus): - gpu_properties = torch.cuda.get_device_properties(i) - print(f"GPU {i}: {gpu_properties.name}") - print(f" Total Memory: {gpu_properties.total_memory / (1024 ** 2)} MB") - y = self.models(input_x) - # NVMN -?GPU MEMORY UTILIZATIOON - print( - "train_spatial:forward_pass: DONE_MODEL_TRAIN", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - ) torch.cuda.synchronize() - print( - "train_spatial:forward_pass: CALCULATED_Y", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - ) + if self.split_rank != self.split_size - 1: if self.ENABLE_ASYNC == True: if self.split_rank == self.spatial_size - 1 and self.ENABLE_LOCAL_DP_LP: - print( - "train_spatial:forward_pass: calling self.ENABLE_ASYNC send_input_spatial_MP_joint_LP_DP", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - ) self.send_input_spatial_MP_joint_LP_DP(y) else: - print( - "train_spatial:forward_pass: calling self.ENABLE_ASYNC send_input_async", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - ) self.send_input_async(y) else: if self.split_rank == self.spatial_size - 1 and self.ENABLE_LOCAL_DP_LP: - print( - "train_spatial:forward_pass: calling send_input_spatial_MP_joint_LP_DP", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - ) self.send_input_spatial_MP_joint_LP_DP(y) else: - print( - "train_spatial:forward_pass: calling send_input_sync", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - ) self.send_input_sync(y) - print( - "train_spatial:forward_pass: SENT_Y", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - ) else: pos = self.local_rank - (self.mp_size - self.LOCAL_DP_LP) @@ -1441,20 +1333,6 @@ def forward_pass(self, data_x, data_y, part_number=0): else: loss = self.criterion(y, data_y) - print( - "train_spatial:forward_pass: CALCULATED_LOSS", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - ) - - print( - "train_spatial:forward_pass: END", - " rank :", - self.local_rank, - self.GEMS_INVERSE, - ) - if self.split_rank == self.split_size - 1: corrects = (data_y.eq(torch.argmax(y, dim=-1).long())).sum().float() return loss, corrects / self.batch_size diff --git a/src/torchgems/train_spatial_master.py b/src/torchgems/train_spatial_master.py index 6c5bad2d..f0b37135 100644 --- a/src/torchgems/train_spatial_master.py +++ b/src/torchgems/train_spatial_master.py @@ -27,8 +27,7 @@ def isPowerTwo(num): """ -TBD : Update comments -For SP+MASTER, image size and image size after partitioning should be power of two. +For SP, image size and image size after partitioning should be power of two. As, while performing convolution operations at different layers, odd input size (i.e. image size which is not power of 2) will lead to truncation of input. Thus, other GPU devices will receive truncated input with unexpected input size. 
@@ -462,25 +461,21 @@ def run_step_allreduce(self, inputs, labels, odd_iteration): def run_step(self, inputs, labels): loss, correct = 0, 0 # torch.cuda.empty_cache() - print("START RUN_STEP MODEL1", "rank ", self.local_rank) self.train_model1.models = self.train_model1.models.to("cuda") temp_loss, temp_correct = self.train_model1.run_step( inputs[: self.batch_size], labels[: self.batch_size] ) - print("END RUN_STEP MODEL1", "rank ", self.local_rank) loss += temp_loss correct += temp_correct torch.cuda.empty_cache() - print("START RUN_STEP MODEL2", "rank ", self.local_rank) self.train_model1.models = self.train_model1.models.to("cpu") self.train_model2.models = self.train_model2.models.to("cuda") temp_loss, temp_correct = self.train_model2.run_step( inputs[self.batch_size : 2 * self.batch_size], labels[self.batch_size : 2 * self.batch_size], ) - print("END RUN_STEP MODEL2", "rank ", self.local_rank) self.train_model2.models = self.train_model2.models.to("cpu") torch.cuda.empty_cache() @@ -488,26 +483,20 @@ def run_step(self, inputs, labels): loss += temp_loss correct += temp_correct - print("Calculated loss and accuracy for MODEL1 AND MODEL2") - torch.cuda.synchronize() for times in range(self.replications - 1): index = (2 * times) + 2 - print("Times :", times) - print("START RUN_STEP MODEL1") temp_loss, temp_correct = self.train_model1.run_step( inputs[index * self.batch_size : (index + 1) * self.batch_size], labels[index * self.batch_size : (index + 1) * self.batch_size], ) - print("END RUN_STEP MODEL1") loss += temp_loss correct += temp_correct - print("START RUN_STEP MODEL2") + temp_loss, temp_correct = self.train_model2.run_step( inputs[(index + 1) * self.batch_size : (index + 2) * self.batch_size], labels[(index + 1) * self.batch_size : (index + 2) * self.batch_size], ) - print("END RUN_STEP MODEL2") loss += temp_loss correct += temp_correct
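For context, the `run_step` changes above only strip debug prints; GEMS's memory-exchange pattern is unchanged: the two model replicas process consecutive sub-batches, and only one replica's weights sit on the GPU at a time. A minimal sketch of that pattern, assuming toy `nn.Linear` models in place of the real pipelined partitions, forward-only (no backward pass or optimizer update), and requiring a CUDA device:

```python
import torch
import torch.nn as nn

# Toy stand-ins for the two GEMS replicas (in GEMS, model2 holds the same
# network with its partitions laid out in inverse rank order).
model1 = nn.Linear(512, 10)
model2 = nn.Linear(512, 10)
criterion = nn.CrossEntropyLoss()

def run_step(inputs, labels, batch_size):
    loss = 0.0

    # First sub-batch on model1; its weights live on the GPU only for this step.
    model1.to("cuda")
    out = model1(inputs[:batch_size].to("cuda"))
    loss += criterion(out, labels[:batch_size].to("cuda")).item()
    model1.to("cpu")
    torch.cuda.empty_cache()

    # Second sub-batch on model2 after swapping it in, halving peak GPU memory.
    model2.to("cuda")
    out = model2(inputs[batch_size : 2 * batch_size].to("cuda"))
    loss += criterion(out, labels[batch_size : 2 * batch_size].to("cuda")).item()
    model2.to("cpu")
    torch.cuda.empty_cache()

    return loss

x = torch.randn(8, 512)
y = torch.randint(0, 10, (8,))
print(run_step(x, y, batch_size=4))
```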