Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Neagu committed Mar 19, 2024
1 parent 3ea2298 commit 1ef6b75
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 69 deletions.
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ devenv: .venv ## create a python virtual environment with tools to dev, run and
requirements: devenv ## runs pip-tools to build requirements.txt that will be installed in the JupyterLab
# freezes requirements
pip-compile kernels/python-maths/requirements.in --resolver=backtracking --output-file kernels/python-maths/requirements.txt
pip-compile requirements/test.in --resolver=backtracking --output-file requirements/test.txt

# Builds new service version ----------------------------------------------------------------------------
define _bumpversion
Expand Down Expand Up @@ -66,6 +67,14 @@ publish-local: ## push to local throw away registry to test integration
docker push registry:5000/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)
@curl registry:5000/v2/_catalog | jq

.PHONY: install-dev
install-dev: ## install the test requirements for development
pip install -r requirements/test.txt

.PHONY: tests-dev
tests-dev: ## run tests in development mode
.venv/bin/pytest --pdb -vvv tests

.PHONY: help
help: ## this colorful help
@echo "Recipes for '$(notdir $(CURDIR))':"
Expand Down
181 changes: 112 additions & 69 deletions docker/activity_monitor.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,106 @@
#!/home/jovyan/.venv/bin/python


# How does this work?
# 1. checks at regular intervals whether the service is busy
# 2a. checks if kernels are busy
# 2b. checks if the total CPU usage of all children processes is >= THRESHOLD_CPU_USAGE
# 3. if either of the above checks is True, the service is reported as busy


import asyncio
import json
import psutil
import requests
import tornado
import subprocess
import time

from threading import Thread
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import suppress
from datetime import datetime
from typing import Final
from abc import abstractmethod


CHECK_INTERVAL_S: Final[float] = 5
CPU_USAGE_MONITORING_INTERVAL_S: Final[float] = 1
THRESHOLD_CPU_USAGE: Final[float] = 5 # percent in range [0, 100]


