From 39f7156981f623dae0f3c6c13296c3c51f1c2f33 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 18 Mar 2024 15:22:07 +0100 Subject: [PATCH] update kernel checker script --- docker/kernel_checker.py | 92 +++++++++++++++++++++++++++++++++------- 1 file changed, 76 insertions(+), 16 deletions(-) diff --git a/docker/kernel_checker.py b/docker/kernel_checker.py index 52d2ab2..fd7874d 100644 --- a/docker/kernel_checker.py +++ b/docker/kernel_checker.py @@ -1,30 +1,39 @@ #!/home/jovyan/.venv/bin/python +# How does this work? +# 1. controls that the service is not busy at regular intervals +# 2a. cheks if kernels are busy +# 2b. checks total CPU usage of all children processes is >= THRESHOLD_CPU_USAGE +# 3. if either of the above checks if True the service will result as busy + + import asyncio import json +import psutil import requests -from datetime import datetime import tornado + +from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import suppress +from datetime import datetime from typing import Final -KERNEL_BUSY_CHECK_INTERVAL_S: Final[float] = 5 +CHECK_INTERVAL_S: Final[float] = 5 +CPU_USAGE_MONITORING_INTERVAL_S: Final[float] = 1 +THRESHOLD_CPU_USAGE: Final[float] = 20 # percent in range [0, 100] -class JupyterKernelChecker: +class JupyterKernelMonitor: BASE_URL = "http://localhost:8888" HEADERS = {"accept": "application/json"} - def __init__(self) -> None: - self.last_idle: datetime | None = None - def _get(self, path: str) -> dict: r = requests.get(f"{self.BASE_URL}{path}", headers=self.HEADERS) return r.json() - def _are_kernels_busy(self) -> bool: + def are_kernels_busy(self) -> bool: json_response = self._get("/api/kernels") are_kernels_busy = False @@ -38,13 +47,63 @@ def _are_kernels_busy(self) -> bool: return are_kernels_busy + +class CPUUsageMonitor: + def __init__(self, threshold: float): + self.threshold = threshold + def _get_children_processes(self, pid) -> list[psutil.Process]: + try: + return psutil.Process(pid).children(recursive=True) + except psutil.NoSuchProcess: + return [] + + def _get_brother_processes(self) -> list[psutil.Process]: + # Returns the CPU usage of all processes except this one. + # ASSUMPTIONS: + # - `CURRENT_PROC` is a child of root process + # - `CURRENT_PROC` does not create any child processes + # + # It looks for its brothers (and their children) p1 to pN in order + # to compute real CPU usage. + # - CURRENT_PROC + # - p1 + # ... + # - pN + current_process = psutil.Process() + parent_pid = current_process.ppid() + children = self._get_children_processes(parent_pid) + return [c for c in children if c.pid != current_process.pid] + + def _get_total_cpu_usage(self) -> float: + with ThreadPoolExecutor(max_workers=10) as executor: + futures = [ + executor.submit(x.cpu_percent, CPU_USAGE_MONITORING_INTERVAL_S) + for x in self._get_brother_processes() + ] + return sum([future.result() for future in as_completed(futures)]) + + def are_children_busy(self) -> bool: + return self._get_total_cpu_usage() >= self.threshold + + +class ActivityManager: + def __init__(self, interval: float) -> None: + self.interval = interval + self.last_idle: datetime | None = None + + self.jupyter_kernel_monitor = JupyterKernelMonitor() + self.cpu_usage_monitor = CPUUsageMonitor(THRESHOLD_CPU_USAGE) + def check(self): - are_kernels_busy = self._are_kernels_busy() + is_busy = ( + self.jupyter_kernel_monitor.are_kernels_busy() + or self.cpu_usage_monitor.are_children_busy() + ) - if are_kernels_busy: + if is_busy: self.last_idle = None - if not are_kernels_busy and self.last_idle is None: + if not is_busy and self.last_idle is None: self.last_idle = datetime.utcnow() def get_idle_seconds(self) -> float: @@ -57,17 +116,18 @@ async def run(self): while True: with suppress(Exception): self.check() - await asyncio.sleep(KERNEL_BUSY_CHECK_INTERVAL_S) + await asyncio.sleep(self.interval) -kernel_checker = JupyterKernelChecker() +activity_manager = ActivityManager(CHECK_INTERVAL_S) class MainHandler(tornado.web.RequestHandler): def get(self): - idle_seconds = kernel_checker.get_idle_seconds() - response = {"seconds_inactive": idle_seconds if idle_seconds > 0 else 0} - self.write(json.dumps(response)) + idle_seconds = activity_manager.get_idle_seconds() + seconds_inactive = idle_seconds if idle_seconds > 0 else 0 + + self.write(json.dumps({"seconds_inactive": seconds_inactive})) def make_app() -> tornado.web.Application: @@ -77,7 +137,7 @@ def make_app() -> tornado.web.Application: async def main(): app = make_app() app.listen(19597) - asyncio.create_task(kernel_checker.run()) + asyncio.create_task(activity_manager.run()) await asyncio.Event().wait()