diff --git a/.coverage b/.coverage new file mode 100644 index 0000000..dd919e7 Binary files /dev/null and b/.coverage differ diff --git a/.gitignore b/.gitignore index 7c0d726..6f3e694 100644 --- a/.gitignore +++ b/.gitignore @@ -54,4 +54,3 @@ docker-compose.yml validation/ *.ignore.* -.coverage \ No newline at end of file diff --git a/.osparc/jupyter-math/runtime.yml b/.osparc/jupyter-math/runtime.yml index 75d2568..3eaad03 100644 --- a/.osparc/jupyter-math/runtime.yml +++ b/.osparc/jupyter-math/runtime.yml @@ -21,5 +21,5 @@ paths-mapping: callbacks-mapping: inactivity: service: container - command: ["python", "/docker/activity.py"] + command: ["python", "/usr/local/bin/service-monitor/activity.py"] timeout: 1 \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 4831188..861839e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -92,8 +92,26 @@ ENV JP_LSP_VIRTUAL_DIR="/home/${NB_USER}/.virtual_documents" # Copying boot scripts COPY --chown=$NB_UID:$NB_GID docker /docker -RUN chmod +x /docker/activity.py \ - && chmod +x /docker/activity_monitor.py +# install service activity monitor +ARG ACTIVITY_MONITOR_VERSION=v0.0.1 + +# Detection thresholds for application +ENV ACTIVITY_MONITOR_BUSY_THRESHOLD_CPU_PERCENT=0.5 +ENV ACTIVITY_MONITOR_BUSY_THRESHOLD_DISK_READ_BPS=0 +ENV ACTIVITY_MONITOR_BUSY_THRESHOLD_DISK_WRITE_BPS=0 +ENV ACTIVITY_MONITOR_BUSY_THRESHOLD_NETWORK_RECEIVE_BPS=1024 +ENV ACTIVITY_MONITOR_BUSY_THRESHOLD_NETWORK_SENT_BPS=1024 + +# install service activity monitor +RUN apt-get update && \ + apt-get install -y curl && \ + # install using curl + curl -sSL https://raw.githubusercontent.com/ITISFoundation/service-activity-monitor/main/scripts/install.sh | \ + bash -s -- ${ACTIVITY_MONITOR_VERSION} && \ + # cleanup and remove curl + apt-get purge -y --auto-remove curl && \ + rm -rf /var/lib/apt/lists/* + RUN echo 'export PATH="/home/${NB_USER}/.venv/bin:$PATH"' >> "/home/${NB_USER}/.bashrc" diff --git a/Makefile b/Makefile index 15bf5c5..5402f42 100644 --- a/Makefile +++ b/Makefile @@ -67,18 +67,6 @@ publish-local: ## push to local throw away registry to test integration docker push registry:5000/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) @curl registry:5000/v2/_catalog | jq -.PHONY: install-test -install-test: ## install dependencies for testing - pip install -r requirements/test.txt - -.PHONY: tests-dev -tests-dev: ## run tests in development mode - .venv/bin/pytest --pdb -vvv tests - -.PHONY: tests-ci -tests-ci: ## run testds in the CI - .venv/bin/pytest -vvv --color=yes --cov-report term --cov=activity_monitor tests - .PHONY: help help: ## this colorful help @echo "Recipes for '$(notdir $(CURDIR))':" diff --git a/README.md b/README.md index 516169d..096624f 100644 --- a/README.md +++ b/README.md @@ -29,15 +29,6 @@ If you already have a local copy of **o2S2PARC** running a make publish-local ``` -Setup for test development locally: - -```shell -make devenv -source .venv/bin/activate -make tests-dev -``` - - ### Testing manually After a new service version has been published on the master deployment, it can be manually tested. For example a Template, called "Test Jupyter-math 2.0.9 ipywidgets" can be used for internal testing on the master deployment. diff --git a/docker/activity.py b/docker/activity.py deleted file mode 100644 index c8c7989..0000000 --- a/docker/activity.py +++ /dev/null @@ -1,4 +0,0 @@ -import requests - -r = requests.get("http://localhost:19597") -print(r.text) diff --git a/docker/activity_monitor.py b/docker/activity_monitor.py deleted file mode 100755 index 7b763ac..0000000 --- a/docker/activity_monitor.py +++ /dev/null @@ -1,519 +0,0 @@ -import json -import logging -import psutil -import requests -import time - -from abc import abstractmethod -from concurrent.futures import ThreadPoolExecutor, as_completed -from contextlib import suppress -from datetime import datetime -from http.server import HTTPServer, BaseHTTPRequestHandler -from threading import Thread -from typing import Final - - -_logger = logging.getLogger(__name__) - - -LISTEN_PORT: Final[int] = 19597 - -KERNEL_CHECK_INTERVAL_S: Final[float] = 5 -CHECK_INTERVAL_S: Final[float] = 1 -THREAD_EXECUTOR_WORKERS: Final[int] = 10 - -_KB: Final[int] = 1024 - -BUSY_USAGE_THRESHOLD_CPU: Final[float] = 0.5 # percent in range [0, 100] -BUSY_USAGE_THRESHOLD_DISK_READ: Final[int] = 0 -BUSY_USAGE_THRESHOLD_DISK_WRITE: Final[int] = 0 -BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED: Final[int] = 1 * _KB -BUSY_USAGE_THRESHOLD_NETWORK_SENT: Final[int] = 1 * _KB - - -# Utilities -class AbstractIsBusyMonitor: - def __init__(self, poll_interval: float) -> None: - self._poll_interval: float = poll_interval - self._keep_running: bool = True - self._thread: Thread | None = None - - self.is_busy: bool = True - self.thread_executor = ThreadPoolExecutor(max_workers=THREAD_EXECUTOR_WORKERS) - - @abstractmethod - def _check_if_busy(self) -> bool: - """Must be user defined and returns if current - metric is to be considered busy - - Returns: - bool: True if considered busy - """ - - def _worker(self) -> None: - while self._keep_running: - try: - self.is_busy = self._check_if_busy() - except Exception as e: - _logger.exception("Failed to check if busy") - time.sleep(self._poll_interval) - - def start(self) -> None: - self._thread = Thread( - target=self._worker, - daemon=True, - name=f"{self.__class__.__name__}_check_busy", - ) - self._thread.start() - - def stop(self) -> None: - self._keep_running = False - if self._thread: - self._thread.join() - self.thread_executor.shutdown(wait=True) - - def __enter__(self): - self.start() - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.stop() - - -def __get_children_processes_recursive(pid) -> list[psutil.Process]: - try: - return psutil.Process(pid).children(recursive=True) - except psutil.NoSuchProcess: - return [] - - -def _get_sibling_processes() -> list[psutil.Process]: - # Returns the CPU usage of all processes except this one. - # ASSUMPTIONS: - # - `CURRENT_PROC` is a child of root process - # - `CURRENT_PROC` does not create any child processes - # - # It looks for its brothers (and their children) p1 to pN in order - # to compute real CPU usage. - # - CURRENT_PROC - # - p1 - # ... - # - pN - current_process = psutil.Process() - parent_pid = current_process.ppid() - all_children = __get_children_processes_recursive(parent_pid) - return [c for c in all_children if c.pid != current_process.pid] - - -# Monitors - - -class JupyterKernelMonitor(AbstractIsBusyMonitor): - BASE_URL = "http://localhost:8888" - HEADERS = {"accept": "application/json"} - - def __init__(self, poll_interval: float) -> None: - super().__init__(poll_interval=poll_interval) - self.are_kernels_busy: bool = False - - def _get(self, path: str) -> dict: - r = requests.get(f"{self.BASE_URL}{path}", headers=self.HEADERS) - return r.json() - - def _update_kernels_activity(self) -> None: - json_response = self._get("/api/kernels") - - are_kernels_busy = False - - for kernel_data in json_response: - kernel_id = kernel_data["id"] - - kernel_info = self._get(f"/api/kernels/{kernel_id}") - if kernel_info["execution_state"] != "idle": - are_kernels_busy = True - - self.are_kernels_busy = are_kernels_busy - - def _check_if_busy(self) -> bool: - self._update_kernels_activity() - return self.are_kernels_busy - - -ProcessID = int -TimeSeconds = float -PercentCPU = float - - -class CPUUsageMonitor(AbstractIsBusyMonitor): - """At regular intervals computes the total CPU usage - and averages over 1 second. - """ - - def __init__(self, poll_interval: float, *, busy_threshold: float): - super().__init__(poll_interval=poll_interval) - self.busy_threshold = busy_threshold - - # snapshot - self._last_sample: dict[ProcessID, tuple[TimeSeconds, PercentCPU]] = ( - self._sample_total_cpu_usage() - ) - self.total_cpu_usage: PercentCPU = 0 - - @staticmethod - def _sample_cpu_usage( - process: psutil.Process, - ) -> tuple[ProcessID, tuple[TimeSeconds, PercentCPU]]: - """returns: tuple[pid, tuple[time, percent_cpu_usage]]""" - return (process.pid, (time.time(), process.cpu_percent())) - - def _sample_total_cpu_usage( - self, - ) -> dict[ProcessID, tuple[TimeSeconds, PercentCPU]]: - futures = [ - self.thread_executor.submit(self._sample_cpu_usage, p) - for p in _get_sibling_processes() - ] - return dict([f.result() for f in as_completed(futures)]) - - @staticmethod - def _get_cpu_over_1_second( - last: tuple[TimeSeconds, PercentCPU], current: tuple[TimeSeconds, PercentCPU] - ) -> float: - interval = current[0] - last[0] - measured_cpu_in_interval = current[1] - # cpu_over_1_second[%] = 1[s] * measured_cpu_in_interval[%] / interval[s] - return measured_cpu_in_interval / interval - - def _update_total_cpu_usage(self) -> None: - current_sample = self._sample_total_cpu_usage() - - total_cpu: float = 0 - for pid, time_and_cpu_usage in current_sample.items(): - if pid not in self._last_sample: - continue # skip if not found - - last_time_and_cpu_usage = self._last_sample[pid] - total_cpu += self._get_cpu_over_1_second( - last_time_and_cpu_usage, time_and_cpu_usage - ) - - self._last_sample = current_sample # replace - - self.total_cpu_usage = total_cpu - - def _check_if_busy(self) -> bool: - self._update_total_cpu_usage() - return self.total_cpu_usage > self.busy_threshold - - -BytesRead = int -BytesWrite = int - - -class DiskUsageMonitor(AbstractIsBusyMonitor): - def __init__( - self, - poll_interval: float, - *, - read_usage_threshold: int, - write_usage_threshold: int, - ): - super().__init__(poll_interval=poll_interval) - self.read_usage_threshold = read_usage_threshold - self.write_usage_threshold = write_usage_threshold - - self._last_sample: dict[ - ProcessID, tuple[TimeSeconds, BytesRead, BytesWrite] - ] = self._sample_total_disk_usage() - - self.total_bytes_read: BytesRead = 0 - self.total_bytes_write: BytesWrite = 0 - - @staticmethod - def _sample_disk_usage( - process: psutil.Process, - ) -> tuple[ProcessID, tuple[TimeSeconds, BytesRead, BytesWrite]]: - counters = process.io_counters() - return (process.pid, (time.time(), counters.read_bytes, counters.write_bytes)) - - def _sample_total_disk_usage( - self, - ) -> dict[ProcessID, tuple[TimeSeconds, BytesRead, BytesWrite]]: - futures = [ - self.thread_executor.submit(self._sample_disk_usage, p) - for p in _get_sibling_processes() - ] - return dict([f.result() for f in as_completed(futures)]) - - @staticmethod - def _get_bytes_over_one_second( - last: tuple[TimeSeconds, BytesRead, BytesWrite], - current: tuple[TimeSeconds, BytesRead, BytesWrite], - ) -> tuple[BytesRead, BytesWrite]: - interval = current[0] - last[0] - measured_bytes_read_in_interval = current[1] - last[1] - measured_bytes_write_in_interval = current[2] - last[2] - - # bytes_*_1_second[%] = 1[s] * measured_bytes_*_in_interval[%] / interval[s] - bytes_read_over_1_second = int(measured_bytes_read_in_interval / interval) - bytes_write_over_1_second = int(measured_bytes_write_in_interval / interval) - return bytes_read_over_1_second, bytes_write_over_1_second - - def _update_total_disk_usage(self) -> None: - current_sample = self._sample_total_disk_usage() - - total_bytes_read: int = 0 - total_bytes_write: int = 0 - for pid, time_and_disk_usage in current_sample.items(): - if pid not in self._last_sample: - continue # skip if not found - - last_time_and_disk_usage = self._last_sample[pid] - - bytes_read, bytes_write = self._get_bytes_over_one_second( - last_time_and_disk_usage, time_and_disk_usage - ) - total_bytes_read += bytes_read - total_bytes_write += bytes_write - - self._last_sample = current_sample # replace - - self.total_bytes_read = total_bytes_read - self.total_bytes_write = total_bytes_write - - def _check_if_busy(self) -> bool: - self._update_total_disk_usage() - return ( - self.total_bytes_read > self.read_usage_threshold - or self.total_bytes_write > self.write_usage_threshold - ) - - -InterfaceName = str -BytesReceived = int -BytesSent = int - - -class NetworkUsageMonitor(AbstractIsBusyMonitor): - _EXCLUDE_INTERFACES: set[InterfaceName] = { - "lo", - } - - def __init__( - self, - poll_interval: float, - *, - received_usage_threshold: int, - sent_usage_threshold: int, - ): - super().__init__(poll_interval=poll_interval) - self.received_usage_threshold = received_usage_threshold - self.sent_usage_threshold = sent_usage_threshold - - self._last_sample: tuple[TimeSeconds, BytesReceived, BytesSent] = ( - self._sample_total_network_usage() - ) - self.bytes_received: BytesReceived = 0 - self.bytes_sent: BytesSent = 0 - - def _sample_total_network_usage( - self, - ) -> tuple[TimeSeconds, BytesReceived, BytesSent]: - net_io_counters = psutil.net_io_counters(pernic=True) - - total_bytes_received: int = 0 - total_bytes_sent: int = 0 - for nic, stats in net_io_counters.items(): - if nic in self._EXCLUDE_INTERFACES: - continue - - total_bytes_received += stats.bytes_recv - total_bytes_sent += stats.bytes_sent - - return time.time(), total_bytes_received, total_bytes_sent - - @staticmethod - def _get_bytes_over_one_second( - last: tuple[TimeSeconds, BytesReceived, BytesSent], - current: tuple[TimeSeconds, BytesReceived, BytesSent], - ) -> tuple[BytesReceived, BytesSent]: - interval = current[0] - last[0] - measured_bytes_received_in_interval = current[1] - last[1] - measured_bytes_sent_in_interval = current[2] - last[2] - - # bytes_*_1_second[%] = 1[s] * measured_bytes_*_in_interval[%] / interval[s] - bytes_received_over_1_second = int( - measured_bytes_received_in_interval / interval - ) - bytes_sent_over_1_second = int(measured_bytes_sent_in_interval / interval) - return bytes_received_over_1_second, bytes_sent_over_1_second - - def _update_total_network_usage(self) -> None: - current_sample = self._sample_total_network_usage() - - bytes_received, bytes_sent = self._get_bytes_over_one_second( - self._last_sample, current_sample - ) - - self._last_sample = current_sample # replace - - self.bytes_received = bytes_received - self.bytes_sent = bytes_sent - - def _check_if_busy(self) -> bool: - self._update_total_network_usage() - return ( - self.bytes_received > self.received_usage_threshold - or self.bytes_sent > self.sent_usage_threshold - ) - - -class ActivityManager: - def __init__(self, interval: float) -> None: - self._keep_running: bool = True - self._thread: Thread | None = None - - self.interval = interval - self.last_idle: datetime | None = None - - self.jupyter_kernel_monitor = JupyterKernelMonitor(KERNEL_CHECK_INTERVAL_S) - self.cpu_usage_monitor = CPUUsageMonitor( - CHECK_INTERVAL_S, - busy_threshold=BUSY_USAGE_THRESHOLD_CPU, - ) - self.disk_usage_monitor = DiskUsageMonitor( - CHECK_INTERVAL_S, - read_usage_threshold=BUSY_USAGE_THRESHOLD_DISK_READ, - write_usage_threshold=BUSY_USAGE_THRESHOLD_DISK_WRITE, - ) - self.network_monitor = NetworkUsageMonitor( - CHECK_INTERVAL_S, - received_usage_threshold=BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED, - sent_usage_threshold=BUSY_USAGE_THRESHOLD_NETWORK_SENT, - ) - - def check(self): - is_busy = ( - self.jupyter_kernel_monitor.is_busy - or self.cpu_usage_monitor.is_busy - or self.disk_usage_monitor.is_busy - or self.network_monitor.is_busy - ) - - if is_busy: - self.last_idle = None - - if not is_busy and self.last_idle is None: - self.last_idle = datetime.utcnow() - - def get_idle_seconds(self) -> float: - if self.last_idle is None: - return 0 - - idle_seconds = (datetime.utcnow() - self.last_idle).total_seconds() - return idle_seconds if idle_seconds > 0 else 0 - - def _worker(self) -> None: - while self._keep_running: - with suppress(Exception): - self.check() - time.sleep(self.interval) - - def start(self) -> None: - self.jupyter_kernel_monitor.start() - self.cpu_usage_monitor.start() - self.disk_usage_monitor.start() - self.network_monitor.start() - - self._thread = Thread( - target=self._worker, - daemon=True, - name=f"{self.__class__.__name__}_check_busy", - ) - self._thread.start() - - def stop(self) -> None: - self.jupyter_kernel_monitor.stop() - self.cpu_usage_monitor.stop() - self.disk_usage_monitor.stop() - self.network_monitor.stop() - - self._keep_running = False - self._thread.join() - - -def _get_response_debug(activity_manager: ActivityManager) -> dict: - return { - "seconds_inactive": activity_manager.get_idle_seconds(), - "cpu_usage": { - "is_busy": activity_manager.cpu_usage_monitor.is_busy, - "total": activity_manager.cpu_usage_monitor.total_cpu_usage, - }, - "disk_usage": { - "is_busy": activity_manager.disk_usage_monitor.is_busy, - "total": { - "bytes_read_per_second": activity_manager.disk_usage_monitor.total_bytes_read, - "bytes_write_per_second": activity_manager.disk_usage_monitor.total_bytes_write, - }, - }, - "network_usage": { - "is_busy": activity_manager.network_monitor.is_busy, - "total": { - "bytes_received_per_second": activity_manager.network_monitor.bytes_received, - "bytes_sent_per_second": activity_manager.network_monitor.bytes_sent, - }, - }, - "kernel_monitor": {"is_busy": activity_manager.jupyter_kernel_monitor.is_busy}, - } - - -def _get_response_root(activity_manager: ActivityManager) -> dict: - return {"seconds_inactive": activity_manager.get_idle_seconds()} - - -class ServerState: - pass - - -class HTTPServerWithState(HTTPServer): - def __init__(self, server_address, RequestHandlerClass, state): - self.state = state # application's state - super().__init__(server_address, RequestHandlerClass) - - -class JSONRequestHandler(BaseHTTPRequestHandler): - def _send_response(self, code: int, data: dict) -> None: - self.send_response(code) - self.send_header("Content-type", "application/json") - self.end_headers() - self.wfile.write(json.dumps(data).encode("utf-8")) - - def do_GET(self): - state = self.server.state - - if self.path == "/": # The root endpoint - self._send_response(200, _get_response_root(state.activity_manager)) - elif self.path == "/debug": # The debug endpoint - self._send_response(200, _get_response_debug(state.activity_manager)) - else: # Handle case where the endpoint is not found - self._send_response( - 404, _get_response_debug({"error": "Resource not found"}) - ) - - -def make_server(port: int) -> HTTPServerWithState: - state = ServerState() - state.activity_manager = ActivityManager(CHECK_INTERVAL_S) - state.activity_manager.start() - - server_address = ("", port) # Listen on all interfaces, port 8000 - return HTTPServerWithState(server_address, JSONRequestHandler, state) - - -def main(): - http_server = make_server(LISTEN_PORT) - http_server.serve_forever() - - -if __name__ == "__main__": - main() diff --git a/docker/entrypoint.bash b/docker/entrypoint.bash index 56b83c7..2029a36 100755 --- a/docker/entrypoint.bash +++ b/docker/entrypoint.bash @@ -74,5 +74,5 @@ chmod gu-w "/home/${NB_USER}/work" echo echo "$INFO" "Starting notebook ..." -exec gosu "$NB_USER" python /docker/activity_monitor.py & +exec gosu "$NB_USER" python /usr/local/bin/service-monitor/activity_monitor.py & exec gosu "$NB_USER" /docker/boot_notebook.bash diff --git a/requirements/test.in b/requirements/test.in deleted file mode 100644 index 202a1b1..0000000 --- a/requirements/test.in +++ /dev/null @@ -1,14 +0,0 @@ -# from jupyter - -psutil -tornado - -# testing - -pytest -pytest-asyncio -pytest-cov -pytest-mock -requests -requests-mock -tenacity \ No newline at end of file diff --git a/requirements/test.txt b/requirements/test.txt deleted file mode 100644 index 43fae43..0000000 --- a/requirements/test.txt +++ /dev/null @@ -1,54 +0,0 @@ -# -# This file is autogenerated by pip-compile with Python 3.10 -# by the following command: -# -# pip-compile --output-file=requirements/test.txt requirements/test.in -# -certifi==2024.2.2 - # via requests -charset-normalizer==3.3.2 - # via requests -coverage[toml]==7.4.4 - # via pytest-cov -exceptiongroup==1.2.0 - # via pytest -idna==3.6 - # via requests -iniconfig==2.0.0 - # via pytest -packaging==24.0 - # via pytest -pluggy==1.4.0 - # via pytest -psutil==5.9.8 - # via -r requirements/test.in -pytest==8.1.1 - # via - # -r requirements/test.in - # pytest-asyncio - # pytest-cov - # pytest-mock -pytest-asyncio==0.23.6 - # via -r requirements/test.in -pytest-cov==4.1.0 - # via -r requirements/test.in -pytest-mock==3.12.0 - # via -r requirements/test.in -requests==2.31.0 - # via - # -r requirements/test.in - # requests-mock -requests-mock==1.11.0 - # via -r requirements/test.in -six==1.16.0 - # via requests-mock -tenacity==8.2.3 - # via -r requirements/test.in -tomli==2.0.1 - # via - # coverage - # pytest -tornado==6.4 - # via -r requirements/test.in -urllib3==2.2.1 - # via requests diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh deleted file mode 100755 index e1d4c8f..0000000 --- a/scripts/ci/run_tests.sh +++ /dev/null @@ -1,12 +0,0 @@ -#/bin/sh - -# http://redsymbol.net/articles/unofficial-bash-strict-mode/ -set -o errexit # abort on nonzero exitstatus -set -o nounset # abort on unbound variable -set -o pipefail # don't hide errors within pipes -IFS=$'\n\t' - -make .venv -source .venv/bin/activate -make install-test -make tests-ci \ No newline at end of file diff --git a/tests/_import_utils.py b/tests/_import_utils.py deleted file mode 100644 index 33681c4..0000000 --- a/tests/_import_utils.py +++ /dev/null @@ -1,13 +0,0 @@ -import sys -from pathlib import Path - -_CURRENT_DIR = ( - Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent -) - - -def allow_imports() -> None: - path = (_CURRENT_DIR / "..." / ".." / ".." / "docker").absolute().resolve() - sys.path.append(f"{path}") - - import activity_monitor diff --git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index 78984a4..0000000 --- a/tests/conftest.py +++ /dev/null @@ -1,135 +0,0 @@ -import ctypes -import pytest -import socket -import threading -import time - -from concurrent.futures import ThreadPoolExecutor, wait -from multiprocessing import Array, Process -from tempfile import NamedTemporaryFile - -from typing import Callable, Final, Iterable - - -_LOCAL_LISTEN_PORT: Final[int] = 12345 - - -class _ListenSocketServer: - def __init__(self): - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.server_socket.bind(("localhost", _LOCAL_LISTEN_PORT)) - self.server_socket.listen(100) # max number of connections - self._process: Process | None = None - - def start(self): - self._process = Process(target=self._accept_clients, daemon=True) - self._process.start() - - def stop(self): - if self._process: - self._process.terminate() - self._process.join() - - def _accept_clients(self): - while True: - client_socket, _ = self.server_socket.accept() - threading.Thread( - target=self._handle_client, daemon=True, args=(client_socket,) - ).start() - - def _handle_client(self, client_socket): - try: - while True: - data = client_socket.recv(1024) - if not data: - break - finally: - client_socket.close() - - -@pytest.fixture -def socket_server() -> None: - socket_server = _ListenSocketServer() - socket_server.start() - yield None - socket_server.stop() - - -class _ActivityGenerator: - def __init__(self, *, network: bool, cpu: bool, disk: bool) -> None: - self._process: Process | None = None - - _keep_running = True - self.shared_array = Array(ctypes.c_bool, 4) - self.shared_array[0] = network - self.shared_array[1] = cpu - self.shared_array[2] = disk - self.shared_array[3] = _keep_running - - def __load_cpu(self) -> None: - for _ in range(1000000): - pass - - def __load_network(self) -> None: - client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - client_socket.connect(("localhost", _LOCAL_LISTEN_PORT)) - client_socket.sendall("mock_message_to_send".encode()) - client_socket.close() - - def __load_disk(self) -> None: - with NamedTemporaryFile() as temp_file: - temp_file.write(b"0" * 1024 * 1024) # 1MB - temp_file.read() - - def _run(self) -> None: - with ThreadPoolExecutor(max_workers=3) as executor: - while self.shared_array[3]: - futures = [] - if self.shared_array[0]: - futures.append(executor.submit(self.__load_network)) - if self.shared_array[1]: - futures.append(executor.submit(self.__load_cpu)) - if self.shared_array[2]: - futures.append(executor.submit(self.__load_disk)) - - wait(futures) - time.sleep(0.1) - - def __enter__(self): - self.start() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.stop() - - def start(self) -> None: - self._process = Process(target=self._run, daemon=True) - self._process.start() - - def stop(self) -> None: - _keep_running = False - self.shared_array[3] = _keep_running - if self._process: - self._process.join() - - def get_pid(self) -> int: - assert self._process - return self._process.pid - - -@pytest.fixture -def create_activity_generator() -> ( - Iterable[Callable[[bool, bool, bool], _ActivityGenerator]] -): - created: list[_ActivityGenerator] = [] - - def _(*, network: bool, cpu: bool, disk: bool) -> _ActivityGenerator: - instance = _ActivityGenerator(network=network, cpu=cpu, disk=disk) - instance.start() - created.append(instance) - return instance - - yield _ - - for instance in created: - instance.stop() diff --git a/tests/test_activity_monitor.py b/tests/test_activity_monitor.py deleted file mode 100644 index 61962e2..0000000 --- a/tests/test_activity_monitor.py +++ /dev/null @@ -1,284 +0,0 @@ -import asyncio -import json -import psutil -import pytest -import pytest_asyncio -import requests -import requests_mock -import threading -import time - -from typing import Callable, Final, Iterable, TYPE_CHECKING -from pytest_mock import MockFixture -from tenacity import AsyncRetrying -from tenacity.stop import stop_after_delay -from tenacity.wait import wait_fixed -from conftest import _ActivityGenerator - - -if TYPE_CHECKING: - from ..docker import activity_monitor -else: - from _import_utils import allow_imports - - allow_imports() - import activity_monitor - -pytestmark = pytest.mark.asyncio - - -@pytest.fixture -def mock__get_sibling_processes( - mocker: MockFixture, -) -> Callable[[list[int]], list[psutil.Process]]: - def _get_processes(pids: list[int]) -> list[psutil.Process]: - results = [] - for pid in pids: - proc = psutil.Process(pid) - assert proc.status() - results.append(proc) - return results - - def _(pids: list[int]) -> None: - mocker.patch( - "activity_monitor._get_sibling_processes", return_value=_get_processes(pids) - ) - - return _ - - -async def test_cpu_usage_monitor_not_busy( - socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], - create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], -): - activity_generator = create_activity_generator(network=False, cpu=False, disk=False) - mock__get_sibling_processes([activity_generator.get_pid()]) - - with activity_monitor.CPUUsageMonitor(1, busy_threshold=5) as cpu_usage_monitor: - async for attempt in AsyncRetrying( - stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True - ): - with attempt: - assert cpu_usage_monitor.total_cpu_usage == 0 - assert cpu_usage_monitor.is_busy is False - - -async def test_cpu_usage_monitor_still_busy( - socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], - create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], -): - activity_generator = create_activity_generator(network=False, cpu=True, disk=False) - mock__get_sibling_processes([activity_generator.get_pid()]) - - with activity_monitor.CPUUsageMonitor(0.5, busy_threshold=5) as cpu_usage_monitor: - # wait for monitor to trigger - await asyncio.sleep(1) - - # must still result busy - assert cpu_usage_monitor.total_cpu_usage > 0 - assert cpu_usage_monitor.is_busy is True - - -async def test_disk_usage_monitor_not_busy( - socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], - create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], -): - activity_generator = create_activity_generator(network=False, cpu=False, disk=False) - mock__get_sibling_processes([activity_generator.get_pid()]) - - with activity_monitor.DiskUsageMonitor( - 0.5, read_usage_threshold=0, write_usage_threshold=0 - ) as disk_usage_monitor: - async for attempt in AsyncRetrying( - stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True - ): - with attempt: - read_bytes = disk_usage_monitor.total_bytes_read - write_bytes = disk_usage_monitor.total_bytes_write - assert read_bytes == 0 - assert write_bytes == 0 - assert disk_usage_monitor.is_busy is False - - -async def test_disk_usage_monitor_still_busy( - socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], - create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], -): - activity_generator = create_activity_generator(network=False, cpu=False, disk=True) - mock__get_sibling_processes([activity_generator.get_pid()]) - - with activity_monitor.DiskUsageMonitor( - 0.5, read_usage_threshold=0, write_usage_threshold=0 - ) as disk_usage_monitor: - # wait for monitor to trigger - await asyncio.sleep(1) - write_bytes = disk_usage_monitor.total_bytes_write - # NOTE: due to os disk cache reading is not reliable not testing it - assert write_bytes > 0 - - # must still result busy - assert disk_usage_monitor.is_busy is True - - -@pytest.fixture -def mock_no_network_activity(mocker: MockFixture) -> None: - mocker.patch( - "activity_monitor.NetworkUsageMonitor._sample_total_network_usage", - side_effect=lambda: (time.time(), 0, 0), - ) - - -async def test_network_usage_monitor_not_busy( - mock_no_network_activity: None, - socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], - create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], -): - activity_generator = create_activity_generator(network=False, cpu=False, disk=False) - mock__get_sibling_processes([activity_generator.get_pid()]) - - with activity_monitor.NetworkUsageMonitor( - 0.5, received_usage_threshold=0, sent_usage_threshold=0 - ) as network_usage_monitor: - async for attempt in AsyncRetrying( - stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True - ): - with attempt: - assert network_usage_monitor.bytes_received == 0 - assert network_usage_monitor.bytes_sent == 0 - assert network_usage_monitor.is_busy is False - - -@pytest.fixture -def mock_network_monitor_exclude_interfaces(mocker: MockFixture) -> None: - mocker.patch("activity_monitor.NetworkUsageMonitor._EXCLUDE_INTERFACES", new=set()) - assert activity_monitor.NetworkUsageMonitor._EXCLUDE_INTERFACES == set() - - -async def test_network_usage_monitor_still_busy( - mock_network_monitor_exclude_interfaces: None, - socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], - create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], -): - activity_generator = create_activity_generator(network=True, cpu=False, disk=False) - mock__get_sibling_processes([activity_generator.get_pid()]) - - with activity_monitor.NetworkUsageMonitor( - 0.5, received_usage_threshold=0, sent_usage_threshold=0 - ) as network_usage_monitor: - # wait for monitor to trigger - await asyncio.sleep(1) - - assert network_usage_monitor.bytes_received > 0 - assert network_usage_monitor.bytes_sent > 0 - assert network_usage_monitor.is_busy is True - - -@pytest.fixture -def mock_jupyter_kernel_monitor(are_kernels_busy: bool) -> Iterable[None]: - with requests_mock.Mocker(real_http=True) as m: - m.get("http://localhost:8888/api/kernels", text=json.dumps([{"id": "atest1"}])) - m.get( - "http://localhost:8888/api/kernels/atest1", - text=json.dumps( - {"execution_state": "running" if are_kernels_busy else "idle"} - ), - ) - yield - - -@pytest.mark.parametrize("are_kernels_busy", [True, False]) -async def test_jupyter_kernel_monitor( - mock_jupyter_kernel_monitor: None, are_kernels_busy: bool -): - kernel_monitor = activity_monitor.JupyterKernelMonitor(1) - kernel_monitor._update_kernels_activity() - assert kernel_monitor.are_kernels_busy is are_kernels_busy - - -@pytest_asyncio.fixture -async def server_url() -> str: - return f"http://localhost:{activity_monitor.LISTEN_PORT}" - - -@pytest_asyncio.fixture -async def http_server(mock_jupyter_kernel_monitor: None, server_url: str) -> None: - server = activity_monitor.make_server(activity_monitor.LISTEN_PORT) - - def _run_server_worker() -> None: - server.serve_forever() - - thread = threading.Thread(target=_run_server_worker, daemon=True) - thread.start() - - # ensure server is running - async for attempt in AsyncRetrying( - stop=stop_after_delay(3), wait=wait_fixed(0.1), reraise=True - ): - with attempt: - result = requests.get(f"{server_url}/", timeout=1) - assert result.status_code == 200, result.text - - yield None - - server.shutdown() - server.server_close() - - with pytest.raises(requests.exceptions.RequestException): - requests.get(f"{server_url}/", timeout=1) - - -@pytest.mark.parametrize("are_kernels_busy", [False]) -async def test_http_server_ok(http_server: None, server_url: str): - result = requests.get(f"{server_url}/", timeout=5) - assert result.status_code == 200 - - -_BIG_THRESHOLD: Final[int] = int(1e10) - - -@pytest.fixture -def mock_activity_manager_config(mocker: MockFixture) -> None: - mocker.patch("activity_monitor.CHECK_INTERVAL_S", 1) - mocker.patch("activity_monitor.KERNEL_CHECK_INTERVAL_S", 1) - - mocker.patch( - "activity_monitor.BUSY_USAGE_THRESHOLD_NETWORK_RECEIVED", _BIG_THRESHOLD - ) - mocker.patch("activity_monitor.BUSY_USAGE_THRESHOLD_NETWORK_SENT", _BIG_THRESHOLD) - - -@pytest.mark.parametrize("are_kernels_busy", [False]) -async def test_activity_monitor_becomes_not_busy( - mock_activity_manager_config: None, - socket_server: None, - mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], - create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], - http_server: None, - server_url: str, -): - activity_generator = create_activity_generator(network=False, cpu=False, disk=False) - mock__get_sibling_processes([activity_generator.get_pid()]) - - async for attempt in AsyncRetrying( - stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True - ): - with attempt: - # check that all become not busy - result = requests.get(f"{server_url}/debug", timeout=5) - assert result.status_code == 200 - debug_response = result.json() - assert debug_response["cpu_usage"]["is_busy"] is False - assert debug_response["disk_usage"]["is_busy"] is False - assert debug_response["kernel_monitor"]["is_busy"] is False - assert debug_response["network_usage"]["is_busy"] is False - - result = requests.get(f"{server_url}/", timeout=2) - assert result.status_code == 200 - response = result.json() - assert response["seconds_inactive"] > 0