From e7bf8db640676a45865ae1b00c1698b952018e88 Mon Sep 17 00:00:00 2001
From: Vincent Chen
Date: Tue, 16 Jul 2024 15:59:19 -0700
Subject: [PATCH] Add convert_dataset_hf to CLI (#1348)

* convert_dataset_hf

* precommit

* precommit

* arguemnt

* optino

* back to fully option

* typer is a pain

* comma sep

* checks

* test

* typo

* clean imports

* commit comments 1

* commit comments 2 (precommit hell)

* script args

* typer defaults

* precommit

* bruh

* precommit

* yapf

* cli

* update annotation

* update annotation

* merge

* merge + refactor

* typo

* typo

* move app

* typo

---------

Co-authored-by: v-chen_data
---
 llmfoundry/cli/cli.py                         |  47 +-
 llmfoundry/cli/data_prep_cli.py               |  61 +++
 llmfoundry/cli/registry_cli.py                |   4 +-
 llmfoundry/command_utils/__init__.py          |   6 +
 .../data_prep/convert_dataset_hf.py           | 489 ++++++++++++++++++
 scripts/data_prep/convert_dataset_hf.py       | 429 +--------------
 .../data_prep/test_convert_dataset_hf.py      |  31 +-
 tests/data/test_dataloader.py                 |  63 +--
 tests/data_utils.py                           |  32 +-
 9 files changed, 658 insertions(+), 504 deletions(-)
 create mode 100644 llmfoundry/cli/data_prep_cli.py
 create mode 100644 llmfoundry/command_utils/data_prep/convert_dataset_hf.py

diff --git a/llmfoundry/cli/cli.py b/llmfoundry/cli/cli.py
index 606f6855ff..6c4a2d12c4 100644
--- a/llmfoundry/cli/cli.py
+++ b/llmfoundry/cli/cli.py
@@ -1,25 +1,34 @@
 # Copyright 2024 MosaicML LLM Foundry authors
 # SPDX-License-Identifier: Apache-2.0

-from typing import Optional
+from typing import Annotated, Optional

-import typer
+from typer import Argument, Typer

-from llmfoundry.cli import registry_cli
-from llmfoundry.command_utils import eval_from_yaml, train_from_yaml
+from llmfoundry.cli import (
+    data_prep_cli,
+    registry_cli,
+)
+from llmfoundry.command_utils import (
+    eval_from_yaml,
+    train_from_yaml,
+)

-app = typer.Typer(pretty_exceptions_show_locals=False)
+app = Typer(pretty_exceptions_show_locals=False)
 app.add_typer(registry_cli.app, name='registry')
+app.add_typer(data_prep_cli.app, name='data_prep')


 @app.command(name='train')
 def train(
-    yaml_path: str = typer.Argument(
-        ...,
-        help='Path to the YAML configuration file',
-    ),  # type: ignore
-    args_list: Optional[list[str]] = typer.
-    Argument(None, help='Additional command line arguments'),  # type: ignore
+    yaml_path: Annotated[str,
+                         Argument(
+                             ...,
+                             help='Path to the YAML configuration file',
+                         )],
+    args_list: Annotated[
+        Optional[list[str]],
+        Argument(help='Additional command line arguments')] = None,
 ):
     """Run the training with optional overrides from CLI."""
     train_from_yaml(yaml_path, args_list)
@@ -27,14 +36,16 @@ def train(

 @app.command(name='eval')
 def eval(
-    yaml_path: str = typer.Argument(
-        ...,
-        help='Path to the YAML configuration file',
-    ),  # type: ignore
-    args_list: Optional[list[str]] = typer.
-    Argument(None, help='Additional command line arguments'),  # type: ignore
+    yaml_path: Annotated[str,
+                         Argument(
+                             ...,
+                             help='Path to the YAML configuration file',
+                         )],
+    args_list: Annotated[
+        Optional[list[str]],
+        Argument(help='Additional command line arguments')] = None,
 ):
-    """Run the training with optional overrides from CLI."""
+    """Run the eval with optional overrides from CLI."""
     eval_from_yaml(yaml_path, args_list)
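The hunk above migrates both commands from the `typer.Argument`-as-default style to the `Annotated` style, which carries the typer metadata inside the type annotation and drops the `# type: ignore` comments. A minimal, self-contained sketch of the same pattern (the `greet` command here is hypothetical, not part of this patch):

from typing import Annotated, Optional

from typer import Argument, Typer

app = Typer(pretty_exceptions_show_locals=False)


@app.command(name='greet')
def greet(
    # Required positional argument; the help text lives in the annotation.
    name: Annotated[str, Argument(help='Who to greet')],
    # Optional trailing arguments default to None rather than Argument(None, ...).
    extras: Annotated[Optional[list[str]],
                      Argument(help='Extra words to append')] = None,
):
    print(f'Hello {name} ' + ' '.join(extras or []))


if __name__ == '__main__':
    app()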
diff --git a/llmfoundry/cli/data_prep_cli.py b/llmfoundry/cli/data_prep_cli.py
new file mode 100644
index 0000000000..731a9f06f0
--- /dev/null
+++ b/llmfoundry/cli/data_prep_cli.py
@@ -0,0 +1,61 @@
+# Copyright 2024 MosaicML LLM Foundry authors
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Annotated, Optional
+
+from typer import Option, Typer
+
+from llmfoundry.command_utils import (
+    convert_dataset_hf_from_args,
+)
+
+app = Typer(pretty_exceptions_show_locals=False)
+
+
+@app.command(name='convert_dataset_hf')
+def convert_dataset_hf(
+    dataset: Annotated[str, Option(..., help='Name of the dataset')],
+    out_root: Annotated[str, Option(..., help='Output root directory')],
+    data_subset: Annotated[
+        Optional[str],
+        Option(help='Subset of the dataset (e.g., "all" or "en")'),
+    ] = None,
+    splits: Annotated[str,
+                      Option(help='Comma-separated list of dataset splits',),
+    ] = 'train, train_small, val, val_small, val_xsmall',
+    compression: Annotated[Optional[str],
+                           Option(help='Compression type')] = None,
+    concat_tokens: Annotated[
+        Optional[int],
+        Option(help='Concatenate tokens up to this many tokens')] = None,
+    tokenizer: Annotated[Optional[str],
+                         Option(help='Tokenizer name')] = None,
+    tokenizer_kwargs: Annotated[
+        Optional[str],
+        Option(help='Tokenizer keyword arguments in JSON format')] = None,
+    bos_text: Annotated[Optional[str], Option(help='BOS text')] = None,
+    eos_text: Annotated[Optional[str], Option(help='EOS text')] = None,
+    no_wrap: Annotated[
+        bool,
+        Option(help='Do not wrap text across max_length boundaries'),
+    ] = False,
+    num_workers: Annotated[Optional[int],
+                           Option(help='Number of workers')] = None,
+):
+    """Converts dataset from HuggingFace into MDS format."""
+    # Convert the comma-separated splits into a list, stripping stray
+    # whitespace so entries like ' train_small' (note the default value
+    # above contains spaces after the commas) still match split names.
+    splits_list = [s.strip() for s in splits.split(',')] if splits else []
+    convert_dataset_hf_from_args(
+        dataset=dataset,
+        data_subset=data_subset,
+        splits=splits_list,
+        out_root=out_root,
+        compression=compression,
+        concat_tokens=concat_tokens,
+        tokenizer=tokenizer,
+        tokenizer_kwargs=tokenizer_kwargs,
+        bos_text=bos_text,
+        eos_text=eos_text,
+        no_wrap=no_wrap,
+        num_workers=num_workers,
+    )
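With the new `data_prep` group registered on the root app (see `cli.py` above), the subcommand can be exercised end to end. A sketch using typer's test runner; the dashed flag spellings assume typer's default underscore-to-dash conversion, the output directory is hypothetical, and a real invocation streams the C4 download:

from typer.testing import CliRunner

from llmfoundry.cli.cli import app

runner = CliRunner()
# Roughly equivalent to: llmfoundry data_prep convert_dataset_hf --dataset c4 ...
result = runner.invoke(
    app,
    [
        'data_prep',
        'convert_dataset_hf',
        '--dataset', 'c4',
        '--data-subset', 'en',
        '--out-root', '/tmp/my-copy-c4',  # hypothetical output directory
        '--splits', 'val_xsmall',
    ],
)
assert result.exit_code == 0, result.output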
diff --git a/llmfoundry/cli/registry_cli.py b/llmfoundry/cli/registry_cli.py
index 38ada51fd9..db090cd3aa 100644
--- a/llmfoundry/cli/registry_cli.py
+++ b/llmfoundry/cli/registry_cli.py
@@ -3,15 +3,15 @@

 from typing import Optional

-import typer
 from rich.console import Console
 from rich.table import Table
+from typer import Typer

 from llmfoundry import registry
 from llmfoundry.utils.registry_utils import TypedRegistry

 console = Console()
-app = typer.Typer(pretty_exceptions_show_locals=False)
+app = Typer(pretty_exceptions_show_locals=False)


 def _get_registries(group: Optional[str] = None) -> list[TypedRegistry]:
diff --git a/llmfoundry/command_utils/__init__.py b/llmfoundry/command_utils/__init__.py
index 7dd8a32c36..adaaf03b6e 100644
--- a/llmfoundry/command_utils/__init__.py
+++ b/llmfoundry/command_utils/__init__.py
@@ -1,5 +1,9 @@
 # Copyright 2024 MosaicML LLM Foundry authors
 # SPDX-License-Identifier: Apache-2.0
+from llmfoundry.command_utils.data_prep.convert_dataset_hf import (
+    convert_dataset_hf,
+    convert_dataset_hf_from_args,
+)
 from llmfoundry.command_utils.eval import (
     eval_from_yaml,
     evaluate,
@@ -20,4 +24,6 @@
     'validate_config',
     'evaluate',
     'eval_from_yaml',
+    'convert_dataset_hf',
+    'convert_dataset_hf_from_args',
 ]
diff --git a/llmfoundry/command_utils/data_prep/convert_dataset_hf.py b/llmfoundry/command_utils/data_prep/convert_dataset_hf.py
new file mode 100644
index 0000000000..f9bbe6b0cf
--- /dev/null
+++ b/llmfoundry/command_utils/data_prep/convert_dataset_hf.py
@@ -0,0 +1,489 @@
+# Copyright 2022 MosaicML LLM Foundry authors
+# SPDX-License-Identifier: Apache-2.0
+
+"""Streaming dataset conversion scripts for C4 and The Pile."""
+import json
+import os
+import platform
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Any, Dict, Iterable, Optional, Union
+
+import datasets as hf_datasets
+import psutil
+import torch
+from numpy.typing import NDArray
+from streaming import MDSWriter
+from torch.utils.data import DataLoader, Dataset, IterableDataset
+from tqdm import tqdm
+from transformers import PreTrainedTokenizerBase
+
+from llmfoundry.data import ConcatTokensDataset, NoConcatDataset
+from llmfoundry.utils.builders import build_tokenizer
+
+
+class ConcatMode(Enum):
+    NO_CONCAT = 'NO_CONCAT'
+    CONCAT_TOKENS = 'CONCAT_TOKENS'
+
+
+@dataclass
+class DataSplitConstants:
+    hf_split: str
+    folder_split: str
+    raw_samples: Optional[int]
+    truncated_samples: Union[int, None]
+
+
+@dataclass
+class DatasetConstants:
+    chars_per_sample: int
+    chars_per_token: int
+    splits: Dict[str, DataSplitConstants] = field(default_factory=dict)
+
+    def __iter__(self):
+        for v in self.splits.values():
+            yield v
+
+
+class TrainSmallConstants(DataSplitConstants):
+
+    def __init__(
+        self,
+        hf_split: str = 'train',
+        folder_split: str = 'train_small',
+        raw_samples: int = 100000,
+        truncated_samples: int = 100000,
+    ):
+        super().__init__(hf_split, folder_split, raw_samples, truncated_samples)
+
+
+class ValSmallConstants(DataSplitConstants):
+
+    def __init__(
+        self,
+        hf_split: str = 'validation',
+        folder_split: str = 'val_small',
+        raw_samples: int = 10000,
+        truncated_samples: int = 10000,
+    ):
+        super().__init__(hf_split, folder_split, raw_samples, truncated_samples)
+
+
+class ValXSmallConstants(DataSplitConstants):
+
+    def __init__(
+        self,
+        hf_split: str = 'validation',
+        folder_split: str = 'val_xsmall',
+        raw_samples: int = 3000,
+        truncated_samples: int = 3000,
+    ):
+        super().__init__(hf_split, folder_split, raw_samples, truncated_samples)
+
+
+pileconstants = DatasetConstants(
+    chars_per_sample=6212,  # Computed over validation set
+    chars_per_token=4,  # OpenAI estimate
+)
+pileconstants.splits['train'] = DataSplitConstants(
+    hf_split='train',
+    folder_split='train',
+    raw_samples=210607728,
+    truncated_samples=None,
+)
+pileconstants.splits['train_small'] = DataSplitConstants(
+    hf_split='train',
+    folder_split='train_small',
+    raw_samples=100000,
+    truncated_samples=100000,
+)
+pileconstants.splits['val'] = DataSplitConstants(
+    hf_split='validation',
+    folder_split='val',
+    raw_samples=214670,
+    truncated_samples=None,
+)
+pileconstants.splits['val_small'] = DataSplitConstants(
+    hf_split='validation',
+    folder_split='val_small',
+    raw_samples=10000,
+    truncated_samples=10000,
+)
+pileconstants.splits['val_xsmall'] = DataSplitConstants(
+    hf_split='validation',
+    folder_split='val_xsmall',
+    raw_samples=3000,
+    truncated_samples=3000,
+)
+
+c4constants = DatasetConstants(
+    chars_per_sample=2163,  # Computed over validation set
+    chars_per_token=4,  # OpenAI estimate
+)
+c4constants.splits['train'] = DataSplitConstants(
+    hf_split='train',
+    folder_split='train',
+    raw_samples=364868892,
+    truncated_samples=None,
+)
+c4constants.splits['train_small'] = DataSplitConstants(
+    hf_split='train',
+    folder_split='train_small',
+    raw_samples=100000,
+    truncated_samples=100000,
+)
+c4constants.splits['val'] = DataSplitConstants(
+    hf_split='validation',
+    folder_split='val',
+    raw_samples=364608,
+    truncated_samples=None,
+)
+c4constants.splits['val_small'] = DataSplitConstants(
+    hf_split='validation',
+    folder_split='val_small',
+    raw_samples=10000,
+    truncated_samples=10000,
+)
+c4constants.splits['val_xsmall'] = DataSplitConstants(
+    hf_split='validation',
+    folder_split='val_xsmall',
+    raw_samples=3000,
+    truncated_samples=3000,
+)
+c4constants.splits['val_xxsmall'] = DataSplitConstants(
+    hf_split='validation',
+    folder_split='val_xxsmall',
+    raw_samples=100,
+    truncated_samples=100,
+)
+
+CONSTS = {'c4': c4constants, 'the_pile': pileconstants}
+
+
+def build_hf_dataset(
+    dataset_name: str,
+    split: str,
+    mode: ConcatMode,
+    max_length: Optional[int] = None,
+    bos_text: str = '',
+    eos_text: str = '',
+    no_wrap: bool = False,
+    tokenizer: Optional[PreTrainedTokenizerBase] = None,
+    data_subset: Union[str, None] = None,
+) -> IterableDataset:
+    """Build an IterableDataset over the HF C4 or pile source data.
+
+    Args:
+        dataset_name (str): Dataset name
+        split (str): Split name.
+        mode (ConcatMode): NO_CONCAT, or CONCAT_TOKENS
+        max_length (int): The length of concatenated tokens
+        bos_text (str): text to insert at the beginning of each sequence
+        eos_text (str): text to insert at the end of each sequence
+        no_wrap (bool): if concatenating, whether to wrap text across `max_length` boundaries
+        tokenizer (PreTrainedTokenizerBase): if mode is CONCAT_TOKENS, the tokenizer to use
+        data_subset (str): Referred to as "name" in HuggingFace datasets.load_dataset.
+            Typically "all" (The Pile) or "en" (c4).
+
+    Returns:
+        An IterableDataset.
+    """
+    hf_dataset = hf_datasets.load_dataset(
+        path=dataset_name,
+        name=data_subset,
+        split=split,
+        streaming=True,
+    )
+    if mode == ConcatMode.NO_CONCAT:
+        dataset = NoConcatDataset(hf_dataset)
+    else:
+        if not isinstance(tokenizer, PreTrainedTokenizerBase):
+            raise ValueError(
+                f'{tokenizer=} must be of type PreTrainedTokenizerBase',
+            )
+        if max_length is None:
+            raise ValueError('max_length must be set.')
+        if bos_text + eos_text == '':
+            test_tokens = tokenizer('test')
+            if test_tokens['input_ids'][
+                    0] != tokenizer.bos_token_id and test_tokens['input_ids'][
+                        -1] != tokenizer.eos_token_id:
+                tok_error_msg = 'This tokenizer does not insert an EOS nor BOS token. '
+                tok_error_msg += 'Concatenating with this tokenizer will result in sequences being '
+                tok_error_msg += 'attached without a separating token. Please use another tokenizer, '
+                tok_error_msg += 'such as facebook/opt-125m, or specify EOS/BOS text with e.g. '
+                tok_error_msg += '--bos_text=<|endoftext|>.'
+                raise ValueError(tok_error_msg)
+        dataset = ConcatTokensDataset(
+            hf_dataset=hf_dataset,
+            tokenizer=tokenizer,
+            max_length=max_length,
+            bos_text=bos_text,
+            eos_text=eos_text,
+            no_wrap=no_wrap,
+        )
+    return dataset
+
+
+def _est_progress_denominator(
+    total_samples: int,
+    chars_per_sample: int,
+    chars_per_token: int,
+    mode: ConcatMode,
+    max_length: int,
+):
+    est_tokens_per_sample = chars_per_sample // chars_per_token
+    if mode == ConcatMode.NO_CONCAT:
+        return total_samples
+    elif mode == ConcatMode.CONCAT_TOKENS:
+        return total_samples * est_tokens_per_sample // max_length
+
+
+def build_dataloader(
+    dataset: Dataset,
+    batch_size: int,
+    num_workers: Optional[int],
+) -> DataLoader:
+    if num_workers is None:
+        # Multiple workers is only supported on linux and macos machines
+        plat = platform.platform().lower()
+        if 'linux' in plat or 'macos' in plat:
+            num_workers = max(1, psutil.cpu_count())
+        else:
+            num_workers = 0
+
+    # If using multiple workers, configure each worker to prefetch as many samples as it can, up to
+    # the aggregate device batch size
+    # If not using workers, the torch DataLoader expects the default value for prefetch_factor,
+    # which non-intuitively must be 2.
+    prefetch_factor = max(
+        1,
+        2 * batch_size // num_workers,
+    ) if num_workers > 0 else 2
+
+    return DataLoader(
+        dataset=dataset,
+        sampler=None,
+        batch_size=batch_size,
+        num_workers=num_workers,
+        prefetch_factor=prefetch_factor,
+    )
+
+
+def generate_samples(
+    loader: DataLoader,
+    truncate_num_samples: Optional[int] = None,
+) -> Iterable[Union[Dict[str, bytes], Dict[str, NDArray]]]:
+    """Generator over samples of a dataloader.
+
+    Args:
+        loader (DataLoader): A dataloader emitting batches like {key: [sample0_bytes, sample1_bytes, sample2_bytes, ...]}
+        truncate_num_samples (Optional[int]): An optional # of samples to stop at.
+
+    Yields:
+        Sample dicts.
+    """
+    n_samples = 0
+    for batch in loader:
+        keys = list(batch.keys())
+        current_bs = len(batch[keys[0]])
+        for idx in range(current_bs):
+            if truncate_num_samples is not None and n_samples == truncate_num_samples:
+                return
+            n_samples += 1
+            yield {
+                k:
+                    v[idx].numpy() if isinstance(v[idx], torch.Tensor) else v[idx]
+                for k, v in batch.items()
+            }
+
+
+def convert_dataset_hf(
+    dataset: str,
+    data_subset: Optional[str],
+    splits: list[str],
+    out_root: str,
+    compression: Optional[str],
+    concat_tokens: Optional[int],
+    tokenizer: Optional[str],
+    tokenizer_kwargs: dict[str, Any],
+    bos_text: str,
+    eos_text: str,
+    no_wrap: bool,
+    num_workers: Optional[int],
+) -> None:
+    """Converts HuggingFace datasets to MDS format.
+
+    Args:
+        dataset (str): Name of the dataset
+        data_subset (Optional[str]): Subset of the dataset (e.g., "all" or "en")
+        splits (list[str]): List of dataset splits to convert
+        out_root (str): Output root directory
+        compression (Optional[str]): Compression type
+        concat_tokens (Optional[int]): Concatenate tokens up to this many tokens
+        tokenizer (Optional[str]): Tokenizer name
+        tokenizer_kwargs (dict[str, Any]): Tokenizer keyword arguments
+        bos_text (str): BOS text
+        eos_text (str): EOS text
+        no_wrap (bool): Do not wrap text across max_length boundaries
+        num_workers (Optional[int]): Number of workers
+
+    Raises:
+        KeyError: If constants are not defined for the split
+    """
+    try:
+        dataset_constants = CONSTS[dataset]
+    except KeyError:
+        raise ValueError(
Currently only "the_pile" and "c4" are supported.', + ) + + if concat_tokens is not None and tokenizer is not None: + mode = ConcatMode.CONCAT_TOKENS + built_tokenizer = build_tokenizer(tokenizer, tokenizer_kwargs) + # we will enforce length, so suppress warnings about sequences too long for the model + built_tokenizer.model_max_length = int(1e30) + columns = {'tokens': 'ndarray:int32'} + else: + mode = ConcatMode.NO_CONCAT + built_tokenizer = None + columns = {'text': 'str'} + + for split_name in splits: + try: + split = dataset_constants.splits[split_name] + except KeyError: + raise KeyError(f'Constants not defined for split {split_name}.') + hf_split = split.hf_split + folder_split = split.folder_split + expected_num_samples = split.raw_samples + truncate_num_samples = split.truncated_samples + # Only generate the splits requested + if folder_split not in splits: + continue + + # Get samples + hf_dataset = build_hf_dataset( + dataset_name=dataset, + data_subset=data_subset, + split=hf_split, + mode=mode, + max_length=concat_tokens, + bos_text=bos_text, + eos_text=eos_text, + no_wrap=no_wrap, + tokenizer=built_tokenizer, + ) + loader = build_dataloader( + dataset=hf_dataset, + batch_size=512, + num_workers=num_workers, + ) + samples = generate_samples( + loader, + truncate_num_samples=truncate_num_samples, + ) + + if expected_num_samples is not None and concat_tokens is not None: + denominator = truncate_num_samples if truncate_num_samples is not None else _est_progress_denominator( + total_samples=expected_num_samples, + chars_per_sample=dataset_constants.chars_per_sample, + chars_per_token=dataset_constants.chars_per_token, + mode=mode, + max_length=concat_tokens, + ) + else: + denominator = None + + # Write samples + print(f'Converting {folder_split} to MDS format...') + print( + f'Note: the progress bar is based on the dataset length before tokenization, and may finish at a value before 100%.', + ) + with MDSWriter( + columns=columns, + out=os.path.join(out_root, folder_split), + compression=compression, + ) as out: + if denominator is not None: + for sample in tqdm( + samples, + desc=folder_split, + total=denominator, + ): + out.write(sample) + else: + for sample in tqdm(samples, desc=folder_split): + out.write(sample) + + +def convert_dataset_hf_from_args( + dataset: str, + data_subset: Optional[str], + splits: list[str], + out_root: str, + compression: Optional[str], + concat_tokens: Optional[int], + tokenizer: Optional[str], + tokenizer_kwargs: Optional[str], + bos_text: Optional[str], + eos_text: Optional[str], + no_wrap: bool, + num_workers: Optional[int], +) -> None: + """A wrapper for `convert_dataset_hf` that parses arguments. 
+
+    Args:
+        dataset (str): Name of the dataset
+        data_subset (Optional[str]): Subset of the dataset (e.g., "all" or "en")
+        splits (list[str]): List of dataset splits to convert
+        out_root (str): Output root directory
+        compression (Optional[str]): Compression type
+        concat_tokens (Optional[int]): Concatenate tokens up to this many tokens
+        tokenizer (Optional[str]): Tokenizer name
+        tokenizer_kwargs (Optional[str]): Tokenizer keyword arguments in JSON format
+        bos_text (Optional[str]): BOS text
+        eos_text (Optional[str]): EOS text
+        no_wrap (bool): Do not wrap text across max_length boundaries
+        num_workers (Optional[int]): Number of workers
+
+    Raises:
+        ValueError: If the output directory already contains the requested splits
+        ValueError: If `concat_tokens` is set but `tokenizer` is not
+    """
+    if tokenizer_kwargs:
+        parsed_tokenizer_kwargs = json.loads(tokenizer_kwargs)
+    else:
+        parsed_tokenizer_kwargs = {}
+
+    if os.path.isdir(out_root) and len(
+        set(os.listdir(out_root)).intersection(set(splits)),
+    ) > 0:
+        raise ValueError(
+            f'--out_root={out_root} contains {os.listdir(out_root)} which cannot overlap with the requested splits {splits}.',
+        )
+
+    # Make sure we have needed concat options
+    if (
+        concat_tokens is not None and isinstance(concat_tokens, int) and
+        tokenizer is None
+    ):
+        raise ValueError(
+            'When setting --concat_tokens, you must specify a --tokenizer',
+        )
+
+    # now that we have validated them, change BOS/EOS to strings and convert
+    convert_dataset_hf(
+        dataset=dataset,
+        data_subset=data_subset,
+        splits=splits,
+        out_root=out_root,
+        compression=compression,
+        concat_tokens=concat_tokens,
+        tokenizer=tokenizer,
+        tokenizer_kwargs=parsed_tokenizer_kwargs,
+        bos_text=bos_text if bos_text else '',
+        eos_text=eos_text if eos_text else '',
+        no_wrap=no_wrap,
+        num_workers=num_workers,
+    )
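Since the conversion logic now lives in `llmfoundry.command_utils`, it can be driven straight from Python without touching the CLI. A sketch mirroring the hyperparameters used in the tests and README examples below (the output directory is hypothetical, and running this streams the C4 download):

from llmfoundry.command_utils import convert_dataset_hf

# Pretokenize a small C4 validation split into MDS shards.
convert_dataset_hf(
    dataset='c4',
    data_subset='en',
    splits=['val_xsmall'],
    out_root='/tmp/my-copy-c4',  # hypothetical output directory
    compression=None,
    concat_tokens=2048,
    tokenizer='EleutherAI/gpt-neox-20b',
    tokenizer_kwargs={},
    bos_text='',
    eos_text='<|endoftext|>',
    no_wrap=False,
    num_workers=None,
)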
diff --git a/scripts/data_prep/convert_dataset_hf.py b/scripts/data_prep/convert_dataset_hf.py
index bf7f145610..3b893868b2 100644
--- a/scripts/data_prep/convert_dataset_hf.py
+++ b/scripts/data_prep/convert_dataset_hf.py
@@ -2,30 +2,9 @@
 # SPDX-License-Identifier: Apache-2.0

 """Streaming dataset conversion scripts for C4 and The Pile."""
-import json
-import os
-import platform
 from argparse import ArgumentParser, Namespace
-from dataclasses import dataclass, field
-from enum import Enum
-from typing import Dict, Iterable, Optional, Union

-import datasets as hf_datasets
-import psutil
-import torch
-from numpy.typing import NDArray
-from streaming import MDSWriter
-from torch.utils.data import DataLoader, Dataset, IterableDataset
-from tqdm import tqdm
-from transformers import PreTrainedTokenizerBase
-
-from llmfoundry.data import ConcatTokensDataset, NoConcatDataset
-from llmfoundry.utils.builders import build_tokenizer
-
-
-class ConcatMode(Enum):
-    NO_CONCAT = 'NO_CONCAT'
-    CONCAT_TOKENS = 'CONCAT_TOKENS'
+from llmfoundry.command_utils import convert_dataset_hf_from_args


 def parse_args() -> Namespace:
@@ -64,398 +43,22 @@ def parse_args() -> Namespace:
     parser.add_argument('--num_workers', type=int, required=False, default=None)

     parsed = parser.parse_args()
-
-    if parsed.tokenizer_kwargs is not None:
-        parsed.tokenizer_kwargs = json.loads(parsed.tokenizer_kwargs)
-    else:
-        parsed.tokenizer_kwargs = {}
-
-    if os.path.isdir(parsed.out_root) and len(
-        set(os.listdir(parsed.out_root)).intersection(set(parsed.splits)),
-    ) > 0:
-        raise ValueError(
-            f'--out_root={parsed.out_root} contains {os.listdir(parsed.out_root)} which cannot overlap with the requested splits {parsed.splits}.',
-        )
-
-    # Make sure we have needed concat options
-    if (
-        parsed.concat_tokens is not None and
-        isinstance(parsed.concat_tokens, int) and parsed.tokenizer is None
-    ):
-        parser.error(
-            'When setting --concat_tokens, you must specify a --tokenizer',
-        )
-
-    # now that we have validated them, change BOS/EOS to strings
-    if parsed.bos_text is None:
-        parsed.bos_text = ''
-    if parsed.eos_text is None:
-        parsed.eos_text = ''
     return parsed


-@dataclass
-class DataSplitConstants:
-    hf_split: str
-    folder_split: str
-    raw_samples: Optional[int]
-    truncated_samples: Union[int, None]
-
-
-@dataclass
-class DatasetConstants:
-    chars_per_sample: int
-    chars_per_token: int
-    splits: Dict[str, DataSplitConstants] = field(default_factory=dict)
-
-    def __iter__(self):
-        for v in self.splits.values():
-            yield v
-
-
-class TrainSmallConstants(DataSplitConstants):
-
-    def __init__(
-        self,
-        hf_split: str = 'train',
-        folder_split: str = 'train_small',
-        raw_samples: int = 100000,
-        truncated_samples: int = 100000,
-    ):
-        super().__init__(hf_split, folder_split, raw_samples, truncated_samples)
-
-
-class ValSmallConstants(DataSplitConstants):
-
-    def __init__(
-        self,
-        hf_split: str = 'validation',
-        folder_split: str = 'val_small',
-        raw_samples: int = 10000,
-        truncated_samples: int = 10000,
-    ):
-        super().__init__(hf_split, folder_split, raw_samples, truncated_samples)
-
-
-class ValXSmallConstants(DataSplitConstants):
-
-    def __init__(
-        self,
-        hf_split: str = 'validation',
-        folder_split: str = 'val_xsmall',
-        raw_samples: int = 3000,
-        truncated_samples: int = 3000,
-    ):
-        super().__init__(hf_split, folder_split, raw_samples, truncated_samples)
-
-
-pileconstants = DatasetConstants(
-    chars_per_sample=6212,  # Computed over validation set
-    chars_per_token=4,  # OpenAI estimate
-)
-pileconstants.splits['train'] = DataSplitConstants(
-    hf_split='train',
-    folder_split='train',
-    raw_samples=210607728,
-    truncated_samples=None,
-)
-pileconstants.splits['train_small'] = DataSplitConstants(
-    hf_split='train',
-    folder_split='train_small',
-    raw_samples=100000,
-    truncated_samples=100000,
-)
-pileconstants.splits['val'] = DataSplitConstants(
-    hf_split='validation',
-    folder_split='val',
-    raw_samples=214670,
-    truncated_samples=None,
-)
-pileconstants.splits['val_small'] = DataSplitConstants(
-    hf_split='validation',
-    folder_split='val_small',
-    raw_samples=10000,
-    truncated_samples=10000,
-)
-pileconstants.splits['val_xsmall'] = DataSplitConstants(
-    hf_split='validation',
-    folder_split='val_xsmall',
-    raw_samples=3000,
-    truncated_samples=3000,
-)
-
-c4constants = DatasetConstants(
-    chars_per_sample=2163,  # Computed over validation set
-    chars_per_token=4,  # OpenAI estimate
-)
-c4constants.splits['train'] = DataSplitConstants(
-    hf_split='train',
-    folder_split='train',
-    raw_samples=364868892,
-    truncated_samples=None,
-)
-c4constants.splits['train_small'] = DataSplitConstants(
-    hf_split='train',
-    folder_split='train_small',
-    raw_samples=100000,
-    truncated_samples=100000,
-)
-c4constants.splits['val'] = DataSplitConstants(
-    hf_split='validation',
-    folder_split='val',
-    raw_samples=364608,
-    truncated_samples=None,
-)
-c4constants.splits['val_small'] = DataSplitConstants(
-    hf_split='validation',
-    folder_split='val_small',
-    raw_samples=10000,
-    truncated_samples=10000,
-)
-c4constants.splits['val_xsmall'] = DataSplitConstants(
-    hf_split='validation',
-    folder_split='val_xsmall',
-    raw_samples=3000,
-    truncated_samples=3000,
-)
-c4constants.splits['val_xxsmall'] = DataSplitConstants(
-    hf_split='validation',
-    folder_split='val_xxsmall',
-    raw_samples=100,
-    truncated_samples=100,
-)
-
-CONSTS = {'c4': c4constants, 'the_pile': pileconstants}
-
-
-def build_hf_dataset(
-    dataset_name: str,
-    split: str,
-    mode: ConcatMode,
-    max_length: Optional[int] = None,
-    bos_text: str = '',
-    eos_text: str = '',
-    no_wrap: bool = False,
-    tokenizer: PreTrainedTokenizerBase = None,
-    data_subset: Union[str, None] = None,
-) -> IterableDataset:
-    """Build an IterableDataset over the HF C4 or pile source data.
-
-    Args:
-        dataset_name (str): Dataset name
-        split (str): Split name.
-        mode (ConcatMode): NO_CONCAT, or CONCAT_TOKENS
-        max_length (int): The length of concatenated tokens
-        bos_text (str): text to insert at the beginning of each sequence
-        eos_text (str): text to insert at the end of each sequence
-        no_wrap (bool): if concatenating, whether to wrap text across `max_length` boundaries
-        tokenizer (PreTrainedTokenizerBase): if mode is CONCAT_TOKENS, the tokenizer to use
-        data_subset (str): Referred to as "name" in HuggingFace datasets.load_dataset.
-            Typically "all" (The Pile) or "en" (c4).
-
-    Returns:
-        An IterableDataset.
-    """
-    hf_dataset = hf_datasets.load_dataset(
-        path=dataset_name,
-        name=data_subset,
-        split=split,
-        streaming=True,
-    )
-    if mode == ConcatMode.NO_CONCAT:
-        dataset = NoConcatDataset(hf_dataset)
-    else:
-        if not isinstance(tokenizer, PreTrainedTokenizerBase):
-            raise ValueError(
-                f'{tokenizer=} must be of type PreTrainedTokenizerBase',
-            )
-        if max_length is None:
-            raise ValueError(f'max_length must be set.')
-        if bos_text + eos_text == '':
-            test_tokens = tokenizer('test')
-            if test_tokens['input_ids'][
-                    0] != tokenizer.bos_token_id and test_tokens['input_ids'][
-                        -1] != tokenizer.eos_token_id:
-                tok_error_msg = 'This tokenizer does not insert an EOS nor BOS token. '
-                tok_error_msg += 'Concatenating with this tokenizer will result in sequences being '
-                tok_error_msg += 'attached without a separating token. Please use another tokenizer, '
-                tok_error_msg += 'such as facebook/opt-125m, or specify EOS/BOS text with e.g. '
-                tok_error_msg += '--bos_text=<|endoftext|>.'
-                raise ValueError(tok_error_msg)
-        dataset = ConcatTokensDataset(
-            hf_dataset=hf_dataset,
-            tokenizer=tokenizer,
-            max_length=max_length,
-            bos_text=bos_text,
-            eos_text=eos_text,
-            no_wrap=no_wrap,
-        )
-    return dataset
-
-
-def _est_progress_denominator(
-    total_samples: int,
-    chars_per_sample: int,
-    chars_per_token: int,
-    mode: ConcatMode,
-    max_length: int,
-):
-    est_tokens_per_sample = chars_per_sample // chars_per_token
-    if mode == ConcatMode.NO_CONCAT:
-        return total_samples
-    elif mode == ConcatMode.CONCAT_TOKENS:
-        return total_samples * est_tokens_per_sample // max_length
-
-
-def build_dataloader(
-    dataset: Dataset,
-    batch_size: int,
-    num_workers: Optional[int],
-) -> DataLoader:
-    if num_workers is None:
-        # Multiple workers is only supported on linux machines
-        if 'linux' or 'macos' in platform.platform().lower():
-            num_workers = max(1, psutil.cpu_count())
-        else:
-            num_workers = 0
-
-    # If using multiple workers, configure each worker to prefetch as many samples as it can, up to
-    # the aggregate device batch size
-    # If not using workers, the torch DataLoader expects the default value for prefetch_factor,
-    # which non-intuitively must be 2.
-    prefetch_factor = max(
-        1,
-        2 * batch_size // num_workers,
-    ) if num_workers > 0 else 2
-
-    return DataLoader(
-        dataset=dataset,
-        sampler=None,
-        batch_size=batch_size,
-        num_workers=num_workers,
-        prefetch_factor=prefetch_factor,
-    )
-
-
-def generate_samples(
-    loader: DataLoader,
-    truncate_num_samples: Optional[int] = None,
-) -> Iterable[Union[Dict[str, bytes], Dict[str, NDArray]]]:
-    """Generator over samples of a dataloader.
-
-    Args:
-        loader (DataLoader): A dataloader emitting batches like {key: [sample0_bytes, sample1_bytes, sample2_bytes, ...]}
-        truncate_num_samples (Optional[int]): An optional # of samples to stop at.
-
-    Yields:
-        Sample dicts.
-    """
-    n_samples = 0
-    for batch in loader:
-        keys = list(batch.keys())
-        current_bs = len(batch[keys[0]])
-        for idx in range(current_bs):
-            if truncate_num_samples is not None and n_samples == truncate_num_samples:
-                return
-            n_samples += 1
-            yield {
-                k:
-                    v[idx].numpy() if isinstance(v[idx], torch.Tensor) else v[idx]
-                for k, v in batch.items()
-            }
-
-
-def main(args: Namespace) -> None:
-    """Main: create C4/pile streaming dataset.
-
-    Args:
-        args (Namespace): Commandline arguments.
-    """
-    try:
-        dataset_constants = CONSTS[args.dataset]
-    except KeyError:
-        raise ValueError(
-            f'Constants for dataset "{args.dataset}" not found. Currently only "the_pile" and "c4" are supported.',
-        )
-
-    if args.concat_tokens is not None:
-        mode = ConcatMode.CONCAT_TOKENS
-        tokenizer = build_tokenizer(args.tokenizer, args.tokenizer_kwargs)
-        # we will enforce length, so suppress warnings about sequences too long for the model
-        tokenizer.model_max_length = int(1e30)
-        columns = {'tokens': 'ndarray:int32'}
-    else:
-        mode = ConcatMode.NO_CONCAT
-        tokenizer = None
-        columns = {'text': 'str'}
-
-    for split_name in args.splits:
-        try:
-            split = dataset_constants.splits[split_name]
-        except KeyError:
-            raise KeyError(f'Constants not defined for split {split_name}.')
-        hf_split = split.hf_split
-        folder_split = split.folder_split
-        expected_num_samples = split.raw_samples
-        truncate_num_samples = split.truncated_samples
-        # Only generate the splits requested
-        if folder_split not in args.splits:
-            continue
-
-        # Get samples
-        dataset = build_hf_dataset(
-            dataset_name=args.dataset,
-            data_subset=args.data_subset,
-            split=hf_split,
-            mode=mode,
-            max_length=args.concat_tokens,
-            bos_text=args.bos_text,
-            eos_text=args.eos_text,
-            no_wrap=args.no_wrap,
-            tokenizer=tokenizer,
-        )
-        loader = build_dataloader(
-            dataset=dataset,
-            batch_size=512,
-            num_workers=args.num_workers,
-        )
-        samples = generate_samples(
-            loader,
-            truncate_num_samples=truncate_num_samples,
-        )
-
-        if expected_num_samples is not None:
-            denominator = truncate_num_samples if truncate_num_samples is not None else _est_progress_denominator(
-                total_samples=expected_num_samples,
-                chars_per_sample=dataset_constants.chars_per_sample,
-                chars_per_token=dataset_constants.chars_per_token,
-                mode=mode,
-                max_length=args.concat_tokens,
-            )
-        else:
-            denominator = None
-
-        # Write samples
-        print(f'Converting {folder_split} to MDS format...')
-        print(
-            f'Note: the progress bar is based on the dataset length before tokenization, and may finish at a value before 100%.',
-        )
-        with MDSWriter(
-            columns=columns,
-            out=os.path.join(args.out_root, folder_split),
-            compression=args.compression,
-        ) as out:
-            if denominator is not None:
-                for sample in tqdm(
-                    samples,
-                    desc=folder_split,
-                    total=denominator,
-                ):
-                    out.write(sample)
-            else:
-                for sample in tqdm(samples, desc=folder_split):
-                    out.write(sample)
-
-
 if __name__ == '__main__':
-    main(parse_args())
+    args = parse_args()
+    convert_dataset_hf_from_args(
+        dataset=args.dataset,
+        data_subset=args.data_subset,
+        splits=args.splits,
+        out_root=args.out_root,
+        compression=args.compression,
+        concat_tokens=args.concat_tokens,
+        tokenizer=args.tokenizer,
+        tokenizer_kwargs=args.tokenizer_kwargs,
+        bos_text=args.bos_text,
+        eos_text=args.eos_text,
+        no_wrap=args.no_wrap,
+        num_workers=args.num_workers,
+    )
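The script above is now a thin argparse shim over `convert_dataset_hf_from_args`, which `json.loads` the tokenizer kwargs and normalizes `bos_text`/`eos_text` to strings before delegating to `convert_dataset_hf`. A sketch of calling the wrapper directly; note that `tokenizer_kwargs` is a JSON string here, not a dict (paths and kwarg values are illustrative):

from llmfoundry.command_utils import convert_dataset_hf_from_args

convert_dataset_hf_from_args(
    dataset='c4',
    data_subset='en',
    splits=['val_xsmall'],
    out_root='/tmp/my-copy-c4-raw',  # hypothetical output directory
    compression=None,
    concat_tokens=2048,
    tokenizer='EleutherAI/gpt-neox-20b',
    tokenizer_kwargs='{"model_max_length": 100000}',  # JSON string, parsed by the wrapper
    bos_text=None,  # normalized to '' before the call to convert_dataset_hf
    eos_text='<|endoftext|>',
    no_wrap=False,
    num_workers=None,
)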
diff --git a/tests/a_scripts/data_prep/test_convert_dataset_hf.py b/tests/a_scripts/data_prep/test_convert_dataset_hf.py
index 4c5d1a6bba..e09c54ca70 100644
--- a/tests/a_scripts/data_prep/test_convert_dataset_hf.py
+++ b/tests/a_scripts/data_prep/test_convert_dataset_hf.py
@@ -2,29 +2,26 @@
 # SPDX-License-Identifier: Apache-2.0

 import os
-from argparse import Namespace
 from pathlib import Path

-from scripts.data_prep.convert_dataset_hf import main as main_hf
+from llmfoundry.command_utils import convert_dataset_hf


 def test_download_script_from_api(tmp_path: Path):
     # test calling it directly
     path = os.path.join(tmp_path, 'my-copy-c4-1')
-    main_hf(
-        Namespace(
-            **{
-                'dataset': 'c4',
-                'data_subset': 'en',
-                'splits': ['val_xsmall'],
-                'out_root': path,
-                'compression': None,
-                'concat_tokens': None,
-                'bos_text': None,
-                'eos_text': None,
-                'no_wrap': False,
-                'num_workers': None,
-            },
-        ),
+    convert_dataset_hf(
+        dataset='c4',
+        data_subset='en',
+        splits=['val_xsmall'],
+        out_root=path,
+        compression=None,
+        concat_tokens=None,
+        bos_text='',
+        eos_text='',
+        no_wrap=False,
+        num_workers=None,
+        tokenizer=None,
+        tokenizer_kwargs={},
     )
     assert os.path.exists(path)
diff --git a/tests/data/test_dataloader.py b/tests/data/test_dataloader.py
index a489002399..21d73c0d34 100644
--- a/tests/data/test_dataloader.py
+++ b/tests/data/test_dataloader.py
@@ -5,7 +5,6 @@
 import pathlib
 import random
 import shutil
-from argparse import Namespace
 from contextlib import nullcontext as does_not_raise
 from pathlib import Path
 from typing import ContextManager, Literal, Optional, Union
@@ -22,6 +21,7 @@
 from streaming import MDSWriter
 from streaming.base.util import clean_stale_shared_memory

+from llmfoundry.command_utils import convert_dataset_hf
 from llmfoundry.data import build_dataloader, build_finetuning_dataloader
 from llmfoundry.data.finetuning.collator import (
     _HF_IGNORE_INDEX,
@@ -56,7 +56,6 @@
     UnknownExampleTypeError,
 )
 # yapf: enable
-from scripts.data_prep.convert_dataset_hf import main as main_hf
 from scripts.data_prep.convert_finetuning_dataset import get_columns_and_format
 from tests.data_utils import (
     make_tiny_conversation_ft_dataset,
@@ -204,42 +203,34 @@ def test_correct_padding(
     path = get_abs_data_path(data_local)
     shutil.rmtree(path, ignore_errors=True)
     if pretokenize:
-        main_hf(
-            Namespace(
-                **{
-                    'dataset': 'c4',
-                    'data_subset': 'en',
-                    'splits': [split],
-                    'out_root': path,
-                    'compression': None,
-                    'concat_tokens': 2048,
-                    'tokenizer': tokenizer_name,
-                    'tokenizer_kwargs': {},
-                    'bos_text': bos_text,
-                    'eos_text': eos_text,
-                    'no_wrap': False,
-                    'num_workers': None,
-                },
-            ),
+        convert_dataset_hf(
+            dataset='c4',
+            data_subset='en',
+            splits=[split],
+            out_root=path,
+            compression=None,
+            concat_tokens=2048,
+            tokenizer=tokenizer_name,
+            tokenizer_kwargs={},
+            bos_text=bos_text,
+            eos_text=eos_text,
+            no_wrap=False,
+            num_workers=None,
         )
     else:
-        main_hf(
-            Namespace(
-                **{
-                    'dataset': 'c4',
-                    'data_subset': 'en',
-                    'splits': [split],
-                    'out_root': path,
-                    'compression': None,
-                    'concat_tokens': None,
-                    'tokenizer': tokenizer_name,
-                    'tokenizer_kwargs': {},
-                    'bos_text': bos_text,
-                    'eos_text': eos_text,
-                    'no_wrap': False,
-                    'num_workers': None,
-                },
-            ),
+        convert_dataset_hf(
+            dataset='c4',
+            data_subset='en',
+            splits=[split],
+            out_root=path,
+            compression=None,
+            concat_tokens=None,
+            tokenizer=tokenizer_name,
+            tokenizer_kwargs={},
+            bos_text=bos_text,
+            eos_text=eos_text,
+            no_wrap=False,
+            num_workers=None,
         )
     if not os.path.isdir(path):
         raise RuntimeError(f'c4 dataset at {path} not set up as expected')
diff --git a/tests/data_utils.py b/tests/data_utils.py
index 9653d8579a..35e11db531 100644
--- a/tests/data_utils.py
+++ b/tests/data_utils.py
@@ -11,7 +11,7 @@
 from omegaconf import DictConfig
 from omegaconf import OmegaConf as om

-from scripts.data_prep.convert_dataset_hf import main as main_hf  # noqa: E402
+from llmfoundry.command_utils import convert_dataset_hf
 from scripts.data_prep.convert_dataset_json import \
     main as main_json  # noqa: E402

@@ -230,23 +230,19 @@ def create_c4_dataset_xxsmall(path: Path) -> str:
     downloaded_split = 'val_xxsmall'  # very fast to convert

     # Hyperparameters from https://github.com/mosaicml/llm-foundry/blob/340a56658560ebceb2a3aa69d6e37813e415acd0/README.md#L188
-    main_hf(
-        Namespace(
-            **{
-                'dataset': 'c4',
-                'data_subset': 'en',
-                'splits': [downloaded_split],
-                'out_root': c4_dir,
-                'compression': None,
-                'concat_tokens': 2048,
-                'tokenizer': 'EleutherAI/gpt-neox-20b',
-                'tokenizer_kwargs': {},
-                'bos_text': '',
-                'eos_text': '<|endoftext|>',
-                'no_wrap': False,
-                'num_workers': 8,
-            },
-        ),
+    convert_dataset_hf(
+        dataset='c4',
+        data_subset='en',
+        splits=[downloaded_split],
+        out_root=c4_dir,
+        compression=None,
+        concat_tokens=2048,
+        tokenizer='EleutherAI/gpt-neox-20b',
+        tokenizer_kwargs={},
+        bos_text='',
+        eos_text='<|endoftext|>',
+        no_wrap=False,
+        num_workers=8,
     )

     # copy the small downloaded_split to other c4 splits for mocking purposes