From 6098a39abed62f1e2f735f1b9e67fb3d3c957f32 Mon Sep 17 00:00:00 2001 From: Hugo Herter Date: Mon, 25 Sep 2023 14:32:31 +0200 Subject: [PATCH] Fix: Errors in allocation exited entire scheduling Problem: An error in the allocation of one persistent VM or instance crashed the allocation of all remaining VMs. Solution: Handle errors in the creation of VMs as not to stop the entire process. Return the allocations that were successful and those who were not. Split the exception handling between direct HTTP calls and allocations for distinct handling. --- vm_supervisor/run.py | 30 ++++++++- vm_supervisor/views/__init__.py | 81 +++++++++++++++++++------ vm_supervisor/vm/firecracker/program.py | 14 +---- 3 files changed, 92 insertions(+), 33 deletions(-) diff --git a/vm_supervisor/run.py b/vm_supervisor/run.py index 6e8957893..94c408967 100644 --- a/vm_supervisor/run.py +++ b/vm_supervisor/run.py @@ -79,6 +79,7 @@ async def create_vm_execution(vm_hash: ItemHash) -> VmExecution: except HostNotFoundError as error: logger.exception(error) pool.forget_vm(vm_hash=vm_hash) + raise HTTPInternalServerError(reason="Host did not respond to ping") if not execution.vm: raise ValueError("The VM has not been created") @@ -86,6 +87,29 @@ async def create_vm_execution(vm_hash: ItemHash) -> VmExecution: return execution +async def create_vm_execution_or_raise_http_error(vm_hash: ItemHash) -> VmExecution: + try: + return await create_vm_execution(vm_hash=vm_hash) + except ResourceDownloadError as error: + logger.exception(error) + pool.forget_vm(vm_hash=vm_hash) + raise HTTPBadRequest(reason="Code, runtime or data not available") + except FileTooLargeError as error: + raise HTTPInternalServerError(reason=error.args[0]) + except VmSetupError as error: + logger.exception(error) + pool.forget_vm(vm_hash=vm_hash) + raise HTTPInternalServerError(reason="Error during vm initialisation") + except MicroVMFailedInit as error: + logger.exception(error) + pool.forget_vm(vm_hash=vm_hash) + raise HTTPInternalServerError(reason="Error during runtime initialisation") + except HostNotFoundError as error: + logger.exception(error) + pool.forget_vm(vm_hash=vm_hash) + raise HTTPInternalServerError(reason="Host did not respond to ping") + + async def run_code_on_request( vm_hash: ItemHash, path: str, request: web.Request ) -> web.Response: @@ -96,7 +120,7 @@ async def run_code_on_request( execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash) if not execution: - execution = await create_vm_execution(vm_hash=vm_hash) + execution = await create_vm_execution_or_raise_http_error(vm_hash=vm_hash) logger.debug(f"Using vm={execution.vm_id}") @@ -176,7 +200,7 @@ async def run_code_on_request( if settings.REUSE_TIMEOUT > 0: if settings.WATCH_FOR_UPDATES: execution.start_watching_for_updates(pubsub=request.app["pubsub"]) - execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT) + _ = execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT) else: await execution.stop() @@ -189,7 +213,7 @@ async def run_code_on_event(vm_hash: ItemHash, event, pubsub: PubSub): execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash) if not execution: - execution = await create_vm_execution(vm_hash=vm_hash) + execution = await create_vm_execution_or_raise_http_error(vm_hash=vm_hash) logger.debug(f"Using vm={execution.vm_id}") diff --git a/vm_supervisor/views/__init__.py b/vm_supervisor/views/__init__.py index bc034f21d..57842abff 100644 --- a/vm_supervisor/views/__init__.py +++ b/vm_supervisor/views/__init__.py @@ -1,9 +1,11 @@ +import asyncio import binascii +import json import logging from hashlib import sha256 from pathlib import Path from string import Template -from typing import Awaitable, Optional +from typing import Awaitable, Coroutine, Dict, List, Optional import aiodns import aiohttp @@ -12,6 +14,7 @@ from aleph_message.models import ItemHash from pydantic import ValidationError +from firecracker.microvm import MicroVMFailedInit from packaging.version import InvalidVersion, Version from vm_supervisor import status from vm_supervisor.conf import settings @@ -19,8 +22,15 @@ from vm_supervisor.pubsub import PubSub from vm_supervisor.resources import Allocation from vm_supervisor.run import pool, run_code_on_request, start_persistent_vm -from vm_supervisor.utils import b32_to_b16, dumps_for_json, get_ref_from_dns +from vm_supervisor.utils import ( + HostNotFoundError, + b32_to_b16, + dumps_for_json, + get_ref_from_dns, +) from vm_supervisor.version import __version__ +from vm_supervisor.vm.firecracker.executable import ResourceDownloadError, VmSetupError +from vm_supervisor.vm.firecracker.program import FileTooLargeError logger = logging.getLogger(__name__) @@ -201,20 +211,7 @@ async def update_allocations(request: web.Request): pubsub: PubSub = request.app["pubsub"] - # Start VMs - for vm_hash in allocation.persistent_vms: - vm_hash = ItemHash(vm_hash) - logger.info(f"Starting long running VM {vm_hash}") - await start_persistent_vm(vm_hash, pubsub) - - # Start Instances - for instance_hash in allocation.instances: - instance_hash = ItemHash(instance_hash) - logger.info(f"Starting instance {instance_hash}") - await start_persistent_vm(instance_hash, pubsub) - - # Stop unscheduled persistent programs and instances. - # Instances are also marked with persistent = True. + # First free resources from persistent programs and instances that are not scheduled anymore. allocations = allocation.persistent_vms | allocation.instances for execution in pool.get_persistent_executions(): if execution.vm_hash not in allocations: @@ -223,10 +220,60 @@ async def update_allocations(request: web.Request): await execution.stop() execution.persistent = False + # Second start persistent VMs and instances sequentially to limit resource usage. + + vm_creation_exceptions = ( + ResourceDownloadError, + FileTooLargeError, + VmSetupError, + MicroVMFailedInit, + HostNotFoundError, + ) + + scheduling_errors: Dict[ItemHash, Exception] = {} + + # Schedule the start of persistent VMs: + for vm_hash in allocation.persistent_vms: + vm_hash = ItemHash(vm_hash) + logger.info(f"Starting long running VM {vm_hash}") + try: + await start_persistent_vm(vm_hash, pubsub) + except vm_creation_exceptions as error: + logger.exception(error) + scheduling_errors[vm_hash] = error + + # Schedule the start of instances: + for instance_hash in allocation.instances: + instance_hash = ItemHash(instance_hash) + logger.info(f"Starting instance {instance_hash}") + try: + await start_persistent_vm(vm_hash, pubsub) + except vm_creation_exceptions as error: + logger.exception(error) + scheduling_errors[vm_hash] = error + # Log unsupported features if allocation.on_demand_vms: logger.warning("Not supported yet: 'allocation.on_demand_vms'") if allocation.jobs: logger.warning("Not supported yet: 'allocation.on_demand_vms'") - return web.json_response(data={"success": True}) + failing = set(scheduling_errors.keys()) + successful = allocations - failing + + status_code: int + if not failing: + status_code = 200 # OK + elif not successful: + status_code = 503 # Service Unavailable + else: + status_code = 207 # Multi-Status + + return web.json_response( + data={ + "success": not failing, + "successful": list(successful), + "failing": list(failing), + }, + status=status_code, + ) diff --git a/vm_supervisor/vm/firecracker/program.py b/vm_supervisor/vm/firecracker/program.py index 61e1b2ade..43c009b7b 100644 --- a/vm_supervisor/vm/firecracker/program.py +++ b/vm_supervisor/vm/firecracker/program.py @@ -33,6 +33,7 @@ from .executable import ( AlephFirecrackerExecutable, AlephFirecrackerResources, + ResourceDownloadError, VmInitNotConnected, VmSetupError, Volume, @@ -45,19 +46,6 @@ class FileTooLargeError(Exception): pass -class ResourceDownloadError(ClientResponseError): - """An error occurred while downloading a VM resource file""" - - def __init__(self, error: ClientResponseError): - super().__init__( - request_info=error.request_info, - history=error.history, - status=error.status, - message=error.message, - headers=error.headers, - ) - - def read_input_data(path_to_data: Optional[Path]) -> Optional[bytes]: if not path_to_data: return None