Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Allow User to control their VM #124

Merged
merged 34 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f771d21
Feature: VmClient
1yam Jun 6, 2024
ac78d53
Fix: Protocol (http/https) should not be hardcoded.
hoh Jun 19, 2024
5672776
Fix: There was no test for `notify_allocation()`.
hoh Jun 19, 2024
301814a
WIP: Copy authentication functions from aleph-vm
hoh Jun 19, 2024
9abb642
Fix: vm client sessions wasn't close + authentifications for test wil…
1yam Jun 19, 2024
6860015
Add: Unit test for {perform_operation, stop, reboot, erase, expire}
1yam Jun 19, 2024
52faf6e
Refactor: logs didn't need to generate full header
1yam Jun 19, 2024
328e087
Add: get_logs test
1yam Jun 19, 2024
5c61b9b
Fix: black in aleph_vm_authentification.py
1yam Jun 19, 2024
a30f690
Fix: fully remove _generate_header call in get_logs
1yam Jun 19, 2024
fa998ae
Fix: black issue
1yam Jun 19, 2024
49c81b5
Fix: test fix workflow
1yam Jun 20, 2024
149778b
feat(vm_client): add missing types annotations
Psycojoker Jun 25, 2024
017bf01
refactor(vm_client): remove duplicated types annotations
Psycojoker Jun 25, 2024
7ec6c42
refactor(vm_client): avoid using single letter variable names
Psycojoker Jun 25, 2024
93dbb22
feat(vm_client): increase test_notify_allocation precision
Psycojoker Jun 25, 2024
f93f202
refactor(vm_client): add empty lines for code readability
Psycojoker Jun 25, 2024
3abcf36
style: run linting:fmt
Psycojoker Jun 25, 2024
ca16c5a
Fix: Required an old version of `aleph-message`
hoh Jun 21, 2024
5162096
Fix: Newer aleph-message requires InstanceEnvironment
hoh Jun 21, 2024
65a0dfe
Fix: Qemu was not the default hypervisor for instances.
hoh Jun 21, 2024
225b42a
Fix: Pythom 3.12 fails setup libsecp256k1
hoh Jun 21, 2024
93bffa9
doc(README): command to launch tests was incorrect
Psycojoker Jun 25, 2024
b53505d
Refactor: create and sign playload goes to utils and some fix
1yam Jun 28, 2024
3c66af0
Fix: linting issue
1yam Jun 28, 2024
0c62cd5
Fix: mypy issue
1yam Jun 28, 2024
0fab7c3
fix: black
1yam Jun 28, 2024
5380da5
feat: use bytes_from_hex where it makes sens
Psycojoker Jul 2, 2024
fc1e6af
chore: use ruff new CLI api
Psycojoker Jul 2, 2024
2f180e3
feat: add unit tests for authentication mechanisms of VmClient
Psycojoker Jul 2, 2024
d271038
fix: debug code remove
1yam Jul 3, 2024
5e88161
Update vmclient.py
1yam Jul 4, 2024
d9b1892
Fix: update unit test to use stream_logs endpoint instead of logs
1yam Jul 4, 2024
247dbfc
Implement `VmConfidentialClient` class (#138)
nesitor Jul 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ on:
jobs:
build:
strategy:
fail-fast: false
matrix:
python-version: [ "3.9", "3.10", "3.11", "3.12" ]
python-version: [ "3.9", "3.10", "3.11" ]
# An issue with secp256k1 prevents Python 3.12 from working
# See https://github.com/baking-bad/pytezos/issues/370
runs-on: ubuntu-latest

steps:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ $ pip install -e .[all]
You can use the test env defined for hatch to run the tests:

```shell
$ hatch run test:run
$ hatch run testing:run
```

See `hatch env show` for more information about all the environments and their scripts.
Expand Down
10 changes: 7 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ classifiers = [
]
dependencies = [
"aiohttp>=3.8.3",
"aleph-message~=0.4.4",
"aleph-message>=0.4.7",
"coincurve; python_version<\"3.11\"",
"coincurve>=19.0.0; python_version>=\"3.11\"",
"eth_abi>=4.0.0; python_version>=\"3.11\"",
"eth_account>=0.4.0,<0.11.0",
"jwcrypto==1.5.6",
"python-magic",
"typer",
"typing_extensions",
"aioresponses>=0.7.6"
]

[project.optional-dependencies]
Expand Down Expand Up @@ -122,6 +124,8 @@ dependencies = [
"pytest-cov==4.1.0",
"pytest-mock==3.12.0",
"pytest-asyncio==0.23.5",
"pytest-aiohttp==1.0.5",
"aioresponses==0.7.6",
"fastapi",
"httpx",
"secp256k1",
Expand Down Expand Up @@ -150,13 +154,13 @@ dependencies = [
[tool.hatch.envs.linting.scripts]
typing = "mypy --config-file=pyproject.toml {args:} ./src/ ./tests/ ./examples/"
style = [
"ruff {args:.} ./src/ ./tests/ ./examples/",
"ruff check {args:.} ./src/ ./tests/ ./examples/",
"black --check --diff {args:} ./src/ ./tests/ ./examples/",
"isort --check-only --profile black {args:} ./src/ ./tests/ ./examples/",
]
fmt = [
"black {args:} ./src/ ./tests/ ./examples/",
"ruff --fix {args:.} ./src/ ./tests/ ./examples/",
"ruff check --fix {args:.} ./src/ ./tests/ ./examples/",
"isort --profile black {args:} ./src/ ./tests/ ./examples/",
"style",
]
Expand Down
7 changes: 0 additions & 7 deletions src/aleph/sdk/chains/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,3 @@ def get_fallback_private_key(path: Optional[Path] = None) -> bytes:
if not default_key_path.exists():
default_key_path.symlink_to(path)
return private_key


def bytes_from_hex(hex_string: str) -> bytes:
if hex_string.startswith("0x"):
hex_string = hex_string[2:]
hex_string = bytes.fromhex(hex_string)
return hex_string
8 changes: 2 additions & 6 deletions src/aleph/sdk/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@
from eth_keys.exceptions import BadSignature as EthBadSignatureError

from ..exceptions import BadSignatureError
from .common import (
BaseAccount,
bytes_from_hex,
get_fallback_private_key,
get_public_key,
)
from ..utils import bytes_from_hex
from .common import BaseAccount, get_fallback_private_key, get_public_key


class ETHAccount(BaseAccount):
Expand Down
3 changes: 2 additions & 1 deletion src/aleph/sdk/chains/substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

from ..conf import settings
from ..exceptions import BadSignatureError
from .common import BaseAccount, bytes_from_hex, get_verification_buffer
from ..utils import bytes_from_hex
from .common import BaseAccount, get_verification_buffer

logger = logging.getLogger(__name__)

Expand Down
10 changes: 6 additions & 4 deletions src/aleph/sdk/client/authenticated_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from aleph_message.models.execution.environment import (
FunctionEnvironment,
HypervisorType,
InstanceEnvironment,
MachineResources,
)
from aleph_message.models.execution.instance import RootfsVolume
Expand Down Expand Up @@ -534,16 +535,17 @@ async def create_instance(
timeout_seconds = timeout_seconds or settings.DEFAULT_VM_TIMEOUT

payment = payment or Payment(chain=Chain.ETH, type=PaymentType.hold)
hypervisor = hypervisor or HypervisorType.firecracker

# Default to the QEMU hypervisor for instances.
selected_hypervisor: HypervisorType = hypervisor or HypervisorType.qemu

content = InstanceContent(
address=address,
allow_amend=allow_amend,
environment=FunctionEnvironment(
reproducible=False,
environment=InstanceEnvironment(
internet=internet,
aleph_api=aleph_api,
hypervisor=hypervisor,
hypervisor=selected_hypervisor,
),
variables=environment_variables,
resources=MachineResources(
Expand Down
192 changes: 192 additions & 0 deletions src/aleph/sdk/client/vm_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import datetime
import json
import logging
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
from urllib.parse import urlparse

import aiohttp
from aleph_message.models import ItemHash
from eth_account.messages import encode_defunct
from jwcrypto import jwk

from aleph.sdk.types import Account
from aleph.sdk.utils import (
create_vm_control_payload,
sign_vm_control_payload,
to_0x_hex,
)

logger = logging.getLogger(__name__)


class VmClient:
account: Account
ephemeral_key: jwk.JWK
node_url: str
pubkey_payload: Dict[str, Any]
pubkey_signature_header: str
session: aiohttp.ClientSession

def __init__(
self,
account: Account,
node_url: str = "",
session: Optional[aiohttp.ClientSession] = None,
):
self.account = account
self.ephemeral_key = jwk.JWK.generate(kty="EC", crv="P-256")
self.node_url = node_url
self.pubkey_payload = self._generate_pubkey_payload()
self.pubkey_signature_header = ""
self.session = session or aiohttp.ClientSession()

def _generate_pubkey_payload(self) -> Dict[str, Any]:
return {
"pubkey": json.loads(self.ephemeral_key.export_public()),
"alg": "ECDSA",
"domain": self.node_domain,
"address": self.account.get_address(),
"expires": (
datetime.datetime.utcnow() + datetime.timedelta(days=1)
).isoformat()
+ "Z",
}

async def _generate_pubkey_signature_header(self) -> str:
pubkey_payload = json.dumps(self.pubkey_payload).encode("utf-8").hex()
signable_message = encode_defunct(hexstr=pubkey_payload)
buffer_to_sign = signable_message.body

signed_message = await self.account.sign_raw(buffer_to_sign)
pubkey_signature = to_0x_hex(signed_message)

return json.dumps(
{
"sender": self.account.get_address(),
"payload": pubkey_payload,
"signature": pubkey_signature,
"content": {"domain": self.node_domain},
}
)

async def _generate_header(
self, vm_id: ItemHash, operation: str, method: str
) -> Tuple[str, Dict[str, str]]:
payload = create_vm_control_payload(
vm_id, operation, domain=self.node_domain, method=method
)
signed_operation = sign_vm_control_payload(payload, self.ephemeral_key)

if not self.pubkey_signature_header:
self.pubkey_signature_header = (
await self._generate_pubkey_signature_header()
)

headers = {
"X-SignedPubKey": self.pubkey_signature_header,
"X-SignedOperation": signed_operation,
}

path = payload["path"]
return f"{self.node_url}{path}", headers

@property
def node_domain(self) -> str:
domain = urlparse(self.node_url).hostname
if not domain:
raise Exception("Could not parse node domain")
return domain

async def perform_operation(
self, vm_id: ItemHash, operation: str, method: str = "POST"
) -> Tuple[Optional[int], str]:
if not self.pubkey_signature_header:
self.pubkey_signature_header = (
await self._generate_pubkey_signature_header()
)

url, header = await self._generate_header(
vm_id=vm_id, operation=operation, method=method
)

try:
async with self.session.request(
method=method, url=url, headers=header
) as response:
response_text = await response.text()
return response.status, response_text

except aiohttp.ClientError as e:
logger.error(f"HTTP error during operation {operation}: {str(e)}")
return None, str(e)

async def get_logs(self, vm_id: ItemHash) -> AsyncGenerator[str, None]:
if not self.pubkey_signature_header:
self.pubkey_signature_header = (
await self._generate_pubkey_signature_header()
)

payload = create_vm_control_payload(
vm_id, "stream_logs", method="get", domain=self.node_domain
)
signed_operation = sign_vm_control_payload(payload, self.ephemeral_key)
path = payload["path"]
ws_url = f"{self.node_url}{path}"

async with self.session.ws_connect(ws_url) as ws:
auth_message = {
"auth": {
"X-SignedPubKey": json.loads(self.pubkey_signature_header),
"X-SignedOperation": json.loads(signed_operation),
}
}
await ws.send_json(auth_message)

async for msg in ws: # msg is of type aiohttp.WSMessage
if msg.type == aiohttp.WSMsgType.TEXT:
yield msg.data
elif msg.type == aiohttp.WSMsgType.ERROR:
break

async def start_instance(self, vm_id: ItemHash) -> Tuple[int, str]:
return await self.notify_allocation(vm_id)

async def stop_instance(self, vm_id: ItemHash) -> Tuple[Optional[int], str]:
return await self.perform_operation(vm_id, "stop")

async def reboot_instance(self, vm_id: ItemHash) -> Tuple[Optional[int], str]:
return await self.perform_operation(vm_id, "reboot")

async def erase_instance(self, vm_id: ItemHash) -> Tuple[Optional[int], str]:
return await self.perform_operation(vm_id, "erase")

async def expire_instance(self, vm_id: ItemHash) -> Tuple[Optional[int], str]:
return await self.perform_operation(vm_id, "expire")

async def notify_allocation(self, vm_id: ItemHash) -> Tuple[int, str]:
json_data = {"instance": vm_id}

async with self.session.post(
f"{self.node_url}/control/allocation/notify", json=json_data
) as session:
form_response_text = await session.text()

return session.status, form_response_text

async def manage_instance(
self, vm_id: ItemHash, operations: List[str]
) -> Tuple[int, str]:
for operation in operations:
status, response = await self.perform_operation(vm_id, operation)
if status != 200 and status:
return status, response
return 200, "All operations completed successfully"

async def close(self):
await self.session.close()

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await self.close()
Loading
Loading