From 04c8de701270c04e3a1171307aecfdd02e082089 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 25 Mar 2024 13:28:14 +0100 Subject: [PATCH] replaced tornado with builtin server --- docker/activity_monitor.py | 131 +++++++++++++++++---------------- tests/test_activity_monitor.py | 56 ++++++-------- 2 files changed, 91 insertions(+), 96 deletions(-) diff --git a/docker/activity_monitor.py b/docker/activity_monitor.py index ddeb251..702c1a8 100755 --- a/docker/activity_monitor.py +++ b/docker/activity_monitor.py @@ -1,20 +1,19 @@ #!/home/jovyan/.venv/bin/python -import asyncio -import logging import json +import logging import psutil import requests -import tornado import time -from threading import Thread +from abc import abstractmethod from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import suppress from datetime import datetime +from http.server import HTTPServer, BaseHTTPRequestHandler +from threading import Thread from typing import Final -from abc import abstractmethod _logger = logging.getLogger(__name__) @@ -446,70 +445,78 @@ def stop(self) -> None: self._thread.join() -class DebugHandler(tornado.web.RequestHandler): - def initialize(self, activity_manager: ActivityManager): - self.activity_manager: ActivityManager = activity_manager - - async def get(self): - assert self.activity_manager - self.write( - json.dumps( - { - "seconds_inactive": self.activity_manager.get_idle_seconds(), - "cpu_usage": { - "is_busy": self.activity_manager.cpu_usage_monitor.is_busy, - "total": self.activity_manager.cpu_usage_monitor.total_cpu_usage, - }, - "disk_usage": { - "is_busy": self.activity_manager.disk_usage_monitor.is_busy, - "total": { - "bytes_read_per_second": self.activity_manager.disk_usage_monitor.total_bytes_read, - "bytes_write_per_second": self.activity_manager.disk_usage_monitor.total_bytes_write, - }, - }, - "network_usage": { - "is_busy": self.activity_manager.network_monitor.is_busy, - "total": { - "bytes_received_per_second": self.activity_manager.network_monitor.bytes_received, - "bytes_sent_per_second": self.activity_manager.network_monitor.bytes_sent, - }, - }, - "kernel_monitor": { - "is_busy": self.activity_manager.jupyter_kernel_monitor.is_busy - }, - } - ) - ) +def _get_response_debug(activity_manager: ActivityManager) -> dict: + return { + "seconds_inactive": activity_manager.get_idle_seconds(), + "cpu_usage": { + "is_busy": activity_manager.cpu_usage_monitor.is_busy, + "total": activity_manager.cpu_usage_monitor.total_cpu_usage, + }, + "disk_usage": { + "is_busy": activity_manager.disk_usage_monitor.is_busy, + "total": { + "bytes_read_per_second": activity_manager.disk_usage_monitor.total_bytes_read, + "bytes_write_per_second": activity_manager.disk_usage_monitor.total_bytes_write, + }, + }, + "network_usage": { + "is_busy": activity_manager.network_monitor.is_busy, + "total": { + "bytes_received_per_second": activity_manager.network_monitor.bytes_received, + "bytes_sent_per_second": activity_manager.network_monitor.bytes_sent, + }, + }, + "kernel_monitor": {"is_busy": activity_manager.jupyter_kernel_monitor.is_busy}, + } -class MainHandler(tornado.web.RequestHandler): - def initialize(self, activity_manager: ActivityManager): - self.activity_manager: ActivityManager = activity_manager +def _get_response_root(activity_manager: ActivityManager) -> dict: + return {"seconds_inactive": activity_manager.get_idle_seconds()} - async def get(self): - assert self.activity_manager - self.write( - json.dumps({"seconds_inactive": self.activity_manager.get_idle_seconds()}) - ) +class ServerState: + pass -async def make_app() -> tornado.web.Application: - activity_manager = ActivityManager(CHECK_INTERVAL_S) - activity_manager.start() - app = tornado.web.Application( - [ - (r"/", MainHandler, {"activity_manager": activity_manager}), - (r"/debug", DebugHandler, {"activity_manager": activity_manager}), - ] - ) - return app + +class HTTPServerWithState(HTTPServer): + def __init__(self, server_address, RequestHandlerClass, state): + self.state = state # application's state + super().__init__(server_address, RequestHandlerClass) + + +class JSONRequestHandler(BaseHTTPRequestHandler): + def _send_response(self, code: int, data: dict) -> None: + self.send_response(code) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(data).encode("utf-8")) + + def do_GET(self): + state = self.server.state + + if self.path == "/": # The root endpoint + self._send_response(200, _get_response_root(state.activity_manager)) + elif self.path == "/debug": # The debug endpoint + self._send_response(200, _get_response_debug(state.activity_manager)) + else: # Handle case where the endpoint is not found + self._send_response( + 404, _get_response_debug({"error": "Resource not found"}) + ) + + +def make_server(port: int) -> HTTPServerWithState: + state = ServerState() + state.activity_manager = ActivityManager(CHECK_INTERVAL_S) + state.activity_manager.start() + + server_address = ("", port) # Listen on all interfaces, port 8000 + return HTTPServerWithState(server_address, JSONRequestHandler, state) -async def main(): - app = await make_app() - app.listen(19597) - await asyncio.Event().wait() +def main(): + http_server = make_server(LISTEN_PORT) + http_server.serve_forever() if __name__ == "__main__": - asyncio.run(main()) + main() diff --git a/tests/test_activity_monitor.py b/tests/test_activity_monitor.py index 79828a7..61962e2 100644 --- a/tests/test_activity_monitor.py +++ b/tests/test_activity_monitor.py @@ -1,16 +1,13 @@ import asyncio -import pytest -import psutil -import tornado.web import json -import tornado.httpserver -import tornado.ioloop -import threading +import psutil +import pytest import pytest_asyncio import requests import requests_mock +import threading +import time -from queue import Queue from typing import Callable, Final, Iterable, TYPE_CHECKING from pytest_mock import MockFixture from tenacity import AsyncRetrying @@ -127,7 +124,16 @@ async def test_disk_usage_monitor_still_busy( assert disk_usage_monitor.is_busy is True +@pytest.fixture +def mock_no_network_activity(mocker: MockFixture) -> None: + mocker.patch( + "activity_monitor.NetworkUsageMonitor._sample_total_network_usage", + side_effect=lambda: (time.time(), 0, 0), + ) + + async def test_network_usage_monitor_not_busy( + mock_no_network_activity: None, socket_server: None, mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], @@ -201,29 +207,11 @@ async def server_url() -> str: @pytest_asyncio.fixture -async def tornado_server(mock_jupyter_kernel_monitor: None, server_url: str) -> None: - app = await activity_monitor.make_app() - - stop_queue = Queue() - - def _run_server_worker(): - http_server = tornado.httpserver.HTTPServer(app) - http_server.listen(activity_monitor.LISTEN_PORT) - current_io_loop = tornado.ioloop.IOLoop.current() - - def _queue_stopper() -> None: - stop_queue.get() - current_io_loop.stop() - - stopping_thread = threading.Thread(target=_queue_stopper, daemon=True) - stopping_thread.start() - - current_io_loop.start() - stopping_thread.join() +async def http_server(mock_jupyter_kernel_monitor: None, server_url: str) -> None: + server = activity_monitor.make_server(activity_monitor.LISTEN_PORT) - # cleanly shut down tornado server and loop - current_io_loop.close() - http_server.stop() + def _run_server_worker() -> None: + server.serve_forever() thread = threading.Thread(target=_run_server_worker, daemon=True) thread.start() @@ -238,15 +226,15 @@ def _queue_stopper() -> None: yield None - stop_queue.put(None) - thread.join(timeout=1) + server.shutdown() + server.server_close() - with pytest.raises(requests.exceptions.ReadTimeout): + with pytest.raises(requests.exceptions.RequestException): requests.get(f"{server_url}/", timeout=1) @pytest.mark.parametrize("are_kernels_busy", [False]) -async def test_tornado_server_ok(tornado_server: None, server_url: str): +async def test_http_server_ok(http_server: None, server_url: str): result = requests.get(f"{server_url}/", timeout=5) assert result.status_code == 200 @@ -271,7 +259,7 @@ async def test_activity_monitor_becomes_not_busy( socket_server: None, mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]], create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator], - tornado_server: None, + http_server: None, server_url: str, ): activity_generator = create_activity_generator(network=False, cpu=False, disk=False)