Skip to content

Commit

Permalink
replaced tornado with builtin server
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Neagu committed Mar 25, 2024
1 parent f113c79 commit 04c8de7
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 96 deletions.
131 changes: 69 additions & 62 deletions docker/activity_monitor.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
#!/home/jovyan/.venv/bin/python


import asyncio
import logging
import json
import logging
import psutil
import requests
import tornado
import time

from threading import Thread
from abc import abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import suppress
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
from threading import Thread
from typing import Final
from abc import abstractmethod


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -446,70 +445,78 @@ def stop(self) -> None:
self._thread.join()


class DebugHandler(tornado.web.RequestHandler):
def initialize(self, activity_manager: ActivityManager):
self.activity_manager: ActivityManager = activity_manager

async def get(self):
assert self.activity_manager
self.write(
json.dumps(
{
"seconds_inactive": self.activity_manager.get_idle_seconds(),
"cpu_usage": {
"is_busy": self.activity_manager.cpu_usage_monitor.is_busy,
"total": self.activity_manager.cpu_usage_monitor.total_cpu_usage,
},
"disk_usage": {
"is_busy": self.activity_manager.disk_usage_monitor.is_busy,
"total": {
"bytes_read_per_second": self.activity_manager.disk_usage_monitor.total_bytes_read,
"bytes_write_per_second": self.activity_manager.disk_usage_monitor.total_bytes_write,
},
},
"network_usage": {
"is_busy": self.activity_manager.network_monitor.is_busy,
"total": {
"bytes_received_per_second": self.activity_manager.network_monitor.bytes_received,
"bytes_sent_per_second": self.activity_manager.network_monitor.bytes_sent,
},
},
"kernel_monitor": {
"is_busy": self.activity_manager.jupyter_kernel_monitor.is_busy
},
}
)
)
def _get_response_debug(activity_manager: ActivityManager) -> dict:
return {
"seconds_inactive": activity_manager.get_idle_seconds(),
"cpu_usage": {
"is_busy": activity_manager.cpu_usage_monitor.is_busy,
"total": activity_manager.cpu_usage_monitor.total_cpu_usage,
},
"disk_usage": {
"is_busy": activity_manager.disk_usage_monitor.is_busy,
"total": {
"bytes_read_per_second": activity_manager.disk_usage_monitor.total_bytes_read,
"bytes_write_per_second": activity_manager.disk_usage_monitor.total_bytes_write,
},
},
"network_usage": {
"is_busy": activity_manager.network_monitor.is_busy,
"total": {
"bytes_received_per_second": activity_manager.network_monitor.bytes_received,
"bytes_sent_per_second": activity_manager.network_monitor.bytes_sent,
},
},
"kernel_monitor": {"is_busy": activity_manager.jupyter_kernel_monitor.is_busy},
}


class MainHandler(tornado.web.RequestHandler):
def initialize(self, activity_manager: ActivityManager):
self.activity_manager: ActivityManager = activity_manager
def _get_response_root(activity_manager: ActivityManager) -> dict:
return {"seconds_inactive": activity_manager.get_idle_seconds()}

async def get(self):
assert self.activity_manager
self.write(
json.dumps({"seconds_inactive": self.activity_manager.get_idle_seconds()})
)

class ServerState:
pass

async def make_app() -> tornado.web.Application:
activity_manager = ActivityManager(CHECK_INTERVAL_S)
activity_manager.start()
app = tornado.web.Application(
[
(r"/", MainHandler, {"activity_manager": activity_manager}),
(r"/debug", DebugHandler, {"activity_manager": activity_manager}),
]
)
return app

class HTTPServerWithState(HTTPServer):
def __init__(self, server_address, RequestHandlerClass, state):
self.state = state # application's state
super().__init__(server_address, RequestHandlerClass)


class JSONRequestHandler(BaseHTTPRequestHandler):
def _send_response(self, code: int, data: dict) -> None:
self.send_response(code)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(data).encode("utf-8"))

def do_GET(self):
state = self.server.state

