Skip to content

Commit

Permalink
Problem: If a user wants to assign a GPU to a QEmu VM he cannot do it.
Browse files Browse the repository at this point in the history
Solution: Implement GPU assignation feature that will be pass-though to QEmu VMs with native performance.
  • Loading branch information
nesitor committed Dec 3, 2024
1 parent fb379ff commit aaaec2a
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dependencies = [
"aioredis==1.3.1",
"aiosqlite==0.19",
"alembic==1.13.1",
"aleph-message==0.5",
"aleph-message @ git+https://github.com/aleph-im/aleph-message@andres-feature-add_gpu_requirement",
"aleph-superfluid~=0.2.1",
"dbus-python==1.3.2",
"eth-account~=0.10",
Expand Down
6 changes: 6 additions & 0 deletions src/aleph/vm/controllers/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class QemuVMHostVolume(BaseModel):
read_only: bool


class QemuGPU(BaseModel):
pci_host: str


class QemuVMConfiguration(BaseModel):
qemu_bin_path: str
cloud_init_drive_path: str | None
Expand All @@ -33,6 +37,7 @@ class QemuVMConfiguration(BaseModel):
mem_size_mb: int
interface_name: str | None
host_volumes: list[QemuVMHostVolume]
gpus: list[QemuGPU]


class QemuConfidentialVMConfiguration(BaseModel):
Expand All @@ -45,6 +50,7 @@ class QemuConfidentialVMConfiguration(BaseModel):
mem_size_mb: int
interface_name: str | None
host_volumes: list[QemuVMHostVolume]
gpus: list[QemuGPU]
ovmf_path: Path
sev_session_file: Path
sev_dh_cert_file: Path
Expand Down
5 changes: 5 additions & 0 deletions src/aleph/vm/controllers/firecracker/executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ class HostVolume:
read_only: bool


@dataclass
class HostGPU:
pci_host: str


@dataclass
class BaseConfiguration:
vm_hash: ItemHash
Expand Down
10 changes: 9 additions & 1 deletion src/aleph/vm/controllers/qemu/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from asyncio import Task
from asyncio.subprocess import Process
from pathlib import Path
from typing import Generic, TypeVar
from typing import Generic, TypeVar, List

import psutil
from aleph_message.models import ItemHash
Expand All @@ -19,11 +19,13 @@
HypervisorType,
QemuVMConfiguration,
QemuVMHostVolume,
QemuGPU,
save_controller_configuration,
)
from aleph.vm.controllers.firecracker.executable import (
AlephFirecrackerResources,
VmSetupError,
HostGPU,
)
from aleph.vm.controllers.interface import AlephVmControllerInterface
from aleph.vm.controllers.qemu.cloudinit import CloudInitMixin
Expand All @@ -36,6 +38,8 @@


class AlephQemuResources(AlephFirecrackerResources):
gpus: list[HostGPU]

async def download_runtime(self) -> None:
volume = self.message_content.rootfs
parent_image_path = await get_rootfs_base_path(volume.parent.ref)
Expand Down Expand Up @@ -200,6 +204,10 @@ async def configure(self):
)
for volume in self.resources.volumes
],
gpus=[
QemuGPU(pci_host=gpu.pci_host)
for gpu in self.resources.gpus
]
)

configuration = Configuration(
Expand Down
5 changes: 5 additions & 0 deletions src/aleph/vm/controllers/qemu_confidential/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Configuration,
HypervisorType,
QemuConfidentialVMConfiguration,
QemuGPU,
QemuVMHostVolume,
save_controller_configuration,
)
Expand Down Expand Up @@ -126,6 +127,10 @@ async def configure(self):
)
for volume in self.resources.volumes
],
gpus=[
QemuGPU(pci_host=gpu.pci_host)
for gpu in self.resources.gpus
]
)

