Commit
Add num_rows estimate in hub_cache (#2940)
* add num_rows estimate in hub_cache

* fix tests

* add migration

* fix test

* fix test

* add num_rows_source

* bump job version

* update tests

* Revert "update tests"

This reverts commit fd5fec8.

* Revert "bump job version"

This reverts commit 4ce87cc.

* Revert "add num_rows_source"

This reverts commit 9207cdb.

* support mix of exact and estimated to compute estimated at config level

* same for dataset level
lhoestq authored Jun 25, 2024
1 parent 0de3ad7 commit 2245d24
Showing 9 changed files with 877 additions and 3 deletions.
83 changes: 83 additions & 0 deletions jobs/mongodb_migration/src/mongodb_migration/migrations/_20240624144000_cache_add_estimated_num_rows_field_in_size.py
@@ -0,0 +1,83 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2023 The HuggingFace Authors.

import logging

from libcommon.constants import CACHE_COLLECTION_RESPONSES, CACHE_MONGOENGINE_ALIAS
from libcommon.simple_cache import CachedResponseDocument
from mongoengine.connection import get_db

from mongodb_migration.check import check_documents
from mongodb_migration.migration import Migration


# connection already occurred in the main.py (caveat: we use globals)
class MigrationAddEstimatedNumRowsToSizeCacheResponse(Migration):
def up(self) -> None:
# See https://docs.mongoengine.org/guide/migration.html#example-1-addition-of-a-field
logging.info(
"If missing, add the 'estimated_num_rows' field with the default value"
" None to the cached results of dataset-size and config-size"
)
db = get_db(CACHE_MONGOENGINE_ALIAS)
db[CACHE_COLLECTION_RESPONSES].update_many(
{
"kind": "config-size",
"http_status": 200,
"content.size.config.estimated_num_rows": {"$exists": False},
},
{
"$set": {
"content.size.config.estimated_num_rows": None,
"content.size.splits.$[].estimated_num_rows": None,
}
},
)
db[CACHE_COLLECTION_RESPONSES].update_many(
{
"kind": "dataset-size",
"http_status": 200,
"content.size.dataset.estimated_num_rows": {"$exists": False},
},
{
"$set": {
"content.size.dataset.estimated_num_rows": None,
"content.size.configs.$[].estimated_num_rows": None,
"content.size.splits.$[].estimated_num_rows": None,
}
},
)

def down(self) -> None:
logging.info("Remove the 'config-size' field from all the cached results")
db = get_db(CACHE_MONGOENGINE_ALIAS)
db[CACHE_COLLECTION_RESPONSES].update_many(
{
"kind": "config-size",
"http_status": 200,
},
{
"$unset": {
"content.size.config.estimated_num_rows": "",
"content.size.splits.$[].estimated_num_rows": "",
}
},
)
db[CACHE_COLLECTION_RESPONSES].update_many(
{
"kind": "dataset-size",
"http_status": 200,
},
{
"$unset": {
"content.size.dataset.estimated_num_rows": "",
"content.size.configs.$[].estimated_num_rows": "",
"content.size.splits.$[].estimated_num_rows": "",
}
},
)

def validate(self) -> None:
logging.info("Ensure that a random selection of cached results have the 'estimated_num_rows' field")

check_documents(DocCls=CachedResponseDocument, sample_size=10)
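For reference, the "$[]" all-positional operator used in the updates above applies a $set to every element of an embedded array in a single update_many call. A minimal standalone sketch of the same pattern, assuming a local MongoDB and a made-up toy_cache collection (names are illustrative, not the production schema):

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # assumes a local MongoDB
coll = client["toy_db"]["toy_cache"]

coll.insert_one(
    {
        "kind": "config-size",
        "http_status": 200,
        "content": {"size": {"config": {"num_rows": 10}, "splits": [{"num_rows": 4}, {"num_rows": 6}]}},
    }
)

# "$[]" targets every element of the "splits" array, so both splits receive
# the new field in one pass; the filter skips documents already migrated.
coll.update_many(
    {"kind": "config-size", "http_status": 200, "content.size.config.estimated_num_rows": {"$exists": False}},
    {
        "$set": {
            "content.size.config.estimated_num_rows": None,
            "content.size.splits.$[].estimated_num_rows": None,
        }
    },
)

doc = coll.find_one({"kind": "config-size"})
assert all("estimated_num_rows" in split for split in doc["content"]["size"]["splits"])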
188 changes: 188 additions & 0 deletions jobs/mongodb_migration/tests/migrations/test_20240624144000_cache_add_estimated_num_rows_field_in_size.py
@@ -0,0 +1,188 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2024 The HuggingFace Authors.
from typing import Any

from libcommon.constants import CACHE_COLLECTION_RESPONSES, CACHE_MONGOENGINE_ALIAS
from libcommon.resources import MongoResource
from mongoengine.connection import get_db

from mongodb_migration.migrations._20240624144000_cache_add_estimated_num_rows_field_in_size import (
MigrationAddEstimatedNumRowsToSizeCacheResponse,
)


def assert_estimated_num_rows_in_config(dataset: str, kind: str) -> None:
db = get_db(CACHE_MONGOENGINE_ALIAS)
entry = db[CACHE_COLLECTION_RESPONSES].find_one({"dataset": dataset, "kind": kind})
assert entry is not None
assert entry["content"]["size"]["config"]["estimated_num_rows"] is None
assert all(split["estimated_num_rows"] is None for split in entry["content"]["size"]["splits"])


def assert_estimated_num_rows_in_dataset(dataset: str, kind: str) -> None:
db = get_db(CACHE_MONGOENGINE_ALIAS)
entry = db[CACHE_COLLECTION_RESPONSES].find_one({"dataset": dataset, "kind": kind})
assert entry is not None
assert entry["content"]["size"]["dataset"]["estimated_num_rows"] is None
    assert all(config["estimated_num_rows"] is None for config in entry["content"]["size"]["configs"])
assert all(split["estimated_num_rows"] is None for split in entry["content"]["size"]["splits"])


def assert_unchanged_in_config(dataset: str, kind: str) -> None:
db = get_db(CACHE_MONGOENGINE_ALIAS)
entry = db[CACHE_COLLECTION_RESPONSES].find_one({"dataset": dataset, "kind": kind})
assert entry is not None
if "size" in entry["content"]:
assert "estimated_num_rows" not in entry["content"]["size"]["config"]
assert all("estimated_num_rows" not in split for split in entry["content"]["size"]["splits"])


def assert_unchanged_in_dataset(dataset: str, kind: str) -> None:
db = get_db(CACHE_MONGOENGINE_ALIAS)
entry = db[CACHE_COLLECTION_RESPONSES].find_one({"dataset": dataset, "kind": kind})
assert entry is not None
if "size" in entry["content"]:
assert "estimated_num_rows" not in entry["content"]["size"]["dataset"]
assert all("estimated_num_rows" not in split for split in entry["content"]["size"]["configs"])
assert all("estimated_num_rows" not in split for split in entry["content"]["size"]["splits"])


def test_cache_add_partial(mongo_host: str) -> None:
with MongoResource(database="test_cache_add_tags_to_hub_cache", host=mongo_host, mongoengine_alias="cache"):
db = get_db(CACHE_MONGOENGINE_ALIAS)
cache: list[dict[str, Any]] = [
{
"dataset": "dataset",
"config": "config",
"kind": "config-size",
"content": {
"size": {
"config": {
"dataset": "dataset",
"config": "config",
"num_bytes_original_files": 123,
"num_bytes_parquet_files": 123,
"num_bytes_memory": 123,
"num_rows": 1000,
"num_columns": 1,
},
"splits": [
{
"dataset": "dataset",
"config": "config",
"split": "train",
"num_bytes_original_files": 120,
"num_bytes_parquet_files": 120,
"num_bytes_memory": 120,
"num_rows": 900,
"num_columns": 1,
},
{
"dataset": "dataset",
"config": "config",
"split": "test",
"num_bytes_original_files": 3,
"num_bytes_parquet_files": 3,
"num_bytes_memory": 3,
"num_rows": 100,
"num_columns": 1,
},
],
},
"partial": False,
},
"http_status": 200,
"job_runner_version": 1,
"progress": None,
},
{
"dataset": "dataset",
"config": "config",
"kind": "dataset-size",
"content": {
"size": {
"dataset": {
"dataset": "dataset",
"config": "config",
"num_bytes_original_files": 123,
"num_bytes_parquet_files": 123,
"num_bytes_memory": 123,
"num_rows": 1000,
"num_columns": 1,
},
"configs": [
{
"dataset": "dataset",
"config": "config",
"num_bytes_original_files": 123,
"num_bytes_parquet_files": 123,
"num_bytes_memory": 123,
"num_rows": 1000,
"num_columns": 1,
}
],
"splits": [
{
"dataset": "dataset",
"config": "config",
"split": "train",
"num_bytes_original_files": 120,
"num_bytes_parquet_files": 120,
"num_bytes_memory": 120,
"num_rows": 900,
"num_columns": 1,
},
{
"dataset": "dataset",
"config": "config",
"split": "test",
"num_bytes_original_files": 3,
"num_bytes_parquet_files": 3,
"num_bytes_memory": 3,
"num_rows": 100,
"num_columns": 1,
},
],
},
"partial": False,
},
"http_status": 200,
"job_runner_version": 1,
"progress": None,
},
{
"dataset": "dataset_with_error",
"config": "config_with_error",
"kind": "config-size",
"content": {"error": "error"},
"details": {
"error": "error",
"cause_exception": "UnexpextedError",
"cause_message": "error",
"cause_traceback": ["Traceback"],
},
"error_code": "UnexpectedError",
"http_status": 500,
"job_runner_version": 1,
"progress": None,
},
]

db[CACHE_COLLECTION_RESPONSES].insert_many(cache)

migration = MigrationAddEstimatedNumRowsToSizeCacheResponse(
version="20240624144000",
description="add the 'estimated_num_rows' fields to size",
)
migration.up()

assert_estimated_num_rows_in_config("dataset", kind="config-size")
assert_estimated_num_rows_in_dataset("dataset", kind="dataset-size")
assert_unchanged_in_config("dataset_with_error", kind="config-size")

migration.down()
assert_unchanged_in_config("dataset", kind="config-size")
assert_unchanged_in_dataset("dataset", kind="dataset-size")
assert_unchanged_in_config("dataset_with_error", kind="config-size")

db[CACHE_COLLECTION_RESPONSES].drop()
3 changes: 3 additions & 0 deletions services/worker/src/worker/dtos.py
@@ -206,6 +206,7 @@ class ConfigSize(TypedDict):
num_bytes_memory: int
num_rows: int
num_columns: int
estimated_num_rows: Optional[int]


class SplitSize(TypedDict):
@@ -216,6 +217,7 @@ class SplitSize(TypedDict):
num_bytes_memory: int
num_rows: int
num_columns: int
estimated_num_rows: Optional[int]


class ConfigSizeContent(TypedDict):
@@ -379,6 +381,7 @@ class DatasetSize(TypedDict):
num_bytes_parquet_files: int
num_bytes_memory: int
num_rows: int
estimated_num_rows: Optional[int]


class DatasetSizeContent(TypedDict):
18 changes: 18 additions & 0 deletions services/worker/src/worker/job_runners/config/size.py
@@ -41,9 +41,15 @@ def compute_config_size_response(dataset: str, config: str) -> ConfigSizeResponse:
"Previous step did not return the expected content.",
TypeError(f"dataset_info should be a dict, but got {type(content['dataset_info'])}"),
)
if content["estimated_dataset_info"] is not None and not isinstance(content["estimated_dataset_info"], dict):
raise PreviousStepFormatError(
"Previous step did not return the expected content.",
TypeError(f"estimated_info should be a dict, but got {type(content['dataset_info'])}"),
)

try:
config_info = content["dataset_info"]
config_estimated_info = content["estimated_dataset_info"]
num_columns = len(config_info["features"])
split_sizes: list[SplitSize] = [
{
@@ -58,6 +64,13 @@ def compute_config_size_response(dataset: str, config: str) -> ConfigSizeResponse:
"num_bytes_memory": split_info["num_bytes"] if "num_bytes" in split_info else 0,
"num_rows": split_info["num_examples"] if "num_examples" in split_info else 0,
"num_columns": num_columns,
"estimated_num_rows": config_estimated_info["splits"][split_info["name"]]["num_examples"]
if isinstance(config_estimated_info, dict)
and "splits" in config_estimated_info
and "name" in split_info
and split_info["name"] in config_estimated_info["splits"]
and "num_examples" in config_estimated_info["splits"][split_info["name"]]
else None,
}
for split_info in config_info["splits"].values()
]
Expand All @@ -70,6 +83,11 @@ def compute_config_size_response(dataset: str, config: str) -> ConfigSizeRespons
"num_bytes_memory": sum(split_size["num_bytes_memory"] for split_size in split_sizes),
"num_rows": sum(split_size["num_rows"] for split_size in split_sizes),
"num_columns": num_columns,
"estimated_num_rows": sum(
split_size["estimated_num_rows"] or split_size["num_rows"] for split_size in split_sizes
)
if any(split_size["estimated_num_rows"] for split_size in split_sizes)
else None,
}
)
partial = content["partial"]
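The long conditional in the list comprehension above guards every step of the lookup into estimated_dataset_info, so any missing key degrades to None instead of raising. A standalone sketch of the same guarded lookup, with hypothetical payloads (made-up shapes and numbers, not real cache entries):

from typing import Any, Optional

# Only "train" has an estimate, e.g. because its parquet conversion was partial.
config_estimated_info: Optional[dict[str, Any]] = {"splits": {"train": {"num_examples": 9000}}}

def estimated_rows(split_info: dict[str, Any]) -> Optional[int]:
    # Mirrors the guarded lookup in the comprehension: any missing piece -> None.
    if (
        isinstance(config_estimated_info, dict)
        and "splits" in config_estimated_info
        and "name" in split_info
        and split_info["name"] in config_estimated_info["splits"]
        and "num_examples" in config_estimated_info["splits"][split_info["name"]]
    ):
        return config_estimated_info["splits"][split_info["name"]]["num_examples"]
    return None

assert estimated_rows({"name": "train", "num_examples": 900}) == 9000  # estimate found
assert estimated_rows({"name": "test", "num_examples": 100}) is None  # no estimate -> None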
6 changes: 5 additions & 1 deletion services/worker/src/worker/job_runners/dataset/hub_cache.py
@@ -72,12 +72,16 @@ def compute_hub_cache_response(dataset: str) -> tuple[DatasetHubCacheResponse, float]:
or "dataset" not in content["size"]
or "num_rows" not in content["size"]["dataset"]
or not isinstance(content["size"]["dataset"]["num_rows"], int)
or not (
isinstance(content["size"]["dataset"]["estimated_num_rows"], int)
or content["size"]["dataset"]["estimated_num_rows"] is None
)
):
raise PreviousStepFormatError(
"Previous step 'dataset-size' did not return the expected content: 'partial' or 'size.dataset.num_rows'."
)
partial = content["partial"]
-        num_rows = content["size"]["dataset"]["num_rows"]
+        num_rows = content["size"]["dataset"]["estimated_num_rows"] or content["size"]["dataset"]["num_rows"]
progresses.append(size_response["progress"])
except PreviousStepFormatError:
raise
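The changed num_rows line prefers the estimate and falls back to the exact count via `or`. Worth noting: `or` also falls back when the estimate is 0, which seems acceptable here (my reading, not stated in the commit), since a zero estimate carries no extra information. A tiny sketch of the fallback:

from typing import Any

def effective_num_rows(dataset_size: dict[str, Any]) -> int:
    # Prefer the estimate when present; fall back to the exact count.
    # Since 0 and None are both falsy, a zero estimate also falls back.
    return dataset_size["estimated_num_rows"] or dataset_size["num_rows"]

assert effective_num_rows({"num_rows": 100, "estimated_num_rows": None}) == 100
assert effective_num_rows({"num_rows": 100, "estimated_num_rows": 2500}) == 2500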
5 changes: 5 additions & 0 deletions services/worker/src/worker/job_runners/dataset/size.py
@@ -105,6 +105,11 @@ def compute_sizes_response(dataset: str) -> tuple[DatasetSizeResponse, float]:
"num_bytes_parquet_files": sum(config_size["num_bytes_parquet_files"] for config_size in config_sizes),
"num_bytes_memory": sum(config_size["num_bytes_memory"] for config_size in config_sizes),
"num_rows": sum(config_size["num_rows"] for config_size in config_sizes),
"estimated_num_rows": sum(
config_size["estimated_num_rows"] or config_size["num_rows"] for config_size in config_sizes
)
if any(config_size["estimated_num_rows"] for config_size in config_sizes)
else None,
}
except Exception as e:
raise PreviousStepFormatError("Previous step did not return the expected content.", e) from e
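At the dataset level the same mix-and-sum rule applies across configs: the estimated total uses each config's estimate where one exists and its exact count otherwise, and stays None when no config has an estimate. A toy check with assumed numbers:

# Hypothetical config sizes: one fully converted (exact only), one partial (estimated).
config_sizes = [
    {"num_rows": 1000, "estimated_num_rows": None},
    {"num_rows": 250, "estimated_num_rows": 1200},
]
dataset_estimate = (
    sum(c["estimated_num_rows"] or c["num_rows"] for c in config_sizes)
    if any(c["estimated_num_rows"] for c in config_sizes)
    else None  # every config exact -> no dataset-level estimate
)
assert dataset_estimate == 2200  # 1000 exact + 1200 estimated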