Skip to content
This repository has been archived by the owner on Jun 8, 2019. It is now read-only.

Commit

Permalink
Make scaling aware of Azure core quotas
Browse files Browse the repository at this point in the history
  • Loading branch information
cberner committed Oct 4, 2017
1 parent 86d3d2f commit 6630f64
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 71 deletions.
7 changes: 5 additions & 2 deletions autoscaler/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,25 +179,28 @@ def set_desired_capacity(self, new_desired_capacity):
if scale_out == 0:
return CompletedFuture(False)

remaining_instances = self.client.get_remaining_instances(self.resource_group, self.instance_type)

futures = []
for scale_set in sorted(self.scale_sets.values(), key=lambda x: (x.priority, x.name)):
if scale_set.capacity < _SCALE_SET_SIZE_LIMIT:
if self.slow_scale:
new_group_capacity = scale_set.capacity + 1
else:
new_group_capacity = min(_SCALE_SET_SIZE_LIMIT, scale_set.capacity + scale_out)
new_group_capacity = min(_SCALE_SET_SIZE_LIMIT, scale_set.capacity + scale_out, scale_set.capacity + remaining_instances)
if scale_set.provisioning_state == 'Updating':
logger.warn("Update of {} already in progress".format(scale_set.name))
continue
if scale_set.provisioning_state == 'Failed':
logger.error("{} failed provisioning. Skipping it for scaling.".format(scale_set.name))
continue
scale_out -= (new_group_capacity - scale_set.capacity)
remaining_instances -= (new_group_capacity - scale_set.capacity)
# Update our cached version
self.scale_sets[scale_set.name].capacity = new_group_capacity
futures.append(self.client.update_scale_set(scale_set, new_group_capacity))
logger.info("Scaling Azure Scale Set {} to {}".format(scale_set.name, new_group_capacity))
if scale_out == 0:
if scale_out == 0 or remaining_instances == 0:
break

if scale_out > 0:
Expand Down
53 changes: 52 additions & 1 deletion autoscaler/azure_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import json
import re

from azure.monitor import MonitorClient
from azure.monitor.models import EventData
Expand All @@ -12,6 +13,7 @@
from abc import ABC
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.compute.models import VirtualMachineScaleSet, Sku
from azure.mgmt.resource import ResourceManagementClient

from autoscaler.utils import Future

Expand Down Expand Up @@ -94,14 +96,30 @@ def update_scale_set(self, scale_set: AzureScaleSet, new_capacity: int) -> Futur
def terminate_scale_set_instances(self, scale_set: AzureScaleSet, instances: List[AzureScaleSetInstance]) -> Future:
pass

def get_remaining_instances(self, resource_group_name: str, sku: str) -> int:
pass


TIMEOUT_PERIOD = timedelta(minutes=15)


# Mangles a SKU name into the family name used for quotas
def _azure_sku_family(name: str) -> str:
match = re.match('Standard_(?P<family>[A-Z]{1,2})[0-9]{1,2}_?(?P<version>v[0-9])?', name)
if match is None:
raise ValueError("SKU not from a recognized family: " + name)
result = "standard" + match.group('family')
if match.group('version') is not None:
result += match.group('version')
result += 'Family'
return result


class AzureWrapper(AzureApi):
def __init__(self, compute_client: ComputeManagementClient, monitor_client: MonitorClient) -> None:
def __init__(self, compute_client: ComputeManagementClient, monitor_client: MonitorClient, resource_client: ResourceManagementClient) -> None:
self._compute_client = compute_client
self._monitor_client = monitor_client
self._resource_client = resource_client

