remove check on disks (#2286)
severo authored Jan 12, 2024
1 parent 0f42e45 commit 25c49dc
Showing 12 changed files with 3 additions and 54 deletions.
4 changes: 0 additions & 4 deletions chart/templates/_env/_envWorker.tpl
@@ -10,8 +10,6 @@
value: {{ .Values.worker.killZombiesIntervalSeconds | quote}}
- name: WORKER_KILL_LONG_JOB_INTERVAL_SECONDS
value: {{ .Values.worker.killLongJobIntervalSeconds | quote}}
- name: WORKER_MAX_DISK_USAGE_PCT
value: {{ .Values.worker.maxDiskUsagePct | quote }}
- name: WORKER_MAX_JOB_DURATION_SECONDS
value: {{ .Values.worker.maxJobDurationSeconds | quote }}
- name: WORKER_MAX_LOAD_PCT
@@ -25,8 +23,6 @@
- name: TMPDIR
value: "/tmp"
# ^ensure the temporary files are created in /tmp, which is writable
- name: WORKER_STORAGE_PATHS
value: {{ .Values.assets.storageDirectory | quote }}
# specific to the /first-rows job runner
- name: FIRST_ROWS_MAX_BYTES
value: {{ .Values.firstRows.maxBytes | quote }}
2 changes: 0 additions & 2 deletions chart/values.yaml
@@ -180,8 +180,6 @@ worker:
killLongJobIntervalSeconds: 60
# the time interval at which the worker looks for zombie jobs to kill them
killZombiesIntervalSeconds: 600
# maximum disk usage of every storage disk in the list (in percentage) to allow a job to start. Set to 0 to disable the test.
maxDiskUsagePct: 90
# the maximum duration of a job before it gets stopped for exceeding the maximum duration
maxJobDurationSeconds: 2400
# Max CPU load (%) - if reached, sleeps until it comes back under the limit. Set to 0 to disable the test.
6 changes: 0 additions & 6 deletions services/worker/README.md
@@ -30,13 +30,11 @@ Set environment variables to configure the worker.
- `WORKER_JOB_TYPES_ONLY`: comma-separated list of the non-blocked job types to process, e.g. "dataset-config-names,dataset-split-names". If empty, the worker processes all the non-blocked jobs. Defaults to empty.
- `WORKER_KILL_LONG_JOB_INTERVAL_SECONDS`: the time interval at which the worker looks for long jobs to kill them. Defaults to `60` (1 minute).
- `WORKER_KILL_ZOMBIES_INTERVAL_SECONDS`: the time interval at which the worker looks for zombie jobs to kill them. Defaults to `600` (10 minutes).
- `WORKER_MAX_DISK_USAGE_PCT`: maximum disk usage of every storage disk in the list (in percentage) to allow a job to start. Set to 0 to disable the test. Defaults to 90.
- `WORKER_MAX_JOB_DURATION_SECONDS`: the maximum duration allowed for a job to run. If the job runs longer, it is killed (see `WORKER_KILL_LONG_JOB_INTERVAL_SECONDS`). Defaults to `1200` (20 minutes).
- `WORKER_MAX_LOAD_PCT`: maximum load of the machine (in percentage: the max between the 1m load and the 5m load divided by the number of CPUs \*100) allowed to start a job. Set to 0 to disable the test. Defaults to 70.
- `WORKER_MAX_MEMORY_PCT`: maximum memory (RAM + SWAP) usage of the machine (in percentage) allowed to start a job. Set to 0 to disable the test. Defaults to 80.
- `WORKER_MAX_MISSING_HEARTBEATS`: the number of heartbeats a job must have missed to be considered a zombie job. Defaults to `5`.
- `WORKER_SLEEP_SECONDS`: wait duration in seconds at each loop iteration before checking if resources are available and processing a job if any is available. Note that the loop doesn't wait just after finishing a job: the next job is immediately processed. Defaults to `15`.
- `WORKER_STORAGE_PATHS`: comma-separated list of paths to check for disk usage. Defaults to empty.

Also, it's possible to force the parent directory in which the temporary files (as the current job state file and its associated lock file) will be created by setting `TMPDIR` to a writable directory. If not set, the worker will use the default temporary directory of the system, as described in https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir.
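For illustration, here is a minimal sketch of the CPU-load gate that `WORKER_MAX_LOAD_PCT` describes above (not the worker's actual implementation; the threshold value is just an example):

```python
from psutil import cpu_count, getloadavg

MAX_LOAD_PCT = 70  # example threshold, mirroring the WORKER_MAX_LOAD_PCT default

def load_is_acceptable() -> bool:
    # max of the 1m and 5m load averages, divided by the number of CPUs, times 100
    load_pct = max(getloadavg()[:2]) / cpu_count() * 100
    # 0 disables the check; otherwise the load must stay under the threshold
    return MAX_LOAD_PCT <= 0 or load_pct < MAX_LOAD_PCT
```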

@@ -50,16 +48,12 @@ Also, set the modules cache configuration for the datasets-based worker. See [..

- `HF_MODULES_CACHE`: directory where the `datasets` library will store the cached dataset scripts. If not set, the datasets library will choose the default location. Defaults to None.

Note that both directories will be appended to `WORKER_STORAGE_PATHS` (see [../../libs/libcommon/README.md](../../libs/libcommon/README.md)) to hold the workers when the disk is full.

### Numba library

Numba requires setting the `NUMBA_CACHE_DIR` environment variable to a writable directory to cache the compiled functions. Required on cloud infrastructure (see https://stackoverflow.com/a/63367171/7351594):

- `NUMBA_CACHE_DIR`: directory where the `numba` decorators (used by `librosa`) can write cache.

Note that this directory will be appended to `WORKER_STORAGE_PATHS` (see [../../libs/libcommon/README.md](../../libs/libcommon/README.md)) to hold the workers when the disk is full.
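As an illustration, a minimal sketch of pointing both caches at writable locations before the libraries are imported (the paths are placeholders, not values used by this repository):

```python
import os

# placeholder paths: both caches must live on a writable filesystem
os.environ.setdefault("HF_MODULES_CACHE", "/tmp/hf_modules")
os.environ.setdefault("NUMBA_CACHE_DIR", "/tmp/numba_cache")

# `datasets` resolves HF_MODULES_CACHE when its config module is imported,
# so the variable has to be set before this import
import datasets.config  # noqa: E402

print(datasets.config.HF_MODULES_CACHE)
```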

### Huggingface_hub library

If the Hub is not https://huggingface.co (i.e., if you set the `COMMON_HF_ENDPOINT` environment variable), you must set the `HF_ENDPOINT` environment variable to the same value. See https://github.com/huggingface/datasets/pull/5196#issuecomment-1322191411 for more details:
5 changes: 0 additions & 5 deletions services/worker/src/worker/config.py
@@ -44,7 +44,6 @@ def from_env(cls) -> "UvicornConfig":
WORKER_HEARTBEAT_INTERVAL_SECONDS = 60
WORKER_KILL_LONG_JOB_INTERVAL_SECONDS = 60
WORKER_KILL_ZOMBIES_INTERVAL_SECONDS = 10 * 60
WORKER_MAX_DISK_USAGE_PCT = 90
WORKER_MAX_JOB_DURATION_SECONDS = 20 * 60
WORKER_MAX_LOAD_PCT = 70
WORKER_MAX_MEMORY_PCT = 80
@@ -67,14 +66,12 @@ class WorkerConfig:
job_types_only: list[str] = field(default_factory=get_empty_str_list)
kill_long_job_interval_seconds: float = WORKER_KILL_LONG_JOB_INTERVAL_SECONDS
kill_zombies_interval_seconds: float = WORKER_KILL_ZOMBIES_INTERVAL_SECONDS
max_disk_usage_pct: int = WORKER_MAX_DISK_USAGE_PCT
max_job_duration_seconds: float = WORKER_MAX_JOB_DURATION_SECONDS
max_load_pct: int = WORKER_MAX_LOAD_PCT
max_memory_pct: int = WORKER_MAX_MEMORY_PCT
max_missing_heartbeats: int = WORKER_MAX_MISSING_HEARTBEATS
sleep_seconds: float = WORKER_SLEEP_SECONDS
state_file_path: Optional[str] = WORKER_STATE_FILE_PATH
storage_paths: list[str] = field(default_factory=get_empty_str_list)

@classmethod
def from_env(cls) -> "WorkerConfig":
@@ -95,7 +92,6 @@ def from_env(cls) -> "WorkerConfig":
kill_zombies_interval_seconds=env.float(
name="KILL_ZOMBIES_INTERVAL_SECONDS", default=WORKER_KILL_ZOMBIES_INTERVAL_SECONDS
),
max_disk_usage_pct=env.int(name="MAX_DISK_USAGE_PCT", default=WORKER_MAX_DISK_USAGE_PCT),
max_job_duration_seconds=env.float(
name="MAX_JOB_DURATION_SECONDS", default=WORKER_MAX_JOB_DURATION_SECONDS
),
@@ -106,7 +102,6 @@ def from_env(cls) -> "WorkerConfig":
state_file_path=env.str(
name="STATE_FILE_PATH", default=WORKER_STATE_FILE_PATH
), # this environment variable is not expected to be set explicitly, it's set by the worker executor
storage_paths=env.list(name="STORAGE_PATHS", default=get_empty_str_list()),
)
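For context, a minimal sketch of how such `WORKER_`-prefixed variables are typically read with `environs` (the prefix handling is an assumption, it is not shown in this diff; the values and defaults below are examples):

```python
from environs import Env

env = Env()
env.read_env()  # also picks up a .env file, if one is present

with env.prefixed("WORKER_"):  # assumed prefix, matching the WORKER_* names in the README
    max_load_pct = env.int(name="MAX_LOAD_PCT", default=70)
    max_memory_pct = env.int(name="MAX_MEMORY_PCT", default=80)
    sleep_seconds = env.float(name="SLEEP_SECONDS", default=15)

print(max_load_pct, max_memory_pct, sleep_seconds)
```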


24 changes: 3 additions & 21 deletions services/worker/src/worker/loop.py
@@ -4,7 +4,7 @@
import logging
import random
import time
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, TypedDict

@@ -19,7 +19,7 @@
Queue,
)
from libcommon.utils import JobInfo, get_datetime
from psutil import cpu_count, disk_usage, getloadavg, swap_memory, virtual_memory
from psutil import cpu_count, getloadavg, swap_memory, virtual_memory

from worker.config import AppConfig
from worker.job_manager import JobManager
@@ -43,23 +43,18 @@ class Loop:
job_runner_factory (`JobRunnerFactory`):
The job runner factory that will create a job runner for each job. Must be able to process the jobs of the
queue.
library_cache_paths (`set[str]`):
The paths of the library caches. Used to check if the disk is full.
worker_config (`WorkerConfig`):
Worker configuration.
state_file_path (`str`):
The path of the file where the state of the loop will be saved.
"""

job_runner_factory: BaseJobRunnerFactory
library_cache_paths: set[str]
app_config: AppConfig
state_file_path: str
storage_paths: set[str] = field(init=False)

def __post_init__(self) -> None:
self.queue = Queue()
self.storage_paths = set(self.app_config.worker.storage_paths).union(self.library_cache_paths)

def has_memory(self) -> bool:
if self.app_config.worker.max_memory_pct <= 0:
@@ -85,21 +80,8 @@ def has_cpu(self) -> bool:
logging.info(f"cpu load is too high: {load_pct:.0f}% - max is {self.app_config.worker.max_load_pct}%")
return ok

def has_storage(self) -> bool:
if self.app_config.worker.max_disk_usage_pct <= 0:
return True
for path in self.storage_paths:
try:
usage = disk_usage(path)
if usage.percent >= self.app_config.worker.max_disk_usage_pct:
return False
except Exception:
# if we can't get the disk usage, we let the process continue
return True
return True

def has_resources(self) -> bool:
return self.has_memory() and self.has_cpu() and self.has_storage()
return self.has_memory() and self.has_cpu()

def sleep(self) -> None:
jitter = 0.75 + random.random() / 2 # nosec
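For reference, the removed gate amounts to the following standalone pattern around `psutil.disk_usage` (a minimal sketch mirroring the deleted `has_storage` above; the path and threshold in the example call are placeholders):

```python
from psutil import disk_usage

def storage_is_ok(paths: set[str], max_disk_usage_pct: int) -> bool:
    # 0 disables the check entirely
    if max_disk_usage_pct <= 0:
        return True
    for path in paths:
        try:
            # refuse to start a job when any monitored disk is at or above the threshold
            if disk_usage(path).percent >= max_disk_usage_pct:
                return False
        except Exception:
            # if the disk usage cannot be read, let the job start anyway
            return True
    return True

print(storage_is_ok({"/tmp"}, 90))  # placeholder path and threshold
```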
7 changes: 0 additions & 7 deletions services/worker/src/worker/resources.py
@@ -20,7 +20,6 @@ class LibrariesResource(Resource):
previous_hf_update_download_counts: bool = field(init=False)
previous_verbosity: int = field(init=False)
hf_datasets_cache: Path = field(init=False)
storage_paths: set[str] = field(init=False)

def allocate(self) -> None:
self.hf_datasets_cache = (
@@ -45,12 +44,6 @@ def allocate(self) -> None:
# environment variable to the desired value.
# TODO: check here if huggingface_hub and datasets use the same endpoint

# Add the datasets and numba cache paths to the list of storage paths, to ensure the disk is not full
storage_paths = {str(self.hf_datasets_cache), str(datasets.config.HF_MODULES_CACHE)}
if self.numba_path is not None:
storage_paths.add(self.numba_path)
self.storage_paths = storage_paths

def release(self) -> None:
datasets.config.HF_ENDPOINT = self.previous_hf_endpoint
datasets.config.HF_UPDATE_DOWNLOAD_COUNTS = self.previous_hf_update_download_counts
1 change: 0 additions & 1 deletion services/worker/src/worker/start_worker_loop.py
@@ -67,7 +67,6 @@
storage_client=storage_client,
)
loop = Loop(
library_cache_paths=libraries_resource.storage_paths,
job_runner_factory=job_runner_factory,
state_file_path=state_file_path,
app_config=app_config,
1 change: 0 additions & 1 deletion services/worker/tests/test_loop.py
@@ -46,7 +46,6 @@ def test_process_next_job(

loop = Loop(
job_runner_factory=factory,
library_cache_paths=libraries_resource.storage_paths,
app_config=app_config,
state_file_path=worker_state_file_path,
)
3 changes: 0 additions & 3 deletions services/worker/tests/test_resources.py
@@ -25,9 +25,6 @@ def test_libraries(
hf_endpoint=hf_endpoint, init_hf_datasets_cache=init_hf_datasets_cache, numba_path=numba_path
)
assert datasets.config.HF_ENDPOINT == hf_endpoint
assert (numba_path in resource.storage_paths) == define_numba_path
assert str(resource.hf_datasets_cache) in resource.storage_paths
assert str(datasets.config.HF_MODULES_CACHE) in resource.storage_paths
assert not datasets.config.HF_UPDATE_DOWNLOAD_COUNTS
assert (str(resource.hf_datasets_cache) == init_hf_datasets_cache) == define_init_hf_datasets_cache

1 change: 0 additions & 1 deletion tools/docker-compose-datasets-server.yml
@@ -228,7 +228,6 @@ services:
PARQUET_AND_INFO_URL_TEMPLATE: ${PARQUET_AND_INFO_URL_TEMPLATE-/datasets/%s/resolve/%s/%s}
PARQUET_METADATA_STORAGE_DIRECTORY: ${PARQUET_METADATA_STORAGE_DIRECTORY-/parquet_metadata}
ROWS_INDEX_MAX_ARROW_DATA_IN_MEMORY: ${ROWS_INDEX_MAX_ARROW_DATA_IN_MEMORY-300_000_000}
WORKER_STORAGE_PATHS: ${ASSETS_STORAGE_ROOT-/storage/assets}
# prometheus
PROMETHEUS_MULTIPROC_DIR: ${PROMETHEUS_MULTIPROC_DIR-}
# ^ note: the datasets cache is automatically added, so no need to add it here
1 change: 0 additions & 1 deletion tools/docker-compose-dev-base.yml
@@ -23,7 +23,6 @@ services:
WORKER_CONTENT_MAX_BYTES: ${WORKER_CONTENT_MAX_BYTES-10_000_000}
WORKER_KILL_LONG_JOB_INTERVAL_SECONDS: ${WORKER_KILL_LONG_JOB_INTERVAL_SECONDS-60}
WORKER_KILL_ZOMBIES_INTERVAL_SECONDS: ${WORKER_KILL_ZOMBIES_INTERVAL_SECONDS-600}
WORKER_MAX_DISK_USAGE_PCT: ${WORKER_MAX_DISK_USAGE_PCT-90}
WORKER_MAX_JOB_DURATION_SECONDS: ${WORKER_MAX_JOB_DURATION_SECONDS-1200}
WORKER_MAX_MISSING_HEARTBEATS: ${WORKER_MAX_MISSING_HEARTBEATS-5}
WORKER_MAX_LOAD_PCT: ${WORKER_MAX_LOAD_PCT-70}
2 changes: 0 additions & 2 deletions tools/docker-compose-dev-datasets-server.yml
@@ -235,8 +235,6 @@ services:
PARQUET_AND_INFO_URL_TEMPLATE: ${PARQUET_AND_INFO_URL_TEMPLATE-/datasets/%s/resolve/%s/%s}
PARQUET_METADATA_STORAGE_DIRECTORY: ${PARQUET_METADATA_STORAGE_DIRECTORY-/parquet_metadata}
ROWS_INDEX_MAX_ARROW_DATA_IN_MEMORY: ${ROWS_INDEX_MAX_ARROW_DATA_IN_MEMORY-300_000_000}
WORKER_STORAGE_PATHS: ${ASSETS_STORAGE_ROOT-/storage/assets}
# ^ note: the datasets cache is automatically added, so no need to add it here
# prometheus
PROMETHEUS_MULTIPROC_DIR: ${PROMETHEUS_MULTIPROC_DIR-}
# uvicorn