configuration = Configuration(
Expand Down
34 changes: 28 additions & 6 deletions src/aleph/vm/hypervisors/qemu/qemuvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import qmp
from systemd import journal

from aleph.vm.controllers.configuration import QemuVMConfiguration
from aleph.vm.controllers.configuration import QemuVMConfiguration, QemuGPU
from aleph.vm.controllers.qemu.instance import logger


Expand All @@ -28,6 +28,7 @@ class QemuVM:
interface_name: str
qemu_process: Process | None = None
host_volumes: list[HostVolume]
gpus: list[QemuGPU]
journal_stdout: TextIO | None
journal_stderr: TextIO | None

Expand Down Expand Up @@ -55,6 +56,7 @@ def __init__(self, vm_hash, config: QemuVMConfiguration):
)
for volume in config.host_volumes
]
self.gpus = config.gpus

@property
def _journal_stdout_name(self) -> str:
Expand Down Expand Up @@ -102,21 +104,23 @@ async def start(
# Tell to put the output to std fd, so we can include them in the log
"-serial",
"stdio",
# Use host-phys-bits-limit argument for GPU support. TODO: Investigate how to get the correct bits size
#
"-cpu",
"host,host-phys-bits-limit=0x28"
# Uncomment for debug
# "-serial", "telnet:localhost:4321,server,nowait",
# "-snapshot", # Do not save anything to disk
]
for volume in self.host_volumes:
args += [
"-drive",
f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio",
]
if self.interface_name:
# script=no, downscript=no tell qemu not to try to set up the network itself
args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"]

if self.cloud_init_drive_path:
args += ["-cdrom", f"{self.cloud_init_drive_path}"]

args += self._get_host_volumes_args()
args += self._get_gpu_args()
print(*args)