def list_scale_sets(self, resource_group_name: str) -> List[AzureScaleSet]:
fifteen_minutes_ago = datetime.now(pytz.utc) - TIMEOUT_PERIOD
Expand Down Expand Up @@ -161,13 +179,32 @@ def terminate_scale_set_instances(self, scale_set: AzureScaleSet, instances: Lis
future = self._compute_client.virtual_machine_scale_sets.delete_instances(scale_set.resource_group, scale_set.name, [instance.instance_id for instance in instances])
return AzureOperationPollerFutureAdapter(future)

def get_remaining_instances(self, resource_group_name: str, sku: str):
resource_group = self._resource_client.resource_groups.get(resource_group_name)
cores_per_instance = None
for vm_size in self._compute_client.virtual_machine_sizes.list(location=resource_group.location):
if vm_size.name == sku:
cores_per_instance = vm_size.number_of_cores

logger.warn("No metadata found for sku: " + sku)
if cores_per_instance is None:
return 0

for usage in self._compute_client.usage.list(location=resource_group.location):
if usage.name.value == _azure_sku_family(sku):
return (usage.limit - usage.current_value) // cores_per_instance

logger.warn("No quota found matching: " + sku)
return 0


class AzureWriteThroughCachedApi(AzureApi):
def __init__(self, delegate: AzureApi) -> None:
self._delegate = delegate
self._lock = RLock()
self._instance_cache: MutableMapping[Tuple[str, str], List[AzureScaleSetInstance]] = {}
self._scale_set_cache: MutableMapping[str, List[AzureScaleSet]] = {}
self._remaining_instances_cache: MutableMapping[str, MutableMapping[str, int]] = {}

def list_scale_sets(self, resource_group_name: str, force_refresh=False) -> List[AzureScaleSet]:
if not force_refresh:
Expand Down Expand Up @@ -214,6 +251,17 @@ def terminate_scale_set_instances(self, scale_set: AzureScaleSet, instances: Lis
future.add_done_callback(lambda _: self._invalidate(scale_set.resource_group, scale_set.name))
return future

def get_remaining_instances(self, resource_group_name: str, sku: str):
with self._lock:
if resource_group_name in self._remaining_instances_cache:
cached = self._remaining_instances_cache[resource_group_name]
if sku in cached:
return cached[sku]
remaining = self._delegate.get_remaining_instances(resource_group_name, sku)
with self._lock:
self._remaining_instances_cache.setdefault(resource_group_name, {})[sku] = remaining
return remaining

def _invalidate(self, resource_group_name: str, scale_set_name: str) -> None:
with self._lock:
if (resource_group_name, scale_set_name) in self._instance_cache:
Expand All @@ -222,6 +270,9 @@ def _invalidate(self, resource_group_name: str, scale_set_name: str) -> None:
if resource_group_name in self._scale_set_cache:
del self._scale_set_cache[resource_group_name]

if resource_group_name in self._remaining_instances_cache:
del self._remaining_instances_cache[resource_group_name]


_AZURE_API_MAX_WAIT = 10*60

Expand Down
2 changes: 1 addition & 1 deletion autoscaler/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def __init__(self, aws_regions, aws_access_key, aws_secret_key,

monitor_client = MonitorClient(azure_credentials, azure_subscription_id)
monitor_client.config.retry_policy.policy = azure.AzureBoundedRetry.from_retry(monitor_client.config.retry_policy.policy)
self.azure_client = AzureWriteThroughCachedApi(AzureWrapper(compute_client, monitor_client))
self.azure_client = AzureWriteThroughCachedApi(AzureWrapper(compute_client, monitor_client, resource_client))

self.azure_groups = azure.AzureGroups(resource_groups, azure_slow_scale_classes, self.azure_client)

Expand Down
128 changes: 63 additions & 65 deletions test/test_azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,57 @@
import pytz
import yaml
from azure.mgmt.compute.models import VirtualMachineScaleSetVM, \
VirtualMachineInstanceView
VirtualMachineInstanceView, VirtualMachineSize, Usage, UsageName
from azure.mgmt.resource.resources.models import ResourceGroup

from autoscaler import KubePod
from autoscaler.azure import AzureVirtualScaleSet
from autoscaler.azure_api import AzureScaleSet, AzureWrapper


def _default_mock_clients(region, instances=[], quotas={'Dv2': 100, 'NC': 100}):
sizes = [
VirtualMachineSize(name="Standard_D1_v2", number_of_cores=1),
VirtualMachineSize(name="Standard_NC24", number_of_cores=24)
]
mock_client = mock.Mock()
mock_client.virtual_machine_scale_set_vms = mock.Mock()
mock_client.virtual_machine_scale_set_vms.list = mock.Mock(return_value=instances)
mock_client.virtual_machine_scale_sets = mock.Mock()
mock_client.virtual_machine_scale_sets.create_or_update = mock.Mock()
mock_client.virtual_machine_scale_sets.delete_instances = mock.Mock()
mock_client.virtual_machine_sizes = mock.Mock()
mock_client.virtual_machine_sizes.list = mock.Mock(return_value=sizes)
mock_client.usage = mock.Mock()
usage_limits = []
for k, v in quotas.items():
usage_limits.append(Usage(name=UsageName(value="standard" + k + "Family"), limit=v, current_value=0))
mock_client.usage.list = mock.Mock(return_value=usage_limits)

monitor_client = mock.Mock()
monitor_client.activity_logs = mock.Mock()
monitor_client.activity_logs.list = mock.Mock(return_value=[])

azure_resource_group = ResourceGroup(location=region)
resource_client = mock.Mock()
resource_client.resource_groups = mock.Mock()
resource_client.activity_logs.get = mock.Mock(return_value=azure_resource_group)

return (mock_client, monitor_client, resource_client)


class TestCluster(unittest.TestCase):
def test_failed_scale_up(self):
region = 'test'
mock_client = mock.Mock()
mock_client.virtual_machine_scale_set_vms = mock.Mock()
mock_client.virtual_machine_scale_set_vms.list = mock.Mock(return_value=[])
mock_client.virtual_machine_scale_sets = mock.Mock()
mock_client.virtual_machine_scale_sets.create_or_update = mock.Mock()

monitor_client = mock.Mock()
monitor_client.activity_logs = mock.Mock()
monitor_client.activity_logs.list = mock.Mock(return_value=[])
mock_client, monitor_client, resource_client = _default_mock_clients(region)

instance_type = 'Standard_D1_v2'
resource_group = 'test-resource-group'
failed_scale_set = AzureScaleSet(region, resource_group, 'test-scale-set1', instance_type, 0, 'Failed')
scale_set = AzureScaleSet(region, resource_group, 'test-scale-set2', instance_type, 0, 'Succeeded')

virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client), instance_type, False, [failed_scale_set, scale_set], [])
virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client, resource_client), instance_type, False, [failed_scale_set, scale_set], [])

