From 1ef6b75b4af853906a3099b2644ed2c8e3b9f0f3 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 19 Mar 2024 16:17:38 +0100 Subject: [PATCH] wip --- Makefile | 9 ++ docker/activity_monitor.py | 181 ++++++++++++++++++++------------- requirements/test.in | 12 +++ requirements/test.txt | 41 ++++++++ tests/_import_utils.py | 13 +++ tests/conftest.py | 0 tests/test_activity_monitor.py | 159 +++++++++++++++++++++++++++++ 7 files changed, 346 insertions(+), 69 deletions(-) create mode 100644 requirements/test.in create mode 100644 requirements/test.txt create mode 100644 tests/_import_utils.py create mode 100644 tests/conftest.py create mode 100644 tests/test_activity_monitor.py diff --git a/Makefile b/Makefile index 9a3fbf8..655e9a2 100644 --- a/Makefile +++ b/Makefile @@ -29,6 +29,7 @@ devenv: .venv ## create a python virtual environment with tools to dev, run and requirements: devenv ## runs pip-tools to build requirements.txt that will be installed in the JupyterLab # freezes requirements pip-compile kernels/python-maths/requirements.in --resolver=backtracking --output-file kernels/python-maths/requirements.txt + pip-compile requirements/test.in --resolver=backtracking --output-file requirements/test.txt # Builds new service version ---------------------------------------------------------------------------- define _bumpversion @@ -66,6 +67,14 @@ publish-local: ## push to local throw away registry to test integration docker push registry:5000/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) @curl registry:5000/v2/_catalog | jq +.PHONY: install-dev +install-dev: ## run tests in development mode + pip install -r requirements/test.txt + +.PHONY: tests-dev +tests-dev: ## run tests in development mode + .venv/bin/pytest --pdb -vvv tests + .PHONY: help help: ## this colorful help @echo "Recipes for '$(notdir $(CURDIR))':" diff --git a/docker/activity_monitor.py b/docker/activity_monitor.py index 7919bfc..95b8765 100755 --- a/docker/activity_monitor.py +++ b/docker/activity_monitor.py @@ -1,24 +1,19 @@ #!/home/jovyan/.venv/bin/python -# How does this work? -# 1. controls that the service is not busy at regular intervals -# 2a. cheks if kernels are busy -# 2b. checks total CPU usage of all children processes is >= THRESHOLD_CPU_USAGE -# 3. if either of the above checks if True the service will result as busy - - import asyncio import json import psutil import requests import tornado -import subprocess +import time +from threading import Thread from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import suppress from datetime import datetime from typing import Final +from abc import abstractmethod CHECK_INTERVAL_S: Final[float] = 5 @@ -26,15 +21,86 @@ THRESHOLD_CPU_USAGE: Final[float] = 5 # percent in range [0, 100] -class JupyterKernelMonitor: +# Utilities +class AbstractIsBusyMonitor: + def __init__(self, poll_interval: float) -> None: + self._poll_interval: float = poll_interval + self._keep_running: bool = True + self._thread: Thread | None = None + + self.is_busy: bool = True + + @abstractmethod + def _check_if_busy(self) -> bool: + """Must be user defined and returns if current + metric is to be considered busy + + Returns: + bool: True if considered busy + """ + + def _worker(self) -> None: + while self._keep_running: + self.is_busy = self._check_if_busy() + time.sleep(self._poll_interval) + + def start(self) -> None: + self._thread = Thread(target=self._worker, daemon=True) + self._thread.start() + + def stop(self) -> None: + self._keep_running = False + if self._thread: + self._thread.join() + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.stop() + + +def __get_children_processes(pid) -> list[psutil.Process]: + try: + return psutil.Process(pid).children(recursive=True) + except psutil.NoSuchProcess: + return [] + + +def _get_brother_processes() -> list[psutil.Process]: + # Returns the CPU usage of all processes except this one. + # ASSUMPTIONS: + # - `CURRENT_PROC` is a child of root process + # - `CURRENT_PROC` does not create any child processes + # + # It looks for its brothers (and their children) p1 to pN in order + # to compute real CPU usage. + # - CURRENT_PROC + # - p1 + # ... + # - pN + current_process = psutil.Process() + parent_pid = current_process.ppid() + children = __get_children_processes(parent_pid) + return [c for c in children if c.pid != current_process.pid] + + +# Monitors + + +class JupyterKernelMonitor(AbstractIsBusyMonitor): BASE_URL = "http://localhost:8888" HEADERS = {"accept": "application/json"} + def __init__(self, poll_interval: float) -> None: + super().__init__(poll_interval=poll_interval) + def _get(self, path: str) -> dict: r = requests.get(f"{self.BASE_URL}{path}", headers=self.HEADERS) return r.json() - def are_kernels_busy(self) -> bool: + def _are_kernels_busy(self) -> bool: json_response = self._get("/api/kernels") are_kernels_busy = False @@ -48,52 +114,24 @@ def are_kernels_busy(self) -> bool: return are_kernels_busy + def _check_if_busy(self) -> bool: + return self._are_kernels_busy() -class CPUUsageMonitor: - def __init__(self, threshold: float): - self.threshold = threshold - def _get_children_processes(self, pid) -> list[psutil.Process]: - try: - return psutil.Process(pid).children(recursive=True) - except psutil.NoSuchProcess: - return [] - - def _get_brother_processes(self) -> list[psutil.Process]: - # Returns the CPU usage of all processes except this one. - # ASSUMPTIONS: - # - `CURRENT_PROC` is a child of root process - # - `CURRENT_PROC` does not create any child processes - # - # It looks for its brothers (and their children) p1 to pN in order - # to compute real CPU usage. - # - CURRENT_PROC - # - p1 - # ... - # - pN - current_process = psutil.Process() - parent_pid = current_process.ppid() - children = self._get_children_processes(parent_pid) - return [c for c in children if c.pid != current_process.pid] - - def _get_cpu_usage(self, pid: int) -> float: - cmd = f"ps -p {pid} -o %cpu --no-headers" - output = subprocess.check_output(cmd, shell=True, universal_newlines=True) - try: - return float(output) - except ValueError: - print(f"Could not parse {pid} cpu usage: {output}") - return float(0) +class CPUUsageMonitor(AbstractIsBusyMonitor): + def __init__(self, poll_interval: float, *, threshold: float): + super().__init__(poll_interval=poll_interval) + self.threshold = threshold def _get_total_cpu_usage(self) -> float: with ThreadPoolExecutor(max_workers=10) as executor: futures = [ executor.submit(x.cpu_percent, CPU_USAGE_MONITORING_INTERVAL_S) - for x in self._get_brother_processes() + for x in _get_brother_processes() ] return sum([future.result() for future in as_completed(futures)]) - def are_children_busy(self) -> bool: + def _check_if_busy(self) -> bool: return self._get_total_cpu_usage() >= self.threshold @@ -102,14 +140,13 @@ def __init__(self, interval: float) -> None: self.interval = interval self.last_idle: datetime | None = None - self.jupyter_kernel_monitor = JupyterKernelMonitor() - self.cpu_usage_monitor = CPUUsageMonitor(THRESHOLD_CPU_USAGE) + self.jupyter_kernel_monitor = JupyterKernelMonitor(CHECK_INTERVAL_S) + self.cpu_usage_monitor = CPUUsageMonitor( + CHECK_INTERVAL_S, threshold=THRESHOLD_CPU_USAGE + ) def check(self): - is_busy = ( - self.jupyter_kernel_monitor.are_kernels_busy() - or self.cpu_usage_monitor.are_children_busy() - ) + is_busy = self.jupyter_kernel_monitor.is_busy or self.cpu_usage_monitor.is_busy if is_busy: self.last_idle = None @@ -121,7 +158,8 @@ def get_idle_seconds(self) -> float: if self.last_idle is None: return 0 - return (datetime.utcnow() - self.last_idle).total_seconds() + idle_seconds = (datetime.utcnow() - self.last_idle).total_seconds() + return idle_seconds if idle_seconds > 0 else 0 async def run(self): while True: @@ -130,20 +168,21 @@ async def run(self): await asyncio.sleep(self.interval) -activity_manager = ActivityManager(CHECK_INTERVAL_S) - - class DebugHandler(tornado.web.RequestHandler): - def get(self): + def initialize(self, activity_manager: ActivityManager): + self.activity_manager: ActivityManager = activity_manager + + async def get(self): + assert self.activity_manager self.write( json.dumps( { + "seconds_inactive": self.activity_manager.get_idle_seconds(), "cpu_usage": { - "current": activity_manager.cpu_usage_monitor._get_total_cpu_usage(), - "busy": activity_manager.cpu_usage_monitor.are_children_busy(), + "is_busy": self.activity_manager.cpu_usage_monitor.is_busy, }, - "kernal_monitor": { - "busy": activity_manager.jupyter_kernel_monitor.are_kernels_busy() + "kernel_monitor": { + "is_busy": self.activity_manager.jupyter_kernel_monitor.is_busy }, } ) @@ -151,24 +190,28 @@ def get(self): class MainHandler(tornado.web.RequestHandler): - def get(self): - idle_seconds = activity_manager.get_idle_seconds() - seconds_inactive = idle_seconds if idle_seconds > 0 else 0 + def initialize(self, activity_manager: ActivityManager): + self.activity_manager: ActivityManager = activity_manager - self.write(json.dumps({"seconds_inactive": seconds_inactive})) + async def get(self): + assert self.activity_manager + self.write( + json.dumps({"seconds_inactive": self.activity_manager.get_idle_seconds()}) + ) -def make_app() -> tornado.web.Application: +def make_app(activity_manager) -> tornado.web.Application: return tornado.web.Application( [ - (r"/", MainHandler), - (r"/debug", DebugHandler), + (r"/", MainHandler, dict(activity_manager=activity_manager)), + (r"/debug", DebugHandler, dict(activity_manager=activity_manager)), ] ) async def main(): - app = make_app() + activity_manager = ActivityManager(CHECK_INTERVAL_S) + app = make_app(activity_manager) app.listen(19597) asyncio.create_task(activity_manager.run()) await asyncio.Event().wait() diff --git a/requirements/test.in b/requirements/test.in new file mode 100644 index 0000000..e41c02b --- /dev/null +++ b/requirements/test.in @@ -0,0 +1,12 @@ +# from jupyter + +psutil +tornado + +# testing + +pytest +pytest-asyncio +pytest-mock +requests +tenacity \ No newline at end of file diff --git a/requirements/test.txt b/requirements/test.txt new file mode 100644 index 0000000..6f731a3 --- /dev/null +++ b/requirements/test.txt @@ -0,0 +1,41 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --output-file=requirements/test.txt requirements/test.in +# +certifi==2024.2.2 + # via requests +charset-normalizer==3.3.2 + # via requests +exceptiongroup==1.2.0 + # via pytest +idna==3.6 + # via requests +iniconfig==2.0.0 + # via pytest +packaging==24.0 + # via pytest +pluggy==1.4.0 + # via pytest +psutil==5.9.8 + # via -r requirements/test.in +pytest==8.1.1 + # via + # -r requirements/test.in + # pytest-asyncio + # pytest-mock +pytest-asyncio==0.23.6 + # via -r requirements/test.in +pytest-mock==3.12.0 + # via -r requirements/test.in +requests==2.31.0 + # via -r requirements/test.in +tenacity==8.2.3 + # via -r requirements/test.in +tomli==2.0.1 + # via pytest +tornado==6.4 + # via -r requirements/test.in +urllib3==2.2.1 + # via requests diff --git a/tests/_import_utils.py b/tests/_import_utils.py new file mode 100644 index 0000000..33681c4 --- /dev/null +++ b/tests/_import_utils.py @@ -0,0 +1,13 @@ +import sys +from pathlib import Path + +_CURRENT_DIR = ( + Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent +) + + +def allow_imports() -> None: + path = (_CURRENT_DIR / "..." / ".." / ".." / "docker").absolute().resolve() + sys.path.append(f"{path}") + + import activity_monitor diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_activity_monitor.py b/tests/test_activity_monitor.py new file mode 100644 index 0000000..9387183 --- /dev/null +++ b/tests/test_activity_monitor.py @@ -0,0 +1,159 @@ +import ctypes +import pytest +import socket +import threading +import psutil +import time + +from concurrent.futures import ThreadPoolExecutor, wait +from multiprocessing import Array, Process +from tempfile import NamedTemporaryFile + +from typing import Callable, Final, TYPE_CHECKING +from pytest_mock import MockFixture +from tenacity import AsyncRetrying +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed + + +if TYPE_CHECKING: + from ..docker import activity_monitor +else: + from _import_utils import allow_imports + + allow_imports() + import activity_monitor + + +_LOCAL_LISTEN_PORT: Final[int] = 12345 + +pytestmark = pytest.mark.asyncio + + +class _ListenSocketServer: + def __init__(self): + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.bind(("localhost", _LOCAL_LISTEN_PORT)) + self.server_socket.listen(100) # max number of connections + self.stop_event = threading.Event() + + def start(self): + threading.Thread(target=self._accept_clients, daemon=True).start() + + def stop(self): + self.stop_event.set() + + def _accept_clients(self): + while not self.stop_event.is_set(): + client_socket, _ = self.server_socket.accept() + threading.Thread( + target=self._handle_client, daemon=True, args=(client_socket,) + ).start() + + def _handle_client(self, client_socket): + try: + while not self.stop_event.is_set(): + data = client_socket.recv(1024) + if not data: + break + finally: + client_socket.close() + + +class _ActivityGenerator: + def __init__(self, *, network: bool, cpu: bool, disk: bool) -> None: + self._process: Process | None = None + + _keep_running = True + self.shared_array = Array(ctypes.c_bool, 4) + self.shared_array[0] = network + self.shared_array[1] = cpu + self.shared_array[2] = disk + self.shared_array[3] = _keep_running + + def __load_cpu(self) -> None: + for _ in range(1000000): + pass + + def __load_network(self) -> None: + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client_socket.connect(("localhost", _LOCAL_LISTEN_PORT)) + client_socket.sendall("mock_message_to_send".encode()) + client_socket.close() + + def __load_disk(self) -> None: + with NamedTemporaryFile() as temp_file: + temp_file.write(b"0" * 1024 * 1024) # 1MB + temp_file.read() + + def _run(self) -> None: + with ThreadPoolExecutor(max_workers=3) as executor: + while self.shared_array[3]: + futures = [] + if self.shared_array[0]: + futures.append(executor.submit(self.__load_network)) + if self.shared_array[1]: + futures.append(executor.submit(self.__load_cpu)) + if self.shared_array[2]: + futures.append(executor.submit(self.__load_disk)) + + wait(futures) + time.sleep(0.01) + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.stop() + + def start(self) -> None: + self._process = Process(target=self._run, daemon=True) + self._process.start() + + def stop(self) -> None: + self.shared_array[3] = False + if self._process: + self._process.join() + + def get_pid(self) -> int: + assert self._process + return self._process.pid + + +@pytest.fixture +def socket_server() -> None: + socket_server = _ListenSocketServer() + socket_server.start() + yield None + socket_server.stop() + + +@pytest.fixture +def mock__get_brother_processes(mocker: MockFixture) -> Callable: + def _(pids: list[int]) -> None: + mocker.patch( + "activity_monitor._get_brother_processes", + return_value=[psutil.Process(p) for p in pids], + ) + + return _ + + +async def test_is_working(socket_server: None, mock__get_brother_processes: Callable): + with _ActivityGenerator(network=False, cpu=False, disk=False) as activity_generator: + mock__get_brother_processes([activity_generator.get_pid()]) + + assert len(activity_monitor._get_brother_processes()) == 1 + + # some tests + with activity_monitor.CPUUsageMonitor(1, threshold=0) as cpu_usage_monitor: + # poll for it to be idle since it takes some time + async for attempt in AsyncRetrying( + stop=stop_after_delay(3), wait=wait_fixed(0.1), reraise=True + ): + with attempt: + # TODO: figure out why test is wrong here + assert cpu_usage_monitor.is_busy is False + + # now we can test whatever here