Preprocessing at scale #291

Merged: 17 commits, Jan 22, 2025

48 changes: 48 additions & 0 deletions src/modalities/__main__.py
@@ -14,6 +14,7 @@
from modalities.api import (
convert_pytorch_to_hf_checkpoint,
create_raw_data_index,
create_shuffled_dataset_chunk,
generate_text,
merge_packed_data_files,
pack_encoded_data,
@@ -163,6 +164,53 @@ def CMD_entry_point_pack_encoded_data(config_path: FilePath):
pack_encoded_data(config_dict=config_dict)


@data.command(name="create_shuffled_dataset_chunk")
@click.option(
"--input_file_list_path",
type=Path,
required=True,
help="Path to the file containing the list of files to be chunked.",
)
@click.option(
"--output_chunk_file_path",
type=Path,
required=True,
help="Path where the chunked dataset will be saved.",
)
@click.option(
"--chunk_id",
type=int,
required=True,
help="The id of the chunk to be created.",
)
@click.option(
"--num_chunks",
type=int,
required=True,
help="The number of chunks to create.",
)
@click.option(
"--vocab_size",
type=int,
required=True,
help="The size of the vocabulary.",
)
def CMD_create_shuffled_dataset_chunk(
input_file_list_path: Path, output_chunk_file_path: Path, chunk_id: int, num_chunks: int, vocab_size: int
):
with open(input_file_list_path, "r", encoding="utf-8") as f:
file_path_list = f.readlines()
file_path_list = [Path(file_path.strip()) for file_path in file_path_list]

create_shuffled_dataset_chunk(
file_path_list=file_path_list,
output_chunk_file_path=output_chunk_file_path,
chunk_id=chunk_id,
num_chunks=num_chunks,
vocab_size=vocab_size,
)


@data.command(name="merge_packed_data")
@click.argument("src_paths", type=click.types.Path(exists=True, path_type=Path), nargs=-1, required=True)
@click.argument("target_path", type=click.types.Path(file_okay=False, dir_okay=False, path_type=Path))
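For illustration only, a minimal sketch of driving the new subcommand from Python via Click's test runner. It assumes the `data` group defined in modalities/__main__.py is importable and that a file list exists on disk; all paths and values below are placeholders, not part of the PR:

```python
from click.testing import CliRunner

from modalities.__main__ import data  # Click group the new command is registered on (assumed importable)

runner = CliRunner()
result = runner.invoke(
    data,
    [
        "create_shuffled_dataset_chunk",
        "--input_file_list_path", "file_list.txt",   # placeholder: one tokenized pbin path per line
        "--output_chunk_file_path", "chunk_0.pbin",  # placeholder output path
        "--chunk_id", "0",
        "--num_chunks", "8",
        "--vocab_size", "50304",                     # placeholder vocabulary size
    ],
)
print(result.exit_code, result.output)
```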
54 changes: 54 additions & 0 deletions src/modalities/api.py
@@ -4,6 +4,7 @@
from enum import Enum
from pathlib import Path

import numpy as np
from pydantic import FilePath

import modalities.inference.inference as inference
@@ -12,7 +13,10 @@
from modalities.config.instantiation_models import PackedDatasetComponentsInstantiationModel
from modalities.dataloader.create_index import IndexGenerator
from modalities.dataloader.create_packed_data import EmbeddedStreamData, PackedDataGenerator, join_embedded_stream_data
from modalities.dataloader.dataset import PackedMemMapDatasetBase
from modalities.dataloader.large_file_lines_reader import LargeFileLinesReader
from modalities.dataloader.preprocessing.chunking.create_chunks import Chunking
from modalities.dataloader.preprocessing.tokenization.tokenized_file_writer import TokenizedFileWriter
from modalities.models.huggingface_adapters.hf_adapter import HFModelAdapter
from modalities.registry.components import COMPONENTS
from modalities.registry.registry import Registry
@@ -91,6 +95,56 @@ def convert_pytorch_to_hf_checkpoint(
return hf_model


def create_shuffled_dataset_chunk(
file_path_list: list[Path],
output_chunk_file_path: Path,
chunk_id: int,
num_chunks: int,
vocab_size: int,
shuffle: bool = True,
):
"""Creates a shuffled dataset chunk.
Given a dataset consisting of multiple tokenized pbin files, this function
creates a shuffled dataset chunk for a given chunk id.
From each tokenized pbin file, the respective chunk is extracted, shuffled
and written to a new pbin file.

Args:
file_path_list (list[Path]): List of paths to the tokenized input pbin files.
output_chunk_file_path (Path): Path to the output chunk which will be stored in pbin format.
chunk_id (int): The id of the chunk to create.
num_chunks (int): The total number of chunks to create.
vocab_size (int): The size of the vocabulary.
shuffle (bool, optional): Flag indicating whether we want to shuffle the chunk. Defaults to True.

Raises:
ValueError: If the chunk with the given chunk_id contains no samples.
"""
samples = []
for file_path in file_path_list:
dataset = PackedMemMapDatasetBase(raw_data_path=file_path, sample_key="text", load_index=True)
file_samples: list[np.ndarray] = Chunking.get_file_chunk(
dataset=dataset, num_chunks=num_chunks, chunk_id=chunk_id
)
samples.extend(file_samples)

if len(samples) == 0:
raise ValueError(
f"Chunk {chunk_id} has no samples. Please decrease the number of chunks to less than {chunk_id}."
)

# samples are shuffled in place
if shuffle:
Chunking.shuffle_file_chunks_in_place(file_chunks=samples)

token_size_in_bytes = TokenizedFileWriter.get_required_num_of_bytes_to_repr(int_to_get_repr=vocab_size)
TokenizedFileWriter.write_tokenized_dataset(
tokenized_dataset=samples,
tokenized_dataset_file_path=output_chunk_file_path,
token_size_in_bytes=token_size_in_bytes,
)


def pack_encoded_data(config_dict: dict):
"""Packs and encodes an indexed, large jsonl-file.
(see also `create_index` for more information)
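As a usage sketch of the new API entry point (file paths and parameter values below are invented for illustration), one chunk can be built directly from Python:

```python
from pathlib import Path

from modalities.api import create_shuffled_dataset_chunk

# Hypothetical inputs: tokenized pbin files produced by the existing packing pipeline.
file_path_list = [Path("data/part_00.pbin"), Path("data/part_01.pbin")]

# Build chunk 0 of 8: each input file contributes its chunk-0 slice of samples,
# the pooled samples are shuffled in place and written out as a new pbin file.
create_shuffled_dataset_chunk(
    file_path_list=file_path_list,
    output_chunk_file_path=Path("data/chunks/chunk_0.pbin"),
    chunk_id=0,
    num_chunks=8,
    vocab_size=50304,  # placeholder; determines the per-token byte width of the output
)
```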
10 changes: 8 additions & 2 deletions src/modalities/dataloader/create_packed_data.py
@@ -99,7 +99,7 @@ def _get_required_num_of_bytes_to_repr(int_to_get_repr: int) -> int:

def _encoded_token_to_bytes(self, encoded_token: int) -> bytes:
"""
Converts an encoded token to its byte representaion.
Converts an encoded token to its byte representation.

Args:
encoded_token (int): The encoded token to be converted.
@@ -108,7 +108,13 @@ def _encoded_token_to_bytes(self, encoded_token: int) -> bytes:
bytes: The byte representation of the token.

"""
return encoded_token.to_bytes(self._token_size_in_bytes, byteorder="little", signed=False)
try:
token_bytes = encoded_token.to_bytes(self._token_size_in_bytes, byteorder="little", signed=False)
except OverflowError as e:
raise ValueError(
f"Token {encoded_token} cannot be represented by {self._token_size_in_bytes} bytes."
) from e
return token_bytes

def _default_destination_path(self, destination_path: Optional[Path] = None) -> Path:
"""
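The new guard converts the OverflowError raised by int.to_bytes into a ValueError that names the offending token. A standalone illustration of the underlying arithmetic (values chosen for the example):

```python
# A 2-byte little-endian token can represent ids up to 65_535.
token_size_in_bytes = 2
(65_535).to_bytes(token_size_in_bytes, byteorder="little", signed=False)  # b'\xff\xff' -- fits

try:
    (70_000).to_bytes(token_size_in_bytes, byteorder="little", signed=False)
except OverflowError:
    # This is the failure mode the try/except above reports, including the
    # token id and the configured byte width.
    print("token id 70000 does not fit into 2 bytes")
```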
55 changes: 37 additions & 18 deletions src/modalities/dataloader/dataset.py
@@ -31,11 +31,6 @@ def __init__(self, raw_data_path: Path, sample_key: str):
self.raw_data_path = raw_data_path
self.sample_key = sample_key

def _check_if_inbounds(self, idx: int):
# check if the provided index is within the bounds of the dataset.
if not 0 <= idx < len(self):
raise IndexError


class DummySampleDataType(str, Enum):
"""
@@ -239,6 +234,10 @@ def __init__(self, raw_data_path: Path, sample_key: str, load_index: Optional[bo
) from e
self._index = self._generate_packing_index()

@property
def token_size_in_bytes(self) -> int:
return self._token_size_in_bytes

def _generate_packing_index(self) -> list[tuple[int, int]]:
# Generates the packing index for the dataset.
# The index is list of tuples, where each tuple contains the offset and length in bytes.
@@ -254,40 +253,60 @@ def __len__(self) -> int:
"""
return len(self._index)

def __getitem__(self, idx: int) -> BatchEncoding:
def __getitem__(self, idx: int | slice) -> BatchEncoding:
"""
Retrieves the item at the given index.
Retrieves the item at the given index or a slice of items.

Args:
idx (int): The index of the item to retrieve.
idx (int | slice): The index of the item to retrieve or a slice of items.

Returns:
BatchEncoding: The retrieved item as a BatchEncoding object.

Raises:
ValueError: If the length of the sample in bytes is not a multiple of `self._token_size_in_bytes`.
"""
self._check_if_inbounds(idx)
# offset and length in bytes
offset_in_bytes, length_in_bytes = self._index[idx]
if length_in_bytes % self._token_size_in_bytes != 0:
raise ValueError(
f"Length of the sample in bytes is not a multiple of {self._token_size_in_bytes}."
f"Offset in bytes: {offset_in_bytes}, Length in bytes: {length_in_bytes}"
)

if not isinstance(idx, slice):
# (offset_in_bytes, length_in_bytes)
item_positions: list[tuple[int, int]] = [self._index[idx]]
else:
if idx.step is not None and idx.step != 1:
raise ValueError("Slicing with step != 1 is not supported.")
item_positions = self._index[idx]

if len(item_positions) == 0:
return BatchEncoding(data={self.sample_key: []})

# numpy frombuffer takes the memmap object as the buffer
# and indices the data section with the given offset (in bytes)
# and length in indices of type self._token_dtype_on_disk
num_bytes_stop = item_positions[-1][0] + item_positions[-1][1]
num_bytes_start = item_positions[0][0]
length_in_bytes = num_bytes_stop - num_bytes_start
num_tokens = length_in_bytes // self._token_size_in_bytes
tokens = np.frombuffer(
buffer=self._embedded_stream_data.data,
dtype=self._token_dtype_on_disk,
count=num_tokens,
offset=offset_in_bytes,
offset=num_bytes_start,
)
# torch can't convert most uint-formats, therefore we infer regular int types
tokens = tokens.astype(self._token_dtype_in_ram)
return BatchEncoding(data={self.sample_key: tokens})

documents = []
for offset_in_bytes, length_in_bytes in item_positions:
token_start = (offset_in_bytes - num_bytes_start) // self._token_size_in_bytes
token_end = (offset_in_bytes + length_in_bytes - num_bytes_start) // self._token_size_in_bytes
documents.append(tokens[token_start:token_end])

# TODO: the return type is inconsistent here.
# If idx is an integer, we return a BatchEncoding with a single document.
# If idx is a slice, we return a BatchEncoding with a list of documents.
if not isinstance(idx, slice):
return BatchEncoding(data={self.sample_key: documents[0]})
else:
return BatchEncoding(data={self.sample_key: documents})


class PackedMemMapDatasetContinuous(PackedMemMapDatasetBase):
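A self-contained sketch (toy data, not the library code) of the slice logic added to __getitem__: read the covered byte range with a single np.frombuffer call, then cut it back into per-document token arrays using the byte offsets from the index.

```python
import numpy as np

token_size = 2  # bytes per token on disk
docs = [np.array([1, 2, 3], dtype=np.uint16), np.array([4, 5], dtype=np.uint16)]
buffer = b"".join(d.tobytes() for d in docs)
index = [(0, 6), (6, 4)]  # (offset_in_bytes, length_in_bytes) per document

positions = index[0:2]  # the requested slice
start = positions[0][0]
stop = positions[-1][0] + positions[-1][1]
tokens = np.frombuffer(buffer, dtype=np.uint16, count=(stop - start) // token_size, offset=start)

documents = [
    tokens[(offset - start) // token_size : (offset + length - start) // token_size]
    for offset, length in positions
]
# documents == [array([1, 2, 3], dtype=uint16), array([4, 5], dtype=uint16)]
```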
Empty file.
Empty file.
51 changes: 51 additions & 0 deletions src/modalities/dataloader/preprocessing/chunking/create_chunks.py
@@ -0,0 +1,51 @@
import math

import numpy as np

from modalities.dataloader.dataset import PackedMemMapDatasetBase


class Chunking:
@staticmethod
def _get_chunk_range(num_chunks: int, num_samples: int, chunk_id: int) -> list[int]:
if num_chunks == 0:
raise ValueError("Number of chunks must be greater than 0.")
if chunk_id >= num_chunks:
raise ValueError("Chunk ID must be less than the number of chunks.")

# get the maximum chunk size given the number of samples and number of chunks
chunk_size_complete = math.ceil(num_samples / num_chunks)

# the number of complete chunks, i.e., the chunks having the maximum chunk size
num_complete_chunks = num_samples % num_chunks
if num_complete_chunks == 0:
num_complete_chunks = num_chunks

# Calculate the start and end index of the chunk
# The first num_complete_chunks chunks have the maximum chunk size and the
# remaining ones have chunk_size_complete - 1
# If chunk_id is larger than num_complete_chunks, we need to calculate the starting position of the chunk
# by adding num_complete_chunks offsets of size chunk_size_complete and (chunk_id - num_complete_chunks)
# offsets of size chunk_size_complete - 1
start = chunk_size_complete * min(num_complete_chunks, chunk_id) + max((chunk_id - num_complete_chunks), 0) * (
chunk_size_complete - 1
)

if chunk_id < num_complete_chunks:
end = start + chunk_size_complete
else:
end = start + chunk_size_complete - 1

return [start, end]

@staticmethod
def get_file_chunk(dataset: PackedMemMapDatasetBase, num_chunks: int, chunk_id: int) -> list[np.ndarray]:
chunk_range = Chunking._get_chunk_range(num_chunks=num_chunks, num_samples=len(dataset), chunk_id=chunk_id)
if chunk_range[0] == chunk_range[1]:
return []
chunk = dataset[chunk_range[0] : chunk_range[1]][dataset.sample_key]
return chunk

@staticmethod
def shuffle_file_chunks_in_place(file_chunks: list[np.ndarray]):
np.random.shuffle(file_chunks)
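A worked example of the split computed by _get_chunk_range (numbers chosen for illustration): with 10 samples and 3 chunks, chunk_size_complete = ceil(10/3) = 4 and num_complete_chunks = 10 % 3 = 1, so chunk 0 covers [0, 4) while chunks 1 and 2 cover [4, 7) and [7, 10).

```python
from modalities.dataloader.preprocessing.chunking.create_chunks import Chunking

ranges = [Chunking._get_chunk_range(num_chunks=3, num_samples=10, chunk_id=i) for i in range(3)]
assert ranges == [[0, 4], [4, 7], [7, 10]]  # 4 + 3 + 3 = 10 samples, contiguous and non-overlapping
```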
Empty file.