diff --git a/packaging/Makefile b/packaging/Makefile index 73cc2328..7e4a395c 100644 --- a/packaging/Makefile +++ b/packaging/Makefile @@ -15,7 +15,7 @@ debian-package-code: cp ../examples/instance_message_from_aleph.json ./aleph-vm/opt/aleph-vm/examples/instance_message_from_aleph.json cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes - pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.9' 'eth-account==0.10' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'superfluid==0.2.1' 'sqlalchemy[asyncio]>=2.0' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'python-cpuid==0.1.0' + pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.9' 'eth-account==0.10' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'aleph-superfluid~=0.2.1' 'sqlalchemy[asyncio]>=2.0' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'python-cpuid==0.1.0' python3 -m compileall ./aleph-vm/opt/aleph-vm/ debian-package-resources: firecracker-bins vmlinux download-ipfs-kubo target/bin/sevctl diff --git a/pyproject.toml b/pyproject.toml index 9b1fd06a..aa74d1de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ dependencies = [ "dbus-python==1.3.2", "systemd-python==235", "systemd-python==235", - "superfluid~=0.2.1", + "aleph-superfluid~=0.2.1", "sqlalchemy[asyncio]>=2.0", "aiosqlite==0.19.0", "alembic==1.13.1", diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py index 98e3772e..3ef96127 100644 --- a/src/aleph/vm/conf.py +++ b/src/aleph/vm/conf.py @@ -11,11 +11,13 @@ from subprocess import CalledProcessError, check_output from typing import Any, Literal, NewType, Optional, Union +from aleph_message.models import Chain from aleph_message.models.execution.environment import HypervisorType from pydantic import BaseSettings, Field, HttpUrl from pydantic.env_settings import DotenvType, env_file_sentinel from pydantic.typing import StrPath +from aleph.vm.orchestrator.chain import STREAM_CHAINS, ChainInfo from aleph.vm.utils import ( check_amd_sev_es_supported, check_amd_sev_supported, @@ -224,22 +226,17 @@ class Settings(BaseSettings): description="Address of the account receiving payments", ) # This address is the ALEPH SuperToken on SuperFluid Testnet - PAYMENT_SUPER_TOKEN: str = Field( - default="0xc0Fbc4967259786C743361a5885ef49380473dCF", # Mainnet - # default="0x1290248e01ed2f9f863a9752a8aad396ef3a1b00", # Testnet - description="Address of the ALEPH SuperToken on SuperFluid", - ) PAYMENT_PRICING_AGGREGATE: str = "" # TODO: Missing - PAYMENT_RPC_API: HttpUrl = Field( - default="https://api.avax.network/ext/bc/C/rpc", - # default="https://api.avax-test.network/ext/bc/C/rpc", - description="Default to Avalanche Testnet RPC", + # Use to check PAYG payment + RPC_AVAX: HttpUrl = Field( + default=STREAM_CHAINS[Chain.AVAX].rpc, + description="RPC API Endpoint for AVAX chain", ) - PAYMENT_CHAIN_ID: int = Field( - default=43114, # Avalanche Mainnet - # default=43113, # Avalanche Fuji Testnet - description="Avalanche chain ID", + + RPC_BASE: HttpUrl = Field( + default=STREAM_CHAINS[Chain.BASE].rpc, + description="RPC API Endpoint for BASE chain", ) PAYMENT_BUFFER: Decimal = Field( @@ -401,6 +398,13 @@ def check(self): def setup(self): """Setup the environment defined by the settings. Call this method after loading the settings.""" + + # Update chain RPC + STREAM_CHAINS[Chain.AVAX].rpc = str(self.RPC_AVAX) + STREAM_CHAINS[Chain.BASE].rpc = str(self.RPC_BASE) + + logger.info(STREAM_CHAINS) + os.makedirs(self.MESSAGE_CACHE, exist_ok=True) os.makedirs(self.CODE_CACHE, exist_ok=True) os.makedirs(self.RUNTIME_CACHE, exist_ok=True) diff --git a/src/aleph/vm/orchestrator/chain.py b/src/aleph/vm/orchestrator/chain.py new file mode 100644 index 00000000..2cedd816 --- /dev/null +++ b/src/aleph/vm/orchestrator/chain.py @@ -0,0 +1,67 @@ +import logging +from typing import Dict, Optional, Union + +from aleph_message.models import Chain +from pydantic import BaseModel, root_validator + +logger = logging.getLogger(__name__) + + +class ChainInfo(BaseModel): + """ + A chain information. + """ + + chain_id: int + rpc: str + standard_token: Optional[str] = None + super_token: Optional[str] = None + testnet: bool = False + active: bool = True + + @property + def token(self) -> Optional[str]: + return self.super_token or self.standard_token + + @root_validator(pre=True) + def check_tokens(cls, values): + if not values.get("standard_token") and not values.get("super_token"): + raise ValueError("At least one of standard_token or super_token must be provided.") + return values + + +STREAM_CHAINS: Dict[Union[Chain, str], ChainInfo] = { + # TESTNETS + "SEPOLIA": ChainInfo( + chain_id=11155111, + rpc="https://eth-sepolia.public.blastapi.io", + standard_token="0xc4bf5cbdabe595361438f8c6a187bdc330539c60", + super_token="0x22064a21fee226d8ffb8818e7627d5ff6d0fc33a", + active=False, + testnet=True, + ), + # MAINNETS + Chain.ETH: ChainInfo( + chain_id=1, + rpc="https://eth-mainnet.public.blastapi.io", + standard_token="0x27702a26126e0B3702af63Ee09aC4d1A084EF628", + active=False, + ), + Chain.AVAX: ChainInfo( + chain_id=43114, + rpc="https://api.avax.network/ext/bc/C/rpc", + super_token="0xc0Fbc4967259786C743361a5885ef49380473dCF", + ), + Chain.BASE: ChainInfo( + chain_id=8453, + rpc="https://base-mainnet.public.blastapi.io", + super_token="0xc0Fbc4967259786C743361a5885ef49380473dCF", + ), +} + + +def get_chain(chain: str) -> ChainInfo: + try: + return STREAM_CHAINS[chain] + except KeyError as error: + raise ValueError(f"Unknown chain id for chain {chain}") diff --git a/src/aleph/vm/orchestrator/payment.py b/src/aleph/vm/orchestrator/payment.py index 65e642e0..420754ec 100644 --- a/src/aleph/vm/orchestrator/payment.py +++ b/src/aleph/vm/orchestrator/payment.py @@ -14,6 +14,8 @@ from aleph.vm.models import VmExecution from aleph.vm.utils import to_normalized_address +from .chain import ChainInfo, get_chain + logger = logging.getLogger(__name__) @@ -87,18 +89,25 @@ class InvalidAddressError(ValueError): pass -async def get_stream(sender: str, receiver: str, chain) -> Decimal: +class InvalidChainError(ValueError): + pass + + +async def get_stream(sender: str, receiver: str, chain: str) -> Decimal: """ Get the stream of the user from the Superfluid API. See https://community.aleph.im/t/pay-as-you-go-using-superfluid/98/11 """ - chain_id = settings.PAYMENT_CHAIN_ID - superfluid_instance = CFA_V1(settings.PAYMENT_RPC_API, chain_id) + chain_info: ChainInfo = get_chain(chain=chain) + if not chain_info.active: + raise InvalidChainError(f"Chain : {chain} is not active for superfluid") + + superfluid_instance = CFA_V1(chain_info.rpc, chain_info.chain_id) try: - super_token: HexAddress = to_normalized_address(settings.PAYMENT_SUPER_TOKEN) + super_token: HexAddress = to_normalized_address(chain_info.super_token) except ValueError as error: - raise InvalidAddressError(f"Invalid token address '{settings.PAYMENT_SUPER_TOKEN}' - {error.args}") from error + raise InvalidAddressError(f"Invalid token address '{chain_info.super_token}' - {error.args}") from error try: sender_address: HexAddress = to_normalized_address(sender) diff --git a/src/aleph/vm/orchestrator/tasks.py b/src/aleph/vm/orchestrator/tasks.py index fab864a6..3f468785 100644 --- a/src/aleph/vm/orchestrator/tasks.py +++ b/src/aleph/vm/orchestrator/tasks.py @@ -184,7 +184,7 @@ async def monitor_payments(app: web.Application): required_stream = await compute_required_flow(executions) logger.debug(f"Required stream for Sender {sender} executions: {required_stream}") # Stop executions until the required stream is reached - while stream < (required_stream + settings.PAYMENT_BUFFER): + while (stream + settings.PAYMENT_BUFFER) < required_stream: try: last_execution = executions.pop(-1) except IndexError: # Empty list diff --git a/src/aleph/vm/orchestrator/views/__init__.py b/src/aleph/vm/orchestrator/views/__init__.py index 61431446..c99b0e38 100644 --- a/src/aleph/vm/orchestrator/views/__init__.py +++ b/src/aleph/vm/orchestrator/views/__init__.py @@ -24,10 +24,12 @@ from aleph.vm.controllers.firecracker.program import FileTooLargeError from aleph.vm.hypervisors.firecracker.microvm import MicroVMFailedInitError from aleph.vm.orchestrator import payment, status +from aleph.vm.orchestrator.chain import STREAM_CHAINS, ChainInfo from aleph.vm.orchestrator.messages import try_get_message from aleph.vm.orchestrator.metrics import get_execution_records from aleph.vm.orchestrator.payment import ( InvalidAddressError, + InvalidChainError, fetch_execution_flow_price, get_stream, ) @@ -299,6 +301,11 @@ async def status_check_version(request: web.Request): @cors_allow_all async def status_public_config(request: web.Request): """Expose the public fields from the configuration""" + + available_payments = { + str(chain_name): chain_info for chain_name, chain_info in STREAM_CHAINS.items() if chain_info.active + } + return web.json_response( { "DOMAIN_NAME": settings.DOMAIN_NAME, @@ -329,8 +336,7 @@ async def status_public_config(request: web.Request): }, "payment": { "PAYMENT_RECEIVER_ADDRESS": settings.PAYMENT_RECEIVER_ADDRESS, - "PAYMENT_SUPER_TOKEN": settings.PAYMENT_SUPER_TOKEN, - "PAYMENT_CHAIN_ID": settings.PAYMENT_CHAIN_ID, + "AVAILABLE_PAYMENTS": available_payments, "PAYMENT_MONITOR_INTERVAL": settings.PAYMENT_MONITOR_INTERVAL, }, "computing": { @@ -494,6 +500,9 @@ async def notify_allocation(request: web.Request): except InvalidAddressError as error: logger.warning(f"Invalid address {error}", exc_info=True) return web.HTTPBadRequest(reason=f"Invalid address {error}") + except InvalidChainError as error: + logger.warning(f"Invalid chain {error}", exc_info=True) + return web.HTTPBadRequest(reason=f"Invalid Chain {error}") if not active_flow: raise web.HTTPPaymentRequired(reason="Empty payment stream for this instance")