diff --git a/src/aleph/vm/orchestrator/views/operator.py b/src/aleph/vm/orchestrator/views/operator.py index 75d7e7a8..7985687f 100644 --- a/src/aleph/vm/orchestrator/views/operator.py +++ b/src/aleph/vm/orchestrator/views/operator.py @@ -108,47 +108,47 @@ async def stream_logs(request: web.Request) -> web.StreamResponse: async def operate_logs_json(request: web.Request, authenticated_sender: str) -> web.StreamResponse: """Logs of a VM (not streaming) as json""" vm_hash = get_itemhash_or_400(request.match_info) + with set_vm_for_logging(vm_hash=vm_hash): + # This endpoint allow logs for past executions, so we look into the database if any execution by that hash + # occurred, which we can then use to look for rights. We still check in the pool first, it is faster + pool: VmPool = request.app["vm_pool"] + execution = pool.executions.get(vm_hash) + if execution: + message = execution.message + else: + record = await metrics.get_last_record_for_vm(vm_hash=vm_hash) + if not record: + raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM") + message = get_message_executable_content(json.loads(record.message)) + if not is_sender_authorized(authenticated_sender, message): + return web.Response(status=403, body="Unauthorized sender") - # This endpoint allow logs for past executions, so we look into the database if any execution by that hash - # occurred, which we can then use to look for rights. We still check in the pool first, it is faster - pool: VmPool = request.app["vm_pool"] - execution = pool.executions.get(vm_hash) - if execution: - message = execution.message - else: - record = await metrics.get_last_record_for_vm(vm_hash=vm_hash) - if not record: - raise aiohttp.web_exceptions.HTTPNotFound(body="No execution found for this VM") - message = get_message_executable_content(json.loads(record.message)) - if not is_sender_authorized(authenticated_sender, message): - return web.Response(status=403, body="Unauthorized sender") - - _journal_stdout_name = f"vm-{vm_hash}-stdout" - _journal_stderr_name = f"vm-{vm_hash}-stderr" - - response = web.StreamResponse() - response.headers["Transfer-encoding"] = "chunked" - response.headers["Content-Type"] = "application/json" - await response.prepare(request) - await response.write(b"[") - - first = True - for entry in get_past_vm_logs(_journal_stdout_name, _journal_stderr_name): - if not first: - await response.write(b",\n") - first = False - log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == _journal_stdout_name else "stderr" - msg = { - "SYSLOG_IDENTIFIER": entry["SYSLOG_IDENTIFIER"], - "MESSAGE": entry["MESSAGE"], - "file": log_type, - "__REALTIME_TIMESTAMP": entry["__REALTIME_TIMESTAMP"], - } - await response.write(dumps_for_json(msg).encode()) - await response.write(b"]") - - await response.write_eof() - return response + _journal_stdout_name = f"vm-{vm_hash}-stdout" + _journal_stderr_name = f"vm-{vm_hash}-stderr" + + response = web.StreamResponse() + response.headers["Transfer-encoding"] = "chunked" + response.headers["Content-Type"] = "application/json" + await response.prepare(request) + await response.write(b"[") + + first = True + for entry in get_past_vm_logs(_journal_stdout_name, _journal_stderr_name): + if not first: + await response.write(b",\n") + first = False + log_type = "stdout" if entry["SYSLOG_IDENTIFIER"] == _journal_stdout_name else "stderr" + msg = { + "SYSLOG_IDENTIFIER": entry["SYSLOG_IDENTIFIER"], + "MESSAGE": entry["MESSAGE"], + "file": log_type, + "__REALTIME_TIMESTAMP": entry["__REALTIME_TIMESTAMP"], + } + await response.write(dumps_for_json(msg).encode()) + await response.write(b"]") + + await response.write_eof() + return response async def authenticate_websocket_for_vm_or_403(execution: VmExecution, vm_hash: ItemHash, ws: web.WebSocketResponse):