Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add json logs endpoint which works for finished execution #718

Merged
merged 4 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ class Settings(BaseSettings):
CONNECTIVITY_DNS_HOSTNAME = "example.org"

USE_JAILER = True
# System logs make boot ~2x slower
PRINT_SYSTEM_LOGS = False
# Changelog: PRINT_SYSTEM_LOGS used to print the MicroVM logs with the supervisor output.
# They are now in separate journald entries; disabling this setting disables the log output of the Firecracker VM (only)
# via the serial console. This breaks the logs endpoint for programs, so disabling it in production is not recommended.
PRINT_SYSTEM_LOGS = True
nesitor marked this conversation as resolved.
Show resolved Hide resolved
IGNORE_TRACEBACK_FROM_DIAGNOSTICS = True
LOG_LEVEL = "WARNING"
DEBUG_ASYNCIO = False
Expand Down
9 changes: 9 additions & 0 deletions src/aleph/vm/orchestrator/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,12 @@
executions = result.scalars().all()
await session.commit()
return executions


async def get_last_record_for_vm(vm_hash) -> ExecutionRecord | None:
    """Return the stored execution record for the given VM hash, if any.

    Used to authorize access to logs of executions that are no longer in the
    pool (i.e. finished VMs).

    Args:
        vm_hash: Item hash identifying the VM whose execution is looked up.

    Returns:
        The matching ExecutionRecord, or None if no execution of this VM was
        ever recorded on this node.
    """
    async with AsyncSessionMaker() as session:  # Use AsyncSession in a context manager
        # NOTE(review): the query has no ORDER BY, so if several records exist
        # for the same vm_hash the returned row is not guaranteed to be the
        # most recent one despite the function name — confirm whether an
        # explicit ordering (e.g. on the record timestamp) is needed.
        result = await session.execute(
            select(ExecutionRecord).where(ExecutionRecord.vm_hash == vm_hash).limit(1)
        )  # Use execute for querying
        return result.scalar()