if self.path == "/": # The root endpoint
self._send_response(200, _get_response_root(state.activity_manager))
elif self.path == "/debug": # The debug endpoint
self._send_response(200, _get_response_debug(state.activity_manager))
else: # Handle case where the endpoint is not found
self._send_response(
404, _get_response_debug({"error": "Resource not found"})
)


def make_server(port: int) -> HTTPServerWithState:
state = ServerState()
state.activity_manager = ActivityManager(CHECK_INTERVAL_S)
state.activity_manager.start()

server_address = ("", port) # Listen on all interfaces, port 8000
return HTTPServerWithState(server_address, JSONRequestHandler, state)


async def main():
app = await make_app()
app.listen(19597)
await asyncio.Event().wait()
def main():
http_server = make_server(LISTEN_PORT)
http_server.serve_forever()


if __name__ == "__main__":
asyncio.run(main())
main()
56 changes: 22 additions & 34 deletions tests/test_activity_monitor.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import asyncio
import pytest
import psutil
import tornado.web
import json
import tornado.httpserver
import tornado.ioloop
import threading
import psutil
import pytest
import pytest_asyncio
import requests
import requests_mock
import threading
import time

from queue import Queue
from typing import Callable, Final, Iterable, TYPE_CHECKING
from pytest_mock import MockFixture
from tenacity import AsyncRetrying
Expand Down Expand Up @@ -127,7 +124,16 @@ async def test_disk_usage_monitor_still_busy(
assert disk_usage_monitor.is_busy is True


@pytest.fixture
def mock_no_network_activity(mocker: MockFixture) -> None:
mocker.patch(
"activity_monitor.NetworkUsageMonitor._sample_total_network_usage",
side_effect=lambda: (time.time(), 0, 0),
)


async def test_network_usage_monitor_not_busy(
mock_no_network_activity: None,
socket_server: None,
mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]],
create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator],
Expand Down Expand Up @@ -201,29 +207,11 @@ async def server_url() -> str:


@pytest_asyncio.fixture
async def tornado_server(mock_jupyter_kernel_monitor: None, server_url: str) -> None:
app = await activity_monitor.make_app()

stop_queue = Queue()

def _run_server_worker():
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(activity_monitor.LISTEN_PORT)
current_io_loop = tornado.ioloop.IOLoop.current()

def _queue_stopper() -> None:
stop_queue.get()
current_io_loop.stop()

stopping_thread = threading.Thread(target=_queue_stopper, daemon=True)
stopping_thread.start()

current_io_loop.start()
stopping_thread.join()
async def http_server(mock_jupyter_kernel_monitor: None, server_url: str) -> None:
server = activity_monitor.make_server(activity_monitor.LISTEN_PORT)

# cleanly shut down tornado server and loop
current_io_loop.close()
http_server.stop()
def _run_server_worker() -> None:
server.serve_forever()

thread = threading.Thread(target=_run_server_worker, daemon=True)
thread.start()
Expand All @@ -238,15 +226,15 @@ def _queue_stopper() -> None:

yield None

stop_queue.put(None)
thread.join(timeout=1)
server.shutdown()
server.server_close()

with pytest.raises(requests.exceptions.ReadTimeout):
with pytest.raises(requests.exceptions.RequestException):
requests.get(f"{server_url}/", timeout=1)


@pytest.mark.parametrize("are_kernels_busy", [False])
async def test_tornado_server_ok(tornado_server: None, server_url: str):
async def test_http_server_ok(http_server: None, server_url: str):
result = requests.get(f"{server_url}/", timeout=5)
assert result.status_code == 200

Expand All @@ -271,7 +259,7 @@ async def test_activity_monitor_becomes_not_busy(
socket_server: None,
mock__get_sibling_processes: Callable[[list[int]], list[psutil.Process]],
create_activity_generator: Callable[[bool, bool, bool], _ActivityGenerator],
tornado_server: None,
http_server: None,
server_url: str,
):
activity_generator = create_activity_generator(network=False, cpu=False, disk=False)
Expand Down

0 comments on commit 04c8de7

Please sign in to comment.