diff --git a/libs/libcommon/src/libcommon/processing_graph.py b/libs/libcommon/src/libcommon/processing_graph.py index 91a929208..3ef134ea1 100644 --- a/libs/libcommon/src/libcommon/processing_graph.py +++ b/libs/libcommon/src/libcommon/processing_graph.py @@ -708,7 +708,7 @@ def parse_id(id: str) -> tuple[str, str, Optional[str], Optional[str], str]: }, "dataset-modalities": { "input_type": "dataset", - "triggered_by": ["dataset-info", "dataset-filetypes"], + "triggered_by": ["dataset-info", "dataset-filetypes", "split-image-url-columns"], "job_runner_version": 2, "difficulty": 20, }, diff --git a/libs/libcommon/tests/test_processing_graph.py b/libs/libcommon/tests/test_processing_graph.py index 6d94d7a47..711756779 100644 --- a/libs/libcommon/tests/test_processing_graph.py +++ b/libs/libcommon/tests/test_processing_graph.py @@ -168,8 +168,19 @@ def test_graph() -> None: ( "dataset-modalities", ["dataset-hub-cache"], - ["dataset-info", "dataset-filetypes"], - ["dataset-config-names", "config-parquet-and-info", "config-info", "dataset-info", "dataset-filetypes"], + ["dataset-info", "dataset-filetypes", "split-image-url-columns"], + [ + "config-info", + "config-parquet", + "config-parquet-and-info", + "config-parquet-metadata", + "config-split-names", + "dataset-config-names", + "dataset-filetypes", + "dataset-info", + "split-first-rows", + "split-image-url-columns", + ], ), ( "dataset-is-valid", @@ -195,7 +206,7 @@ def test_graph() -> None: ), ( "split-image-url-columns", - ["split-opt-in-out-urls-scan"], + ["dataset-modalities", "split-opt-in-out-urls-scan"], ["split-first-rows"], [ "dataset-config-names", @@ -371,6 +382,7 @@ def test_graph() -> None: "dataset-size", "split-duckdb-index", "split-first-rows", + "split-image-url-columns", "split-is-valid", "split-descriptive-statistics", ], diff --git a/libs/libcommon/tests/test_utils.py b/libs/libcommon/tests/test_utils.py index 6f706d510..d929e8d7d 100644 --- a/libs/libcommon/tests/test_utils.py +++ 
b/libs/libcommon/tests/test_utils.py @@ -152,4 +152,4 @@ def test_serialize_and_truncate_raises(obj: Any, max_bytes: int) -> None: def test_get_duration() -> None: - assert get_duration(get_datetime() - timedelta(seconds=10)) == pytest.approx(10) + assert get_duration(get_datetime() - timedelta(seconds=10)) == pytest.approx(10, rel=0.01) diff --git a/services/worker/src/worker/job_runners/dataset/modalities.py b/services/worker/src/worker/job_runners/dataset/modalities.py index 0e5280923..a287a9c15 100644 --- a/services/worker/src/worker/job_runners/dataset/modalities.py +++ b/services/worker/src/worker/job_runners/dataset/modalities.py @@ -2,12 +2,15 @@ # Copyright 2022 The HuggingFace Authors. import logging +from http import HTTPStatus from datasets import Audio, Features, Image, Sequence, Translation, TranslationVariableLanguages, Value from datasets.features.features import FeatureType, _visit from libcommon.exceptions import PreviousStepFormatError from libcommon.simple_cache import ( + CachedArtifactNotFoundError, get_previous_step_or_raise, + get_response, ) from worker.dtos import ( @@ -106,6 +109,53 @@ def detect_modalities_from_features(dataset: str) -> set[DatasetModality]: return modalities +def detect_modalities_from_url_columns(dataset: str) -> set[DatasetModality]: + """ + Detect modalities of a dataset using the type of URL columns. + E.g. if a column contains URLs of images. + + Args: + dataset (`str`): + A namespace (user or an organization) and a repo name separated by a `/`. + + Raises: + [~`libcommon.simple_cache.CachedArtifactError`]: + If the previous step gave an error. + [~`libcommon.exceptions.PreviousStepFormatError`]: + If the content of the previous step has not the expected format + + Returns: + `set[DatasetModality]`: A set of modalities. 
+ """ + split_names_response = get_previous_step_or_raise(kind="dataset-split-names", dataset=dataset) + content = split_names_response["content"] + if "splits" not in content and not isinstance(content["splits"], list): + raise PreviousStepFormatError("Previous step did not return the expected content: 'splits'.") + + try: + for split_item in content["splits"][:10]: # no need to check all the configs + config = split_item["config"] + split = split_item["split"] + try: + response = get_response(kind="split-image-url-columns", dataset=dataset, config=config, split=split) + except CachedArtifactNotFoundError: + logging.debug("No response found in previous step for this dataset: 'split-image-url-columns'.") + continue + if response["http_status"] != HTTPStatus.OK: + logging.debug(f"Previous step gave an error: {response['http_status']}.") + continue + else: + try: + if response["content"]["columns"]: + return {"image"} + except Exception as e: + raise PreviousStepFormatError("Previous step did not return the expected content.", e) from e + except Exception as e: + raise PreviousStepFormatError("Previous step did not return the expected content.", e) from e + + return set() + + # from https://developer.mozilla.org/en-US/docs/Web/Media/Formats/Image_types IMAGE_EXTENSIONS = { ".apng", @@ -302,6 +352,14 @@ def compute_modalities_response(dataset: str) -> DatasetModalitiesResponse: logging.info(f"failed to detect modalities from features of {dataset=}") pass + try: + modalities.update(detect_modalities_from_url_columns(dataset)) + except PreviousStepFormatError: + raise + except Exception: + logging.info(f"failed to detect modalities from file types of {dataset=}") + pass + try: modalities.update(detect_modalities_from_filetypes(dataset)) except PreviousStepFormatError: diff --git a/services/worker/tests/job_runners/dataset/test_modalities.py b/services/worker/tests/job_runners/dataset/test_modalities.py index b1e394c28..e78d63a22 100644 --- 
a/services/worker/tests/job_runners/dataset/test_modalities.py
+++ b/services/worker/tests/job_runners/dataset/test_modalities.py
@@ -33,6 +33,7 @@ def prepare_and_clean_mongo(app_config: AppConfig) -> None:
 IMAGE_TEXT_DATASET = "image-text-dataset"
 IMAGE_DATASET = "image-dataset"
 TIME_SERIES_DATASET = "time-series-dataset"
+IMAGE_URLS_DATASET = "image-urls-dataset"
 ERROR_DATASET = "error-dataset"
 
 text_features = Features({"conversations": [{"from": Value("string"), "value": Value("string")}]})
@@ -174,6 +175,24 @@ def prepare_and_clean_mongo(app_config: AppConfig) -> None:
     },
     progress=1.0,
 )
+UPSTREAM_RESPONSE_IMAGE_URL_DATASET_SPLITS: UpstreamResponse = UpstreamResponse(
+    kind="dataset-split-names",
+    dataset=IMAGE_URLS_DATASET,
+    dataset_git_revision=REVISION_NAME,
+    http_status=HTTPStatus.OK,
+    content={"splits": [{"dataset": IMAGE_URLS_DATASET, "config": "default", "split": "train"}]},
+    progress=1.0,
+)
+UPSTREAM_RESPONSE_IMAGE_URL_COLUMNS: UpstreamResponse = UpstreamResponse(
+    kind="split-image-url-columns",
+    dataset=IMAGE_URLS_DATASET,
+    config="default",
+    split="train",
+    dataset_git_revision=REVISION_NAME,
+    http_status=HTTPStatus.OK,
+    content={"columns": ["image_url"]},
+    progress=1.0,
+)
 
 EXPECTED_TEXT: tuple[DatasetModalitiesResponse, float] = (
     {"modalities": ["text"]},
@@ -212,6 +231,10 @@ def prepare_and_clean_mongo(app_config: AppConfig) -> None:
     {"modalities": ["timeseries"]},
     1.0,
 )
+EXPECTED_IMAGE_URLS: tuple[DatasetModalitiesResponse, float] = (
+    {"modalities": ["image"]},
+    1.0,
+)
 
 
 @pytest.fixture
@@ -325,6 +348,14 @@ def _get_job_runner(
             ],
             EXPECTED_TIME_SERIES,
         ),
+        (
+            IMAGE_URLS_DATASET,
+            [
+                UPSTREAM_RESPONSE_IMAGE_URL_DATASET_SPLITS,
+                UPSTREAM_RESPONSE_IMAGE_URL_COLUMNS,
+            ],
+            EXPECTED_IMAGE_URLS,
+        ),
     ],
 )
 def test_compute(