Shard affinity for k8s workloads
Instances that are part of the same K8S cluster will get scheduled
to the same shard (vCenter).

It identifies the K8S cluster by looking at the tags or metadata
set by the K8S cluster orchestrator when creating the instances.
Kubernikus and Gardener are supported for now.

It queries the database to determine the dominant shard, checking
which shard contains the most instances of the given K8S cluster.
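For illustration, a minimal sketch of how the dominant shard falls out
of the per-host instance counts (host names, shard names and counts
below are made up):

    # Hypothetical per-host instance counts for one K8S cluster,
    # merged across cells: {host: count}
    k8s_hosts = {"node001-bb091": 5, "node002-bb091": 2,
                 "node003-bb092": 1}

    # Hypothetical shard (vCenter) aggregates and their member hosts.
    shards = {"vc-a-0": {"node001-bb091", "node002-bb091"},
              "vc-b-0": {"node003-bb092"}}

    # The dominant shard is the one whose hosts run the most
    # instances of the cluster: vc-a-0 wins with 5 + 2 = 7 here.
    dominant = max(shards,
                   key=lambda s: sum(c for h, c in k8s_hosts.items()
                                     if h in shards[s]))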

BigVMs are exempt from shard affinity; they are only scheduled on
their allocated hosts.

The K8S logic is skipped for offline migrations (and thus for
resizes too), since offline migration is not a use case for K8S.

Change-Id: I73d04ba295d23db1d4728e9db124fc2a27c2d4bc
leust committed Aug 22, 2023
1 parent fe8d4cf commit be090e5
Showing 3 changed files with 505 additions and 47 deletions.
74 changes: 74 additions & 0 deletions nova/db/main/api.py
@@ -2087,6 +2087,80 @@ def instance_get_active_by_window_joined(context, begin, end=None,
return _instances_fill_metadata(context, query.all(), manual_joins)


@require_context
@pick_context_manager_reader_allow_async
def get_k8s_hosts_by_instances_tag(context, tag, filters=None):
"""Get the list of K8S hosts and the number of instances associated to
that K8S running on that host, querying by instances tags.
Returns a list of tuple
[(host1, 3), (host2, 1)]
"""
count_label = func.count('*').label('count')
query = context.session.query(models.Instance.host, count_label). \
join(models.Instance.tags)
query = _handle_k8s_hosts_query_filters(query, filters)
query = query.filter(models.Instance.deleted == 0,
models.Tag.tag == tag)

query = query.group_by(models.Instance.host). \
order_by(sql.desc(count_label))

return query.all()
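A hypothetical usage sketch (the context and the cluster tag are made
up; the filter keys follow the docstring of
_handle_k8s_hosts_query_filters below):

    # Count instances of one Kubernikus cluster per host, restricted
    # to VMware hosts in one availability zone.
    hosts = get_k8s_hosts_by_instances_tag(
        context,
        'kubernikus:kluster-example',
        filters={'hv_type': 'VMware vCenter Server',
                 'availability_zone': 'az-1'})
    # -> [('host1', 3), ('host2', 1)], ordered by count, descending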


@require_context
@pick_context_manager_reader_allow_async
def get_k8s_hosts_by_instances_metadata(context, meta_key, meta_value,
filters=None):
"""Get the list of K8S hosts and the number of instances associated to
that K8S running on that host, querying by instances metadata.
Returns a list of tuple
[(host1, 3), (host2, 1)]
"""
count_label = func.count('*').label('count')
query = context.session.query(models.Instance.host, count_label). \
join(models.InstanceMetadata,
models.InstanceMetadata.instance_uuid == models.Instance.uuid)
query = _handle_k8s_hosts_query_filters(query, filters)
query = query.filter(models.Instance.deleted == 0,
models.InstanceMetadata.deleted == 0,
models.InstanceMetadata.key == meta_key,
models.InstanceMetadata.value == meta_value)
query = query.group_by(models.Instance.host). \
order_by(sql.desc(count_label))

return query.all()
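The Gardener counterpart, equally hypothetical (key and value are made
up): the cluster is identified by a "kubernetes.io-cluster-<name>"
metadata key rather than a tag:

    hosts = get_k8s_hosts_by_instances_metadata(
        context,
        'kubernetes.io-cluster-shoot--proj--example', '1',
        filters={'hv_type': 'VMware vCenter Server'})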


def _handle_k8s_hosts_query_filters(query, filters=None):
"""Applies filters to the K8S related queries.
Supported filters:
filters = {
'hv_type': 'The hypervisor_type',
'availability_zone': 'The availability zone'
}
"""
if not filters:
return query
hv_type = filters.get('hv_type')
if hv_type:
query = query.join(
models.ComputeNode,
models.Instance.node == models.ComputeNode.hypervisor_hostname)

availability_zone = filters.get('availability_zone')
if availability_zone:
query = query.filter(
models.Instance.availability_zone == availability_zone)
if hv_type:
query = query.filter(models.ComputeNode.deleted == 0,
models.ComputeNode.hypervisor_type == hv_type)
return query


def _instance_get_all_query(context, project_only=False, joins=None):
if joins is None:
joins = ['info_cache', 'security_groups']
124 changes: 120 additions & 4 deletions nova/scheduler/filters/shard_filter.py
@@ -19,6 +19,11 @@
from oslo_log import log as logging

import nova.conf
from nova import context as nova_context
from nova.db.main import api as main_db_api
from nova.objects.aggregate import AggregateList
from nova.objects.build_request import BuildRequest
from nova.objects.instance import Instance
from nova.scheduler import filters
from nova.scheduler import utils
from nova import utils as nova_utils
@@ -28,6 +33,10 @@
CONF = nova.conf.CONF

_SERVICE_AUTH = None
GARDENER_PREFIX = "kubernetes.io-cluster-"
KKS_PREFIX = "kubernikus:kluster"
HANA_PREFIX = "hana_"
VMWARE_HV_TYPE = 'VMware vCenter Server'


class ShardFilter(filters.BaseHostFilter):
Expand All @@ -37,6 +46,8 @@ class ShardFilter(filters.BaseHostFilter):
Alternatively the project may have the "sharding_enabled" tag set, which
enables the project for hosts in all shards.
Implements `filter_all` directly instead of `host_passes`.
"""

_PROJECT_SHARD_CACHE = {}
@@ -114,11 +125,106 @@ def _get_shards(self, project_id):

return self._PROJECT_SHARD_CACHE.get(project_id)

def _get_k8s_shard(self, spec_obj):
"""Returns the dominant shard of a K8S cluster.
Returns None if the request is not for an instance that's part of
a K8S cluster, or if this is the first instance of a new cluster.
"""
if (spec_obj.flavor.name.startswith(HANA_PREFIX) or
utils.request_is_resize(spec_obj)):
return None
elevated = nova_context.get_admin_context()
build_request = None
instance = None

def _get_tags():
return build_request.tags if build_request \
else instance.tags

def _get_metadata():
return build_request.instance.metadata if build_request \
else instance.metadata

check_type = spec_obj.get_scheduler_hint('_nova_check_type')
if not check_type:
build_request = BuildRequest.get_by_instance_uuid(
elevated, spec_obj.instance_uuid)
else:
instance = Instance.get_by_uuid(
elevated, spec_obj.instance_uuid,
expected_attrs=['tags', 'metadata'])

kks_tag = next((t.tag for t in _get_tags()
if t.tag.startswith(KKS_PREFIX)), None)
gardener_meta = None
if not kks_tag:
gardener_meta = \
{k: v for k, v in _get_metadata().items()
if k.startswith(GARDENER_PREFIX)}

if not kks_tag and not gardener_meta:
return None

q_filters = {'hv_type': VMWARE_HV_TYPE}
if spec_obj.availability_zone:
q_filters['availability_zone'] = spec_obj.availability_zone

results = None
if kks_tag:
results = nova_context.scatter_gather_skip_cell0(
elevated, main_db_api.get_k8s_hosts_by_instances_tag,
kks_tag, filters=q_filters)
elif gardener_meta:
(meta_key, meta_value) = next(iter(gardener_meta.items()))
results = nova_context.scatter_gather_skip_cell0(
elevated, main_db_api.get_k8s_hosts_by_instances_metadata,
meta_key, meta_value, filters=q_filters)

if not results:
return None

# hosts with count of instances from this K8S cluster
# {host: <count>}
k8s_hosts = {}

for cell_result in results.values():
if not nova_context.is_cell_failure_sentinel(cell_result):
cell_hosts = dict(cell_result)
k8s_hosts = {
h: k8s_hosts.get(h, 0) + cell_hosts.get(h, 0)
for h in set(cell_hosts) | set(k8s_hosts)
}

if not k8s_hosts:
return None

all_shard_aggrs = [agg for agg in AggregateList.get_all(elevated)
if agg.name.startswith(self._SHARD_PREFIX)]
if not all_shard_aggrs:
return None

shard_aggr = sorted(
all_shard_aggrs,
reverse=True,
key=lambda aggr: sum(i for h, i in k8s_hosts.items()
if h in aggr.hosts))[0]

return shard_aggr.name
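For reference, a toy run of the per-cell merge above (cell names and
counts are made up; the failure-sentinel check is omitted):

    results = {'cell1': [('host1', 3)],
               'cell2': [('host1', 1), ('host2', 2)]}

    k8s_hosts = {}
    for cell_result in results.values():
        cell_hosts = dict(cell_result)
        k8s_hosts = {h: k8s_hosts.get(h, 0) + cell_hosts.get(h, 0)
                     for h in set(cell_hosts) | set(k8s_hosts)}
    # k8s_hosts == {'host1': 4, 'host2': 2}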

def filter_all(self, filter_obj_list, spec_obj):
# Only VMware
if utils.is_non_vmware_spec(spec_obj):
    LOG.debug("ShardFilter is not applicable for this non-vmware "
              "request")
    return filter_obj_list

k8s_shard = self._get_k8s_shard(spec_obj)

return [host_state for host_state in filter_obj_list
if self._host_passes(host_state, spec_obj, k8s_shard)]

def _host_passes(self, host_state, spec_obj, k8s_shard):
host_shard_aggrs = [aggr for aggr in host_state.aggregates
if aggr.name.startswith(self._SHARD_PREFIX)]

@@ -148,18 +254,28 @@ def host_passes(self, host_state, spec_obj):
if self._ALL_SHARDS in shards:
LOG.debug('project enabled for all shards %(project_shards)s.',
{'project_shards': shards})
elif host_shard_names & set(shards):
LOG.debug('%(host_state)s shard %(host_shard)s found in project '
'shards %(project_shards)s.',
{'host_state': host_state,
'host_shard': host_shard_names,
'project_shards': shards})
else:
LOG.debug('%(host_state)s shard %(host_shard)s not found in '
'project shards %(project_shards)s.',
{'host_state': host_state,
'host_shard': host_shard_names,
'project_shards': shards})
return False

if k8s_shard:
matches = any(host_shard == k8s_shard
for host_shard in host_shard_names)
if not matches:
LOG.debug("%(host_state)s is not part of the requested "
"K8S cluster shard '%(k8s_shard)s'",
{'host_state': host_state,
'k8s_shard': k8s_shard})
return matches

return True
