Skip to content

Commit

Permalink
error message for different lb policy
Browse files Browse the repository at this point in the history
  • Loading branch information
cblmemo committed Dec 10, 2024
1 parent f2f0890 commit c627f2e
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 12 deletions.
11 changes: 11 additions & 0 deletions sky/serve/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,17 @@ def update(
with ux_utils.print_exception_no_traceback():
raise RuntimeError(prompt)

original_lb_policy = service_record['load_balancing_policy']
assert task.service is not None, 'Service section not found.'
if original_lb_policy != task.service.load_balancing_policy:
logger.warning(
f'{colorama.Fore.YELLOW}Current load balancing policy '
f'{original_lb_policy!r} is different from the new policy '
f'{task.service.load_balancing_policy!r}. Updating the load '
'balancing policy is not supported yet and it will be ignored. '
'The service will continue to use the current load balancing '
f'policy.{colorama.Style.RESET_ALL}')

with rich_utils.safe_status(
ux_utils.spinner_message('Initializing service')):
controller_utils.maybe_translate_local_file_mounts_and_sync_up(
Expand Down
16 changes: 13 additions & 3 deletions sky/serve/load_balancing_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
# Define a registry for load balancing policies
LB_POLICIES = {}
DEFAULT_LB_POLICY = None
# Prior to #4439, the default policy was round_robin. We store the legacy
# default policy here to maintain backwards compatibility. Remove this after
# 2 minor release, i.e., 0.9.0.
LEGACY_DEFAULT_POLICY = 'round_robin'


def _request_repr(request: 'fastapi.Request') -> str:
Expand All @@ -40,11 +44,17 @@ def __init_subclass__(cls, name: str, default: bool = False):
DEFAULT_LB_POLICY = name

@classmethod
def make(cls, policy_name: Optional[str] = None) -> 'LoadBalancingPolicy':
"""Create a load balancing policy from a name."""
def make_policy_name(cls, policy_name: Optional[str]) -> str:
"""Return the policy name."""
assert DEFAULT_LB_POLICY is not None, 'No default policy set.'
if policy_name is None:
policy_name = DEFAULT_LB_POLICY
return DEFAULT_LB_POLICY
return policy_name

@classmethod
def make(cls, policy_name: Optional[str] = None) -> 'LoadBalancingPolicy':
"""Create a load balancing policy from a name."""
policy_name = cls.make_policy_name(policy_name)
if policy_name not in LB_POLICIES:
raise ValueError(f'Unknown load balancing policy: {policy_name}')
return LB_POLICIES[policy_name]()
Expand Down
20 changes: 15 additions & 5 deletions sky/serve/serve_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import colorama

from sky.serve import constants
from sky.serve import load_balancing_policies as lb_policies
from sky.utils import db_utils

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -76,6 +77,8 @@ def create_table(cursor: 'sqlite3.Cursor', conn: 'sqlite3.Connection') -> None:
db_utils.add_column_to_table(_DB.cursor, _DB.conn, 'services',
'active_versions',
f'TEXT DEFAULT {json.dumps([])!r}')
db_utils.add_column_to_table(_DB.cursor, _DB.conn, 'services',
'load_balancing_policy', 'TEXT DEFAULT NULL')
_UNIQUE_CONSTRAINT_FAILED_ERROR_MSG = 'UNIQUE constraint failed: services.name'


Expand Down Expand Up @@ -241,7 +244,8 @@ def from_replica_statuses(


def add_service(name: str, controller_job_id: int, policy: str,
requested_resources_str: str, status: ServiceStatus) -> bool:
requested_resources_str: str, load_balancing_policy: str,
status: ServiceStatus) -> bool:
"""Add a service in the database.
Returns:
Expand All @@ -254,10 +258,10 @@ def add_service(name: str, controller_job_id: int, policy: str,
"""\
INSERT INTO services
(name, controller_job_id, status, policy,
requested_resources_str)
VALUES (?, ?, ?, ?, ?)""",
requested_resources_str, load_balancing_policy)
VALUES (?, ?, ?, ?, ?, ?)""",
(name, controller_job_id, status.value, policy,
requested_resources_str))
requested_resources_str, load_balancing_policy))

