Skip to content

Commit

Permalink
Fix: Refactor VMPool class to decouple it from orchestrator path.
Browse files Browse the repository at this point in the history
  • Loading branch information
nesitor committed Oct 23, 2023
1 parent eb7da3b commit a629e98
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 25 deletions.
32 changes: 16 additions & 16 deletions src/aleph/vm/controllers/firecracker/snapshot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,31 @@
from schedule import Job, Scheduler

from aleph.vm.conf import settings
from aleph.vm.orchestrator.models import VmExecution

from .executable import AlephFirecrackerExecutable
from .snapshots import CompressedDiskVolumeSnapshot

logger = logging.getLogger(__name__)


def wrap_async_snapshot(execution):
asyncio.run(do_execution_snapshot(execution))
def wrap_async_snapshot(vm):
asyncio.run(do_vm_snapshot(vm))


def run_threaded_snapshot(execution):
job_thread = threading.Thread(target=wrap_async_snapshot, args=(execution,))
def run_threaded_snapshot(vm):
job_thread = threading.Thread(target=wrap_async_snapshot, args=(vm,))
job_thread.start()


async def do_execution_snapshot(execution: VmExecution) -> CompressedDiskVolumeSnapshot:
async def do_vm_snapshot(vm: AlephFirecrackerExecutable) -> CompressedDiskVolumeSnapshot:
try:
logger.debug(f"Starting new snapshot for VM {execution.vm_hash}")
assert execution.vm, "VM execution not set"
logger.debug(f"Starting new snapshot for VM {vm.vm_hash}")
assert vm, "VM execution not set"

snapshot = await execution.vm.create_snapshot()
snapshot = await vm.create_snapshot()
await snapshot.upload()

logger.debug(f"New snapshots for VM {execution.vm_hash} created in {snapshot.path}")
logger.debug(f"New snapshots for VM {vm.vm_hash} created in {snapshot.path}")
return snapshot
except ValueError:
msg = "Something failed taking an snapshot"
Expand All @@ -47,7 +47,7 @@ def infinite_run_scheduler_jobs(scheduler: Scheduler) -> None:

class SnapshotExecution:
vm_hash: ItemHash
execution: VmExecution
execution: AlephFirecrackerExecutable
frequency: int
_scheduler: Scheduler
_job: Job
Expand All @@ -56,7 +56,7 @@ def __init__(
self,
scheduler: Scheduler,
vm_hash: ItemHash,
execution: VmExecution,
execution: AlephFirecrackerExecutable,
frequency: int,
):
self.vm_hash = vm_hash
Expand Down Expand Up @@ -95,18 +95,18 @@ def run_snapshots(self) -> None:
)
job_thread.start()

async def start_for(self, execution: VmExecution, frequency: Optional[int] = None) -> None:
if not execution.is_instance:
async def start_for(self, vm: AlephFirecrackerExecutable, frequency: Optional[int] = None) -> None:
if not vm.is_instance:
msg = "Snapshots are not implemented for programs."
raise NotImplementedError(msg)

default_frequency = frequency or settings.SNAPSHOT_FREQUENCY

vm_hash = execution.vm_hash
vm_hash = vm.vm_hash
snapshot_execution = SnapshotExecution(
scheduler=self._scheduler,
vm_hash=vm_hash,
execution=execution,
execution=vm,
frequency=default_frequency,
)
self.executions[vm_hash] = snapshot_execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
from aleph.vm.network.interfaces import TapInterface
from aleph.vm.utils import create_task_log_exceptions, dumps_for_json

from ..conf import settings
from .metrics import ExecutionRecord, save_execution_data, save_record
from .pubsub import PubSub
from .vm import AlephFirecrackerInstance
from aleph.vm.conf import settings
from aleph.vm.orchestrator.metrics import ExecutionRecord, save_execution_data, save_record
from aleph.vm.orchestrator.pubsub import PubSub
from aleph.vm.orchestrator.vm import AlephFirecrackerInstance

if TYPE_CHECKING:
from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/vm/orchestrator/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from aleph.vm.utils import HostNotFoundError

from ..conf import settings
from ..pool import VmPool
from .messages import load_updated_message
from .models import VmExecution
from .pool import VmPool
from .pubsub import PubSub

logger = logging.getLogger(__name__)
Expand Down
8 changes: 4 additions & 4 deletions src/aleph/vm/orchestrator/pool.py → src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager

from ..conf import settings
from ..network.hostnetwork import Network, make_ipv6_allocator
from aleph.vm.conf import settings
from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator
from .models import ExecutableContent, VmExecution
from .vm.vm_type import VmType
from aleph.vm.orchestrator.vm.vm_type import VmType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -79,7 +79,7 @@ async def create_a_vm(

# Start VM snapshots automatically
if isinstance(message, InstanceContent):
await self.snapshot_manager.start_for(execution=execution)
await self.snapshot_manager.start_for(vm=execution.vm)

return execution

Expand Down

0 comments on commit a629e98

Please sign in to comment.