diff --git a/libs/libcommon/src/libcommon/constants.py b/libs/libcommon/src/libcommon/constants.py
index 505b083bd..11305c8e8 100644
--- a/libs/libcommon/src/libcommon/constants.py
+++ b/libs/libcommon/src/libcommon/constants.py
@@ -43,6 +43,8 @@
 LARGE_MAX_FAILED_RUNS = 30  # for errors that should not be permanent
 MAX_FAILED_RUNS_PER_ERROR_CODE = {
     # default
+    "ConfigNamesError": DEFAULT_MAX_FAILED_RUNS,  # <- 20240822: to recompute all of them in the next backfill
+    "RetryableConfigNamesError": DEFAULT_MAX_FAILED_RUNS,
     "ConnectionError": DEFAULT_MAX_FAILED_RUNS,
     "ExternalServerError": DEFAULT_MAX_FAILED_RUNS,
     "JobManagerCrashedError": DEFAULT_MAX_FAILED_RUNS,
diff --git a/libs/libcommon/src/libcommon/exceptions.py b/libs/libcommon/src/libcommon/exceptions.py
index b0b00e042..dd1010f40 100644
--- a/libs/libcommon/src/libcommon/exceptions.py
+++ b/libs/libcommon/src/libcommon/exceptions.py
@@ -78,6 +78,7 @@ def as_response(self) -> ErrorResponse:
     "ConfigNamesError",
     "ConfigNotFoundError",
     "CreateCommitError",
+    "DataFilesNotFoundError",
     "DatasetGenerationError",
     "DatasetGenerationCastError",
     "DatasetInBlockListError",
@@ -99,6 +100,7 @@ def as_response(self) -> ErrorResponse:
     "ExternalServerError",
     "FeaturesError",
     "FeaturesResponseEmptyError",
+    "FileFormatMismatchBetweenSplitsError",
     "FileSystemError",
     "HfHubError",
     "InfoError",
@@ -120,6 +122,7 @@ def as_response(self) -> ErrorResponse:
     "PreviousStepStatusError",
     "PreviousStepStillProcessingError",
     "PolarsParquetReadError",
+    "RetryableConfigNamesError",
     "RowsPostProcessingError",
     "SplitsNamesError",
     "SplitNamesFromStreamingError",
@@ -185,6 +188,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None):
         super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "CreateCommitError", cause, False)
 
 
+class DataFilesNotFoundError(CacheableError):
+    """No (supported) data files found."""
+
+    def __init__(self, message: str, cause: Optional[BaseException] = None):
+        super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "DataFilesNotFoundError", cause, False)
+
+
 class DatasetGenerationError(CacheableError):
     """The dataset generation failed."""
 
@@ -323,6 +333,15 @@ def __init__(self, message: str, cause: Optional[BaseException] = None):
         super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "FeaturesResponseEmptyError", cause, True)
 
 
+class FileFormatMismatchBetweenSplitsError(CacheableError):
+    """Couldn't infer the same data file format for all splits."""
+
+    def __init__(self, message: str, cause: Optional[BaseException] = None):
+        super().__init__(
+            message, HTTPStatus.INTERNAL_SERVER_ERROR, "FileFormatMismatchBetweenSplitsError", cause, False
+        )
+
+
 class FileSystemError(CacheableError):
     """An error happen reading from File System."""
 
@@ -446,6 +465,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None):
         super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "PreviousStepStillProcessingError", cause, False)
 
 
+class RetryableConfigNamesError(CacheableError):
+    """The config names could not be fetched, but we should retry."""
+
+    def __init__(self, message: str, cause: Optional[BaseException] = None):
+        super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "RetryableConfigNamesError", cause, True)
+
+
 class RowsPostProcessingError(CacheableError):
     """The rows could not be post-processed successfully."""
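A note on how these two files work together: error codes listed in `MAX_FAILED_RUNS_PER_ERROR_CODE` keep a retry budget, so cache entries that failed with one of them get recomputed on the next backfill, while unlisted codes are treated as permanent. Note also that `RetryableConfigNamesError` passes `True` as the final constructor argument where its siblings pass `False`; in libcommon's `CacheableError` this flag controls whether the underlying cause is disclosed in the error response. A minimal sketch of the budget lookup, assuming a hypothetical `has_retry_budget` helper (not libcommon API), with the dict trimmed to the entries touched above:

```python
# Hypothetical helper illustrating how a backfill pass can consume
# MAX_FAILED_RUNS_PER_ERROR_CODE; the constants mirror constants.py.

DEFAULT_MAX_FAILED_RUNS = 3

MAX_FAILED_RUNS_PER_ERROR_CODE = {
    "ConfigNamesError": DEFAULT_MAX_FAILED_RUNS,
    "RetryableConfigNamesError": DEFAULT_MAX_FAILED_RUNS,
    "ConnectionError": DEFAULT_MAX_FAILED_RUNS,
    "ExternalServerError": DEFAULT_MAX_FAILED_RUNS,
}


def has_retry_budget(error_code: str, failed_runs: int) -> bool:
    """True if a cache entry that failed with `error_code` should be retried."""
    max_runs = MAX_FAILED_RUNS_PER_ERROR_CODE.get(error_code)
    # codes that are not listed are treated as permanent failures
    return max_runs is not None and failed_runs < max_runs


assert has_retry_budget("RetryableConfigNamesError", failed_runs=0)
assert not has_retry_budget("RetryableConfigNamesError", failed_runs=DEFAULT_MAX_FAILED_RUNS)
assert not has_retry_budget("DatasetWithScriptNotSupportedError", failed_runs=0)
```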
diff --git a/services/worker/src/worker/job_runners/config/parquet_and_info.py b/services/worker/src/worker/job_runners/config/parquet_and_info.py
index 109b20ce8..2d2c8353b 100644
--- a/services/worker/src/worker/job_runners/config/parquet_and_info.py
+++ b/services/worker/src/worker/job_runners/config/parquet_and_info.py
@@ -62,7 +62,6 @@
 )
 from libcommon.dtos import JobInfo, SplitHubFile
 from libcommon.exceptions import (
-    ConfigNamesError,
     CreateCommitError,
     DatasetGenerationCastError,
     DatasetGenerationError,
@@ -1408,8 +1407,6 @@ def compute_config_parquet_and_info_response(
             If one of the commits could not be created on the Hub.
         [~`libcommon.exceptions.EmptyDatasetError`]:
             The dataset is empty.
-        [~`libcommon.exceptions.ConfigNamesError`]:
-            If the list of configurations could not be obtained using the datasets library.
         [~`libcommon.exceptions.UnsupportedExternalFilesError`]:
             If we failed to get the external files sizes to make sure we can convert the dataset to parquet
         [~`libcommon.exceptions.ExternalFilesSizeRequestHTTPError`]:
@@ -1450,7 +1447,7 @@ def compute_config_parquet_and_info_response(
 
     config_names = {config_name_item["config"] for config_name_item in config_names_content["config_names"]}
     if config not in config_names:
-        raise ConfigNamesError(f"{config=} does not exist in {dataset=}")
+        raise ValueError(f"{config=} does not exist in {dataset=}")
 
     hf_api = HfApi(endpoint=hf_endpoint, token=hf_token)
     committer_hf_api = HfApi(endpoint=hf_endpoint, token=committer_hf_token)
diff --git a/services/worker/src/worker/job_runners/dataset/config_names.py b/services/worker/src/worker/job_runners/dataset/config_names.py
index 003c3568e..293b9f5fd 100644
--- a/services/worker/src/worker/job_runners/dataset/config_names.py
+++ b/services/worker/src/worker/job_runners/dataset/config_names.py
@@ -12,13 +12,20 @@
     load_dataset_builder,
 )
 from datasets.data_files import EmptyDatasetError as _EmptyDatasetError
-from datasets.exceptions import DefunctDatasetError
+from datasets.exceptions import (
+    DataFilesNotFoundError as _DataFilesNotFoundError,
+)
+from datasets.exceptions import DatasetNotFoundError, DefunctDatasetError
+from huggingface_hub.utils import HfHubHTTPError
 from libcommon.exceptions import (
     ConfigNamesError,
+    DataFilesNotFoundError,
     DatasetModuleNotInstalledError,
     DatasetWithScriptNotSupportedError,
     DatasetWithTooManyConfigsError,
     EmptyDatasetError,
+    FileFormatMismatchBetweenSplitsError,
+    RetryableConfigNamesError,
 )
 
 from worker.dtos import CompleteJobResult, ConfigNameItem, DatasetConfigNamesResponse
@@ -107,10 +114,16 @@ def compute_config_names_response(
         ]
     except _EmptyDatasetError as err:
         raise EmptyDatasetError("The dataset is empty.", cause=err) from err
+    except _DataFilesNotFoundError as err:
+        raise DataFilesNotFoundError(str(err), cause=err) from err
     except ValueError as err:
         if "trust_remote_code" in str(err):
             raise DatasetWithScriptNotSupportedError from err
+        if "Couldn't infer the same data file format for all splits" in str(err):
+            raise FileFormatMismatchBetweenSplitsError(str(err), cause=err) from err
         raise ConfigNamesError("Cannot get the config names for the dataset.", cause=err) from err
+    except (HfHubHTTPError, BrokenPipeError, DatasetNotFoundError, PermissionError, ConnectionError) as err:
+        raise RetryableConfigNamesError("Cannot get the config names for the dataset.", cause=err) from err
     except ImportError as err:
         # this should only happen if the dataset is in the allow list, which should soon disappear
         raise DatasetModuleNotInstalledError(
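The new `except` clauses above form a translation layer: transient transport failures (Hub HTTP errors, broken pipes, connection errors, and the `DatasetNotFoundError` that gated/private repos can surface) now map to the retryable code instead of the permanent `ConfigNamesError`. A self-contained sketch of the pattern, using simplified stand-ins for the libcommon exception classes:

```python
# Sketch of the translation layer: transient errors become
# RetryableConfigNamesError (eligible for recomputation in the next backfill),
# everything else stays a permanent ConfigNamesError. Both classes here are
# illustrative stand-ins, not libcommon's CacheableError subclasses.
from collections.abc import Callable


class ConfigNamesError(Exception):
    """Permanent failure: cached and not recomputed."""


class RetryableConfigNamesError(Exception):
    """Transient failure: retried while the error code has retry budget."""


def get_config_names(fetch: Callable[[], list[str]]) -> list[str]:
    try:
        return sorted(fetch())
    except (BrokenPipeError, PermissionError, ConnectionError) as err:
        raise RetryableConfigNamesError("Cannot get the config names for the dataset.") from err
    except ValueError as err:
        raise ConfigNamesError("Cannot get the config names for the dataset.") from err


calls = {"count": 0}


def flaky_fetch() -> list[str]:
    # fails once with a transient error, then succeeds
    calls["count"] += 1
    if calls["count"] == 1:
        raise ConnectionError("temporary network failure")
    return ["default"]


try:
    get_config_names(flaky_fetch)
except RetryableConfigNamesError:
    pass  # the orchestrator would schedule a retry instead of giving up

assert get_config_names(flaky_fetch) == ["default"]
```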
diff --git a/services/worker/tests/job_runners/dataset/test_config_names.py b/services/worker/tests/job_runners/dataset/test_config_names.py
index 234322de4..8f7220039 100644
--- a/services/worker/tests/job_runners/dataset/test_config_names.py
+++ b/services/worker/tests/job_runners/dataset/test_config_names.py
@@ -3,6 +3,7 @@
 
 from collections.abc import Callable
 from dataclasses import replace
+from typing import Optional
 from unittest.mock import patch
 
 import pytest
@@ -98,9 +99,9 @@ def test_compute_too_many_configs(
         ("n_configs_with_default", False, None, None),
         # should we really test the following cases?
        # The assumption is that the dataset exists and is accessible with the token
-        ("does_not_exist", False, "ConfigNamesError", "DatasetNotFoundError"),
-        ("gated", False, "ConfigNamesError", "ConnectionError"),  # See: huggingface/datasets#7109
-        ("private", False, "ConfigNamesError", "DatasetNotFoundError"),
+        ("does_not_exist", False, "RetryableConfigNamesError", "DatasetNotFoundError"),
+        ("gated", False, "RetryableConfigNamesError", "ConnectionError"),  # See: huggingface/datasets#7109
+        ("private", False, "RetryableConfigNamesError", "DatasetNotFoundError"),
     ],
 )
 def test_compute_splits_response(
@@ -114,7 +115,7 @@ def test_compute_splits_response(
     get_job_runner: GetJobRunner,
     name: str,
     use_token: bool,
-    error_code: str,
+    error_code: Optional[str],
     cause: str,
     app_config: AppConfig,
 ) -> None:
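Since `error_code` is `None` for the success cases, the annotation is widened to `Optional[str]`. A condensed, self-contained illustration of the parametrized success/error pattern the test uses (the real test drives a job runner fixture; `compute` and `CacheableStubError` are hypothetical stand-ins for the worker API):

```python
# Condensed version of the parametrized pattern above: error_code=None means
# the computation must succeed; otherwise it must raise an error exposing that
# code and its cause. compute/CacheableStubError are stand-ins, not worker APIs.
from typing import Optional

import pytest


class CacheableStubError(Exception):
    def __init__(self, code: str, cause: str) -> None:
        super().__init__(code)
        self.code = code
        self.cause = cause


def compute(name: str) -> list[str]:
    if name == "does_not_exist":
        raise CacheableStubError("RetryableConfigNamesError", "DatasetNotFoundError")
    return ["default"]


@pytest.mark.parametrize(
    "name,error_code,cause",
    [
        ("public", None, None),  # success case: no error expected
        ("does_not_exist", "RetryableConfigNamesError", "DatasetNotFoundError"),
    ],
)
def test_compute(name: str, error_code: Optional[str], cause: Optional[str]) -> None:
    if error_code is None:
        assert compute(name) == ["default"]
    else:
        with pytest.raises(CacheableStubError) as exc_info:
            compute(name)
        assert exc_info.value.code == error_code
        assert exc_info.value.cause == cause
```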