From a629e98fd30941ab784c092f632f69e128fbcbfc Mon Sep 17 00:00:00 2001 From: "Andres D. Molins" Date: Mon, 23 Oct 2023 13:48:25 +0200 Subject: [PATCH] Fix: Refactor VMPool class to decouple it from orchestrator path. --- .../firecracker/snapshot_manager.py | 32 +++++++++---------- src/aleph/vm/{orchestrator => }/models.py | 8 ++--- src/aleph/vm/orchestrator/run.py | 2 +- src/aleph/vm/{orchestrator => }/pool.py | 8 ++--- 4 files changed, 25 insertions(+), 25 deletions(-) rename src/aleph/vm/{orchestrator => }/models.py (98%) rename src/aleph/vm/{orchestrator => }/pool.py (96%) diff --git a/src/aleph/vm/controllers/firecracker/snapshot_manager.py b/src/aleph/vm/controllers/firecracker/snapshot_manager.py index 8ab3a4d6c..1b5eac64e 100644 --- a/src/aleph/vm/controllers/firecracker/snapshot_manager.py +++ b/src/aleph/vm/controllers/firecracker/snapshot_manager.py @@ -8,31 +8,31 @@ from schedule import Job, Scheduler from aleph.vm.conf import settings -from aleph.vm.orchestrator.models import VmExecution +from .executable import AlephFirecrackerExecutable from .snapshots import CompressedDiskVolumeSnapshot logger = logging.getLogger(__name__) -def wrap_async_snapshot(execution): - asyncio.run(do_execution_snapshot(execution)) +def wrap_async_snapshot(vm): + asyncio.run(do_vm_snapshot(vm)) -def run_threaded_snapshot(execution): - job_thread = threading.Thread(target=wrap_async_snapshot, args=(execution,)) +def run_threaded_snapshot(vm): + job_thread = threading.Thread(target=wrap_async_snapshot, args=(vm,)) job_thread.start() -async def do_execution_snapshot(execution: VmExecution) -> CompressedDiskVolumeSnapshot: +async def do_vm_snapshot(vm: AlephFirecrackerExecutable) -> CompressedDiskVolumeSnapshot: try: - logger.debug(f"Starting new snapshot for VM {execution.vm_hash}") - assert execution.vm, "VM execution not set" + logger.debug(f"Starting new snapshot for VM {vm.vm_hash}") + assert vm, "VM execution not set" - snapshot = await execution.vm.create_snapshot() + snapshot = await vm.create_snapshot() await snapshot.upload() - logger.debug(f"New snapshots for VM {execution.vm_hash} created in {snapshot.path}") + logger.debug(f"New snapshots for VM {vm.vm_hash} created in {snapshot.path}") return snapshot except ValueError: msg = "Something failed taking an snapshot" @@ -47,7 +47,7 @@ def infinite_run_scheduler_jobs(scheduler: Scheduler) -> None: class SnapshotExecution: vm_hash: ItemHash - execution: VmExecution + execution: AlephFirecrackerExecutable frequency: int _scheduler: Scheduler _job: Job @@ -56,7 +56,7 @@ def __init__( self, scheduler: Scheduler, vm_hash: ItemHash, - execution: VmExecution, + execution: AlephFirecrackerExecutable, frequency: int, ): self.vm_hash = vm_hash @@ -95,18 +95,18 @@ def run_snapshots(self) -> None: ) job_thread.start() - async def start_for(self, execution: VmExecution, frequency: Optional[int] = None) -> None: - if not execution.is_instance: + async def start_for(self, vm: AlephFirecrackerExecutable, frequency: Optional[int] = None) -> None: + if not vm.is_instance: msg = "Snapshots are not implemented for programs." raise NotImplementedError(msg) default_frequency = frequency or settings.SNAPSHOT_FREQUENCY - vm_hash = execution.vm_hash + vm_hash = vm.vm_hash snapshot_execution = SnapshotExecution( scheduler=self._scheduler, vm_hash=vm_hash, - execution=execution, + execution=vm, frequency=default_frequency, ) self.executions[vm_hash] = snapshot_execution diff --git a/src/aleph/vm/orchestrator/models.py b/src/aleph/vm/models.py similarity index 98% rename from src/aleph/vm/orchestrator/models.py rename to src/aleph/vm/models.py index 958847a0c..247c1b33b 100644 --- a/src/aleph/vm/orchestrator/models.py +++ b/src/aleph/vm/models.py @@ -24,10 +24,10 @@ from aleph.vm.network.interfaces import TapInterface from aleph.vm.utils import create_task_log_exceptions, dumps_for_json -from ..conf import settings -from .metrics import ExecutionRecord, save_execution_data, save_record -from .pubsub import PubSub -from .vm import AlephFirecrackerInstance +from aleph.vm.conf import settings +from aleph.vm.orchestrator.metrics import ExecutionRecord, save_execution_data, save_record +from aleph.vm.orchestrator.pubsub import PubSub +from aleph.vm.orchestrator.vm import AlephFirecrackerInstance if TYPE_CHECKING: from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager diff --git a/src/aleph/vm/orchestrator/run.py b/src/aleph/vm/orchestrator/run.py index a92409d4c..a3d4c2c8f 100644 --- a/src/aleph/vm/orchestrator/run.py +++ b/src/aleph/vm/orchestrator/run.py @@ -18,9 +18,9 @@ from aleph.vm.utils import HostNotFoundError from ..conf import settings +from ..pool import VmPool from .messages import load_updated_message from .models import VmExecution -from .pool import VmPool from .pubsub import PubSub logger = logging.getLogger(__name__) diff --git a/src/aleph/vm/orchestrator/pool.py b/src/aleph/vm/pool.py similarity index 96% rename from src/aleph/vm/orchestrator/pool.py rename to src/aleph/vm/pool.py index ba16f67f8..8c99c573f 100644 --- a/src/aleph/vm/orchestrator/pool.py +++ b/src/aleph/vm/pool.py @@ -8,10 +8,10 @@ from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager -from ..conf import settings -from ..network.hostnetwork import Network, make_ipv6_allocator +from aleph.vm.conf import settings +from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator from .models import ExecutableContent, VmExecution -from .vm.vm_type import VmType +from aleph.vm.orchestrator.vm.vm_type import VmType logger = logging.getLogger(__name__) @@ -79,7 +79,7 @@ async def create_a_vm( # Start VM snapshots automatically if isinstance(message, InstanceContent): - await self.snapshot_manager.start_for(execution=execution) + await self.snapshot_manager.start_for(vm=execution.vm) return execution