virtual_scale_set.scale(5)

Expand All @@ -42,21 +67,14 @@ def test_failed_scale_up(self):

def test_scale_up(self):
region = 'test'
mock_client = mock.Mock()
mock_client.virtual_machine_scale_set_vms = mock.Mock()
mock_client.virtual_machine_scale_set_vms.list = mock.Mock(return_value=[])
mock_client.virtual_machine_scale_sets = mock.Mock()
mock_client.virtual_machine_scale_sets.create_or_update = mock.Mock()

monitor_client = mock.Mock()
monitor_client.activity_logs = mock.Mock()
monitor_client.activity_logs.list = mock.Mock(return_value=[])
mock_client, monitor_client, resource_client = _default_mock_clients(region)

instance_type = 'Standard_D1_v2'
resource_group = 'test-resource-group'
scale_set = AzureScaleSet(region, resource_group, 'test-scale-set', instance_type, 0, 'Succeeded')

virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client), instance_type, False, [scale_set], [])
virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client, resource_client), instance_type, False, [scale_set], [])

virtual_scale_set.scale(5)

Expand All @@ -65,23 +83,16 @@ def test_scale_up(self):

def test_priority(self):
region = 'test'
mock_client = mock.Mock()
mock_client.virtual_machine_scale_set_vms = mock.Mock()
mock_client.virtual_machine_scale_set_vms.list = mock.Mock(return_value=[])
mock_client.virtual_machine_scale_sets = mock.Mock()
mock_client.virtual_machine_scale_sets.create_or_update = mock.Mock()

monitor_client = mock.Mock()
monitor_client.activity_logs = mock.Mock()
monitor_client.activity_logs.list = mock.Mock(return_value=[])
mock_client, monitor_client, resource_client = _default_mock_clients(region)

instance_type = 'Standard_D1_v2'
resource_group = 'test-resource-group'
scale_set = AzureScaleSet(region, resource_group, 'test-scale-set', instance_type, 0, 'Succeeded', priority=-1)
# Name sorts lexicographically before previous scale set, but priority is after it
scale_set2 = AzureScaleSet(region, resource_group, 'a-test-scale-set', instance_type, 0, 'Succeeded', priority=1)

virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client), instance_type, True, [scale_set, scale_set2], [])
virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client, resource_client), instance_type, True, [scale_set, scale_set2], [])

virtual_scale_set.scale(1)

Expand All @@ -92,22 +103,15 @@ def test_priority(self):

def test_slow_scale_up(self):
region = 'test'
mock_client = mock.Mock()
mock_client.virtual_machine_scale_set_vms = mock.Mock()
mock_client.virtual_machine_scale_set_vms.list = mock.Mock(return_value=[])
mock_client.virtual_machine_scale_sets = mock.Mock()
mock_client.virtual_machine_scale_sets.create_or_update = mock.Mock()

monitor_client = mock.Mock()
monitor_client.activity_logs = mock.Mock()
monitor_client.activity_logs.list = mock.Mock(return_value=[])
mock_client, monitor_client, resource_client = _default_mock_clients(region)

instance_type = 'Standard_D1_v2'
resource_group = 'test-resource-group'
scale_set = AzureScaleSet(region, resource_group, 'test-scale-set', instance_type, 0, 'Succeeded')
scale_set2 = AzureScaleSet(region, resource_group, 'test-scale-set2', instance_type, 0, 'Succeeded')

virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client), instance_type, True, [scale_set, scale_set2], [])
virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client, resource_client), instance_type, True, [scale_set, scale_set2], [])

