Skip to content

Commit

Permalink
set logging system
Browse files Browse the repository at this point in the history
  • Loading branch information
olethanh committed Nov 20, 2024
1 parent 97f7117 commit bff9d52
Showing 1 changed file with 40 additions and 40 deletions.
80 changes: 40 additions & 40 deletions src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,47 +108,47 @@ async def stream_logs(request: web.Request) -> web.StreamResponse:
async def operate_logs_json(request: web.Request, authenticated_sender: str) -> web.StreamResponse:
"""Logs of a VM (not streaming) as json"""
vm_hash = get_itemhash_or_400(request.match_info)
with set_vm_for_logging(vm_hash=vm_hash):
# This endpoint allow logs for past executions, so we look into the database if any execution by that hash
# occurred, which we can then use to look for rights. We still check in the pool first, it is faster
pool: VmPool = request.app["vm_pool"]
execution = pool.executions.get(vm_hash)
if execution:
message = execution.message

Check warning on line 117 in src/aleph/vm/orchestrator/views/operator.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/operator.py#L117

Added line #L117 was not covered by tests
else:
record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
if not record:
raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")

Check warning on line 121 in src/aleph/vm/orchestrator/views/operator.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/operator.py#L121

Added line #L121 was not covered by tests
message = get_message_executable_content(json.loads(record.message))
if not is_sender_authorized(authenticated_sender, message):
return web.Response(status=403, body="Unauthorized sender")

# This endpoint allow logs for past executions, so we look into the database if any execution by that hash
# occurred, which we can then use to look for rights. We still check in the pool first, it is faster
pool: VmPool = request.app["vm_pool"]
execution = pool.executions.get(vm_hash)
if execution:
message = execution.message
else:
record = await metrics.get_last_record_for_vm(vm_hash=vm_hash)
if not record:
raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM")
message = get_message_executable_content(json.loads(record.message))
if not is_sender_authorized(authenticated_sender, message):
return web.Response(status=403, body="Unauthorized sender")

_journal_stdout_name = f"vm-{vm_hash}-stdout"
_journal_stderr_name = f"vm-{vm_hash}-stderr"

response = web.StreamResponse()
response.headers["Transfer-encoding"] = "chunked"
response.headers["Content-Type"] = "application/json"
await response.prepare(request)
await response.write(b"[")

first = True
for entry in get_past_vm_logs(_journal_stdout_name, _journal_stderr_name):
if not first:
await response.write(b",\n")
first = False
log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == _journal_stdout_name else "stderr"
msg = {
"SYSLOG_IDENTIFIER": entry["SYSLOG_IDENTIFIER"],
"MESSAGE": entry["MESSAGE"],
"file": log_type,
"__REALTIME_TIMESTAMP": entry["__REALTIME_TIMESTAMP"],
}
await response.write(dumps_for_json(msg).encode())
await response.write(b"]")

await response.write_eof()
return response
_journal_stdout_name = f"vm-{vm_hash}-stdout"
_journal_stderr_name = f"vm-{vm_hash}-stderr"

response = web.StreamResponse()
response.headers["Transfer-encoding"] = "chunked"
response.headers["Content-Type"] = "application/json"
await response.prepare(request)
await response.write(b"[")

first = True
for entry in get_past_vm_logs(_journal_stdout_name, _journal_stderr_name):
if not first:
await response.write(b",\n")
first = False
log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == _journal_stdout_name else "stderr"
msg = {
"SYSLOG_IDENTIFIER": entry["SYSLOG_IDENTIFIER"],
"MESSAGE": entry["MESSAGE"],
"file": log_type,
"__REALTIME_TIMESTAMP": entry["__REALTIME_TIMESTAMP"],
}
await response.write(dumps_for_json(msg).encode())
await response.write(b"]")

await response.write_eof()
return response


async def authenticate_websocket_for_vm_or_403(execution: VmExecution, vm_hash: ItemHash, ws: web.WebSocketResponse):
Expand Down

0 comments on commit bff9d52

Please sign in to comment.