From 33428514b52d6a872fc03742a9c3bcaf36675894 Mon Sep 17 00:00:00 2001
From: Olivier Le Thanh Duong
Date: Mon, 18 Nov 2024 16:19:00 +0100
Subject: [PATCH] merge logs endpoint and new endpoint

---
 src/aleph/vm/orchestrator/supervisor.py     |   4 +-
 src/aleph/vm/orchestrator/views/operator.py |  39 +++-----
 tests/supervisor/views/test_operator.py     | 101 +-------------------
 3 files changed, 14 insertions(+), 130 deletions(-)

diff --git a/src/aleph/vm/orchestrator/supervisor.py b/src/aleph/vm/orchestrator/supervisor.py
index 777674c2..a5ca999a 100644
--- a/src/aleph/vm/orchestrator/supervisor.py
+++ b/src/aleph/vm/orchestrator/supervisor.py
@@ -51,7 +51,6 @@
     operate_confidential_measurement,
     operate_erase,
     operate_expire,
-    operate_logs,
     operate_logs_json,
     operate_reboot,
     operate_stop,
@@ -105,8 +104,7 @@ def setup_webapp():
             # /control APIs are used to control the VMs and access their logs
             web.post("/control/allocation/notify", notify_allocation),
             web.get("/control/machine/{ref}/stream_logs", stream_logs),
-            web.get("/control/machine/{ref}/logs.json", operate_logs_json),
-            web.get("/control/machine/{ref}/logs", operate_logs),
+            web.get("/control/machine/{ref}/logs", operate_logs_json),
             web.post("/control/machine/{ref}/expire", operate_expire),
             web.post("/control/machine/{ref}/stop", operate_stop),
             web.post("/control/machine/{ref}/erase", operate_erase),
diff --git a/src/aleph/vm/orchestrator/views/operator.py b/src/aleph/vm/orchestrator/views/operator.py
index 4cc9b3db..0422cca4 100644
--- a/src/aleph/vm/orchestrator/views/operator.py
+++ b/src/aleph/vm/orchestrator/views/operator.py
@@ -101,40 +101,23 @@ async def stream_logs(request: web.Request) -> web.StreamResponse:
         execution.vm.unregister_queue(queue)
 
 
-@cors_allow_all
-@require_jwk_authentication
-async def operate_logs(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
-    """Logs of a VM (not streaming)"""
-    vm_hash = get_itemhash_or_400(request.match_info)
-    pool: VmPool = request.app["vm_pool"]
-    execution = get_execution_or_404(vm_hash, pool=pool)
-    if not is_sender_authorized(authenticated_sender, execution.message):
-        return web.Response(status=403, body="Unauthorized sender")
-
-    response = web.StreamResponse()
-    response.headers["Content-Type"] = "text/plain"
-    await response.prepare(request)
-
-    for entry in execution.vm.past_logs():
-        msg = f'{entry["__REALTIME_TIMESTAMP"].isoformat()}> {entry["MESSAGE"]}'
-        await response.write(msg.encode())
-    await response.write_eof()
-    return response
-
-
 @cors_allow_all
 @require_jwk_authentication
 async def operate_logs_json(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
-    """Logs of a VM (not streaming)"""
+    """Logs of a VM (not streaming) as JSON"""
     vm_hash = get_itemhash_or_400(request.match_info)
     # This endpoint allow logs for past executions, so we look into the database if any execution by that hash
-    # occurred, which we can then use to look for right
-    record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
-    if not record:
-        raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")
-
-    message = get_message_executable_content(json.loads(record.message))
+    # occurred, which we can then use to look up the rights. We still check the pool first since it is faster.
+    pool: VmPool = request.app["vm_pool"]
+    execution = pool.executions.get(vm_hash)
+    if execution:
+        message = execution.message
+    else:
+        record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
+        if not record:
+            raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")
+        message = get_message_executable_content(json.loads(record.message))
     if not is_sender_authorized(authenticated_sender, message):
         return web.Response(status=403, body="Unauthorized sender")
diff --git a/tests/supervisor/views/test_operator.py b/tests/supervisor/views/test_operator.py
index c43c4b75..b08d814b 100644
--- a/tests/supervisor/views/test_operator.py
+++ b/tests/supervisor/views/test_operator.py
@@ -228,103 +228,6 @@ class FakeVmPool:
     assert pool.systemd_manager.restart.call_count == 1
 
 
-@pytest.mark.asyncio
-async def test_logs(aiohttp_client, mocker):
-    mock_address = "mock_address"
-    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
-    mocker.patch(
-        "aleph.vm.orchestrator.views.authentication.authenticate_jwk",
-        return_value=mock_address,
-    )
-
-    # noinspection PyMissingConstructor
-    class FakeVmPool(VmPool):
-        def __init__(self):
-            pass
-
-        executions = {
-            mock_hash: mocker.Mock(
-                vm_hash=mock_hash,
-                message=mocker.Mock(address=mock_address),
-                is_confidential=False,
-                is_running=True,
-                vm=mocker.Mock(
-                    past_logs=mocker.Mock(
-                        return_value=[
-                            EntryDict(
-                                SYSLOG_IDENTIFIER="stdout",
-                                MESSAGE="logline1",
-                                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 2),
-                            ),
-                            EntryDict(
-                                SYSLOG_IDENTIFIER="stdout",
-                                MESSAGE="logline2",
-                                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 3),
-                            ),
-                        ]
-                    )
-                ),
-            ),
-        }
-        systemd_manager = mocker.Mock(restart=mocker.Mock())
-
-    app = setup_webapp()
-    pool = FakeVmPool()
-    app["vm_pool"] = pool
-    app["pubsub"] = FakeVmPool()
-    client = await aiohttp_client(app)
-    response = await client.get(
-        f"/control/machine/{mock_hash}/logs",
-    )
-    assert response.status == 200
-    assert await response.text() == "2020-10-12T01:02:00> logline12020-10-12T01:03:00> logline2"
-
-
-@pytest.mark.asyncio
-async def test_websocket_logs(aiohttp_client, mocker):
-    mock_address = "mock_address"
-    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
-    mocker.patch(
-        "aleph.vm.orchestrator.views.operator.authenticate_websocket_message",
-        return_value=mock_address,
-    )
-    fake_queue: Queue[tuple[str, str]] = asyncio.Queue()
-    await fake_queue.put(("stdout", "this is a first log entry"))
-
-    fakeVmPool = mocker.Mock(
-        executions={
-            mock_hash: mocker.Mock(
-                vm_hash=mock_hash,
-                message=mocker.Mock(address=mock_address),
-                is_confidential=False,
-                is_running=True,
-                vm=mocker.Mock(
-                    get_log_queue=mocker.Mock(return_value=fake_queue),
-                ),
-            ),
-        },
-    )
-    app = setup_webapp()
-    app["vm_pool"] = fakeVmPool
-    app["pubsub"] = None
-    client = await aiohttp_client(app)
-    websocket = await client.ws_connect(
-        f"/control/machine/{mock_hash}/stream_logs",
-    )
-    await websocket.send_json({"auth": "auth is disabled"})
-    response = await websocket.receive_json()
-    assert response == {"status": "connected"}
-
-    response = await websocket.receive_json()
-    assert response == {"message": "this is a first log entry", "type": "stdout"}
-
-    await fake_queue.put(("stdout", "this is a second log entry"))
-    response = await websocket.receive_json()
-    assert response == {"message": "this is a second log entry", "type": "stdout"}
-    await websocket.close()
-    assert websocket.closed
-
-
 @pytest.mark.asyncio
 async def test_websocket_logs_missing_auth(aiohttp_client, mocker):
     mock_address = "mock_address"
@@ -538,12 +441,12 @@ async def test_get_past_logs(aiohttp_client, mocker, patch_datetime_now):
     )
 
     app = setup_webapp()
-    pool = mocker.MagicMock()
+    pool = mocker.MagicMock(executions={})
     app["vm_pool"] = pool
     app["pubsub"] = mocker.MagicMock()
     client = await aiohttp_client(app)
     response = await client.get(
-        f"/control/machine/{mock_hash}/logs.json",
+        f"/control/machine/{mock_hash}/logs",
    )
     assert response.status == 200
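
Note on usage: after this patch, GET /control/machine/{ref}/logs is served by operate_logs_json, so callers of the former plain-text /logs route and of the removed /logs.json route now read the same JSON-formatted output, including for past executions that are no longer in the pool. The sketch below shows a minimal client under assumptions not taken from this patch: the supervisor address and the pre-built auth_headers (the signed operator headers that require_jwk_authentication expects) are placeholders for illustration.

import aiohttp

SUPERVISOR_URL = "http://localhost:4020"  # assumed orchestrator address, adjust as needed


async def fetch_vm_logs(vm_hash: str, auth_headers: dict[str, str]) -> str:
    """Fetch the past logs of a VM from the merged /logs route (served by operate_logs_json)."""
    # auth_headers must already carry the signed operator headers that
    # require_jwk_authentication validates; building them is out of scope here.
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"{SUPERVISOR_URL}/control/machine/{vm_hash}/logs", headers=auth_headers
        ) as resp:
            # raise_for_status() surfaces the handler's 403 (unauthorized sender)
            # and 404 (no execution found for this VM) responses as exceptions.
            resp.raise_for_status()
            return await resp.text()


# Usage sketch (hash and headers are placeholders):
#     import asyncio
#     asyncio.run(fetch_vm_logs("<vm-item-hash>", auth_headers={...}))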