From 429ed382f94a84b43a43e3506ca69e093744ed18 Mon Sep 17 00:00:00 2001
From: Sylvain Lesage
Date: Fri, 23 Aug 2024 21:53:28 +0000
Subject: [PATCH] re-enable concurrency test

---
 services/worker/tests/fixtures/files.py      |  32 ++++
 services/worker/tests/fixtures/hub.py        |   7 +
 .../config/test_parquet_and_info.py          | 143 ++++++++----------
 3 files changed, 98 insertions(+), 84 deletions(-)

diff --git a/services/worker/tests/fixtures/files.py b/services/worker/tests/fixtures/files.py
index b5494b560..47a93db40 100644
--- a/services/worker/tests/fixtures/files.py
+++ b/services/worker/tests/fixtures/files.py
@@ -110,3 +110,35 @@ def extra_fields_readme(tmp_path_factory: pytest.TempPathFactory) -> str:
     with open(path, "w", newline="") as f:
         f.writelines(f"{line}\n" for line in lines)
     return path
+
+
+@pytest.fixture(scope="session")
+def n_configs_paths(tmp_path_factory: pytest.TempPathFactory) -> list[str]:
+    directory = tmp_path_factory.mktemp("data")
+    readme = directory / "README.md"
+    N = 15
+    lines = [
+        "---",
+        "configs:",
+    ]
+    for i in range(N):
+        lines += [
+            f"- config_name: config{i}",
+            f'  data_files: "config{i}.csv"',
+        ]
+    lines += [
+        "---",
+    ]
+    with open(readme, "w", newline="") as f:
+        f.writelines(f"{line}\n" for line in lines)
+    files = [str(readme)]
+    for i in range(N):
+        config_name = f"config{i}"
+        path = directory / f"{config_name}.csv"
+        with open(path, "w", newline="") as f:
+            writer = csv.DictWriter(f, fieldnames=["text"])
+            writer.writeheader()
+            for _ in range(1000):
+                writer.writerow({"text": config_name})
+        files.append(str(path))
+    return files
diff --git a/services/worker/tests/fixtures/hub.py b/services/worker/tests/fixtures/hub.py
index 55b865448..de8eef058 100644
--- a/services/worker/tests/fixtures/hub.py
+++ b/services/worker/tests/fixtures/hub.py
@@ -254,6 +254,13 @@ def hub_public_big_no_info(datasets: Mapping[str, Dataset]) -> Iterator[str]:
     delete_hub_dataset_repo(repo_id=repo_id)
 
 
+@pytest.fixture(scope="session")
+def hub_public_n_configs(n_configs_paths: list[str]) -> Iterator[str]:
+    repo_id = create_hub_dataset_repo(prefix="n-configs", file_paths=n_configs_paths)
+    yield repo_id
+    delete_hub_dataset_repo(repo_id=repo_id)
+
+
 @pytest.fixture(scope="session")
 def hub_public_big_csv(big_csv_path: str) -> Iterator[str]:
     repo_id = create_hub_dataset_repo(prefix="big-csv", file_paths=[big_csv_path])
diff --git a/services/worker/tests/job_runners/config/test_parquet_and_info.py b/services/worker/tests/job_runners/config/test_parquet_and_info.py
index e09ddf2ac..aea05730f 100644
--- a/services/worker/tests/job_runners/config/test_parquet_and_info.py
+++ b/services/worker/tests/job_runners/config/test_parquet_and_info.py
@@ -1,11 +1,11 @@
 # SPDX-License-Identifier: Apache-2.0
 # Copyright 2022 The HuggingFace Authors.
-
 import io
 import os
 import zipfile
 from collections.abc import Callable, Iterator, Mapping
 from http import HTTPStatus
+from multiprocessing import Pool
 from pathlib import Path
 from typing import Any, Optional, TypedDict
 from unittest.mock import patch
@@ -25,11 +25,13 @@
 from datasets.utils.py_utils import asdict
 from huggingface_hub.hf_api import CommitOperationAdd, HfApi
 from libcommon.dtos import JobInfo, JobParams, Priority
+from libcommon.queue.jobs import Queue
 from libcommon.resources import CacheMongoResource, QueueMongoResource
 from libcommon.simple_cache import upsert_response
 
 from worker.config import AppConfig
 from worker.dtos import CompleteJobResult
+from worker.job_manager import JobManager
 from worker.job_runners.config.parquet_and_info import (
     ConfigParquetAndInfoJobRunner,
     ParquetFile,
@@ -483,89 +485,62 @@ def launch_job_runner(job_runner_args: JobRunnerArgs) -> CompleteJobResult:
     return result
 
 
-# # Previously run on a dataset created with the following script
-# # N = 15
-# DATASET_SCRIPT_WITH_N_CONFIGS = """
-# import os
-
-# import datasets
-# from datasets import DatasetInfo, BuilderConfig, Features, Split, SplitGenerator, Value
-
-
-# class DummyDataset(datasets.GeneratorBasedBuilder):
-
-#     BUILDER_CONFIGS = [BuilderConfig(name="config"+str(i)) for i in range(15)]
-
-#     def _info(self) -> DatasetInfo:
-#         return DatasetInfo(features=Features({"text": Value("string")}))
-
-#     def _split_generators(self, dl_manager):
-#         return [
-#             SplitGenerator(Split.TRAIN, gen_kwargs={"text": self.config.name}),
-#         ]
-
-#     def _generate_examples(self, text, **kwargs):
-#         for i in range(1000):
-#             yield i, {"text": text}
-# """
-
-
-# def test_concurrency(
-#     hub_public_n_configs: str,
-#     app_config: AppConfig,
-#     tmp_path: Path,
-#     get_dataset_config_names_job_runner: GetDatasetConfigNamesJobRunner,
-#     queue_mongo_resource: QueueMongoResource,
-#     cache_mongo_resource: CacheMongoResource,
-# ) -> None:
-#     """
-#     Test that multiple job runners (to compute config-parquet-and-info) can run in parallel,
-#     without having conflicts when sending commits to the Hub.
-#     For this test, we need a lot of configs for the same dataset (say 20) and one job runner for each.
-#     Ideally we would try for both quick and slow jobs.
-#     """
-#     repo_id = hub_public_n_configs
-#     hf_api = HfApi(endpoint=CI_HUB_ENDPOINT, token=CI_USER_TOKEN)
-#     revision = hf_api.dataset_info(repo_id=repo_id, files_metadata=False).sha
-#     if revision is None:
-#         raise ValueError(f"Could not find revision for dataset {repo_id}")
-
-#     # fill the cache for the step dataset-config-names, required by the job_runner
-#     # it's a lot of code 😅
-#     job_info = JobInfo(
-#         job_id="not_used",
-#         type="dataset-config-names",
-#         params=JobParams(dataset=repo_id, revision=revision, config=None, split=None),
-#         priority=Priority.NORMAL,
-#         difficulty=50,
-#         started_at=None,
-#     )
-#     queue = Queue()
-#     queue.create_jobs([job_info])
-#     job_info = queue.start_job()
-#     job_manager = JobManager(
-#         job_info=job_info,
-#         app_config=app_config,
-#         job_runner=get_dataset_config_names_job_runner(repo_id, app_config),
-#     )
-#     job_result = job_manager.run_job()
-#     job_manager.finish(job_result=job_result)
-#     if not job_result["output"]:
-#         raise ValueError("Could not get config names")
-#     configs = [str(config_name["config"]) for config_name in job_result["output"]["content"]["config_names"]]
-
-#     # launch the job runners
-#     NUM_JOB_RUNNERS = 10
-#     with Pool(NUM_JOB_RUNNERS) as pool:
-#         pool.map(
-#             launch_job_runner,
-#             [
-#                 JobRunnerArgs(
-#                     dataset=repo_id, revision=revision, config=config, app_config=app_config, tmp_path=tmp_path
-#                 )
-#                 for config in configs
-#             ],
-#         )
+def test_concurrency(
+    hub_public_n_configs: str,
+    app_config: AppConfig,
+    tmp_path: Path,
+    get_dataset_config_names_job_runner: GetDatasetConfigNamesJobRunner,
+    queue_mongo_resource: QueueMongoResource,
+    cache_mongo_resource: CacheMongoResource,
+) -> None:
+    """
+    Test that multiple job runners (computing config-parquet-and-info) can run in parallel
+    without conflicting when they send commits to the Hub.
+    For this test, we need many configs for the same dataset (the fixture provides 15) and a pool
+    of job runners processing them concurrently. Ideally we would cover both quick and slow jobs.
+    """
+    repo_id = hub_public_n_configs
+    hf_api = HfApi(endpoint=CI_HUB_ENDPOINT, token=CI_USER_TOKEN)
+    revision = hf_api.dataset_info(repo_id=repo_id, files_metadata=False).sha
+    if revision is None:
+        raise ValueError(f"Could not find revision for dataset {repo_id}")
+
+    # fill the cache for the step dataset-config-names, required by the job_runner
+    # it's a lot of code 😅
+    job_info = JobInfo(
+        job_id="not_used",
+        type="dataset-config-names",
+        params=JobParams(dataset=repo_id, revision=revision, config=None, split=None),
+        priority=Priority.NORMAL,
+        difficulty=50,
+        started_at=None,
+    )
+    queue = Queue()
+    queue.create_jobs([job_info])
+    job_info = queue.start_job()
+    job_manager = JobManager(
+        job_info=job_info,
+        app_config=app_config,
+        job_runner=get_dataset_config_names_job_runner(repo_id, app_config),
+    )
+    job_result = job_manager.run_job()
+    job_manager.finish(job_result=job_result)
+    if not job_result["output"]:
+        raise ValueError("Could not get config names")
+    configs = [str(config_name["config"]) for config_name in job_result["output"]["content"]["config_names"]]
+
+    # launch the job runners
+    NUM_JOB_RUNNERS = 10
+    with Pool(NUM_JOB_RUNNERS) as pool:
+        pool.map(
+            launch_job_runner,
+            [
+                JobRunnerArgs(
+                    dataset=repo_id, revision=revision, config=config, app_config=app_config, tmp_path=tmp_path
+                )
+                for config in configs
+            ],
+        )
 
 
 @pytest.mark.parametrize(
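
Note on the README front matter written by the n_configs_paths fixture: the two-space
indent before data_files is what nests it under each item of the configs: list in the
dataset card. A minimal, standalone sketch (using N = 2 instead of the fixture's 15)
that prints the generated header:

    # sketch only: reproduces the fixture's front-matter generation for N = 2
    N = 2
    lines = ["---", "configs:"]
    for i in range(N):
        lines += [f"- config_name: config{i}", f'  data_files: "config{i}.csv"']
    lines += ["---"]
    print("\n".join(lines))
    # prints:
    # ---
    # configs:
    # - config_name: config0
    #   data_files: "config0.csv"
    # - config_name: config1
    #   data_files: "config1.csv"
    # ---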
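The shape of the concurrency being tested, reduced to a minimal, runnable sketch: a pool
with fewer workers than configs maps a per-config function over all configs, as pool.map
does with launch_job_runner in the test. The worker function below is a stand-in for
illustration, not the repository's API:

    from multiprocessing import Pool

    def process_config(config: str) -> str:
        # stand-in for launch_job_runner: the real test runs a
        # ConfigParquetAndInfoJobRunner that commits parquet files to the Hub
        return f"processed {config}"

    if __name__ == "__main__":
        configs = [f"config{i}" for i in range(15)]  # matches the fixture's N = 15
        with Pool(10) as pool:  # NUM_JOB_RUNNERS = 10, fewer workers than configs
            results = pool.map(process_config, configs)
        assert len(results) == 15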