Skip to content

Commit

Permalink
Feature: SCP (Samsung Cloud Platform) Support Contribution (skyplane-…
Browse files Browse the repository at this point in the history
…project#926)

## Overview
This pull request adds support for [SCP (Samsung Cloud Platform)
](https://cloud.samsungsds.com/serviceportal/index.html)by implementing
the necessary code. I have completed the development and testing phases
and followed all the procedures outlined in the contributing guide.

## Details
- **Contributor:** sangjun-kang (aau213)
- https://github.com/skypilot-sds/skyplane/tree/scp-contribution
- **Branch:** scp-contribution

## Progress
- Code development and testing completed
- Issue and discussion registration
- Code review and adjustments
  • Loading branch information
sangjun-kang authored Jan 9, 2024
1 parent 6ef3f4f commit 4602d9c
Show file tree
Hide file tree
Showing 36 changed files with 2,513 additions and 55 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ Copy https://github.com/skyplane-project/skyplane/blob/main/skyplane/compute/ibm
into `~/.bluemix/ibm_credentials` and fill your
IBM IAM key and credentials to your IBM Cloud object storage
---> For SCP:
$ # Create directory if required
$ mkdir -p ~/.scp
$ # Add the lines for "access_key", "secret_key", and "project_id" to scp_credential file
$ echo "access_key = <your_access_key>" >> ~/.scp/scp_credential
$ echo "secret_key = <your_secret_key>" >> ~/.scp/scp_credential
$ echo "project_id = <your_project_id>" >> ~/.scp/scp_credential
```
After authenticating with each cloud provider, you can run `skyplane init` to create a configuration file for Skyplane.
Expand Down Expand Up @@ -149,6 +156,11 @@ $ skyplane init
Enter the GCP project ID [XXXXXXX]:
GCP region config file saved to /home/ubuntu/.skyplane/gcp_config
(4) Configuring SCP:
Loaded SCP credentials from the scp_credntial file [access key: ...XXXXXX]
SCP region config file saved to /home/ubuntu/.skyplane/scp_config
Config file saved to /home/ubuntu/.skyplane/config
```

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ aws = ["boto3"]
azure = ["azure-identity", "azure-mgmt-authorization", "azure-mgmt-compute", "azure-mgmt-network", "azure-mgmt-resource", "azure-mgmt-storage", "azure-mgmt-quota", "azure-mgmt-subscription", "azure-storage-blob"]
gcp = ["google-api-python-client", "google-auth", "google-cloud-compute", "google-cloud-storage"]
ibm = ["ibm-cloud-sdk-core", "ibm-cos-sdk", "ibm-vpc"]
scp = ["boto3"]
all = ["boto3", "azure-identity", "azure-mgmt-authorization", "azure-mgmt-compute", "azure-mgmt-network", "azure-mgmt-resource", "azure-mgmt-storage", "azure-mgmt-subscription", "azure-storage-blob", "google-api-python-client", "google-auth", "google-cloud-compute", "google-cloud-storage", "ibm-cloud-sdk-core", "ibm-cos-sdk", "ibm-vpc"]
gateway = ["flask", "lz4", "pynacl", "pyopenssl", "werkzeug"]
solver = ["cvxpy", "graphviz", "matplotlib", "numpy"]
Expand Down
1 change: 1 addition & 0 deletions skyplane/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
"AWSConfig",
"AzureConfig",
"GCPConfig",
"SCPConfig",
"TransferHook",
]
5 changes: 4 additions & 1 deletion skyplane/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from skyplane.api.pipeline import Pipeline

if TYPE_CHECKING:
from skyplane.api.config import AWSConfig, AzureConfig, GCPConfig, TransferConfig, IBMCloudConfig
from skyplane.api.config import AWSConfig, AzureConfig, GCPConfig, TransferConfig, IBMCloudConfig, SCPConfig


class SkyplaneClient:
Expand All @@ -26,6 +26,7 @@ def __init__(
azure_config: Optional["AzureConfig"] = None,
gcp_config: Optional["GCPConfig"] = None,
ibmcloud_config: Optional["IBMCloudConfig"] = None,
scp_config: Optional["SCPConfig"] = None,
transfer_config: Optional[TransferConfig] = None,
log_dir: Optional[str] = None,
):
Expand All @@ -48,6 +49,7 @@ def __init__(
self.azure_auth = azure_config.make_auth_provider() if azure_config else None
self.gcp_auth = gcp_config.make_auth_provider() if gcp_config else None
self.ibmcloud_auth = ibmcloud_config.make_auth_provider() if ibmcloud_config else None
self.scp_auth = scp_config.make_auth_provider() if scp_config else None
self.transfer_config = transfer_config if transfer_config else TransferConfig()
self.log_dir = (
tmp_log_dir / "transfer_logs" / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}-{uuid.uuid4().hex[:8]}"
Expand All @@ -66,6 +68,7 @@ def __init__(
azure_auth=self.azure_auth,
gcp_auth=self.gcp_auth,
ibmcloud_auth=self.ibmcloud_auth,
scp_auth=self.scp_auth,
)

def pipeline(self, planning_algorithm: Optional[str] = "direct", max_instances: Optional[int] = 1, debug=False):
Expand Down
19 changes: 18 additions & 1 deletion skyplane/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from skyplane import compute

from skyplane.config_paths import aws_quota_path, gcp_quota_path, azure_standardDv5_quota_path
from skyplane.config_paths import aws_quota_path, gcp_quota_path, azure_standardDv5_quota_path, scp_quota_path
from pathlib import Path


Expand Down Expand Up @@ -61,6 +61,17 @@ def make_auth_provider(self) -> compute.IBMCloudAuthentication:
# pytype: enable=attribute-error


@dataclass
class SCPConfig(AuthenticationConfig):
scp_access_key: Optional[str] = None
scp_secret_key: Optional[str] = None
scp_project_id: Optional[str] = None
scp_enabled: bool = False

def make_auth_provider(self) -> compute.SCPAuthentication:
return compute.SCPAuthentication(config=self) # type: ignore


@dataclass(frozen=True)
class TransferConfig:
autoterminate_minutes: int = 15
Expand All @@ -82,16 +93,22 @@ class TransferConfig:
azure_use_spot_instances: bool = False
gcp_use_spot_instances: bool = False
ibmcloud_use_spot_instances: bool = False
# Add SCP Support
scp_use_spot_instances: bool = False

aws_instance_class: str = "m5.8xlarge"
azure_instance_class: str = "Standard_D2_v5"
gcp_instance_class: str = "n2-standard-16"
ibmcloud_instance_class: str = "bx2-2x8"
gcp_use_premium_network: bool = True
# Add SCP Support
scp_instance_class: str = "h1v32m128"

aws_vcpu_file: Path = aws_quota_path
gcp_vcpu_file: Path = gcp_quota_path
azure_vcpu_file: Path = azure_standardDv5_quota_path
# Add SCP Support
scp_vcpu_file: Path = scp_quota_path
# TODO: add ibmcloud when the quota info is available

# multipart config
Expand Down
10 changes: 8 additions & 2 deletions skyplane/api/dataplane.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from skyplane.utils import logger
from skyplane.utils.definitions import gateway_docker_image, tmp_log_dir
from skyplane.utils.fn import PathLike, do_parallel
from skyplane.utils.retry import retry_backoff

if TYPE_CHECKING:
from skyplane.api.provisioner import Provisioner
Expand Down Expand Up @@ -156,6 +157,7 @@ def provision(
is_azure_used = any(n.region_tag.startswith("azure:") for n in self.topology.get_gateways())
is_gcp_used = any(n.region_tag.startswith("gcp:") for n in self.topology.get_gateways())
is_ibmcloud_used = any(n.region_tag.startswith("ibmcloud:") for n in self.topology.get_gateways())
is_scp_used = any(n.region_tag.startswith("scp:") for n in self.topology.get_gateways())

# create VMs from the topology
for node in self.topology.get_gateways():
Expand All @@ -172,7 +174,7 @@ def provision(
)

# initialize clouds
self.provisioner.init_global(aws=is_aws_used, azure=is_azure_used, gcp=is_gcp_used, ibmcloud=is_ibmcloud_used)
self.provisioner.init_global(aws=is_aws_used, azure=is_azure_used, gcp=is_gcp_used, ibmcloud=is_ibmcloud_used, scp=is_scp_used)

# provision VMs
uuids = self.provisioner.provision(
Expand Down Expand Up @@ -273,9 +275,13 @@ def deprovision(self, max_jobs: int = 64, spinner: bool = False):
def check_error_logs(self) -> Dict[str, List[str]]:
"""Get the error log from remote gateways if there is any error."""

def http_pool_request(instance):
return self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/errors")

def get_error_logs(args):
_, instance = args
reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/errors")
# reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/errors")
reply = retry_backoff(partial(http_pool_request, instance))
if reply.status != 200:
raise Exception(f"Failed to get error logs from gateway instance {instance.instance_name()}: {reply.data.decode('utf-8')}")
return json.loads(reply.data.decode("utf-8"))["errors"]
Expand Down
2 changes: 2 additions & 0 deletions skyplane/api/obj_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def create_bucket(self, region: str, bucket_name: str):
return f"s3://{bucket_name}"
elif provider == "gcp":
return f"gs://{bucket_name}"
elif provider == "scp":
return f"scp://{bucket_name}"
else:
raise NotImplementedError(f"Provider {provider} not implemented")

Expand Down
56 changes: 55 additions & 1 deletion skyplane/api/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(
gcp_auth: Optional[compute.GCPAuthentication] = None,
host_uuid: Optional[str] = None,
ibmcloud_auth: Optional[compute.IBMCloudAuthentication] = None,
scp_auth: Optional[compute.SCPAuthentication] = None,
):
"""
:param aws_auth: authentication information for aws
Expand All @@ -64,12 +65,15 @@ def __init__(
:type host_uuid: string
:param ibmcloud_auth: authentication information for aws
:type ibmcloud_auth: compute.IBMCloudAuthentication
:param scp_auth: authentication information for scp
:type scp_auth: compute.SCPAuthentication
"""
self.aws_auth = aws_auth
self.azure_auth = azure_auth
self.gcp_auth = gcp_auth
self.host_uuid = host_uuid
self.ibmcloud_auth = ibmcloud_auth
self.scp_auth = scp_auth
self._make_cloud_providers()
self.temp_nodes: Set[compute.Server] = set() # temporary area to store nodes that should be terminated upon exit
self.pending_provisioner_tasks: List[ProvisionerTask] = []
Expand All @@ -85,8 +89,9 @@ def _make_cloud_providers(self):
self.azure = compute.AzureCloudProvider(auth=self.azure_auth)
self.gcp = compute.GCPCloudProvider(auth=self.gcp_auth)
self.ibmcloud = compute.IBMCloudProvider(auth=self.ibmcloud_auth)
self.scp = compute.SCPCloudProvider(auth=self.scp_auth)

def init_global(self, aws: bool = True, azure: bool = True, gcp: bool = True, ibmcloud: bool = True):
def init_global(self, aws: bool = True, azure: bool = True, gcp: bool = True, ibmcloud: bool = True, scp: bool = True):
"""
Initialize the global cloud providers by configuring with credentials
Expand All @@ -110,6 +115,9 @@ def init_global(self, aws: bool = True, azure: bool = True, gcp: bool = True, ib
jobs.append(self.gcp.setup_global)
if ibmcloud:
jobs.append(self.ibmcloud.setup_global)
if scp:
jobs.append(self.scp.create_ssh_key)
jobs.append(self.scp.setup_global)

do_parallel(lambda fn: fn(), jobs, spinner=False)

Expand Down Expand Up @@ -174,6 +182,10 @@ def _provision_task(self, task: ProvisionerTask):
elif task.cloud_provider == "ibmcloud":
assert self.ibmcloud.auth.enabled(), "IBM Cloud credentials not configured"
server = self.ibmcloud.provision_instance(task.region, task.vm_type, tags=task.tags)
elif task.cloud_provider == "scp":
assert self.scp.auth.enabled(), "SCP credentials not configured"
# print('def _provision_task : ', task.region, task.vm_type, task.tags)
server = self.scp.provision_instance(task.region, task.vm_type, tags=task.tags)
else:
raise NotImplementedError(f"Unknown provider {task.cloud_provider}")
logger.fs.debug(f"[Provisioner._provision_task] Provisioned {server} in {t.elapsed:.2f}s")
Expand Down Expand Up @@ -206,6 +218,8 @@ def provision(self, authorize_firewall: bool = True, max_jobs: int = 16, spinner
azure_provisioned = any([task.cloud_provider == "azure" for task in provision_tasks])
gcp_provisioned = any([task.cloud_provider == "gcp" for task in provision_tasks])
ibmcloud_provisioned = any([task.cloud_provider == "ibmcloud" for task in provision_tasks])
scp_regions = set([task.region for task in provision_tasks if task.cloud_provider == "scp"])
scp_provisioned = any([task.cloud_provider == "scp" for task in provision_tasks])

# configure regions
if aws_provisioned:
Expand All @@ -224,6 +238,25 @@ def provision(self, authorize_firewall: bool = True, max_jobs: int = 16, spinner
)
logger.fs.info(f"[Provisioner.provision] Configured IBM Cloud regions {ibmcloud_regions}")

if scp_provisioned:
logger.fs.info("SCP provisioning may sometimes take several minutes. Please be patient.")
do_parallel(
self.scp.setup_region,
list(set(scp_regions)),
spinner=spinner,
spinner_persist=False,
desc="Configuring SCP regions",
)
# server group create, add provision_tasks on tags(region)
for r in set(scp_regions):
servergroup = self.scp.network.create_server_group(r)
for task in provision_tasks:
if task.cloud_provider == "scp" and task.region == r:
task.tags["servergroup"] = servergroup
# print('provisioner.py - task.tags : ', task.tags)

logger.fs.info(f"[Provisioner.provision] Configured SCP regions {scp_regions}")

# provision VMs
logger.fs.info(f"[Provisioner.provision] Provisioning {len(provision_tasks)} VMs")
results: List[Tuple[ProvisionerTask, compute.Server]] = do_parallel(
Expand Down Expand Up @@ -253,6 +286,18 @@ def authorize_gcp_gateways():
self.gcp_firewall_rules.add(self.gcp.authorize_gateways(public_ips + private_ips))

authorize_ip_jobs.append(authorize_gcp_gateways)
if scp_provisioned:
# configure firewall for each scp region
for r in set(scp_regions):
scp_ips = [s.private_ip() for t, s in results if t.cloud_provider == "scp" and t.region == r]
# vpcids = [s.vpc_id for t, s in results if t.cloud_provider == "scp" and t.region == r] # pytype: disable=bad-return-type
vpcids = [
s.vpc_id if isinstance(s, compute.SCPServer) else None
for t, s in results
if t.cloud_provider == "scp" and t.region == r
]
# print('provisioner.py - scp_ips : ', scp_ips, ', vpcids : ', vpcids)
authorize_ip_jobs.extend([partial(self.scp.add_firewall_rule_all, r, scp_ips, vpcids)])

do_parallel(
lambda fn: fn(),
Expand Down Expand Up @@ -303,6 +348,7 @@ def deprovision_gateway_instance(server: compute.Server):
azure_deprovisioned = any([s.provider == "azure" for s in servers])
gcp_deprovisioned = any([s.provider == "gcp" for s in servers])
ibmcloud_deprovisioned = any([s.provider == "ibmcloud" for s in servers])
scp_deprovisioned = any([s.provider == "scp" for s in servers])
if azure_deprovisioned:
logger.warning("Azure deprovisioning is very slow. Please be patient.")
logger.fs.info(f"[Provisioner.deprovision] Deprovisioning {len(servers)} VMs")
Expand All @@ -327,6 +373,14 @@ def deprovision_gateway_instance(server: compute.Server):
if gcp_deprovisioned:
jobs.extend([partial(self.gcp.remove_gateway_rule, rule) for rule in self.gcp_firewall_rules])
logger.fs.info(f"[Provisioner.deprovision] Deauthorizing GCP gateways with firewalls: {self.gcp_firewall_rules}")
if scp_deprovisioned:
scp_regions = set([s.region() for s in servers if s.provider == "scp"])
for r in set(scp_regions):
scp_servers = [s for s in servers if s.provider == "scp" and s.region() == r]
scp_ips = [s.private_ip() for s in scp_servers]
vpcids = [s.vpc_id for s in scp_servers]
jobs.extend([partial(self.scp.remove_gateway_rule_region, r, scp_ips, vpcids)])
logger.fs.info(f"[Provisioner.deprovision] Deauthorizing SCP gateways with firewalls: {scp_ips}")
do_parallel(
lambda fn: fn(), jobs, n=max_jobs, spinner=spinner, spinner_persist=False, desc="Deauthorizing gateways from firewalls"
)
8 changes: 7 additions & 1 deletion skyplane/api/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from abc import ABC
from datetime import datetime
from threading import Thread
from functools import partial

import urllib3
from typing import TYPE_CHECKING, Dict, List, Optional, Set
Expand All @@ -16,6 +17,7 @@
from skyplane.utils.fn import do_parallel
from skyplane.api.usage import UsageClient
from skyplane.utils.definitions import GB
from skyplane.utils.retry import retry_backoff

from skyplane.cli.impl.common import print_stats_completed

Expand Down Expand Up @@ -335,10 +337,14 @@ def monitor_transfer(pd, self, region_tag):
def _chunk_to_job_map(self):
return {chunk_id: job_uuid for job_uuid, cr_dict in self.job_chunk_requests.items() for chunk_id in cr_dict.keys()}

def http_pool_request(self, instance):
return self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/chunk_status_log")

def _query_chunk_status(self):
def get_chunk_status(args):
node, instance = args
reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/chunk_status_log")
# reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/chunk_status_log")
reply = retry_backoff(partial(self.http_pool_request, instance))
if reply.status != 200:
raise Exception(
f"Failed to get chunk status from gateway instance {instance.instance_name()}: {reply.data.decode('utf-8')}"
Expand Down
Loading

0 comments on commit 4602d9c

Please sign in to comment.