Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][Serve] Enable launching multiple external LB on controller. #4362

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions examples/serve/external-lb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# SkyServe YAML to run multiple load balancers in different regions.
#
# Usage:
# 1. Register the hosted zone in Route53.
# 2. Go to your DNS manager and set up the name servers to point to Route53.
# 3. `sky serve up examples/serve/external-lb.yaml`.

name: multi-lb

service:
readiness_probe:
path: /health
initial_delay_seconds: 20
replicas: 2
# TODO(tian): Change the config to a cloud-agnostic way.
route53_hosted_zone: aws.cblmemo.net
external_load_balancers:
- resources:
cloud: aws
region: us-east-2
# cloud: gcp
# region: us-east1
load_balancing_policy: round_robin
- resources:
cloud: aws
region: ap-northeast-1
# cloud: gcp
# region: asia-northeast1
load_balancing_policy: round_robin

resources:
cloud: aws
ports: 8080
cpus: 2+

workdir: examples/serve/http_server

run: python3 server.py
10 changes: 10 additions & 0 deletions sky/serve/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Constants used for SkyServe."""

CONTROLLER_TEMPLATE = 'sky-serve-controller.yaml.j2'
EXTERNAL_LB_TEMPLATE = 'sky-serve-external-load-balancer.yaml.j2'

SKYSERVE_METADATA_DIR = '~/.sky/serve'

Expand All @@ -15,6 +16,11 @@
# Time to wait in seconds for service to register on the controller.
SERVICE_REGISTER_TIMEOUT_SECONDS = 60

# Time to wait in seconds for a service with external load balancers to
# register on the controller. We need to wait longer for the external load
# balancers to be ready for the IP address of the service.
SERVICE_REGISTER_TIMEOUT_SECONDS_WITH_EXTERNAL_LB = 300

# The time interval in seconds for load balancer to sync with controller. Every
# time the load balancer syncs with controller, it will update all available
# replica ips for each service, also send the number of requests in last query
Expand Down Expand Up @@ -79,9 +85,13 @@
# Default port range start for controller and load balancer. Ports will be
# automatically generated from this start port.
CONTROLLER_PORT_START = 20001
CONTROLLER_PORT_RANGE = '20001-20020'
LOAD_BALANCER_PORT_START = 30001
LOAD_BALANCER_PORT_RANGE = '30001-30020'

# Port for external load balancer.
EXTERNAL_LB_PORT = 8000

# Initial version of service.
INITIAL_VERSION = 1

Expand Down
53 changes: 44 additions & 9 deletions sky/serve/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,21 @@ def up(
vars_to_fill,
output_path=controller_file.name)
controller_task = task_lib.Task.from_yaml(controller_file.name)
# TODO(tian): Currently we expose the controller port to the public
# network so that external load balancers can access it. We should implement
# encrypted communication between the controller and the load balancers, and
# not expose the controller to the public network.
assert task.service is not None
ports_to_open_in_controller = (serve_constants.CONTROLLER_PORT_RANGE
if task.service.external_load_balancers
is not None else
serve_constants.LOAD_BALANCER_PORT_RANGE)
# TODO(tian): Probably run another sky.launch after we get the load
# balancer port from the controller? So we don't need to open so many
# ports here. Or, we should have a nginx traffic control to refuse
# any connection to the unregistered ports.
controller_resources = {
r.copy(ports=[serve_constants.LOAD_BALANCER_PORT_RANGE])
r.copy(ports=[ports_to_open_in_controller])
for r in controller_resources
}
controller_task.set_resources(controller_resources)
Expand Down Expand Up @@ -230,15 +239,15 @@ def up(
assert isinstance(backend, backends.CloudVmRayBackend)
assert isinstance(controller_handle,
backends.CloudVmRayResourceHandle)
returncode, lb_port_payload, _ = backend.run_on_head(
returncode, service_init_payload, _ = backend.run_on_head(
controller_handle,
code,
require_outputs=True,
stream_logs=False)
try:
subprocess_utils.handle_returncode(
returncode, code, 'Failed to wait for service initialization',
lb_port_payload)
service_init_payload)
except exceptions.CommandError:
statuses = backend.get_job_status(controller_handle,
[controller_job_id],
Expand Down Expand Up @@ -267,12 +276,20 @@ def up(
'Failed to spin up the service. Please '
'check the logs above for more details.') from None
else:
lb_port = serve_utils.load_service_initialization_result(
lb_port_payload)
endpoint = backend_utils.get_endpoints(
controller_handle.cluster_name, lb_port,
skip_status_check=True).get(lb_port)
assert endpoint is not None, 'Did not get endpoint for controller.'
service_init_result = (
serve_utils.load_service_initialization_result(
service_init_payload))
if task.service.external_load_balancers is None:
assert isinstance(service_init_result, int)
endpoint = backend_utils.get_endpoints(
controller_handle.cluster_name,
service_init_result,
skip_status_check=True).get(service_init_result)
assert endpoint is not None, (
'Did not get endpoint for controller.')
else:
assert isinstance(service_init_result, str)
endpoint = service_init_result

sky_logging.print(
f'{fore.CYAN}Service name: '
Expand Down Expand Up @@ -320,6 +337,7 @@ def update(
task: sky.Task to update.
service_name: Name of the service.
"""
# TODO(tian): Implement update of external LBs.
_validate_service_task(task)
# Always apply the policy again here, even though it might have been applied
# in the CLI. This is to ensure that we apply the policy to the final DAG
Expand Down Expand Up @@ -584,7 +602,10 @@ def status(
'policy': (Optional[str]) load balancer policy description,
'requested_resources_str': (str) str representation of
requested resources,
'dns_endpoint': (Optional[str]) DNS endpoint,
'replica_info': (List[Dict[str, Any]]) replica information,
'external_lb_info': (List[Dict[str, Any]]) external load balancer
information,
}

