diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index e89199cd4bf..c687fe691b9 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -1733,6 +1733,15 @@ def _do_validation(context, instance, group):
 
         _do_validation(context, instance, group)
 
+    def _validate_driver_instance_group_policy(self, context, instance):
+        lock_id = "driver-instance-group-validation-%s" % instance.uuid
+
+        @utils.synchronized(lock_id)
+        def _do_validation(context, instance):
+            self.driver.validate_instance_group_policy(context, instance)
+
+        _do_validation(context, instance)
+
     def _log_original_error(self, exc_info, instance_uuid):
         LOG.error('Error: %s', exc_info[1], instance_uuid=instance_uuid,
                   exc_info=exc_info)
@@ -2407,6 +2416,8 @@ def _build_and_run_instance(self, context, instance, image, injected_files,
             # the host is set on the instance.
             self._validate_instance_group_policy(context, instance,
                                                  scheduler_hints)
+            self._validate_driver_instance_group_policy(context,
+                                                        instance)
             image_meta = objects.ImageMeta.from_dict(image)
 
             with self._build_resources(context, instance,
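`_validate_driver_instance_group_policy` serializes the driver check per instance UUID. Below is a minimal runnable sketch of that locking pattern, assuming `nova.utils.synchronized` behaves like oslo.concurrency's `lockutils.synchronized` decorator (which Nova wraps); the `driver` object is a stand-in:

```python
# Sketch of the per-instance-UUID lock pattern; names are illustrative.
from oslo_concurrency import lockutils


def validate_serialized(driver, context, instance):
    # One lock name per instance UUID: concurrent build attempts for the
    # same instance serialize, while different instances proceed in
    # parallel.
    lock_id = "driver-instance-group-validation-%s" % instance.uuid

    @lockutils.synchronized(lock_id)
    def _do_validation(context, instance):
        # May raise RescheduledException (see nova/virt/driver.py below)
        # to push the instance to another host.
        driver.validate_instance_group_policy(context, instance)

    _do_validation(context, instance)
```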
diff --git a/nova/db/main/api.py b/nova/db/main/api.py
index ae245351063..fa8485a1603 100644
--- a/nova/db/main/api.py
+++ b/nova/db/main/api.py
@@ -2087,6 +2087,86 @@ def instance_get_active_by_window_joined(context, begin, end=None,
     return _instances_fill_metadata(context, query.all(), manual_joins)
 
 
+@require_context
+@pick_context_manager_reader_allow_async
+def get_k8s_hosts_by_instances_tag(context, tag, filters=None):
+    """Get the list of K8S hosts and the number of instances associated
+    with the K8S cluster running on each host, querying by instance tags.
+
+    Returns a list of tuples, e.g.
+        [(host1, 3), (host2, 1)]
+    """
+    count_label = func.count('*').label('count')
+    query = context.session.query(models.Instance.host, count_label). \
+        join(models.Instance.tags)
+    query = _handle_k8s_hosts_query_filters(query, filters)
+    query = query.filter(models.Instance.deleted == 0,
+                         models.Tag.tag == tag)
+
+    query = query.group_by(models.Instance.host). \
+        order_by(sql.desc(count_label))
+
+    return query.all()
+
+
+@require_context
+@pick_context_manager_reader_allow_async
+def get_k8s_hosts_by_instances_metadata(context, meta_key, meta_value,
+                                        filters=None):
+    """Get the list of K8S hosts and the number of instances associated
+    with the K8S cluster running on each host, querying by instance
+    metadata.
+
+    Returns a list of tuples, e.g.
+        [(host1, 3), (host2, 1)]
+    """
+    count_label = func.count('*').label('count')
+    query = context.session.query(models.Instance.host, count_label). \
+        join(models.InstanceMetadata,
+             models.InstanceMetadata.instance_uuid == models.Instance.uuid)
+    query = _handle_k8s_hosts_query_filters(query, filters)
+    query = query.filter(models.Instance.deleted == 0,
+                         models.InstanceMetadata.deleted == 0,
+                         models.InstanceMetadata.key == meta_key,
+                         models.InstanceMetadata.value == meta_value)
+    query = query.group_by(models.Instance.host). \
+        order_by(sql.desc(count_label))
+
+    return query.all()
+
+
+def _handle_k8s_hosts_query_filters(query, filters=None):
+    """Applies filters to the K8S-related queries.
+
+    Supported filters:
+        filters = {
+            'hv_type': 'The hypervisor_type',
+            'availability_zone': 'The availability zone',
+            'skip_instance_uuid': 'Exclude this instance UUID'
+        }
+    """
+    if not filters:
+        return query
+    hv_type = filters.get('hv_type')
+    if hv_type:
+        query = query.join(
+            models.ComputeNode,
+            sql.and_(
+                models.ComputeNode.deleted == 0,
+                models.ComputeNode.hypervisor_hostname == models.Instance.node,
+                models.ComputeNode.hypervisor_type == hv_type))
+
+    availability_zone = filters.get('availability_zone')
+    if availability_zone:
+        query = query.filter(
+            models.Instance.availability_zone == availability_zone)
+
+    skip_instance_uuid = filters.get('skip_instance_uuid')
+    if skip_instance_uuid:
+        query = query.filter(
+            models.Instance.uuid != skip_instance_uuid)
+
+    return query
+
+
 def _instance_get_all_query(context, project_only=False, joins=None):
     if joins is None:
         joins = ['info_cache', 'security_groups']
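Both DB API helpers boil down to a count-per-host grouped query ordered descending. A runnable sketch of the same semantics on plain in-memory data (hosts, tags, and counts are illustrative, not taken from any real deployment):

```python
# Count instances per host for one cluster tag, descending by count,
# mirroring the shape of get_k8s_hosts_by_instances_tag's result.
from collections import Counter

# (host, cluster_tag) pairs standing in for non-deleted Instance rows.
instances = [
    ('host3', 'kubernikus:kluster-example'),
    ('host3', 'kubernikus:kluster-example'),
    ('host4', 'kubernikus:kluster-example'),
    ('host3', 'kubernikus:kluster-other'),
]


def k8s_hosts_by_tag(rows, tag):
    counts = Counter(host for host, t in rows if t == tag)
    # Same shape as the DB API result: [(host, count), ...], count desc.
    return sorted(counts.items(), key=lambda hc: hc[1], reverse=True)


print(k8s_hosts_by_tag(instances, 'kubernikus:kluster-example'))
# -> [('host3', 2), ('host4', 1)]
```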
diff --git a/nova/objects/compute_node.py b/nova/objects/compute_node.py
index 62cf9d686b5..cf3a778100a 100644
--- a/nova/objects/compute_node.py
+++ b/nova/objects/compute_node.py
@@ -506,6 +506,17 @@ def get_by_hypervisor_type(cls, context, hv_type):
         return base.obj_make_list(context, cls(context), objects.ComputeNode,
                                   db_computes)
 
+    @base.remotable_classmethod
+    def get_k8s_hosts_by_instances_metadata(cls, context, meta_key, meta_value,
+                                            filters=None):
+        return db.get_k8s_hosts_by_instances_metadata(
+            context, meta_key, meta_value, filters=filters)
+
+    @base.remotable_classmethod
+    def get_k8s_hosts_by_instances_tag(cls, context, tag, filters=None):
+        return db.get_k8s_hosts_by_instances_tag(
+            context, tag, filters=filters)
+
 
 def _get_node_empty_ratio(context, max_count):
     """Query the DB for non-deleted compute_nodes with 0.0/None alloc ratios
diff --git a/nova/scheduler/filters/shard_filter.py b/nova/scheduler/filters/shard_filter.py
index bc9b5ff7f40..26efbbe07eb 100644
--- a/nova/scheduler/filters/shard_filter.py
+++ b/nova/scheduler/filters/shard_filter.py
@@ -15,15 +15,24 @@
 
 from oslo_log import log as logging
 
 import nova.conf
+from nova import context as nova_context
+from nova.objects.build_request import BuildRequest
+from nova.objects.instance import Instance
 from nova.scheduler import filters
 from nova.scheduler.mixins import ProjectTagMixin
 from nova.scheduler import utils
 from nova import utils as nova_utils
+from nova.virt.vmwareapi import shard_util
 
 LOG = logging.getLogger(__name__)
 
 CONF = nova.conf.CONF
 
+GARDENER_PREFIX = "kubernetes.io-cluster-"
+KKS_PREFIX = "kubernikus:kluster"
+HANA_PREFIX = "hana_"
+VMWARE_HV_TYPE = 'VMware vCenter Server'
+
 
 class ShardFilter(filters.BaseHostFilter, ProjectTagMixin):
     """Filter hosts based on the vcenter-shard configured in their aggregate
@@ -32,6 +41,8 @@ class ShardFilter(filters.BaseHostFilter, ProjectTagMixin):
 
     Alternatively the project may have the "sharding_enabled" tag set, which
     enables the project for hosts in all shards.
+
+    Implements `filter_all` directly instead of `host_passes`.
     """
 
     _ALL_SHARDS = "sharding_enabled"
@@ -45,11 +56,64 @@ def _get_shards(self, project_id):
         # _get_shards() so it's clear what we return
         return self._get_tags(project_id)
 
-    def host_passes(self, host_state, spec_obj):
+    def _get_k8s_shard(self, spec_obj):
+        """Returns the dominant shard of a K8S cluster.
+
+        Returns None in any of the following scenarios:
+        - the request is not for an instance that's part of a K8S cluster
+        - this is the first instance of a new cluster
+        - the request is for a HANA flavor
+        - the request is for a resize/migration
+        """
+        if (spec_obj.flavor.name.startswith(HANA_PREFIX) or
+                utils.request_is_resize(spec_obj)):
+            return None
+        elevated = nova_context.get_admin_context()
+        build_request = None
+        instance = None
+
+        def _get_tags():
+            return build_request.tags if build_request \
+                else instance.tags
+
+        def _get_metadata():
+            return build_request.instance.metadata if build_request \
+                else instance.metadata
+
+        check_type = spec_obj.get_scheduler_hint('_nova_check_type')
+        if not check_type:
+            build_request = BuildRequest.get_by_instance_uuid(
+                elevated, spec_obj.instance_uuid)
+        if not build_request:
+            instance = Instance.get_by_uuid(
+                elevated, spec_obj.instance_uuid,
+                expected_attrs=['tags', 'metadata'])
+        if not instance and not build_request:
+            LOG.warning("Found neither a build request nor an instance "
+                        "for the uuid %s", spec_obj.instance_uuid)
+            return None
+
+        k8s_shard_aggrs = shard_util.get_sorted_k8s_shard_aggregates(
+            elevated, _get_metadata(), _get_tags(),
+            spec_obj.availability_zone)
+
+        if not k8s_shard_aggrs:
+            return None
+
+        return k8s_shard_aggrs[0].name
+
+    def filter_all(self, filter_obj_list, spec_obj):
         # Only VMware
         if utils.is_non_vmware_spec(spec_obj):
-            return True
+            LOG.debug("ShardFilter is not applicable to this non-VMware "
+                      "request")
+            return filter_obj_list
+
+        k8s_shard = self._get_k8s_shard(spec_obj)
+        return [host_state for host_state in filter_obj_list
+                if self._host_passes(host_state, spec_obj, k8s_shard)]
 
+    def _host_passes(self, host_state, spec_obj, k8s_shard):
         host_shard_aggrs = [aggr for aggr in host_state.aggregates
                             if aggr.name.startswith(self._SHARD_PREFIX)]
 
@@ -79,14 +143,12 @@ def host_passes(self, host_state, spec_obj):
         if self._ALL_SHARDS in shards:
             LOG.debug('project enabled for all shards %(project_shards)s.',
                       {'project_shards': shards})
-            return True
         elif host_shard_names & set(shards):
             LOG.debug('%(host_state)s shard %(host_shard)s found in project '
                       'shards %(project_shards)s.',
                       {'host_state': host_state,
                        'host_shard': host_shard_names,
                        'project_shards': shards})
-            return True
         else:
             LOG.debug('%(host_state)s shard %(host_shard)s not found in '
                       'project shards %(project_shards)s.',
                       {'host_state': host_state,
                        'host_shard': host_shard_names,
                        'project_shards': shards})
             return False
+
+        if k8s_shard:
+            if k8s_shard not in host_shard_names:
+                LOG.debug("%(host_state)s is not part of the K8S "
+                          "cluster's shard '%(k8s_shard)s'",
+                          {'host_state': host_state,
+                           'k8s_shard': k8s_shard})
+                return False
+
+        return True
diff --git a/nova/tests/unit/objects/test_objects.py b/nova/tests/unit/objects/test_objects.py
index 063e8d50810..4b1bdc404b2 100644
--- a/nova/tests/unit/objects/test_objects.py
+++ b/nova/tests/unit/objects/test_objects.py
@@ -1055,7 +1055,7 @@ def obj_name(cls):
     'CellMapping': '1.1-5d652928000a5bc369d79d5bde7e497d',
     'CellMappingList': '1.1-496ef79bb2ab41041fff8bcb57996352',
     'ComputeNode': '1.19-af6bd29a6c3b225da436a0d8487096f2',
-    'ComputeNodeList': '1.17-52f3b0962b1c86b98590144463ebb192',
+    'ComputeNodeList': '1.17-bb54e3fd5415be274c5515577acafe3d',
     'ConsoleAuthToken': '1.1-8da320fb065080eb4d3c2e5c59f8bf52',
     'CpuDiagnostics': '1.0-d256f2e442d1b837735fd17dfe8e3d47',
     'Destination': '1.4-3b440d29459e2c98987ad5b25ad1cb2c',
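The switch from `host_passes` to `filter_all` lets the dominant-shard lookup (a DB round-trip) run once per request instead of once per host. A runnable toy of the resulting filtering step — it deliberately omits the project-shard check and uses illustrative host and shard names:

```python
# Toy version of the K8S part of ShardFilter.filter_all: keep only hosts
# whose shard aggregates contain the cluster's dominant shard, or all
# hosts when there is no K8S constraint.
def filter_all(hosts, k8s_shard):
    # hosts: list of (name, set of shard aggregate names)
    return [h for h in hosts
            if k8s_shard is None or k8s_shard in h[1]]


hosts = [('host1', {'vc-a-0'}),
         ('host4', {'vc-b-1'}),
         ('host5', {'vc-b-1'})]
print(filter_all(hosts, 'vc-b-1'))  # -> host4 and host5 only
print(filter_all(hosts, None))      # -> all hosts pass through
```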
diff --git a/nova/tests/unit/scheduler/filters/test_shard_filter.py b/nova/tests/unit/scheduler/filters/test_shard_filter.py
index 93570192b4c..827906c2f63 100644
--- a/nova/tests/unit/scheduler/filters/test_shard_filter.py
+++ b/nova/tests/unit/scheduler/filters/test_shard_filter.py
@@ -19,6 +19,8 @@
 from nova import objects
 from nova.scheduler.filters import shard_filter
 from nova import test
+from nova.tests.unit import fake_flavor
+from nova.tests.unit import fake_instance
 from nova.tests.unit.scheduler import fakes
 
 
@@ -31,6 +33,13 @@ def setUp(self):
             'foo': ['vc-a-0', 'vc-b-0'],
             'last_modified': time.time()
         }
+        self.fake_instance = fake_instance.fake_instance_obj(
+            mock.sentinel.ctx, expected_attrs=['metadata', 'tags'])
+        build_req = objects.BuildRequest()
+        build_req.instance_uuid = self.fake_instance.uuid
+        build_req.tags = objects.TagList(objects=[])
+        build_req.instance = self.fake_instance
+        self.fake_build_req = build_req
 
     @mock.patch('nova.scheduler.filters.shard_filter.'
                 'ShardFilter._update_cache')
@@ -63,93 +72,138 @@ def set_cache():
                          ['vc-a-1', 'vc-b-0'])
         mock_update_cache.assert_called_once()
 
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
     @mock.patch('nova.scheduler.filters.utils.aggregate_metadata_get_by_host')
-    def test_shard_baremetal_passes(self, agg_mock):
+    def test_shard_baremetal_passes(self, agg_mock, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']),
                 objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])]
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
 
         extra_specs = {'capabilities:cpu_arch': 'x86_64'}
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='foo',
-            flavor=objects.Flavor(extra_specs=extra_specs))
-        self.assertTrue(self.filt_cls.host_passes(host, spec_obj))
-
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs'],
+                extra_specs=extra_specs))
+        self._assert_passes(host, spec_obj, True)
+
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
     @mock.patch('nova.scheduler.filters.shard_filter.'
                'ShardFilter._update_cache')
    @mock.patch('nova.scheduler.filters.utils.aggregate_metadata_get_by_host')
-    def test_shard_project_not_found(self, agg_mock, mock_update_cache):
+    def test_shard_project_not_found(self, agg_mock, mock_update_cache,
+                                     get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']),
                 objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])]
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='bar',
-            flavor=objects.Flavor(extra_specs={}))
-        self.assertFalse(self.filt_cls.host_passes(host, spec_obj))
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']))
+        self._assert_passes(host, spec_obj, False)
 
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
     @mock.patch('nova.scheduler.filters.utils.aggregate_metadata_get_by_host')
-    def test_shard_project_no_shards(self, agg_mock):
+    def test_shard_project_no_shards(self, agg_mock, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']),
                 objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])]
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='foo',
-            flavor=objects.Flavor(extra_specs={}))
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']))
 
         self.filt_cls._PROJECT_TAG_CACHE['foo'] = []
-        self.assertFalse(self.filt_cls.host_passes(host, spec_obj))
+        self._assert_passes(host, spec_obj, False)
 
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
     @mock.patch('nova.scheduler.filters.utils.aggregate_metadata_get_by_host')
-    def test_shard_host_no_shard_aggregate(self, agg_mock):
+    def test_shard_host_no_shard_aggregate(self, agg_mock, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         host = fakes.FakeHostState('host1', 'compute', {})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='foo',
-            flavor=objects.Flavor(extra_specs={}))
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']))
 
         agg_mock.return_value = {}
-        self.assertFalse(self.filt_cls.host_passes(host, spec_obj))
+        self._assert_passes(host, spec_obj, False)
 
-    def test_shard_host_no_shards_in_aggregate(self):
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    def test_shard_host_no_shards_in_aggregate(self, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1'])]
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='foo',
-            flavor=objects.Flavor(extra_specs={}))
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']))
 
-        self.assertFalse(self.filt_cls.host_passes(host, spec_obj))
+        self._assert_passes(host, spec_obj, False)
 
-    def test_shard_project_shard_match_host_shard(self):
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    def
 test_shard_project_shard_match_host_shard(self, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']),
                 objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])]
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='foo',
-            flavor=objects.Flavor(extra_specs={}))
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']))
 
-        self.assertTrue(self.filt_cls.host_passes(host, spec_obj))
+        self._assert_passes(host, spec_obj, True)
 
-    def test_shard_project_shard_do_not_match_host_shard(self):
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    def test_shard_project_shard_do_not_match_host_shard(self, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']),
                 objects.Aggregate(id=1, name='vc-a-1', hosts=['host1'])]
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='foo',
-            flavor=objects.Flavor(extra_specs={}))
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']))
 
-        self.assertFalse(self.filt_cls.host_passes(host, spec_obj))
+        self._assert_passes(host, spec_obj, False)
 
-    def test_shard_project_has_multiple_shards_per_az(self):
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    def test_shard_project_has_multiple_shards_per_az(self, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']),
                 objects.Aggregate(id=1, name='vc-a-1', hosts=['host1'])]
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='foo',
-            flavor=objects.Flavor(extra_specs={}))
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']))
 
         self.filt_cls._PROJECT_TAG_CACHE['foo'] = ['vc-a-0', 'vc-a-1',
                                                    'vc-b-0']
-        self.assertTrue(self.filt_cls.host_passes(host, spec_obj))
+        self._assert_passes(host, spec_obj, True)
 
-    def test_shard_project_has_multiple_shards_per_az_resize_same_shard(self):
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    def test_shard_project_has_multiple_shards_per_az_resize_same_shard(
+            self, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1',
                                                                  'host2']),
                 objects.Aggregate(id=1, name='vc-a-1', hosts=['host1',
@@ -157,40 +211,58 @@ def test_shard_project_has_multiple_shards_per_az_resize_same_shard(self):
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='foo',
-            flavor=objects.Flavor(extra_specs={}),
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']),
             scheduler_hints=dict(_nova_check_type=['resize'],
                                  source_host=['host2']))
 
         self.filt_cls._PROJECT_TAG_CACHE['foo'] = ['vc-a-0', 'vc-a-1',
                                                    'vc-b-0']
-        self.assertTrue(self.filt_cls.host_passes(host, spec_obj))
+        self._assert_passes(host, spec_obj,
 True)
 
-    def test_shard_project_has_multiple_shards_per_az_resize_other_shard(self):
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    def test_shard_project_has_multiple_shards_per_az_resize_other_shard(
+            self, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1',
                                                                  'host2']),
                 objects.Aggregate(id=1, name='vc-a-1', hosts=['host1'])]
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='foo',
-            flavor=objects.Flavor(extra_specs={}),
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']),
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
             scheduler_hints=dict(_nova_check_type=['resize'],
                                  source_host=['host2']))
 
         self.filt_cls._PROJECT_TAG_CACHE['foo'] = ['vc-a-0', 'vc-a-1',
                                                    'vc-b-0']
-        self.assertTrue(self.filt_cls.host_passes(host, spec_obj))
+        self._assert_passes(host, spec_obj, True)
 
-    def test_shard_project_has_sharding_enabled_any_host_passes(self):
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    def test_shard_project_has_sharding_enabled_any_host_passes(
+            self, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         self.filt_cls._PROJECT_TAG_CACHE['baz'] = ['sharding_enabled']
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']),
                 objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])]
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='baz',
-            flavor=objects.Flavor(extra_specs={}))
-        self.assertTrue(self.filt_cls.host_passes(host, spec_obj))
-
-    def test_shard_project_has_sharding_enabled_and_single_shards(self):
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']))
+        self._assert_passes(host, spec_obj, True)
+
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    def test_shard_project_has_sharding_enabled_and_single_shards(
+            self, get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         self.filt_cls._PROJECT_TAG_CACHE['baz'] = ['sharding_enabled',
                                                    'vc-a-1']
         aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']),
@@ -198,16 +270,217 @@ def test_shard_project_has_sharding_enabled_and_single_shards(self):
         host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='baz',
-            flavor=objects.Flavor(extra_specs={}))
-        self.assertTrue(self.filt_cls.host_passes(host, spec_obj))
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']))
+        self._assert_passes(host, spec_obj, True)
+
+    @mock.patch('nova.objects.AggregateList.get_all')
+    @mock.patch('nova.context.scatter_gather_skip_cell0')
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    @mock.patch('nova.context.get_admin_context')
+    def test_same_shard_for_kubernikus_cluster(self, get_context,
+                                               get_by_uuid,
+                                               gather_host,
+                                               get_aggrs):
+        kks_cluster = 'kubernikus:kluster-example'
+        build_req = objects.BuildRequest()
+        build_req.tags = objects.TagList(objects=[
+            objects.Tag(tag=kks_cluster)
+        ])
+        build_req.instance = self.fake_instance
+        get_by_uuid.return_value = build_req
+
+        result = self._filter_k8s_hosts(get_context,
+                                        gather_host,
+                                        get_aggrs)
+
+        gather_host.assert_called_once_with(
+            get_context.return_value,
+            objects.ComputeNodeList.get_k8s_hosts_by_instances_tag,
+            'kubernikus:kluster-example',
+            filters={'hv_type': 'VMware vCenter Server',
+                     'availability_zone': 'az-2'})
+
+        self.assertEqual(2, len(result))
+        self.assertEqual('host4', result[0].host)
+        self.assertEqual('host5', result[1].host)
+
+    @mock.patch('nova.objects.AggregateList.get_all')
+    @mock.patch('nova.context.scatter_gather_skip_cell0')
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    @mock.patch('nova.context.get_admin_context')
+    def test_same_shard_for_gardener_cluster(self, get_context,
+                                             get_by_uuid,
+                                             gather_host,
+                                             get_aggrs):
+        gardener_cluster = 'kubernetes.io-cluster-shoot--garden--testCluster'
+        new_instance = fake_instance.fake_instance_obj(
+            get_context.return_value,
+            expected_attrs=['metadata'],
+            metadata={gardener_cluster: '1'},
+            uuid=self.fake_instance.uuid)
+        build_req = objects.BuildRequest()
+        build_req.instance = new_instance
+        build_req.tags = objects.TagList()
+        get_by_uuid.return_value = build_req
+
+        result = self._filter_k8s_hosts(get_context,
+                                        gather_host,
+                                        get_aggrs)
+
+        gather_host.assert_called_once_with(
+            get_context.return_value,
+            objects.ComputeNodeList.get_k8s_hosts_by_instances_metadata,
+            gardener_cluster, '1',
+            filters={'hv_type': 'VMware vCenter Server',
+                     'availability_zone': 'az-2'})
+
+        self.assertEqual(2, len(result))
+        self.assertEqual('host4', result[0].host)
+        self.assertEqual('host5', result[1].host)
+
+    @mock.patch('nova.objects.AggregateList.get_all')
+    @mock.patch('nova.context.scatter_gather_skip_cell0')
+    @mock.patch('nova.objects.Instance.get_by_uuid')
+    @mock.patch('nova.context.get_admin_context')
+    def test_same_shard_for_nonbuild_requests(self, get_context,
+                                              get_by_uuid,
+                                              gather_host,
+                                              get_aggrs):
+        gardener_cluster = 'kubernetes.io-cluster-shoot--garden--testCluster'
+        new_instance = fake_instance.fake_instance_obj(
+            get_context.return_value,
+            expected_attrs=['metadata'],
+            metadata={gardener_cluster: '1'})
+        get_by_uuid.return_value = new_instance
+
+        result = self._filter_k8s_hosts(
+            get_context, gather_host, get_aggrs,
+            scheduler_hints={'_nova_check_type': ['live_migrate']})
+
+        gather_host.assert_called_once_with(
+            get_context.return_value,
+            objects.ComputeNodeList.get_k8s_hosts_by_instances_metadata,
+            gardener_cluster, '1',
+            filters={'hv_type': 'VMware vCenter Server',
+                     'availability_zone': 'az-2'})
+
+        self.assertEqual(2, len(result))
+        self.assertEqual('host4', result[0].host)
+        self.assertEqual('host5', result[1].host)
+
+    def _filter_k8s_hosts(self, get_context, gather_host, get_aggrs,
+                          **request_spec):
+        """Filter hosts for a K8S cluster that spans three shards
+        (vc-a-0, vc-b-0, vc-b-1) and two availability zones (az-1, az-2),
+        with most of the cluster's instances running in the vc-b-1 shard.
+        For a RequestSpec targeting 'az-2', the hosts in the 'vc-b-1'
+        shard must be returned, since it is the dominant shard.
+        """
+        gather_host.return_value = {'cell1': [
+            ('host3', 4), ('host4', 2), ('host5', 3)
+        ]}
+
+        self.filt_cls._PROJECT_TAG_CACHE['foo'] = ['sharding_enabled',
+                                                   'vc-a-1']
+        agg1 = objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])
+        agg2 = objects.Aggregate(id=2, name='vc-b-0', hosts=['host2', 'host3'])
+        agg3 = objects.Aggregate(id=3, name='vc-b-1', hosts=['host4', 'host5'])
+
+        get_aggrs.return_value = [agg1, agg2, agg3]
+
+        host1 = fakes.FakeHostState('host1', 'compute',
+                                    {'aggregates': [agg1]})
+        host2 = fakes.FakeHostState('host2', 'compute',
+                                    {'aggregates': [agg2]})
+        host3 = fakes.FakeHostState('host3', 'compute',
+                                    {'aggregates': [agg2]})
+        host4 = fakes.FakeHostState('host4', 'compute',
+                                    {'aggregates': [agg3]})
+        host5 = fakes.FakeHostState('host5', 'compute',
+                                    {'aggregates': [agg3]})
+
+        spec_obj = objects.RequestSpec(
+            context=get_context.return_value, project_id='foo',
+            availability_zone='az-2',
+            instance_uuid=self.fake_instance.uuid,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs'],
+                name='m1'),
+            **request_spec)
+
+        return list(self.filt_cls.filter_all(
+            [host1, host2, host3, host4, host5], spec_obj))
+
+    @mock.patch('nova.objects.AggregateList.get_all')
+    @mock.patch('nova.context.scatter_gather_skip_cell0')
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
+    @mock.patch('nova.context.get_admin_context')
+    def test_k8s_bypass_hana_flavors(self, get_context,
+                                     get_by_uuid,
+                                     gather_host,
+                                     get_aggrs):
+        gardener_cluster = 'kubernetes.io-cluster-shoot--garden--testCluster'
+        hana_flavor = fake_flavor.fake_flavor_obj(
+            mock.sentinel.ctx, expected_attrs=['extra_specs'],
+            id=1, name='hana_flavor1', memory_mb=256, vcpus=1, root_gb=1)
+        new_instance = fake_instance.fake_instance_obj(
+            get_context.return_value,
+            flavor=hana_flavor,
+            expected_attrs=['metadata'],
+            metadata={gardener_cluster: '1'})
+        build_req = objects.BuildRequest()
+        build_req.instance = new_instance
+        build_req.tags = objects.TagList()
+
+        get_by_uuid.return_value = build_req
+
+        self.filt_cls._PROJECT_TAG_CACHE['foo'] = ['sharding_enabled',
+                                                   'vc-a-1']
+        agg1 = objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])
+        hana_agg = objects.Aggregate(id=2, name='vc-b-0',
+                                     hosts=['host2', 'host3'])
+
+        host1 = fakes.FakeHostState('host1', 'compute',
+                                    {'aggregates': [agg1]})
+        host2 = fakes.FakeHostState('host2', 'compute',
+                                    {'aggregates': [hana_agg]})
+        host3 = fakes.FakeHostState('host3', 'compute',
+                                    {'aggregates': [hana_agg]})
+        get_aggrs.return_value = [agg1, hana_agg]
+
+        spec_obj = objects.RequestSpec(
+            context=get_context.return_value, project_id='foo',
+            availability_zone='az-1',
+            instance_uuid=self.fake_build_req.instance_uuid,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs'],
+                name='hana_flavor1'))
+
+        result = list(self.filt_cls.filter_all([host1, host2, host3],
+                                               spec_obj))
+
+        gather_host.assert_not_called()
+        self.assertEqual(3, len(result))
+        self.assertEqual('host1', result[0].host)
+        self.assertEqual('host2', result[1].host)
+        self.assertEqual('host3', result[2].host)
+
+    @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid')
     @mock.patch('nova.scheduler.filters.shard_filter.LOG')
     @mock.patch('nova.scheduler.filters.utils.aggregate_metadata_get_by_host')
-    def test_log_level_for_missing_vc_aggregate(self, agg_mock, log_mock):
+    def test_log_level_for_missing_vc_aggregate(self, agg_mock, log_mock,
+                                                get_by_uuid):
+        get_by_uuid.return_value = self.fake_build_req
         host = fakes.FakeHostState('host1', 'compute', {})
         spec_obj = objects.RequestSpec(
             context=mock.sentinel.ctx, project_id='foo',
-            flavor=objects.Flavor(extra_specs={}))
+            instance_uuid=self.fake_build_req.instance_uuid,
+            availability_zone=None,
+            flavor=fake_flavor.fake_flavor_obj(
+                mock.sentinel.ctx, expected_attrs=['extra_specs']))
 
         agg_mock.return_value = {}
 
@@ -215,7 +488,7 @@ def test_log_level_for_missing_vc_aggregate(self, agg_mock, log_mock):
         log_mock.debug = mock.Mock()
         log_mock.error = mock.Mock()
         host.hypervisor_type = 'ironic'
-        self.assertFalse(self.filt_cls.host_passes(host, spec_obj))
+        self._assert_passes(host, spec_obj, False)
         log_mock.debug.assert_called_once_with(mock.ANY, mock.ANY)
         log_mock.error.assert_not_called()
 
@@ -223,14 +496,21 @@ def test_log_level_for_missing_vc_aggregate(self, agg_mock, log_mock):
         log_mock.debug = mock.Mock()
         log_mock.error = mock.Mock()
         host.hypervisor_type = 'Some HV'
-        self.assertFalse(self.filt_cls.host_passes(host, spec_obj))
+        self._assert_passes(host, spec_obj, False)
         log_mock.error.assert_called_once_with(mock.ANY, mock.ANY)
         log_mock.debug.assert_not_called()
 
     @mock.patch('nova.scheduler.utils.is_non_vmware_spec', return_value=True)
     def test_non_vmware_spec(self, mock_is_non_vmware_spec):
-        host = mock.sentinel.host
+        host1 = mock.sentinel.host1
+        host2 = mock.sentinel.host2
         spec_obj = mock.sentinel.spec_obj
 
-        self.assertTrue(self.filt_cls.host_passes(host, spec_obj))
+        result = list(self.filt_cls.filter_all([host1, host2], spec_obj))
+
+        self.assertEqual([host1, host2], result)
         mock_is_non_vmware_spec.assert_called_once_with(spec_obj)
+
+    def _assert_passes(self, host, spec_obj, passes):
+        result = bool(list(self.filt_cls.filter_all([host], spec_obj)))
+        self.assertEqual(passes, result)
diff --git a/nova/virt/driver.py b/nova/virt/driver.py
index b11bb2a98d8..eaf1f6de27d 100644
--- a/nova/virt/driver.py
+++ b/nova/virt/driver.py
@@ -1807,6 +1807,14 @@ def in_cluster_vmotion(self, context, instance, host_moref_value):
         """
         raise NotImplementedError()
 
+    def validate_instance_group_policy(self, context, instance):
+        """Validates that the instance meets the driver-specific grouping
+        policy.
+
+        The driver can raise exception.RescheduledException to reject the
+        instance and trigger its rescheduling to a different host.
+        """
+        pass
+
 
 def load_compute_driver(virtapi, compute_driver=None):
     """Load a compute driver module.
diff --git a/nova/virt/vmwareapi/driver.py b/nova/virt/vmwareapi/driver.py
index b7ca867f50b..165dab8578a 100644
--- a/nova/virt/vmwareapi/driver.py
+++ b/nova/virt/vmwareapi/driver.py
@@ -1319,3 +1319,6 @@ def in_cluster_vmotion(self, context, instance, host_moref_value):
             vim_util.get_moref_value(current_host_ref),
             vim_util.get_moref_value(host_ref),
             instance=instance)
+
+    def validate_instance_group_policy(self, context, instance):
+        self._vmops._check_k8s_shard(instance)
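A hedged sketch of how another driver could implement the new hook, following the contract documented in `nova/virt/driver.py` above; the class name and the `_host_in_cluster_shard` helper are hypothetical, not part of this patch:

```python
# Illustrative driver-side policy hook: return normally to accept the
# instance on this host, raise RescheduledException to retry elsewhere.
from nova import exception
from nova.virt import driver


class ExampleDriver(driver.ComputeDriver):
    def validate_instance_group_policy(self, context, instance):
        if not self._host_in_cluster_shard(instance):  # hypothetical check
            # RescheduledException makes the compute manager retry the
            # build on another host instead of failing it outright.
            raise exception.RescheduledException(
                instance_uuid=instance.uuid,
                reason="host is not in the K8S cluster's dominant shard")
```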
diff --git a/nova/virt/vmwareapi/shard_util.py b/nova/virt/vmwareapi/shard_util.py
new file mode 100644
index 00000000000..fbe6e523ed2
--- /dev/null
+++ b/nova/virt/vmwareapi/shard_util.py
@@ -0,0 +1,93 @@
+# Copyright (c) 2023 SAP SE
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from collections import defaultdict
+
+from nova import context as nova_context
+from nova import exception
+from nova.objects.aggregate import AggregateList
+from nova.objects.compute_node import ComputeNodeList
+
+GARDENER_PREFIX = "kubernetes.io-cluster-"
+KKS_PREFIX = "kubernikus:kluster"
+VMWARE_HV_TYPE = 'VMware vCenter Server'
+SHARD_PREFIX = 'vc-'
+
+
+def get_sorted_k8s_shard_aggregates(context, metadata, tags,
+                                    availability_zone,
+                                    skip_instance_uuid=None):
+    """Returns the shard aggregates of a K8S cluster, sorted descending
+    by the number of the cluster's instances they contain.
+
+    The K8S cluster is determined by the instance's metadata or tags.
+    Returns None if the cluster is new (its first instance is being
+    spawned) or if the K8S metadata/tags are not set.
+    """
+    kks_tag = None
+    gardener_meta = None
+    if tags:
+        kks_tag = next((t.tag for t in tags
+                        if t.tag.startswith(KKS_PREFIX)), None)
+    if not kks_tag and metadata:
+        gardener_meta = \
+            {k: v for k, v in metadata.items()
+             if k.startswith(GARDENER_PREFIX)}
+
+    if not kks_tag and not gardener_meta:
+        return None
+
+    q_filters = {'hv_type': VMWARE_HV_TYPE}
+    if availability_zone:
+        q_filters['availability_zone'] = availability_zone
+    if skip_instance_uuid:
+        q_filters['skip_instance_uuid'] = skip_instance_uuid
+
+    results = None
+    if kks_tag:
+        results = nova_context.scatter_gather_skip_cell0(
+            context, ComputeNodeList.get_k8s_hosts_by_instances_tag,
+            kks_tag, filters=q_filters)
+    elif gardener_meta:
+        (meta_key, meta_value) = next(iter(gardener_meta.items()))
+        results = nova_context.scatter_gather_skip_cell0(
+            context, ComputeNodeList.get_k8s_hosts_by_instances_metadata,
+            meta_key, meta_value, filters=q_filters)
+
+    if not results:
+        return None
+
+    # Hosts mapped to the number of instances of this K8S cluster they
+    # run: {host: count}
+    k8s_hosts = defaultdict(int)
+
+    for cell_uuid, cell_result in results.items():
+        if nova_context.is_cell_failure_sentinel(cell_result):
+            raise exception.NovaException(
+                "Unable to schedule the K8S instance because "
+                "cell %s is not responding." % cell_uuid)
+        cell_hosts = dict(cell_result)
+        for host, count in cell_hosts.items():
+            k8s_hosts[host] += count
+
+    if not k8s_hosts:
+        return None
+
+    all_shard_aggrs = [agg for agg in AggregateList.get_all(context)
+                       if agg.name.startswith(SHARD_PREFIX)]
+
+    return sorted(
+        all_shard_aggrs,
+        reverse=True,
+        key=lambda aggr: sum(count for host, count in k8s_hosts.items()
+                             if host in aggr.hosts))
diff --git a/nova/virt/vmwareapi/vmops.py b/nova/virt/vmwareapi/vmops.py
index eea0310cc32..60981076646 100644
--- a/nova/virt/vmwareapi/vmops.py
+++ b/nova/virt/vmwareapi/vmops.py
@@ -74,6 +74,7 @@
 from nova.virt.vmwareapi import imagecache
 from nova.virt.vmwareapi import images
 from nova.virt.vmwareapi.rpc import VmwareRpcApi
+from nova.virt.vmwareapi import shard_util
 from nova.virt.vmwareapi import special_spawning
 from nova.virt.vmwareapi import vif as vmwarevif
 from nova.virt.vmwareapi import vim_util
@@ -1358,6 +1359,28 @@ def prepare_for_spawn(self, instance):
             raise exception.InstanceUnacceptable(instance_id=instance.uuid,
                                                  reason=reason)
 
+    def _check_k8s_shard(self, instance):
+        """Handles a race condition when spawning K8S instances in
+        parallel.
+
+        If the instance is part of a K8S cluster, ensures that this host
+        is part of one of the shards bound to that cluster; otherwise the
+        instance is rescheduled.
+        """
+        k8s_shard_aggrs = shard_util.get_sorted_k8s_shard_aggregates(
+            nova_context.get_admin_context(), instance.metadata,
+            instance.tags, instance.availability_zone,
+            skip_instance_uuid=instance.uuid)
+
+        if k8s_shard_aggrs:
+            matches = any(self._compute_host in aggr.hosts
+                          for aggr in k8s_shard_aggrs)
+            if not matches:
+                msg = ("Host %(host)s rejected K8S instance "
+                       "%(instance_uuid)s because the host is not part of "
+                       "any of the K8S cluster's shards."
+                       % {"host": self._compute_host,
+                          "instance_uuid": instance.uuid})
+                raise exception.RescheduledException(
+                    instance_uuid=instance.uuid, reason=msg)
+
     def spawn(self, context, instance, image_meta, injected_files,
               admin_password, network_info, block_device_info=None):
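For reference, the dominant-shard ranking in `get_sorted_k8s_shard_aggregates` reduces to a sum-and-sort over the merged per-cell host counts. A runnable sketch using the same numbers as the scatter-gather result in the tests above:

```python
# Rank shard aggregates by how many of the cluster's instances their
# member hosts run; the highest-ranked aggregate is the dominant shard.
from collections import defaultdict

# Per-host instance counts for one cluster, merged across cells.
k8s_hosts = defaultdict(int, {'host3': 4, 'host4': 2, 'host5': 3})

# Shard aggregate name -> member hosts (illustrative topology).
shard_aggrs = {
    'vc-a-0': ['host1'],
    'vc-b-0': ['host2', 'host3'],
    'vc-b-1': ['host4', 'host5'],
}

ranked = sorted(
    shard_aggrs,
    reverse=True,
    key=lambda name: sum(k8s_hosts[h] for h in shard_aggrs[name]))
print(ranked)  # ['vc-b-1', 'vc-b-0', 'vc-a-0'] -> vc-b-1 (5) dominates
```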