
Commit 5dd9c7b

refactored using low-level DB queries
leust committed Jul 25, 2023
1 parent 58e20eb
Showing 3 changed files with 131 additions and 105 deletions.
63 changes: 63 additions & 0 deletions nova/db/main/api.py
@@ -2087,6 +2087,69 @@ def instance_get_active_by_window_joined(context, begin, end=None,
return _instances_fill_metadata(context, query.all(), manual_joins)


@require_context
@pick_context_manager_reader_allow_async
def instance_get_host_by_tag(context, tag, filters=None):
count_label = func.count('*').label('count')
query = context.session.query(models.Instance, count_label). \
join(models.Tag, models.Tag.resource_id == models.Instance.uuid)
query = _handle_instance_host_query_filters(query, filters)
query = query.filter(models.Instance.deleted == 0,
models.Tag.tag == tag)

query = query.group_by(models.Instance.host). \
order_by(sql.desc(count_label)).limit(1)

result = query.all()
if result:
return result[0]
else:
return None


@require_context
@pick_context_manager_reader_allow_async
def instance_get_host_by_metadata(context, meta_key, meta_value,
filters=None):
count_label = func.count('*').label('count')
query = context.session.query(models.Instance.host, count_label). \
join(models.InstanceMetadata,
models.InstanceMetadata.instance_uuid == models.Instance.uuid)
query = _handle_instance_host_query_filters(query, filters)
query = query.filter(models.Instance.deleted == 0,
models.InstanceMetadata.deleted == 0,
models.InstanceMetadata.key == meta_key,
models.InstanceMetadata.value == meta_value)
query = query.group_by(models.Instance.host). \
order_by(sql.desc(count_label)). \
limit(1)

result = query.all()
if result:
return result[0]
else:
return None


def _handle_instance_host_query_filters(query, filters=None):
if not filters:
return query
hv_type = filters.get('hv_type')
if hv_type:
query = query.join(
models.ComputeNode,
models.Instance.node == models.ComputeNode.hypervisor_hostname)

availability_zone = filters.get('availability_zone')
if availability_zone:
query = query.filter(
models.Instance.availability_zone == availability_zone)
if hv_type:
query = query.filter(models.ComputeNode.deleted == 0,
models.ComputeNode.hypervisor_type == hv_type)
return query


def _instance_get_all_query(context, project_only=False, joins=None):
if joins is None:
joins = ['info_cache', 'security_groups']
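The two helpers above push the "which host already runs most members of this cluster?" question down into a single SQL aggregation (a join on the tags or metadata table, GROUP BY host, ORDER BY the count descending, LIMIT 1) instead of loading full instance lists into the scheduler process. The standalone sketch below reproduces just that query shape for the metadata case; the in-memory schema, column names and plain SQLAlchemy 1.4+ session are illustrative stand-ins, not Nova's real models or session handling, and instance_get_host_by_tag follows the same pattern via the tags table.

import sqlalchemy as sa
from sqlalchemy import orm

Base = orm.declarative_base()


class Instance(Base):
    __tablename__ = 'instances'
    uuid = sa.Column(sa.String(36), primary_key=True)
    host = sa.Column(sa.String(255))
    deleted = sa.Column(sa.Integer, default=0)


class InstanceMetadata(Base):
    __tablename__ = 'instance_metadata'
    id = sa.Column(sa.Integer, primary_key=True)
    instance_uuid = sa.Column(sa.String(36))
    key = sa.Column(sa.String(255))
    value = sa.Column(sa.String(255))
    deleted = sa.Column(sa.Integer, default=0)


def host_with_most_cluster_members(session, meta_key, meta_value):
    # Count instances per host for the given metadata key/value and keep
    # only the host with the highest count, mirroring the shape of
    # instance_get_host_by_metadata() above.
    count_label = sa.func.count('*').label('count')
    query = (
        session.query(Instance.host, count_label)
        .join(InstanceMetadata,
              InstanceMetadata.instance_uuid == Instance.uuid)
        .filter(Instance.deleted == 0,
                InstanceMetadata.deleted == 0,
                InstanceMetadata.key == meta_key,
                InstanceMetadata.value == meta_value)
        .group_by(Instance.host)
        .order_by(sa.desc(count_label))
        .limit(1))
    result = query.all()
    return result[0] if result else None


if __name__ == '__main__':
    engine = sa.create_engine('sqlite://')
    Base.metadata.create_all(engine)
    with orm.Session(engine) as session:
        session.add_all([
            Instance(uuid='i1', host='host4'),
            Instance(uuid='i2', host='host5'),
            Instance(uuid='i3', host='host5'),
            InstanceMetadata(instance_uuid='i1',
                             key='kubernetes.io-cluster-c1', value='1'),
            InstanceMetadata(instance_uuid='i2',
                             key='kubernetes.io-cluster-c1', value='1'),
            InstanceMetadata(instance_uuid='i3',
                             key='kubernetes.io-cluster-c1', value='1'),
        ])
        session.commit()
        print(host_with_most_cluster_members(
            session, 'kubernetes.io-cluster-c1', '1'))  # ('host5', 2)
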
104 changes: 41 additions & 63 deletions nova/scheduler/filters/shard_filter.py
@@ -20,9 +20,9 @@

import nova.conf
from nova import context as nova_context
from nova.db.main import api as main_db_api
from nova.objects.aggregate import AggregateList
from nova.objects.build_request import BuildRequest
from nova.objects.instance import InstanceList
from nova.scheduler import filters
from nova.scheduler import utils
from nova import utils as nova_utils
@@ -32,9 +32,10 @@
CONF = nova.conf.CONF

_SERVICE_AUTH = None
GARDENER_PREFIX = "kubernetes.io-cluster-shoot--garden--"
GARDENER_PREFIX = "kubernetes.io-cluster-"
KKS_PREFIX = "kubernikus:kluster"
HANA_PREFIX = "hana_"
VMWARE_HV_TYPE = 'VMware vCenter Server'


class ShardFilter(filters.BaseHostFilter):
@@ -123,87 +124,63 @@ def _get_shards(self, project_id):

return self._PROJECT_SHARD_CACHE.get(project_id)

def _get_k8s_cluster_hosts(self, spec_obj):
"""If the instance will be part of a K8S cluster, it returns
the list of all other hosts that are already part of it,
if any. If there are multiple shards part of the existing k8s
cluster, the hosts of the dominant shard are returned.
"""
def _get_k8s_shard(self, spec_obj):
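        """Return the name of the shard aggregate that already hosts most
        members of the instance's K8S cluster (identified via a Kubernikus
        tag or Gardener metadata), or None if no such shard applies.
        """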
if (spec_obj.flavor.name.startswith(HANA_PREFIX) or
utils.request_is_resize(spec_obj)):
return None
elevated = nova_context.get_admin_context()
build_request = BuildRequest.get_by_instance_uuid(
elevated, spec_obj.instance_uuid)

k8s_filter = self._k8s_instance_query_filter(spec_obj)

if not k8s_filter:
return None

k8s_filter['project_id'] = spec_obj.project_id

instances = InstanceList.get_by_filters(
nova_context.get_admin_context(), filters=k8s_filter,
expected_attrs=['flavor', 'metadata', 'tags'])
kks_tag = next((t.tag for t in build_request.tags
if t.tag.startswith(KKS_PREFIX)), None)
gardener_meta = None
if not kks_tag:
gardener_meta = \
{k: v for k, v in build_request.instance.metadata.items()
if k.startswith(GARDENER_PREFIX)}

if not instances:
if not kks_tag and not gardener_meta:
return None

q_filters = {'hv_type': VMWARE_HV_TYPE}
if spec_obj.availability_zone:
instances = [i for i in instances
if i.availability_zone == spec_obj.availability_zone]
q_filters['availability_zone'] = spec_obj.availability_zone

if not instances:
k8s_host = None
if kks_tag:
k8s_host = nova_context.scatter_gather_skip_cell0(
elevated, main_db_api.instance_get_host_by_tag,
kks_tag, filters=q_filters)
elif gardener_meta:
(meta_key, meta_value) = next(
(k, v) for k, v in gardener_meta.items())
k8s_host = nova_context.scatter_gather_skip_cell0(
elevated, main_db_api.instance_get_host_by_metadata,
meta_key, meta_value, filters=q_filters)

if not k8s_host:
return None

aggrs = [aggr for aggr in
AggregateList.get_all(nova_context.get_admin_context())
aggrs = [aggr.name for aggr in
AggregateList.get_by_host(elevated, k8s_host)
if aggr.name.startswith(self._SHARD_PREFIX)]

if not aggrs:
if aggrs:
return aggrs[0]
else:
return None

shards_size = {
aggr: sum(1 for i in instances if i.host in aggr.hosts)
for aggr in aggrs
}

shard_aggr = next(s[0] for s in
sorted(shards_size.items(),
key=lambda i: i[1], reverse=True)
)

return set(i.host for i in instances if i.host in shard_aggr.hosts)

def _k8s_instance_query_filter(self, spec_obj):
elevated = nova_context.get_admin_context()
build_request = BuildRequest.get_by_instance_uuid(
elevated, spec_obj.instance_uuid)

# Kubernikus
kks_tag = next((t.tag for t in build_request.tags
if t.tag.startswith(KKS_PREFIX)), None)
if kks_tag:
return {'tags': [kks_tag]}

# Gardener
gardener_meta = \
{k: v for k, v in build_request.instance.metadata.items()
if k.startswith(GARDENER_PREFIX)}
if gardener_meta:
return {'metadata': gardener_meta}

return None

def filter_all(self, filter_obj_list, spec_obj):
# Only VMware
if utils.is_non_vmware_spec(spec_obj):
return filter_obj_list

k8s_hosts = self._get_k8s_cluster_hosts(spec_obj)
k8s_shard = self._get_k8s_shard(spec_obj)

return [host_state for host_state in filter_obj_list
if self._host_passes(host_state, spec_obj, k8s_hosts)]
if self._host_passes(host_state, spec_obj, k8s_shard)]

def _host_passes(self, host_state, spec_obj, k8s_hosts):
def _host_passes(self, host_state, spec_obj, k8s_shard):
host_shard_aggrs = [aggr for aggr in host_state.aggregates
if aggr.name.startswith(self._SHARD_PREFIX)]

@@ -247,8 +224,9 @@ def _host_passes(self, host_state, spec_obj, k8s_hosts):
'project_shards': shards})
return False

if k8s_hosts:
return self._host_passes_k8s(host_shard_aggrs, k8s_hosts)
if k8s_shard:
return any(host_shard == k8s_shard
for host_shard in host_shard_names)

return True

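After the refactor the filter no longer builds a set of k8s hosts itself: the per-cell DB lookup returns the host that already runs most cluster members, that host is mapped to its shard aggregate, and _host_passes then only has to compare shard names. Below is a minimal sketch of that final comparison, assuming shard aggregates are the ones named with the 'vc-' prefix as in the tests that follow; the real filter also applies project-shard checks, resize handling and logging that are omitted here.

SHARD_PREFIX = 'vc-'


def host_passes_k8s(host_aggregate_names, k8s_shard):
    """Return True if the host sits in the shard that already hosts the
    instance's K8S cluster, or if no shard constraint applies."""
    if not k8s_shard:
        # No existing cluster members were found, so there is nothing to pin to.
        return True
    host_shard_names = [name for name in host_aggregate_names
                        if name.startswith(SHARD_PREFIX)]
    return any(host_shard == k8s_shard for host_shard in host_shard_names)


# host4/host5 live in 'vc-b-1', the shard the DB lookup identified as dominant.
assert host_passes_k8s(['vc-b-1'], 'vc-b-1')
assert not host_passes_k8s(['vc-a-0'], 'vc-b-1')
assert host_passes_k8s(['vc-a-0'], None)
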
69 changes: 27 additions & 42 deletions nova/tests/unit/scheduler/filters/test_shard_filter.py
@@ -16,6 +16,7 @@

import mock

from nova.db.main import api as main_db_api
from nova import objects
from nova.scheduler.filters import shard_filter
from nova import test
@@ -264,13 +265,13 @@ def test_shard_project_has_sharding_enabled_and_single_shards(
mock.sentinel.ctx, expected_attrs=['extra_specs']))
self._assert_passes(host, spec_obj, True)

@mock.patch('nova.objects.AggregateList.get_all')
@mock.patch('nova.objects.InstanceList.get_by_filters')
@mock.patch('nova.objects.AggregateList.get_by_host')
@mock.patch('nova.context.scatter_gather_skip_cell0')
@mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
@mock.patch('nova.context.get_admin_context')
def test_same_shard_for_kubernikus_cluster(self, get_context,
get_by_uuid,
get_by_filters,
gather_host,
get_aggrs):
kks_cluster = 'kubernikus:kluster-example'
build_req = objects.BuildRequest()
@@ -279,25 +280,27 @@ def test_same_shard_for_kubernikus_cluster(self, get_context,
])

result = self._filter_k8s_hosts(build_req, get_context,
get_by_uuid, get_by_filters,
get_by_uuid, gather_host,
get_aggrs)

get_by_filters.assert_called_once_with(
gather_host.assert_called_once_with(
get_context.return_value,
filters={'tags': ['kubernikus:kluster-example'],
'project_id': 'foo'},
expected_attrs=['flavor', 'metadata', 'tags'])
main_db_api.instance_get_host_by_tag,
'kubernikus:kluster-example',
filters={'hv_type': 'VMware vCenter Server',
'availability_zone': 'az-2'})

self.assertEqual(2, len(result))
self.assertEqual(result[0].host, 'host4')
self.assertEqual(result[1].host, 'host5')

@mock.patch('nova.objects.AggregateList.get_all')
@mock.patch('nova.objects.InstanceList.get_by_filters')
@mock.patch('nova.objects.AggregateList.get_by_host')
@mock.patch('nova.context.scatter_gather_skip_cell0')
@mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
@mock.patch('nova.context.get_admin_context')
def test_same_shard_for_gardener_cluster(self, get_context,
get_by_uuid,
get_by_filters,
gather_host,
get_aggrs):
gardener_cluster = 'kubernetes.io-cluster-shoot--garden--testCluster'
new_instance = fake_instance.fake_instance_obj(
@@ -309,48 +312,38 @@ def test_same_shard_for_gardener_cluster(self, get_context,
build_req.tags = objects.TagList()

result = self._filter_k8s_hosts(build_req, get_context,
get_by_uuid, get_by_filters,
get_by_uuid, gather_host,
get_aggrs)

get_by_filters.assert_called_once_with(
gather_host.assert_called_once_with(
get_context.return_value,
filters={'metadata': {gardener_cluster: '1'},
'project_id': 'foo'},
expected_attrs=['flavor', 'metadata', 'tags'])
main_db_api.instance_get_host_by_metadata,
gardener_cluster, '1',
filters={'hv_type': 'VMware vCenter Server',
'availability_zone': 'az-2'})

self.assertEqual(2, len(result))
self.assertEqual(result[0].host, 'host4')
self.assertEqual(result[1].host, 'host5')

def _filter_k8s_hosts(self, build_req, get_context, get_by_uuid,
get_by_filters, get_aggrs):
gather_host, get_aggrs):
"""Given a K8S cluster that spans across 3 shards
(vc-a-0, vc-b-0, vc-b-1) and 2 availability zones (az-1, az-2)
        with most of the cluster's hosts in the vc-b-1 shard: when the
        RequestSpec targets 'az-2', only the hosts in the 'vc-b-1' shard
        must pass, since it is the dominant shard.
"""

sibling1 = fake_instance.fake_instance_obj(
get_context.return_value, availability_zone='az-1', host='host1')
sibling2 = fake_instance.fake_instance_obj(
get_context.return_value, availability_zone='az-2', host='host3')
sibling3 = fake_instance.fake_instance_obj(
get_context.return_value, availability_zone='az-2', host='host4')
sibling4 = fake_instance.fake_instance_obj(
get_context.return_value, availability_zone='az-2', host='host5')

get_by_uuid.return_value = build_req
get_by_filters.return_value = objects.InstanceList(
objects=[sibling1, sibling2, sibling3, sibling4])
gather_host.return_value = 'host5'

self.filt_cls._PROJECT_SHARD_CACHE['foo'] = ['sharding_enabled',
'vc-a-1']
agg1 = objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])
agg2 = objects.Aggregate(id=2, name='vc-b-0', hosts=['host2', 'host3'])
agg3 = objects.Aggregate(id=3, name='vc-b-1', hosts=['host4', 'host5'])

get_aggrs.return_value = [agg1, agg2, agg3]
get_aggrs.return_value = [agg3]

host1 = fakes.FakeHostState('host1', 'compute',
{'aggregates': [agg1]})
Expand All @@ -374,13 +367,13 @@ def _filter_k8s_hosts(self, build_req, get_context, get_by_uuid,
return list(self.filt_cls.filter_all(
[host1, host2, host3, host4, host5], spec_obj))

@mock.patch('nova.objects.AggregateList.get_all')
@mock.patch('nova.objects.InstanceList.get_by_filters')
@mock.patch('nova.objects.AggregateList.get_by_host')
@mock.patch('nova.context.scatter_gather_skip_cell0')
@mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
@mock.patch('nova.context.get_admin_context')
def test_k8s_bypass_hana_flavors(self, get_context,
get_by_uuid,
get_by_filters,
gather_host,
get_aggrs):
gardener_cluster = 'kubernetes.io-cluster-shoot--garden--testCluster'
hana_flavor = fake_flavor.fake_flavor_obj(
@@ -395,16 +388,7 @@ def test_k8s_bypass_hana_flavors(self, get_context,
build_req.instance = new_instance
build_req.tags = objects.TagList()

sibling1 = fake_instance.fake_instance_obj(
get_context.return_value, availability_zone='az-1',
host='host1')
hana_sibling = fake_instance.fake_instance_obj(
get_context.return_value, availability_zone='az-1',
flavor=hana_flavor, host='host2')

get_by_uuid.return_value = build_req
get_by_filters.return_value = objects.InstanceList(
objects=[sibling1, hana_sibling])

self.filt_cls._PROJECT_SHARD_CACHE['baz'] = ['sharding_enabled',
'vc-a-1']
Expand All @@ -431,6 +415,7 @@ def test_k8s_bypass_hana_flavors(self, get_context,
result = list(self.filt_cls.filter_all([host1, host2, host3],
spec_obj))

gather_host.assert_not_called()
self.assertEqual(3, len(result))
self.assertEqual(result[0].host, 'host1')
self.assertEqual(result[1].host, 'host2')
