diff --git a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py
index 5da11097319d..7088df223bd8 100644
--- a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py
+++ b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py
@@ -12,7 +12,7 @@
 import torch
 from torch.utils.data import BatchSampler, SequentialSampler, DataLoader, Subset
 
-from deepspeed.utils import logger, groups
+from deepspeed.utils import logger
 import deepspeed.comm as dist
 from deepspeed.runtime.data_pipeline.data_sampling.indexed_dataset import MMapIndexedDataset, valid_dtypes
 from deepspeed.runtime.data_pipeline.data_sampling.utils import split_dataset, split_index, create_mmap_dataset_builder, close_mmap_dataset_builder, find_fit_int_dtype
@@ -482,17 +482,17 @@ def __init__(
             dist.init_distributed()
 
         # comm_group and worker_id+num_workers are mutually exclusive
-        if comm_group is not None:
-            self.comm_group = comm_group
-            self.num_workers = self.comm_group.size()
-            self.worker_id = self.comm_group.rank()
+        self.comm_group = comm_group
+        if self.comm_group is None:
+            # self.comm_group = deepspeed.utils.groups._clone_world_group()
+            self.num_workers = num_workers
+            self.worker_id = worker_id
         else:
-            self.comm_group = groups._clone_world_group()
             self.num_workers = self.comm_group.size()
             self.worker_id = self.comm_group.rank()
 
         if self.worker_id == 0:
-            logger.info(f"Data analyzer initialized with {self.num_workers} workers.")
+            logger.info(f"Distributed data analyzer initialized with {self.num_workers} workers.")
 
     def run_map_reduce(self):
 
@@ -635,9 +635,18 @@ def file_write_ordered(self, tensor_list, fname, numpy_dtype):
         # method to deserializes a buffer into rows of different lengths and write them to file
         def write_buffer_to_file(buff, src, builder):
             assert self.worker_id == 0, "only rank 0 can write to file"
+
+            # # write one buffer at a time
+            # for row_len in row_lens[src]:
+            #     builder.add_item(buff[:row_len].cpu())
+            #     buff = buff[row_len:]
+
+            # collect all buffers and write them all at once
+            buffer_list = []
             for row_len in row_lens[src]:
-                builder.add_item(buff[:row_len].cpu())
+                buffer_list.append(buff[:row_len].cpu())
                 buff = buff[row_len:]
+            builder.add_items(buffer_list)
 
         # 5. rank 0 prepares output folder and file
         if self.worker_id == 0:
diff --git a/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py b/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py
index 7a6963bc27eb..453e6ba6039d 100644
--- a/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py
+++ b/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py
@@ -581,10 +581,18 @@ def __init__(self, out_file, dtype=np.int64):
         self._doc_idx = [0]
 
     def add_item(self, tensor):
+        """ write the tensor to the file and update its size in the index"""
         np_array = np.array(tensor.numpy(), dtype=self._dtype)
         self._data_file.write(np_array.tobytes(order='C'))
         self._sizes.append(np_array.size)
 
+    def add_items(self, tensor_list):
+        """ write a list of tensors to the file and update their sizes in the index"""
+        np_arrays = [np.array(t.numpy(), dtype=self._dtype) for t in tensor_list]
+        self._data_file.writelines([arr.tobytes(order='C') for arr in np_arrays])
+        for arr in np_arrays:
+            self._sizes.append(arr.size)
+
     def add_item_numpy(self, np_array):
         if np_array.dtype != self._dtype:
             np_array = np_array.astype(self._dtype)