class JupyterKernelMonitor:
# Utilities
class AbstractIsBusyMonitor:
    """Base class for monitors that poll a "busy" metric on a background thread.

    Subclasses implement ``_check_if_busy``. Once ``start`` is called, a daemon
    thread calls it repeatedly, sleeping ``poll_interval`` seconds between
    calls, and stores the latest result in ``is_busy``.

    Instances can be used as context managers: entering starts the thread,
    exiting stops it and waits for it to finish.
    """

    def __init__(self, poll_interval: float) -> None:
        """
        Args:
            poll_interval: seconds to sleep between two consecutive checks.
        """
        self._poll_interval: float = poll_interval
        self._keep_running: bool = True
        self._thread: Thread | None = None

        # Reported as busy until the first check has completed.
        self.is_busy: bool = True

    @abstractmethod
    def _check_if_busy(self) -> bool:
        """Must be user defined and returns if current
        metric is to be considered busy

        Returns:
            bool: True if considered busy
        """
        # ``@abstractmethod`` is not enforced here because the class does not
        # use ``ABCMeta``; fail loudly instead of silently returning ``None``.
        raise NotImplementedError

    def _worker(self) -> None:
        """Poll ``_check_if_busy`` until ``stop`` is requested."""
        while self._keep_running:
            try:
                self.is_busy = self._check_if_busy()
            except Exception as err:  # top-level boundary of the thread
                # A failing check must not kill the polling thread, otherwise
                # ``is_busy`` would silently freeze. Keep the last known value
                # and try again on the next iteration.
                print(f"{type(self).__name__}: busy check failed: {err!r}")
            time.sleep(self._poll_interval)

    def start(self) -> None:
        """Start polling on a daemon thread."""
        self._thread = Thread(target=self._worker, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Ask the polling thread to exit and wait for it to finish.

        The thread only notices the request after its current check and
        sleep, so this call can block for roughly one poll interval.
        """
        self._keep_running = False
        if self._thread:
            self._thread.join()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()


def __get_children_processes(pid) -> list[psutil.Process]:
    """Return all descendants of process ``pid``, searched recursively.

    An empty list is returned when ``psutil`` reports that the process
    does not exist.
    """
    try:
        parent = psutil.Process(pid)
        descendants = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        return []
    return descendants


def _get_brother_processes() -> list[psutil.Process]:
    """Return every process under this process' parent, except this process.

    ASSUMPTIONS:
    - `CURRENT_PROC` is a child of root process
    - `CURRENT_PROC` does not create any child processes

    The result contains the brothers p1 to pN of `CURRENT_PROC` (and their
    children), which callers can use to compute real CPU usage:
    - CURRENT_PROC
    - p1
    ...
    - pN
    """
    this_process = psutil.Process()
    own_pid = this_process.pid
    relatives = __get_children_processes(this_process.ppid())
    # drop the monitor itself, it must not count towards the usage
    return [proc for proc in relatives if proc.pid != own_pid]


# Monitors


class JupyterKernelMonitor(AbstractIsBusyMonitor):
BASE_URL = "http://localhost:8888"
HEADERS = {"accept": "application/json"}

def __init__(self, poll_interval: float) -> None:
super().__init__(poll_interval=poll_interval)

def _get(self, path: str) -> dict:
r = requests.get(f"{self.BASE_URL}{path}", headers=self.HEADERS)
return r.json()

def are_kernels_busy(self) -> bool:
def _are_kernels_busy(self) -> bool:
json_response = self._get("/api/kernels")

are_kernels_busy = False
Expand All @@ -48,52 +114,24 @@ def are_kernels_busy(self) -> bool:

return are_kernels_busy

def _check_if_busy(self) -> bool:
return self._are_kernels_busy()

class CPUUsageMonitor:
def __init__(self, threshold: float):
self.threshold = threshold

def _get_children_processes(self, pid) -> list[psutil.Process]:
try:
return psutil.Process(pid).children(recursive=True)
except psutil.NoSuchProcess:
return []

def _get_brother_processes(self) -> list[psutil.Process]:
# Returns the CPU usage of all processes except this one.
# ASSUMPTIONS:
# - `CURRENT_PROC` is a child of root process
# - `CURRENT_PROC` does not create any child processes
#
# It looks for its brothers (and their children) p1 to pN in order
# to compute real CPU usage.
# - CURRENT_PROC
# - p1
# ...
# - pN
current_process = psutil.Process()
parent_pid = current_process.ppid()
children = self._get_children_processes(parent_pid)
return [c for c in children if c.pid != current_process.pid]

def _get_cpu_usage(self, pid: int) -> float:
cmd = f"ps -p {pid} -o %cpu --no-headers"
output = subprocess.check_output(cmd, shell=True, universal_newlines=True)
try:
return float(output)
except ValueError:
print(f"Could not parse {pid} cpu usage: {output}")
return float(0)
class CPUUsageMonitor(AbstractIsBusyMonitor):
def __init__(self, poll_interval: float, *, threshold: float):
super().__init__(poll_interval=poll_interval)
self.threshold = threshold

def _get_total_cpu_usage(self) -> float:
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(x.cpu_percent, CPU_USAGE_MONITORING_INTERVAL_S)
for x in self._get_brother_processes()
for x in _get_brother_processes()
]
return sum([future.result() for future in as_completed(futures)])

def are_children_busy(self) -> bool:
def _check_if_busy(self) -> bool:
return self._get_total_cpu_usage() >= self.threshold


Expand All @@ -102,14 +140,13 @@ def __init__(self, interval: float) -> None:
self.interval = interval
self.last_idle: datetime | None = None

self.jupyter_kernel_monitor = JupyterKernelMonitor()
self.cpu_usage_monitor = CPUUsageMonitor(THRESHOLD_CPU_USAGE)
self.jupyter_kernel_monitor = JupyterKernelMonitor(CHECK_INTERVAL_S)
self.cpu_usage_monitor = CPUUsageMonitor(
CHECK_INTERVAL_S, threshold=THRESHOLD_CPU_USAGE
)

def check(self):
is_busy = (
self.jupyter_kernel_monitor.are_kernels_busy()
or self.cpu_usage_monitor.are_children_busy()
)
is_busy = self.jupyter_kernel_monitor.is_busy or self.cpu_usage_monitor.is_busy

if is_busy:
self.last_idle = None
Expand All @@ -121,7 +158,8 @@ def get_idle_seconds(self) -> float:
if self.last_idle is None:
return 0

return (datetime.utcnow() - self.last_idle).total_seconds()
idle_seconds = (datetime.utcnow() - self.last_idle).total_seconds()
return idle_seconds if idle_seconds > 0 else 0

async def run(self):
while True:
Expand All @@ -130,45 +168,50 @@ async def run(self):
await asyncio.sleep(self.interval)


activity_manager = ActivityManager(CHECK_INTERVAL_S)


class DebugHandler(tornado.web.RequestHandler):
def get(self):
def initialize(self, activity_manager: ActivityManager):
self.activity_manager: ActivityManager = activity_manager

async def get(self):
assert self.activity_manager
self.write(
json.dumps(
{
"seconds_inactive": self.activity_manager.get_idle_seconds(),
"cpu_usage": {
"current": activity_manager.cpu_usage_monitor._get_total_cpu_usage(),
"busy": activity_manager.cpu_usage_monitor.are_children_busy(),
"is_busy": self.activity_manager.cpu_usage_monitor.is_busy,
},
"kernal_monitor": {
"busy": activity_manager.jupyter_kernel_monitor.are_kernels_busy()
"kernel_monitor": {
"is_busy": self.activity_manager.jupyter_kernel_monitor.is_busy
},
}
)
)


class MainHandler(tornado.web.RequestHandler):
def get(self):
idle_seconds = activity_manager.get_idle_seconds()
seconds_inactive = idle_seconds if idle_seconds > 0 else 0
def initialize(self, activity_manager: ActivityManager):
self.activity_manager: ActivityManager = activity_manager

self.write(json.dumps({"seconds_inactive": seconds_inactive}))
async def get(self):
assert self.activity_manager
self.write(
json.dumps({"seconds_inactive": self.activity_manager.get_idle_seconds()})
)


def make_app() -> tornado.web.Application:
def make_app(activity_manager) -> tornado.web.Application:
return tornado.web.Application(
[
(r"/", MainHandler),
(r"/debug", DebugHandler),
(r"/", MainHandler, dict(activity_manager=activity_manager)),
(r"/debug", DebugHandler, dict(activity_manager=activity_manager)),
]
)


async def main():
app = make_app()
activity_manager = ActivityManager(CHECK_INTERVAL_S)
app = make_app(activity_manager)
app.listen(19597)
asyncio.create_task(activity_manager.run())
await asyncio.Event().wait()
Expand Down
12 changes: 12 additions & 0 deletions requirements/test.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# from jupyter

psutil
tornado

# testing

pytest
pytest-asyncio
pytest-mock
requests
tenacity
41 changes: 41 additions & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --output-file=requirements/test.txt requirements/test.in
#
certifi==2024.2.2
# via requests
charset-normalizer==3.3.2
# via requests
exceptiongroup==1.2.0
# via pytest
idna==3.6
# via requests
iniconfig==2.0.0
# via pytest
packaging==24.0
# via pytest
pluggy==1.4.0
# via pytest
psutil==5.9.8
# via -r requirements/test.in
pytest==8.1.1
# via
# -r requirements/test.in
# pytest-asyncio
# pytest-mock
pytest-asyncio==0.23.6
# via -r requirements/test.in
pytest-mock==3.12.0
# via -r requirements/test.in
requests==2.31.0
# via -r requirements/test.in
tenacity==8.2.3
# via -r requirements/test.in
tomli==2.0.1
# via pytest
tornado==6.4
# via -r requirements/test.in
urllib3==2.2.1
# via requests
13 changes: 13 additions & 0 deletions tests/_import_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import sys
from pathlib import Path

# Directory containing this file (or the launched script when run directly).
_CURRENT_DIR = (
    Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent
)


def allow_imports() -> None:
    """Make the modules in the sibling ``docker`` directory importable.

    Adds ``<parent of this directory>/docker`` to ``sys.path`` (only once,
    even when called repeatedly) and then imports ``activity_monitor`` so
    that a wrong path fails immediately.

    Raises:
        ImportError: if ``activity_monitor`` cannot be imported.
    """
    # The previous "..." / ".." / ".." chain only worked because the bogus
    # "..." component was cancelled out lexically by ``resolve()``.
    path = (_CURRENT_DIR.parent / "docker").resolve()
    entry = f"{path}"
    if entry not in sys.path:
        sys.path.append(entry)

    import activity_monitor  # noqa: F401  (import check only)
Empty file added tests/conftest.py
Empty file.
Loading

0 comments on commit 1ef6b75

Please sign in to comment.