diff --git a/tests/supervisor/views/test_operator.py b/tests/supervisor/views/test_operator.py index c8b4a2dc..4776b3c4 100644 --- a/tests/supervisor/views/test_operator.py +++ b/tests/supervisor/views/test_operator.py @@ -1,3 +1,4 @@ +import asyncio import datetime import pytest @@ -144,3 +145,54 @@ def __init__(self): ) assert response.status == 200 assert await response.text() == "2020-10-12T01:02:00> logline12020-10-12T01:03:00> logline2" + + +@pytest.mark.asyncio +async def test_websocket_logs(aiohttp_client, mocker): + mock_address = "mock_address" + mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_" + mocker.patch( + "aleph.vm.orchestrator.views.operator.authenticate_websocket_message", + return_value=mock_address, + ) + fake_queue = asyncio.Queue() + + # noinspection PyMissingConstructor + class FakeVmPool(VmPool): + def __init__(self): + pass + + executions = { + mock_hash: mocker.Mock( + vm_hash=mock_hash, + message=mocker.Mock(address=mock_address), + is_confidential=False, + is_running=True, + vm=mocker.Mock( + get_log_queue=mocker.Mock(return_value=fake_queue), + ), + ), + } + systemd_manager = mocker.Mock(restart=mocker.Mock()) + + app = setup_webapp() + pool = FakeVmPool() + app["vm_pool"] = pool + app["pubsub"] = FakeVmPool() + client = await aiohttp_client(app) + websocket = await client.ws_connect( + f"/control/machine/{mock_hash}/stream_logs", + ) + await websocket.send_json({"auth": "auth is disabled"}) + response = await websocket.receive_json() + assert response == {"status": "connected"} + + await fake_queue.put(("stdout", "this is a first log entry")) + response = await websocket.receive_json() + assert response == {"message": "this is a first log entry", "type": "stdout"} + + await fake_queue.put(("stdout", "this is a second log entry")) + response = await websocket.receive_json() + assert response == {"message": "this is a second log entry", "type": "stdout"} + await websocket.close() + assert websocket.closed