Merge remote-tracking branch 'upstream/main' into update-datasets-2.20.0
albertvillanova committed Aug 22, 2024
2 parents daf5fd5 + fc2f2b1 commit 8599592
Showing 12 changed files with 78 additions and 36 deletions.
3 changes: 3 additions & 0 deletions libs/libcommon/src/libcommon/constants.py
@@ -43,6 +43,9 @@
LARGE_MAX_FAILED_RUNS = 30 # for errors that should not be permanent
MAX_FAILED_RUNS_PER_ERROR_CODE = {
# default
"DatasetGenerationError": DEFAULT_MAX_FAILED_RUNS, # <- 20240822: to recompute all of them in the next backfill
"ConfigNamesError": DEFAULT_MAX_FAILED_RUNS, # <- 20240822: to recompute all of them in the next backfill
"RetryableConfigNamesError": DEFAULT_MAX_FAILED_RUNS,
"ConnectionError": DEFAULT_MAX_FAILED_RUNS,
"ExternalServerError": DEFAULT_MAX_FAILED_RUNS,
"JobManagerCrashedError": DEFAULT_MAX_FAILED_RUNS,
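
MAX_FAILED_RUNS_PER_ERROR_CODE caps how many times a job is retried for a given error code. A rough sketch of how such a mapping can be consulted (should_retry is a hypothetical helper, not code from this repository, and the fallback to DEFAULT_MAX_FAILED_RUNS for unlisted codes is an assumption made for the sketch):

from libcommon.constants import DEFAULT_MAX_FAILED_RUNS, MAX_FAILED_RUNS_PER_ERROR_CODE


def should_retry(error_code: str, failed_runs: int) -> bool:
    # Each error code gets its own ceiling; unlisted codes fall back to the default here.
    max_runs = MAX_FAILED_RUNS_PER_ERROR_CODE.get(error_code, DEFAULT_MAX_FAILED_RUNS)
    return failed_runs < max_runs


# e.g. an entry that failed 5 times with "ConfigNamesError" keeps being retried
# as long as 5 < DEFAULT_MAX_FAILED_RUNS.
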
26 changes: 26 additions & 0 deletions libs/libcommon/src/libcommon/exceptions.py
@@ -78,6 +78,7 @@ def as_response(self) -> ErrorResponse:
"ConfigNamesError",
"ConfigNotFoundError",
"CreateCommitError",
"DataFilesNotFoundError",
"DatasetGenerationError",
"DatasetGenerationCastError",
"DatasetInBlockListError",
@@ -99,6 +100,7 @@ def as_response(self) -> ErrorResponse:
"ExternalServerError",
"FeaturesError",
"FeaturesResponseEmptyError",
"FileFormatMismatchBetweenSplitsError",
"FileSystemError",
"HfHubError",
"InfoError",
@@ -120,6 +122,7 @@ def as_response(self) -> ErrorResponse:
"PreviousStepStatusError",
"PreviousStepStillProcessingError",
"PolarsParquetReadError",
"RetryableConfigNamesError",
"RowsPostProcessingError",
"SplitsNamesError",
"SplitNamesFromStreamingError",
@@ -185,6 +188,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None):
super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "CreateCommitError", cause, False)


class DataFilesNotFoundError(CacheableError):
"""No (supported) data files found."""

def __init__(self, message: str, cause: Optional[BaseException] = None):
super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "DataFilesNotFoundError", cause, False)


class DatasetGenerationError(CacheableError):
"""The dataset generation failed."""

@@ -323,6 +333,15 @@ def __init__(self, message: str, cause: Optional[BaseException] = None):
super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "FeaturesResponseEmptyError", cause, True)


class FileFormatMismatchBetweenSplitsError(CacheableError):
"""Couldn't infer the same data file format for all splits."""

def __init__(self, message: str, cause: Optional[BaseException] = None):
super().__init__(
message, HTTPStatus.INTERNAL_SERVER_ERROR, "FileFormatMismatchBetweenSplitsError", cause, False
)


class FileSystemError(CacheableError):
"""An error happen reading from File System."""

@@ -446,6 +465,13 @@ def __init__(self, message: str, cause: Optional[BaseException] = None):
super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "PreviousStepStillProcessingError", cause, False)


class RetryableConfigNamesError(CacheableError):
"""The config names could not be fetched, but we should retry."""

def __init__(self, message: str, cause: Optional[BaseException] = None):
super().__init__(message, HTTPStatus.INTERNAL_SERVER_ERROR, "RetryableConfigNamesError", cause, True)


class RowsPostProcessingError(CacheableError):
"""The rows could not be post-processed successfully."""

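
Each new class follows the existing CacheableError pattern: a message, an HTTP status, a string error code, an optional cause, and a trailing flag. A minimal, hypothetical usage sketch (only the exception constructors come from the diff above; the call site is invented):

from libcommon.exceptions import RetryableConfigNamesError


def fetch_config_names(dataset: str) -> list[str]:
    try:
        raise ConnectionError("hub unreachable")  # stand-in for a real call to the Hub
    except ConnectionError as err:
        # The string code "RetryableConfigNamesError" is what ends up in the cache entry
        # and is matched against MAX_FAILED_RUNS_PER_ERROR_CODE in constants.py.
        raise RetryableConfigNamesError("Cannot get the config names for the dataset.", cause=err) from err


try:
    fetch_config_names("user/dataset")
except RetryableConfigNamesError as err:
    print(f"retryable failure: {err}")
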
1 change: 1 addition & 0 deletions libs/libcommon/src/libcommon/queue/dataset_blockages.py
@@ -49,6 +49,7 @@ class DatasetBlockageDocument(Document):
Args:
dataset (`str`): The dataset on which to apply the job.
blocked_at (`datetime`): The date the dataset has been blocked.
When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware.
"""

meta = {
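
The same caveat is added to several documents in this commit: datetimes read back from MongoDB are offset-naive. A small, self-contained illustration of the suggested pytz fix (plain Python, not repository code):

from datetime import datetime

import pytz

blocked_at = datetime(2024, 8, 22, 12, 0, 0)  # stand-in for a value read back from MongoDB
assert blocked_at.tzinfo is None  # offset-naive

# Re-attach UTC before mixing with offset-aware datetimes, otherwise Python raises
# "TypeError: can't subtract offset-naive and offset-aware datetimes".
aware = pytz.UTC.localize(blocked_at)
assert aware.tzinfo is not None
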
16 changes: 9 additions & 7 deletions libs/libcommon/src/libcommon/queue/jobs.py
@@ -30,7 +30,7 @@
)
from libcommon.dtos import FlatJobInfo, JobInfo, Priority, Status, WorkerSize
from libcommon.queue.dataset_blockages import DATASET_STATUS_BLOCKED, DATASET_STATUS_NORMAL, get_blocked_datasets
from libcommon.queue.lock import lock, release_lock, release_locks
from libcommon.queue.lock import lock, release_lock
from libcommon.queue.metrics import (
decrease_metric,
decrease_worker_size_metrics,
@@ -147,14 +147,17 @@ class JobDocument(Document):
config (`str`, *optional*): The config on which to apply the job.
split (`str`, *optional*): The split on which to apply the job.
unicity_id (`str`): A string that identifies the job uniquely. Only one job with the same unicity_id can be in
the started state. The revision is not part of the unicity_id.
the started state.
namespace (`str`): The dataset namespace (user or organization) if any, else the dataset name (canonical name).
priority (`Priority`, *optional*): The priority of the job. Defaults to Priority.LOW.
status (`Status`, *optional*): The status of the job. Defaults to Status.WAITING.
difficulty (`int`): The difficulty of the job: 1=easy, 100=hard as a convention (strictly positive integer).
created_at (`datetime`): The creation date of the job.
started_at (`datetime`, *optional*): When the job has started.
created_at (`datetime`): The creation date of the job. When read, it's an offset-naive datetime.
Use pytz.UTC.localize() to make it timezone-aware.
started_at (`datetime`, *optional*): When the job has started. When read, it's an offset-naive datetime.
Use pytz.UTC.localize() to make it timezone-aware.
last_heartbeat (`datetime`, *optional*): Last time the running job got a heartbeat from the worker.
When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware.
"""

meta = {
@@ -260,7 +263,7 @@ class Queue:
the jobs. You can create multiple Queue objects, it has no effect on the database.
It's a FIFO queue, with the following properties:
- a job is identified by its input arguments: unicity_id (type, dataset, config and split, NOT revision)
- a job is identified by its input arguments: unicity_id (type, dataset, config and split, revision)
- a job can be in one of the following states: waiting, started
- a job can be in the queue only once (unicity_id) in the "started" state
- a job can be in the queue multiple times in the other states
@@ -698,8 +701,7 @@ def finish_job(self, job_id: str) -> Optional[Priority]:
)
job_priority = job.priority
job.delete()
release_locks(owner=job_id)
# ^ bug: the lock owner is not set to the job id anymore when calling start_job()!
release_lock(key=job.unicity_id)
if was_blocked:
pending_jobs = self.get_pending_jobs_df(dataset=job.dataset)
for _, pending_job in pending_jobs.iterrows():
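
finish_job now releases the lock by its key, the job's unicity_id, instead of by owner, because start_job no longer sets the lock owner to the job id. A simplified sketch of the new cleanup (hypothetical wrapper; the real method also updates metrics and handles blocked datasets):

from libcommon.queue.lock import release_lock


def cleanup_job_lock(job) -> None:
    # The lock key is the job's unicity_id, so releasing by key works even though
    # the lock owner no longer matches the job id.
    release_lock(key=job.unicity_id)
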
23 changes: 6 additions & 17 deletions libs/libcommon/src/libcommon/queue/lock.py
@@ -79,8 +79,12 @@ class Lock(Document):
ttl = IntField()
job_id = StringField() # deprecated

created_at = DateTimeField()
updated_at = DateTimeField()
created_at = (
DateTimeField()
) # When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware.
updated_at = (
DateTimeField()
) # When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware.

objects = QuerySetManager["Lock"]()

@@ -180,21 +184,6 @@ def git_branch(
return cls(key=key, owner=owner, sleeps=sleeps, ttl=_TTL.LOCK_TTL_SECONDS_TO_WRITE_ON_GIT_BRANCH)


def release_locks(owner: str) -> None:
"""
Release all locks owned by the given owner
Args:
owner (`str`): the current owner that holds the locks
"""
Lock.objects(owner=owner).update(
write_concern={"w": "majority", "fsync": True},
read_concern={"level": "majority"},
owner=None,
updated_at=get_datetime(),
)


def release_lock(key: str) -> None:
"""
Release the lock for a specific key
4 changes: 3 additions & 1 deletion libs/libcommon/src/libcommon/queue/metrics.py
@@ -54,7 +54,8 @@ class JobTotalMetricDocument(Document):
status (`str`): job status see libcommon.queue.jobs.Status
dataset_status (`str`): whether the dataset is blocked ("normal", "blocked")
total (`int`): total of jobs
created_at (`datetime`): when the metric has been created.
created_at (`datetime`): when the metric has been created. When read, it's an
offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware.
"""

id = ObjectIdField(db_field="_id", primary_key=True, default=ObjectId)
@@ -84,6 +85,7 @@ class WorkerSizeJobsCountDocument(Document):
worker_size (`WorkerSize`): worker size
jobs_count (`int`): jobs count
created_at (`datetime`): when the metric has been created.
When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware.
"""

id = ObjectIdField(db_field="_id", primary_key=True, default=ObjectId)
1 change: 1 addition & 0 deletions libs/libcommon/src/libcommon/queue/past_jobs.py
@@ -55,6 +55,7 @@ class PastJobDocument(Document):
dataset (`str`): The dataset on which to apply the job.
duration (`int`): The duration of the job, in seconds.
finished_at (`datetime`): The date the job has finished.
When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware.
"""

meta = {
1 change: 1 addition & 0 deletions libs/libcommon/src/libcommon/simple_cache.py
@@ -131,6 +131,7 @@ class CachedResponseDocument(Document):
content (`dict`): The content of the cached response. Can be an error or a valid content.
details (`dict`, *optional*): Additional details, eg. a detailed error that we don't want to send as a response.
updated_at (`datetime`): When the cache entry has been last updated.
When read, it's an offset-naive datetime. Use pytz.UTC.localize() to make it timezone-aware.
duration (`float`, *optional*): Duration of a corresponding job in seconds.
job_runner_version (`int`): The version of the job runner that cached the response.
failed_runs (`int`): The number of failed_runs to get cached result.
6 changes: 5 additions & 1 deletion libs/libcommon/tests/queue/test_past_jobs.py
@@ -5,6 +5,7 @@
from datetime import timedelta

import pytest
import pytz

from libcommon.queue.dataset_blockages import get_blocked_datasets
from libcommon.queue.jobs import Queue
@@ -56,13 +57,16 @@ def test_create_past_job_raises_if_timezone_unaware() -> None:
queue.add_job(job_type="test_type", dataset="test_dataset", revision="test_revision", difficulty=50)
job_info = queue.start_job()
started_at = queue._get_started_job(job_info["job_id"]).started_at
# ^ mongo loses the timezone, see https://github.com/huggingface/dataset-viewer/issues/862
assert started_at is not None
assert started_at.tzinfo is None
# ^ mongo loses the timezone, see https://github.com/huggingface/dataset-viewer/issues/862

with pytest.raises(TypeError) as exc_info:
create_past_job(dataset=DATASET, started_at=started_at, finished_at=finished_at)
assert "can't subtract offset-naive and offset-aware datetimes" in str(exc_info.value)

create_past_job(dataset=DATASET, started_at=pytz.UTC.localize(started_at), finished_at=finished_at)


@pytest.mark.parametrize(
"jobs,expected_blocked_datasets",
@@ -62,7 +62,6 @@
)
from libcommon.dtos import JobInfo, SplitHubFile
from libcommon.exceptions import (
ConfigNamesError,
CreateCommitError,
DatasetGenerationCastError,
DatasetGenerationError,
@@ -1408,8 +1407,6 @@ def compute_config_parquet_and_info_response(
If one of the commits could not be created on the Hub.
[~`libcommon.exceptions.EmptyDatasetError`]:
The dataset is empty.
[~`libcommon.exceptions.ConfigNamesError`]:
If the list of configurations could not be obtained using the datasets library.
[~`libcommon.exceptions.UnsupportedExternalFilesError`]:
If we failed to get the external files sizes to make sure we can convert the dataset to parquet
[~`libcommon.exceptions.ExternalFilesSizeRequestHTTPError`]:
@@ -1450,7 +1447,7 @@ def compute_config_parquet_and_info_response(

config_names = {config_name_item["config"] for config_name_item in config_names_content["config_names"]}
if config not in config_names:
raise ConfigNamesError(f"{config=} does not exist in {dataset=}")
raise ValueError(f"{config=} does not exist in {dataset=}")

hf_api = HfApi(endpoint=hf_endpoint, token=hf_token)
committer_hf_api = HfApi(endpoint=hf_endpoint, token=committer_hf_token)
@@ -1534,7 +1531,9 @@ def compute_config_parquet_and_info_response(
except datasets.exceptions.DatasetGenerationCastError as err:
raise DatasetGenerationCastError("The dataset generation failed because of a cast error", cause=err) from err
except datasets.exceptions.DatasetGenerationError as err:
raise DatasetGenerationError("The dataset generation failed", cause=err) from err
if err.__cause__:
raise DatasetGenerationError("The dataset generation failed", cause=err.__cause__) from err
raise DatasetGenerationError("The dataset generation failed") from err

raise_if_long_column_name(builder.info.features)

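
The last hunk re-raises DatasetGenerationError with the underlying cause of the datasets error when one is attached. This relies on standard Python exception chaining; a self-contained illustration (plain Python, not repository code):

def generate() -> None:
    try:
        raise ValueError("could not cast column 'label' to int64")  # the informative root error
    except ValueError as inner:
        raise RuntimeError("dataset generation failed") from inner  # generic wrapper


try:
    generate()
except RuntimeError as err:
    # "raise ... from ..." stores the root error on __cause__, so a wrapper exception
    # can surface the more useful original failure instead of the generic message.
    root = err.__cause__ if err.__cause__ else err
    print(type(root).__name__, root)  # ValueError could not cast column 'label' to int64
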
15 changes: 14 additions & 1 deletion services/worker/src/worker/job_runners/dataset/config_names.py
@@ -12,13 +12,20 @@
load_dataset_builder,
)
from datasets.data_files import EmptyDatasetError as _EmptyDatasetError
from datasets.exceptions import DefunctDatasetError
from datasets.exceptions import (
DataFilesNotFoundError as _DataFilesNotFoundError,
)
from datasets.exceptions import DatasetNotFoundError, DefunctDatasetError
from huggingface_hub.utils import HfHubHTTPError
from libcommon.exceptions import (
ConfigNamesError,
DataFilesNotFoundError,
DatasetModuleNotInstalledError,
DatasetWithScriptNotSupportedError,
DatasetWithTooManyConfigsError,
EmptyDatasetError,
FileFormatMismatchBetweenSplitsError,
RetryableConfigNamesError,
)

from worker.dtos import CompleteJobResult, ConfigNameItem, DatasetConfigNamesResponse
@@ -107,10 +114,16 @@ def compute_config_names_response(
]
except _EmptyDatasetError as err:
raise EmptyDatasetError("The dataset is empty.", cause=err) from err
except _DataFilesNotFoundError as err:
raise DataFilesNotFoundError(str(err), cause=err) from err
except ValueError as err:
if "trust_remote_code" in str(err):
raise DatasetWithScriptNotSupportedError from err
if "Couldn't infer the same data file format for all splits" in str(err):
raise FileFormatMismatchBetweenSplitsError(str(err), cause=err) from err
raise ConfigNamesError("Cannot get the config names for the dataset.", cause=err) from err
except (HfHubHTTPError, BrokenPipeError, DatasetNotFoundError, PermissionError, ConnectionError) as err:
raise RetryableConfigNamesError("Cannot get the config names for the dataset.", cause=err) from err
except ImportError as err:
# this should only happen if the dataset is in the allow list, which should soon disappear
raise DatasetModuleNotInstalledError(
@@ -3,6 +3,7 @@

from collections.abc import Callable
from dataclasses import replace
from typing import Optional
from unittest.mock import patch

import pytest
@@ -98,9 +99,9 @@ def test_compute_too_many_configs(
("n_configs_with_default", False, None, None),
# should we really test the following cases?
# The assumption is that the dataset exists and is accessible with the token
("does_not_exist", False, "ConfigNamesError", "DatasetNotFoundError"),
("gated", False, "ConfigNamesError", "ConnectionError"), # See: huggingface/datasets#7109
("private", False, "ConfigNamesError", "DatasetNotFoundError"),
("does_not_exist", False, "RetryableConfigNamesError", "DatasetNotFoundError"),
("gated", False, "RetryableConfigNamesError", "ConnectionError"), # See: huggingface/datasets#7109
("private", False, "RetryableConfigNamesError", "DatasetNotFoundError"),
],
)
def test_compute_splits_response(
@@ -114,7 +115,7 @@ def test_compute_splits_response(
get_job_runner: GetJobRunner,
name: str,
use_token: bool,
error_code: str,
error_code: Optional[str],
cause: str,
app_config: AppConfig,
) -> None:
