From 9e221e096469f130f3933a26966fbfb6a49dff05 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Thu, 22 Aug 2024 11:48:26 +0200 Subject: [PATCH 1/4] fix the lock release when finishing a job (#3039) --- libs/libcommon/src/libcommon/queue/jobs.py | 9 ++++----- libs/libcommon/src/libcommon/queue/lock.py | 15 --------------- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/libs/libcommon/src/libcommon/queue/jobs.py b/libs/libcommon/src/libcommon/queue/jobs.py index 5c0bf9e489..cfbdea2085 100644 --- a/libs/libcommon/src/libcommon/queue/jobs.py +++ b/libs/libcommon/src/libcommon/queue/jobs.py @@ -30,7 +30,7 @@ ) from libcommon.dtos import FlatJobInfo, JobInfo, Priority, Status, WorkerSize from libcommon.queue.dataset_blockages import DATASET_STATUS_BLOCKED, DATASET_STATUS_NORMAL, get_blocked_datasets -from libcommon.queue.lock import lock, release_lock, release_locks +from libcommon.queue.lock import lock, release_lock from libcommon.queue.metrics import ( decrease_metric, decrease_worker_size_metrics, @@ -147,7 +147,7 @@ class JobDocument(Document): config (`str`, *optional*): The config on which to apply the job. split (`str`, *optional*): The split on which to apply the job. unicity_id (`str`): A string that identifies the job uniquely. Only one job with the same unicity_id can be in - the started state. The revision is not part of the unicity_id. + the started state. namespace (`str`): The dataset namespace (user or organization) if any, else the dataset name (canonical name). priority (`Priority`, *optional*): The priority of the job. Defaults to Priority.LOW. status (`Status`, *optional*): The status of the job. Defaults to Status.WAITING. @@ -260,7 +260,7 @@ class Queue: the jobs. You can create multiple Queue objects, it has no effect on the database. It's a FIFO queue, with the following properties: - - a job is identified by its input arguments: unicity_id (type, dataset, config and split, NOT revision) + - a job is identified by its input arguments: unicity_id (type, dataset, config and split, revision) - a job can be in one of the following states: waiting, started - a job can be in the queue only once (unicity_id) in the "started" state - a job can be in the queue multiple times in the other states @@ -698,8 +698,7 @@ def finish_job(self, job_id: str) -> Optional[Priority]: ) job_priority = job.priority job.delete() - release_locks(owner=job_id) - # ^ bug: the lock owner is not set to the job id anymore when calling start_job()! 
+ release_lock(key=job.unicity_id) if was_blocked: pending_jobs = self.get_pending_jobs_df(dataset=job.dataset) for _, pending_job in pending_jobs.iterrows(): diff --git a/libs/libcommon/src/libcommon/queue/lock.py b/libs/libcommon/src/libcommon/queue/lock.py index 7f0486e3a5..7b583a896e 100644 --- a/libs/libcommon/src/libcommon/queue/lock.py +++ b/libs/libcommon/src/libcommon/queue/lock.py @@ -180,21 +180,6 @@ def git_branch( return cls(key=key, owner=owner, sleeps=sleeps, ttl=_TTL.LOCK_TTL_SECONDS_TO_WRITE_ON_GIT_BRANCH) -def release_locks(owner: str) -> None: - """ - Release all locks owned by the given owner - - Args: - owner (`str`): the current owner that holds the locks - """ - Lock.objects(owner=owner).update( - write_concern={"w": "majority", "fsync": True}, - read_concern={"level": "majority"}, - owner=None, - updated_at=get_datetime(), - ) - - def release_lock(key: str) -> None: """ Release the lock for a specific key From 0434dc62aa510bdbda7896ec5c48bbf3fe699732 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Thu, 22 Aug 2024 11:49:09 +0200 Subject: [PATCH 2/4] More explicit test and comments about offset-naive datetimes read from mongo (#3036) * be more explicit about how to handle dates with mongo * add comments --- libs/libcommon/src/libcommon/queue/dataset_blockages.py | 1 + libs/libcommon/src/libcommon/queue/jobs.py | 7 +++++-- libs/libcommon/src/libcommon/queue/lock.py | 8 ++++++-- libs/libcommon/src/libcommon/queue/metrics.py | 4 +++- libs/libcommon/src/libcommon/queue/past_jobs.py | 1 + libs/libcommon/src/libcommon/simple_cache.py | 1 + libs/libcommon/tests/queue/test_past_jobs.py | 6 +++++- 7 files changed, 22 insertions(+), 6 deletions(-) diff --git a/libs/libcommon/src/libcommon/queue/dataset_blockages.py b/libs/libcommon/src/libcommon/queue/dataset_blockages.py index a5791c9b5b..c0cf4134c1 100644 --- a/libs/libcommon/src/libcommon/queue/dataset_blockages.py +++ b/libs/libcommon/src/libcommon/queue/dataset_blockages.py @@ -49,6 +49,7 @@ class DatasetBlockageDocument(Document): Args: dataset (`str`): The dataset on which to apply the job. blocked_at (`datetime`): The date the dataset has been blocked. + When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware. """ meta = { diff --git a/libs/libcommon/src/libcommon/queue/jobs.py b/libs/libcommon/src/libcommon/queue/jobs.py index cfbdea2085..17297223be 100644 --- a/libs/libcommon/src/libcommon/queue/jobs.py +++ b/libs/libcommon/src/libcommon/queue/jobs.py @@ -152,9 +152,12 @@ class JobDocument(Document): priority (`Priority`, *optional*): The priority of the job. Defaults to Priority.LOW. status (`Status`, *optional*): The status of the job. Defaults to Status.WAITING. difficulty (`int`): The difficulty of the job: 1=easy, 100=hard as a convention (strictly positive integer). - created_at (`datetime`): The creation date of the job. - started_at (`datetime`, *optional*): When the job has started. + created_at (`datetime`): The creation date of the job. When read, it's an offset-naive datetime. + Use pytz.UTC.localize() to make it timezone-aware. + started_at (`datetime`, *optional*): When the job has started. When read, it's an offset-naive datetime. + Use pytz.UTC.localize() to make it timezone-aware. last_heartbeat (`datetime`, *optional*): Last time the running job got a heartbeat from the worker. + When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware. 
""" meta = { diff --git a/libs/libcommon/src/libcommon/queue/lock.py b/libs/libcommon/src/libcommon/queue/lock.py index 7b583a896e..e052da0980 100644 --- a/libs/libcommon/src/libcommon/queue/lock.py +++ b/libs/libcommon/src/libcommon/queue/lock.py @@ -79,8 +79,12 @@ class Lock(Document): ttl = IntField() job_id = StringField() # deprecated - created_at = DateTimeField() - updated_at = DateTimeField() + created_at = ( + DateTimeField() + ) # When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware. + updated_at = ( + DateTimeField() + ) # When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware. objects = QuerySetManager["Lock"]() diff --git a/libs/libcommon/src/libcommon/queue/metrics.py b/libs/libcommon/src/libcommon/queue/metrics.py index b38d2711f2..73839a3f33 100644 --- a/libs/libcommon/src/libcommon/queue/metrics.py +++ b/libs/libcommon/src/libcommon/queue/metrics.py @@ -54,7 +54,8 @@ class JobTotalMetricDocument(Document): status (`str`): job status see libcommon.queue.jobs.Status dataset_status (`str`): whether the dataset is blocked ("normal", "blocked") total (`int`): total of jobs - created_at (`datetime`): when the metric has been created. + created_at (`datetime`): when the metric has been created. When read, it's an + offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware. """ id = ObjectIdField(db_field="_id", primary_key=True, default=ObjectId) @@ -84,6 +85,7 @@ class WorkerSizeJobsCountDocument(Document): worker_size (`WorkerSize`): worker size jobs_count (`int`): jobs count created_at (`datetime`): when the metric has been created. + When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware. """ id = ObjectIdField(db_field="_id", primary_key=True, default=ObjectId) diff --git a/libs/libcommon/src/libcommon/queue/past_jobs.py b/libs/libcommon/src/libcommon/queue/past_jobs.py index 4268f46854..62e9533c5d 100644 --- a/libs/libcommon/src/libcommon/queue/past_jobs.py +++ b/libs/libcommon/src/libcommon/queue/past_jobs.py @@ -55,6 +55,7 @@ class PastJobDocument(Document): dataset (`str`): The dataset on which to apply the job. duration (`int`): The duration of the job, in seconds. finished_at (`datetime`): The date the job has finished. + When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware. """ meta = { diff --git a/libs/libcommon/src/libcommon/simple_cache.py b/libs/libcommon/src/libcommon/simple_cache.py index cbc38526e5..8f911788a8 100644 --- a/libs/libcommon/src/libcommon/simple_cache.py +++ b/libs/libcommon/src/libcommon/simple_cache.py @@ -131,6 +131,7 @@ class CachedResponseDocument(Document): content (`dict`): The content of the cached response. Can be an error or a valid content. details (`dict`, *optional*): Additional details, eg. a detailed error that we don't want to send as a response. updated_at (`datetime`): When the cache entry has been last updated. + When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware. duration (`float`, *optional*): Duration of a corresponding job in seconds. job_runner_version (`int`): The version of the job runner that cached the response. failed_runs (`int`): The number of failed_runs to get cached result. 
diff --git a/libs/libcommon/tests/queue/test_past_jobs.py b/libs/libcommon/tests/queue/test_past_jobs.py index 65f5a78b5a..bbea38169f 100644 --- a/libs/libcommon/tests/queue/test_past_jobs.py +++ b/libs/libcommon/tests/queue/test_past_jobs.py @@ -5,6 +5,7 @@ from datetime import timedelta import pytest +import pytz from libcommon.queue.dataset_blockages import get_blocked_datasets from libcommon.queue.jobs import Queue @@ -56,13 +57,16 @@ def test_create_past_job_raises_if_timezone_unaware() -> None: queue.add_job(job_type="test_type", dataset="test_dataset", revision="test_revision", difficulty=50) job_info = queue.start_job() started_at = queue._get_started_job(job_info["job_id"]).started_at - # ^ mongo looses the timezone, see https://github.com/huggingface/dataset-viewer/issues/862 assert started_at is not None + assert started_at.tzinfo is None + # ^ mongo looses the timezone, see https://github.com/huggingface/dataset-viewer/issues/862 with pytest.raises(TypeError) as exc_info: create_past_job(dataset=DATASET, started_at=started_at, finished_at=finished_at) assert "can't subtract offset-naive and offset-aware datetimes" in str(exc_info.value) + create_past_job(dataset=DATASET, started_at=pytz.UTC.localize(started_at), finished_at=finished_at) + @pytest.mark.parametrize( "jobs,expected_blocked_datasets", From e7fa2e0f92e9c2a4ae1a7dd7a5998b96bfae2e96 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Thu, 22 Aug 2024 16:34:50 +0200 Subject: [PATCH 3/4] Raise specific errors instead of ConfigNamesError when appropriate (#3041) * this case should not exist (and we have no occurrence in the db) - let it raise * add a specific error code when data files are missing * give a specific exception for common error * for the remaining errors, let retry, it should fix some of them * retry some causes * follow naming convention + pass a message * temporary, to recompute the errors * ConnectionError is a standard error * same with PermissionError * fix tests --- libs/libcommon/src/libcommon/constants.py | 2 ++ libs/libcommon/src/libcommon/exceptions.py | 26 +++++++++++++++++++ .../job_runners/config/parquet_and_info.py | 5 +--- .../job_runners/dataset/config_names.py | 15 ++++++++++- .../job_runners/dataset/test_config_names.py | 9 ++++--- 5 files changed, 48 insertions(+), 9 deletions(-) diff --git a/libs/libcommon/src/libcommon/constants.py b/libs/libcommon/src/libcommon/constants.py index 505b083bdf..11305c8e84 100644 --- a/libs/libcommon/src/libcommon/constants.py +++ b/libs/libcommon/src/libcommon/constants.py @@ -43,6 +43,8 @@ LARGE_MAX_FAILED_RUNS = 30 # for errors that should not be permanent MAX_FAILED_RUNS_PER_ERROR_CODE = { # default + "ConfigNamesError": DEFAULT_MAX_FAILED_RUNS, # <- 20240822: to recompute all of them in the next backfill + "RetryableConfigNamesError": DEFAULT_MAX_FAILED_RUNS, "ConnectionError": DEFAULT_MAX_FAILED_RUNS, "ExternalServerError": DEFAULT_MAX_FAILED_RUNS, "JobManagerCrashedError": DEFAULT_MAX_FAILED_RUNS, diff --git a/libs/libcommon/src/libcommon/exceptions.py b/libs/libcommon/src/libcommon/exceptions.py index b0b00e042d..dd1010f40b 100644 --- a/libs/libcommon/src/libcommon/exceptions.py +++ b/libs/libcommon/src/libcommon/exceptions.py @@ -78,6 +78,7 @@ def as_response(self) -> ErrorResponse: "ConfigNamesError", "ConfigNotFoundError", "CreateCommitError", + "DataFilesNotFoundError", "DatasetGenerationError", "DatasetGenerationCastError", "DatasetInBlockListError", @@ -99,6 +100,7 @@ def as_response(self) -> ErrorResponse: "ExternalServerError", 
"FeaturesError", "FeaturesResponseEmptyError", + "FileFormatMismatchBetweenSplitsError", "FileSystemError", "HfHubError", "InfoError", @@ -120,6 +122,7 @@ def as_response(self) -> ErrorResponse: "PreviousStepStatusError", "PreviousStepStillProcessingError", "PolarsParquetReadError", + "RetryableConfigNamesError", "RowsPostProcessingError", "SplitsNamesError", "SplitNamesFromStreamingError", @@ -185,6 +188,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None): super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "CreateCommitError", cause, False) +class DataFilesNotFoundError(CacheableError): + """No (supported) data files found.""" + + def __init__(self, message: str, cause: Optional[BaseException] = None): + super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "DataFilesNotFoundError", cause, False) + + class DatasetGenerationError(CacheableError): """The dataset generation failed.""" @@ -323,6 +333,15 @@ def __init__(self, message: str, cause: Optional[BaseException] = None): super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "FeaturesResponseEmptyError", cause, True) +class FileFormatMismatchBetweenSplitsError(CacheableError): + """Couldn't infer the same data file format for all splits.""" + + def __init__(self, message: str, cause: Optional[BaseException] = None): + super().__init__( + message, HTTPStatus.INTERNAL_SERVER_ERROR, "FileFormatMismatchBetweenSplitsError", cause, False + ) + + class FileSystemError(CacheableError): """An error happen reading from File System.""" @@ -446,6 +465,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None): super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "PreviousStepStillProcessingError", cause, False) +class RetryableConfigNamesError(CacheableError): + """The config names could not be fetched, but we should retry.""" + + def __init__(self, message: str, cause: Optional[BaseException] = None): + super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "RetryableConfigNamesError", cause, True) + + class RowsPostProcessingError(CacheableError): """The rows could not be post-processed successfully.""" diff --git a/services/worker/src/worker/job_runners/config/parquet_and_info.py b/services/worker/src/worker/job_runners/config/parquet_and_info.py index 109b20ce83..2d2c8353b6 100644 --- a/services/worker/src/worker/job_runners/config/parquet_and_info.py +++ b/services/worker/src/worker/job_runners/config/parquet_and_info.py @@ -62,7 +62,6 @@ ) from libcommon.dtos import JobInfo, SplitHubFile from libcommon.exceptions import ( - ConfigNamesError, CreateCommitError, DatasetGenerationCastError, DatasetGenerationError, @@ -1408,8 +1407,6 @@ def compute_config_parquet_and_info_response( If one of the commits could not be created on the Hub. [~`libcommon.exceptions.EmptyDatasetError`]: The dataset is empty. - [~`libcommon.exceptions.ConfigNamesError`]: - If the list of configurations could not be obtained using the datasets library. 
[~`libcommon.exceptions.UnsupportedExternalFilesError`]: If we failed to get the external files sizes to make sure we can convert the dataset to parquet [~`libcommon.exceptions.ExternalFilesSizeRequestHTTPError`]: @@ -1450,7 +1447,7 @@ def compute_config_parquet_and_info_response( config_names = {config_name_item["config"] for config_name_item in config_names_content["config_names"]} if config not in config_names: - raise ConfigNamesError(f"{config=} does not exist in {dataset=}") + raise ValueError(f"{config=} does not exist in {dataset=}") hf_api = HfApi(endpoint=hf_endpoint, token=hf_token) committer_hf_api = HfApi(endpoint=hf_endpoint, token=committer_hf_token) diff --git a/services/worker/src/worker/job_runners/dataset/config_names.py b/services/worker/src/worker/job_runners/dataset/config_names.py index 003c3568e6..293b9f5fde 100644 --- a/services/worker/src/worker/job_runners/dataset/config_names.py +++ b/services/worker/src/worker/job_runners/dataset/config_names.py @@ -12,13 +12,20 @@ load_dataset_builder, ) from datasets.data_files import EmptyDatasetError as _EmptyDatasetError -from datasets.exceptions import DefunctDatasetError +from datasets.exceptions import ( + DataFilesNotFoundError as _DataFilesNotFoundError, +) +from datasets.exceptions import DatasetNotFoundError, DefunctDatasetError +from huggingface_hub.utils import HfHubHTTPError from libcommon.exceptions import ( ConfigNamesError, + DataFilesNotFoundError, DatasetModuleNotInstalledError, DatasetWithScriptNotSupportedError, DatasetWithTooManyConfigsError, EmptyDatasetError, + FileFormatMismatchBetweenSplitsError, + RetryableConfigNamesError, ) from worker.dtos import CompleteJobResult, ConfigNameItem, DatasetConfigNamesResponse @@ -107,10 +114,16 @@ def compute_config_names_response( ] except _EmptyDatasetError as err: raise EmptyDatasetError("The dataset is empty.", cause=err) from err + except _DataFilesNotFoundError as err: + raise DataFilesNotFoundError(str(err), cause=err) from err except ValueError as err: if "trust_remote_code" in str(err): raise DatasetWithScriptNotSupportedError from err + if "Couldn't infer the same data file format for all splits" in str(err): + raise FileFormatMismatchBetweenSplitsError(str(err), cause=err) from err raise ConfigNamesError("Cannot get the config names for the dataset.", cause=err) from err + except (HfHubHTTPError, BrokenPipeError, DatasetNotFoundError, PermissionError, ConnectionError) as err: + raise RetryableConfigNamesError("Cannot get the config names for the dataset.", cause=err) from err except ImportError as err: # this should only happen if the dataset is in the allow list, which should soon disappear raise DatasetModuleNotInstalledError( diff --git a/services/worker/tests/job_runners/dataset/test_config_names.py b/services/worker/tests/job_runners/dataset/test_config_names.py index 234322de48..8f72200391 100644 --- a/services/worker/tests/job_runners/dataset/test_config_names.py +++ b/services/worker/tests/job_runners/dataset/test_config_names.py @@ -3,6 +3,7 @@ from collections.abc import Callable from dataclasses import replace +from typing import Optional from unittest.mock import patch import pytest @@ -98,9 +99,9 @@ def test_compute_too_many_configs( ("n_configs_with_default", False, None, None), # should we really test the following cases? 
# The assumption is that the dataset exists and is accessible with the token - ("does_not_exist", False, "ConfigNamesError", "DatasetNotFoundError"), - ("gated", False, "ConfigNamesError", "ConnectionError"), # See: huggingface/datasets#7109 - ("private", False, "ConfigNamesError", "DatasetNotFoundError"), + ("does_not_exist", False, "RetryableConfigNamesError", "DatasetNotFoundError"), + ("gated", False, "RetryableConfigNamesError", "ConnectionError"), # See: huggingface/datasets#7109 + ("private", False, "RetryableConfigNamesError", "DatasetNotFoundError"), ], ) def test_compute_splits_response( @@ -114,7 +115,7 @@ def test_compute_splits_response( get_job_runner: GetJobRunner, name: str, use_token: bool, - error_code: str, + error_code: Optional[str], cause: str, app_config: AppConfig, ) -> None: From fc2f2b12e3d642403f00aa618e88bd95ed7b39a9 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Thu, 22 Aug 2024 16:38:12 +0200 Subject: [PATCH 4/4] show the traceback of the cause, it's clearer (#3042) * show the traceback of the cause, it's clearer * retry for backfill --- libs/libcommon/src/libcommon/constants.py | 1 + .../worker/src/worker/job_runners/config/parquet_and_info.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/libs/libcommon/src/libcommon/constants.py b/libs/libcommon/src/libcommon/constants.py index 11305c8e84..35679390ec 100644 --- a/libs/libcommon/src/libcommon/constants.py +++ b/libs/libcommon/src/libcommon/constants.py @@ -43,6 +43,7 @@ LARGE_MAX_FAILED_RUNS = 30 # for errors that should not be permanent MAX_FAILED_RUNS_PER_ERROR_CODE = { # default + "DatasetGenerationError": DEFAULT_MAX_FAILED_RUNS, # <- 20240822: to recompute all of them in the next backfill "ConfigNamesError": DEFAULT_MAX_FAILED_RUNS, # <- 20240822: to recompute all of them in the next backfill "RetryableConfigNamesError": DEFAULT_MAX_FAILED_RUNS, "ConnectionError": DEFAULT_MAX_FAILED_RUNS, diff --git a/services/worker/src/worker/job_runners/config/parquet_and_info.py b/services/worker/src/worker/job_runners/config/parquet_and_info.py index 2d2c8353b6..a0a9d8d0a2 100644 --- a/services/worker/src/worker/job_runners/config/parquet_and_info.py +++ b/services/worker/src/worker/job_runners/config/parquet_and_info.py @@ -1531,7 +1531,9 @@ def compute_config_parquet_and_info_response( except datasets.exceptions.DatasetGenerationCastError as err: raise DatasetGenerationCastError("The dataset generation failed because of a cast error", cause=err) from err except datasets.exceptions.DatasetGenerationError as err: - raise DatasetGenerationError("The dataset generation failed", cause=err) from err + if err.__cause__: + raise DatasetGenerationError("The dataset generation failed", cause=err.__cause__) from err + raise DatasetGenerationError("The dataset generation failed") from err raise_if_long_column_name(builder.info.features)
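Note: the last hunk relies on the fact that exceptions raised with `raise ... from err` keep the original error in `__cause__`; passing that inner exception as the cause makes the recorded traceback point at the real failure rather than at the datasets wrapper. A standalone sketch of the pattern, using stand-in classes rather than the worker's actual ones:

    import traceback

    class DatasetGenerationError(Exception):
        """Stand-in for the worker's error class (illustrative only)."""
        def __init__(self, message, cause=None):
            super().__init__(message)
            self.cause = cause

    def generate():
        try:
            {}["missing_key"]  # the real failure
        except KeyError as err:
            # the datasets library wraps the real error before it reaches the job runner
            raise RuntimeError("generation wrapper") from err

    try:
        try:
            generate()
        except RuntimeError as err:
            # prefer the underlying cause when it exists, as in the hunk above
            if err.__cause__:
                raise DatasetGenerationError("The dataset generation failed", cause=err.__cause__) from err
            raise DatasetGenerationError("The dataset generation failed") from err
    except DatasetGenerationError as err:
        # the stored cause's traceback points at the KeyError, not at the wrapper
        traceback.print_exception(type(err.cause), err.cause, err.cause.__traceback__)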