From 0c58cad7e4c521773a1c204e4578ba78ac3f3d12 Mon Sep 17 00:00:00 2001
From: "Andres D. Molins"
Date: Fri, 6 Dec 2024 11:17:35 +0100
Subject: [PATCH] Fix: Solved issue getting already running executions with GPU

---
 src/aleph/vm/models.py               |  3 +++
 src/aleph/vm/orchestrator/metrics.py |  2 ++
 src/aleph/vm/orchestrator/payment.py | 10 +++++++---
 src/aleph/vm/orchestrator/tasks.py   | 13 +++++++++----
 src/aleph/vm/pool.py                 |  4 ++--
 src/aleph/vm/resources.py            | 13 ++++++++-----
 6 files changed, 31 insertions(+), 14 deletions(-)

diff --git a/src/aleph/vm/models.py b/src/aleph/vm/models.py
index b0858d33..7dd59091 100644
--- a/src/aleph/vm/models.py
+++ b/src/aleph/vm/models.py
@@ -1,4 +1,5 @@
 import asyncio
+import json
 import logging
 import uuid
 from asyncio import Task
@@ -14,6 +15,7 @@
     ProgramContent,
 )
 from aleph_message.models.execution.environment import GpuProperties, HypervisorType
+from pydantic.json import pydantic_encoder
 
 from aleph.vm.conf import settings
 from aleph.vm.controllers.firecracker.executable import AlephFirecrackerExecutable
@@ -460,6 +462,7 @@ async def save(self):
                 message=self.message.json(),
                 original_message=self.original.json(),
                 persistent=self.persistent,
+                gpus=json.dumps(self.gpus, default=pydantic_encoder),
             )
         )
 
diff --git a/src/aleph/vm/orchestrator/metrics.py b/src/aleph/vm/orchestrator/metrics.py
index f7f16648..6c9b8eea 100644
--- a/src/aleph/vm/orchestrator/metrics.py
+++ b/src/aleph/vm/orchestrator/metrics.py
@@ -76,6 +76,8 @@ class ExecutionRecord(Base):
     original_message = Column(JSON, nullable=True)
     persistent = Column(Boolean, nullable=True)
 
+    gpus = Column(JSON, nullable=True)
+
     def __repr__(self):
         return f""
 
diff --git a/src/aleph/vm/orchestrator/payment.py b/src/aleph/vm/orchestrator/payment.py
index 7194f873..5c074ce0 100644
--- a/src/aleph/vm/orchestrator/payment.py
+++ b/src/aleph/vm/orchestrator/payment.py
@@ -100,9 +100,13 @@ async def get_stream(sender: str, receiver: str, chain: str) -> Decimal:
     Get the stream of the user from the Superfluid API.
     See https://community.aleph.im/t/pay-as-you-go-using-superfluid/98/11
     """
-    chain_info: ChainInfo = get_chain(chain=chain)
-    if not chain_info.active:
-        msg = f"Chain : {chain} is not active for superfluid"
+    try:
+        chain_info: ChainInfo = get_chain(chain=chain)
+        if not chain_info.active:
+            msg = f"Chain : {chain} is not active for superfluid"
+            raise InvalidChainError(msg)
+    except ValueError:
+        msg = f"Chain : {chain} is invalid"
         raise InvalidChainError(msg)
 
     superfluid_instance = CFA_V1(chain_info.rpc, chain_info.chain_id)
diff --git a/src/aleph/vm/orchestrator/tasks.py b/src/aleph/vm/orchestrator/tasks.py
index 921a2265..1a0afed8 100644
--- a/src/aleph/vm/orchestrator/tasks.py
+++ b/src/aleph/vm/orchestrator/tasks.py
@@ -4,6 +4,7 @@
 import math
 import time
 from collections.abc import AsyncIterable
+from decimal import Decimal
 from typing import TypeVar
 
 import aiohttp
@@ -175,10 +176,14 @@ async def monitor_payments(app: web.Application):
         # Check if the balance held in the wallet is sufficient stream tier resources
         for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items():
             for chain, executions in chains.items():
-                stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain)
-                logger.debug(
-                    f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}"
-                )
+                try:
+                    stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain)
+                    logger.debug(
+                        f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}"
+                    )
+                except ValueError as error:
+                    logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}")
+                    stream = Decimal(0)
 
                 required_stream = await compute_required_flow(executions)
                 logger.debug(f"Required stream for Sender {sender} executions: {required_stream}")
diff --git a/src/aleph/vm/pool.py b/src/aleph/vm/pool.py
index 4268d9ed..1ffabd41 100644
--- a/src/aleph/vm/pool.py
+++ b/src/aleph/vm/pool.py
@@ -14,6 +14,7 @@
     Payment,
     PaymentType,
 )
+from pydantic import parse_raw_as
 
 from aleph.vm.conf import settings
 from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager
@@ -241,8 +242,7 @@ async def load_persistent_executions(self):
             if execution.is_running:
                 # TODO: Improve the way that we re-create running execution
                 # Load existing GPUs assigned to VMs
-                for saved_gpu in saved_execution.gpus:
-                    execution.gpus.append(HostGPU(pci_host=saved_gpu.pci_host))
+                execution.gpus = parse_raw_as(List[HostGPU], saved_execution.gpus)
                 # Load and instantiate the rest of resources and already assigned GPUs
                 await execution.prepare()
                 if self.network:
diff --git a/src/aleph/vm/resources.py b/src/aleph/vm/resources.py
index b237dc0e..1d01a2a1 100644
--- a/src/aleph/vm/resources.py
+++ b/src/aleph/vm/resources.py
@@ -1,15 +1,18 @@
 import subprocess
-from dataclasses import dataclass
 from enum import Enum
 from typing import List, Optional
 
 from aleph_message.models import HashableModel
-from pydantic import Extra, Field
+from pydantic import BaseModel, Extra, Field
 
 
-@dataclass
-class HostGPU:
-    pci_host: str
+class HostGPU(BaseModel):
+    """Host GPU properties detail."""
+
+    pci_host: str = Field(description="GPU PCI host address")
+
+    class Config:
+        extra = Extra.forbid
 
 
 class GpuDeviceClass(str, Enum):
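
Reviewer note: below is a minimal sketch of the GPU persistence round-trip this patch introduces, assuming pydantic v1 (which provides `parse_raw_as` and `pydantic.json.pydantic_encoder`, as used in the diff). The `HostGPU` model mirrors the patched `resources.py`; the PCI addresses are illustrative placeholders, not values from the codebase.

```python
import json
from typing import List

from pydantic import BaseModel, Extra, Field, parse_raw_as
from pydantic.json import pydantic_encoder


class HostGPU(BaseModel):
    """Host GPU properties detail."""

    pci_host: str = Field(description="GPU PCI host address")

    class Config:
        extra = Extra.forbid


# Illustrative placeholder PCI addresses for two assigned GPUs.
assigned_gpus = [HostGPU(pci_host="0000:01:00.0"), HostGPU(pci_host="0000:02:00.0")]

# Saving side (save() in models.py): serialize the list to a JSON string
# suitable for the new ExecutionRecord.gpus column.
serialized = json.dumps(assigned_gpus, default=pydantic_encoder)

# Loading side (load_persistent_executions() in pool.py): parse the stored
# JSON string back into HostGPU instances for the re-created execution.
restored = parse_raw_as(List[HostGPU], serialized)
assert restored == assigned_gpus
```

Storing the assigned GPUs as a JSON column on the execution record lets an already running execution be rebuilt with the same PCI devices after an orchestrator restart, instead of failing when the saved record lacks GPU information.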