Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor VMPool class #444

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions src/aleph/vm/controllers/firecracker/snapshot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,31 @@
from schedule import Job, Scheduler

from aleph.vm.conf import settings
from aleph.vm.orchestrator.models import VmExecution

from .executable import AlephFirecrackerExecutable
from .snapshots import CompressedDiskVolumeSnapshot

logger = logging.getLogger(__name__)


def wrap_async_snapshot(execution):
asyncio.run(do_execution_snapshot(execution))
def wrap_async_snapshot(vm):
asyncio.run(do_vm_snapshot(vm))


def run_threaded_snapshot(execution):
job_thread = threading.Thread(target=wrap_async_snapshot, args=(execution,))
def run_threaded_snapshot(vm):
job_thread = threading.Thread(target=wrap_async_snapshot, args=(vm,))
job_thread.start()


async def do_execution_snapshot(execution: VmExecution) -> CompressedDiskVolumeSnapshot:
async def do_vm_snapshot(vm: AlephFirecrackerExecutable) -> CompressedDiskVolumeSnapshot:
try:
logger.debug(f"Starting new snapshot for VM {execution.vm_hash}")
assert execution.vm, "VM execution not set"
logger.debug(f"Starting new snapshot for VM {vm.vm_hash}")
assert vm, "VM execution not set"

snapshot = await execution.vm.create_snapshot()
snapshot = await vm.create_snapshot()
await snapshot.upload()

logger.debug(f"New snapshots for VM {execution.vm_hash} created in {snapshot.path}")
logger.debug(f"New snapshots for VM {vm.vm_hash} created in {snapshot.path}")
return snapshot
except ValueError:
msg = "Something failed taking an snapshot"
Expand All @@ -47,7 +47,7 @@ def infinite_run_scheduler_jobs(scheduler: Scheduler) -> None:

class SnapshotExecution:
vm_hash: ItemHash
execution: VmExecution
execution: AlephFirecrackerExecutable
frequency: int
_scheduler: Scheduler
_job: Job
Expand All @@ -56,7 +56,7 @@ def __init__(
self,
scheduler: Scheduler,
vm_hash: ItemHash,
execution: VmExecution,
execution: AlephFirecrackerExecutable,
frequency: int,
):
self.vm_hash = vm_hash
Expand Down Expand Up @@ -95,18 +95,18 @@ def run_snapshots(self) -> None:
)
job_thread.start()

async def start_for(self, execution: VmExecution, frequency: Optional[int] = None) -> None:
if not execution.is_instance:
async def start_for(self, vm: AlephFirecrackerExecutable, frequency: Optional[int] = None) -> None:
if not vm.is_instance:
msg = "Snapshots are not implemented for programs."
raise NotImplementedError(msg)

default_frequency = frequency or settings.SNAPSHOT_FREQUENCY

vm_hash = execution.vm_hash
vm_hash = vm.vm_hash
snapshot_execution = SnapshotExecution(
scheduler=self._scheduler,
vm_hash=vm_hash,
execution=execution,
execution=vm,
frequency=default_frequency,
)
self.executions[vm_hash] = snapshot_execution
Expand Down
13 changes: 8 additions & 5 deletions src/aleph/vm/orchestrator/models.py → src/aleph/vm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ProgramContent,
)

from aleph.vm.conf import settings
from aleph.vm.controllers.firecracker.executable import AlephFirecrackerExecutable
from aleph.vm.controllers.firecracker.instance import AlephInstanceResources
from aleph.vm.controllers.firecracker.program import (
Expand All @@ -22,13 +23,15 @@
AlephProgramResources,
)
from aleph.vm.network.interfaces import TapInterface
from aleph.vm.orchestrator.metrics import (
ExecutionRecord,
save_execution_data,
save_record,
)
from aleph.vm.orchestrator.pubsub import PubSub
from aleph.vm.orchestrator.vm import AlephFirecrackerInstance
from aleph.vm.utils import create_task_log_exceptions, dumps_for_json

from ..conf import settings
from .metrics import ExecutionRecord, save_execution_data, save_record
from .pubsub import PubSub
from .vm import AlephFirecrackerInstance

if TYPE_CHECKING:
from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager

Expand Down
4 changes: 0 additions & 4 deletions src/aleph/vm/orchestrator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from . import (
messages,
metrics,
models,
pool,
pubsub,
reactor,
resources,
Expand All @@ -20,8 +18,6 @@
__all__ = (
"messages",
"metrics",
"models",
"pool",
"pubsub",
"reactor",
"resources",
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/vm/orchestrator/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from aleph.vm.utils import HostNotFoundError

from ..conf import settings
from ..models import VmExecution
from ..pool import VmPool
from .messages import load_updated_message
from .models import VmExecution
from .pool import VmPool
from .pubsub import PubSub

logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from eth_account.messages import encode_defunct
from jwskate import Jwk

from aleph.vm.orchestrator.models import VmExecution
from aleph.vm.models import VmExecution
from aleph.vm.orchestrator.run import pool

logger = logging.getLogger(__name__)
Expand Down
8 changes: 4 additions & 4 deletions src/aleph/vm/orchestrator/pool.py → src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from aleph_message.models import ExecutableMessage, ItemHash
from aleph_message.models.execution.instance import InstanceContent

from aleph.vm.conf import settings
from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager
from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator
from aleph.vm.orchestrator.vm.vm_type import VmType

from ..conf import settings
from ..network.hostnetwork import Network, make_ipv6_allocator
from .models import ExecutableContent, VmExecution
from .vm.vm_type import VmType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -79,7 +79,7 @@ async def create_a_vm(

# Start VM snapshots automatically
if isinstance(message, InstanceContent):
await self.snapshot_manager.start_for(execution=execution)
await self.snapshot_manager.start_for(vm=execution.vm)

return execution

Expand Down
Loading