Each entry in replica_info has the following fields:
Expand All @@ -600,6 +621,18 @@ def status(
'handle': (ResourceHandle) handle of the replica cluster,
}

Each entry in external_lb_info has the following fields:

.. code-block:: python

{
'lb_id': (int) index of the external load balancer,
'cluster_name': (str) cluster name of the external load balancer,
'region': (str) region of the external load balancer,
'ip': (str) ip of the external load balancer,
'port': (int) port of the external load balancer,
}

For possible service statuses and replica statuses, please refer to
sky.cli.serve_status.

Expand Down Expand Up @@ -695,6 +728,8 @@ def tail_logs(
sky.exceptions.ClusterNotUpError: the sky serve controller is not up.
ValueError: arguments not valid, or failed to tail the logs.
"""
# TODO(tian): Support tail logs for external load balancer. It should be
# similar to tail replica logs.
if isinstance(target, str):
target = serve_utils.ServiceComponent(target)
if not isinstance(target, serve_utils.ServiceComponent):
Expand Down
84 changes: 83 additions & 1 deletion sky/serve/serve_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import colorama

from sky import global_user_state
from sky import status_lib
from sky.serve import constants
from sky.utils import db_utils

Expand Down Expand Up @@ -58,6 +60,14 @@ def create_table(cursor: 'sqlite3.Cursor', conn: 'sqlite3.Connection') -> None:
service_name TEXT,
spec BLOB,
PRIMARY KEY (service_name, version))""")
cursor.execute("""\
CREATE TABLE IF NOT EXISTS external_load_balancers (
lb_id INTEGER,
service_name TEXT,
cluster_name TEXT,
region TEXT,
port INTEGER,
PRIMARY KEY (service_name, lb_id))""")
conn.commit()


Expand All @@ -76,6 +86,8 @@ def create_table(cursor: 'sqlite3.Cursor', conn: 'sqlite3.Connection') -> None:
db_utils.add_column_to_table(_DB.cursor, _DB.conn, 'services',
'active_versions',
f'TEXT DEFAULT {json.dumps([])!r}')
db_utils.add_column_to_table(_DB.cursor, _DB.conn, 'services', 'dns_endpoint',
'TEXT DEFAULT NULL')
_UNIQUE_CONSTRAINT_FAILED_ERROR_MSG = 'UNIQUE constraint failed: services.name'


Expand Down Expand Up @@ -321,10 +333,19 @@ def set_service_load_balancer_port(service_name: str,
(load_balancer_port, service_name))


def set_service_dns_endpoint(service_name: str, dns_endpoint: str) -> None:
    """Records the DNS endpoint of a service.

    Issues an UPDATE against the `services` table that writes `dns_endpoint`
    into the row whose `name` column equals `service_name`.

    Args:
        service_name: Name of the service whose row is updated.
        dns_endpoint: Value to store in the `dns_endpoint` column.
    """
    statement = ('UPDATE services SET\n'
                 'dns_endpoint=(?) WHERE name=(?)')
    # Parameter order follows the placeholders: new value first, then the key.
    params = (dns_endpoint, service_name)
    with db_utils.safe_cursor(_DB_PATH) as cursor:
        cursor.execute(statement, params)


