Skip to content

Commit

Permalink
Feature: Allow PAYG on base (#685)
Browse files Browse the repository at this point in the history
INFO: the settings `PAYMENT_RPC_API` has been renamed to `RPC_AVAX`

Problem:
Base chain isn't supported.

Solutions:
adding src/aleph/vm/orchestrator/chain.py to store Available Chains
Display available_payments in status_public_config
Adding checks that the chains sent is in the STREAM_CHAINS
Fix: use chain_info.super_token instead of settings.PAYMENT_SUPER_TOKEN
Update dependency superfluid to aleph-superfluid==0.2.1
Fix: wrong logic in monitor_payments for payg

Co-authored-by: nesitor <[email protected]>
Co-authored-by: Olivier Le Thanh Duong <[email protected]>
  • Loading branch information
3 people authored Sep 3, 2024
1 parent 91d6027 commit 02affa3
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 23 deletions.
2 changes: 1 addition & 1 deletion packaging/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ debian-package-code:
cp ../examples/instance_message_from_aleph.json ./aleph-vm/opt/aleph-vm/examples/instance_message_from_aleph.json
cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data
mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.9' 'eth-account==0.10' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'superfluid==0.2.1' 'sqlalchemy[asyncio]>=2.0' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'python-cpuid==0.1.0'
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.9' 'eth-account==0.10' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'aleph-superfluid~=0.2.1' 'sqlalchemy[asyncio]>=2.0' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'python-cpuid==0.1.0'
python3 -m compileall ./aleph-vm/opt/aleph-vm/

debian-package-resources: firecracker-bins vmlinux download-ipfs-kubo target/bin/sevctl
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ dependencies = [
"dbus-python==1.3.2",
"systemd-python==235",
"systemd-python==235",
"superfluid~=0.2.1",
"aleph-superfluid~=0.2.1",
"sqlalchemy[asyncio]>=2.0",
"aiosqlite==0.19.0",
"alembic==1.13.1",
Expand Down
30 changes: 17 additions & 13 deletions src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
from subprocess import CalledProcessError, check_output
from typing import Any, Literal, NewType, Optional, Union

from aleph_message.models import Chain
from aleph_message.models.execution.environment import HypervisorType
from pydantic import BaseSettings, Field, HttpUrl
from pydantic.env_settings import DotenvType, env_file_sentinel
from pydantic.typing import StrPath

from aleph.vm.orchestrator.chain import STREAM_CHAINS, ChainInfo
from aleph.vm.utils import (
check_amd_sev_es_supported,
check_amd_sev_supported,
Expand Down Expand Up @@ -224,22 +226,17 @@ class Settings(BaseSettings):
description="Address of the account receiving payments",
)
# This address is the ALEPH SuperToken on SuperFluid Testnet
PAYMENT_SUPER_TOKEN: str = Field(
default="0xc0Fbc4967259786C743361a5885ef49380473dCF", # Mainnet
# default="0x1290248e01ed2f9f863a9752a8aad396ef3a1b00", # Testnet
description="Address of the ALEPH SuperToken on SuperFluid",
)
PAYMENT_PRICING_AGGREGATE: str = "" # TODO: Missing

PAYMENT_RPC_API: HttpUrl = Field(
default="https://api.avax.network/ext/bc/C/rpc",
# default="https://api.avax-test.network/ext/bc/C/rpc",
description="Default to Avalanche Testnet RPC",
# Use to check PAYG payment
RPC_AVAX: HttpUrl = Field(
default=STREAM_CHAINS[Chain.AVAX].rpc,
description="RPC API Endpoint for AVAX chain",
)
PAYMENT_CHAIN_ID: int = Field(
default=43114, # Avalanche Mainnet
# default=43113, # Avalanche Fuji Testnet
description="Avalanche chain ID",

RPC_BASE: HttpUrl = Field(
default=STREAM_CHAINS[Chain.BASE].rpc,
description="RPC API Endpoint for BASE chain",
)

PAYMENT_BUFFER: Decimal = Field(
Expand Down Expand Up @@ -401,6 +398,13 @@ def check(self):

def setup(self):
"""Setup the environment defined by the settings. Call this method after loading the settings."""

# Update chain RPC
STREAM_CHAINS[Chain.AVAX].rpc = str(self.RPC_AVAX)
STREAM_CHAINS[Chain.BASE].rpc = str(self.RPC_BASE)

logger.info(STREAM_CHAINS)

os.makedirs(self.MESSAGE_CACHE, exist_ok=True)
os.makedirs(self.CODE_CACHE, exist_ok=True)
os.makedirs(self.RUNTIME_CACHE, exist_ok=True)
Expand Down
67 changes: 67 additions & 0 deletions src/aleph/vm/orchestrator/chain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logging
from typing import Dict, Optional, Union

from aleph_message.models import Chain
from pydantic import BaseModel, root_validator

logger = logging.getLogger(__name__)


class ChainInfo(BaseModel):
"""
A chain information.
"""

chain_id: int
rpc: str
standard_token: Optional[str] = None
super_token: Optional[str] = None
testnet: bool = False
active: bool = True

@property
def token(self) -> Optional[str]:
return self.super_token or self.standard_token

@root_validator(pre=True)
def check_tokens(cls, values):
if not values.get("standard_token") and not values.get("super_token"):
raise ValueError("At least one of standard_token or super_token must be provided.")
return values


STREAM_CHAINS: Dict[Union[Chain, str], ChainInfo] = {
# TESTNETS
"SEPOLIA": ChainInfo(
chain_id=11155111,
rpc="https://eth-sepolia.public.blastapi.io",
standard_token="0xc4bf5cbdabe595361438f8c6a187bdc330539c60",
super_token="0x22064a21fee226d8ffb8818e7627d5ff6d0fc33a",
active=False,
testnet=True,
),
# MAINNETS
Chain.ETH: ChainInfo(
chain_id=1,
rpc="https://eth-mainnet.public.blastapi.io",
standard_token="0x27702a26126e0B3702af63Ee09aC4d1A084EF628",
active=False,
),
Chain.AVAX: ChainInfo(
chain_id=43114,
rpc="https://api.avax.network/ext/bc/C/rpc",
super_token="0xc0Fbc4967259786C743361a5885ef49380473dCF",
),
Chain.BASE: ChainInfo(
chain_id=8453,
rpc="https://base-mainnet.public.blastapi.io",
super_token="0xc0Fbc4967259786C743361a5885ef49380473dCF",
),
}


def get_chain(chain: str) -> ChainInfo:
try:
return STREAM_CHAINS[chain]
except KeyError as error:
raise ValueError(f"Unknown chain id for chain {chain}")
19 changes: 14 additions & 5 deletions src/aleph/vm/orchestrator/payment.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from aleph.vm.models import VmExecution
from aleph.vm.utils import to_normalized_address

from .chain import ChainInfo, get_chain

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -87,18 +89,25 @@ class InvalidAddressError(ValueError):
pass


async def get_stream(sender: str, receiver: str, chain) -> Decimal:
class InvalidChainError(ValueError):
pass


async def get_stream(sender: str, receiver: str, chain: str) -> Decimal:
"""
Get the stream of the user from the Superfluid API.
See https://community.aleph.im/t/pay-as-you-go-using-superfluid/98/11
"""
chain_id = settings.PAYMENT_CHAIN_ID
superfluid_instance = CFA_V1(settings.PAYMENT_RPC_API, chain_id)
chain_info: ChainInfo = get_chain(chain=chain)
if not chain_info.active:
raise InvalidChainError(f"Chain : {chain} is not active for superfluid")

superfluid_instance = CFA_V1(chain_info.rpc, chain_info.chain_id)

try:
super_token: HexAddress = to_normalized_address(settings.PAYMENT_SUPER_TOKEN)
super_token: HexAddress = to_normalized_address(chain_info.super_token)
except ValueError as error:
raise InvalidAddressError(f"Invalid token address '{settings.PAYMENT_SUPER_TOKEN}' - {error.args}") from error
raise InvalidAddressError(f"Invalid token address '{chain_info.super_token}' - {error.args}") from error

try:
sender_address: HexAddress = to_normalized_address(sender)
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/vm/orchestrator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async def monitor_payments(app: web.Application):
required_stream = await compute_required_flow(executions)
logger.debug(f"Required stream for Sender {sender} executions: {required_stream}")
# Stop executions until the required stream is reached
while stream < (required_stream + settings.PAYMENT_BUFFER):
while (stream + settings.PAYMENT_BUFFER) < required_stream:
try:
last_execution = executions.pop(-1)
except IndexError: # Empty list
Expand Down
13 changes: 11 additions & 2 deletions src/aleph/vm/orchestrator/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
from aleph.vm.controllers.firecracker.program import FileTooLargeError
from aleph.vm.hypervisors.firecracker.microvm import MicroVMFailedInitError
from aleph.vm.orchestrator import payment, status
from aleph.vm.orchestrator.chain import STREAM_CHAINS, ChainInfo
from aleph.vm.orchestrator.messages import try_get_message
from aleph.vm.orchestrator.metrics import get_execution_records
from aleph.vm.orchestrator.payment import (
InvalidAddressError,
InvalidChainError,
fetch_execution_flow_price,
get_stream,
)
Expand Down Expand Up @@ -299,6 +301,11 @@ async def status_check_version(request: web.Request):
@cors_allow_all
async def status_public_config(request: web.Request):
"""Expose the public fields from the configuration"""

available_payments = {
str(chain_name): chain_info for chain_name, chain_info in STREAM_CHAINS.items() if chain_info.active
}

return web.json_response(
{
"DOMAIN_NAME": settings.DOMAIN_NAME,
Expand Down Expand Up @@ -329,8 +336,7 @@ async def status_public_config(request: web.Request):
},
"payment": {
"PAYMENT_RECEIVER_ADDRESS": settings.PAYMENT_RECEIVER_ADDRESS,
"PAYMENT_SUPER_TOKEN": settings.PAYMENT_SUPER_TOKEN,
"PAYMENT_CHAIN_ID": settings.PAYMENT_CHAIN_ID,
"AVAILABLE_PAYMENTS": available_payments,
"PAYMENT_MONITOR_INTERVAL": settings.PAYMENT_MONITOR_INTERVAL,
},
"computing": {
Expand Down Expand Up @@ -494,6 +500,9 @@ async def notify_allocation(request: web.Request):
except InvalidAddressError as error:
logger.warning(f"Invalid address {error}", exc_info=True)
return web.HTTPBadRequest(reason=f"Invalid address {error}")
except InvalidChainError as error:
logger.warning(f"Invalid chain {error}", exc_info=True)
return web.HTTPBadRequest(reason=f"Invalid Chain {error}")

if not active_flow:
raise web.HTTPPaymentRequired(reason="Empty payment stream for this instance")
Expand Down

0 comments on commit 02affa3

Please sign in to comment.