feat: integrated FileExistsPolicy
le1nux committed Jan 27, 2025
1 parent 68a49e7 commit 8fa3f20
Showing 2 changed files with 153 additions and 23 deletions.
91 changes: 82 additions & 9 deletions src/modalities/__main__.py
@@ -12,18 +12,19 @@
from pydantic import BaseModel, FilePath

from modalities.api import (
FileExistencePolicy,
convert_pytorch_to_hf_checkpoint,
create_raw_data_index,
create_shuffled_dataset_chunk,
generate_text,
merge_packed_data_files,
pack_encoded_data,
shuffle_tokenized_data,
)
from modalities.batch import EvaluationResultBatch
from modalities.config.component_factory import ComponentFactory
from modalities.config.config import ProcessGroupBackendType, load_app_config_dict
from modalities.config.instantiation_models import TrainingComponentsInstantiationModel, TrainingReportGenerator
from modalities.dataloader.shuffle_tokenized_data import shuffle_tokenized_data
from modalities.evaluator import Evaluator
from modalities.gym import Gym
from modalities.logging_broker.message_broker import MessageBroker
@@ -133,7 +134,13 @@ def data():
default=None,
help="output path for index. will use parent directory of src_path if none.",
)
def CMD_entry_point_data_create_raw_index(src_path: Path, index_path: Path):
@click.option(
"--file_existence_policy",
type=click.Choice([policy.value for policy in FileExistencePolicy]),
default=FileExistencePolicy.ERROR.value,
help="Policy for handling existing files.",
)
def CMD_entry_point_data_create_raw_index(src_path: Path, index_path: Path, file_existence_policy: FileExistencePolicy):
"""Utility CMD for indexing the content of a large jsonl-file.
Background is the ability to further process the respective file without loading it,
while splitting its content line-based. This step is necessary in advance of further processing like tokenization.
@@ -142,27 +149,37 @@ def CMD_entry_point_data_create_raw_index(src_path: Path, index_path: Path):
Args:
src_path (Path): The path to the jsonl-file.
index_path (Path): The path to the index file, that will be created.
file_existence_policy (FileExistencePolicy): Policy for handling existing files.
Raises:
ValueError: If the index file already exists.
"""
create_raw_data_index(src_path=src_path, index_path=index_path)
file_existence_policy = FileExistencePolicy(file_existence_policy)
create_raw_data_index(src_path=src_path, index_path=index_path, file_existence_policy=file_existence_policy)
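
# Illustrative usage (not part of this commit). Assuming the command above is registered on the
# `data` group as `create_raw_index` and that FileExistencePolicy.SKIP serializes to "skip", an
# already existing index file can now be skipped instead of raising an error, e.g.:
#
#   python -m modalities data create_raw_index <src_path.jsonl> --file_existence_policy skip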


@data.command(name="pack_encoded_data")
@click.argument("config_path", type=FilePath)
def CMD_entry_point_pack_encoded_data(config_path: FilePath):
@click.option(
"--file_existence_policy",
type=click.Choice([policy.value for policy in FileExistencePolicy]),
default=FileExistencePolicy.ERROR.value,
help="Policy for handling existing files.",
)
def CMD_entry_point_pack_encoded_data(config_path: FilePath, file_existence_policy: FileExistencePolicy):
"""Utility to encode an indexed, large jsonl-file.
(see also `create_index` for more information)
Returns .pbin-file, which can be inserted into a training process directly
and does not require its original jsonl-file or the respective index file anymore.
Args:
config_path (FilePath): Path to the config file describing the tokenization setup.
file_existence_policy (FileExistencePolicy): Policy for handling existing files.
"""
file_existence_policy = FileExistencePolicy(file_existence_policy)
config_dict = load_app_config_dict(config_path)

pack_encoded_data(config_dict=config_dict)
pack_encoded_data(config_dict=config_dict, file_existence_policy=file_existence_policy)
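
# Illustrative sketch (not part of this commit): the same policy handling when calling the
# API directly instead of the CLI; the config path below is hypothetical.
from pathlib import Path

from modalities.api import FileExistencePolicy, pack_encoded_data
from modalities.config.config import load_app_config_dict

config_dict = load_app_config_dict(Path("configs/tokenization.yaml"))
pack_encoded_data(config_dict=config_dict, file_existence_policy=FileExistencePolicy.OVERRIDE)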


@data.command(name="create_shuffled_dataset_chunk")
Expand Down Expand Up @@ -196,9 +213,40 @@ def CMD_entry_point_pack_encoded_data(config_path: FilePath):
required=True,
help="The size of the vocabulary.",
)
@click.option(
"--file_existence_policy",
type=click.Choice([policy.value for policy in FileExistencePolicy]),
default=FileExistencePolicy.ERROR.value,
help="Policy for handling existing files.",
)
@click.option(
"--global_seed",
type=int,
default=None,
help="The global seed to use for shuffling.",
)
def CMD_create_shuffled_dataset_chunk(
input_file_list_path: Path, output_chunk_file_path: Path, chunk_id: int, num_chunks: int, vocab_size: int
input_file_list_path: Path,
output_chunk_file_path: Path,
chunk_id: int,
num_chunks: int,
vocab_size: int,
file_existence_policy: FileExistencePolicy,
global_seed: int,
):
"""Utility to create a dataset chunk from a list of shuffled and tokenized pbin files.
Args:
input_file_list_path (Path): Relative file path to the list of files to be chunked.
output_chunk_file_path (Path): File path to the chunked dataset.
chunk_id (int): The id of the chunk to be created.
num_chunks (int): Number of chunks in total.
vocab_size (int): The size of the vocabulary.
file_existence_policy (FileExistencePolicy): Policy for handling existing files.
global_seed (int): The global seed to use for shuffling.
"""
file_existence_policy = FileExistencePolicy(file_existence_policy)

with open(input_file_list_path, "r", encoding="utf-8") as f:
file_path_list = f.readlines()
file_path_list = [Path(file_path.strip()) for file_path in file_path_list]
@@ -209,6 +257,8 @@ def CMD_create_shuffled_dataset_chunk(
chunk_id=chunk_id,
num_chunks=num_chunks,
vocab_size=vocab_size,
file_existence_policy=file_existence_policy,
global_seed=global_seed,
)


@@ -246,18 +296,41 @@ def CMD_entry_point_merge_packed_data(src_paths: list[Path], target_path: Path):
@click.option(
"--batch-size", type=int, default=100, show_default=True, help="Number of documents to process per batch."
)
def CMD_shuffle_tokenized_data(input_data_path: Path, output_data_path: Path, batch_size: int) -> None:
@click.option(
"--file_existence_policy",
type=click.Choice([policy.value for policy in FileExistencePolicy]),
default=FileExistencePolicy.ERROR.value,
help="Policy for handling existing files.",
)
@click.option(
"--seed",
type=int,
default=None,
help="The seed for shuffling the data.",
)
def CMD_shuffle_tokenized_data(
input_data_path: Path, output_data_path: Path, batch_size: int, file_existence_policy, seed: int
) -> None:
"""Entrypoint for shuffling tokenized data.
Args:
input_data_path (Path): The path to the input tokenized data (.pbin).
output_data_path (Path): Path to write the shuffled tokenized data (.pbin).
batch_size (int): The size of the batches to shuffle.
file_existence_policy (FileExistencePolicy): Policy for handling existing files.
seed (int): The seed for shuffling the data.
Returns:
None
"""
shuffle_tokenized_data(input_data_path=input_data_path, output_data_path=output_data_path, batch_size=batch_size)
file_existence_policy = FileExistencePolicy(file_existence_policy)

shuffle_tokenized_data(
input_data_path=input_data_path,
output_data_path=output_data_path,
batch_size=batch_size,
file_existence_policy=file_existence_policy,
seed=seed,
)


class Main:
85 changes: 71 additions & 14 deletions src/modalities/api.py
@@ -5,6 +5,7 @@
from pathlib import Path

import numpy as np
from data_quality_ablations.utils.seeding import calculate_seed
from pydantic import FilePath

import modalities.inference.inference as inference
@@ -17,6 +18,7 @@
from modalities.dataloader.large_file_lines_reader import LargeFileLinesReader
from modalities.dataloader.preprocessing.chunking.create_chunks import Chunking
from modalities.dataloader.preprocessing.tokenization.tokenized_file_writer import TokenizedFileWriter
from modalities.dataloader.shuffle_tokenized_data import TokenizedDataShuffler
from modalities.models.huggingface_adapters.hf_adapter import HFModelAdapter
from modalities.registry.components import COMPONENTS
from modalities.registry.registry import Registry
@@ -29,6 +31,32 @@ class FileExistencePolicy(Enum):
OVERRIDE = "override"


def enforce_file_existence_policy(file_path: Path, file_existence_policy: FileExistencePolicy) -> bool:
"""Enforces the file existence policy. Function returns True, if processing should continue. Otherwise False.
Args:
file_path (Path): File path to the file to check.
file_existence_policy (FileExistencePolicy): The file existence policy.
Raises:
ValueError: Raised if the file existence policy is unknown or the policy requires to raise a ValueError.
Returns:
bool: True if processing should continue, otherwise False.
"""
if file_existence_policy == FileExistencePolicy.SKIP:
get_logger(name="main").warning(f"File already exists at {str(file_path)}. Skipping ...")
return False
elif file_existence_policy == FileExistencePolicy.OVERRIDE:
get_logger(name="main").warning(f"File already exists at {str(file_path)}. Overriding it.")
os.remove(file_path)
return True
elif file_existence_policy == FileExistencePolicy.ERROR:
raise ValueError("File already exists. Delete it or specify different output folder.")
else:
raise ValueError(f"Unknown file existence policy: {file_existence_policy}")
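
# Illustrative sketch (not part of this commit): how the three policies behave for a path
# that already exists; `existing.idx` is a hypothetical file used only for illustration.
from pathlib import Path

from modalities.api import FileExistencePolicy, enforce_file_existence_policy

existing_file = Path("existing.idx")
enforce_file_existence_policy(existing_file, FileExistencePolicy.SKIP)      # warns and returns False -> caller stops
enforce_file_existence_policy(existing_file, FileExistencePolicy.OVERRIDE)  # warns, deletes the file, returns True -> caller continues
# FileExistencePolicy.ERROR (the CLI default) raises a ValueError instead of returning.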


def create_raw_data_index(
src_path: Path, index_path: Path, file_existence_policy: FileExistencePolicy = FileExistencePolicy.ERROR
):
@@ -47,16 +75,8 @@ def create_raw_data_index(
"""
index_path = LargeFileLinesReader.default_index_path(src_path, index_path)
if index_path.exists():
if file_existence_policy == FileExistencePolicy.SKIP:
get_logger(name="main").warning(f"Index already exists at {str(index_path)}. Skipping index creation.")
if not enforce_file_existence_policy(index_path, file_existence_policy):
return
elif file_existence_policy == FileExistencePolicy.OVERRIDE:
get_logger(name="main").warning(f"Index already exists at {str(index_path)}. Overriding it.")
os.remove(index_path)
elif file_existence_policy == FileExistencePolicy.ERROR:
raise ValueError("index already exists. delete it or specify different output folder.")
else:
raise ValueError(f"Unknown file existence policy: {file_existence_policy}")

get_logger(name="main").info(
f"Reading raw data from {str(src_path)} and" f" writing index to {str(index_path)} ..."
Expand Down Expand Up @@ -95,13 +115,38 @@ def convert_pytorch_to_hf_checkpoint(
return hf_model


def shuffle_tokenized_data(
input_data_path: Path,
output_data_path: Path,
batch_size: int,
file_existence_policy: FileExistencePolicy,
seed: int = None,
):
"""Shuffles a tokenized file (.pbin) and stores it on disc.
Args:
input_data_path (Path): Filepath to the tokenized data (.pbin).
output_data_path (Path): Filepath to write the shuffled tokenized data.
batch_size (int): Number of documents to process per batch.
file_existence_policy (FileExistencePolicy): Policy to apply when the output file already exists.
"""
if output_data_path.exists():
if not enforce_file_existence_policy(output_data_path, file_existence_policy):
return

TokenizedDataShuffler.shuffle_tokenized_data(
input_data_path=input_data_path, output_data_path=output_data_path, batch_size=batch_size, seed=seed
)
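
# Illustrative sketch (not part of this commit): shuffling a tokenized .pbin file while
# skipping the run if the output already exists; the paths and seed below are hypothetical.
from pathlib import Path

from modalities.api import FileExistencePolicy, shuffle_tokenized_data

shuffle_tokenized_data(
    input_data_path=Path("data/train.pbin"),
    output_data_path=Path("data/train_shuffled.pbin"),
    batch_size=100,
    file_existence_policy=FileExistencePolicy.SKIP,
    seed=42,
)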


def create_shuffled_dataset_chunk(
file_path_list: list[Path],
output_chunk_file_path: Path,
chunk_id: int,
num_chunks: int,
vocab_size: int,
shuffle: bool = True,
file_existence_policy: FileExistencePolicy,
global_seed: int = None,
):
"""Creates a shuffled dataset chunk.
Given a dataset consisting of multiple tokenized pbin files, this function
Expand All @@ -115,11 +160,16 @@ def create_shuffled_dataset_chunk(
chunk_id (int): The id of the chunk to create.
num_chunks (int): The total number of chunks to create.
vocab_size (int): The size of the vocabulary.
file_existence_policy (FileExistencePolicy): Policy to apply when the output chunk file already exists.
shuffle (bool, optional): Flag indicating whether we want to shuffle the chunk. Defaults to True.
Raises:
ValueError: _description_
ValueError: If the chunk has no samples.
"""
if output_chunk_file_path.exists():
if not enforce_file_existence_policy(output_chunk_file_path, file_existence_policy):
return

samples = []
for file_path in file_path_list:
dataset = PackedMemMapDatasetBase(raw_data_path=file_path, sample_key="text", load_index=True)
@@ -134,8 +184,8 @@ def create_shuffled_dataset_chunk(
)

# samples are shuffled in place
if shuffle:
Chunking.shuffle_file_chunks_in_place(file_chunks=samples)
seed = calculate_seed(input_data=[str(global_seed), str(chunk_id)]) if global_seed is not None else None
Chunking.shuffle_file_chunks_in_place(file_chunks=samples, seed=seed)

token_size_in_bytes = TokenizedFileWriter.get_required_num_of_bytes_to_repr(int_to_get_repr=vocab_size)
TokenizedFileWriter.write_tokenized_dataset(
@@ -145,7 +195,10 @@ def create_shuffled_dataset_chunk(
)
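
# Illustrative sketch (not part of this commit): building one of several reproducible dataset
# chunks; the .pbin paths and vocabulary size are hypothetical. The per-chunk shuffle seed is
# derived internally from (global_seed, chunk_id) via calculate_seed, so each chunk is shuffled
# differently but deterministically.
from pathlib import Path

from modalities.api import FileExistencePolicy, create_shuffled_dataset_chunk

create_shuffled_dataset_chunk(
    file_path_list=[Path("data/part_0.pbin"), Path("data/part_1.pbin")],
    output_chunk_file_path=Path("data/chunk_3.pbin"),
    chunk_id=3,
    num_chunks=8,
    vocab_size=50304,
    file_existence_policy=FileExistencePolicy.ERROR,
    global_seed=1234,
)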


def pack_encoded_data(config_dict: dict):
def pack_encoded_data(
config_dict: dict,
file_existence_policy: FileExistencePolicy,
):
"""Packs and encodes an indexed, large jsonl-file.
(see also `create_index` for more information)
Returns .pbin-file, which can be inserted into a training process directly
@@ -167,6 +220,10 @@ def pack_encoded_data(config_dict: dict):
config_dict=config_dict, components_model_type=PackedDatasetComponentsInstantiationModel
)

if components.settings.dst_path.exists():
if not enforce_file_existence_policy(components.settings.dst_path, file_existence_policy):
return

generator = PackedDataGenerator(
components.settings.src_path,
index_path=components.settings.index_path,
