From 25c49dc215275562e8fc857a06fb74c82895b0c6 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Fri, 12 Jan 2024 17:17:57 +0100 Subject: [PATCH] remove check on disks (#2286) --- chart/templates/_env/_envWorker.tpl | 4 ---- chart/values.yaml | 2 -- services/worker/README.md | 6 ----- services/worker/src/worker/config.py | 5 ---- services/worker/src/worker/loop.py | 24 +++---------------- services/worker/src/worker/resources.py | 7 ------ .../worker/src/worker/start_worker_loop.py | 1 - services/worker/tests/test_loop.py | 1 - services/worker/tests/test_resources.py | 3 --- tools/docker-compose-datasets-server.yml | 1 - tools/docker-compose-dev-base.yml | 1 - tools/docker-compose-dev-datasets-server.yml | 2 -- 12 files changed, 3 insertions(+), 54 deletions(-) diff --git a/chart/templates/_env/_envWorker.tpl b/chart/templates/_env/_envWorker.tpl index caeb031c08..f8a8a23ae9 100644 --- a/chart/templates/_env/_envWorker.tpl +++ b/chart/templates/_env/_envWorker.tpl @@ -10,8 +10,6 @@ value: {{ .Values.worker.killZombiesIntervalSeconds | quote}} - name: WORKER_KILL_LONG_JOB_INTERVAL_SECONDS value: {{ .Values.worker.killLongJobIntervalSeconds | quote}} -- name: WORKER_MAX_DISK_USAGE_PCT - value: {{ .Values.worker.maxDiskUsagePct | quote }} - name: WORKER_MAX_JOB_DURATION_SECONDS value: {{ .Values.worker.maxJobDurationSeconds | quote }} - name: WORKER_MAX_LOAD_PCT @@ -25,8 +23,6 @@ - name: TMPDIR value: "/tmp" # ^ensure the temporary files are created in /tmp, which is writable -- name: WORKER_STORAGE_PATHS - value: {{ .Values.assets.storageDirectory | quote }} # specific to the /first-rows job runner - name: FIRST_ROWS_MAX_BYTES value: {{ .Values.firstRows.maxBytes | quote }} diff --git a/chart/values.yaml b/chart/values.yaml index 9c6193beaa..1777c41882 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -180,8 +180,6 @@ worker: killLongJobIntervalSeconds: 60 # the time interval at which the worker looks for zombie jobs to kill them killZombiesIntervalSeconds: 600 - # maximum disk usage of every storage disk in the list (in percentage) to allow a job to start. Set to 0 to disable the test. - maxDiskUsagePct: 90 # the maximum duration of a job before it gets stopped for exceeded the maximum duration maxJobDurationSeconds: 2400 # Max CPU load (%) - if reached, sleeps until it comes back under the limit. Set to 0 to disable the test. diff --git a/services/worker/README.md b/services/worker/README.md index 3c255adf67..541eb07f26 100644 --- a/services/worker/README.md +++ b/services/worker/README.md @@ -30,13 +30,11 @@ Set environment variables to configure the worker. - `WORKER_JOB_TYPES_ONLY`: comma-separated list of the non-blocked job types to process, e.g. "dataset-config-names,dataset-split-names". If empty, the worker processes all the non-blocked jobs. Defaults to empty. - `WORKER_KILL_LONG_JOB_INTERVAL_SECONDS`: the time interval at which the worker looks for long jobs to kill them. Defaults to `60` (1 minute). - `WORKER_KILL_ZOMBIES_INTERVAL_SECONDS`: the time interval at which the worker looks for zombie jobs to kill them. Defaults to `600` (10 minutes). -- `WORKER_MAX_DISK_USAGE_PCT`: maximum disk usage of every storage disk in the list (in percentage) to allow a job to start. Set to 0 to disable the test. Defaults to 90. - `WORKER_MAX_JOB_DURATION_SECONDS`: the maximum duration allowed for a job to run. If the job runs longer, it is killed (see `WORKER_KILL_LONG_JOB_INTERVAL_SECONDS`). Defaults to `1200` (20 minutes). - `WORKER_MAX_LOAD_PCT`: maximum load of the machine (in percentage: the max between the 1m load and the 5m load divided by the number of CPUs \*100) allowed to start a job. Set to 0 to disable the test. Defaults to 70. - `WORKER_MAX_MEMORY_PCT`: maximum memory (RAM + SWAP) usage of the machine (in percentage) allowed to start a job. Set to 0 to disable the test. Defaults to 80. - `WORKER_MAX_MISSING_HEARTBEATS`: the number of hearbeats a job must have missed to be considered a zombie job. Defaults to `5`. - `WORKER_SLEEP_SECONDS`: wait duration in seconds at each loop iteration before checking if resources are available and processing a job if any is available. Note that the loop doesn't wait just after finishing a job: the next job is immediately processed. Defaults to `15`. -- `WORKER_STORAGE_PATHS`: comma-separated list of paths to check for disk usage. Defaults to empty. Also, it's possible to force the parent directory in which the temporary files (as the current job state file and its associated lock file) will be created by setting `TMPDIR` to a writable directory. If not set, the worker will use the default temporary directory of the system, as described in https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir. @@ -50,16 +48,12 @@ Also, set the modules cache configuration for the datasets-based worker. See [.. - `HF_MODULES_CACHE`: directory where the `datasets` library will store the cached dataset scripts. If not set, the datasets library will choose the default location. Defaults to None. -Note that both directories will be appended to `WORKER_STORAGE_PATHS` (see [../../libs/libcommon/README.md](../../libs/libcommon/README.md)) to hold the workers when the disk is full. - ### Numba library Numba requires setting the `NUMBA_CACHE_DIR` environment variable to a writable directory to cache the compiled functions. Required on cloud infrastructure (see https://stackoverflow.com/a/63367171/7351594): - `NUMBA_CACHE_DIR`: directory where the `numba` decorators (used by `librosa`) can write cache. -Note that this directory will be appended to `WORKER_STORAGE_PATHS` (see [../../libs/libcommon/README.md](../../libs/libcommon/README.md)) to hold the workers when the disk is full. - ### Huggingface_hub library If the Hub is not https://huggingface.co (i.e., if you set the `COMMON_HF_ENDPOINT` environment variable), you must set the `HF_ENDPOINT` environment variable to the same value. See https://github.com/huggingface/datasets/pull/5196#issuecomment-1322191411 for more details: diff --git a/services/worker/src/worker/config.py b/services/worker/src/worker/config.py index e52d039c60..b70cf7e312 100644 --- a/services/worker/src/worker/config.py +++ b/services/worker/src/worker/config.py @@ -44,7 +44,6 @@ def from_env(cls) -> "UvicornConfig": WORKER_HEARTBEAT_INTERVAL_SECONDS = 60 WORKER_KILL_LONG_JOB_INTERVAL_SECONDS = 60 WORKER_KILL_ZOMBIES_INTERVAL_SECONDS = 10 * 60 -WORKER_MAX_DISK_USAGE_PCT = 90 WORKER_MAX_JOB_DURATION_SECONDS = 20 * 60 WORKER_MAX_LOAD_PCT = 70 WORKER_MAX_MEMORY_PCT = 80 @@ -67,14 +66,12 @@ class WorkerConfig: job_types_only: list[str] = field(default_factory=get_empty_str_list) kill_long_job_interval_seconds: float = WORKER_KILL_LONG_JOB_INTERVAL_SECONDS kill_zombies_interval_seconds: float = WORKER_KILL_ZOMBIES_INTERVAL_SECONDS - max_disk_usage_pct: int = WORKER_MAX_DISK_USAGE_PCT max_job_duration_seconds: float = WORKER_MAX_JOB_DURATION_SECONDS max_load_pct: int = WORKER_MAX_LOAD_PCT max_memory_pct: int = WORKER_MAX_MEMORY_PCT max_missing_heartbeats: int = WORKER_MAX_MISSING_HEARTBEATS sleep_seconds: float = WORKER_SLEEP_SECONDS state_file_path: Optional[str] = WORKER_STATE_FILE_PATH - storage_paths: list[str] = field(default_factory=get_empty_str_list) @classmethod def from_env(cls) -> "WorkerConfig": @@ -95,7 +92,6 @@ def from_env(cls) -> "WorkerConfig": kill_zombies_interval_seconds=env.float( name="KILL_ZOMBIES_INTERVAL_SECONDS", default=WORKER_KILL_ZOMBIES_INTERVAL_SECONDS ), - max_disk_usage_pct=env.int(name="MAX_DISK_USAGE_PCT", default=WORKER_MAX_DISK_USAGE_PCT), max_job_duration_seconds=env.float( name="MAX_JOB_DURATION_SECONDS", default=WORKER_MAX_JOB_DURATION_SECONDS ), @@ -106,7 +102,6 @@ def from_env(cls) -> "WorkerConfig": state_file_path=env.str( name="STATE_FILE_PATH", default=WORKER_STATE_FILE_PATH ), # this environment variable is not expected to be set explicitly, it's set by the worker executor - storage_paths=env.list(name="STORAGE_PATHS", default=get_empty_str_list()), ) diff --git a/services/worker/src/worker/loop.py b/services/worker/src/worker/loop.py index 4605541c23..e32f87710e 100644 --- a/services/worker/src/worker/loop.py +++ b/services/worker/src/worker/loop.py @@ -4,7 +4,7 @@ import logging import random import time -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import datetime from typing import Optional, TypedDict @@ -19,7 +19,7 @@ Queue, ) from libcommon.utils import JobInfo, get_datetime -from psutil import cpu_count, disk_usage, getloadavg, swap_memory, virtual_memory +from psutil import cpu_count, getloadavg, swap_memory, virtual_memory from worker.config import AppConfig from worker.job_manager import JobManager @@ -43,8 +43,6 @@ class Loop: job_runner_factory (`JobRunnerFactory`): The job runner factory that will create a job runner for each job. Must be able to process the jobs of the queue. - library_cache_paths (`set[str]`): - The paths of the library caches. Used to check if the disk is full. worker_config (`WorkerConfig`): Worker configuration. state_file_path (`str`): @@ -52,14 +50,11 @@ class Loop: """ job_runner_factory: BaseJobRunnerFactory - library_cache_paths: set[str] app_config: AppConfig state_file_path: str - storage_paths: set[str] = field(init=False) def __post_init__(self) -> None: self.queue = Queue() - self.storage_paths = set(self.app_config.worker.storage_paths).union(self.library_cache_paths) def has_memory(self) -> bool: if self.app_config.worker.max_memory_pct <= 0: @@ -85,21 +80,8 @@ def has_cpu(self) -> bool: logging.info(f"cpu load is too high: {load_pct:.0f}% - max is {self.app_config.worker.max_load_pct}%") return ok - def has_storage(self) -> bool: - if self.app_config.worker.max_disk_usage_pct <= 0: - return True - for path in self.storage_paths: - try: - usage = disk_usage(path) - if usage.percent >= self.app_config.worker.max_disk_usage_pct: - return False - except Exception: - # if we can't get the disk usage, we let the process continue - return True - return True - def has_resources(self) -> bool: - return self.has_memory() and self.has_cpu() and self.has_storage() + return self.has_memory() and self.has_cpu() def sleep(self) -> None: jitter = 0.75 + random.random() / 2 # nosec diff --git a/services/worker/src/worker/resources.py b/services/worker/src/worker/resources.py index 363b1ad854..44d6fb6209 100644 --- a/services/worker/src/worker/resources.py +++ b/services/worker/src/worker/resources.py @@ -20,7 +20,6 @@ class LibrariesResource(Resource): previous_hf_update_download_counts: bool = field(init=False) previous_verbosity: int = field(init=False) hf_datasets_cache: Path = field(init=False) - storage_paths: set[str] = field(init=False) def allocate(self) -> None: self.hf_datasets_cache = ( @@ -45,12 +44,6 @@ def allocate(self) -> None: # environment variable to the desired value. # TODO: check here if huggingface_hub and datasets use the same endpoint - # Add the datasets and numba cache paths to the list of storage paths, to ensure the disk is not full - storage_paths = {str(self.hf_datasets_cache), str(datasets.config.HF_MODULES_CACHE)} - if self.numba_path is not None: - storage_paths.add(self.numba_path) - self.storage_paths = storage_paths - def release(self) -> None: datasets.config.HF_ENDPOINT = self.previous_hf_endpoint datasets.config.HF_UPDATE_DOWNLOAD_COUNTS = self.previous_hf_update_download_counts diff --git a/services/worker/src/worker/start_worker_loop.py b/services/worker/src/worker/start_worker_loop.py index 8a4f23cb13..24f43ec15b 100644 --- a/services/worker/src/worker/start_worker_loop.py +++ b/services/worker/src/worker/start_worker_loop.py @@ -67,7 +67,6 @@ storage_client=storage_client, ) loop = Loop( - library_cache_paths=libraries_resource.storage_paths, job_runner_factory=job_runner_factory, state_file_path=state_file_path, app_config=app_config, diff --git a/services/worker/tests/test_loop.py b/services/worker/tests/test_loop.py index f28bc4a026..5d80c4de45 100644 --- a/services/worker/tests/test_loop.py +++ b/services/worker/tests/test_loop.py @@ -46,7 +46,6 @@ def test_process_next_job( loop = Loop( job_runner_factory=factory, - library_cache_paths=libraries_resource.storage_paths, app_config=app_config, state_file_path=worker_state_file_path, ) diff --git a/services/worker/tests/test_resources.py b/services/worker/tests/test_resources.py index d13e85c4b2..c85f83cc8f 100644 --- a/services/worker/tests/test_resources.py +++ b/services/worker/tests/test_resources.py @@ -25,9 +25,6 @@ def test_libraries( hf_endpoint=hf_endpoint, init_hf_datasets_cache=init_hf_datasets_cache, numba_path=numba_path ) assert datasets.config.HF_ENDPOINT == hf_endpoint - assert (numba_path in resource.storage_paths) == define_numba_path - assert str(resource.hf_datasets_cache) in resource.storage_paths - assert str(datasets.config.HF_MODULES_CACHE) in resource.storage_paths assert not datasets.config.HF_UPDATE_DOWNLOAD_COUNTS assert (str(resource.hf_datasets_cache) == init_hf_datasets_cache) == define_init_hf_datasets_cache diff --git a/tools/docker-compose-datasets-server.yml b/tools/docker-compose-datasets-server.yml index aea8b99bd3..493054921b 100644 --- a/tools/docker-compose-datasets-server.yml +++ b/tools/docker-compose-datasets-server.yml @@ -228,7 +228,6 @@ services: PARQUET_AND_INFO_URL_TEMPLATE: ${PARQUET_AND_INFO_URL_TEMPLATE-/datasets/%s/resolve/%s/%s} PARQUET_METADATA_STORAGE_DIRECTORY: ${PARQUET_METADATA_STORAGE_DIRECTORY-/parquet_metadata} ROWS_INDEX_MAX_ARROW_DATA_IN_MEMORY: ${ROWS_INDEX_MAX_ARROW_DATA_IN_MEMORY-300_000_000} - WORKER_STORAGE_PATHS: ${ASSETS_STORAGE_ROOT-/storage/assets} # prometheus PROMETHEUS_MULTIPROC_DIR: ${PROMETHEUS_MULTIPROC_DIR-} # ^ note: the datasets cache is automatically added, so no need to add it here diff --git a/tools/docker-compose-dev-base.yml b/tools/docker-compose-dev-base.yml index f584af305b..1a3c61eed7 100644 --- a/tools/docker-compose-dev-base.yml +++ b/tools/docker-compose-dev-base.yml @@ -23,7 +23,6 @@ services: WORKER_CONTENT_MAX_BYTES: ${WORKER_CONTENT_MAX_BYTES-10_000_000} WORKER_KILL_LONG_JOB_INTERVAL_SECONDS: ${WORKER_KILL_LONG_JOB_INTERVAL_SECONDS-60} WORKER_KILL_ZOMBIES_INTERVAL_SECONDS: ${WORKER_KILL_ZOMBIES_INTERVAL_SECONDS-600} - WORKER_MAX_DISK_USAGE_PCT: ${WORKER_MAX_DISK_USAGE_PCT-90} WORKER_MAX_JOB_DURATION_SECONDS: ${WORKER_MAX_JOB_DURATION_SECONDS-1200} WORKER_MAX_MISSING_HEARTBEATS: ${WORKER_MAX_MISSING_HEARTBEATS-5} WORKER_MAX_LOAD_PCT: ${WORKER_MAX_LOAD_PCT-70} diff --git a/tools/docker-compose-dev-datasets-server.yml b/tools/docker-compose-dev-datasets-server.yml index 20c1c09e76..111c481a77 100644 --- a/tools/docker-compose-dev-datasets-server.yml +++ b/tools/docker-compose-dev-datasets-server.yml @@ -235,8 +235,6 @@ services: PARQUET_AND_INFO_URL_TEMPLATE: ${PARQUET_AND_INFO_URL_TEMPLATE-/datasets/%s/resolve/%s/%s} PARQUET_METADATA_STORAGE_DIRECTORY: ${PARQUET_METADATA_STORAGE_DIRECTORY-/parquet_metadata} ROWS_INDEX_MAX_ARROW_DATA_IN_MEMORY: ${ROWS_INDEX_MAX_ARROW_DATA_IN_MEMORY-300_000_000} - WORKER_STORAGE_PATHS: ${ASSETS_STORAGE_ROOT-/storage/assets} - # ^ note: the datasets cache is automatically added, so no need to add it here # prometheus PROMETHEUS_MULTIPROC_DIR: ${PROMETHEUS_MULTIPROC_DIR-} # uvicorn