self.qemu_process = proc = await asyncio.create_subprocess_exec(
Expand All @@ -131,6 +135,24 @@ async def start(
)
return proc

def _get_host_volumes_args(self):
args = []
for volume in self.host_volumes:
args += [
"-drive",
f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio",
]
return args

def _get_gpu_args(self):
args = []
for gpu in self.gpus:
args += [
"-device",
f"vfio-pci,host={gpu.pci_host},multifunction=on,x-vga=on",
]
return args

def _get_qmpclient(self) -> qmp.QEMUMonitorProtocol | None:
if not (self.qmp_socket_path and self.qmp_socket_path.exists()):
return None
Expand Down
12 changes: 6 additions & 6 deletions src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,24 @@ async def start(
# raise an error and prevent boot. Passing the argument --cpu host instruct the VM to use the same CPU
# model than the host thus the VM's kernel knows which method is used to get random numbers (Intel and
# AMD have different methods) and properly boot.
# Use host-phys-bits-limit argument for GPU support. TODO: Investigate how to get the correct bits size
"-cpu",
"host",
"host,host-phys-bits-limit=0x28"
# Uncomment following for debug
# "-serial", "telnet:localhost:4321,server,nowait",
# "-snapshot", # Do not save anything to disk
]
for volume in self.host_volumes:
args += [
"-drive",
f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio",
]
if self.interface_name:
# script=no, downscript=no tell qemu not to try to set up the network itself
args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"]

if self.cloud_init_drive_path:
args += ["-cdrom", f"{self.cloud_init_drive_path}"]

args += self._get_host_volumes_args()
args += self._get_gpu_args()
print(*args)

self.qemu_process = proc = await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.DEVNULL,
Expand Down
31 changes: 27 additions & 4 deletions src/aleph/vm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections.abc import Callable, Coroutine
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List

from aleph_message.models import (
ExecutableContent,
Expand All @@ -15,11 +16,10 @@
from aleph_message.models.execution.environment import HypervisorType

from aleph.vm.conf import settings
from aleph.vm.controllers.firecracker.executable import AlephFirecrackerExecutable
from aleph.vm.controllers.firecracker.executable import AlephFirecrackerExecutable, HostGPU
from aleph.vm.controllers.firecracker.instance import AlephInstanceResources
from aleph.vm.controllers.firecracker.program import (
AlephFirecrackerProgram,
AlephFirecrackerResources,
AlephProgramResources,
)
from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager
Expand All @@ -38,6 +38,7 @@
)
from aleph.vm.orchestrator.pubsub import PubSub
from aleph.vm.orchestrator.vm import AlephFirecrackerInstance
from aleph.vm.resources import GpuDevice
from aleph.vm.systemd import SystemDManager
from aleph.vm.utils import create_task_log_exceptions, dumps_for_json

Expand Down Expand Up @@ -69,8 +70,9 @@ class VmExecution:
vm_hash: ItemHash
original: ExecutableContent
message: ExecutableContent
resources: AlephFirecrackerResources | None = None
vm: AlephFirecrackerExecutable | AlephQemuInstance | None = None
resources: AlephProgramResources | AlephInstanceResources | AlephQemuResources | AlephQemuConfidentialInstance | None = None
vm: AlephFirecrackerExecutable | AlephQemuInstance | AlephQemuConfidentialInstance | None = None
gpus: List[HostGPU]

times: VmExecutionTimes

Expand Down Expand Up @@ -202,6 +204,7 @@ async def prepare(self) -> None:
resources = AlephQemuConfidentialResources(self.message, namespace=self.vm_hash)
else:
resources = AlephQemuResources(self.message, namespace=self.vm_hash)
resources.gpus = self.gpus
else:
msg = f"Unknown hypervisor type {self.hypervisor}"
raise ValueError(msg)
Expand All @@ -216,6 +219,26 @@ async def prepare(self) -> None:
self.times.prepared_at = datetime.now(tz=timezone.utc)
self.resources = resources

def prepare_gpus(self, available_gpus: List[GpuDevice]) -> None:
gpus = []
for gpu in self.message.requirements.gpu:
for available_gpu in available_gpus:
if available_gpu.device_id == gpu.device_id:
gpus.append(
HostGPU(
pci_host=available_gpu.pci_host
)
)
break
self.gpus = gpus

def uses_gpu(self, pci_host: str) -> bool:
for gpu in self.gpus:
if gpu.pci_host == pci_host:
return True

return False

def create(
self, vm_id: int, tap_interface: TapInterface | None = None, prepare: bool = True
) -> AlephVmControllerInterface:
Expand Down
19 changes: 16 additions & 3 deletions src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ async def create_a_vm(
self.executions[vm_hash] = execution

try:
# First assign Host GPUs from the available
execution.prepare_gpus(self.get_available_gpus())
# Prepare VM general Resources and also the GPUs
await execution.prepare()

vm_id = self.get_unique_vm_id()

if self.network:
Expand Down Expand Up @@ -236,6 +240,10 @@ async def load_persistent_executions(self):

if execution.is_running:
# TODO: Improve the way that we re-create running execution
# Load existing GPUs assigned to VMs
for saved_gpu in saved_execution.gpus:
execution.gpus.append(HostGPU(pci_host=saved_gpu.pci_host))
# Load and instantiate the rest of resources and already assigned GPUs
await execution.prepare()
if self.network:
vm_type = VmType.from_message_content(execution.message)
Expand Down Expand Up @@ -288,9 +296,14 @@ def get_instance_executions(self) -> Iterable[VmExecution]:
)
return executions or []

def get_available_gpus(self) -> Iterable[GpuDevice]:
# TODO: Filter already used GPUs on current executions and remove it from available
available_gpus = self.gpus
def get_available_gpus(self) -> List[GpuDevice]:
available_gpus = (
gpu
for gpu in self.gpus
for _, execution in self.executions.items()
if (isinstance(execution.resources, AlephQemuResources) or isinstance(execution.resources, AlephQemuConfidentialResources)) and not execution.uses_device_gpu(gpu.pci_host)
)

return available_gpus or []

def get_executions_by_sender(self, payment_type: PaymentType) -> dict[str, dict[str, list[VmExecution]]]:
Expand Down

0 comments on commit aaaec2a

Please sign in to comment.