def _get_service_from_row(row) -> Dict[str, Any]:
(current_version, name, controller_job_id, controller_port,
load_balancer_port, status, uptime, policy, _, _, requested_resources_str,
_, active_versions) = row[:13]
_, active_versions, dns_endpoint) = row[:14]
return {
'name': name,
'controller_job_id': controller_job_id,
Expand All @@ -341,6 +362,7 @@ def _get_service_from_row(row) -> Dict[str, Any]:
# integers in json format. This is mainly for display purpose.
'active_versions': json.loads(active_versions),
'requested_resources_str': requested_resources_str,
'dns_endpoint': dns_endpoint,
}


Expand Down Expand Up @@ -538,3 +560,63 @@ def delete_all_versions(service_name: str) -> None:
"""\
DELETE FROM version_specs
WHERE service_name=(?)""", (service_name,))


# === External Load Balancer functions ===
# TODO(tian): Add a status column.
def add_external_load_balancer(service_name: str, lb_id: int, cluster_name: str,
                               region: str, port: int) -> None:
    """Inserts a row describing one external load balancer.

    Args:
        service_name: Name of the service the load balancer belongs to.
        lb_id: Identifier of the load balancer within the service.
        cluster_name: Name of the cluster hosting the load balancer.
        region: Region stored for the load balancer.
        port: Port stored for the load balancer.
    """
    insert_stmt = ('INSERT INTO external_load_balancers\n'
                   '(service_name, lb_id, cluster_name, region, port)\n'
                   'VALUES (?, ?, ?, ?, ?)')
    # Values are listed in the same order as the column list above.
    values = (service_name, lb_id, cluster_name, region, port)
    with db_utils.safe_cursor(_DB_PATH) as cursor:
        cursor.execute(insert_stmt, values)


def _get_external_load_balancer_from_row(row) -> Dict[str, Any]:
    """Builds the info dict for one external load balancer row.

    Args:
        row: Sequence whose first four items are
            (lb_id, cluster_name, region, port).

    Returns:
        A dict with keys 'lb_id', 'cluster_name', 'region', 'ip' and 'port'.
        'ip' is the head IP taken from the cluster record's handle, or '-'
        when the cluster record is missing, the cluster is not UP, or the
        head IP is None.
    """
    lb_id, cluster_name, region, port = row[:4]
    placeholder_ip = '-'
    record = global_user_state.get_cluster_from_name(cluster_name)
    if record is None or record['status'] != status_lib.ClusterStatus.UP:
        # TODO(tian): We should implement a status for external lbs as well
        # and return a '-' when it is still provisioning.
        ip = placeholder_ip
    else:
        head_ip = record['handle'].head_ip
        ip = placeholder_ip if head_ip is None else head_ip
    return {
        'lb_id': lb_id,
        'cluster_name': cluster_name,
        'region': region,
        'ip': ip,
        'port': port,
    }


def get_external_load_balancers(service_name: str) -> List[Dict[str, Any]]:
    """Returns info dicts for every external load balancer of a service.

    Args:
        service_name: Name of the service to look up.

    Returns:
        One dict per matching row of `external_load_balancers`, each produced
        by `_get_external_load_balancer_from_row`. Empty if no row matches.
    """
    query = ('SELECT lb_id, cluster_name, region, port\n'
             'FROM external_load_balancers\n'
             'WHERE service_name=(?)')
    with db_utils.safe_cursor(_DB_PATH) as cursor:
        matching_rows = cursor.execute(query, (service_name,)).fetchall()
    return [
        _get_external_load_balancer_from_row(lb_row)
        for lb_row in matching_rows
    ]


def remove_external_load_balancer(service_name: str, lb_id: int) -> None:
    """Deletes one external load balancer row from the database.

    Args:
        service_name: Name of the service the load balancer belongs to.
        lb_id: Identifier of the load balancer within the service.
    """
    delete_stmt = ('DELETE FROM external_load_balancers\n'
                   'WHERE service_name=(?)\n'
                   'AND lb_id=(?)')
    key = (service_name, lb_id)
    with db_utils.safe_cursor(_DB_PATH) as cursor:
        cursor.execute(delete_stmt, key)
Loading
Loading