re-enable concurrency test
severo committed Aug 23, 2024
1 parent 9ecd26d commit 429ed38
Showing 3 changed files with 98 additions and 84 deletions.
32 changes: 32 additions & 0 deletions services/worker/tests/fixtures/files.py
@@ -110,3 +110,35 @@ def extra_fields_readme(tmp_path_factory: pytest.TempPathFactory) -> str:
    with open(path, "w", newline="") as f:
        f.writelines(f"{line}\n" for line in lines)
    return path


@pytest.fixture(scope="session")
def n_configs_paths(tmp_path_factory: pytest.TempPathFactory) -> list[str]:
    directory = tmp_path_factory.mktemp("data")
    readme = directory / "README.md"
    N = 15
    lines = [
        "---",
        "configs:",
    ]
    for i in range(N):
        lines += [
            f"- config_name: config{i}",
            f'  data_files: "config{i}.csv"',
        ]
    lines += [
        "---",
    ]
    with open(readme, "w", newline="") as f:
        f.writelines(f"{line}\n" for line in lines)
    files = [readme]
    for i in range(N):
        config_name = f"config{i}"
        path = directory / f"{config_name}.csv"
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["text"])
            writer.writeheader()
            for _ in range(1000):
                writer.writerow({"text": config_name})
        files.append(path)
    return files
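
For reference (not part of the commit), the fixture above writes a README.md whose YAML front matter declares one CSV data file per config. The following standalone Python sketch rebuilds that header for N=2 instead of 15, so the expected shape is easy to see:

# Standalone illustration only: reproduce the YAML front matter that the
# n_configs_paths fixture writes, here for N=2 configs instead of 15.
N = 2
lines = ["---", "configs:"]
for i in range(N):
    lines += [f"- config_name: config{i}", f'  data_files: "config{i}.csv"']
lines.append("---")
print("\n".join(lines))
# Expected output:
# ---
# configs:
# - config_name: config0
#   data_files: "config0.csv"
# - config_name: config1
#   data_files: "config1.csv"
# ---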
7 changes: 7 additions & 0 deletions services/worker/tests/fixtures/hub.py
@@ -254,6 +254,13 @@ def hub_public_big_no_info(datasets: Mapping[str, Dataset]) -> Iterator[str]:
    delete_hub_dataset_repo(repo_id=repo_id)


@pytest.fixture(scope="session")
def hub_public_n_configs(n_configs_paths: list[str]) -> Iterator[str]:
    repo_id = create_hub_dataset_repo(prefix="n-configs", file_paths=n_configs_paths)
    yield repo_id
    delete_hub_dataset_repo(repo_id=repo_id)


@pytest.fixture(scope="session")
def hub_public_big_csv(big_csv_path: str) -> Iterator[str]:
    repo_id = create_hub_dataset_repo(prefix="big-csv", file_paths=[big_csv_path])
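As a usage note (hypothetical, not part of the diff), a test that depends on this new session-scoped fixture only needs to accept the repo id it yields; teardown deletes the temporary repo automatically:

# Hypothetical sketch: consuming the hub_public_n_configs fixture in a test.
# The fixture yields the repo_id returned by create_hub_dataset_repo and calls
# delete_hub_dataset_repo when the session ends.
def test_uses_n_configs_repo(hub_public_n_configs: str) -> None:
    repo_id = hub_public_n_configs
    assert repo_id  # the id of the temporary dataset repo created for the test session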
143 changes: 59 additions & 84 deletions services/worker/tests/job_runners/config/test_parquet_and_info.py
@@ -1,11 +1,11 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 The HuggingFace Authors.

import io
import os
import zipfile
from collections.abc import Callable, Iterator, Mapping
from http import HTTPStatus
from multiprocessing import Pool
from pathlib import Path
from typing import Any, Optional, TypedDict
from unittest.mock import patch
@@ -25,11 +25,13 @@
from datasets.utils.py_utils import asdict
from huggingface_hub.hf_api import CommitOperationAdd, HfApi
from libcommon.dtos import JobInfo, JobParams, Priority
from libcommon.queue.jobs import Queue
from libcommon.resources import CacheMongoResource, QueueMongoResource
from libcommon.simple_cache import upsert_response

from worker.config import AppConfig
from worker.dtos import CompleteJobResult
from worker.job_manager import JobManager
from worker.job_runners.config.parquet_and_info import (
ConfigParquetAndInfoJobRunner,
ParquetFile,
@@ -483,89 +485,62 @@ def launch_job_runner(job_runner_args: JobRunnerArgs) -> CompleteJobResult:
    return result


# # Previously run on a dataset created with the following script
# # N = 15
# DATASET_SCRIPT_WITH_N_CONFIGS = """
# import os

# import datasets
# from datasets import DatasetInfo, BuilderConfig, Features, Split, SplitGenerator, Value


# class DummyDataset(datasets.GeneratorBasedBuilder):

#     BUILDER_CONFIGS = [BuilderConfig(name="config"+str(i)) for i in range(15)]

#     def _info(self) -> DatasetInfo:
#         return DatasetInfo(features=Features({"text": Value("string")}))

#     def _split_generators(self, dl_manager):
#         return [
#             SplitGenerator(Split.TRAIN, gen_kwargs={"text": self.config.name}),
#         ]

#     def _generate_examples(self, text, **kwargs):
#         for i in range(1000):
#             yield i, {"text": text}
# """


# def test_concurrency(
#     hub_public_n_configs: str,
#     app_config: AppConfig,
#     tmp_path: Path,
#     get_dataset_config_names_job_runner: GetDatasetConfigNamesJobRunner,
#     queue_mongo_resource: QueueMongoResource,
#     cache_mongo_resource: CacheMongoResource,
# ) -> None:
#     """
#     Test that multiple job runners (to compute config-parquet-and-info) can run in parallel,
#     without having conflicts when sending commits to the Hub.
#     For this test, we need a lot of configs for the same dataset (say 20) and one job runner for each.
#     Ideally we would try for both quick and slow jobs.
#     """
#     repo_id = hub_public_n_configs
#     hf_api = HfApi(endpoint=CI_HUB_ENDPOINT, token=CI_USER_TOKEN)
#     revision = hf_api.dataset_info(repo_id=repo_id, files_metadata=False).sha
#     if revision is None:
#         raise ValueError(f"Could not find revision for dataset {repo_id}")

#     # fill the cache for the step dataset-config-names, required by the job_runner
#     # it's a lot of code 😅
#     job_info = JobInfo(
#         job_id="not_used",
#         type="dataset-config-names",
#         params=JobParams(dataset=repo_id, revision=revision, config=None, split=None),
#         priority=Priority.NORMAL,
#         difficulty=50,
#         started_at=None,
#     )
#     queue = Queue()
#     queue.create_jobs([job_info])
#     job_info = queue.start_job()
#     job_manager = JobManager(
#         job_info=job_info,
#         app_config=app_config,
#         job_runner=get_dataset_config_names_job_runner(repo_id, app_config),
#     )
#     job_result = job_manager.run_job()
#     job_manager.finish(job_result=job_result)
#     if not job_result["output"]:
#         raise ValueError("Could not get config names")
#     configs = [str(config_name["config"]) for config_name in job_result["output"]["content"]["config_names"]]

#     # launch the job runners
#     NUM_JOB_RUNNERS = 10
#     with Pool(NUM_JOB_RUNNERS) as pool:
#         pool.map(
#             launch_job_runner,
#             [
#                 JobRunnerArgs(
#                     dataset=repo_id, revision=revision, config=config, app_config=app_config, tmp_path=tmp_path
#                 )
#                 for config in configs
#             ],
#         )
def test_concurrency(
    hub_public_n_configs: str,
    app_config: AppConfig,
    tmp_path: Path,
    get_dataset_config_names_job_runner: GetDatasetConfigNamesJobRunner,
    queue_mongo_resource: QueueMongoResource,
    cache_mongo_resource: CacheMongoResource,
) -> None:
    """
    Test that multiple job runners (to compute config-parquet-and-info) can run in parallel,
    without having conflicts when sending commits to the Hub.
    For this test, we need a lot of configs for the same dataset (say 20) and one job runner for each.
    Ideally we would try for both quick and slow jobs.
    """
    repo_id = hub_public_n_configs
    hf_api = HfApi(endpoint=CI_HUB_ENDPOINT, token=CI_USER_TOKEN)
    revision = hf_api.dataset_info(repo_id=repo_id, files_metadata=False).sha
    if revision is None:
        raise ValueError(f"Could not find revision for dataset {repo_id}")

    # fill the cache for the step dataset-config-names, required by the job_runner
    # it's a lot of code 😅
    job_info = JobInfo(
        job_id="not_used",
        type="dataset-config-names",
        params=JobParams(dataset=repo_id, revision=revision, config=None, split=None),
        priority=Priority.NORMAL,
        difficulty=50,
        started_at=None,
    )
    queue = Queue()
    queue.create_jobs([job_info])
    job_info = queue.start_job()
    job_manager = JobManager(
        job_info=job_info,
        app_config=app_config,
        job_runner=get_dataset_config_names_job_runner(repo_id, app_config),
    )
    job_result = job_manager.run_job()
    job_manager.finish(job_result=job_result)
    if not job_result["output"]:
        raise ValueError("Could not get config names")
    configs = [str(config_name["config"]) for config_name in job_result["output"]["content"]["config_names"]]

    # launch the job runners
    NUM_JOB_RUNNERS = 10
    with Pool(NUM_JOB_RUNNERS) as pool:
        pool.map(
            launch_job_runner,
            [
                JobRunnerArgs(
                    dataset=repo_id, revision=revision, config=config, app_config=app_config, tmp_path=tmp_path
                )
                for config in configs
            ],
        )
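
The heart of the re-enabled test is the fan-out at the end: one pool worker per config, so the config-parquet-and-info commits reach the Hub concurrently. As a standalone illustration of that pattern (stdlib only, all names hypothetical, not part of the commit), the shape is:

# Hypothetical, self-contained sketch of the fan-out pattern used above:
# one worker process per config, results collected with Pool.map.
from multiprocessing import Pool


def process_one_config(config: str) -> str:
    # stand-in for "compute parquet and info for one config and commit it"
    return f"done: {config}"


if __name__ == "__main__":
    configs = [f"config{i}" for i in range(15)]
    with Pool(10) as pool:
        results = pool.map(process_one_config, configs)
    assert results == [f"done: {c}" for c in configs]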


@pytest.mark.parametrize(