From 8a0b1a1bae67372f787b9163178a2d8d7388458c Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Thu, 22 Aug 2024 11:30:37 -0700 Subject: [PATCH] [AWS] Support capacity block reservation (#3853) * wip Allow prioritize reservations format Allow open capacity reservations Add check reserved resources format Remove specific reservations * parent fcf1f60e2903edf8af7cdddf98189dc0c79e34d1 author Zhanghao Wu 1724175607 +0000 committer Zhanghao Wu 1724210666 +0000 wip Allow prioritize reservations format Add check reserved resources format * Support target capacity reservation provisioning * Support capacity block * format * Fix reservation type * Fix target reservations * Fix comments * Remove aws internal tags * format * format * add comment for prioritize reservations --- sky/clouds/utils/aws_utils.py | 22 +++++++++++------ sky/provision/aws/instance.py | 45 ++++++++++++++++++++++------------- 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/sky/clouds/utils/aws_utils.py b/sky/clouds/utils/aws_utils.py index a6c21a15d5c..49cb2810e16 100644 --- a/sky/clouds/utils/aws_utils.py +++ b/sky/clouds/utils/aws_utils.py @@ -1,5 +1,6 @@ """Utilities for AWS.""" import dataclasses +import enum import time from typing import List @@ -9,6 +10,11 @@ from sky.adaptors import aws +class ReservationType(str, enum.Enum): + DEFAULT = 'default' + BLOCK = 'capacity-block' + + @dataclasses.dataclass class AWSReservation: name: str @@ -18,6 +24,7 @@ class AWSReservation: # Whether the reservation is targeted, i.e. can only be consumed when # the reservation name is specified. 
targeted: bool + type: ReservationType def use_reservations() -> bool: @@ -47,11 +54,12 @@ def list_reservations_for_instance_type( }]) reservations = response['CapacityReservations'] return [ - AWSReservation( - name=r['CapacityReservationId'], - instance_type=r['InstanceType'], - zone=r['AvailabilityZone'], - available_resources=r['AvailableInstanceCount'], - targeted=r['InstanceMatchCriteria'] == 'targeted', - ) for r in reservations + AWSReservation(name=r['CapacityReservationId'], + instance_type=r['InstanceType'], + zone=r['AvailabilityZone'], + available_resources=r['AvailableInstanceCount'], + targeted=r['InstanceMatchCriteria'] == 'targeted', + type=ReservationType(r.get('ReservationType', + 'default'))) + for r in reservations ] diff --git a/sky/provision/aws/instance.py b/sky/provision/aws/instance.py index 24173482f34..4da80cb9afe 100644 --- a/sky/provision/aws/instance.py +++ b/sky/provision/aws/instance.py @@ -209,6 +209,8 @@ def _create_instances(ec2_fail_fast, cluster_name: str, assert 'NetworkInterfaces' not in conf, conf assert security_group_ids is not None, conf + logger.debug(f'Creating {count} instances with config: \n{conf}') + # NOTE: This ensures that we try ALL availability zones before # throwing an error. num_subnets = len(subnet_ids) @@ -322,9 +324,14 @@ def _create_node_tag(target_instance, is_head: bool = True) -> str: 'Key': 'Name', 'Value': f'sky-{cluster_name_on_cloud}-worker' }) + # Remove AWS internal tags, as they are not allowed to be set by users. 
+ target_instance_tags = [ + tag for tag in target_instance.tags + if not tag['Key'].startswith('aws:') + ] ec2.meta.client.create_tags( Resources=[target_instance.id], - Tags=target_instance.tags + node_tag, + Tags=target_instance_tags + node_tag, ) return target_instance.id @@ -430,12 +437,12 @@ def _create_node_tag(target_instance, is_head: bool = True) -> str: head_instance_id = _create_node_tag(resumed_instances[0]) if to_start_count > 0: - target_reservations = (config.node_config.get( + target_reservation_names = (config.node_config.get( 'CapacityReservationSpecification', {}).get('CapacityReservationTarget', {}).get('CapacityReservationId', [])) created_instances = [] - if target_reservations: + if target_reservation_names: node_config = copy.deepcopy(config.node_config) # Clear the capacity reservation specification settings in the # original node config, as we will create instances with @@ -449,29 +456,35 @@ def _create_node_tag(target_instance, is_head: bool = True) -> str: # Filter the reservations by the user-specified ones, because # reservations contain 'open' reservations as well, which do not # need to explicitly specify in the config for creating instances. 
- target_reservations_to_count = {} - for reservation in reservations: - if (reservation.targeted and - reservation.name in target_reservations): - target_reservations_to_count[ - reservation.name] = reservation.available_resources + target_reservations = [] + for r in reservations: + if (r.targeted and r.name in target_reservation_names): + target_reservations.append(r) + logger.debug(f'Reservations: {reservations}') + logger.debug(f'Target reservations: {target_reservations}') target_reservations_list = sorted( - target_reservations_to_count.items(), - key=lambda x: x[1], + target_reservations, + key=lambda x: x.available_resources, reverse=True) - for reservation, reservation_count in target_reservations_list: - if reservation_count <= 0: + for r in target_reservations_list: + if r.available_resources <= 0: # We have sorted the reservations by the available # resources, so if the reservation is not available, the # following reservations are not available either. break - reservation_count = min(reservation_count, to_start_count) + reservation_count = min(r.available_resources, to_start_count) logger.debug(f'Creating {reservation_count} instances ' - f'with reservation {reservation}') + f'with reservation {r.name}') node_config['CapacityReservationSpecification'][ 'CapacityReservationTarget'] = { - 'CapacityReservationId': reservation + 'CapacityReservationId': r.name + } + if r.type == aws_utils.ReservationType.BLOCK: + # Capacity block reservations need to specify the market + # type during instance creation. + node_config['InstanceMarketOptions'] = { + 'MarketType': aws_utils.ReservationType.BLOCK.value } created_reserved_instances = _create_instances( ec2_fail_fast,