diff --git a/llmfoundry/data/denoising.py b/llmfoundry/data/denoising.py
index 7d497b4efd..8ccf7f25e9 100644
--- a/llmfoundry/data/denoising.py
+++ b/llmfoundry/data/denoising.py
@@ -477,13 +477,13 @@ def build_text_denoising_dataloader(
         remote=cfg.dataset.get('remote'),
         split=cfg.dataset.get('split'),
         shuffle=cfg.dataset.get('shuffle', False),
-        predownload=cfg.dataset.get('predownload', 100_000),
+        predownload=cfg.dataset.get('predownload', None),
         keep_zip=cfg.dataset.get('keep_zip', False),
         download_retry=cfg.dataset.get('download_retry', 2),
         download_timeout=cfg.dataset.get('download_timeout', 60),
-        validate_hash=cfg.dataset.get('validate_hash'),
+        validate_hash=cfg.dataset.get('validate_hash', None),
         shuffle_seed=cfg.dataset.get('shuffle_seed', 9176),
-        num_canonical_nodes=cfg.dataset.get('num_canonical_nodes', 128),
+        num_canonical_nodes=cfg.dataset.get('num_canonical_nodes', None),
         batch_size=device_batch_size,
     )
diff --git a/llmfoundry/data/finetuning/dataloader.py b/llmfoundry/data/finetuning/dataloader.py
index 44d6d345f5..b19cab841f 100644
--- a/llmfoundry/data/finetuning/dataloader.py
+++ b/llmfoundry/data/finetuning/dataloader.py
@@ -136,13 +136,13 @@ def build_finetuning_dataloader(cfg: DictConfig,
             epoch_size=cfg.dataset.get('epoch_size', None),
             predownload=cfg.dataset.get('predownload', None),
             cache_limit=cfg.dataset.get('cache_limit', None),
-            partition_algo=cfg.dataset.get('partition_algo', 'orig'),
+            partition_algo=cfg.dataset.get('partition_algo', 'relaxed'),
             num_canonical_nodes=cfg.dataset.get('num_canonical_nodes', None),
             batch_size=device_batch_size,
             shuffle=cfg.dataset.get('shuffle', False),
-            shuffle_algo=cfg.dataset.get('shuffle_algo', 'py1b'),
+            shuffle_algo=cfg.dataset.get('shuffle_algo', 'py1e'),
             shuffle_seed=cfg.dataset.get('shuffle_seed', 9176),
-            shuffle_block_size=cfg.dataset.get('shuffle_block_size', 1 << 18),
+            shuffle_block_size=cfg.dataset.get('shuffle_block_size', None),
             sampling_method=cfg.dataset.get('sampling_method', 'balanced'),
             sampling_granularity=cfg.dataset.get('sampling_granularity', 1),
             batching_method=cfg.dataset.get('batching_method', 'random'),
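With these defaults set to `None`, any of these keys omitted from the dataset config are now passed through to mosaicml-streaming, which derives values at runtime instead of llm-foundry pinning its own. A minimal sketch of the resulting config behavior, assuming the OmegaConf-based configs these builders consume (the remote path and split below are hypothetical):

```python
# Minimal sketch: keys omitted from the dataset config now resolve to None
# and are deferred to mosaicml-streaming's runtime derivation.
from omegaconf import OmegaConf

cfg = OmegaConf.create({
    'dataset': {
        'remote': 's3://my-bucket/finetune-data',  # hypothetical path
        'split': 'train',
        'shuffle': True,
        # predownload / num_canonical_nodes / shuffle_block_size omitted on
        # purpose: .get(...) returns None and streaming fills them in.
    },
})

assert cfg.dataset.get('predownload', None) is None
assert cfg.dataset.get('shuffle_algo', 'py1e') == 'py1e'
```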
diff --git a/llmfoundry/data/finetuning/tasks.py b/llmfoundry/data/finetuning/tasks.py
index 6ba6ad96c8..bc712a7504 100644
--- a/llmfoundry/data/finetuning/tasks.py
+++ b/llmfoundry/data/finetuning/tasks.py
@@ -88,12 +88,12 @@ class StreamingFinetuningDataset(StreamingDataset):
         keep_zip (bool): Whether to keep or delete the compressed form when decompressing
             downloaded shards. If ``False``, keep iff remote is local or no remote. Defaults to
             ``False``.
-        epoch_size (int, optional): Number of samples to draw per epoch balanced across all
+        epoch_size (Union[int, str], optional): Number of samples to draw per epoch balanced across all
             streams. If ``None``, takes its value from the total number of underlying samples.
             Provide this field if you are weighting streams relatively to target a larger or
             smaller epoch size. Defaults to ``None``.
         predownload (int, optional): Target number of samples ahead to download the shards of while
-            iterating. Defaults to ``100_000``.
+            iterating. If ``None``, its value is set to ``8 * batch_size``. Defaults to ``None``.
         cache_limit (Union[int, str], optional): Maximum size in bytes of this StreamingDataset's
             shard cache. Before downloading a shard, the least recently used resident shard(s)
             may be evicted (deleted from the local cache) in order to stay under the limit. Set to None
@@ -101,15 +101,17 @@ class StreamingFinetuningDataset(StreamingDataset):
             bytes (e.g., 100b, 64kb, 77mb, and so on). Defaults to None.
         partition_algo (str): Which partitioning algorithm to use. Defaults to ``orig``.
         num_canonical_nodes (int, optional): Canonical number of nodes for shuffling with
-            resumption. Defaults to ``None``, which is interpreted as the number of nodes of the
-            initial run.
+            resumption. If ``None``, this is interpreted as 64 times the number of physical
+            nodes of the initial run if ``shuffle_algo`` is ``py1s`` or ``py2s``, and simply the
+            number of physical nodes of the initial run otherwise. Defaults to ``None``.
         batch_size (int, optional): Batch size of its DataLoader, which affects how the dataset is
             partitioned over the workers. Defaults to ``None``.
         shuffle (bool): Whether to iterate over the samples in randomized order. Defaults to
             ``False``.
-        shuffle_algo (str): Which shuffling algorithm to use. Defaults to ``py1b``.
+        shuffle_algo (str): Which shuffling algorithm to use. Defaults to ``py1e``.
         shuffle_seed (int): Seed for deterministic data shuffling. Defaults to ``9176``.
-        shuffle_block_size (int): Unit of shuffle. Defaults to ``1 << 18``.
+        shuffle_block_size (int, optional): Unit of shuffle. If ``None``, its value is calculated as
+            ``max(4_000_000 // num_canonical_nodes, 1 << 18)``. Defaults to ``None``.
         sampling_method (str): Which sampling method to use, either ``balanced`` or ``fixed``.
             Defaults to ``balanced``.
         sampling_granularity (int): When picking samples for a stream's final partial repeat,
@@ -129,16 +131,16 @@ def __init__(self,
                  download_timeout: float = 60,
                  validate_hash: Optional[str] = None,
                  keep_zip: bool = False,
-                 epoch_size: Optional[int] = None,
+                 epoch_size: Optional[Union[int, str]] = None,
                  predownload: Optional[int] = None,
                  cache_limit: Optional[Union[int, str]] = None,
-                 partition_algo: str = 'orig',
+                 partition_algo: str = 'relaxed',
                  num_canonical_nodes: Optional[int] = None,
                  batch_size: Optional[int] = None,
                  shuffle: bool = False,
-                 shuffle_algo: str = 'py1b',
+                 shuffle_algo: str = 'py1e',
                  shuffle_seed: int = 9176,
-                 shuffle_block_size: int = 1 << 18,
+                 shuffle_block_size: Optional[int] = None,
                  sampling_method: str = 'balanced',
                  sampling_granularity: int = 1,
                  batching_method: str = 'random',
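The `None` fallbacks documented in these docstrings are resolved inside mosaicml-streaming itself, not in this repo. A rough sketch of the derivation rules as the docstrings state them (illustrative only, not streaming's implementation; the function name is hypothetical):

```python
# Illustrative restatement of the documented None-fallback rules; this is
# not mosaicml-streaming's actual code.
from typing import Optional, Tuple


def resolve_streaming_defaults(
    batch_size: int,
    physical_nodes: int,
    shuffle_algo: str = 'py1e',
    predownload: Optional[int] = None,
    num_canonical_nodes: Optional[int] = None,
    shuffle_block_size: Optional[int] = None,
) -> Tuple[int, int, int]:
    if predownload is None:
        # "If None, its value is set to 8 * batch_size."
        predownload = 8 * batch_size
    if num_canonical_nodes is None:
        # 64x the physical nodes for py1s/py2s, 1x otherwise.
        multiplier = 64 if shuffle_algo in ('py1s', 'py2s') else 1
        num_canonical_nodes = multiplier * physical_nodes
    if shuffle_block_size is None:
        # "max(4_000_000 // num_canonical_nodes, 1 << 18)"
        shuffle_block_size = max(4_000_000 // num_canonical_nodes, 1 << 18)
    return predownload, num_canonical_nodes, shuffle_block_size


# e.g. a single-node run with device batch size 32:
print(resolve_streaming_defaults(batch_size=32, physical_nodes=1))
# -> (256, 1, 4000000)
```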
diff --git a/llmfoundry/data/text_data.py b/llmfoundry/data/text_data.py
index 93af2f63ed..51fd6b38dc 100644
--- a/llmfoundry/data/text_data.py
+++ b/llmfoundry/data/text_data.py
@@ -46,12 +46,12 @@ class StreamingTextDataset(StreamingDataset):
         keep_zip (bool): Whether to keep or delete the compressed form when decompressing
             downloaded shards. If ``False``, keep iff remote is local or no remote. Defaults to
             ``False``.
-        epoch_size (int, optional): Number of samples to draw per epoch balanced across all
+        epoch_size (Union[int, str], optional): Number of samples to draw per epoch balanced across all
             streams. If ``None``, takes its value from the total number of underlying samples.
             Provide this field if you are weighting streams relatively to target a larger or
             smaller epoch size. Defaults to ``None``.
         predownload (int, optional): Target number of samples ahead to download the shards of while
-            iterating. Defaults to ``100_000``.
+            iterating. If ``None``, its value is set to ``8 * batch_size``. Defaults to ``None``.
         cache_limit (Union[int, str], optional): Maximum size in bytes of this StreamingDataset's
             shard cache. Before downloading a shard, the least recently used resident shard(s)
             may be evicted (deleted from the local cache) in order to stay under the limit. Set to None
@@ -59,15 +59,19 @@ class StreamingTextDataset(StreamingDataset):
             bytes (e.g., 100b, 64kb, 77mb, and so on). Defaults to None.
         partition_algo (str): Which partitioning algorithm to use. Defaults to ``orig``.
         num_canonical_nodes (int, optional): Canonical number of nodes for shuffling with
-            resumption. Defaults to ``None``, which is interpreted as the number of nodes of the
-            initial run.
+            resumption. If ``None``, this is interpreted as 64 times the number of physical
+            nodes of the initial run if ``shuffle_algo`` is ``py1s`` or ``py2s``, and simply the
+            number of physical nodes of the initial run otherwise. Defaults to ``None``.
         batch_size (int, optional): Batch size of its DataLoader, which affects how the dataset is
             partitioned over the workers. Defaults to ``None``.
         shuffle (bool): Whether to iterate over the samples in randomized order. Defaults to
             ``False``.
-        shuffle_algo (str): Which shuffling algorithm to use. Defaults to ``py1b``.
+        shuffle_algo (str): Which shuffling algorithm to use. Defaults to ``py1e``.
         shuffle_seed (int): Seed for deterministic data shuffling. Defaults to ``9176``.
-        shuffle_block_size (int): Unit of shuffle. Defaults to ``1 << 18``.
+        shuffle_block_size (int, optional): Unit of shuffle. A canonical node's samples are split
+            into blocks of this size, and samples within each block are shuffled. If ``None``, its
+            value is calculated as ``max(4_000_000 // num_canonical_nodes, 1 << 18)``. Defaults to
+            ``None``.
         sampling_method (str): Which sampling method to use, either ``balanced`` or ``fixed``.
             Defaults to ``balanced``.
         sampling_granularity (int): When picking samples for a stream's final partial repeat,
@@ -89,16 +93,16 @@ def __init__(self,
                  download_timeout: float = 60,
                  validate_hash: Optional[str] = None,
                  keep_zip: bool = False,
-                 epoch_size: Optional[int] = None,
-                 predownload: int = 100_000,
+                 epoch_size: Optional[Union[int, str]] = None,
+                 predownload: Optional[int] = None,
                  cache_limit: Optional[Union[int, str]] = None,
-                 partition_algo: str = 'orig',
+                 partition_algo: str = 'relaxed',
                  num_canonical_nodes: Optional[int] = None,
                  batch_size: Optional[int] = None,
                  shuffle: bool = False,
-                 shuffle_algo: str = 'py1b',
+                 shuffle_algo: str = 'py1e',
                  shuffle_seed: int = 9176,
-                 shuffle_block_size: int = 1 << 18,
+                 shuffle_block_size: Optional[int] = None,
                  sampling_method: str = 'balanced',
                  sampling_granularity: int = 1,
                  batching_method: str = 'random',
diff --git a/setup.py b/setup.py
index 81178686d2..05d1d1bbbe 100644
--- a/setup.py
+++ b/setup.py
@@ -50,7 +50,7 @@
     'mosaicml[libcloud,wandb,mlflow,oci,gcs]>=0.16.4,<0.17',
     'accelerate>=0.20,<0.21',  # for HF inference `device_map`
     'transformers>=4.34.1,<4.35',
-    'mosaicml-streaming>=0.6,<0.7',
+    'mosaicml-streaming>=0.7.1,<0.8',
     'torch>=1.13.1,<2.1.1',
     'datasets>=2.14.5,<2.15',
     'fsspec==2023.6.0',  # newer version results in a bug in datasets that duplicates data
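The bumped streaming pin brings in the newer `relaxed` partitioner and `py1e` shuffle defaults used above. A hypothetical fail-fast guard mirroring the new requirement (not part of this diff):

```python
# Hypothetical version guard mirroring the new setup.py pin; not part of
# this PR, just a quick way to catch an incompatible environment early.
from importlib.metadata import version

from packaging.version import Version

installed = Version(version('mosaicml-streaming'))
assert Version('0.7.1') <= installed < Version('0.8'), (
    f'mosaicml-streaming=={installed} is outside the supported range '
    '>=0.7.1,<0.8')
```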