Skip to content

Commit

Permalink
merge logs endpoint and new endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
olethanh committed Nov 19, 2024
1 parent a31919d commit 3342851
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 130 deletions.
4 changes: 1 addition & 3 deletions src/aleph/vm/orchestrator/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
operate_confidential_measurement,
operate_erase,
operate_expire,
operate_logs,
operate_logs_json,
operate_reboot,
operate_stop,
Expand Down Expand Up @@ -105,8 +104,7 @@ def setup_webapp():
# /control APIs are used to control the VMs and access their logs
web.post("/control/allocation/notify", notify_allocation),
web.get("/control/machine/{ref}/stream_logs", stream_logs),
web.get("/control/machine/{ref}/logs.json", operate_logs_json),
web.get("/control/machine/{ref}/logs", operate_logs),
web.get("/control/machine/{ref}/logs", operate_logs_json),
web.post("/control/machine/{ref}/expire", operate_expire),
web.post("/control/machine/{ref}/stop", operate_stop),
web.post("/control/machine/{ref}/erase", operate_erase),
Expand Down
39 changes: 11 additions & 28 deletions src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,40 +101,23 @@ async def stream_logs(request: web.Request) -> web.StreamResponse:
execution.vm.unregister_queue(queue)


@cors_allow_all
@require_jwk_authentication
async def operate_logs(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
"""Logs of a VM (not streaming)"""
vm_hash = get_itemhash_or_400(request.match_info)
pool: VmPool = request.app["vm_pool"]
execution = get_execution_or_404(vm_hash, pool=pool)
if not is_sender_authorized(authenticated_sender, execution.message):
return web.Response(status=403, body="Unauthorized sender")

response = web.StreamResponse()
response.headers["Content-Type"] = "text/plain"
await response.prepare(request)

for entry in execution.vm.past_logs():
msg = f'{entry["__REALTIME_TIMESTAMP"].isoformat()}> {entry["MESSAGE"]}'
await response.write(msg.encode())
await response.write_eof()
return response


@cors_allow_all
@require_jwk_authentication
async def operate_logs_json(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
"""Logs of a VM (not streaming)"""
"""Logs of a VM (not streaming) as json"""
vm_hash = get_itemhash_or_400(request.match_info)

# This endpoint allow logs for past executions, so we look into the database if any execution by that hash
# occurred, which we can then use to look for right
record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
if not record:
raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")

message = get_message_executable_content(json.loads(record.message))
# occurred, which we can then use to look for rights. We still check in the pool first, it is faster
pool: VmPool = request.app["vm_pool"]
execution = pool.executions.get(vm_hash)
if execution:
message = execution.message

Check warning on line 115 in src/aleph/vm/orchestrator/views/operator.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/operator.py#L115

Added line #L115 was not covered by tests
else:
record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
if not record:
raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")

Check warning on line 119 in src/aleph/vm/orchestrator/views/operator.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/operator.py#L119

Added line #L119 was not covered by tests
message = get_message_executable_content(json.loads(record.message))
if not is_sender_authorized(authenticated_sender, message):
return web.Response(status=403, body="Unauthorized sender")

Expand Down
101 changes: 2 additions & 99 deletions tests/supervisor/views/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,103 +228,6 @@ class FakeVmPool:
assert pool.systemd_manager.restart.call_count == 1


@pytest.mark.asyncio
async def test_logs(aiohttp_client, mocker):
mock_address = "mock_address"
mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
mocker.patch(
"aleph.vm.orchestrator.views.authentication.authenticate_jwk",
return_value=mock_address,
)

# noinspection PyMissingConstructor
class FakeVmPool(VmPool):
def __init__(self):
pass

executions = {
mock_hash: mocker.Mock(
vm_hash=mock_hash,
message=mocker.Mock(address=mock_address),
is_confidential=False,
is_running=True,
vm=mocker.Mock(
past_logs=mocker.Mock(
return_value=[
EntryDict(
SYSLOG_IDENTIFIER="stdout",
MESSAGE="logline1",
__REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 2),
),
EntryDict(
SYSLOG_IDENTIFIER="stdout",
MESSAGE="logline2",
__REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 3),
),
]
)
),
),
}
systemd_manager = mocker.Mock(restart=mocker.Mock())

app = setup_webapp()
pool = FakeVmPool()
app["vm_pool"] = pool
app["pubsub"] = FakeVmPool()
client = await aiohttp_client(app)
response = await client.get(
f"/control/machine/{mock_hash}/logs",
)
assert response.status == 200
assert await response.text() == "2020-10-12T01:02:00> logline12020-10-12T01:03:00> logline2"


@pytest.mark.asyncio
async def test_websocket_logs(aiohttp_client, mocker):
mock_address = "mock_address"
mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
mocker.patch(
"aleph.vm.orchestrator.views.operator.authenticate_websocket_message",
return_value=mock_address,
)
fake_queue: Queue[tuple[str, str]] = asyncio.Queue()
await fake_queue.put(("stdout", "this is a first log entry"))

fakeVmPool = mocker.Mock(
executions={
mock_hash: mocker.Mock(
vm_hash=mock_hash,
message=mocker.Mock(address=mock_address),
is_confidential=False,
is_running=True,
vm=mocker.Mock(
get_log_queue=mocker.Mock(return_value=fake_queue),
),
),
},
)
app = setup_webapp()
app["vm_pool"] = fakeVmPool
app["pubsub"] = None
client = await aiohttp_client(app)
websocket = await client.ws_connect(
f"/control/machine/{mock_hash}/stream_logs",
)
await websocket.send_json({"auth": "auth is disabled"})
response = await websocket.receive_json()
assert response == {"status": "connected"}

response = await websocket.receive_json()
assert response == {"message": "this is a first log entry", "type": "stdout"}

await fake_queue.put(("stdout", "this is a second log entry"))
response = await websocket.receive_json()
assert response == {"message": "this is a second log entry", "type": "stdout"}
await websocket.close()
assert websocket.closed


@pytest.mark.asyncio
async def test_websocket_logs_missing_auth(aiohttp_client, mocker):
mock_address = "mock_address"
Expand Down Expand Up @@ -538,12 +441,12 @@ async def test_get_past_logs(aiohttp_client, mocker, patch_datetime_now):
)

app = setup_webapp()
pool = mocker.MagicMock()
pool = mocker.MagicMock(executions={})
app["vm_pool"] = pool
app["pubsub"] = mocker.MagicMock()
client = await aiohttp_client(app)
response = await client.get(
f"/control/machine/{mock_hash}/logs.json",
f"/control/machine/{mock_hash}/logs",
)

assert response.status == 200
Expand Down

0 comments on commit 3342851

Please sign in to comment.