[Serve] Not using previously failed replica when retrying a failed request #3916

Open · wants to merge 9 commits into base: master
2 changes: 1 addition & 1 deletion sky/serve/constants.py
@@ -19,7 +19,7 @@
# time the load balancer syncs with controller, it will update all available
# replica ips for each service, also send the number of requests in last query
# interval.
-LB_CONTROLLER_SYNC_INTERVAL_SECONDS = 20
+LB_CONTROLLER_SYNC_INTERVAL_SECONDS = 10

# The maximum retry times for load balancer for each request. After changing to
# proxy implementation, we do retry for failed requests.
21 changes: 19 additions & 2 deletions sky/serve/load_balancer.py
@@ -2,7 +2,7 @@
import asyncio
import logging
import threading
-from typing import Dict, Union
+from typing import Dict, Set, Union

import aiohttp
import fastapi
@@ -160,11 +160,26 @@ async def _proxy_with_retries(
# SkyServe supports serving on Spot Instances. To avoid preemptions
# during request handling, we add a retry here.
retry_cnt = 0
# We keep track of the replicas that have already failed for the current
# request: with a global round-robin policy and a large load, all retries
# for the same request can land on the same replica by chance. If that
# replica is in the `NOT_READY` state but the new state has not yet been
# synced from the controller, the current request will keep failing.
#
# We maintain a per-request failed replica set instead of a global one so
# that other requests can still try a failed replica once, in case the
# failure was caused by a transient network issue.
failed_replica_urls: Set[str] = set()
while True:
retry_cnt += 1
with self._client_pool_lock:
# If all replicas have failed, clear the record and retry them,
# as some of the failures might be due to transient networking issues.
if (len(failed_replica_urls) ==
Collaborator: I am wondering how effective the new failed_replica_urls is compared to the original globally incremented index. Can we simulate the case where a replica goes down under different loads and check the success rate / latency?

self._load_balancing_policy.num_ready_replicas()):
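
A rough, standalone sketch of the kind of simulation suggested above (every name and number below is made up for illustration; this is not SkyServe code):

import random

NUM_REPLICAS = 4      # hypothetical replica pool size
FAILED_REPLICA = 2    # the replica that went down but is not yet synced to the LB
MAX_RETRY = 3         # retries per request, mirroring a small LB_MAX_RETRY
NUM_REQUESTS = 10_000
LOAD = 50             # selections made by other in-flight requests between retries

def run(skip_failed: bool) -> float:
    """Success rate when retries share a global round-robin index."""
    index = 0
    successes = 0
    for _ in range(NUM_REQUESTS):
        failed = set()
        for _ in range(MAX_RETRY):
            # Other concurrent requests also advance the shared index.
            index += random.randint(0, LOAD)
            replica = index % NUM_REPLICAS
            index += 1
            # With the per-request failed set, skip known-bad replicas.
            while skip_failed and replica in failed:
                replica = index % NUM_REPLICAS
                index += 1
            if replica != FAILED_REPLICA:
                successes += 1
                break
            failed.add(replica)
    return successes / NUM_REQUESTS

print('global index only     :', run(skip_failed=False))
print('per-request failed set:', run(skip_failed=True))
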
Comment on lines +178 to +179
Collaborator: Just curious, is it possible that num_ready_replicas is updated while we are retrying in this loop, for example if the replicas are scaled down? There could be a concurrency issue here.

Should we instead check if not (set(self._load_balancing_policy.ready_replicas) - failed_replica_urls):
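
A toy illustration of the race this comment describes (the URLs are made up, and len(ready_replicas) stands in for num_ready_replicas()):

# Hypothetical state: the request already failed on three replicas, then one
# of them was scaled down before the next retry.
ready_replicas = ['http://r1', 'http://r2']                    # r3 was just removed
failed_replica_urls = {'http://r1', 'http://r2', 'http://r3'}

# The equality on counts never fires here (3 != 2), so the set is never cleared,
# even though every remaining ready replica has already failed this request.
print(len(failed_replica_urls) == len(ready_replicas))         # False

# The set-difference form still detects that nothing selectable is left.
print(not (set(ready_replicas) - failed_replica_urls))         # True
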

failed_replica_urls.clear()
ready_replica_url = self._load_balancing_policy.select_replica(
-request)
+request, failed_replica_urls)
Collaborator: It seems that in this case we will never retry on a transient network issue. How about allowing retries on previously failed URLs if retry_cnt has not reached the maximum number of retries?

If that causes too much retry overhead, we could probably reduce the interval between retries instead.

Collaborator (Author): This only affects the retry logic within one request; the replica will still be selected by subsequent requests. This PR mainly targets the case where the replica is in the NOT_READY state on the controller but that state has not been synced to the load balancer yet; for a transient network issue, the replica will go back to the READY state soon. Besides, if all possible replicas are in failed_replica_urls, we will still choose from them.

Collaborator: Oh, I am talking about a single request. Should that specific request fail outright if replicas are available but a transient network issue occurs while the load balancer is forwarding the request to the replica? I think the original purpose of this retry is to retry on network issues. I am proposing the following to allow retries on the same replica if we have not reached the max retry count yet:

if ready_replica_url is None and failed_replica_urls:
    failed_replica_urls = []

Collaborator (Author):
Fixed! PTAL again
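
For reference, a minimal standalone sketch of the retry flow this thread converged on, with hypothetical names and a random pick standing in for the real round-robin policy: skip replicas that already failed this request, and clear the per-request record once every ready replica has failed.

import random
from typing import List, Optional, Set

def proxy(url: str) -> bool:
    """Stand-in for forwarding the request; 'bad' replicas always fail here."""
    return 'bad' not in url

def handle_request(ready: List[str], max_retry: int = 3) -> Optional[str]:
    """Return the replica URL that served the request, or None if retries ran out."""
    failed: Set[str] = set()
    for _ in range(max_retry):
        # If every ready replica already failed this request, clear the
        # per-request record so transient failures get another chance.
        if not set(ready) - failed:
            failed.clear()
        candidates = [url for url in ready if url not in failed]
        if not candidates:
            return None  # no ready replicas at all
        url = random.choice(candidates)
        if proxy(url):
            return url
        failed.add(url)
    return None

print(handle_request(['http://bad-replica', 'http://good-replica']))
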

if ready_replica_url is None:
response_or_exception = fastapi.HTTPException(
# 503 means that the server is currently
@@ -184,6 +199,8 @@ async def _proxy_with_retries(
# 499 means a client terminates the connection
# before the server is able to respond.
return fastapi.responses.Response(status_code=499)
assert ready_replica_url is not None
failed_replica_urls.add(ready_replica_url)
# TODO(tian): Fail fast for errors like 404 not found.
if retry_cnt == constants.LB_MAX_RETRY:
if isinstance(response_or_exception, fastapi.HTTPException):
24 changes: 16 additions & 8 deletions sky/serve/load_balancing_policies.py
@@ -1,7 +1,7 @@
"""LoadBalancingPolicy: Policy to select endpoint."""
import random
import typing
-from typing import List, Optional
+from typing import List, Optional, Set

from sky import sky_logging

@@ -28,8 +28,9 @@ def __init__(self) -> None:
def set_ready_replicas(self, ready_replicas: List[str]) -> None:
raise NotImplementedError

-def select_replica(self, request: 'fastapi.Request') -> Optional[str]:
-    replica = self._select_replica(request)
+def select_replica(self, request: 'fastapi.Request',
+                   disabled_replicas: Set[str]) -> Optional[str]:
+    replica = self._select_replica(request, disabled_replicas)
if replica is not None:
logger.info(f'Selected replica {replica} '
f'for request {_request_repr(request)}')
@@ -38,9 +39,13 @@ def select_replica(self, request: 'fastapi.Request') -> Optional[str]:
f'{_request_repr(request)}')
return replica

def num_ready_replicas(self) -> int:
return len(self.ready_replicas)

# TODO(tian): We should have an abstract class for Request to be
# compatible with all frameworks.
-def _select_replica(self, request: 'fastapi.Request') -> Optional[str]:
+def _select_replica(self, request: 'fastapi.Request',
+                    disabled_replicas: Set[str]) -> Optional[str]:
raise NotImplementedError


@@ -61,10 +66,13 @@ def set_ready_replicas(self, ready_replicas: List[str]) -> None:
self.ready_replicas = ready_replicas
self.index = 0

-def _select_replica(self, request: 'fastapi.Request') -> Optional[str]:
+def _select_replica(self, request: 'fastapi.Request',
+                    disabled_replicas: Set[str]) -> Optional[str]:
del request # Unused.
if not self.ready_replicas:
return None
-ready_replica_url = self.ready_replicas[self.index]
-self.index = (self.index + 1) % len(self.ready_replicas)
-return ready_replica_url
+while True:
+    ready_replica_url = self.ready_replicas[self.index]
+    self.index = (self.index + 1) % len(self.ready_replicas)
+    if ready_replica_url not in disabled_replicas:
+        return ready_replica_url
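
A standalone toy mirror of the updated policy interface above (illustration only; the class name is made up, and it drops the request argument and the logging that the real class in this file has):

from typing import List, Optional, Set

class ToyRoundRobin:
    """Standalone mirror of the round-robin selection in this diff (not SkyServe code)."""

    def __init__(self) -> None:
        self.ready_replicas: List[str] = []
        self.index = 0

    def set_ready_replicas(self, ready_replicas: List[str]) -> None:
        self.ready_replicas = ready_replicas
        self.index = 0

    def num_ready_replicas(self) -> int:
        return len(self.ready_replicas)

    def select_replica(self, disabled_replicas: Set[str]) -> Optional[str]:
        if not self.ready_replicas:
            return None
        # The caller is expected to clear `disabled_replicas` when it covers
        # every ready replica; otherwise this loop would never terminate.
        while True:
            url = self.ready_replicas[self.index]
            self.index = (self.index + 1) % len(self.ready_replicas)
            if url not in disabled_replicas:
                return url

policy = ToyRoundRobin()
policy.set_ready_replicas(['http://r1', 'http://r2', 'http://r3'])
failed = {'http://r2'}
print([policy.select_replica(failed) for _ in range(4)])
# ['http://r1', 'http://r3', 'http://r1', 'http://r3']: r2 is skipped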