Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: Websocket were required to fetch logs #645

Merged
merged 14 commits into from
Jul 4, 2024
5 changes: 4 additions & 1 deletion src/aleph/vm/controllers/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from aleph.vm.controllers.firecracker.snapshots import CompressedDiskVolumeSnapshot
from aleph.vm.network.interfaces import TapInterface
from aleph.vm.utils.logs import make_logs_queue
from aleph.vm.utils.logs import get_past_vm_logs, make_logs_queue

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -118,3 +118,6 @@ def _journal_stdout_name(self) -> str:
@property
def _journal_stderr_name(self) -> str:
return f"vm-{self.vm_hash}-stderr"

def past_logs(self):
yield from get_past_vm_logs(self._journal_stdout_name, self._journal_stderr_name)
4 changes: 0 additions & 4 deletions src/aleph/vm/controllers/qemu/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,3 @@ async def teardown(self):
if self.tap_interface:
await self.tap_interface.delete()
await self.stop_guest_api()

def print_logs(self) -> None:
"""Print logs to our output for debugging"""
queue = self.get_log_queue()
4 changes: 3 additions & 1 deletion src/aleph/vm/orchestrator/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from .views.operator import (
operate_erase,
operate_expire,
operate_logs,
operate_reboot,
operate_stop,
stream_logs,
Expand Down Expand Up @@ -100,7 +101,8 @@ def setup_webapp():
web.get("/about/config", about_config),
# /control APIs are used to control the VMs and access their logs
web.post("/control/allocation/notify", notify_allocation),
web.get("/control/machine/{ref}/logs", stream_logs),
web.get("/control/machine/{ref}/stream_logs", stream_logs),
web.get("/control/machine/{ref}/logs", operate_logs),
web.post("/control/machine/{ref}/expire", operate_expire),
web.post("/control/machine/{ref}/stop", operate_stop),
web.post("/control/machine/{ref}/erase", operate_erase),
Expand Down
23 changes: 23 additions & 0 deletions src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async def stream_logs(request: web.Request) -> web.StreamResponse:
queue = None
try:
ws = web.WebSocketResponse()
logger.info(f"starting websocket: {request.path}")
await ws.prepare(request)
try:
await authenticate_websocket_for_vm_or_403(execution, vm_hash, ws)
Expand All @@ -75,6 +76,7 @@ async def stream_logs(request: web.Request) -> web.StreamResponse:
while True:
log_type, message = await queue.get()
assert log_type in ("stdout", "stderr")
logger.debug(message)

await ws.send_json({"type": log_type, "message": message})

Expand All @@ -87,6 +89,27 @@ async def stream_logs(request: web.Request) -> web.StreamResponse:
execution.vm.unregister_queue(queue)


@cors_allow_all
@require_jwk_authentication
async def operate_logs(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
hoh marked this conversation as resolved.
Show resolved Hide resolved
"""Logs of a VM (not streaming)"""
vm_hash = get_itemhash_or_400(request.match_info)
pool: VmPool = request.app["vm_pool"]
execution = get_execution_or_404(vm_hash, pool=pool)
if not is_sender_authorized(authenticated_sender, execution.message):
return web.Response(status=403, body="Unauthorized sender")

response = web.StreamResponse()
response.headers["Content-Type"] = "text/plain"
await response.prepare(request)

for entry in execution.vm.past_logs():
msg = f'{entry["__REALTIME_TIMESTAMP"].isoformat()}> {entry["MESSAGE"]}'
await response.write(msg.encode())
olethanh marked this conversation as resolved.
Show resolved Hide resolved
await response.write_eof()
return response


async def authenticate_websocket_for_vm_or_403(execution: VmExecution, vm_hash: ItemHash, ws: web.WebSocketResponse):
"""Authenticate a websocket connection.

Expand Down
26 changes: 25 additions & 1 deletion src/aleph/vm/utils/logs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
from typing import Callable, TypedDict
from datetime import datetime
from typing import Callable, Generator, TypedDict

from systemd import journal

Expand All @@ -10,6 +11,7 @@
class EntryDict(TypedDict):
SYSLOG_IDENTIFIER: str
MESSAGE: str
__REALTIME_TIMESTAMP: datetime


def make_logs_queue(stdout_identifier, stderr_identifier, skip_past=False) -> tuple[asyncio.Queue, Callable[[], None]]:
Expand Down Expand Up @@ -56,3 +58,25 @@ def do_cancel():
r.close()

return queue, do_cancel


def get_past_vm_logs(stdout_identifier, stderr_identifier) -> Generator[EntryDict, None, None]:
"""Get existing log for the VM identifiers.

@param stdout_identifier: journald identifier for process stdout
@param stderr_identifier: journald identifier for process stderr
@return: an iterator of log entry

Works by creating a journald reader, and using `add_reader` to call a callback when
data is available for reading.

For more information refer to the sd-journal(3) manpage
and systemd.journal module documentation.
"""
r = journal.Reader()
r.add_match(SYSLOG_IDENTIFIER=stdout_identifier)
r.add_match(SYSLOG_IDENTIFIER=stderr_identifier)

r.seek_head()
for entry in r:
yield entry
58 changes: 33 additions & 25 deletions tests/supervisor/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest
from aiohttp import web
from eth_account.datastructures import SignedMessage
from eth_account.signers.local import LocalAccount
from jwcrypto import jwk, jws
from jwcrypto.common import base64url_decode
from jwcrypto.jwa import JWA
Expand Down Expand Up @@ -257,15 +258,35 @@ async def test_require_jwk_authentication_good_key(aiohttp_client, patch_datetim
"""An HTTP request to a view decorated by `@require_jwk_authentication`
auth correctly a temporary key signed by a wallet and an operation signed by that key"""
app = web.Application()
payload = {"time": "2010-12-25T17:05:55Z", "method": "GET", "path": "/"}
signer_account, headers = await generate_signer_and_signed_headers_for_operation(patch_datetime_now, payload)

@require_jwk_authentication
async def view(request, authenticated_sender):
assert authenticated_sender == signer_account.address
return web.Response(text="ok")

app.router.add_get("", view)
client = await aiohttp_client(app)

resp = await client.get("/", headers=headers)
assert resp.status == 200, await resp.text()

r = await resp.text()
assert "ok" == r


async def generate_signer_and_signed_headers_for_operation(
patch_datetime_now, operation_payload: dict
) -> tuple[LocalAccount, dict]:
"""Generate a temporary eth_account for testing and sign the operation with it"""
account = eth_account.Account()
signer_account = account.create()
key = jwk.JWK.generate(
kty="EC",
crv="P-256",
# key_ops=["verify"],
)

pubkey = {
"pubkey": json.loads(key.export_public()),
"alg": "ECDSA",
Expand All @@ -283,32 +304,19 @@ async def test_require_jwk_authentication_good_key(aiohttp_client, patch_datetim
"signature": pubkey_signature,
}
)
payload_as_bytes = json.dumps(operation_payload).encode("utf-8")

@require_jwk_authentication
async def view(request, authenticated_sender):
assert authenticated_sender == signer_account.address
return web.Response(text="ok")

app.router.add_get("", view)
client = await aiohttp_client(app)

payload = {"time": "2010-12-25T17:05:55Z", "method": "GET", "path": "/"}

payload_as_bytes = json.dumps(payload).encode("utf-8")
headers = {"X-SignedPubKey": pubkey_signature_header}
payload_signature = JWA.signing_alg("ES256").sign(key, payload_as_bytes)
headers["X-SignedOperation"] = json.dumps(
{
"payload": payload_as_bytes.hex(),
"signature": payload_signature.hex(),
}
)

resp = await client.get("/", headers=headers)
assert resp.status == 200, await resp.text()

r = await resp.text()
assert "ok" == r
headers = {
"X-SignedPubKey": pubkey_signature_header,
"X-SignedOperation": json.dumps(
{
"payload": payload_as_bytes.hex(),
"signature": payload_signature.hex(),
}
),
}
return signer_account, headers


@pytest.fixture
Expand Down
Loading
Loading