Skip to content

Commit

Permalink
[AWS] Support targeted on-demand capacity reservations (#3852)
Browse files Browse the repository at this point in the history
* wip

Allow prioritize reservations

format

Allow open capacity reservations

Add check reserved resources

format

Remove specific reservations

* parent fcf1f60
author Zhanghao Wu <[email protected]> 1724175607 +0000
committer Zhanghao Wu <[email protected]> 1724210666 +0000

wip

Allow prioritize reservations

format

Add check reserved resources

format

* Support target capacity reservation provisioning

* Fix comments

* Add doc

* format
  • Loading branch information
Michaelvll authored Aug 22, 2024
1 parent 40749e3 commit 1cd2444
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 16 deletions.
36 changes: 34 additions & 2 deletions docs/source/reference/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,35 @@ Available fields and semantics:
# Default: false.
disk_encrypted: false
# Reserved capacity (optional).
#
# Whether to prioritize capacity reservations (considered as 0 cost) in the
# optimizer.
#
# If you have capacity reservations in your AWS project:
# Setting this to true guarantees the optimizer will pick any matching
# reservation within all regions and AWS will auto consume your reservations
# with instance match criteria to "open", and setting to false means
# optimizer uses regular, non-zero pricing in optimization (if by chance any
# matching reservation exists, AWS will still consume the reservation).
#
# Note: this setting is default to false for performance reasons, as it can
# take half a minute to retrieve the reservations from AWS when set to true.
#
# Default: false.
prioritize_reservations: false
#
# The targeted capacity reservations (CapacityReservationId) to be
# considered when provisioning clusters on AWS. SkyPilot will automatically
# prioritize this reserved capacity (considered as zero cost) if the
# requested resources matches the reservation.
#
# Ref: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/capacity-reservations-launch.html
specific_reservations:
- cr-a1234567
- cr-b2345678
# Identity to use for AWS instances (optional).
#
# LOCAL_CREDENTIALS: The user's local credential files will be uploaded to
Expand Down Expand Up @@ -307,13 +336,16 @@ Available fields and semantics:
# Setting this to true guarantees the optimizer will pick any matching
# reservation and GCP will auto consume your reservation, and setting to
# false means optimizer uses regular, non-zero pricing in optimization (if
# by chance any matching reservation is selected, GCP still auto consumes
# the reservation).
# by chance any matching reservation exists, GCP still auto consumes the
# reservation).
#
# If you have "specifically targeted" reservations (set by the
# `specific_reservations` field below): This field will automatically be set
# to true.
#
# Note: this setting is default to false for performance reasons, as it can
# take half a minute to retrieve the reservations from GCP when set to true.
#
# Default: false.
prioritize_reservations: false
#
Expand Down
42 changes: 40 additions & 2 deletions sky/clouds/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import subprocess
import time
import typing
from typing import Any, Dict, Iterator, List, Optional, Tuple
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple

from sky import clouds
from sky import exceptions
Expand All @@ -17,6 +17,7 @@
from sky import skypilot_config
from sky.adaptors import aws
from sky.clouds import service_catalog
from sky.clouds.utils import aws_utils
from sky.skylet import constants
from sky.utils import common_utils
from sky.utils import resources_utils
Expand Down Expand Up @@ -173,6 +174,10 @@ def regions_with_offering(cls, instance_type: str,
regions = [r for r in regions if r.zones]
return regions

@classmethod
def optimize_by_zone(cls) -> bool:
return aws_utils.use_reservations()

@classmethod
def zones_provision_loop(
cls,
Expand All @@ -197,11 +202,13 @@ def zones_provision_loop(
zone=None)
for r in regions:
assert r.zones is not None, r
if num_nodes > 1:
if num_nodes > 1 or aws_utils.use_reservations():
# When num_nodes > 1, we shouldn't pass a list of zones to the
# AWS NodeProvider to try, because it may then place the nodes of
# the same cluster in different zones. This is an artifact of the
# current AWS NodeProvider implementation.
# Also, when using reservations, they are zone-specific, so we
# should return one zone at a time.
for z in r.zones:
yield [z]
else:
Expand Down Expand Up @@ -856,6 +863,37 @@ def check_quota_available(cls,
# Quota found to be greater than zero, try provisioning
return True

def get_reservations_available_resources(
self,
instance_type: str,
region: str,
zone: Optional[str],
specific_reservations: Set[str],
) -> Dict[str, int]:
if zone is None:
# For backward compatibility, the cluster in INIT state launched
# before #2352 may not have zone information. In this case, we
# return 0 for all reservations.
return {reservation: 0 for reservation in specific_reservations}
reservations = aws_utils.list_reservations_for_instance_type(
instance_type, region)

filtered_reservations = []
for r in reservations:
if zone != r.zone:
continue
if r.targeted:
if r.name in specific_reservations:
filtered_reservations.append(r)
else:
filtered_reservations.append(r)
reservation_available_resources = {
r.name: r.available_resources for r in filtered_reservations
}
logger.debug('Get AWS reservations available resources:'
f'{region}-{zone}: {reservation_available_resources}')
return reservation_available_resources

@classmethod
def query_status(cls, name: str, tag_filters: Dict[str, str],
region: Optional[str], zone: Optional[str],
Expand Down
5 changes: 5 additions & 0 deletions sky/clouds/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ def regions_with_offering(cls, instance_type: str,
"""
raise NotImplementedError

@classmethod
def optimize_by_zone(cls) -> bool:
"""Returns whether to optimize this cloud by zone (default: region)."""
return False

@classmethod
def zones_provision_loop(
cls,
Expand Down
4 changes: 4 additions & 0 deletions sky/clouds/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ def regions_with_offering(cls, instance_type: str,
regions = [r for r in regions if r.zones]
return regions

@classmethod
def optimize_by_zone(cls) -> bool:
return True

@classmethod
def zones_provision_loop(
cls,
Expand Down
57 changes: 57 additions & 0 deletions sky/clouds/utils/aws_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Utilities for AWS."""
import dataclasses
import time
from typing import List

import cachetools

from sky import skypilot_config
from sky.adaptors import aws


@dataclasses.dataclass
class AWSReservation:
name: str
instance_type: str
zone: str
available_resources: int
# Whether the reservation is targeted, i.e. can only be consumed when
# the reservation name is specified.
targeted: bool


def use_reservations() -> bool:
prioritize_reservations = skypilot_config.get_nested(
('aws', 'prioritize_reservations'), False)
specific_reservations = skypilot_config.get_nested(
('aws', 'specific_reservations'), set())
return prioritize_reservations or specific_reservations


@cachetools.cached(cache=cachetools.TTLCache(maxsize=100,
ttl=300,
timer=time.time))
def list_reservations_for_instance_type(
instance_type: str,
region: str,
) -> List[AWSReservation]:
if not use_reservations():
return []
ec2 = aws.client('ec2', region_name=region)
response = ec2.describe_capacity_reservations(Filters=[{
'Name': 'instance-type',
'Values': [instance_type]
}, {
'Name': 'state',
'Values': ['active']
}])
reservations = response['CapacityReservations']
return [
AWSReservation(
name=r['CapacityReservationId'],
instance_type=r['InstanceType'],
zone=r['AvailabilityZone'],
available_resources=r['AvailableInstanceCount'],
targeted=r['InstanceMatchCriteria'] == 'targeted',
) for r in reservations
]
35 changes: 31 additions & 4 deletions sky/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from sky.adaptors import common as adaptors_common
from sky.utils import env_options
from sky.utils import log_utils
from sky.utils import rich_utils
from sky.utils import subprocess_utils
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -252,6 +254,26 @@ def _estimate_nodes_cost_or_time(
# node -> cloud -> list of resources that satisfy user's requirements.
node_to_candidate_map: _TaskToPerCloudCandidates = {}

def get_available_reservations(
launchable_resources: Dict[resources_lib.Resources,
List[resources_lib.Resources]]
) -> Dict[resources_lib.Resources, int]:
num_available_reserved_nodes_per_resource = {}

def get_reservations_available_resources(
resources: resources_lib.Resources):
num_available_reserved_nodes_per_resource[resources] = sum(
resources.get_reservations_available_resources().values())

launchable_resources_list: List[resources_lib.Resources] = sum(
launchable_resources.values(), [])
with rich_utils.safe_status(
'[cyan]Checking reserved resources...[/]'):
subprocess_utils.run_in_parallel(
get_reservations_available_resources,
launchable_resources_list)
return num_available_reserved_nodes_per_resource

# Compute the estimated cost/time for each node.
for node_i, node in enumerate(topo_order):
if node_i == 0:
Expand Down Expand Up @@ -279,7 +301,11 @@ def _estimate_nodes_cost_or_time(
list(node.resources)[0]: list(node.resources)
}

# Fetch reservations in advance and in parallel to speed up the
# reservation info fetching.
num_resources = len(list(node.resources))
num_available_reserved_nodes_per_resource = (
get_available_reservations(launchable_resources))

for orig_resources, launchable_list in launchable_resources.items():
if num_resources == 1 and node.time_estimator_func is None:
Expand All @@ -302,15 +328,16 @@ def _estimate_nodes_cost_or_time(
else:
estimated_runtime = node.estimate_runtime(
orig_resources)

for resources in launchable_list:
if do_print:
logger.debug(f'resources: {resources}')

if minimize_cost:
cost_per_node = resources.get_cost(estimated_runtime)
num_available_reserved_nodes = sum(
resources.get_reservations_available_resources(
).values())
num_available_reserved_nodes = (
num_available_reserved_nodes_per_resource[resources]
)

# We consider the cost of the unused reservation
# resources to be 0 since we are already paying for
Expand Down Expand Up @@ -1116,7 +1143,7 @@ def _make_launchables_for_valid_region_zones(
regions = launchable_resources.get_valid_regions_for_launchable()
for region in regions:
if (launchable_resources.use_spot and region.zones is not None or
isinstance(launchable_resources.cloud, clouds.GCP)):
launchable_resources.cloud.optimize_by_zone()):
# Spot instances.
# Do not batch the per-zone requests.
for zone in region.zones:
Expand Down
79 changes: 71 additions & 8 deletions sky/provision/aws/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from sky import status_lib
from sky.adaptors import aws
from sky.clouds import aws as aws_cloud
from sky.clouds.utils import aws_utils
from sky.provision import common
from sky.provision import constants
from sky.provision.aws import utils
Expand Down Expand Up @@ -429,19 +430,81 @@ def _create_node_tag(target_instance, is_head: bool = True) -> str:
head_instance_id = _create_node_tag(resumed_instances[0])

if to_start_count > 0:
target_reservations = (config.node_config.get(
'CapacityReservationSpecification',
{}).get('CapacityReservationTarget',
{}).get('CapacityReservationId', []))
created_instances = []
if target_reservations:
node_config = copy.deepcopy(config.node_config)
# Clear the capacity reservation specification settings in the
# original node config, as we will create instances with
# reservations with specific settings for each reservation.
node_config['CapacityReservationSpecification'] = {
'CapacityReservationTarget': {}
}

reservations = aws_utils.list_reservations_for_instance_type(
node_config['InstanceType'], region=region)
# Filter the reservations by the user-specified ones, because
# reservations contain 'open' reservations as well, which do not
# need to explicitly specify in the config for creating instances.
target_reservations_to_count = {}
for reservation in reservations:
if (reservation.targeted and
reservation.name in target_reservations):
target_reservations_to_count[
reservation.name] = reservation.available_resources

target_reservations_list = sorted(
target_reservations_to_count.items(),
key=lambda x: x[1],
reverse=True)
for reservation, reservation_count in target_reservations_list:
if reservation_count <= 0:
# We have sorted the reservations by the available
# resources, so if the reservation is not available, the
# following reservations are not available either.
break
reservation_count = min(reservation_count, to_start_count)
logger.debug(f'Creating {reservation_count} instances '
f'with reservation {reservation}')
node_config['CapacityReservationSpecification'][
'CapacityReservationTarget'] = {
'CapacityReservationId': reservation
}
created_reserved_instances = _create_instances(
ec2_fail_fast,
cluster_name_on_cloud,
node_config,
tags,
reservation_count,
associate_public_ip_address=(
not config.provider_config['use_internal_ips']))
created_instances.extend(created_reserved_instances)
to_start_count -= reservation_count
if to_start_count <= 0:
break

# TODO(suquark): If there are existing instances (already running or
# resumed), then we cannot guarantee that they will be in the same
# availability zone (when there are multiple zones specified).
# This is a known issue before.

created_instances = _create_instances(
ec2_fail_fast,
cluster_name_on_cloud,
config.node_config,
tags,
to_start_count,
associate_public_ip_address=(
not config.provider_config['use_internal_ips']))
if to_start_count > 0:
# Remove the capacity reservation specification from the node config
# as we have already created the instances with the reservations.
config.node_config.get('CapacityReservationSpecification',
{}).pop('CapacityReservationTarget', None)
created_remaining_instances = _create_instances(
ec2_fail_fast,
cluster_name_on_cloud,
config.node_config,
tags,
to_start_count,
associate_public_ip_address=(
not config.provider_config['use_internal_ips']))
created_instances.extend(created_remaining_instances)
created_instances.sort(key=lambda x: x.id)

created_instance_ids = [n.id for n in created_instances]
Expand Down
Loading

0 comments on commit 1cd2444

Please sign in to comment.