This repository has been archived by the owner on Jun 8, 2019. It is now read-only.

Commit

Remove support for reservations
cberner committed Jun 26, 2017
1 parent 88a6255 commit 1f50b6c
Showing 4 changed files with 26 additions and 303 deletions.
140 changes: 9 additions & 131 deletions autoscaler/cluster.py
@@ -24,7 +24,6 @@
from autoscaler.azure_api import AzureWriteThroughCachedApi, AzureWrapper
import autoscaler.capacity as capacity
from autoscaler.kube import KubePod, KubeNode, KubeResource, KubePodStatus
import autoscaler.reservations as reservations
import autoscaler.utils as utils

# we are interested in all pods, incl. system ones
@@ -47,7 +46,6 @@ class ClusterNodeState(object):
UNDER_UTILIZED_DRAINABLE = 'under-utilized-drainable'
UNDER_UTILIZED_UNDRAINABLE = 'under-utilized-undrainable'
LAUNCH_HR_GRACE_PERIOD = 'launch-hr-grace-period'
RESERVED = 'reserved'
DETACHED = 'detached'


@@ -153,8 +151,6 @@ def __init__(self, aws_regions, aws_access_key, aws_secret_key,

self.azure_groups = azure.AzureGroups(resource_groups, self.azure_client)

self.reservation_client = reservations.ReservationClient()

# config
self.azure_resource_group_names = azure_resource_group_names
self.azure_regions = azure_regions
@@ -231,29 +227,24 @@ def scale_loop(self):
time.time() - pods_to_schedule_lookup_start_time,
)

res_data = self.reservation_client.list_reservations()
reservations_map = dict((r['id'], reservations.Reservation(r, managed_nodes))
for r in res_data['reservations'])

pods_by_node = {}
for p in running_or_pending_assigned_pods:
pods_by_node.setdefault(p.node_name, []).append(p)
pending_reservations = self.assign_nodes_to_reservations(managed_nodes, pods_by_node, reservations_map)

if self.scale_up:
logger.info(
"++++++++++++++ Scaling Up Begins ++++++++++++++++")
self.scale(
pods_to_schedule, all_nodes, scaling_groups,
running_insts_map, pending_reservations)
running_insts_map)
logger.info("++++++++++++++ Scaling Up Ends ++++++++++++++++")
if self.maintainance:
logger.info(
"++++++++++++++ Maintenance Begins ++++++++++++++++")
self.maintain(
managed_nodes, running_insts_map,
pods_to_schedule, running_or_pending_assigned_pods,
scaling_groups, reservations_map)
scaling_groups)
logger.info("++++++++++++++ Maintenance Ends ++++++++++++++++")

self.stats.gauge('autoscaler.scaling_loop_time', time.time() - start_time)
@@ -263,7 +254,7 @@ def scale_loop(self):
logger.warn(e)
return False

def scale(self, pods_to_schedule, all_nodes, asgs, running_insts_map, pending_reservations):
def scale(self, pods_to_schedule, all_nodes, asgs, running_insts_map):
"""
scale up logic
"""
@@ -298,16 +289,6 @@ def scale(self, pods_to_schedule, all_nodes, asgs, running_insts_map, pending_re
# because a pod may be able to fit in multiple groups
# pick a group now
selectors = dict(pod.selectors)
reservation_id = selectors.get('openai.org/reservation-id')
if reservation_id == reservations.DEFAULT_ID:
# Don't include the reservation id in the selector hash
del selectors['openai.org/reservation-id']
elif reservation_id is not None:
logger.info(
"{pod} is pending for reservation {reservation}".format(
pod=pod, reservation=reservation_id))
# Don't try to scale for this pod, since it's part of a reservation
continue
pending_pods.setdefault(utils.selectors_to_hash(selectors), []).append(pod)
logger.info(
"{pod} is pending ({selectors_hash})".format(
@@ -317,28 +298,16 @@
logger.info("{pod} fits on {node}".format(pod=pod,
node=fitting))

# selectors -> list of reservations
reservations_by_selector = {}

for reservation in pending_reservations:
selectors_hash = utils.selectors_to_hash(reservation.node_selectors)
reservations_by_selector.setdefault(selectors_hash, []).append(reservation)
pending_resources, num_instances = reservation.get_pending_resources()
logger.debug('pending %s: resources[%s] instances [%s]',
reservation, pending_resources, num_instances)

# scale each node type to reach the new capacity
for selectors_hash in set(pending_pods.keys()).union(set(reservations_by_selector.keys())):
for selectors_hash in set(pending_pods.keys()):
self.fulfill_pending(asgs,
selectors_hash,
pending_pods.get(selectors_hash, []),
reservations_by_selector.get(selectors_hash, []))
pending_pods.get(selectors_hash, []))

# TODO: make sure desired capacities of untouched groups are consistent

def maintain(self, cached_managed_nodes, running_insts_map,
pods_to_schedule, running_or_pending_assigned_pods, asgs,
reservations_map):
pods_to_schedule, running_or_pending_assigned_pods, asgs):
"""
maintains running instances:
- determines if idle nodes should be drained and terminated
@@ -351,9 +320,6 @@ def maintain(self, cached_managed_nodes, running_insts_map,
# in order to speed up job start up time
idle_selector_hash = collections.Counter()

# keep track of how much resources each reservation has
reservation_resources = dict()

pods_by_node = {}
for p in running_or_pending_assigned_pods:
pods_by_node.setdefault(p.node_name, []).append(p)
@@ -366,8 +332,7 @@
asg = utils.get_group_for_node(asgs, node)
state = self.get_node_state(
node, asg, pods_by_node.get(node.name, []), pods_to_schedule,
running_insts_map, idle_selector_hash, reservations_map,
reservation_resources)
running_insts_map, idle_selector_hash)

logger.info("node: %-*s state: %s" % (75, node, state))
self.stats.increment(
@@ -380,7 +345,6 @@
ClusterNodeState.TYPE_GRACE_PERIOD,
ClusterNodeState.ASG_MIN_SIZE,
ClusterNodeState.LAUNCH_HR_GRACE_PERIOD,
ClusterNodeState.RESERVED,
ClusterNodeState.DETACHED):
# do nothing
pass
@@ -506,52 +470,7 @@ def maintain(self, cached_managed_nodes, running_insts_map,
except TimeoutError:
logger.warn("Timeout while deleting Azure node")

def assign_nodes_to_reservations(self, nodes, pods_by_node, reservations_map):
pending_reservations = set()
for reservation in reservations_map.values():
resources, instances = reservation.get_pending_resources()
if resources.possible or instances > 0:
pending_reservations.add(reservation)

busy_nodes = {}
for node_name, pods in pods_by_node.items():
busy_nodes[node_name] = any(not pod.is_mirrored() for pod in pods)

for node in nodes:
if not busy_nodes.get(node.name, False) and not node.unschedulable and \
(node.reservation_id is None or node.reservation_id == 'none'):
# node is unused, look for a reservation to assign it to
found_match = False
for reservation in pending_reservations:
if reservation.is_match(node):
found_match = True
if not self.dry_run:
# There's a race here, but it shouldn't matter. A pod may have been scheduled on this node
# after we observed the node's state. However, since the reservation-id is None, that pod
# must be scheduled with a selector that doesn't include reservation-id (in which case
# there's no race), or with the !openai.org/reservation-id selector (which doesn't seem
# like a valid use-case)
node.reservation_id = reservation.id
reservation.add_node(node)
else:
logger.info("[Dry run]: Would have assigned node (%s) to reservation (%s)",
node.name, reservation)
resources, instances = reservation.get_pending_resources()
if not resources.possible and instances <= 0:
pending_reservations.remove(reservation)
break

# Assign it to the default reservation, if no other reservation requested this node
if not found_match:
if not self.dry_run:
node.reservation_id = reservations.DEFAULT_ID
else:
logger.info("[Dry run]: Would have assigned node (%s) to reservation (%s)",
node.name, reservations.DEFAULT_ID)

return pending_reservations

def fulfill_pending(self, asgs, selectors_hash, pods, reservations_list):
def fulfill_pending(self, asgs, selectors_hash, pods):
"""
selectors_hash - string repr of selectors
pods - list of KubePods that are pending
@@ -562,7 +481,6 @@

accounted_pods = dict((p, False) for p in pods)
num_unaccounted = len(pods)
pending_reservations = dict((r.id, r) for r in reservations_list)

groups = utils.get_groups_for_hash(asgs, selectors_hash)

@@ -594,22 +512,6 @@ def fulfill_pending(self, asgs, selectors_hash, pods, reservations_list):
unit_capacity - pod.resources)
assigned_pods.append([pod])

fulfilled_reservations = []
for reservation_id, reservation in pending_reservations.items():
if not group.is_match_for_selectors(reservation.node_selectors):
continue

resources, instances = reservation.get_pending_resources()
units_needed = self._get_required_capacity(resources, group)
units_requested = max(units_needed, instances)
# Multiply by 0, since we're reserving the whole machine
new_instance_resources.extend([unit_capacity * 0] * units_requested)
fulfilled_reservations.append(reservation_id)
logger.info('Planning to request %s more of %s for %s', units_requested, group, reservation_id)

for reservation_id in fulfilled_reservations:
del pending_reservations[reservation_id]

# new desired # machines = # running nodes + # machines required to fit jobs that don't
# fit on running nodes. This scaling is conservative but won't
# create starving
@@ -661,9 +563,6 @@ def notify_if_scaled(future):
logger.warn('Failed to scale sufficiently.')
self.notifier.notify_failed_to_scale(selectors_hash, pods)

for reservation_id in pending_reservations.keys():
logger.warn('Failed to scale sufficiently for %s', reservation_id)

for operation in async_operations:
try:
operation.result()
@@ -788,8 +687,7 @@ def sort_key(group):
return sorted(groups, key=sort_key)

def get_node_state(self, node, asg, node_pods, pods_to_schedule,
running_insts_map, idle_selector_hash,
reservations_map, reservation_resources):
running_insts_map, idle_selector_hash):
"""
returns the ClusterNodeState for the given node
@@ -800,8 +698,6 @@ def get_node_state(self, node, asg, node_pods, pods_to_schedule,
pods_to_schedule - list of all pending pods
running_inst_map - map of all (instance_id -> ec2.Instance object)
idle_selector_hash - current map of idle nodes by type. may be modified
reservations_map - map of all (reservation_id -> Reservation object)
reservation_resources - map of all (reservation_id -> (KubeResource, num instances))
"""
pending_list = []
for pods in pods_to_schedule.values():
@@ -845,24 +741,6 @@ def get_node_state(self, node, asg, node_pods, pods_to_schedule,
if node.is_dead():
return ClusterNodeState.DEAD

if node.reservation_id in reservations_map and not node.unschedulable:
# this node is in a reservation
# let's see if the reservation needs its resources
# so we can scale down appropriately if the reservation
# has been downsized
reservation = reservations_map[node.reservation_id]
(fulfilled_resources, fulfilled_instances) = (
reservation_resources.get(reservation.id, (KubeResource(), 0)))

if not ((fulfilled_resources - reservation.kube_resources_requested).possible
and fulfilled_instances >= reservation.num_instances_requested):
fulfilled_resources += node.capacity
fulfilled_instances += 1
reservation_resources[reservation.id] = (fulfilled_resources, fulfilled_instances)
return ClusterNodeState.RESERVED

logger.warn('reserved extra node!!! %s %s', fulfilled_resources, fulfilled_instances)

if asg and len(asg.nodes) <= asg.min_size:
return ClusterNodeState.ASG_MIN_SIZE

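
After this commit, fulfill_pending sizes a group purely from its pending pods: each pod is packed first-fit onto hypothetical new instances of the group's unit capacity, and the desired size becomes the running-node count plus the number of instances planned that way (the "conservative but won't create starving" comment above). A minimal standalone sketch of that packing, with pod resources reduced to a single CPU number for illustration; the real code uses KubeResource objects and the group's unit_capacity:

def new_instances_needed(unit_capacity, pending_requests):
    # Remaining capacity on each hypothetical new instance planned so far.
    planned_free = []
    for request in pending_requests:
        for i, free in enumerate(planned_free):
            if request <= free:
                # Pod fits on an already-planned instance; consume its capacity.
                planned_free[i] = free - request
                break
        else:
            # Pod fits on no planned instance: plan one more and place it there.
            planned_free.append(unit_capacity - request)
    return len(planned_free)

# Example: 4-CPU instances and pending requests of 3, 2 and 2 CPUs
# -> the 3-CPU pod gets its own instance, the two 2-CPU pods share a second.
assert new_instances_needed(4, [3, 2, 2]) == 2
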
24 changes: 0 additions & 24 deletions autoscaler/kube.py
@@ -18,7 +18,6 @@ class KubePodStatus(object):
FAILED = 'Failed'

_CORDON_LABEL = 'openai/cordoned-by-autoscaler'
_NUM_API_RETRIES = 3


class KubePod(object):
@@ -173,29 +172,6 @@ def _get_instance_data(self):
def selectors(self):
return self.original.obj['metadata'].get('labels', {})

@property
def reservation_id(self):
return self.selectors.get('openai.org/reservation-id')

@reservation_id.setter
def reservation_id(self, reservation_id):
if reservation_id == self.reservation_id:
return

for _ in range(_NUM_API_RETRIES):
try:
self.original.reload()
self.original.obj['metadata'].setdefault('labels', {})['openai.org/reservation-id'] = reservation_id
self.original.update()
logger.info("Assigned %s (%s) to reservation %s", self.name, self.instance_id, reservation_id)
return
except pykube.exceptions.HTTPError as e:
if e.code == 409:
logger.info('Retrying reservation assignment [%s (%s) -> %s]',
self.name, self.instance_id, reservation_id)
else:
raise e

@property
def unschedulable(self):
return self.original.obj['spec'].get('unschedulable', False)
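
For reference, the deleted reservation_id setter above was the only user of _NUM_API_RETRIES, which is why that constant is removed as well. Its retry loop is an optimistic-concurrency pattern for label updates with pykube. A rough reconstruction from the deleted lines, with the imports and the free-function wrapper assumed here rather than taken from the original class:

import logging

import pykube

logger = logging.getLogger(__name__)

_NUM_API_RETRIES = 3


def set_label(kube_obj, key, value):
    """Set a metadata label, retrying when a concurrent writer causes a 409."""
    for _ in range(_NUM_API_RETRIES):
        try:
            kube_obj.reload()  # refresh resourceVersion before mutating
            kube_obj.obj['metadata'].setdefault('labels', {})[key] = value
            kube_obj.update()
            return True
        except pykube.exceptions.HTTPError as e:
            if e.code == 409:
                # another writer raced us; reload and try again
                logger.info('Conflict while setting %s; retrying', key)
            else:
                raise
    return False
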
71 changes: 0 additions & 71 deletions autoscaler/reservations.py

This file was deleted.
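
The deleted module's surface can be inferred from the call sites removed in cluster.py above. A rough sketch of that interface, with names taken from those hunks; the bodies and the DEFAULT_ID value are placeholders, not the original implementation:

DEFAULT_ID = 'default'  # placeholder; the real value is not visible in this diff


class ReservationClient(object):
    def list_reservations(self):
        """Return a dict with a 'reservations' list, one entry per reservation."""
        raise NotImplementedError


class Reservation(object):
    def __init__(self, data, nodes):
        self.id = data['id']
        self.node_selectors = {}               # selectors a node must match
        self.kube_resources_requested = None   # total KubeResource requested
        self.num_instances_requested = 0

    def get_pending_resources(self):
        """Return (KubeResource still needed, number of instances still needed)."""
        raise NotImplementedError

    def is_match(self, node):
        """True if the node satisfies node_selectors."""
        raise NotImplementedError

    def add_node(self, node):
        """Count the node's capacity against this reservation."""
        raise NotImplementedError
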

