K8s shard affinity #415

Open · wants to merge 2 commits into base: stable/xena-m3
11 changes: 11 additions & 0 deletions nova/compute/manager.py
@@ -1733,6 +1733,15 @@ def _do_validation(context, instance, group)

_do_validation(context, instance, group)

def _validate_driver_instance_group_policy(self, context, instance):
lock_id = "driver-instance-group-validation-%s" % instance.uuid

@utils.synchronized(lock_id)
def _do_validation(context, instance):
self.driver.validate_instance_group_policy(context, instance)

_do_validation(context, instance)
Comment on lines +1736 to +1743

A little docstring would be nice, e.g. why we need a lock here.
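
One possible shape for that docstring, sketched here for illustration only; the rationale given for the lock is an assumption, not wording from the PR:

# Sketch only: the reason stated in the docstring is an assumption.
def _validate_driver_instance_group_policy(self, context, instance):
    """Have the virt driver re-validate its instance group policy.

    Locking on the instance UUID keeps concurrent build attempts for the
    same instance from running the driver-side validation in parallel.
    """
    lock_id = "driver-instance-group-validation-%s" % instance.uuid

    @utils.synchronized(lock_id)
    def _do_validation(context, instance):
        self.driver.validate_instance_group_policy(context, instance)

    _do_validation(context, instance)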


def _log_original_error(self, exc_info, instance_uuid):
LOG.error('Error: %s', exc_info[1], instance_uuid=instance_uuid,
exc_info=exc_info)
@@ -2407,6 +2416,8 @@ def _build_and_run_instance(self, context, instance, image, injected_files,
# the host is set on the instance.
self._validate_instance_group_policy(context, instance,
scheduler_hints)
self._validate_driver_instance_group_policy(context,
instance)
image_meta = objects.ImageMeta.from_dict(image)

with self._build_resources(context, instance,
80 changes: 80 additions & 0 deletions nova/db/main/api.py
@@ -2087,6 +2087,86 @@ def instance_get_active_by_window_joined(context, begin, end=None,
return _instances_fill_metadata(context, query.all(), manual_joins)


@require_context
@pick_context_manager_reader_allow_async
def get_k8s_hosts_by_instances_tag(context, tag, filters=None):
"""Get the list of K8S hosts and the number of instances associated to
the K8S cluster running on that host, querying by instances tags.

Returns a list of tuple
[(host1, 3), (host2, 1)]
"""
count_label = func.count('*').label('count')
query = context.session.query(models.Instance.host, count_label). \
join(models.Instance.tags)
query = _handle_k8s_hosts_query_filters(query, filters)
query = query.filter(models.Instance.deleted == 0,
models.Tag.tag == tag)

query = query.group_by(models.Instance.host). \
order_by(sql.desc(count_label))

return query.all()


@require_context
@pick_context_manager_reader_allow_async
def get_k8s_hosts_by_instances_metadata(context, meta_key, meta_value,
filters=None):
"""Get the list of K8S hosts and the number of instances associated to
the K8S cluster running on that host, querying by instances metadata.

Returns a list of tuple
[(host1, 3), (host2, 1)]
"""
count_label = func.count('*').label('count')
query = context.session.query(models.Instance.host, count_label). \
join(models.InstanceMetadata,
models.InstanceMetadata.instance_uuid == models.Instance.uuid)
query = _handle_k8s_hosts_query_filters(query, filters)
query = query.filter(models.Instance.deleted == 0,
models.InstanceMetadata.deleted == 0,
models.InstanceMetadata.key == meta_key,
models.InstanceMetadata.value == meta_value)
query = query.group_by(models.Instance.host). \
order_by(sql.desc(count_label))

return query.all()
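
A hypothetical usage sketch for these two helpers; the context variable, cluster key, tag, UUID and host names below are made-up placeholders, not values from the PR:

# Hosts running instances of a given K8S cluster, most populated first,
# looked up once by instance metadata and once by instance tag.
hosts_by_meta = get_k8s_hosts_by_instances_metadata(
    ctxt, 'kubernetes.io-cluster-mycluster', '1',
    filters={'hv_type': 'VMware vCenter Server',
             'availability_zone': 'az-a'})
hosts_by_tag = get_k8s_hosts_by_instances_tag(
    ctxt, 'kubernikus:kluster-mycluster',
    filters={'skip_instance_uuid': 'aaaa-bbbb-cccc'})
# Both calls return something like [('host1', 3), ('host2', 1)].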


def _handle_k8s_hosts_query_filters(query, filters=None):
"""Applies filters to the K8S related queries.

Supported filters:
filters = {
'hv_type': 'The hypervisor_type',
'availability_zone': 'The availability zone'
}

Missing skip_instance_uuid in the documentation
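
With that key added, the documented dict might read roughly as follows; the description text for the new key is an assumption:

filters = {
    'hv_type': 'The hypervisor_type',
    'availability_zone': 'The availability zone',
    # assumed wording for the currently undocumented key
    'skip_instance_uuid': 'An instance UUID to exclude from the counts'
}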

"""
if not filters:
return query
hv_type = filters.get('hv_type')
if hv_type:
query = query.join(
models.ComputeNode,
sql.and_(
models.ComputeNode.deleted == 0,
models.ComputeNode.hypervisor_hostname == models.Instance.node,
models.ComputeNode.hypervisor_type == hv_type))

availability_zone = filters.get('availability_zone')
if availability_zone:
query = query.filter(
models.Instance.availability_zone == availability_zone)

skip_instance_uuid = filters.get('skip_instance_uuid')
if skip_instance_uuid:
query.filter(
models.Instance.uuid != skip_instance_uuid)
Comment on lines +2164 to +2165

Missing a `query =` assignment here.
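
A sketch of the fix being asked for: SQLAlchemy's filter() returns a new Query object rather than mutating the existing one, so without the reassignment the condition is silently dropped.

skip_instance_uuid = filters.get('skip_instance_uuid')
if skip_instance_uuid:
    # reassign, otherwise the added filter has no effect
    query = query.filter(
        models.Instance.uuid != skip_instance_uuid)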


return query


def _instance_get_all_query(context, project_only=False, joins=None):
if joins is None:
joins = ['info_cache', 'security_groups']
11 changes: 11 additions & 0 deletions nova/objects/compute_node.py
@@ -506,6 +506,17 @@ def get_by_hypervisor_type(cls, context, hv_type):
return base.obj_make_list(context, cls(context), objects.ComputeNode,
db_computes)

@base.remotable_classmethod
def get_k8s_hosts_by_instances_metadata(cls, context, meta_key, meta_value,
filters=None):
return db.get_k8s_hosts_by_instances_metadata(
context, meta_key, meta_value, filters=filters)

@base.remotable_classmethod
def get_k8s_hosts_by_instances_tag(cls, context, tag, filters=None):
return db.get_k8s_hosts_by_instances_tag(
context, tag, filters=filters)


def _get_node_empty_ratio(context, max_count):
"""Query the DB for non-deleted compute_nodes with 0.0/None alloc ratios
80 changes: 76 additions & 4 deletions nova/scheduler/filters/shard_filter.py
@@ -15,15 +15,24 @@
from oslo_log import log as logging

import nova.conf
from nova import context as nova_context
from nova.objects.build_request import BuildRequest
from nova.objects.instance import Instance
from nova.scheduler import filters
from nova.scheduler.mixins import ProjectTagMixin
from nova.scheduler import utils
from nova import utils as nova_utils
from nova.virt.vmwareapi import shard_util

LOG = logging.getLogger(__name__)

CONF = nova.conf.CONF

GARDENER_PREFIX = "kubernetes.io-cluster-"
KKS_PREFIX = "kubernikus:kluster"
HANA_PREFIX = "hana_"
VMWARE_HV_TYPE = 'VMware vCenter Server'


class ShardFilter(filters.BaseHostFilter, ProjectTagMixin):
"""Filter hosts based on the vcenter-shard configured in their aggregate
@@ -32,6 +41,8 @@ class ShardFilter(filters.BaseHostFilter, ProjectTagMixin):

Alternatively the project may have the "sharding_enabled" tag set, which
enables the project for hosts in all shards.

Implements `filter_all` directly instead of `host_passes`
"""

_ALL_SHARDS = "sharding_enabled"
@@ -45,11 +56,64 @@ def _get_shards(self, project_id):
# _get_shards() so it's clear what we return
return self._get_tags(project_id)

def host_passes(self, host_state, spec_obj):
def _get_k8s_shard(self, spec_obj):
"""Returns the dominant shard of a K8S cluster.

Returns None in any of the following scenarios:
- the request is not for an instance that's part of a K8S cluster
- this is the first instance of a new cluster
- the request is for a HANA flavor
- the request is for a resize/migration
"""
if (spec_obj.flavor.name.startswith(HANA_PREFIX) or
utils.request_is_resize(spec_obj)):
return None
elevated = nova_context.get_admin_context()
build_request = None
instance = None

def _get_tags():
return build_request.tags if build_request \
else instance.tags

def _get_metadata():
return build_request.instance.metadata if build_request \
else instance.metadata

check_type = spec_obj.get_scheduler_hint('_nova_check_type')

Is this safe, i.e. do we have a build request every time we do not set `_nova_check_type`? I see resize, live-migrate and rebuild setting it, but e.g. not unshelve.

Would it make sense to try and get the BuildRequest if we don't have a check_type, but fall back to getting the instance instead?
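
A rough sketch of that suggestion (not part of the PR); it assumes BuildRequest.get_by_instance_uuid raises BuildRequestNotFound once the build request has been deleted, e.g. for unshelve:

from nova import exception

# Try the BuildRequest first when no check type is set, and fall back to
# loading the Instance if the build request is already gone.
build_request = None
instance = None
if not check_type:
    try:
        build_request = BuildRequest.get_by_instance_uuid(
            elevated, spec_obj.instance_uuid)
    except exception.BuildRequestNotFound:
        build_request = None
if build_request is None:
    instance = Instance.get_by_uuid(
        elevated, spec_obj.instance_uuid,
        expected_attrs=['tags', 'metadata'])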

if not check_type:
build_request = BuildRequest.get_by_instance_uuid(
elevated, spec_obj.instance_uuid)
if not build_request:
instance = Instance.get_by_uuid(
elevated, spec_obj.instance_uuid,
expected_attrs=['tags', 'metadata'])
if not instance and not build_request:
LOG.warning("There were no build_request and no instance "
"for the uuid %s", spec_obj.instance_uuid)
return

`return None`, so that all paths return something explicitly.


k8s_shard_aggrs = shard_util.get_sorted_k8s_shard_aggregates(
elevated, _get_metadata(), _get_tags(), spec_obj.availability_zone)

if not k8s_shard_aggrs:
return None

return k8s_shard_aggrs[0].name

def filter_all(self, filter_obj_list, spec_obj):
# Only VMware
if utils.is_non_vmware_spec(spec_obj):
return True
LOG.debug("ShardFilter is not applicable for this non-vmware "
"request")
return filter_obj_list

k8s_shard = self._get_k8s_shard(spec_obj)

return [host_state for host_state in filter_obj_list
if self._host_passes(host_state, spec_obj, k8s_shard)]

def _host_passes(self, host_state, spec_obj, k8s_shard):
host_shard_aggrs = [aggr for aggr in host_state.aggregates
if aggr.name.startswith(self._SHARD_PREFIX)]

Expand Down Expand Up @@ -79,18 +143,26 @@ def host_passes(self, host_state, spec_obj):
if self._ALL_SHARDS in shards:
LOG.debug('project enabled for all shards %(project_shards)s.',
{'project_shards': shards})
return True
elif host_shard_names & set(shards):
LOG.debug('%(host_state)s shard %(host_shard)s found in project '
'shards %(project_shards)s.',
{'host_state': host_state,
'host_shard': host_shard_names,
'project_shards': shards})
return True
else:
LOG.debug('%(host_state)s shard %(host_shard)s not found in '
'project shards %(project_shards)s.',
{'host_state': host_state,
'host_shard': host_shard_names,
'project_shards': shards})
return False

if k8s_shard:
if k8s_shard not in host_shard_names:
LOG.debug("%(host_state)s is not part of the K8S "
"cluster's shard '%(k8s_shard)s'",
{'host_state': host_state,
'k8s_shard': k8s_shard})
return False

return True
2 changes: 1 addition & 1 deletion nova/tests/unit/objects/test_objects.py
@@ -1055,7 +1055,7 @@ def obj_name(cls):
'CellMapping': '1.1-5d652928000a5bc369d79d5bde7e497d',
'CellMappingList': '1.1-496ef79bb2ab41041fff8bcb57996352',
'ComputeNode': '1.19-af6bd29a6c3b225da436a0d8487096f2',
'ComputeNodeList': '1.17-52f3b0962b1c86b98590144463ebb192',
'ComputeNodeList': '1.17-bb54e3fd5415be274c5515577acafe3d',
'ConsoleAuthToken': '1.1-8da320fb065080eb4d3c2e5c59f8bf52',
'CpuDiagnostics': '1.0-d256f2e442d1b837735fd17dfe8e3d47',
'Destination': '1.4-3b440d29459e2c98987ad5b25ad1cb2c',