Skip to content

Commit

Permalink
Fix: Errors in allocation exited entire scheduling
Browse files Browse the repository at this point in the history
Problem: An error in the allocation of one persistent VM or instance crashed the allocation of all remaining VMs.

Solution: Handle errors in the creation of VMs as not to stop the entire process. Return the allocations that were successful and those who were not.

Split the exception handling between direct HTTP calls and allocations for distinct handling.
  • Loading branch information
hoh committed Sep 25, 2023
1 parent b0204f3 commit 6098a39
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 33 deletions.
30 changes: 27 additions & 3 deletions vm_supervisor/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,37 @@ async def create_vm_execution(vm_hash: ItemHash) -> VmExecution:
except HostNotFoundError as error:
logger.exception(error)
pool.forget_vm(vm_hash=vm_hash)
raise HTTPInternalServerError(reason="Host did not respond to ping")

if not execution.vm:
raise ValueError("The VM has not been created")

return execution


async def create_vm_execution_or_raise_http_error(vm_hash: ItemHash) -> VmExecution:
try:
return await create_vm_execution(vm_hash=vm_hash)
except ResourceDownloadError as error:
logger.exception(error)
pool.forget_vm(vm_hash=vm_hash)
raise HTTPBadRequest(reason="Code, runtime or data not available")
except FileTooLargeError as error:
raise HTTPInternalServerError(reason=error.args[0])
except VmSetupError as error:
logger.exception(error)
pool.forget_vm(vm_hash=vm_hash)
raise HTTPInternalServerError(reason="Error during vm initialisation")
except MicroVMFailedInit as error:
logger.exception(error)
pool.forget_vm(vm_hash=vm_hash)
raise HTTPInternalServerError(reason="Error during runtime initialisation")
except HostNotFoundError as error:
logger.exception(error)
pool.forget_vm(vm_hash=vm_hash)
raise HTTPInternalServerError(reason="Host did not respond to ping")


async def run_code_on_request(
vm_hash: ItemHash, path: str, request: web.Request
) -> web.Response:
Expand All @@ -96,7 +120,7 @@ async def run_code_on_request(
execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash)

if not execution:
execution = await create_vm_execution(vm_hash=vm_hash)
execution = await create_vm_execution_or_raise_http_error(vm_hash=vm_hash)

logger.debug(f"Using vm={execution.vm_id}")

Expand Down Expand Up @@ -176,7 +200,7 @@ async def run_code_on_request(
if settings.REUSE_TIMEOUT > 0:
if settings.WATCH_FOR_UPDATES:
execution.start_watching_for_updates(pubsub=request.app["pubsub"])
execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT)
_ = execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT)
else:
await execution.stop()

Expand All @@ -189,7 +213,7 @@ async def run_code_on_event(vm_hash: ItemHash, event, pubsub: PubSub):
execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash)

if not execution:
execution = await create_vm_execution(vm_hash=vm_hash)
execution = await create_vm_execution_or_raise_http_error(vm_hash=vm_hash)

logger.debug(f"Using vm={execution.vm_id}")

Expand Down
81 changes: 64 additions & 17 deletions vm_supervisor/views/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import binascii
import json
import logging
from hashlib import sha256
from pathlib import Path
from string import Template
from typing import Awaitable, Optional
from typing import Awaitable, Coroutine, Dict, List, Optional

import aiodns
import aiohttp
Expand All @@ -12,15 +14,23 @@
from aleph_message.models import ItemHash
from pydantic import ValidationError

from firecracker.microvm import MicroVMFailedInit
from packaging.version import InvalidVersion, Version
from vm_supervisor import status
from vm_supervisor.conf import settings
from vm_supervisor.metrics import get_execution_records
from vm_supervisor.pubsub import PubSub
from vm_supervisor.resources import Allocation
from vm_supervisor.run import pool, run_code_on_request, start_persistent_vm
from vm_supervisor.utils import b32_to_b16, dumps_for_json, get_ref_from_dns
from vm_supervisor.utils import (
HostNotFoundError,
b32_to_b16,
dumps_for_json,
get_ref_from_dns,
)
from vm_supervisor.version import __version__
from vm_supervisor.vm.firecracker.executable import ResourceDownloadError, VmSetupError
from vm_supervisor.vm.firecracker.program import FileTooLargeError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -201,20 +211,7 @@ async def update_allocations(request: web.Request):

pubsub: PubSub = request.app["pubsub"]

# Start VMs
for vm_hash in allocation.persistent_vms:
vm_hash = ItemHash(vm_hash)
logger.info(f"Starting long running VM {vm_hash}")
await start_persistent_vm(vm_hash, pubsub)

# Start Instances
for instance_hash in allocation.instances:
instance_hash = ItemHash(instance_hash)
logger.info(f"Starting instance {instance_hash}")
await start_persistent_vm(instance_hash, pubsub)

# Stop unscheduled persistent programs and instances.
# Instances are also marked with persistent = True.
# First free resources from persistent programs and instances that are not scheduled anymore.
allocations = allocation.persistent_vms | allocation.instances
for execution in pool.get_persistent_executions():
if execution.vm_hash not in allocations:
Expand All @@ -223,10 +220,60 @@ async def update_allocations(request: web.Request):
await execution.stop()
execution.persistent = False

# Second start persistent VMs and instances sequentially to limit resource usage.

vm_creation_exceptions = (
ResourceDownloadError,
FileTooLargeError,
VmSetupError,
MicroVMFailedInit,
HostNotFoundError,
)

scheduling_errors: Dict[ItemHash, Exception] = {}

# Schedule the start of persistent VMs:
for vm_hash in allocation.persistent_vms:
vm_hash = ItemHash(vm_hash)
logger.info(f"Starting long running VM {vm_hash}")
try:
await start_persistent_vm(vm_hash, pubsub)
except vm_creation_exceptions as error:
logger.exception(error)
scheduling_errors[vm_hash] = error

# Schedule the start of instances:
for instance_hash in allocation.instances:
instance_hash = ItemHash(instance_hash)
logger.info(f"Starting instance {instance_hash}")
try:
await start_persistent_vm(vm_hash, pubsub)
except vm_creation_exceptions as error:
logger.exception(error)
scheduling_errors[vm_hash] = error

# Log unsupported features
if allocation.on_demand_vms:
logger.warning("Not supported yet: 'allocation.on_demand_vms'")
if allocation.jobs:
logger.warning("Not supported yet: 'allocation.on_demand_vms'")

return web.json_response(data={"success": True})
failing = set(scheduling_errors.keys())
successful = allocations - failing

status_code: int
if not failing:
status_code = 200 # OK
elif not successful:
status_code = 503 # Service Unavailable
else:
status_code = 207 # Multi-Status

return web.json_response(
data={
"success": not failing,
"successful": list(successful),
"failing": list(failing),
},
status=status_code,
)
14 changes: 1 addition & 13 deletions vm_supervisor/vm/firecracker/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .executable import (
AlephFirecrackerExecutable,
AlephFirecrackerResources,
ResourceDownloadError,
VmInitNotConnected,
VmSetupError,
Volume,
Expand All @@ -45,19 +46,6 @@ class FileTooLargeError(Exception):
pass


class ResourceDownloadError(ClientResponseError):
"""An error occurred while downloading a VM resource file"""

def __init__(self, error: ClientResponseError):
super().__init__(
request_info=error.request_info,
history=error.history,
status=error.status,
message=error.message,
headers=error.headers,
)


def read_input_data(path_to_data: Optional[Path]) -> Optional[bytes]:
if not path_to_data:
return None
Expand Down

0 comments on commit 6098a39

Please sign in to comment.