K8s shard affinity #415
base: stable/xena-m3
@@ -2087,6 +2087,86 @@ def instance_get_active_by_window_joined(context, begin, end=None,
    return _instances_fill_metadata(context, query.all(), manual_joins)


@require_context
@pick_context_manager_reader_allow_async
def get_k8s_hosts_by_instances_tag(context, tag, filters=None):
    """Get the list of K8S hosts and the number of instances associated to
    the K8S cluster running on that host, querying by instances tags.

    Returns a list of tuples
    [(host1, 3), (host2, 1)]
    """
    count_label = func.count('*').label('count')
    query = context.session.query(models.Instance.host, count_label). \
        join(models.Instance.tags)
    query = _handle_k8s_hosts_query_filters(query, filters)
    query = query.filter(models.Instance.deleted == 0,
                         models.Tag.tag == tag)

    query = query.group_by(models.Instance.host). \
        order_by(sql.desc(count_label))

    return query.all()

@require_context
@pick_context_manager_reader_allow_async
def get_k8s_hosts_by_instances_metadata(context, meta_key, meta_value,
                                        filters=None):
    """Get the list of K8S hosts and the number of instances associated to
    the K8S cluster running on that host, querying by instances metadata.

    Returns a list of tuples
    [(host1, 3), (host2, 1)]
    """
    count_label = func.count('*').label('count')
    query = context.session.query(models.Instance.host, count_label). \
        join(models.InstanceMetadata,
             models.InstanceMetadata.instance_uuid == models.Instance.uuid)
    query = _handle_k8s_hosts_query_filters(query, filters)
    query = query.filter(models.Instance.deleted == 0,
                         models.InstanceMetadata.deleted == 0,
                         models.InstanceMetadata.key == meta_key,
                         models.InstanceMetadata.value == meta_value)
    query = query.group_by(models.Instance.host). \
        order_by(sql.desc(count_label))

    return query.all()

def _handle_k8s_hosts_query_filters(query, filters=None):
    """Applies filters to the K8S related queries.

    Supported filters:
        filters = {
            'hv_type': 'The hypervisor_type',
            'availability_zone': 'The availability zone'
        }
    """

Review comment: the list of supported filters in this docstring is missing `skip_instance_uuid`.
    if not filters:
        return query
    hv_type = filters.get('hv_type')
    if hv_type:
        query = query.join(
            models.ComputeNode,
            sql.and_(
                models.ComputeNode.deleted == 0,
                models.ComputeNode.hypervisor_hostname == models.Instance.node,
                models.ComputeNode.hypervisor_type == hv_type))

    availability_zone = filters.get('availability_zone')
    if availability_zone:
        query = query.filter(
            models.Instance.availability_zone == availability_zone)

    skip_instance_uuid = filters.get('skip_instance_uuid')
    if skip_instance_uuid:
        query.filter(
            models.Instance.uuid != skip_instance_uuid)
Review comment on lines 2164 to 2165: missing a `query =` assignment, so the `skip_instance_uuid` filter is never applied.

    return query


def _instance_get_all_query(context, project_only=False, joins=None):
    if joins is None:
        joins = ['info_cache', 'security_groups']
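As an aside, here is a minimal sketch of how these new DB helpers might be consumed; the context variable, the tag value and the filter values are illustrative stand-ins, not part of the change:

# Hypothetical caller code: count the instances of one K8s cluster per host,
# identified by an instance tag, restricted to VMware hosts in a single AZ.
results = get_k8s_hosts_by_instances_tag(
    ctxt,                                # an admin RequestContext (assumed)
    tag='kubernikus:kluster-example',    # hypothetical cluster tag
    filters={'hv_type': 'VMware vCenter Server',
             'availability_zone': 'az-1'})
# The result is ordered by instance count, e.g. [('host-12', 3), ('host-07', 1)],
# so results[0][0] is the host that already runs most of the cluster.
# get_k8s_hosts_by_instances_metadata() works the same way, but matches a
# metadata key/value pair instead of a tag.

The second changed file wires this K8s affinity information into the ShardFilter scheduler filter.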
@@ -15,15 +15,24 @@
from oslo_log import log as logging

import nova.conf
from nova import context as nova_context
from nova.objects.build_request import BuildRequest
from nova.objects.instance import Instance
from nova.scheduler import filters
from nova.scheduler.mixins import ProjectTagMixin
from nova.scheduler import utils
from nova import utils as nova_utils
from nova.virt.vmwareapi import shard_util

LOG = logging.getLogger(__name__)

CONF = nova.conf.CONF

GARDENER_PREFIX = "kubernetes.io-cluster-"
KKS_PREFIX = "kubernikus:kluster"
HANA_PREFIX = "hana_"
VMWARE_HV_TYPE = 'VMware vCenter Server'
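The prefixes above hint at how K8s cluster membership is marked on an instance; the split between metadata (Gardener) and tags (Kubernikus) is an assumption based on the two DB helpers added in the first file, and the cluster names are made up:

# Illustrative only, not part of the diff:
example_metadata = {GARDENER_PREFIX + 'shoot--garden--my-cluster': '1'}  # presumed Gardener marker
example_tag = KKS_PREFIX + '-0123456789ab'                               # presumed Kubernikus marker
# HANA flavors (names starting with HANA_PREFIX) are explicitly excluded from
# the K8s shard logic in _get_k8s_shard() below; VMWARE_HV_TYPE presumably
# feeds the 'hv_type' filter of the DB helpers.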

class ShardFilter(filters.BaseHostFilter, ProjectTagMixin):
    """Filter hosts based on the vcenter-shard configured in their aggregate

@@ -32,6 +41,8 @@ class ShardFilter(filters.BaseHostFilter, ProjectTagMixin):

    Alternatively the project may have the "sharding_enabled" tag set, which
    enables the project for hosts in all shards.

    Implements `filter_all` directly instead of `host_passes`
    """

    _ALL_SHARDS = "sharding_enabled"

@@ -45,11 +56,64 @@ def _get_shards(self, project_id):
        # _get_shards() so it's clear what we return
        return self._get_tags(project_id)
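For orientation, `_get_shards()` returns the shard-related tags of the requesting project. A purely hypothetical illustration of possible return values (the shard names are invented; only `sharding_enabled` comes from `_ALL_SHARDS` above):

# Illustrative only, not part of the diff:
possible_shard_tags = {
    'project pinned to a single shard':  ['vc-a-1'],
    'project allowed in two shards':     ['vc-a-1', 'vc-b-0'],
    'project enabled for all shards':    ['sharding_enabled'],
    'project not enabled for any shard': [],
}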

-    def host_passes(self, host_state, spec_obj):
    def _get_k8s_shard(self, spec_obj):
        """Returns the dominant shard of a K8S cluster.

        Returns None in any of the following scenarios:
         - the request is not for an instance that's part of a K8S cluster
         - this is the first instance of a new cluster
         - the request is for a HANA flavor
         - the request is for a resize/migration
        """
        if (spec_obj.flavor.name.startswith(HANA_PREFIX) or
                utils.request_is_resize(spec_obj)):
            return None
        elevated = nova_context.get_admin_context()
        build_request = None
        instance = None

        def _get_tags():
            return build_request.tags if build_request \
                else instance.tags

        def _get_metadata():
            return build_request.instance.metadata if build_request \
                else instance.metadata

        check_type = spec_obj.get_scheduler_hint('_nova_check_type')
Review comment: Is this safe, i.e. do we have a build request every time we do not set `_nova_check_type`? Would it make sense to try and get the …
        if not check_type:
            build_request = BuildRequest.get_by_instance_uuid(
                elevated, spec_obj.instance_uuid)
        if not build_request:
            instance = Instance.get_by_uuid(
                elevated, spec_obj.instance_uuid,
                expected_attrs=['tags', 'metadata'])
        if not instance and not build_request:
            LOG.warning("There were no build_request and no instance "
                        "for the uuid %s", spec_obj.instance_uuid)
            return

        k8s_shard_aggrs = shard_util.get_sorted_k8s_shard_aggregates(
            elevated, _get_metadata(), _get_tags(), spec_obj.availability_zone)

        if not k8s_shard_aggrs:
            return None

        return k8s_shard_aggrs[0].name
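`shard_util.get_sorted_k8s_shard_aggregates()` is not shown in this diff. To make the notion of a dominant shard concrete with the data shape returned by the new DB helpers, here is a small self-contained sketch; the function, the host-to-shard mapping and all names in it are hypothetical, not the real shard_util implementation:

from collections import Counter

def dominant_shard(host_counts, host_to_shard):
    """Illustrative only: pick the shard whose hosts already run the most
    instances of the K8s cluster.

    host_counts:   e.g. [('host-12', 3), ('host-07', 1)], the shape returned
                   by get_k8s_hosts_by_instances_tag/_metadata
    host_to_shard: e.g. {'host-12': 'vc-a-1', 'host-07': 'vc-b-0'}, a made-up
                   mapping from each host to its shard aggregate
    """
    per_shard = Counter()
    for host, count in host_counts:
        shard = host_to_shard.get(host)
        if shard:
            per_shard[shard] += count
    ranked = per_shard.most_common()  # sorted by instance count, descending
    return ranked[0][0] if ranked else None

# dominant_shard([('host-12', 3), ('host-07', 1)],
#                {'host-12': 'vc-a-1', 'host-07': 'vc-b-0'})  returns 'vc-a-1'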

    def filter_all(self, filter_obj_list, spec_obj):
        # Only VMware
        if utils.is_non_vmware_spec(spec_obj):
-            return True
            LOG.debug("ShardFilter is not applicable for this non-vmware "
                      "request")
            return filter_obj_list

        k8s_shard = self._get_k8s_shard(spec_obj)

        return [host_state for host_state in filter_obj_list
                if self._host_passes(host_state, spec_obj, k8s_shard)]

    def _host_passes(self, host_state, spec_obj, k8s_shard):
        host_shard_aggrs = [aggr for aggr in host_state.aggregates
                            if aggr.name.startswith(self._SHARD_PREFIX)]

@@ -79,18 +143,26 @@ def host_passes(self, host_state, spec_obj):
        if self._ALL_SHARDS in shards:
            LOG.debug('project enabled for all shards %(project_shards)s.',
                      {'project_shards': shards})
-            return True
        elif host_shard_names & set(shards):
            LOG.debug('%(host_state)s shard %(host_shard)s found in project '
                      'shards %(project_shards)s.',
                      {'host_state': host_state,
                       'host_shard': host_shard_names,
                       'project_shards': shards})
-            return True
        else:
            LOG.debug('%(host_state)s shard %(host_shard)s not found in '
                      'project shards %(project_shards)s.',
                      {'host_state': host_state,
                       'host_shard': host_shard_names,
                       'project_shards': shards})
            return False

        if k8s_shard:
            if k8s_shard not in host_shard_names:
                LOG.debug("%(host_state)s is not part of the K8S "
                          "cluster's shard '%(k8s_shard)s'",
                          {'host_state': host_state,
                           'k8s_shard': k8s_shard})
                return False

        return True

Review comment: A little docstring would be nice, e.g. why we need a lock here.
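Finally, the net effect of the new `k8s_shard` argument to `_host_passes()` can be summarised in a small stand-alone sketch; the aggregate stand-in and the shard prefix value are assumptions, not Nova objects:

from collections import namedtuple

Aggregate = namedtuple('Aggregate', 'name')  # stand-in for a Nova aggregate
SHARD_PREFIX = 'vc-'                         # assumed value of _SHARD_PREFIX

def passes_k8s_shard(host_aggregates, k8s_shard):
    """Illustrative only: a host is rejected when the K8s cluster already has
    a dominant shard and the host is not part of that shard aggregate."""
    host_shard_names = {a.name for a in host_aggregates
                        if a.name.startswith(SHARD_PREFIX)}
    return k8s_shard is None or k8s_shard in host_shard_names

# passes_k8s_shard([Aggregate('vc-a-1')], 'vc-a-1')  returns True
# passes_k8s_shard([Aggregate('vc-b-0')], 'vc-a-1')  returns False
# passes_k8s_shard([Aggregate('vc-b-0')], None)      returns True (no K8s affinity)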