except sqlite3.IntegrityError as e:
if str(e) != _UNIQUE_CONSTRAINT_FAILED_ERROR_MSG:
Expand Down Expand Up @@ -324,7 +328,12 @@ def set_service_load_balancer_port(service_name: str,
def _get_service_from_row(row) -> Dict[str, Any]:
(current_version, name, controller_job_id, controller_port,
load_balancer_port, status, uptime, policy, _, _, requested_resources_str,
_, active_versions) = row[:13]
_, active_versions, load_balancing_policy) = row[:14]
if load_balancing_policy is None:
# This entry in database was added in #4439, and it will always be set
# to a str value. If it is None, it means it is an legacy entry and is
# using the legacy default policy.
load_balancing_policy = lb_policies.LEGACY_DEFAULT_POLICY
return {
'name': name,
'controller_job_id': controller_job_id,
Expand All @@ -341,6 +350,7 @@ def _get_service_from_row(row) -> Dict[str, Any]:
# integers in json format. This is mainly for display purpose.
'active_versions': json.loads(active_versions),
'requested_resources_str': requested_resources_str,
'load_balancing_policy': load_balancing_policy,
}


Expand Down
8 changes: 6 additions & 2 deletions sky/serve/serve_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,9 @@ def format_service_table(service_records: List[Dict[str, Any]],
'NAME', 'VERSION', 'UPTIME', 'STATUS', 'REPLICAS', 'ENDPOINT'
]
if show_all:
service_columns.extend(['POLICY', 'REQUESTED_RESOURCES'])
service_columns.extend([
'AUTOSCALING_POLICY', 'LOAD_BALANCING_POLICY', 'REQUESTED_RESOURCES'
])
service_table = log_utils.create_table(service_columns)

replica_infos = []
Expand All @@ -832,6 +834,7 @@ def format_service_table(service_records: List[Dict[str, Any]],
endpoint = get_endpoint(record)
policy = record['policy']
requested_resources_str = record['requested_resources_str']
load_balancing_policy = record['load_balancing_policy']

service_values = [
service_name,
Expand All @@ -842,7 +845,8 @@ def format_service_table(service_records: List[Dict[str, Any]],
endpoint,
]
if show_all:
service_values.extend([policy, requested_resources_str])
service_values.extend(
[policy, load_balancing_policy, requested_resources_str])
service_table.add_row(service_values)

replica_table = _format_replica_table(replica_infos, show_all)
Expand Down
1 change: 1 addition & 0 deletions sky/serve/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def _start(service_name: str, tmp_task_yaml: str, job_id: int):
controller_job_id=job_id,
policy=service_spec.autoscaling_policy_str(),
requested_resources_str=backend_utils.get_task_resources_str(task),
load_balancing_policy=service_spec.load_balancing_policy,
status=serve_state.ServiceStatus.CONTROLLER_INIT)
# Directly throw an error here. See sky/serve/api.py::up
# for more details.
Expand Down
6 changes: 4 additions & 2 deletions sky/serve/service_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from sky import serve
from sky.serve import constants
from sky.serve import load_balancing_policies as lb_policies
from sky.utils import common_utils
from sky.utils import schemas
from sky.utils import ux_utils
Expand Down Expand Up @@ -327,5 +328,6 @@ def use_ondemand_fallback(self) -> bool:
return self._use_ondemand_fallback

@property
def load_balancing_policy(self) -> Optional[str]:
return self._load_balancing_policy
def load_balancing_policy(self) -> str:
return lb_policies.LoadBalancingPolicy.make_policy_name(
self._load_balancing_policy)

0 comments on commit c627f2e

Please sign in to comment.