4 changes: 2 additions & 2 deletions src/aleph/vm/orchestrator/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
operate_confidential_measurement,
operate_erase,
operate_expire,
operate_logs,
operate_logs_json,
operate_reboot,
operate_stop,
stream_logs,
Expand Down Expand Up @@ -104,7 +104,7 @@ def setup_webapp():
# /control APIs are used to control the VMs and access their logs
web.post("/control/allocation/notify", notify_allocation),
web.get("/control/machine/{ref}/stream_logs", stream_logs),
web.get("/control/machine/{ref}/logs", operate_logs),
web.get("/control/machine/{ref}/logs", operate_logs_json),
web.post("/control/machine/{ref}/expire", operate_expire),
web.post("/control/machine/{ref}/stop", operate_stop),
web.post("/control/machine/{ref}/erase", operate_erase),
Expand Down
3 changes: 1 addition & 2 deletions src/aleph/vm/orchestrator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from aiohttp import web
from aleph_message.models import (
AlephMessage,
ItemHash,
PaymentType,
ProgramMessage,
parse_message,
Expand All @@ -23,7 +22,7 @@
from aleph.vm.pool import VmPool
from aleph.vm.utils import create_task_log_exceptions

from .messages import get_message_status, load_updated_message
from .messages import get_message_status
from .payment import (
compute_required_balance,
compute_required_flow,
Expand Down
50 changes: 41 additions & 9 deletions src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
from aleph.vm.conf import settings
from aleph.vm.controllers.qemu.client import QemuVmClient
from aleph.vm.models import VmExecution
from aleph.vm.orchestrator import metrics
from aleph.vm.orchestrator.custom_logs import set_vm_for_logging
from aleph.vm.orchestrator.run import create_vm_execution_or_raise_http_error
from aleph.vm.orchestrator.views.authentication import (
authenticate_websocket_message,
require_jwk_authentication,
)
from aleph.vm.pool import VmPool
from aleph.vm.utils import cors_allow_all, dumps_for_json
from aleph.vm.utils import (
cors_allow_all,
dumps_for_json,
get_message_executable_content,
)
from aleph.vm.utils.logs import get_past_vm_logs

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -99,22 +105,48 @@

@cors_allow_all
@require_jwk_authentication
async def operate_logs_json(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
    """Return the past logs of a VM as a streamed JSON array (not live-streaming).

    Works for both running and finished executions: if the VM is no longer in
    the pool, the execution record is fetched from the database so that the
    sender's authorization can still be verified.

    Args:
        request: aiohttp request; the VM item hash is taken from the URL path.
        authenticated_sender: address extracted by the JWK authentication decorator.

    Returns:
        A chunked `application/json` response containing a list of log entries.

    Raises:
        HTTPNotFound: if no execution for this VM hash was ever recorded.
    """
    vm_hash = get_itemhash_or_400(request.match_info)
    with set_vm_for_logging(vm_hash=vm_hash):
        # This endpoint allows logs for past executions, so we look into the
        # database for any execution by that hash, which we can then use to
        # check access rights. We still check the pool first, as it is faster.
        pool: VmPool = request.app["vm_pool"]
        execution = pool.executions.get(vm_hash)
        if execution:
            message = execution.message
        else:
            record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
            if not record:
                raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")
            message = get_message_executable_content(json.loads(record.message))
        if not is_sender_authorized(authenticated_sender, message):
            return web.Response(status=403, body="Unauthorized sender")

        # Journald unit names under which the VM's stdout/stderr were logged.
        _journal_stdout_name = f"vm-{vm_hash}-stdout"
        _journal_stderr_name = f"vm-{vm_hash}-stderr"

        response = web.StreamResponse()
        response.headers["Transfer-encoding"] = "chunked"
        response.headers["Content-Type"] = "application/json"
        await response.prepare(request)
        await response.write(b"[")

        # Stream entries one by one to keep memory usage flat; the JSON array
        # separators are written by hand since we never hold the full list.
        first = True
        for entry in get_past_vm_logs(_journal_stdout_name, _journal_stderr_name):
            if not first:
                await response.write(b",\n")
            first = False
            log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == _journal_stdout_name else "stderr"
            msg = {
                "SYSLOG_IDENTIFIER": entry["SYSLOG_IDENTIFIER"],
                "MESSAGE": entry["MESSAGE"],
                "file": log_type,
                "__REALTIME_TIMESTAMP": entry["__REALTIME_TIMESTAMP"],
            }
            await response.write(dumps_for_json(msg).encode())
        await response.write(b"]")
        await response.write_eof()
        return response

Expand Down
207 changes: 109 additions & 98 deletions tests/supervisor/views/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import aiohttp
import pytest
from aiohttp.test_utils import TestClient
from aleph_message.models import ItemHash
from aleph_message.models import ItemHash, ProgramMessage

from aleph.vm.conf import settings
from aleph.vm.orchestrator.metrics import ExecutionRecord
from aleph.vm.orchestrator.supervisor import setup_webapp
from aleph.vm.pool import VmPool
from aleph.vm.storage import get_message
Expand Down Expand Up @@ -303,103 +304,6 @@ class FakeVmPool:
assert pool.systemd_manager.restart.call_count == 1


@pytest.mark.asyncio
async def test_logs(aiohttp_client, mocker):
    """The logs endpoint returns the past logs of a running VM as plain text."""
    mock_address = "mock_address"
    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
    # Bypass JWK auth so the request is attributed to mock_address.
    mocker.patch(
        "aleph.vm.orchestrator.views.authentication.authenticate_jwk",
        return_value=mock_address,
    )

    # Fake pool holding a single running execution whose vm.past_logs() yields
    # two canned journal entries.
    # noinspection PyMissingConstructor
    class FakeVmPool(VmPool):
        def __init__(self):
            pass

        executions = {
            mock_hash: mocker.Mock(
                vm_hash=mock_hash,
                # message.address must match the authenticated sender for authorization.
                message=mocker.Mock(address=mock_address),
                is_confidential=False,
                is_running=True,
                vm=mocker.Mock(
                    past_logs=mocker.Mock(
                        return_value=[
                            EntryDict(
                                SYSLOG_IDENTIFIER="stdout",
                                MESSAGE="logline1",
                                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 2),
                            ),
                            EntryDict(
                                SYSLOG_IDENTIFIER="stdout",
                                MESSAGE="logline2",
                                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 3),
                            ),
                        ]
                    )
                ),
            ),
        }
        systemd_manager = mocker.Mock(restart=mocker.Mock())

    app = setup_webapp()
    pool = FakeVmPool()
    app["vm_pool"] = pool
    app["pubsub"] = FakeVmPool()
    client = await aiohttp_client(app)
    response = await client.get(
        f"/control/machine/{mock_hash}/logs",
    )
    assert response.status == 200
    # Entries are rendered as "<isoformat timestamp>> <message>" with no separator.
    assert await response.text() == "2020-10-12T01:02:00> logline12020-10-12T01:03:00> logline2"


@pytest.mark.asyncio
async def test_websocket_logs(aiohttp_client, mocker):
    """Live log entries pushed to the VM's queue are forwarded over the websocket."""
    mock_address = "mock_address"
    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
    # Bypass websocket auth so the connection is attributed to mock_address.
    mocker.patch(
        "aleph.vm.orchestrator.views.operator.authenticate_websocket_message",
        return_value=mock_address,
    )
    # Queue feeding the stream; pre-load one entry before connecting.
    fake_queue: Queue[tuple[str, str]] = asyncio.Queue()
    await fake_queue.put(("stdout", "this is a first log entry"))

    fakeVmPool = mocker.Mock(
        executions={
            mock_hash: mocker.Mock(
                vm_hash=mock_hash,
                message=mocker.Mock(address=mock_address),
                is_confidential=False,
                is_running=True,
                vm=mocker.Mock(
                    get_log_queue=mocker.Mock(return_value=fake_queue),
                ),
            ),
        },
    )
    app = setup_webapp()
    app["vm_pool"] = fakeVmPool
    app["pubsub"] = None
    client = await aiohttp_client(app)
    websocket = await client.ws_connect(
        f"/control/machine/{mock_hash}/stream_logs",
    )
    await websocket.send_json({"auth": "auth is disabled"})
    response = await websocket.receive_json()
    assert response == {"status": "connected"}

    # The entry queued before connecting is delivered first...
    response = await websocket.receive_json()
    assert response == {"message": "this is a first log entry", "type": "stdout"}

    # ...and entries queued while connected are streamed as they arrive.
    await fake_queue.put(("stdout", "this is a second log entry"))
    response = await websocket.receive_json()
    assert response == {"message": "this is a second log entry", "type": "stdout"}
    await websocket.close()
    assert websocket.closed


@pytest.mark.asyncio
async def test_websocket_logs_missing_auth(aiohttp_client, mocker):
mock_address = "mock_address"
Expand Down Expand Up @@ -529,3 +433,110 @@ async def test_websocket_logs_good_auth(aiohttp_client, mocker, patch_datetime_n

await websocket.close()
assert websocket.closed


@pytest.mark.asyncio
async def test_get_past_logs(aiohttp_client, mocker, patch_datetime_now):
    """The JSON logs endpoint serves logs of a finished VM via its database record.

    The pool is empty, so the handler must fall back to the stored
    ExecutionRecord to authorize the sender, then return the journald entries
    as a JSON array.
    """
    mock_address = "0x40684b43B88356F62DCc56017547B6A7AC68780B"
    mock_hash = "fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_"
    # Bypass JWK auth; mock_address matches the "address" field of the record below.
    mocker.patch(
        "aleph.vm.orchestrator.views.authentication.authenticate_jwk",
        return_value=mock_address,
    )
    # Simulate a past execution stored in the database (raw message JSON).
    mocker.patch(
        "aleph.vm.orchestrator.metrics.get_last_record_for_vm",
        return_value=ExecutionRecord(
            message="""{
            "address": "0x40684b43B88356F62DCc56017547B6A7AC68780B",
            "time": 1720816744.639107,
            "allow_amend": false,
            "metadata": null,
            "authorized_keys": null,
            "variables": null,
            "environment": {
                "reproducible": false,
                "internet": true,
                "aleph_api": true,
                "shared_cache": false
            },
            "resources": {
                "vcpus": 1,
                "memory": 1024,
                "seconds": 300,
                "published_ports": null
            },
            "payment": null,
            "requirements": null,
            "volumes": [
                {
                    "comment": null,
                    "mount": "/opt/packages",
                    "ref": "7338478721e2e966da6395dbfa37dab7b017b48da55b1be22d4eccf3487b836c",
                    "use_latest": true
                }
            ],
            "replaces": null,
            "type": "vm-function",
            "code": {
                "encoding": "squashfs",
                "entrypoint": "main:app",
                "ref": "c4253bf514d2e0a271456c9023c4b3f13f324e53c176e9ec29b98b5972b02bc7",
                "interface": null,
                "args": null,
                "use_latest": true
            },
            "runtime": {
                "ref": "63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696",
                "use_latest": true,
                "comment": ""
            },
            "data": null,
            "export": null,
            "on": {
                "http": true,
                "message": null,
                "persistent": false
            }
        }"""
        ),
    )
    # Canned journald entries: one stdout and one stderr line, identified by
    # the per-VM journal unit names.
    mocker.patch(
        "aleph.vm.orchestrator.views.operator.get_past_vm_logs",
        return_value=[
            EntryDict(
                SYSLOG_IDENTIFIER=f"vm-{mock_hash}-stdout",
                MESSAGE="logline1",
                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 2),
            ),
            EntryDict(
                SYSLOG_IDENTIFIER=f"vm-{mock_hash}-stderr",
                MESSAGE="logline2",
                __REALTIME_TIMESTAMP=datetime.datetime(2020, 10, 12, 1, 3),
            ),
        ],
    )

    app = setup_webapp()
    # Empty pool forces the handler down the database-record code path.
    pool = mocker.MagicMock(executions={})
    app["vm_pool"] = pool
    app["pubsub"] = mocker.MagicMock()
    client = await aiohttp_client(app)
    response = await client.get(
        f"/control/machine/{mock_hash}/logs",
    )

    assert response.status == 200
    assert await response.json() == [
        {
            "MESSAGE": "logline1",
            "SYSLOG_IDENTIFIER": "vm-fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_-stdout",
            "__REALTIME_TIMESTAMP": "2020-10-12 01:02:00",
            "file": "stdout",
        },
        {
            "MESSAGE": "logline2",
            "SYSLOG_IDENTIFIER": "vm-fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_fake_vm_-stderr",
            "__REALTIME_TIMESTAMP": "2020-10-12 01:03:00",
            "file": "stderr",
        },
    ]