virtual_scale_set.scale(2)

Expand All @@ -117,21 +121,14 @@ def test_slow_scale_up(self):

def test_tainted_scale_set(self):
region = 'test'
mock_client = mock.Mock()
mock_client.virtual_machine_scale_set_vms = mock.Mock()
mock_client.virtual_machine_scale_set_vms.list = mock.Mock(return_value=[])
mock_client.virtual_machine_scale_sets = mock.Mock()
mock_client.virtual_machine_scale_sets.create_or_update = mock.Mock()

monitor_client = mock.Mock()
monitor_client.activity_logs = mock.Mock()
monitor_client.activity_logs.list = mock.Mock(return_value=[])
mock_client, monitor_client, resource_client = _default_mock_clients(region)

instance_type = 'Standard_NC24'
resource_group = 'test-resource-group'
scale_set = AzureScaleSet(region, resource_group, 'test-scale-set', instance_type, 0, 'Succeeded', no_schedule_taints={'gpu': 'yes'})

virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client), instance_type, True, [scale_set], [])
virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client, resource_client), instance_type, True, [scale_set], [])

dir_path = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(dir_path, 'data/busybox.yaml'), 'r') as f:
Expand All @@ -146,23 +143,32 @@ def test_tainted_scale_set(self):

def test_out_of_quota(self):
region = 'test'
mock_client = mock.Mock()
mock_client.virtual_machine_scale_set_vms = mock.Mock()
mock_client.virtual_machine_scale_set_vms.list = mock.Mock(return_value=[])
mock_client.virtual_machine_scale_sets = mock.Mock()
mock_client.virtual_machine_scale_sets.create_or_update = mock.Mock()

monitor_client = mock.Mock()
monitor_client.activity_logs = mock.Mock()
monitor_client.activity_logs.list = mock.Mock(return_value=[])
mock_client, monitor_client, resource_client = _default_mock_clients(region)

instance_type = 'Standard_D1_v2'
resource_group = 'test-resource-group'
scale_set = AzureScaleSet(region, resource_group, 'test-scale-set', instance_type, 0, 'Succeeded',
timeout_until=datetime.now(pytz.utc) + timedelta(minutes=10), timeout_reason="fake reason")
virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client), instance_type, False, [scale_set], [])
virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client, resource_client), instance_type, False, [scale_set], [])
self.assertTrue(virtual_scale_set.is_timed_out())

def test_near_quota_limit(self):
region = 'test'

mock_client, monitor_client, resource_client = _default_mock_clients(region, quotas={'Dv2': 5})

instance_type = 'Standard_D1_v2'
resource_group = 'test-resource-group'
scale_set = AzureScaleSet(region, resource_group, 'test-scale-set', instance_type, 0, 'Succeeded')

virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client, resource_client), instance_type, False, [scale_set], [])

virtual_scale_set.scale(10)

mock_client.virtual_machine_scale_sets.create_or_update.assert_called_once()
self.assertEqual(mock_client.virtual_machine_scale_sets.create_or_update.call_args[1]['parameters'].sku.capacity, 5)

def test_scale_in(self):
region = 'test'
resource_group = 'test-resource-group'
Expand All @@ -173,23 +179,15 @@ def test_scale_in(self):
instance.instance_view = VirtualMachineInstanceView()
instance.instance_view.statuses = []

mock_client = mock.Mock()
mock_client.virtual_machine_scale_set_vms = mock.Mock()
mock_client.virtual_machine_scale_set_vms.list = mock.Mock(return_value=[instance])
mock_client.virtual_machine_scale_sets = mock.Mock()
mock_client.virtual_machine_scale_sets.delete_instances = mock.Mock()

monitor_client = mock.Mock()
monitor_client.activity_logs = mock.Mock()
monitor_client.activity_logs.list = mock.Mock(return_value=[])
mock_client, monitor_client, resource_client = _default_mock_clients(region, instances=[instance])

TestNode = collections.namedtuple('TestNode', ['instance_id', 'unschedulable'])
test_node = TestNode(instance_id=instance.vm_id, unschedulable=False)

instance_type = 'Standard_D1_v2'
scale_set = AzureScaleSet(region, resource_group, 'test-scale-set', instance_type, 1, 'Succeeded')

virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client), instance_type, False, [scale_set], [test_node])
virtual_scale_set = AzureVirtualScaleSet(region, resource_group, AzureWrapper(mock_client, monitor_client, resource_client), instance_type, False, [scale_set], [test_node])

self.assertEqual(virtual_scale_set.instance_ids, {instance.vm_id})
self.assertEqual(virtual_scale_set.nodes, [test_node])
Expand Down
Loading

0 comments on commit 6630f64

Please sign in to comment.