Skip to content

Commit

Permalink
Detect already running Persistent VMs (#541)
Browse files Browse the repository at this point in the history
Problem: Persistent VMs running were not detected after the orchestrator reboot.

Solution: Don't delete the entire table on the start process.

* Fix: Avoid to wait so long time to stop guest_api process. Put a timeout of 10 seconds.
* Fix: Avoid final `cannot unpack non-iterable VmExecution object` errors giving an empty list instead None value.
* Fix: If the execution already exist, only continue, not break the loop.

---------

Co-authored-by: Andres D. Molins <[email protected]>
  • Loading branch information
nesitor and Andres D. Molins authored Feb 20, 2024
1 parent efb5b30 commit a470f4e
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 14 deletions.
5 changes: 4 additions & 1 deletion src/aleph/vm/controllers/firecracker/executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,11 @@ async def start_guest_api(self):
logger.debug(f"started guest API for {self.vm_id}")

async def stop_guest_api(self):
if self.guest_api_process and self.guest_api_process._popen:
if self.guest_api_process and self.guest_api_process.is_alive():
self.guest_api_process.terminate()
await asyncio.sleep(5)
if self.guest_api_process.is_alive():
self.guest_api_process.kill()

async def teardown(self):
if self.fvm:
Expand Down
1 change: 0 additions & 1 deletion src/aleph/vm/orchestrator/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ def setup_engine():

async def create_tables(engine: Engine):
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)


Expand Down
4 changes: 2 additions & 2 deletions src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async def operate_reboot(request: web.Request, authenticated_sender: str) -> web
if execution.is_running:
logger.info(f"Rebooting {execution.vm_hash}")
if execution.persistent:
await pool.systemd_manager.restart(execution.controller_service)
pool.systemd_manager.restart(execution.controller_service)
else:
await pool.stop_vm(vm_hash)
pool.forget_vm(vm_hash)
Expand Down Expand Up @@ -204,7 +204,7 @@ async def operate_erase(request: web.Request, authenticated_sender: str) -> web.

# Stop the VM
await pool.stop_vm(execution.vm_hash)
await pool.forget_vm(execution.vm_hash)
pool.forget_vm(execution.vm_hash)

# Delete all data
if execution.resources is not None:
Expand Down
12 changes: 7 additions & 5 deletions src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ async def stop_persistent_execution(self, execution):
assert execution.persistent, "Execution isn't persistent"
self.systemd_manager.stop_and_disable(execution.controller_service)
await execution.stop()
execution.persistent = False

def forget_vm(self, vm_hash: ItemHash) -> None:
"""Remove a VM from the executions pool.
Expand All @@ -209,7 +208,7 @@ async def _load_persistent_executions(self):
for saved_execution in saved_executions:
# Prevent to load the same execution twice
if self.executions.get(saved_execution.vm_hash):
break
continue

vm_id = saved_execution.vm_id
message_dict = json.loads(saved_execution.message)
Expand Down Expand Up @@ -249,25 +248,28 @@ async def stop(self):
await asyncio.gather(*(execution.stop() for vm_hash, execution in self.get_ephemeral_executions()))

def get_ephemeral_executions(self) -> Iterable[VmExecution]:
return (
executions = (
execution
for _vm_hash, execution in self.executions.items()
if execution.is_running and not execution.persistent
)
return executions or []

def get_persistent_executions(self) -> Iterable[VmExecution]:
return (
executions = (
execution
for _vm_hash, execution in self.executions.items()
if execution.is_running and execution.persistent
)
return executions or []

def get_instance_executions(self) -> Iterable[VmExecution]:
return (
executions = (
execution
for _vm_hash, execution in self.executions.items()
if execution.is_running and execution.is_instance
)
return executions or []

def get_executions_by_sender(self, payment_type: PaymentType) -> Dict[str, Dict[str, list[VmExecution]]]:
"""Return all executions of the given type, grouped by sender and by chain."""
Expand Down
7 changes: 2 additions & 5 deletions src/aleph/vm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,17 @@
import msgpack
from aiohttp_cors import ResourceOptions, custom_cors
from aleph_message.models import ExecutableContent, InstanceContent, ProgramContent
from aleph_message.models.execution.base import MachineType
from eth_typing import HexAddress, HexStr
from eth_utils import hexstr_if_str, is_address, to_hex

logger = logging.getLogger(__name__)


def get_message_executable_content(message_dict: Dict) -> ExecutableContent:
if message_dict["type"] == MachineType.vm_function:
try:
return ProgramContent.parse_obj(message_dict)
elif message_dict["type"] == MachineType.vm_instance:
except ValueError as error:
return InstanceContent.parse_obj(message_dict)
else:
raise ValueError(f"Unknown message type {message_dict['type']}")


def cors_allow_all(function):
Expand Down

0 comments on commit a470f4e

Please sign in to comment.