Skip to content

Commit

Permalink
Add json logs endpoint which works for finished execution (#718)
Browse files Browse the repository at this point in the history
* Add json logs endpoint which allow for past record

* review comment

* merge logs endpoint and new endpoint

* set logging system
  • Loading branch information
olethanh authored Nov 26, 2024
1 parent f1bad18 commit 6836d6d
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 113 deletions.
6 changes: 4 additions & 2 deletions src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ class Settings(BaseSettings):
CONNECTIVITY_DNS_HOSTNAME = "example.org"

USE_JAILER = True
# System logs make boot ~2x slower
PRINT_SYSTEM_LOGS = False
# Changelog: PRINT_SYSTEM_LOGS used to print the MicroVM logs together with the supervisor output.
# They are now in separate journald entries; disabling this setting disables the log output of Firecracker VMs (only)
# via the serial console. This breaks the logs endpoint for programs, so disabling it in prod is not recommended.
PRINT_SYSTEM_LOGS = True
IGNORE_TRACEBACK_FROM_DIAGNOSTICS = True
LOG_LEVEL = "WARNING"
DEBUG_ASYNCIO = False
Expand Down
9 changes: 9 additions & 0 deletions src/aleph/vm/orchestrator/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,12 @@ async def get_execution_records() -> Iterable[ExecutionRecord]:
executions = result.scalars().all()
await session.commit()
return executions


async def get_last_record_for_vm(vm_hash) -> ExecutionRecord | None:
    """Fetch one stored execution record whose ``vm_hash`` matches.

    Returns the record, or ``None`` when no record matches the hash.

    NOTE(review): the query applies ``LIMIT 1`` without any ``ORDER BY``, so when
    several records share the same hash, which one comes back is left to the
    database. Confirm this is acceptable given the "last" in the function name.
    """
    # Build the statement first; it does not depend on the session.
    query = select(ExecutionRecord).where(ExecutionRecord.vm_hash == vm_hash).limit(1)
    async with AsyncSessionMaker() as session:
        rows = await session.execute(query)
        return rows.scalar()
4 changes: 2 additions & 2 deletions src/aleph/vm/orchestrator/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
operate_confidential_measurement,
operate_erase,
operate_expire,
operate_logs,
operate_logs_json,
operate_reboot,
operate_stop,
stream_logs,
Expand Down Expand Up @@ -104,7 +104,7 @@ def setup_webapp():
# /control APIs are used to control the VMs and access their logs
web.post("/control/allocation/notify", notify_allocation),
web.get("/control/machine/{ref}/stream_logs", stream_logs),
web.get("/control/machine/{ref}/logs", operate_logs),
web.get("/control/machine/{ref}/logs", operate_logs_json),
web.post("/control/machine/{ref}/expire", operate_expire),
web.post("/control/machine/{ref}/stop", operate_stop),
web.post("/control/machine/{ref}/erase", operate_erase),
Expand Down
3 changes: 1 addition & 2 deletions src/aleph/vm/orchestrator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from aiohttp import web
from aleph_message.models import (
AlephMessage,
ItemHash,
PaymentType,
ProgramMessage,
parse_message,
Expand All @@ -23,7 +22,7 @@
from aleph.vm.pool import VmPool
from aleph.vm.utils import create_task_log_exceptions

from .messages import get_message_status, load_updated_message
from .messages import get_message_status
from .payment import (
compute_required_balance,
compute_required_flow,
Expand Down
50 changes: 41 additions & 9 deletions src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
from aleph.vm.conf import settings
from aleph.vm.controllers.qemu.client import QemuVmClient
from aleph.vm.models import VmExecution
from aleph.vm.orchestrator import metrics
from aleph.vm.orchestrator.custom_logs import set_vm_for_logging
from aleph.vm.orchestrator.run import create_vm_execution_or_raise_http_error
from aleph.vm.orchestrator.views.authentication import (
authenticate_websocket_message,
require_jwk_authentication,
)
from aleph.vm.pool import VmPool
from aleph.vm.utils import cors_allow_all, dumps_for_json
from aleph.vm.utils import (
cors_allow_all,
dumps_for_json,
get_message_executable_content,
)
from aleph.vm.utils.logs import get_past_vm_logs

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -99,22 +105,48 @@ async def stream_logs(request: web.Request) -> web.StreamResponse:

@cors_allow_all
@require_jwk_authentication
async def operate_logs_json(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
    """Return the recorded logs of a VM as a JSON array (not a live stream).

    The VM is identified by the ``ref`` item hash in the URL. Authorization is
    checked against the message of the execution found in the pool, or, when the
    VM is not in the pool, against the message stored in the execution record
    returned by ``metrics.get_last_record_for_vm``.

    Each element of the returned array has the keys ``SYSLOG_IDENTIFIER``,
    ``MESSAGE``, ``file`` (``"stdout"`` or ``"stderr"``) and
    ``__REALTIME_TIMESTAMP``.

    Raises:
        aiohttp.web_exceptions.HTTPNotFound: no execution is in the pool and no
            record exists in the database for this hash.

    Returns a 403 response when the authenticated sender is not authorized.
    """
    vm_hash = get_itemhash_or_400(request.match_info)
    with set_vm_for_logging(vm_hash=vm_hash):
        # This endpoint allows logs for past executions, so we look into the database for any execution with that
        # hash, which we can then use to check rights. We still check the pool first, since it is faster.
        pool: VmPool = request.app["vm_pool"]
        execution = pool.executions.get(vm_hash)
        if execution:
            message = execution.message
        else:
            record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
            if not record:
                raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")
            # The record stores the message as a JSON string.
            message = get_message_executable_content(json.loads(record.message))
        if not is_sender_authorized(authenticated_sender, message):
            return web.Response(status=403, body="Unauthorized sender")

        # Journal identifiers under which the VM's stdout and stderr are looked up.
        _journal_stdout_name = f"vm-{vm_hash}-stdout"
        _journal_stderr_name = f"vm-{vm_hash}-stderr"

        response = web.StreamResponse()
        response.headers["Transfer-encoding"] = "chunked"
        response.headers["Content-Type"] = "application/json"
        await response.prepare(request)

        # The JSON array is written incrementally: opening bracket, comma-separated
        # entries, closing bracket, so the full log does not need to be held in memory.
        await response.write(b"[")

        first = True
        for entry in get_past_vm_logs(_journal_stdout_name, _journal_stderr_name):
            if not first:
                await response.write(b",\n")
            first = False
            # Anything not tagged with the stdout identifier is reported as stderr.
            log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == _journal_stdout_name else "stderr"
            msg = {
                "SYSLOG_IDENTIFIER": entry["SYSLOG_IDENTIFIER"],
                "MESSAGE": entry["MESSAGE"],
                "file": log_type,
                "__REALTIME_TIMESTAMP": entry["__REALTIME_TIMESTAMP"],
            }
            await response.write(dumps_for_json(msg).encode())
        await response.write(b"]")

        await response.write_eof()
        return response

Expand Down
207 changes: 109 additions & 98 deletions tests/supervisor/views/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import aiohttp
import pytest
from aiohttp.test_utils import TestClient
from aleph_message.models import ItemHash
from aleph_message.models import ItemHash, ProgramMessage

from aleph.vm.conf import settings
from aleph.vm.orchestrator.metrics import ExecutionRecord
from aleph.vm.orchestrator.supervisor import setup_webapp
from aleph.vm.pool import VmPool
from aleph.vm.storage import get_message
Expand Down Expand Up @@ -303,103 +304,6 @@ class FakeVmPool:
assert pool.systemd_manager.restart.call_count == 1


@pytest.mark.asyncio
async def test_logs(aiohttp_client, mocker):
    """GET ``/logs`` is expected to return each past entry as ``<iso timestamp>> <message>``, concatenated."""
    owner_address = "mock_address"
    vm_ref = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
    # Bypass JWK authentication: every request is attributed to the VM owner.
    mocker.patch("aleph.vm.orchestrator.views.authentication.authenticate_jwk", return_value=owner_address)

    # noinspection PyMissingConstructor
    class FakeVmPool(VmPool):
        def __init__(self):
            pass

        # A single fake execution whose VM reports two past journal entries.
        executions = {
            vm_ref: mocker.Mock(
                vm_hash=vm_ref,
                message=mocker.Mock(address=owner_address),
                is_confidential=False,
                is_running=True,
                vm=mocker.Mock(
                    past_logs=mocker.Mock(
                        return_value=[
                            EntryDict(
                                SYSLOG_IDENTIFIER="stdout",
                                MESSAGE="logline1",
                                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 2),
                            ),
                            EntryDict(
                                SYSLOG_IDENTIFIER="stdout",
                                MESSAGE="logline2",
                                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 3),
                            ),
                        ]
                    )
                ),
            ),
        }
        systemd_manager = mocker.Mock(restart=mocker.Mock())

    webapp = setup_webapp()
    webapp["vm_pool"] = FakeVmPool()
    webapp["pubsub"] = FakeVmPool()
    http_client = await aiohttp_client(webapp)

    reply = await http_client.get(f"/control/machine/{vm_ref}/logs")

    assert reply.status == 200
    assert await reply.text() == "2020-10-12T01:02:00> logline12020-10-12T01:03:00> logline2"


@pytest.mark.asyncio
async def test_websocket_logs(aiohttp_client, mocker):
    """The ``stream_logs`` websocket confirms the connection, then relays queued log entries as JSON."""
    owner_address = "mock_address"
    vm_ref = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
    # Bypass websocket authentication: the first message is accepted for the VM owner.
    mocker.patch(
        "aleph.vm.orchestrator.views.operator.authenticate_websocket_message",
        return_value=owner_address,
    )

    # Queue handed out by the fake VM; one entry is already waiting before the client connects.
    log_queue: Queue[tuple[str, str]] = asyncio.Queue()
    await log_queue.put(("stdout", "this is a first log entry"))

    fake_execution = mocker.Mock(
        vm_hash=vm_ref,
        message=mocker.Mock(address=owner_address),
        is_confidential=False,
        is_running=True,
        vm=mocker.Mock(
            get_log_queue=mocker.Mock(return_value=log_queue),
        ),
    )

    webapp = setup_webapp()
    webapp["vm_pool"] = mocker.Mock(executions={vm_ref: fake_execution})
    webapp["pubsub"] = None
    http_client = await aiohttp_client(webapp)

    ws = await http_client.ws_connect(f"/control/machine/{vm_ref}/stream_logs")
    await ws.send_json({"auth": "auth is disabled"})

    handshake = await ws.receive_json()
    assert handshake == {"status": "connected"}

    first_entry = await ws.receive_json()
    assert first_entry == {"message": "this is a first log entry", "type": "stdout"}

    # An entry queued after the connection is established must be relayed as well.
    await log_queue.put(("stdout", "this is a second log entry"))
    second_entry = await ws.receive_json()
    assert second_entry == {"message": "this is a second log entry", "type": "stdout"}

    await ws.close()
    assert ws.closed


@pytest.mark.asyncio
async def test_websocket_logs_missing_auth(aiohttp_client, mocker):
mock_address = "mock_address"
Expand Down Expand Up @@ -529,3 +433,110 @@ async def test_websocket_logs_good_auth(aiohttp_client, mocker, patch_datetime_n

await websocket.close()
assert websocket.closed


@pytest.mark.asyncio
async def test_get_past_logs(aiohttp_client, mocker, patch_datetime_now):
    """GET ``/logs`` serves JSON logs for a VM that is absent from the pool but has a stored execution record."""
    mock_address = "0x40684b43B88356F62DCc56017547B6A7AC68780B"
    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"

    # Executable content stored on the record; its "address" matches the authenticated sender.
    stored_message = """{
"address": "0x40684b43B88356F62DCc56017547B6A7AC68780B",
"time": 1720816744.639107,
"allow_amend": false,
"metadata": null,
"authorized_keys": null,
"variables": null,
"environment": {
"reproducible": false,
"internet": true,
"aleph_api": true,
"shared_cache": false
},
"resources": {
"vcpus": 1,
"memory": 1024,
"seconds": 300,
"published_ports": null
},
"payment": null,
"requirements": null,
"volumes": [
{
"comment": null,
"mount": "/opt/packages",
"ref": "7338478721e2e966da6395dbfa37dab7b017b48da55b1be22d4eccf3487b836c",
"use_latest": true
}
],
"replaces": null,
"type": "vm-function",
"code": {
"encoding": "squashfs",
"entrypoint": "main:app",
"ref": "c4253bf514d2e0a271456c9023c4b3f13f324e53c176e9ec29b98b5972b02bc7",
"interface": null,
"args": null,
"use_latest": true
},
"runtime": {
"ref": "63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696",
"use_latest": true,
"comment": ""
},
"data": null,
"export": null,
"on": {
"http": true,
"message": null,
"persistent": false
}
}"""

    # One stdout and one stderr journal entry, tagged with the identifiers the view derives from the hash.
    journal_entries = [
        EntryDict(
            SYSLOG_IDENTIFIER=f"vm-{mock_hash}-stdout",
            MESSAGE="logline1",
            __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 2),
        ),
        EntryDict(
            SYSLOG_IDENTIFIER=f"vm-{mock_hash}-stderr",
            MESSAGE="logline2",
            __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 3),
        ),
    ]

    # Bypass JWK authentication, the database lookup and the journal read.
    mocker.patch(
        "aleph.vm.orchestrator.views.authentication.authenticate_jwk",
        return_value=mock_address,
    )
    mocker.patch(
        "aleph.vm.orchestrator.metrics.get_last_record_for_vm",
        return_value=ExecutionRecord(message=stored_message),
    )
    mocker.patch(
        "aleph.vm.orchestrator.views.operator.get_past_vm_logs",
        return_value=journal_entries,
    )

    webapp = setup_webapp()
    # Empty pool: the view has to fall back to the stored execution record.
    webapp["vm_pool"] = mocker.MagicMock(executions={})
    webapp["pubsub"] = mocker.MagicMock()
    http_client = await aiohttp_client(webapp)

    reply = await http_client.get(f"/control/machine/{mock_hash}/logs")

    expected_entries = [
        {
            "MESSAGE": "logline1",
            "SYSLOG_IDENTIFIER": "vm-fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_-stdout",
            "__REALTIME_TIMESTAMP": "2020-10-12 01:02:00",
            "file": "stdout",
        },
        {
            "MESSAGE": "logline2",
            "SYSLOG_IDENTIFIER": "vm-fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_-stderr",
            "__REALTIME_TIMESTAMP": "2020-10-12 01:03:00",
            "file": "stderr",
        },
    ]
    assert reply.status == 200
    assert await reply.json() == expected_entries

0 comments on commit 6836d6d

Please sign in to comment.