Implement GPU Support #728

Merged · 11 commits · Dec 12, 2024
2 changes: 1 addition & 1 deletion packaging/Makefile
@@ -16,7 +16,7 @@ debian-package-code:
cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data
mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes
# Pin the protobuf dependency version to avoid CI errors, as version 5.29.0 has a compilation issue
pip3 install --progress-bar off --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.5.0' 'eth-account==0.10' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'aleph-superfluid~=0.2.1' 'sqlalchemy[asyncio]>=2.0' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'python-cpuid==0.1.0' 'solathon==1.0.2' 'protobuf==5.28.3'
pip3 install --progress-bar off --target ./aleph-vm/opt/aleph-vm/ 'aleph-message@git+https://github.com/aleph-im/aleph-message@andres-feature-add_gpu_requirement' 'eth-account==0.10' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'aleph-superfluid~=0.2.1' 'sqlalchemy[asyncio]>=2.0' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'python-cpuid==0.1.0' 'solathon==1.0.2' 'protobuf==5.28.3'
python3 -m compileall ./aleph-vm/opt/aleph-vm/

debian-package-resources: firecracker-bins vmlinux download-ipfs-kubo target/bin/sevctl
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -36,7 +36,7 @@ dependencies = [
"aioredis==1.3.1",
"aiosqlite==0.19",
"alembic==1.13.1",
"aleph-message==0.5",
"aleph-message @ git+https://github.com/aleph-im/aleph-message@andres-feature-add_gpu_requirement",
"aleph-superfluid~=0.2.1",
"dbus-python==1.3.2",
"eth-account~=0.10",
6 changes: 6 additions & 0 deletions src/aleph/vm/controllers/configuration.py
@@ -23,6 +23,10 @@ class QemuVMHostVolume(BaseModel):
read_only: bool


class QemuGPU(BaseModel):
pci_host: str


class QemuVMConfiguration(BaseModel):
qemu_bin_path: str
cloud_init_drive_path: str | None
@@ -33,6 +37,7 @@ class QemuVMConfiguration(BaseModel):
mem_size_mb: int
interface_name: str | None
host_volumes: list[QemuVMHostVolume]
gpus: list[QemuGPU]


class QemuConfidentialVMConfiguration(BaseModel):
@@ -45,6 +50,7 @@ class QemuConfidentialVMConfiguration(BaseModel):
mem_size_mb: int
interface_name: str | None
host_volumes: list[QemuVMHostVolume]
gpus: list[QemuGPU]
ovmf_path: Path
sev_session_file: Path
sev_dh_cert_file: Path
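
Reviewer note: the new QemuGPU entry is a plain pydantic model, so it should serialize along with the rest of the controller configuration that save_controller_configuration presumably writes to disk. A minimal sketch of the model in isolation, with a placeholder PCI address:

```python
from aleph.vm.controllers.configuration import QemuGPU

# Placeholder PCI address; real values come from the detected host GPUs.
gpu = QemuGPU(pci_host="01:00.0")
print(gpu.json())  # -> {"pci_host": "01:00.0"}
```
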
7 changes: 6 additions & 1 deletion src/aleph/vm/controllers/qemu/instance.py
@@ -5,7 +5,7 @@
from asyncio import Task
from asyncio.subprocess import Process
from pathlib import Path
from typing import Generic, TypeVar
from typing import Generic, List, TypeVar

import psutil
from aleph_message.models import ItemHash
@@ -17,6 +17,7 @@
from aleph.vm.controllers.configuration import (
Configuration,
HypervisorType,
QemuGPU,
QemuVMConfiguration,
QemuVMHostVolume,
save_controller_configuration,
@@ -29,13 +30,16 @@
from aleph.vm.controllers.qemu.cloudinit import CloudInitMixin
from aleph.vm.network.firewall import teardown_nftables_for_vm
from aleph.vm.network.interfaces import TapInterface
from aleph.vm.resources import HostGPU
from aleph.vm.storage import get_rootfs_base_path
from aleph.vm.utils import HostNotFoundError, ping, run_in_subprocess

logger = logging.getLogger(__name__)


class AlephQemuResources(AlephFirecrackerResources):
gpus: List[HostGPU] = []

async def download_runtime(self) -> None:
volume = self.message_content.rootfs
parent_image_path = await get_rootfs_base_path(volume.parent.ref)
@@ -200,6 +204,7 @@ async def configure(self):
)
for volume in self.resources.volumes
],
gpus=[QemuGPU(pci_host=gpu.pci_host) for gpu in self.resources.gpus],
)

configuration = Configuration(
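
HostGPU (and GpuDevice, used later in models.py) come from aleph.vm.resources, which this diff does not show. A plausible minimal shape, inferred only from how the fields are used in this PR; any additional fields on the real models are omitted here:

```python
from pydantic import BaseModel


class HostGPU(BaseModel):
    """Assumed shape: only pci_host is referenced in this diff."""

    pci_host: str  # PCI address of the host device, e.g. "01:00.0"


class GpuDevice(BaseModel):
    """Assumed shape: only device_id and pci_host are referenced in this diff."""

    device_id: str  # vendor:device pair, e.g. "10de:2204" (assumption)
    pci_host: str
```
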
2 changes: 2 additions & 0 deletions src/aleph/vm/controllers/qemu_confidential/instance.py
@@ -13,6 +13,7 @@
Configuration,
HypervisorType,
QemuConfidentialVMConfiguration,
QemuGPU,
QemuVMHostVolume,
save_controller_configuration,
)
@@ -126,6 +127,7 @@ async def configure(self):
)
for volume in self.resources.volumes
],
gpus=[QemuGPU(pci_host=gpu.pci_host) for gpu in self.resources.gpus],
)

configuration = Configuration(
34 changes: 28 additions & 6 deletions src/aleph/vm/hypervisors/qemu/qemuvm.py
@@ -7,7 +7,7 @@
import qmp
from systemd import journal

from aleph.vm.controllers.configuration import QemuVMConfiguration
from aleph.vm.controllers.configuration import QemuGPU, QemuVMConfiguration
from aleph.vm.controllers.qemu.instance import logger


@@ -28,6 +28,7 @@
interface_name: str
qemu_process: Process | None = None
host_volumes: list[HostVolume]
gpus: list[QemuGPU]
journal_stdout: TextIO | None
journal_stderr: TextIO | None

@@ -55,6 +56,7 @@
)
for volume in config.host_volumes
]
self.gpus = config.gpus

@property
def _journal_stdout_name(self) -> str:
@@ -102,21 +104,23 @@
# Send the output to the standard file descriptors so it can be included in the logs
"-serial",
"stdio",
# Use the host-phys-bits-limit argument for GPU support (0x28 = a 40-bit guest physical address limit).
# TODO: Investigate how to detect the correct bit size for the host
"-cpu",
"host,host-phys-bits-limit=0x28",
# Uncomment for debug
# "-serial", "telnet:localhost:4321,server,nowait",
# "-snapshot", # Do not save anything to disk
]
for volume in self.host_volumes:
args += [
"-drive",
f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio",
]
if self.interface_name:
# script=no, downscript=no tell qemu not to try to set up the network itself
args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"]

if self.cloud_init_drive_path:
args += ["-cdrom", f"{self.cloud_init_drive_path}"]

args += self._get_host_volumes_args()
args += self._get_gpu_args()
print(*args)

self.qemu_process = proc = await asyncio.create_subprocess_exec(
@@ -131,6 +135,24 @@
)
return proc

def _get_host_volumes_args(self):
args = []
for volume in self.host_volumes:
args += [
"-drive",
f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio",
]
return args

def _get_gpu_args(self):
args = []
for gpu in self.gpus:
args += [

"-device",
f"vfio-pci,host={gpu.pci_host},multifunction=on,x-vga=on",
]
return args

def _get_qmpclient(self) -> qmp.QEMUMonitorProtocol | None:
if not (self.qmp_socket_path and self.qmp_socket_path.exists()):
return None
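
For reference, this is the fragment _get_gpu_args contributes to the QEMU command line for a single passed-through GPU (standalone sketch with a placeholder PCI address). Passthrough additionally assumes the host has the IOMMU enabled and the device already bound to the vfio-pci driver:

```python
# Standalone reproduction of the argument construction above; values are illustrative.
gpus = [{"pci_host": "01:00.0"}]  # placeholder device

args: list[str] = []
for gpu in gpus:
    args += ["-device", f"vfio-pci,host={gpu['pci_host']},multifunction=on,x-vga=on"]

print(" ".join(args))
# -device vfio-pci,host=01:00.0,multifunction=on,x-vga=on
```
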
12 changes: 6 additions & 6 deletions src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py
@@ -105,24 +105,24 @@
# raise an error and prevent boot. Passing the argument --cpu host instructs the VM to use the same CPU
# model as the host, so the VM's kernel knows which method is used to get random numbers (Intel and
# AMD have different methods) and can boot properly.
# Use the host-phys-bits-limit argument for GPU support (0x28 = a 40-bit guest physical address limit).
# TODO: Investigate how to detect the correct bit size for the host
"-cpu",
"host",
"host,host-phys-bits-limit=0x28",
# Uncomment following for debug
# "-serial", "telnet:localhost:4321,server,nowait",
# "-snapshot", # Do not save anything to disk
]
for volume in self.host_volumes:
args += [
"-drive",
f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio",
]
if self.interface_name:
# script=no, downscript=no tell qemu not to try to set up the network itself
args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"]

if self.cloud_init_drive_path:
args += ["-cdrom", f"{self.cloud_init_drive_path}"]

args += self._get_host_volumes_args()
args += self._get_gpu_args()

print(*args)

self.qemu_process = proc = await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.DEVNULL,
34 changes: 30 additions & 4 deletions src/aleph/vm/models.py
@@ -1,25 +1,27 @@
import asyncio
import json
import logging
import uuid
from asyncio import Task
from collections.abc import Callable, Coroutine
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List

from aleph_message.models import (
ExecutableContent,
InstanceContent,
ItemHash,
ProgramContent,
)
from aleph_message.models.execution.environment import HypervisorType
from aleph_message.models.execution.environment import GpuProperties, HypervisorType
from pydantic.json import pydantic_encoder

from aleph.vm.conf import settings
from aleph.vm.controllers.firecracker.executable import AlephFirecrackerExecutable
from aleph.vm.controllers.firecracker.instance import AlephInstanceResources
from aleph.vm.controllers.firecracker.program import (
AlephFirecrackerProgram,
AlephFirecrackerResources,
AlephProgramResources,
)
from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager
@@ -38,6 +40,7 @@
)
from aleph.vm.orchestrator.pubsub import PubSub
from aleph.vm.orchestrator.vm import AlephFirecrackerInstance
from aleph.vm.resources import GpuDevice, HostGPU
from aleph.vm.systemd import SystemDManager
from aleph.vm.utils import create_task_log_exceptions, dumps_for_json

@@ -69,8 +72,11 @@
vm_hash: ItemHash
original: ExecutableContent
message: ExecutableContent
resources: AlephFirecrackerResources | None = None
vm: AlephFirecrackerExecutable | AlephQemuInstance | None = None
resources: (
AlephProgramResources | AlephInstanceResources | AlephQemuResources | AlephQemuConfidentialInstance | None
) = None
vm: AlephFirecrackerExecutable | AlephQemuInstance | AlephQemuConfidentialInstance | None = None
gpus: List[HostGPU] = []

times: VmExecutionTimes

@@ -202,6 +208,7 @@
resources = AlephQemuConfidentialResources(self.message, namespace=self.vm_hash)
else:
resources = AlephQemuResources(self.message, namespace=self.vm_hash)
resources.gpus = self.gpus
else:
msg = f"Unknown hypervisor type {self.hypervisor}"
raise ValueError(msg)
@@ -216,6 +223,24 @@
self.times.prepared_at = datetime.now(tz=timezone.utc)
self.resources = resources

def prepare_gpus(self, available_gpus: List[GpuDevice]) -> None:
gpus = []

if self.message.requirements and self.message.requirements.gpu:
for gpu in self.message.requirements.gpu:
gpu = GpuProperties.parse_obj(gpu)

for available_gpu in available_gpus:
if available_gpu.device_id == gpu.device_id:
gpus.append(HostGPU(pci_host=available_gpu.pci_host))
break
self.gpus = gpus

def uses_gpu(self, pci_host: str) -> bool:
for gpu in self.gpus:
if gpu.pci_host == pci_host:
return True

return False

def create(
self, vm_id: int, tap_interface: TapInterface | None = None, prepare: bool = True
) -> AlephVmControllerInterface:
@@ -437,6 +462,7 @@
message=self.message.json(),
original_message=self.original.json(),
persistent=self.persistent,
gpus=json.dumps(self.gpus, default=pydantic_encoder),
)
)

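
The matching in prepare_gpus pairs each requested GPU with the first available device that shares the same device_id; nothing in the loop removes a matched device from available_gpus, so exclusivity across executions presumably relies on the uses_gpu check elsewhere. A standalone sketch of the matching logic with illustrative types:

```python
# Illustrative reproduction of the prepare_gpus matching loop; device ids and PCI
# addresses are placeholders, and (device_id, pci_host) tuples stand in for the
# GpuDevice objects used in the real code.
def match_gpus(requested_device_ids: list[str], available: list[tuple[str, str]]) -> list[str]:
    matched: list[str] = []
    for device_id in requested_device_ids:
        for available_id, pci_host in available:
            if available_id == device_id:
                matched.append(pci_host)
                break
    return matched


print(match_gpus(["10de:2204"], [("10de:2204", "01:00.0"), ("10de:1db6", "02:00.0")]))
# ['01:00.0']
```
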
2 changes: 2 additions & 0 deletions src/aleph/vm/orchestrator/metrics.py
@@ -76,6 +76,8 @@ class ExecutionRecord(Base):
original_message = Column(JSON, nullable=True)
persistent = Column(Boolean, nullable=True)

gpus = Column(JSON, nullable=True)

def __repr__(self):
return f"<ExecutionRecord(uuid={self.uuid}, vm_hash={self.vm_hash}, vm_id={self.vm_id})>"

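
Since ExecutionRecord gains a new column, deployments with an existing database will presumably need an Alembic migration (the project already pins alembic==1.13.1). A hypothetical sketch; the table name and revision identifiers are assumptions, not part of this diff:

```python
"""Add the gpus column to the execution records table (hypothetical migration sketch)."""

import sqlalchemy as sa
from alembic import op

# Placeholder revision identifiers.
revision = "abcdef123456"
down_revision = "000000000000"


def upgrade() -> None:
    # "executions" is an assumed table name for ExecutionRecord.
    op.add_column("executions", sa.Column("gpus", sa.JSON(), nullable=True))


def downgrade() -> None:
    op.drop_column("executions", "gpus")
```
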
10 changes: 7 additions & 3 deletions src/aleph/vm/orchestrator/payment.py
@@ -100,9 +100,13 @@
Get the stream of the user from the Superfluid API.
See https://community.aleph.im/t/pay-as-you-go-using-superfluid/98/11
"""
chain_info: ChainInfo = get_chain(chain=chain)
if not chain_info.active:
msg = f"Chain : {chain} is not active for superfluid"
try:
chain_info: ChainInfo = get_chain(chain=chain)

if not chain_info.active:
msg = f"Chain : {chain} is not active for superfluid"
raise InvalidChainError(msg)
except ValueError:
msg = f"Chain : {chain} is invalid"

raise InvalidChainError(msg)

superfluid_instance = CFA_V1(chain_info.rpc, chain_info.chain_id)
13 changes: 9 additions & 4 deletions src/aleph/vm/orchestrator/tasks.py
@@ -4,6 +4,7 @@
import math
import time
from collections.abc import AsyncIterable
from decimal import Decimal
from typing import TypeVar

import aiohttp
@@ -175,10 +176,14 @@
# Check if the balance held in the wallet is sufficient stream tier resources
for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items():
for chain, executions in chains.items():
stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain)
logger.debug(
f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}"
)
try:
stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain)
logger.debug(

f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}"
)
except ValueError as error:
logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}")
stream = Decimal(0)

required_stream = await compute_required_flow(executions)
logger.debug(f"Required stream for Sender {sender} executions: {required_stream}")
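
Falling back to Decimal(0) means a failed chain lookup is treated as "no stream", so the affected executions should compare as underfunded in the check that follows. A rough sketch of that comparison, under the assumption that the surrounding (unshown) code stops executions whose stream is below the required flow:

```python
from decimal import Decimal

stream = Decimal(0)                    # fallback when get_stream raises ValueError
required_stream = Decimal("0.000015")  # illustrative value from compute_required_flow

if stream < required_stream:
    # In the real task this is presumably where the executions for this
    # sender/chain get scheduled for shutdown.
    print("insufficient stream, executions would be stopped")
```
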
1 change: 1 addition & 0 deletions src/aleph/vm/orchestrator/views/__init__.py
@@ -347,6 +347,7 @@ async def status_public_config(request: web.Request):
"ENABLE_QEMU_SUPPORT": settings.ENABLE_QEMU_SUPPORT,
"INSTANCE_DEFAULT_HYPERVISOR": settings.INSTANCE_DEFAULT_HYPERVISOR,
"ENABLE_CONFIDENTIAL_COMPUTING": settings.ENABLE_CONFIDENTIAL_COMPUTING,
"ENABLE_GPU_SUPPORT": settings.ENABLE_GPU_SUPPORT,
},
},
dumps=dumps_for_json,
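
With the flag exposed in the public config, a client can check whether a CRN advertises GPU support. A small sketch using aiohttp (already a project dependency); the URL path and the nesting of the flag inside the JSON are assumptions, so the helper searches recursively rather than hard-coding a key path:

```python
import asyncio

import aiohttp


def find_flag(obj, key: str = "ENABLE_GPU_SUPPORT"):
    """Recursively look for the flag, since the exact nesting is not shown in this diff."""
    if isinstance(obj, dict):
        if key in obj:
            return bool(obj[key])
        for value in obj.values():
            found = find_flag(value, key)
            if found is not None:
                return found
    return None


async def crn_supports_gpu(base_url: str) -> bool:
    # "/status/config" is an assumed route for status_public_config.
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{base_url}/status/config") as resp:
            config = await resp.json()
    return bool(find_flag(config))


# asyncio.run(crn_supports_gpu("https://example-crn.invalid"))
```
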