diff --git a/packaging/Makefile b/packaging/Makefile index 3c4f8a6b1..a601f1c9a 100644 --- a/packaging/Makefile +++ b/packaging/Makefile @@ -16,7 +16,7 @@ debian-package-code: cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes # Fixing this protobuf dependency version to avoid getting CI errors as version 5.29.0 have this compilation issue - pip3 install --progress-bar off --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.5.0' 'eth-account==0.10' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'aleph-superfluid~=0.2.1' 'sqlalchemy[asyncio]>=2.0' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'python-cpuid==0.1.0' 'solathon==1.0.2' 'protobuf==5.28.3' + pip3 install --progress-bar off --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.6' 'eth-account==0.10' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'aleph-superfluid~=0.2.1' 'sqlalchemy[asyncio]>=2.0' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'python-cpuid==0.1.0' 'solathon==1.0.2' 'protobuf==5.28.3' python3 -m compileall ./aleph-vm/opt/aleph-vm/ debian-package-resources: firecracker-bins vmlinux download-ipfs-kubo target/bin/sevctl diff --git a/pyproject.toml b/pyproject.toml index faebfb9a4..1c934ff1f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ dependencies = [ "aioredis==1.3.1", "aiosqlite==0.19", "alembic==1.13.1", - "aleph-message==0.5", + "aleph-message==0.6", "aleph-superfluid~=0.2.1", "dbus-python==1.3.2", "eth-account~=0.10", diff --git a/src/aleph/vm/controllers/configuration.py b/src/aleph/vm/controllers/configuration.py index da10d8395..fb4b4ff1f 100644 --- a/src/aleph/vm/controllers/configuration.py +++ b/src/aleph/vm/controllers/configuration.py @@ -23,6 +23,10 @@ class QemuVMHostVolume(BaseModel): read_only: bool +class QemuGPU(BaseModel): + pci_host: str + + class QemuVMConfiguration(BaseModel): qemu_bin_path: str cloud_init_drive_path: str | None @@ -33,6 +37,7 @@ class 
QemuVMConfiguration(BaseModel): mem_size_mb: int interface_name: str | None host_volumes: list[QemuVMHostVolume] + gpus: list[QemuGPU] class QemuConfidentialVMConfiguration(BaseModel): @@ -45,6 +50,7 @@ class QemuConfidentialVMConfiguration(BaseModel): mem_size_mb: int interface_name: str | None host_volumes: list[QemuVMHostVolume] + gpus: list[QemuGPU] ovmf_path: Path sev_session_file: Path sev_dh_cert_file: Path diff --git a/src/aleph/vm/controllers/qemu/instance.py b/src/aleph/vm/controllers/qemu/instance.py index dd840e22b..259f84744 100644 --- a/src/aleph/vm/controllers/qemu/instance.py +++ b/src/aleph/vm/controllers/qemu/instance.py @@ -5,7 +5,7 @@ from asyncio import Task from asyncio.subprocess import Process from pathlib import Path -from typing import Generic, TypeVar +from typing import Generic, List, TypeVar import psutil from aleph_message.models import ItemHash @@ -17,6 +17,7 @@ from aleph.vm.controllers.configuration import ( Configuration, HypervisorType, + QemuGPU, QemuVMConfiguration, QemuVMHostVolume, save_controller_configuration, @@ -29,6 +30,7 @@ from aleph.vm.controllers.qemu.cloudinit import CloudInitMixin from aleph.vm.network.firewall import teardown_nftables_for_vm from aleph.vm.network.interfaces import TapInterface +from aleph.vm.resources import HostGPU from aleph.vm.storage import get_rootfs_base_path from aleph.vm.utils import HostNotFoundError, ping, run_in_subprocess @@ -36,6 +38,8 @@ class AlephQemuResources(AlephFirecrackerResources): + gpus: List[HostGPU] = [] + async def download_runtime(self) -> None: volume = self.message_content.rootfs parent_image_path = await get_rootfs_base_path(volume.parent.ref) @@ -200,6 +204,7 @@ async def configure(self): ) for volume in self.resources.volumes ], + gpus=[QemuGPU(pci_host=gpu.pci_host) for gpu in self.resources.gpus], ) configuration = Configuration( diff --git a/src/aleph/vm/controllers/qemu_confidential/instance.py b/src/aleph/vm/controllers/qemu_confidential/instance.py index 
f432cff69..37986b10c 100644 --- a/src/aleph/vm/controllers/qemu_confidential/instance.py +++ b/src/aleph/vm/controllers/qemu_confidential/instance.py @@ -13,6 +13,7 @@ Configuration, HypervisorType, QemuConfidentialVMConfiguration, + QemuGPU, QemuVMHostVolume, save_controller_configuration, ) @@ -126,6 +127,7 @@ async def configure(self): ) for volume in self.resources.volumes ], + gpus=[QemuGPU(pci_host=gpu.pci_host) for gpu in self.resources.gpus], ) configuration = Configuration( diff --git a/src/aleph/vm/hypervisors/qemu/qemuvm.py b/src/aleph/vm/hypervisors/qemu/qemuvm.py index 5949fbdc4..36003e595 100644 --- a/src/aleph/vm/hypervisors/qemu/qemuvm.py +++ b/src/aleph/vm/hypervisors/qemu/qemuvm.py @@ -7,7 +7,7 @@ import qmp from systemd import journal -from aleph.vm.controllers.configuration import QemuVMConfiguration +from aleph.vm.controllers.configuration import QemuGPU, QemuVMConfiguration from aleph.vm.controllers.qemu.instance import logger @@ -28,6 +28,7 @@ class QemuVM: interface_name: str qemu_process: Process | None = None host_volumes: list[HostVolume] + gpus: list[QemuGPU] journal_stdout: TextIO | None journal_stderr: TextIO | None @@ -55,6 +56,7 @@ def __init__(self, vm_hash, config: QemuVMConfiguration): ) for volume in config.host_volumes ] + self.gpus = config.gpus @property def _journal_stdout_name(self) -> str: @@ -106,17 +108,15 @@ async def start( # "-serial", "telnet:localhost:4321,server,nowait", # "-snapshot", # Do not save anything to disk ] - for volume in self.host_volumes: - args += [ - "-drive", - f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio", - ] if self.interface_name: # script=no, downscript=no tell qemu not to try to set up the network itself args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"] if self.cloud_init_drive_path: args += ["-cdrom", f"{self.cloud_init_drive_path}"] + + args += self._get_host_volumes_args() 
+ args += self._get_gpu_args() print(*args) self.qemu_process = proc = await asyncio.create_subprocess_exec( @@ -131,6 +131,28 @@ async def start( ) return proc + def _get_host_volumes_args(self): + args = [] + for volume in self.host_volumes: + args += [ + "-drive", + f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio", + ] + return args + + def _get_gpu_args(self): + args = [ + # Use host-phys-bits-limit argument for GPU support. TODO: Investigate how to get the correct bits size + "-cpu", + "host,host-phys-bits-limit=0x28", + ] + for gpu in self.gpus: + args += [ + "-device", + f"vfio-pci,host={gpu.pci_host},multifunction=on,x-vga=on", + ] + return args + def _get_qmpclient(self) -> qmp.QEMUMonitorProtocol | None: if not (self.qmp_socket_path and self.qmp_socket_path.exists()): return None diff --git a/src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py b/src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py index 5e32e8990..1ef33e407 100644 --- a/src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py +++ b/src/aleph/vm/hypervisors/qemu_confidential/qemuvm.py @@ -105,24 +105,24 @@ async def start( # raise an error and prevent boot. Passing the argument --cpu host instruct the VM to use the same CPU # model than the host thus the VM's kernel knows which method is used to get random numbers (Intel and # AMD have different methods) and properly boot. + # Use host-phys-bits-limit argument for GPU support. 
TODO: Investigate how to get the correct bits size "-cpu", - "host", + "host,host-phys-bits-limit=0x28", # Uncomment following for debug # "-serial", "telnet:localhost:4321,server,nowait", # "-snapshot", # Do not save anything to disk ] - for volume in self.host_volumes: - args += [ - "-drive", - f"file={volume.path_on_host},format=raw,readonly={'on' if volume.read_only else 'off'},media=disk,if=virtio", - ] if self.interface_name: # script=no, downscript=no tell qemu not to try to set up the network itself args += ["-net", "nic,model=virtio", "-net", f"tap,ifname={self.interface_name},script=no,downscript=no"] if self.cloud_init_drive_path: args += ["-cdrom", f"{self.cloud_init_drive_path}"] + + args += self._get_host_volumes_args() + args += self._get_gpu_args() print(*args) + self.qemu_process = proc = await asyncio.create_subprocess_exec( *args, stdin=asyncio.subprocess.DEVNULL, diff --git a/src/aleph/vm/models.py b/src/aleph/vm/models.py index 9aee9320a..7dd59091b 100644 --- a/src/aleph/vm/models.py +++ b/src/aleph/vm/models.py @@ -1,10 +1,12 @@ import asyncio +import json import logging import uuid from asyncio import Task from collections.abc import Callable, Coroutine from dataclasses import dataclass from datetime import datetime, timezone +from typing import List from aleph_message.models import ( ExecutableContent, @@ -12,14 +14,14 @@ ItemHash, ProgramContent, ) -from aleph_message.models.execution.environment import HypervisorType +from aleph_message.models.execution.environment import GpuProperties, HypervisorType +from pydantic.json import pydantic_encoder from aleph.vm.conf import settings from aleph.vm.controllers.firecracker.executable import AlephFirecrackerExecutable from aleph.vm.controllers.firecracker.instance import AlephInstanceResources from aleph.vm.controllers.firecracker.program import ( AlephFirecrackerProgram, - AlephFirecrackerResources, AlephProgramResources, ) from aleph.vm.controllers.firecracker.snapshot_manager import 
SnapshotManager @@ -38,6 +40,7 @@ ) from aleph.vm.orchestrator.pubsub import PubSub from aleph.vm.orchestrator.vm import AlephFirecrackerInstance +from aleph.vm.resources import GpuDevice, HostGPU from aleph.vm.systemd import SystemDManager from aleph.vm.utils import create_task_log_exceptions, dumps_for_json @@ -69,8 +72,11 @@ class VmExecution: vm_hash: ItemHash original: ExecutableContent message: ExecutableContent - resources: AlephFirecrackerResources | None = None - vm: AlephFirecrackerExecutable | AlephQemuInstance | None = None + resources: ( + AlephProgramResources | AlephInstanceResources | AlephQemuResources | AlephQemuConfidentialInstance | None + ) = None + vm: AlephFirecrackerExecutable | AlephQemuInstance | AlephQemuConfidentialInstance | None = None + gpus: List[HostGPU] = [] times: VmExecutionTimes @@ -202,6 +208,7 @@ async def prepare(self) -> None: resources = AlephQemuConfidentialResources(self.message, namespace=self.vm_hash) else: resources = AlephQemuResources(self.message, namespace=self.vm_hash) + resources.gpus = self.gpus else: msg = f"Unknown hypervisor type {self.hypervisor}" raise ValueError(msg) @@ -216,6 +223,24 @@ async def prepare(self) -> None: self.times.prepared_at = datetime.now(tz=timezone.utc) self.resources = resources + def prepare_gpus(self, available_gpus: List[GpuDevice]) -> None: + gpus = [] + if self.message.requirements and self.message.requirements.gpu: + for gpu in self.message.requirements.gpu: + gpu = GpuProperties.parse_obj(gpu) + for available_gpu in available_gpus: + if available_gpu.device_id == gpu.device_id: + gpus.append(HostGPU(pci_host=available_gpu.pci_host)) + break + self.gpus = gpus + + def uses_gpu(self, pci_host: str) -> bool: + for gpu in self.gpus: + if gpu.pci_host == pci_host: + return True + + return False + def create( self, vm_id: int, tap_interface: TapInterface | None = None, prepare: bool = True ) -> AlephVmControllerInterface: @@ -437,6 +462,7 @@ async def save(self): 
message=self.message.json(), original_message=self.original.json(), persistent=self.persistent, + gpus=json.dumps(self.gpus, default=pydantic_encoder), ) ) diff --git a/src/aleph/vm/orchestrator/chain.py b/src/aleph/vm/orchestrator/chain.py index 7321aa458..0b4174397 100644 --- a/src/aleph/vm/orchestrator/chain.py +++ b/src/aleph/vm/orchestrator/chain.py @@ -60,9 +60,13 @@ def check_tokens(cls, values): } +class InvalidChainError(ValueError): + pass + + def get_chain(chain: str) -> ChainInfo: try: return STREAM_CHAINS[chain] except KeyError: msg = f"Unknown chain id for chain {chain}" - raise ValueError(msg) + raise InvalidChainError(msg) diff --git a/src/aleph/vm/orchestrator/cli.py b/src/aleph/vm/orchestrator/cli.py index bbae396d4..740733e61 100644 --- a/src/aleph/vm/orchestrator/cli.py +++ b/src/aleph/vm/orchestrator/cli.py @@ -167,9 +167,6 @@ async def benchmark(runs: int): """Measure program performance by immediately running the supervisor with fake requests. """ - engine = metrics.setup_engine() - await metrics.create_tables(engine) - ref = ItemHash("cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe") settings.FAKE_DATA_PROGRAM = settings.BENCHMARK_FAKE_DATA_PROGRAM @@ -357,6 +354,10 @@ def main(): settings.check() logger.debug("Initialising the DB...") + # Check and create execution database + engine = metrics.setup_engine() + asyncio.run(metrics.create_tables(engine)) + # After creating it run the DB migrations asyncio.run(run_async_db_migrations()) logger.debug("DB up to date.") diff --git a/src/aleph/vm/orchestrator/metrics.py b/src/aleph/vm/orchestrator/metrics.py index f7f166481..6c9b8eea0 100644 --- a/src/aleph/vm/orchestrator/metrics.py +++ b/src/aleph/vm/orchestrator/metrics.py @@ -76,6 +76,8 @@ class ExecutionRecord(Base): original_message = Column(JSON, nullable=True) persistent = Column(Boolean, nullable=True) + gpus = Column(JSON, nullable=True) + def __repr__(self): return f"" diff --git 
a/src/aleph/vm/orchestrator/migrations/versions/0002_5c6ae643c69b_add_gpu_column_to_executions_table.py b/src/aleph/vm/orchestrator/migrations/versions/0002_5c6ae643c69b_add_gpu_column_to_executions_table.py new file mode 100644 index 000000000..4b739323b --- /dev/null +++ b/src/aleph/vm/orchestrator/migrations/versions/0002_5c6ae643c69b_add_gpu_column_to_executions_table.py @@ -0,0 +1,38 @@ +"""add gpus column to executions table + +Revision ID: 5c6ae643c69b +Revises: bbb12a12372e +Create Date: 2024-12-09 19:40:19.279735 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +from sqlalchemy import create_engine +from sqlalchemy.engine import reflection + +from aleph.vm.conf import make_db_url + +revision = "5c6ae643c69b" +down_revision = "bbb12a12372e" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + engine = create_engine(make_db_url()) + inspector = reflection.Inspector.from_engine(engine) + + # The table already exists on most CRNs. + tables = inspector.get_table_names() + if "executions" in tables: + columns = inspector.get_columns("executions") + column_names = [c["name"] for c in columns] + if "gpus" not in column_names: + op.add_column("executions", sa.Column("gpus", sa.JSON(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("executions", "gpus") diff --git a/src/aleph/vm/orchestrator/payment.py b/src/aleph/vm/orchestrator/payment.py index 7194f873a..f5a79bbca 100644 --- a/src/aleph/vm/orchestrator/payment.py +++ b/src/aleph/vm/orchestrator/payment.py @@ -13,7 +13,7 @@ from aleph.vm.models import VmExecution from aleph.vm.utils import to_normalized_address -from .chain import ChainInfo, get_chain +from .chain import ChainInfo, InvalidChainError, get_chain logger = logging.getLogger(__name__) @@ -91,10 +91,6 @@ class InvalidAddressError(ValueError): pass -class InvalidChainError(ValueError): - pass - - async def get_stream(sender: str, receiver: str, chain: str) -> Decimal: """ Get the stream of
the user from the Superfluid API. diff --git a/src/aleph/vm/orchestrator/supervisor.py b/src/aleph/vm/orchestrator/supervisor.py index a5ca999a8..ae6436291 100644 --- a/src/aleph/vm/orchestrator/supervisor.py +++ b/src/aleph/vm/orchestrator/supervisor.py @@ -20,7 +20,6 @@ from aleph.vm.sevclient import SevClient from aleph.vm.version import __version__ -from .metrics import create_tables, setup_engine from .resources import about_certificates, about_system_usage from .tasks import ( start_payment_monitoring_task, @@ -151,9 +150,6 @@ def run(): """Run the VM Supervisor.""" settings.check() - engine = setup_engine() - asyncio.run(create_tables(engine)) - loop = asyncio.new_event_loop() pool = VmPool(loop) pool.setup() diff --git a/src/aleph/vm/orchestrator/tasks.py b/src/aleph/vm/orchestrator/tasks.py index 921a2265f..593c5fcdc 100644 --- a/src/aleph/vm/orchestrator/tasks.py +++ b/src/aleph/vm/orchestrator/tasks.py @@ -4,6 +4,7 @@ import math import time from collections.abc import AsyncIterable +from decimal import Decimal from typing import TypeVar import aiohttp @@ -175,10 +176,14 @@ async def monitor_payments(app: web.Application): # Check if the balance held in the wallet is sufficient stream tier resources for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items(): for chain, executions in chains.items(): - stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain) - logger.debug( - f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}" - ) + try: + stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain) + logger.debug( + f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}" + ) + except ValueError as error: + logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}") + continue required_stream = await 
compute_required_flow(executions) logger.debug(f"Required stream for Sender {sender} executions: {required_stream}") diff --git a/src/aleph/vm/orchestrator/views/__init__.py b/src/aleph/vm/orchestrator/views/__init__.py index 4c9b3866d..9f0d6b32d 100644 --- a/src/aleph/vm/orchestrator/views/__init__.py +++ b/src/aleph/vm/orchestrator/views/__init__.py @@ -347,6 +347,7 @@ async def status_public_config(request: web.Request): "ENABLE_QEMU_SUPPORT": settings.ENABLE_QEMU_SUPPORT, "INSTANCE_DEFAULT_HYPERVISOR": settings.INSTANCE_DEFAULT_HYPERVISOR, "ENABLE_CONFIDENTIAL_COMPUTING": settings.ENABLE_CONFIDENTIAL_COMPUTING, + "ENABLE_GPU_SUPPORT": settings.ENABLE_GPU_SUPPORT, }, }, dumps=dumps_for_json, @@ -477,10 +478,14 @@ async def notify_allocation(request: web.Request): payment_type = message.content.payment and message.content.payment.type or PaymentType.hold is_confidential = message.content.environment.trusted_execution is not None - - if payment_type == PaymentType.hold and is_confidential: - # At the moment we will allow hold for PAYG - logger.debug("Confidential instance not using PAYG") + have_gpu = message.content.requirements and message.content.requirements.gpu is not None + + if payment_type == PaymentType.hold and (is_confidential or have_gpu): + # Log confidential and instances with GPU support + if is_confidential: + logger.debug(f"Confidential instance {item_hash} not using PAYG") + if have_gpu: + logger.debug(f"GPU Instance {item_hash} not using PAYG") user_balance = await payment.fetch_balance_of_address(message.sender) hold_price = await payment.fetch_execution_hold_price(item_hash) logger.debug(f"Address {message.sender} Balance: {user_balance}, Price: {hold_price}") diff --git a/src/aleph/vm/pool.py b/src/aleph/vm/pool.py index dfc4242ec..1ffabd419 100644 --- a/src/aleph/vm/pool.py +++ b/src/aleph/vm/pool.py @@ -14,12 +14,13 @@ Payment, PaymentType, ) +from pydantic import parse_raw_as from aleph.vm.conf import settings from 
aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator from aleph.vm.orchestrator.metrics import get_execution_records -from aleph.vm.resources import GpuDevice, get_gpu_devices +from aleph.vm.resources import GpuDevice, HostGPU, get_gpu_devices from aleph.vm.systemd import SystemDManager from aleph.vm.utils import get_message_executable_content from aleph.vm.vm_type import VmType @@ -112,7 +113,11 @@ async def create_a_vm( self.executions[vm_hash] = execution try: + # First assign Host GPUs from the available + execution.prepare_gpus(self.get_available_gpus()) + # Prepare VM general Resources and also the GPUs await execution.prepare() + vm_id = self.get_unique_vm_id() if self.network: @@ -236,6 +241,9 @@ async def load_persistent_executions(self): if execution.is_running: # TODO: Improve the way that we re-create running execution + # Load existing GPUs assigned to VMs + execution.gpus = parse_raw_as(List[HostGPU], saved_execution.gpus) + # Load and instantiate the rest of resources and already assigned GPUs await execution.prepare() if self.network: vm_type = VmType.from_message_content(execution.message) @@ -288,10 +296,17 @@ def get_instance_executions(self) -> Iterable[VmExecution]: ) return executions or [] - def get_available_gpus(self) -> Iterable[GpuDevice]: - # TODO: Filter already used GPUs on current executions and remove it from available - available_gpus = self.gpus - return available_gpus or [] + def get_available_gpus(self) -> List[GpuDevice]: + available_gpus = [] + for gpu in self.gpus: + used = False + for _, execution in self.executions.items(): + if execution.uses_gpu(gpu.pci_host): + used = True + break + if not used: + available_gpus.append(gpu) + return available_gpus def get_executions_by_sender(self, payment_type: PaymentType) -> dict[str, dict[str, list[VmExecution]]]: """Return all executions of the given type, grouped by sender and by chain.""" diff 
--git a/src/aleph/vm/resources.py b/src/aleph/vm/resources.py index 5532c2263..767b64906 100644 --- a/src/aleph/vm/resources.py +++ b/src/aleph/vm/resources.py @@ -3,10 +3,21 @@ from typing import List, Optional from aleph_message.models import HashableModel -from pydantic import Extra, Field +from pydantic import BaseModel, Extra, Field + + +class HostGPU(BaseModel): + """Host GPU properties detail.""" + + pci_host: str = Field(description="GPU PCI host address") + + class Config: + extra = Extra.forbid class GpuDeviceClass(str, Enum): + """GPU device class. Look at https://admin.pci-ids.ucw.cz/read/PD/03""" + VGA_COMPATIBLE_CONTROLLER = "0300" _3D_CONTROLLER = "0302"