From d74953a541969bb6bc875a8ea5377ed3eb20a585 Mon Sep 17 00:00:00 2001 From: Matt Pryor Date: Fri, 23 Aug 2024 10:22:50 +0100 Subject: [PATCH] Add a CRD for managing Blazar leases (#53) * Add initial code to call Blazar * Improve the test coverage a little * Lease CRD working * Got into Error on 400 errors TODO: notifiy caas about the type of error, cloud full or out of credit * Lease creation and deletion working + non-Blazar case * Fix issues with tox * Add map of size names for CAPI * Wait until starts_at before setting non-Blazar leases to ACTIVE * Run black * Allow Blazar support to be disabled, even when Blazar is available * Add metrics for the lease CRD * Add starts_at metric for leases * Add alert for lease phase * Add debugging step to workflow * Add functional tests for lease CRD * Move tmate step earlier in job * Small tweaks to functional test * Move tmate step before k3s * Allow tmate to be used at multiple steps * Use non-conflicting CIDRs for k3s * Move log outputting to signal handler * Fix handling of path prefix for catalog URL * Add tmate step back in * Fix OpenStack client prefix calculation * Fix catalog URL * Fix typo in catalog URL calculation * Use correct error message * Fix broken tox tests * Add unit tests for the lease CRD --------- Co-authored-by: John Garbutt --- .github/workflows/functional.yaml | 38 +- azimuth_schedule_operator/__main__.py | 2 +- azimuth_schedule_operator/metrics.py | 50 + azimuth_schedule_operator/models/registry.py | 3 +- .../models/v1alpha1/lease.py | 127 ++ azimuth_schedule_operator/openstack.py | 255 +++ azimuth_schedule_operator/operator.py | 409 ++++- .../tests/models/test_crds.py | 186 ++- .../tests/models/test_registry.py | 6 +- azimuth_schedule_operator/tests/test_lease.py | 1462 +++++++++++++++++ .../{test_operator.py => test_schedule.py} | 2 +- azimuth_schedule_operator/tests/util.py | 143 ++ .../templates/clusterrole-operator.yaml | 4 + charts/operator/templates/deployment.yaml | 9 + charts/operator/templates/prometheusrule.yaml | 13 + charts/operator/values.yaml | 10 + test-requirements.txt | 1 + tools/functional_test.sh | 188 ++- tools/lease.yaml | 23 + tox.ini | 4 +- 20 files changed, 2905 insertions(+), 30 deletions(-) create mode 100644 azimuth_schedule_operator/models/v1alpha1/lease.py create mode 100644 azimuth_schedule_operator/openstack.py create mode 100644 azimuth_schedule_operator/tests/test_lease.py rename azimuth_schedule_operator/tests/{test_operator.py => test_schedule.py} (99%) create mode 100644 azimuth_schedule_operator/tests/util.py create mode 100644 tools/lease.yaml diff --git a/.github/workflows/functional.yaml b/.github/workflows/functional.yaml index a17e2ab..194172e 100644 --- a/.github/workflows/functional.yaml +++ b/.github/workflows/functional.yaml @@ -5,21 +5,35 @@ on: jobs: functional_test: - name: Operator functional tests via tox - timeout-minutes: 10 + name: Operator functional tests runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v4 - - name: Set up Helm - uses: azure/setup-helm@v4 - with: - version: v3.11.3 + - name: Deploy devstack + uses: EmilienM/devstack-action@v0.15 - - name: Create k8s Kind Cluster - uses: helm/kind-action@v1.10.0 + - name: Install k3s + run: | + set -eo pipefail + curl -sfL https://get.k3s.io | \ + bash -s - \ + --disable traefik \ + --cluster-cidr 172.30.0.0/16 \ + --service-cidr 172.31.0.0/16 + mkdir $HOME/.kube + sudo cp /etc/rancher/k3s/k3s.yaml $HOME/.kube/config + sudo chown $USER $HOME/.kube/config - - name: Run test - 
timeout-minutes: 10 - run: tools/functional_test.sh + - name: Install gomplate + run: | + GOBIN=/usr/local/bin \ + go install github.com/hairyhenderson/gomplate/v4/cmd/gomplate@latest + gomplate --version + + - name: Run functional tests + timeout-minutes: 15 + run: | + source devstack/openrc demo demo + tools/functional_test.sh diff --git a/azimuth_schedule_operator/__main__.py b/azimuth_schedule_operator/__main__.py index 71cbb50..8ecaa04 100644 --- a/azimuth_schedule_operator/__main__.py +++ b/azimuth_schedule_operator/__main__.py @@ -12,7 +12,7 @@ async def main(): # This import is required to pick up the operator handlers from . import operator # noqa - kopf.configure() + kopf.configure(log_prefix=True) tasks = await kopf.spawn_tasks( clusterwide=True, liveness_endpoint="http://0.0.0.0:8000/healthz" ) diff --git a/azimuth_schedule_operator/metrics.py b/azimuth_schedule_operator/metrics.py index 8e0065d..ee40578 100644 --- a/azimuth_schedule_operator/metrics.py +++ b/azimuth_schedule_operator/metrics.py @@ -1,4 +1,5 @@ import asyncio +import datetime import functools from aiohttp import web @@ -72,6 +73,51 @@ def value(self, obj): return 1 if obj.get("status", {}).get("refDeleteTriggered", False) else 0 +class LeaseMetric(Metric): + prefix = "azimuth_lease" + + def labels(self, obj): + return { + "lease_namespace": obj.metadata.namespace, + "lease_name": obj.metadata.name, + } + + +class LeasePhase(LeaseMetric): + suffix = "phase" + description = "The phase of the lease" + + def labels(self, obj): + return { + **super().labels(obj), + "phase": obj.get("status", {}).get("phase", "Unknown"), + } + + +class LeaseStartsAt(LeaseMetric): + suffix = "starts_at" + type = "gauge" + description = "The start time of the lease" + + def value(self, obj): + created_at = obj.metadata["creationTimestamp"] + starts_at = obj.get("spec", {}).get("startsAt", created_at) + return datetime.datetime.fromisoformat(starts_at).timestamp() + + +class LeaseEndsAt(LeaseMetric): + suffix = "ends_at" + type = "gauge" + description = "The end time of the lease" + + def value(self, obj): + ends_at = obj.get("spec", {}).get("endsAt") + if ends_at: + return datetime.datetime.fromisoformat(ends_at).timestamp() + else: + return -1 + + def escape(content): """Escape the given content for use in metric output.""" return content.replace("\\", r"\\").replace("\n", r"\n").replace('"', r"\"") @@ -116,6 +162,10 @@ def render_openmetrics(*metrics): METRICS = { registry.API_GROUP: { + "leases": [ + LeasePhase, + LeaseEndsAt, + ], "schedules": [ ScheduleRefFound, ScheduleDeleteTriggered, diff --git a/azimuth_schedule_operator/models/registry.py b/azimuth_schedule_operator/models/registry.py index 0f10e65..77bc292 100644 --- a/azimuth_schedule_operator/models/registry.py +++ b/azimuth_schedule_operator/models/registry.py @@ -1,6 +1,6 @@ import kube_custom_resource as crd -from azimuth_schedule_operator.models.v1alpha1 import schedule +from azimuth_schedule_operator.models.v1alpha1 import lease, schedule API_GROUP = "scheduling.azimuth.stackhpc.com" API_VERSION = API_GROUP + "/v1alpha1" @@ -9,6 +9,7 @@ def get_registry(): registry = crd.CustomResourceRegistry(API_GROUP, CATEGORIES) + registry.discover_models(lease) registry.discover_models(schedule) return registry diff --git a/azimuth_schedule_operator/models/v1alpha1/lease.py b/azimuth_schedule_operator/models/v1alpha1/lease.py new file mode 100644 index 0000000..3b5d06d --- /dev/null +++ b/azimuth_schedule_operator/models/v1alpha1/lease.py @@ -0,0 +1,127 @@ +import datetime as dt 
+import typing as t
+
+from pydantic import Field
+
+from kube_custom_resource import CustomResource, schema
+
+
+class Machine(schema.BaseModel):
+    """Represents a reservation for a machine."""
+
+    size_id: schema.constr(min_length=1) = Field(
+        ..., description="The ID of the size for the machine."
+    )
+    count: schema.conint(gt=0) = Field(
+        ..., description="The number of machines of this size to reserve."
+    )
+
+
+class ResourcesSpec(schema.BaseModel):
+    """The resources that a lease is reserving."""
+
+    machines: t.List[Machine] = Field(
+        default_factory=list,
+        description="Machines that should be reserved by the lease.",
+    )
+
+
+class LeaseSpec(schema.BaseModel):
+    """The spec of a lease."""
+
+    cloud_credentials_secret_name: schema.constr(min_length=1) = Field(
+        ..., description="The name of the secret containing the cloud credentials."
+    )
+    starts_at: schema.Optional[dt.datetime] = Field(
+        None,
+        description=(
+            "The start time for the lease. "
+            "If no start time is given, it is assumed to start immediately."
+        ),
+    )
+    ends_at: schema.Optional[dt.datetime] = Field(
+        None,
+        description=(
+            "The end time for the lease. "
+            "If no end time is given, the lease is assumed to be infinite."
+        ),
+    )
+    grace_period: schema.Optional[schema.conint(ge=0)] = Field(
+        None,
+        description=(
+            "The grace period before the end of the lease that the platform "
+            "will be given to shut down gracefully. "
+            "If not given, the operator default grace period will be used."
+        ),
+    )
+    resources: ResourcesSpec = Field(
+        ..., description="The resources that the lease is reserving."
+    )
+
+
+class LeasePhase(str, schema.Enum):
+    """The phase of a lease."""
+
+    # Stable phases
+    PENDING = "Pending"
+    ACTIVE = "Active"
+    TERMINATED = "Terminated"
+    ERROR = "Error"
+    # Transitional phases
+    CREATING = "Creating"
+    STARTING = "Starting"
+    UPDATING = "Updating"
+    TERMINATING = "Terminating"
+    DELETING = "Deleting"
+    UNKNOWN = "Unknown"
+
+
+class LeaseStatus(schema.BaseModel, extra="allow"):
+    """The status of a lease."""
+
+    phase: LeasePhase = Field(LeasePhase.UNKNOWN, description="The phase of the lease.")
+    error_message: str = Field(
+        "", description="The error message for the lease, if known."
+    )
+    size_map: schema.Dict[str, str] = Field(
+        default_factory=dict,
+        description="Mapping of original size ID to reserved size ID.",
+    )
+    size_name_map: schema.Dict[str, str] = Field(
+        default_factory=dict,
+        description="Mapping of original size name to reserved size name.",
+    )
+
+    def set_phase(self, phase: LeasePhase, error_message: t.Optional[str] = None):
+        """Set the phase of the lease, along with an optional error message."""
+        self.phase = phase
+        self.error_message = error_message if phase == LeasePhase.ERROR else ""
+
+
+class Lease(
+    CustomResource,
+    subresources={"status": {}},
+    printer_columns=[
+        {
+            "name": "Starts At",
+            "type": "string",
+            "format": "date-time",
+            "jsonPath": ".spec.startsAt",
+        },
+        {
+            "name": "Ends At",
+            "type": "string",
+            "format": "date-time",
+            "jsonPath": ".spec.endsAt",
+        },
+        {
+            "name": "phase",
+            "type": "string",
+            "jsonPath": ".status.phase",
+        },
+    ],
+):
+    """A lease consisting of one or more reserved resources."""
+
+    spec: LeaseSpec
+    status: LeaseStatus = Field(default_factory=LeaseStatus)
diff --git a/azimuth_schedule_operator/openstack.py b/azimuth_schedule_operator/openstack.py
new file mode 100644
index 0000000..df18ed0
--- /dev/null
+++ b/azimuth_schedule_operator/openstack.py
@@ -0,0 +1,255 @@
+import asyncio
+import base64
+import contextlib
+import urllib.parse
+
+import httpx
+
+import yaml
+
+from easykube import rest
+
+
+class UnsupportedAuthenticationError(Exception):
+    """Raised when an unsupported authentication method is used."""
+
+    def __init__(self, auth_type):
+        super().__init__(f"unsupported authentication type: {auth_type}")
+
+
+class ApiNotSupportedError(Exception):
+    """Raised when the requested API is not supported."""
+
+    def __init__(self, api):
+        super().__init__(f"api '{api}' is not supported")
+
+
+class Auth(httpx.Auth):
+    """Authenticator class for OpenStack connections."""
+
+    def __init__(
+        self, auth_url, application_credential_id, application_credential_secret
+    ):
+        self.url = auth_url
+        self._application_credential_id = application_credential_id
+        self._application_credential_secret = application_credential_secret
+        self._token = None
+        self._user_id = None
+        self._lock = asyncio.Lock()
+
+    @contextlib.asynccontextmanager
+    async def _refresh_token(self):
+        """
+        Context manager to ensure only one request at a time triggers a token refresh.
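+
+        The token captured before the lock is acquired is compared again once
+        the lock is held, so when several concurrent requests all find the
+        token missing, only the first performs the refresh and the rest reuse
+        the token it obtained.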
+ """ + token = self._token + async with self._lock: + # Only yield to the wrapped block if the token has not changed + # in the time it took to acquire the lock + if token == self._token: + yield + + def _build_token_request(self): + return httpx.Request( + "POST", + f"{self.url}/v3/auth/tokens", + json={ + "auth": { + "identity": { + "methods": ["application_credential"], + "application_credential": { + "id": self._application_credential_id, + "secret": self._application_credential_secret, + }, + }, + }, + }, + ) + + def _handle_token_response(self, response): + response.raise_for_status() + self._token = response.headers["X-Subject-Token"] + self._user_id = response.json()["token"]["user"]["id"] + + async def async_auth_flow(self, request): + if self._token is None: + async with self._refresh_token(): + response = yield self._build_token_request() + await response.aread() + self._handle_token_response(response) + request.headers["X-Auth-Token"] = self._token + # TODO(johngarbutt): this is needed for blazar + request.headers["Content-Type"] = "application/json" + # TODO(johngarbutt): this is needed for nova flavor extra spec info + request.headers["X-OpenStack-Nova-API-Version"] = "2.61" + response = yield request + + +class Resource(rest.Resource): + """Base resource for OpenStack APIs.""" + + def __init__(self, client, name, prefix=None, plural_name=None, singular_name=None): + super().__init__(client, name, prefix) + # Some resources support a /detail endpoint + # In this case, we just want to use the name up to the slash as the plural name + self._plural_name = plural_name or self._name.split("/")[0] + # If no singular name is given, assume the name ends in 's' + self._singular_name = singular_name or self._plural_name[:-1] + + @property + def singular_name(self): + return self._singular_name + + def _extract_list(self, response): + # Some resources support a /detail endpoint + # In this case, we just want to use the name up to the slash + return response.json()[self._plural_name] + + def _extract_next_page(self, response): + next_url = next( + ( + link["href"] + for link in response.json().get(f"{self._plural_name}_links", []) + if link["rel"] == "next" + ), + None, + ) + # Sometimes, the returned URLs have http where they should have https + # To mitigate this, we split the URL and return the path and params separately + url = urllib.parse.urlsplit(next_url) + params = urllib.parse.parse_qs(url.query) + return url.path, params + + def _extract_one(self, response): + content_type = response.headers.get("content-type") + if content_type == "application/json": + return response.json()[self._singular_name] + else: + return super()._extract_one(response) + + +class Client(rest.AsyncClient): + """Client for OpenStack APIs.""" + + def __init__(self, /, base_url, prefix=None, **kwargs): + # Extract the path part of the base_url + url = urllib.parse.urlsplit(base_url) + # Initialise the client with the scheme/host + super().__init__(base_url=f"{url.scheme}://{url.netloc}", **kwargs) + # Add the path to the given prefix to use as the full prefix + # Not having this on the base URL ensures pagination works without + # duplicating the prefix + self._prefix = "/".join([url.path.rstrip("/"), (prefix or "").lstrip("/")]) + + def __aenter__(self): + # Prevent individual clients from being used in a context manager + raise RuntimeError("clients must be used via a cloud object") + + def resource(self, name, prefix=None, plural_name=None, singular_name=None): + # If an additional prefix is given, 
combine it with the existing prefix + if prefix: + prefix = "/".join([self._prefix.rstrip("/"), prefix.lstrip("/")]) + else: + prefix = self._prefix + return Resource(self, name, prefix, plural_name, singular_name) + + +class Cloud: + """Object for interacting with OpenStack clouds.""" + + def __init__(self, auth, transport, interface): + self._auth = auth + self._transport = transport + self._interface = interface + self._endpoints = {} + # A map of api name to client + self._clients = {} + + async def __aenter__(self): + await self._transport.__aenter__() + # Once the transport has been initialised, we can initialise the endpoints + client = Client( + base_url=self._auth.url, auth=self._auth, transport=self._transport + ) + # We have to slightly artifically create the catalog URL as we don't + # benefit from the prefix handling that the resources use + catalog_url = self._auth.url.rstrip("/") + "/v3/auth/catalog" + try: + response = await client.get(catalog_url) + except httpx.HTTPStatusError as exc: + # If the auth fails, we just have an empty app catalog + if exc.response.status_code == 404: + return self + else: + raise + self._endpoints = { + entry["type"]: next( + ep["url"] + for ep in entry["endpoints"] + if ep["interface"] == self._interface + ) + for entry in response.json()["catalog"] + if len(entry["endpoints"]) > 0 + } + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self._transport.__aexit__(exc_type, exc_value, traceback) + + @property + def is_authenticated(self): + """True if the cloud is authenticated, False otherwise.""" + return bool(self._endpoints) + + @property + def application_credential_id(self): + """The ID of the application credential used to authenticate.""" + return self._auth._application_credential_id + + @property + def current_user_id(self): + """The ID of the current user.""" + return self._auth._user_id + + def api_client(self, name, prefix=None, **kwargs): + """Returns a client for the named API.""" + if name not in self._clients: + if name not in self._endpoints: + raise ApiNotSupportedError(name) + self._clients[name] = Client( + base_url=self._endpoints[name], + prefix=prefix, + auth=self._auth, + transport=self._transport, + **kwargs, + ) + return self._clients[name] + + +def from_clouds(clouds, cloud, cacert): + """Returns an OpenStack cloud object from the content of a clouds file.""" + config = clouds["clouds"][cloud] + if config["auth_type"] != "v3applicationcredential": + raise UnsupportedAuthenticationError(config["auth_type"]) + auth = Auth( + config["auth"]["auth_url"], + config["auth"]["application_credential_id"], + config["auth"]["application_credential_secret"], + ) + # Create a default context using the verification from the config + context = httpx.create_ssl_context(verify=config.get("verify", True)) + # If a cacert was given, load it into the context + if cacert is not None: + context.load_verify_locations(cadata=cacert) + transport = httpx.AsyncHTTPTransport(verify=context) + return Cloud(auth, transport, config.get("interface", "public")) + + +def from_secret_data(secret_data): + """Returns an OpenStack cloud object from the given secret data.""" + clouds = yaml.safe_load(base64.b64decode(secret_data["clouds.yaml"])) + if "cacert" in secret_data: + cacert = base64.b64decode(secret_data["cacert"]).decode() + else: + cacert = None + return from_clouds(clouds, next(c for c in clouds["clouds"]), cacert) diff --git a/azimuth_schedule_operator/operator.py b/azimuth_schedule_operator/operator.py index 
a311d81..82bac1a 100644 --- a/azimuth_schedule_operator/operator.py +++ b/azimuth_schedule_operator/operator.py @@ -1,25 +1,62 @@ import asyncio +import collections import datetime +import json import logging import os import sys +import httpx import kopf +import easykube + from azimuth_schedule_operator.models import registry -from azimuth_schedule_operator.models.v1alpha1 import schedule as schedule_crd +from azimuth_schedule_operator.models.v1alpha1 import ( + lease as lease_crd, + schedule as schedule_crd, +) +from azimuth_schedule_operator import openstack from azimuth_schedule_operator.utils import k8s LOG = logging.getLogger(__name__) K8S_CLIENT = None CHECK_INTERVAL_SECONDS = int( - os.environ.get("AZIMUTH_SCHEDULE_CHECK_INTERVAL_SECONDS", "60") + os.environ.get( + "AZIMUTH_SCHEDULE_CHECK_INTERVAL_SECONDS", + # By default, check schedules and leases every 60s + "60", + ) +) +LEASE_CHECK_INTERVAL_SECONDS = int( + os.environ.get("AZIMUTH_LEASE_CHECK_INTERVAL_SECONDS", CHECK_INTERVAL_SECONDS) ) +LEASE_DEFAULT_GRACE_PERIOD_SECONDS = int( + os.environ.get( + "AZIMUTH_LEASE_DEFAULT_GRACE_PERIOD_SECONDS", + # Give platforms 10 minutes to delete by default + "600", + ) +) +# Indicates whether leases should use Blazar +# Valid values are "yes", "no" and "auto" +# The default is "auto", which means Blazar will be used iff it is available +LEASE_BLAZAR_ENABLED = os.environ.get("AZIMUTH_LEASE_BLAZAR_ENABLED", "auto") @kopf.on.startup() async def startup(settings, **kwargs): + # Use a scheduler-specific format for the finalizer + settings.persistence.finalizer = registry.API_GROUP + # Use the annotation-based storage only (not status) + settings.persistence.progress_storage = kopf.AnnotationsProgressStorage( + prefix=registry.API_GROUP + ) + settings.persistence.diffbase_storage = kopf.AnnotationsDiffBaseStorage( + prefix=registry.API_GROUP, + key="last-handled-configuration", + ) # Apply kopf setting to force watches to restart periodically settings.watching.client_timeout = int(os.environ.get("KOPF_WATCH_TIMEOUT", "600")) global K8S_CLIENT @@ -57,6 +94,38 @@ async def cleanup(**_): LOG.info("Cleanup complete.") +async def ekresource_for_model(model, subresource=None): + """Returns an easykube resource for the given model.""" + api = K8S_CLIENT.api(f"{registry.API_GROUP}/{model._meta.version}") + resource = model._meta.plural_name + if subresource: + resource = f"{resource}/{subresource}" + return await api.resource(resource) + + +async def save_instance_status(instance): + """Save the status of the given instance.""" + ekresource = await ekresource_for_model(instance.__class__, "status") + try: + data = await ekresource.replace( + instance.metadata.name, + { + # Include the resource version for optimistic concurrency + "metadata": {"resourceVersion": instance.metadata.resource_version}, + "status": instance.status.model_dump(exclude_defaults=True), + }, + namespace=instance.metadata.namespace, + ) + except easykube.ApiError as exc: + # Retry as soon as possible after a 409 + if exc.status_code == 409: + raise kopf.TemporaryError("conflict updating status", delay=1) + else: + raise + # Store the new resource version + instance.metadata.resource_version = data["metadata"]["resourceVersion"] + + async def get_reference(namespace: str, ref: schedule_crd.ScheduleRef): resource = await K8S_CLIENT.api(ref.api_version).resource(ref.kind) object = await resource.fetch(ref.name, namespace=namespace) @@ -120,3 +189,339 @@ async def schedule_check(body, namespace, **_): if not 
schedule.status.ref_delete_triggered: await check_for_delete(namespace, schedule) + + +async def find_blazar_lease(blazar_client, lease_name): + return await anext( + ( + lease + async for lease in blazar_client.resource("leases").list() + if lease["name"] == lease_name + ), + None, + ) + + +def blazar_enabled(cloud): + """Returns True if Blazar should be used, False otherwise.""" + if LEASE_BLAZAR_ENABLED == "yes": + return True + elif LEASE_BLAZAR_ENABLED == "auto": + try: + _ = cloud.api_client("reservation") + except openstack.ApiNotSupportedError: + return False + else: + return True + else: + return False + + +class BlazarLeaseCreateError(Exception): + """Raised when there is a permanent error creating a Blazar lease.""" + + +async def create_blazar_lease(blazar_client, lease_name, lease): + # Sum the requested machine counts by flavor ID + flavor_counts = collections.defaultdict(int) + for machine in lease.spec.resources.machines: + flavor_counts[machine.size_id] += machine.count + try: + return await blazar_client.resource("leases").create( + { + "name": lease_name, + "start_date": ( + lease.spec.starts_at.strftime("%Y-%m-%d %H:%M") + if lease.spec.starts_at + else "now" + ), + "end_date": lease.spec.ends_at.strftime("%Y-%m-%d %H:%M"), + "reservations": [ + { + "amount": int(count), + "flavor_id": flavor_id, + "resource_type": "flavor:instance", + "affinity": "None", + } + for flavor_id, count in flavor_counts.items() + ], + "events": [], + "before_end_date": None, + } + ) + except httpx.HTTPStatusError as exc: + if exc.response.status_code == 400: + try: + message = exc.response.json()["error_message"] + except (json.JSONDecodeError, TypeError, KeyError): + message = exc.response.text + raise BlazarLeaseCreateError(f"error creating blazar lease - {message}") + else: + raise + + +def get_size_map(blazar_lease): + """Produce a map from requested size ID to reservation size ID.""" + size_map = {} + for reservation in blazar_lease.get("reservations", []): + if ( + reservation["resource_type"] == "flavor:instance" + and "resource_properties" in reservation + ): + properties = json.loads(reservation["resource_properties"]) + size_map[properties["id"]] = reservation["id"] + return size_map + + +async def get_size_name_map(cloud, size_map): + """Produce a size name map for the given size map.""" + compute_client = cloud.api_client("compute") + flavor_names = { + flavor.id: flavor.name + async for flavor in compute_client.resource("flavors").list() + } + size_name_map = {} + for original_id, new_id in size_map.items(): + try: + size_name_map[flavor_names[original_id]] = flavor_names[new_id] + except KeyError: + pass + return size_name_map + + +async def update_lease_status_no_blazar(cloud, lease): + """Updates the lease status when Blazar is not used for the lease.""" + if lease.spec.starts_at: + now = datetime.datetime.now(datetime.timezone.utc) + lease_started = now >= lease.spec.starts_at + else: + # No start date means start now + lease_started = True + if lease_started: + lease.status.set_phase(lease_crd.LeasePhase.ACTIVE) + lease.status.size_map = { + m.size_id: m.size_id for m in lease.spec.resources.machines + } + lease.status.size_name_map = await get_size_name_map( + cloud, lease.status.size_map + ) + else: + lease.status.set_phase(lease_crd.LeasePhase.PENDING) + + +@kopf.on.create(registry.API_GROUP, "lease") +@kopf.on.resume(registry.API_GROUP, "lease") +async def reconcile_lease(body, logger, **_): + lease = lease_crd.Lease.model_validate(body) + + # Put the lease into a 
pending state as soon as possible + if lease.status.phase == lease_crd.LeasePhase.UNKNOWN: + lease.status.set_phase(lease_crd.LeasePhase.PENDING) + await save_instance_status(lease) + + # Create a cloud instance from the referenced credential secret + secrets = await K8S_CLIENT.api("v1").resource("secrets") + cloud_creds = await secrets.fetch( + lease.spec.cloud_credentials_secret_name, namespace=lease.metadata.namespace + ) + async with openstack.from_secret_data(cloud_creds.data) as cloud: + # If the lease has no end date, we don't attempt to use Blazar + if not lease.spec.ends_at: + logger.info("lease has no end date") + await update_lease_status_no_blazar(cloud, lease) + await save_instance_status(lease) + return + + # If the lease has an end date, we might need to do some Blazar stuff + if blazar_enabled(cloud): + blazar_client = cloud.api_client("reservation", timeout=30) + logger.info("checking if blazar lease exists") + blazar_lease_name = f"az-{lease.metadata.name}" + blazar_lease = await find_blazar_lease(blazar_client, blazar_lease_name) + if not blazar_lease: + # NOTE(mkjpryor) + # + # We only create the Blazar lease if we are in the PENDING phase + # + # If we are in any other phase and the lease does not exist, then we + # leave the status as-is but log it. This can happen in one of three + # ways: + # + # 1. The lease was not created due to an unrecoverable error + # 2. The lease we created was deleted by someone else + # 3. Blazar has been enabled on a cloud after a lease has already been + # processed using the non-Blazar code path + if lease.status.phase == lease_crd.LeasePhase.PENDING: + logger.info("creating blazar lease") + try: + blazar_lease = await create_blazar_lease( + blazar_client, blazar_lease_name, lease + ) + except BlazarLeaseCreateError as exc: + logger.error(str(exc)) + lease.status.set_phase(lease_crd.LeasePhase.ERROR, str(exc)) + await save_instance_status(lease) + return + else: + phase = lease.status.phase.name + logger.warn(f"phase is {phase} but blazar lease does not exist") + # Set the status from the created lease + if blazar_lease: + blazar_lease_status = blazar_lease["status"] + logger.info(f"blazar lease has status '{blazar_lease_status}'") + lease.status.set_phase(lease_crd.LeasePhase[blazar_lease_status]) + if lease.status.phase == lease_crd.LeasePhase.ACTIVE: + lease.status.size_map = get_size_map(blazar_lease) + lease.status.size_name_map = await get_size_name_map( + cloud, lease.status.size_map + ) + # Save the currrent status of the lease + await save_instance_status(lease) + else: + # We are not using Blazar + # We just control the phase based on the start and end times + logger.info("not attempting to use blazar") + await update_lease_status_no_blazar(cloud, lease) + await save_instance_status(lease) + return + + +@kopf.timer( + registry.API_GROUP, + "lease", + interval=LEASE_CHECK_INTERVAL_SECONDS, + # This means that the timer will not run while we are modifying the resource + idle=LEASE_CHECK_INTERVAL_SECONDS, +) +async def check_lease(body, logger, **_): + lease = lease_crd.Lease.model_validate(body) + + # Create a cloud instance from the referenced credential secret + secrets = await K8S_CLIENT.api("v1").resource("secrets") + cloud_creds = await secrets.fetch( + lease.spec.cloud_credentials_secret_name, namespace=lease.metadata.namespace + ) + async with openstack.from_secret_data(cloud_creds.data) as cloud: + if not lease.spec.ends_at: + await update_lease_status_no_blazar(cloud, lease) + await save_instance_status(lease) + return 
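+
+        # NOTE: Blazar reports lease statuses in upper case (e.g. PENDING,
+        # ACTIVE, TERMINATED); the LeasePhase members share those names, which
+        # is what allows LeasePhase to be indexed by the status string below.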
+ + # If the lease has an end date, we may need to contact Blazar + if blazar_enabled(cloud): + blazar_client = cloud.api_client("reservation", timeout=30) + logger.info("checking if blazar lease exists") + blazar_lease_name = f"az-{lease.metadata.name}" + blazar_lease = await find_blazar_lease(blazar_client, blazar_lease_name) + if blazar_lease: + blazar_lease_status = blazar_lease["status"] + logger.info(f"blazar lease has status '{blazar_lease_status}'") + # Set the phase from the Blazar lease status + lease.status.set_phase(lease_crd.LeasePhase[blazar_lease_status]) + # If the lease is active, report the size map + if lease.status.phase == lease_crd.LeasePhase.ACTIVE: + lease.status.size_map = get_size_map(blazar_lease) + lease.status.size_name_map = await get_size_name_map( + cloud, lease.status.size_map + ) + await save_instance_status(lease) + else: + phase = lease.status.phase.name + logger.warn(f"phase is {phase} but blazar lease does not exist") + else: + logger.info("not attempting to use blazar") + await update_lease_status_no_blazar(cloud, lease) + await save_instance_status(lease) + + # Calculate the grace period before the end of the lease that we want to use + grace_period = ( + lease.spec.grace_period + if lease.spec.grace_period is not None + else LEASE_DEFAULT_GRACE_PERIOD_SECONDS + ) + # Calculate the threshold time at which we want to issue a delete + threshold = lease.spec.ends_at - datetime.timedelta(seconds=grace_period) + # Issue the delete if the threshold time has passed + if threshold < datetime.datetime.now(datetime.timezone.utc): + logger.info("lease is ending within grace period - deleting owners") + for owner in lease.metadata.owner_references: + resource = await K8S_CLIENT.api(owner.api_version).resource(owner.kind) + await resource.delete( + owner.name, + # Make sure that we block the owner from deleting, if configured + propagation_policy="Foreground", + namespace=lease.metadata.namespace, + ) + else: + logger.info("lease is not within the grace period of ending") + + +@kopf.on.delete(registry.API_GROUP, "lease") +async def delete_lease(body, logger, **_): + lease = lease_crd.Lease.model_validate(body) + + # Wait until our finalizer is the only finalizer + if any(f != registry.API_GROUP for f in lease.metadata.finalizers): + raise kopf.TemporaryError("waiting for finalizers to be removed", delay=15) + + # Put the lease into a deleting state once we are able to start deleting + if lease.status.phase != lease_crd.LeasePhase.DELETING: + lease.status.set_phase(lease_crd.LeasePhase.DELETING) + await save_instance_status(lease) + + # Once all other finalizers have been removed, we can do our teardown + # This involves deleting the Blazar lease, if one exists, and the app cred + secrets = await K8S_CLIENT.api("v1").resource("secrets") + try: + cloud_creds = await secrets.fetch( + lease.spec.cloud_credentials_secret_name, namespace=lease.metadata.namespace + ) + except easykube.ApiError as exc: + if exc.status_code == 404: + # If we can't find the cloud credential, there isn't much we can do + logger.warn("cloud credential missing - no action taken") + return + else: + raise + async with openstack.from_secret_data(cloud_creds.data) as cloud: + # It is possible that the app cred was deleted but the secret wasn't + # In that case, the cloud will report as unauthenticated + if cloud.is_authenticated: + # Check if there is any work to do to delete a Blazar lease + if lease.spec.ends_at and blazar_enabled(cloud): + logger.info("checking for blazar lease") + 
blazar_client = cloud.api_client("reservation", timeout=30) + blazar_lease_name = f"az-{lease.metadata.name}" + blazar_lease = await find_blazar_lease(blazar_client, blazar_lease_name) + if blazar_lease: + logger.info("deleting blazar lease") + await blazar_client.resource("leases").delete(blazar_lease["id"]) + raise kopf.TemporaryError( + "waiting for blazar lease to delete", delay=15 + ) + else: + logger.warn("blazar lease does not exist") + else: + logger.info("blazar is not used for this lease") + + # Delete the application credential + identityapi = cloud.api_client("identity", "v3") + appcreds = identityapi.resource( + "application_credentials", + # appcreds are user-namespaced + prefix=f"users/{cloud.current_user_id}", + ) + try: + await appcreds.delete(cloud.application_credential_id) + except httpx.HTTPStatusError as exc: + if exc.response.status_code == 403: + logger.warn("unable to delete application credential for cluster") + else: + raise + logger.info("deleted application credential for cluster") + + # Now the appcred is gone, we can delete the secret + await secrets.delete( + lease.spec.cloud_credentials_secret_name, namespace=lease.metadata.namespace + ) + logger.info("cloud credential secret deleted") diff --git a/azimuth_schedule_operator/tests/models/test_crds.py b/azimuth_schedule_operator/tests/models/test_crds.py index dc41666..abe1a2d 100644 --- a/azimuth_schedule_operator/tests/models/test_crds.py +++ b/azimuth_schedule_operator/tests/models/test_crds.py @@ -5,7 +5,7 @@ class TestModels(base.TestCase): - def test_cluster_type_crd_json(self): + def test_schedule_crd_json(self): schedule_crd = None for resource in registry.get_crd_resources(): meta = resource.get("metadata", {}) @@ -111,3 +111,187 @@ def test_cluster_type_crd_json(self): } }""" self.assertEqual(expected, actual) + + def test_lease_crd_json(self): + lease_crd = None + for resource in registry.get_crd_resources(): + meta = resource.get("metadata", {}) + name = meta.get("name") + if name == "leases.scheduling.azimuth.stackhpc.com": + lease_crd = resource + + actual = json.dumps(lease_crd, indent=2) + expected = """\ +{ + "apiVersion": "apiextensions.k8s.io/v1", + "kind": "CustomResourceDefinition", + "metadata": { + "name": "leases.scheduling.azimuth.stackhpc.com" + }, + "spec": { + "group": "scheduling.azimuth.stackhpc.com", + "scope": "Namespaced", + "names": { + "kind": "Lease", + "singular": "lease", + "plural": "leases", + "shortNames": [], + "categories": [ + "azimuth" + ] + }, + "versions": [ + { + "name": "v1alpha1", + "served": true, + "storage": true, + "schema": { + "openAPIV3Schema": { + "description": "A lease consisting of one or more reserved resources.", + "properties": { + "spec": { + "description": "The spec of a lease.", + "properties": { + "cloudCredentialsSecretName": { + "description": "The name of the secret containing the cloud credentials.", + "minLength": 1, + "type": "string" + }, + "startsAt": { + "description": "The start time for the lease. If no start time is given, it is assumed to start immediately.", + "format": "date-time", + "nullable": true, + "type": "string" + }, + "endsAt": { + "description": "The end time for the lease. If no end time is given, the lease is assumed to be infinite.", + "format": "date-time", + "nullable": true, + "type": "string" + }, + "gracePeriod": { + "description": "The grace period before the end of the lease that the platform will be given to shut down gracefully. 
If not given, the operator default grace period will be used.", + "minimum": 0, + "nullable": true, + "type": "integer" + }, + "resources": { + "description": "The resources that a lease is reserving.", + "properties": { + "machines": { + "description": "Machines that should be reserved by the lease.", + "items": { + "description": "Represents a reservation for a machine.", + "properties": { + "sizeId": { + "description": "The ID of the size for the machine.", + "minLength": 1, + "type": "string" + }, + "count": { + "description": "The number of machines of this size to reserve.", + "exclusiveMinimum": true, + "minimum": 0, + "type": "integer" + } + }, + "required": [ + "sizeId", + "count" + ], + "type": "object" + }, + "type": "array" + } + }, + "type": "object" + } + }, + "required": [ + "cloudCredentialsSecretName", + "resources" + ], + "type": "object" + }, + "status": { + "description": "The status of a lease.", + "properties": { + "phase": { + "description": "The phase of a lease.", + "enum": [ + "Pending", + "Active", + "Terminated", + "Error", + "Creating", + "Starting", + "Updating", + "Terminating", + "Deleting", + "Unknown" + ], + "type": "string" + }, + "errorMessage": { + "description": "The error message for the lease, if known.", + "type": "string" + }, + "sizeMap": { + "additionalProperties": { + "type": "string" + }, + "description": "Mapping of original size ID to reserved size ID.", + "type": "object", + "x-kubernetes-preserve-unknown-fields": true + }, + "sizeNameMap": { + "additionalProperties": { + "type": "string" + }, + "description": "Mapping of original size name to reserved size name.", + "type": "object", + "x-kubernetes-preserve-unknown-fields": true + } + }, + "type": "object", + "x-kubernetes-preserve-unknown-fields": true + } + }, + "required": [ + "spec" + ], + "type": "object" + } + }, + "subresources": { + "status": {} + }, + "additionalPrinterColumns": [ + { + "name": "Starts At", + "type": "string", + "format": "date-time", + "jsonPath": ".spec.startsAt" + }, + { + "name": "Ends At", + "type": "string", + "format": "date-time", + "jsonPath": ".spec.endsAt" + }, + { + "name": "phase", + "type": "string", + "jsonPath": ".status.phase" + }, + { + "name": "Age", + "type": "date", + "jsonPath": ".metadata.creationTimestamp" + } + ] + } + ] + } +}""" # noqa + self.assertEqual(expected, actual) diff --git a/azimuth_schedule_operator/tests/models/test_registry.py b/azimuth_schedule_operator/tests/models/test_registry.py index d026f31..59fef22 100644 --- a/azimuth_schedule_operator/tests/models/test_registry.py +++ b/azimuth_schedule_operator/tests/models/test_registry.py @@ -2,11 +2,11 @@ from azimuth_schedule_operator.tests import base -class TestRegustry(base.TestCase): +class TestRegistry(base.TestCase): def test_registry_size(self): reg = registry.get_registry() - self.assertEqual(1, len(list(reg))) + self.assertEqual(2, len(list(reg))) def test_get_crd_resources(self): crds = registry.get_crd_resources() - self.assertEqual(1, len(list(crds))) + self.assertEqual(2, len(list(crds))) diff --git a/azimuth_schedule_operator/tests/test_lease.py b/azimuth_schedule_operator/tests/test_lease.py new file mode 100644 index 0000000..37422b0 --- /dev/null +++ b/azimuth_schedule_operator/tests/test_lease.py @@ -0,0 +1,1462 @@ +import json +import unittest +from unittest import mock + +from easykube.rest.util import PropertyDict + +import freezegun + +import kopf + +from ..models.v1alpha1 import lease as lease_crd +from .. import openstack, operator + +from . 
import util + + +API_VERSION = "scheduling.azimuth.stackhpc.com/v1alpha1" + + +def fake_credential(): + return { + "apiVersion": "v1", + "kind": "Secret", + "metadata": { + "name": "fake-credential", + "namespace": "fake-ns", + }, + "data": { + "clouds.yaml": "NOT A REAL CREDENTIAL", + }, + } + + +def fake_lease(start=True, end=True, phase=None): + lease = { + "apiVersion": API_VERSION, + "kind": "Lease", + "metadata": { + "name": "fake-lease", + "namespace": "fake-ns", + "resourceVersion": "currentversion", + "ownerReferences": [ + { + "apiVersion": "v1", + "kind": "ConfigMap", + "name": "fake-lease-owner", + "uid": "fake-uid", + "blockOwnerDeletion": True, + }, + ], + "finalizers": [ + "scheduling.azimuth.stackhpc.com", + ], + }, + "spec": { + "cloudCredentialsSecretName": "fake-credential", + "resources": { + "machines": [ + { + "sizeId": "id1", + "count": 3, + }, + { + "sizeId": "id2", + "count": 5, + }, + ], + }, + }, + } + if start: + lease["spec"]["startsAt"] = "2024-08-21T15:00:00Z" + if end: + lease["spec"]["endsAt"] = "2024-08-21T16:00:00Z" + if phase: + lease.setdefault("status", {})["phase"] = str(phase) + if phase == lease_crd.LeasePhase.ACTIVE: + lease["status"]["sizeMap"] = {"id1": "newid1", "id2": "newid2"} + lease["status"]["sizeNameMap"] = { + "flavor1": "newflavor1", + "flavor2": "newflavor2", + } + return lease + + +def fake_blazar_lease_request(start=True): + return { + "name": "az-fake-lease", + "start_date": "2024-08-21 15:00" if start else "now", + "end_date": "2024-08-21 16:00", + "reservations": [ + { + "amount": 3, + "flavor_id": "id1", + "resource_type": "flavor:instance", + "affinity": "None", + }, + { + "amount": 5, + "flavor_id": "id2", + "resource_type": "flavor:instance", + "affinity": "None", + }, + ], + "events": [], + "before_end_date": None, + } + + +def fake_blazar_lease(status="PENDING"): + lease = { + "id": "blazarleaseid", + "name": "az-fake-lease", + "status": status, + } + if status == "ACTIVE": + lease["reservations"] = [ + { + "id": "newid1", + "resource_type": "flavor:instance", + "resource_properties": json.dumps({"id": "id1", "foo": "bar", "x": 1}), + }, + { + "id": "newid2", + "resource_type": "flavor:instance", + "resource_properties": json.dumps({"id": "id2", "foo": "baz", "y": 2}), + }, + { + "id": "notused", + "resource_type": "physical:host", + "resource_properties": "", + }, + ] + return lease + + +class TestLease(unittest.IsolatedAsyncioTestCase): + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + async def test_ekresource_for_model(self, k8s_client): + resource = await operator.ekresource_for_model(lease_crd.Lease) + # Check that the client was interacted with as expected + k8s_client.api.assert_called_once_with(API_VERSION) + k8s_client.apis[API_VERSION].resource.assert_awaited_once_with("leases") + # Check that the resource is the one returned by the client + self.assertIs(resource, k8s_client.apis[API_VERSION].resources["leases"]) + + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + async def test_ekresource_for_model_subresource(self, k8s_client): + resource = await operator.ekresource_for_model( + lease_crd.Lease, subresource="status" + ) + # Check that the client was interacted with as expected + k8s_client.api.assert_called_once_with(API_VERSION) + k8s_client.apis[API_VERSION].resource.assert_awaited_once_with("leases/status") + # Check that the resource is the one returned by the client + self.assertIs(resource, 
k8s_client.apis[API_VERSION].resources["leases/status"]) + + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + async def test_save_instance_status(self, k8s_client): + lease_data = fake_lease(phase=lease_crd.LeasePhase.ACTIVE) + + k8s_client.apis[API_VERSION].resources["leases/status"].replace.return_value = { + **lease_data, + "metadata": {**lease_data["metadata"], "resourceVersion": "nextversion"}, + } + + lease = lease_crd.Lease.model_validate(lease_data) + await operator.save_instance_status(lease) + + self.assertEqual(lease.metadata.resource_version, "nextversion") + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_awaited_once_with( + lease_data["metadata"]["name"], + { + "metadata": { + "resourceVersion": lease_data["metadata"]["resourceVersion"] + }, + "status": lease_data["status"], + }, + namespace=lease_data["metadata"]["namespace"], + ) + + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + async def test_save_instance_status_conflict(self, k8s_client): + lease_data = fake_lease(phase=lease_crd.LeasePhase.PENDING) + + k8s_client.apis[API_VERSION].resources["leases/status"].replace.side_effect = ( + util.k8s_api_error(409) + ) + + lease = lease_crd.Lease.model_validate(lease_data) + with self.assertRaises(kopf.TemporaryError): + await operator.save_instance_status(lease) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_awaited_once_with( + lease_data["metadata"]["name"], + { + "metadata": { + "resourceVersion": lease_data["metadata"]["resourceVersion"] + }, + "status": lease_data["status"], + }, + namespace=lease_data["metadata"]["namespace"], + ) + + async def test_find_blazar_lease(self): + fake_lease = {"name": "az-fake-lease"} + blazar_client = util.mock_openstack_client() + blazar_client.resources["leases"].list.return_value = util.as_async_iterable( + [fake_lease] + ) + + lease = await operator.find_blazar_lease(blazar_client, fake_lease["name"]) + + self.assertEqual(lease, fake_lease) + blazar_client.resources["leases"].list.assert_called_once_with() + + async def test_find_blazar_lease_not_present(self): + fake_lease = {"name": "az-fake-lease"} + blazar_client = util.mock_openstack_cloud().clients["reservation"] + blazar_client.resources["leases"].list.side_effect = ( + lambda: util.as_async_iterable([fake_lease]) + ) + + lease = await operator.find_blazar_lease(blazar_client, "doesnotexist") + + self.assertIsNone(lease) + blazar_client.resources["leases"].list.assert_called_once_with() + + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + def test_blazar_enabled_yes(self): + cloud = util.mock_openstack_cloud() + + self.assertTrue(operator.blazar_enabled(cloud)) + + cloud.api_client.assert_not_called() + + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + def test_blazar_enabled_no(self): + cloud = util.mock_openstack_cloud() + + self.assertFalse(operator.blazar_enabled(cloud)) + + cloud.api_client.assert_not_called() + + def test_blazar_enabled_auto_blazar_available(self): + cloud = util.mock_openstack_cloud() + + self.assertTrue(operator.blazar_enabled(cloud)) + + cloud.api_client.assert_called_once_with("reservation") + + def test_blazar_enabled_auto_blazar_not_available(self): + cloud = util.mock_openstack_cloud() + cloud.api_client.side_effect = openstack.ApiNotSupportedError("reservation") + + self.assertFalse(operator.blazar_enabled(cloud)) + + cloud.api_client.assert_called_once_with("reservation") + + async def 
test_create_blazar_lease_no_start(self): + blazar_client = util.mock_openstack_client() + blazar_lease_data = {"name": "az-fake-lease"} + blazar_client.resources["leases"].create.return_value = blazar_lease_data + + lease = lease_crd.Lease.model_validate(fake_lease(start=False)) + created = await operator.create_blazar_lease( + blazar_client, f"az-{lease.metadata.name}", lease + ) + + self.assertEqual(created, blazar_lease_data) + blazar_client.resources["leases"].create.assert_awaited_once_with( + fake_blazar_lease_request(start=False) + ) + + async def test_create_blazar_lease_with_start(self): + blazar_client = util.mock_openstack_client() + blazar_lease_data = {"name": "az-fake-lease"} + blazar_client.resources["leases"].create.return_value = blazar_lease_data + + lease = lease_crd.Lease.model_validate(fake_lease()) + created = await operator.create_blazar_lease( + blazar_client, f"az-{lease.metadata.name}", lease + ) + + self.assertEqual(created, blazar_lease_data) + blazar_client.resources["leases"].create.assert_awaited_once_with( + fake_blazar_lease_request() + ) + + async def test_create_blazar_lease_error_json_message(self): + blazar_client = util.mock_openstack_client() + blazar_client.resources["leases"].create.side_effect = util.httpx_status_error( + 400, json={"error_message": "this is an error from blazar"} + ) + + lease = lease_crd.Lease.model_validate(fake_lease()) + with self.assertRaises(operator.BlazarLeaseCreateError) as ctx: + _ = await operator.create_blazar_lease( + blazar_client, f"az-{lease.metadata.name}", lease + ) + + self.assertEqual( + str(ctx.exception), + "error creating blazar lease - this is an error from blazar", + ) + blazar_client.resources["leases"].create.assert_awaited_once() + + async def test_create_blazar_lease_error_not_valid_json(self): + blazar_client = util.mock_openstack_client() + blazar_client.resources["leases"].create.side_effect = util.httpx_status_error( + 400, text="this is not valid json" + ) + + lease = lease_crd.Lease.model_validate(fake_lease()) + with self.assertRaises(operator.BlazarLeaseCreateError) as ctx: + _ = await operator.create_blazar_lease( + blazar_client, f"az-{lease.metadata.name}", lease + ) + + self.assertEqual( + str(ctx.exception), "error creating blazar lease - this is not valid json" + ) + blazar_client.resources["leases"].create.assert_awaited_once() + + def test_get_size_map(self): + blazar_lease = fake_blazar_lease(status="ACTIVE") + size_map = operator.get_size_map(blazar_lease) + self.assertEqual(size_map, {"id1": "newid1", "id2": "newid2"}) + + async def test_get_size_name_map(self): + cloud = util.mock_openstack_cloud() + cloud.clients["compute"].resources["flavors"].list.return_value = ( + util.as_async_iterable( + [ + PropertyDict({"id": "id1", "name": "flavor1"}), + PropertyDict({"id": "id2", "name": "flavor2"}), + PropertyDict({"id": "newid1", "name": "newflavor1"}), + PropertyDict({"id": "newid2", "name": "newflavor2"}), + ] + ) + ) + + size_map = {"id1": "newid1", "id2": "newid2", "id3": "newid3"} + size_name_map = await operator.get_size_name_map(cloud, size_map) + + self.assertEqual( + size_name_map, {"flavor1": "newflavor1", "flavor2": "newflavor2"} + ) + cloud.clients["compute"].resources["flavors"].list.assert_called_once_with() + + @mock.patch.object(operator, "get_size_name_map") + async def test_update_lease_status_no_blazar_no_start(self, get_size_name_map): + cloud = util.mock_openstack_cloud() + + size_name_map = {"flavor1": "flavor1", "flavor2": "flavor2"} + 
get_size_name_map.return_value = size_name_map + + lease = lease_crd.Lease.model_validate(fake_lease(start=False)) + await operator.update_lease_status_no_blazar(cloud, lease) + + self.assertEqual(lease.status.phase, lease_crd.LeasePhase.ACTIVE) + self.assertEqual(lease.status.size_map, {"id1": "id1", "id2": "id2"}) + self.assertEqual(lease.status.size_name_map, size_name_map) + + @mock.patch.object(operator, "get_size_name_map") + async def test_update_lease_status_no_blazar_started(self, get_size_name_map): + cloud = util.mock_openstack_cloud() + + size_name_map = {"flavor1": "flavor1", "flavor2": "flavor2"} + get_size_name_map.return_value = size_name_map + + lease = lease_crd.Lease.model_validate(fake_lease()) + with freezegun.freeze_time("2024-08-21T15:30:00Z"): + await operator.update_lease_status_no_blazar(cloud, lease) + + self.assertEqual(lease.status.phase, lease_crd.LeasePhase.ACTIVE) + self.assertEqual(lease.status.size_map, {"id1": "id1", "id2": "id2"}) + self.assertEqual(lease.status.size_name_map, size_name_map) + + @mock.patch.object(operator, "get_size_name_map") + async def test_update_lease_status_no_blazar_not_started(self, get_size_name_map): + cloud = util.mock_openstack_cloud() + + lease = lease_crd.Lease.model_validate(fake_lease()) + with freezegun.freeze_time("2024-08-21T14:30:00Z"): + await operator.update_lease_status_no_blazar(cloud, lease) + + get_size_name_map.assert_not_called() + self.assertEqual(lease.status.phase, lease_crd.LeasePhase.PENDING) + self.assertEqual(lease.status.size_map, {}) + self.assertEqual(lease.status.size_name_map, {}) + + def k8s_client_config_common(self, k8s_client): + k8s_secrets = k8s_client.apis["v1"].resources["secrets"] + k8s_secrets.fetch.return_value = PropertyDict(fake_credential()) + k8s_leases_status = k8s_client.apis[API_VERSION].resources["leases/status"] + k8s_leases_status.replace.return_value = { + "metadata": {"resourceVersion": "nextversion"} + } + + def os_cloud_config_common(self, os_cloud): + type(os_cloud).is_authenticated = mock.PropertyMock(return_value=True) + type(os_cloud).application_credential_id = mock.PropertyMock( + return_value="appcredid" + ) + os_flavors = os_cloud.clients["compute"].resources["flavors"] + os_flavors.list.return_value = util.as_async_iterable( + [ + PropertyDict({"id": "id1", "name": "flavor1"}), + PropertyDict({"id": "id2", "name": "flavor2"}), + PropertyDict({"id": "newid1", "name": "newflavor1"}), + PropertyDict({"id": "newid2", "name": "newflavor2"}), + ] + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_reconcile_lease_no_blazar_no_end_no_start( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=False, end=False) + await operator.reconcile_lease(lease_data, mock.Mock()) + + # Check that the lease was patched as expected given the setup + lease_status_replace = ( + k8s_client.apis[API_VERSION].resources["leases/status"].replace + ) + self.assertEqual(lease_status_replace.call_count, 2) + lease_status_replace.assert_has_calls( + [ + # The unknown phase should be updated to pending + mock.call( + "fake-lease", + 
util.LeaseStatusMatcher(lease_crd.LeasePhase.PENDING), + namespace="fake-ns", + ), + # The lease should be made active + mock.call( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "id1", "id2": "id2"}, + {"flavor1": "flavor1", "flavor2": "flavor2"}, + ), + namespace="fake-ns", + ), + ] + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_reconcile_lease_no_blazar_no_end_start_in_past( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease( + start=True, end=False, phase=lease_crd.LeasePhase.PENDING + ) + with freezegun.freeze_time("2024-08-21T15:30:00Z"): + await operator.reconcile_lease(lease_data, mock.Mock()) + + # Check that the lease was patched as expected + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "id1", "id2": "id2"}, + {"flavor1": "flavor1", "flavor2": "flavor2"}, + ), + namespace="fake-ns", + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_reconcile_lease_no_blazar_no_end_start_in_future( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease( + start=True, end=False, phase=lease_crd.LeasePhase.PENDING + ) + with freezegun.freeze_time("2024-08-21T14:30:00Z"): + await operator.reconcile_lease(lease_data, mock.Mock()) + + # Check that the lease was patched as expected + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher(lease_crd.LeasePhase.PENDING), + namespace="fake-ns", + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_reconcile_lease_no_blazar_end_no_start( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease( + start=False, end=True, phase=lease_crd.LeasePhase.PENDING + ) + with freezegun.freeze_time("2024-08-21T15:30:00Z"): + await operator.reconcile_lease(lease_data, mock.Mock()) + + # Check that the lease was patched as expected + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "id1", "id2": "id2"}, + {"flavor1": "flavor1", "flavor2": "flavor2"}, + ), + namespace="fake-ns", + ) + + @mock.patch.object(openstack, 
"from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_reconcile_lease_no_blazar_end_start_in_past( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease( + start=True, end=True, phase=lease_crd.LeasePhase.PENDING + ) + with freezegun.freeze_time("2024-08-21T15:30:00Z"): + await operator.reconcile_lease(lease_data, mock.Mock()) + + # Check that the lease was patched as expected + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "id1", "id2": "id2"}, + {"flavor1": "flavor1", "flavor2": "flavor2"}, + ), + namespace="fake-ns", + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_reconcile_lease_no_blazar_end_start_in_future( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease( + start=True, end=True, phase=lease_crd.LeasePhase.PENDING + ) + with freezegun.freeze_time("2024-08-21T14:30:00Z"): + await operator.reconcile_lease(lease_data, mock.Mock()) + + # Check that the lease was patched as expected + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher(lease_crd.LeasePhase.PENDING), + namespace="fake-ns", + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_reconcile_lease_blazar_lease_created( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + os_leases = os_cloud.clients["reservation"].resources["leases"] + os_leases.create.return_value = fake_blazar_lease(status="CREATING") + + lease_data = fake_lease(phase=lease_crd.LeasePhase.PENDING) + await operator.reconcile_lease(lease_data, mock.Mock()) + + # Check that the Blazar lease was requested as expected + os_leases.create.assert_called_once_with(fake_blazar_lease_request()) + + # Check that the lease was patched as expected + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher(lease_crd.LeasePhase.CREATING), + namespace="fake-ns", + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_reconcile_lease_blazar_lease_create_error( + self, k8s_client, openstack_from_secret_data + ): + # 
Configure the Kubernetes client
+        self.k8s_client_config_common(k8s_client)
+
+        # Configure the OpenStack cloud
+        os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud()
+        self.os_cloud_config_common(os_cloud)
+        os_leases = os_cloud.clients["reservation"].resources["leases"]
+        os_leases.create.side_effect = util.httpx_status_error(
+            400, json={"error_message": "from blazar"}
+        )
+
+        lease_data = fake_lease(phase=lease_crd.LeasePhase.PENDING)
+        await operator.reconcile_lease(lease_data, mock.Mock())
+
+        # Check that the Blazar lease was requested as expected
+        os_leases.create.assert_called_once_with(fake_blazar_lease_request())
+
+        # Check that the lease was patched as expected
+        k8s_client.apis[API_VERSION].resources[
+            "leases/status"
+        ].replace.assert_called_once_with(
+            "fake-lease",
+            util.LeaseStatusMatcher(
+                lease_crd.LeasePhase.ERROR,
+                error_message="error creating blazar lease - from blazar",
+            ),
+            namespace="fake-ns",
+        )
+
+    @mock.patch.object(openstack, "from_secret_data")
+    @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client)
+    @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes")
+    async def test_reconcile_lease_blazar_no_lease_not_pending(
+        self, k8s_client, openstack_from_secret_data
+    ):
+        # Configure the Kubernetes client
+        self.k8s_client_config_common(k8s_client)
+
+        # Configure the OpenStack cloud
+        os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud()
+        self.os_cloud_config_common(os_cloud)
+        os_leases = os_cloud.clients["reservation"].resources["leases"]
+        os_leases.create.side_effect = util.httpx_status_error(
+            400, json={"error_message": "from blazar"}
+        )
+
+        lease_data = fake_lease(phase=lease_crd.LeasePhase.ERROR)
+        await operator.reconcile_lease(lease_data, mock.Mock())
+
+        # If the lease is in any state other than Pending, we will not attempt to
+        # create the lease and the lease status should be left unchanged
+        os_leases.create.assert_not_called()
+        k8s_client.apis[API_VERSION].resources[
+            "leases/status"
+        ].replace.assert_not_called()
+
+    @mock.patch.object(openstack, "from_secret_data")
+    @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client)
+    @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes")
+    async def test_reconcile_lease_blazar_existing_lease(
+        self, k8s_client, openstack_from_secret_data
+    ):
+        # Configure the Kubernetes client
+        self.k8s_client_config_common(k8s_client)
+
+        # Configure the OpenStack cloud
+        os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud()
+        self.os_cloud_config_common(os_cloud)
+        os_leases = os_cloud.clients["reservation"].resources["leases"]
+        os_leases.list.return_value = util.as_async_iterable(
+            [fake_blazar_lease("ACTIVE")]
+        )
+
+        lease_data = fake_lease(phase=lease_crd.LeasePhase.PENDING)
+        await operator.reconcile_lease(lease_data, mock.Mock())
+
+        # We should not have attempted to create a lease
+        os_leases.create.assert_not_called()
+
+        # Check that the lease was patched as expected
+        k8s_client.apis[API_VERSION].resources[
+            "leases/status"
+        ].replace.assert_called_once_with(
+            "fake-lease",
+            util.LeaseStatusMatcher(
+                lease_crd.LeasePhase.ACTIVE,
+                {"id1": "newid1", "id2": "newid2"},
+                {"flavor1": "newflavor1", "flavor2": "newflavor2"},
+            ),
+            namespace="fake-ns",
+        )
+
+    def assert_lease_owner_deleted(self, k8s_client):
+        k8s_client.apis["v1"].resources["ConfigMap"].delete.assert_called_once_with(
+            "fake-lease-owner", propagation_policy="Foreground", 
namespace="fake-ns" + ) + + def assert_lease_owner_not_deleted(self, k8s_client): + k8s_client.apis["v1"].resources["ConfigMap"].delete.assert_not_called() + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_check_lease_no_blazar_no_end_no_start( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=False, end=False) + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "id1", "id2": "id2"}, + {"flavor1": "flavor1", "flavor2": "flavor2"}, + ), + namespace="fake-ns", + ) + + self.assert_lease_owner_not_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_check_lease_no_blazar_no_end_start_in_past( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=True, end=False) + with freezegun.freeze_time("2024-08-21T15:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "id1", "id2": "id2"}, + {"flavor1": "flavor1", "flavor2": "flavor2"}, + ), + namespace="fake-ns", + ) + + self.assert_lease_owner_not_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_check_lease_no_blazar_no_end_start_in_future( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=True, end=False) + with freezegun.freeze_time("2024-08-21T14:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher(lease_crd.LeasePhase.PENDING), + namespace="fake-ns", + ) + + self.assert_lease_owner_not_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_check_lease_no_blazar_end_in_future_no_start( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the 
OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=False, end=True) + with freezegun.freeze_time("2024-08-21T15:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "id1", "id2": "id2"}, + {"flavor1": "flavor1", "flavor2": "flavor2"}, + ), + namespace="fake-ns", + ) + + self.assert_lease_owner_not_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_check_lease_no_blazar_end_in_future_start_in_past( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T15:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "id1", "id2": "id2"}, + {"flavor1": "flavor1", "flavor2": "flavor2"}, + ), + namespace="fake-ns", + ) + + self.assert_lease_owner_not_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_check_lease_no_blazar_end_and_start_in_future( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T14:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher(lease_crd.LeasePhase.PENDING), + namespace="fake-ns", + ) + + self.assert_lease_owner_not_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_check_lease_no_blazar_end_in_past_no_start( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=False, end=True) + with freezegun.freeze_time("2024-08-21T16:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "id1", "id2": "id2"}, 
+ {"flavor1": "flavor1", "flavor2": "flavor2"}, + ), + namespace="fake-ns", + ) + + self.assert_lease_owner_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_check_lease_no_blazar_end_and_start_in_past( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T16:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "id1", "id2": "id2"}, + {"flavor1": "flavor1", "flavor2": "flavor2"}, + ), + namespace="fake-ns", + ) + + self.assert_lease_owner_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_check_lease_blazar_lease_active_end_in_future( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + os_leases = os_cloud.clients["reservation"].resources["leases"] + os_leases.list.return_value = util.as_async_iterable( + [fake_blazar_lease("ACTIVE")] + ) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T15:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "newid1", "id2": "newid2"}, + {"flavor1": "newflavor1", "flavor2": "newflavor2"}, + ), + namespace="fake-ns", + ) + + self.assert_lease_owner_not_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_check_lease_blazar_lease_active_end_in_grace_period( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + os_leases = os_cloud.clients["reservation"].resources["leases"] + os_leases.list.return_value = util.as_async_iterable( + [fake_blazar_lease("ACTIVE")] + ) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T15:55:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "newid1", "id2": "newid2"}, + {"flavor1": "newflavor1", "flavor2": "newflavor2"}, + ), + namespace="fake-ns", + ) + 
+ self.assert_lease_owner_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_check_lease_blazar_lease_active_end_in_past( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + os_leases = os_cloud.clients["reservation"].resources["leases"] + os_leases.list.return_value = util.as_async_iterable( + [fake_blazar_lease("ACTIVE")] + ) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T16:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher( + lease_crd.LeasePhase.ACTIVE, + {"id1": "newid1", "id2": "newid2"}, + {"flavor1": "newflavor1", "flavor2": "newflavor2"}, + ), + namespace="fake-ns", + ) + + self.assert_lease_owner_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_check_lease_blazar_lease_not_active_end_in_future( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + os_leases = os_cloud.clients["reservation"].resources["leases"] + os_leases.list.return_value = util.as_async_iterable( + [fake_blazar_lease("STARTING")] + ) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T15:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher(lease_crd.LeasePhase.STARTING), + namespace="fake-ns", + ) + + self.assert_lease_owner_not_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_check_lease_blazar_lease_not_active_end_in_grace_period( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + os_leases = os_cloud.clients["reservation"].resources["leases"] + os_leases.list.return_value = util.as_async_iterable( + [fake_blazar_lease("STARTING")] + ) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T15:55:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher(lease_crd.LeasePhase.STARTING), + namespace="fake-ns", + ) + + self.assert_lease_owner_deleted(k8s_client) + + @mock.patch.object(openstack, 
"from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_check_lease_blazar_lease_not_active_end_in_past( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + os_leases = os_cloud.clients["reservation"].resources["leases"] + os_leases.list.return_value = util.as_async_iterable( + [fake_blazar_lease("STARTING")] + ) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T16:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher(lease_crd.LeasePhase.STARTING), + namespace="fake-ns", + ) + + self.assert_lease_owner_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_check_lease_blazar_no_lease_end_in_future( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T15:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_not_called() + + self.assert_lease_owner_not_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_check_lease_blazar_no_lease_end_in_grace_period( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T15:55:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_not_called() + + self.assert_lease_owner_deleted(k8s_client) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_check_lease_blazar_no_lease_end_in_past( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease_data = fake_lease(start=True, end=True) + with freezegun.freeze_time("2024-08-21T16:30:00Z"): + await operator.check_lease(lease_data, mock.Mock()) + + k8s_client.apis[API_VERSION].resources[ + "leases/status" 
+ ].replace.assert_not_called() + + self.assert_lease_owner_deleted(k8s_client) + + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_delete_lease_finalizer_present(self, k8s_client): + lease = fake_lease() + lease["metadata"]["finalizers"].append("anotherfinalizer") + + with self.assertRaises(kopf.TemporaryError): + await operator.delete_lease(lease, mock.Mock()) + + # Check that the lease status was not updated + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_not_called() + + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_delete_lease_secret_already_deleted(self, k8s_client): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + k8s_client.apis["v1"].resources["secrets"].fetch.side_effect = ( + util.k8s_api_error(404) + ) + + lease = fake_lease() + await operator.delete_lease(lease, mock.Mock()) + + # Check that the phase was updated to deleting + k8s_client.apis[API_VERSION].resources[ + "leases/status" + ].replace.assert_called_once_with( + "fake-lease", + util.LeaseStatusMatcher(lease_crd.LeasePhase.DELETING), + namespace="fake-ns", + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_delete_lease_appcred_already_deleted( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + type(os_cloud).is_authenticated = mock.PropertyMock(return_value=False) + + lease = fake_lease() + await operator.delete_lease(lease, mock.Mock()) + + # Assert that no openstack clients were used + os_cloud.api_client.assert_not_called() + + # Assert that the credential secret was still deleted + k8s_client.apis["v1"].resources["secrets"].delete.assert_called_once_with( + "fake-credential", namespace="fake-ns" + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_delete_lease_no_end(self, k8s_client, openstack_from_secret_data): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease = fake_lease(end=False) + await operator.delete_lease(lease, mock.Mock()) + + # Assert that there was no attempt to find a lease + os_cloud.clients["reservation"].resources["leases"].list.assert_not_called() + + # Assert that the app cred was deleted + os_cloud.clients["identity"].resources[ + "application_credentials" + ].delete.assert_called_once_with("appcredid") + + # Assert that the credential secret was deleted + k8s_client.apis["v1"].resources["secrets"].delete.assert_called_once_with( + "fake-credential", namespace="fake-ns" + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", 
"no") + async def test_delete_lease_no_blazar(self, k8s_client, openstack_from_secret_data): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease = fake_lease() + await operator.delete_lease(lease, mock.Mock()) + + # Assert that there was no attempt to find a lease + os_cloud.clients["reservation"].resources["leases"].list.assert_not_called() + + # Assert that the app cred was deleted + os_cloud.clients["identity"].resources[ + "application_credentials" + ].delete.assert_called_once_with("appcredid") + + # Assert that the credential secret was deleted + k8s_client.apis["v1"].resources["secrets"].delete.assert_called_once_with( + "fake-credential", namespace="fake-ns" + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_delete_lease_blazar_no_lease( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + + lease = fake_lease() + await operator.delete_lease(lease, mock.Mock()) + + # Assert that there was an attempt to find the lease but no delete + os_leases = os_cloud.clients["reservation"].resources["leases"] + os_leases.list.assert_called_once_with() + os_leases.delete.assert_not_called() + + # Assert that the app cred was deleted + os_cloud.clients["identity"].resources[ + "application_credentials" + ].delete.assert_called_once_with("appcredid") + + # Assert that the credential secret was deleted + k8s_client.apis["v1"].resources["secrets"].delete.assert_called_once_with( + "fake-credential", namespace="fake-ns" + ) + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "yes") + async def test_delete_lease_blazar_lease_exists( + self, k8s_client, openstack_from_secret_data + ): + # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + os_leases = os_cloud.clients["reservation"].resources["leases"] + os_leases.list.return_value = util.as_async_iterable([fake_blazar_lease()]) + + lease = fake_lease() + # This should raise a temporary error after the lease has been deleted to + # force a requeue + with self.assertRaises(kopf.TemporaryError): + await operator.delete_lease(lease, mock.Mock()) + + # Assert that the lease was deleted + os_leases.delete.assert_called_once_with("blazarleaseid") + + # Assert that the appcred was retained + os_cloud.clients["identity"].resources[ + "application_credentials" + ].delete.assert_not_called() + k8s_client.apis["v1"].resources["secrets"].delete.assert_not_called() + + @mock.patch.object(openstack, "from_secret_data") + @mock.patch.object(operator, "K8S_CLIENT", new_callable=util.mock_k8s_client) + @mock.patch.object(operator, "LEASE_BLAZAR_ENABLED", "no") + async def test_delete_lease_appcred_not_dangerous( + self, k8s_client, openstack_from_secret_data + ): 
+ # Configure the Kubernetes client + self.k8s_client_config_common(k8s_client) + + # Configure the OpenStack cloud + os_cloud = openstack_from_secret_data.return_value = util.mock_openstack_cloud() + self.os_cloud_config_common(os_cloud) + os_appcreds = os_cloud.clients["identity"].resources["application_credentials"] + os_appcreds.delete.side_effect = util.httpx_status_error(403) + + logger = mock.Mock() + + lease = fake_lease() + await operator.delete_lease(lease, logger) + + # Assert that an attempt was made to delete the appcred + os_appcreds.delete.assert_called_once_with("appcredid") + + # Assert that a warning was logged that the appcred couldn't be deleted + logger.warn.assert_called_with( + "unable to delete application credential for cluster" + ) + + # Assert that the credential secret was still deleted + k8s_client.apis["v1"].resources["secrets"].delete.assert_called_once_with( + "fake-credential", namespace="fake-ns" + ) diff --git a/azimuth_schedule_operator/tests/test_operator.py b/azimuth_schedule_operator/tests/test_schedule.py similarity index 99% rename from azimuth_schedule_operator/tests/test_operator.py rename to azimuth_schedule_operator/tests/test_schedule.py index 6bcb9ff..469ddd4 100644 --- a/azimuth_schedule_operator/tests/test_operator.py +++ b/azimuth_schedule_operator/tests/test_schedule.py @@ -6,7 +6,7 @@ from azimuth_schedule_operator import operator -class TestOperator(unittest.IsolatedAsyncioTestCase): +class TestSchedule(unittest.IsolatedAsyncioTestCase): def _generate_fake_crd(self, name): plural_name, api_group = name.split(".", maxsplit=1) return { diff --git a/azimuth_schedule_operator/tests/util.py b/azimuth_schedule_operator/tests/util.py new file mode 100644 index 0000000..2e4ec77 --- /dev/null +++ b/azimuth_schedule_operator/tests/util.py @@ -0,0 +1,143 @@ +import collections +from unittest import mock + +import easykube +import httpx + + +class LeaseStatusMatcher: + def __init__(self, phase, size_map=None, size_name_map=None, error_message=None): + self.phase = phase + self.size_map = size_map + self.size_name_map = size_name_map + self.error_message = error_message + + def __eq__(self, data): + if not isinstance(data, dict): + return False + if "status" not in data: + return False + status = data["status"] + if "phase" not in status or self.phase != status["phase"]: + return False + if self.size_map is not None: + if "sizeMap" not in status or self.size_map != status["sizeMap"]: + return False + if self.size_name_map is not None: + if ( + "sizeNameMap" not in status + or self.size_name_map != status["sizeNameMap"] + ): + return False + if self.error_message is not None: + if ( + "errorMessage" not in status + or self.error_message != status["errorMessage"] + ): + return False + return True + + def __repr__(self): + props = [f"phase={repr(self.phase)}"] + if self.size_map is not None: + props.append(f"size_map={repr(self.size_map)}") + if self.size_name_map is not None: + props.append(f"size_name_map={repr(self.size_name_map)}") + return f"LeaseStatusMatcher<{', '.join(props)}>" + + +async def empty_async_iterator(): + if False: + yield 1 + + +async def as_async_iterable(sync_iterable): + for elem in sync_iterable: + yield elem + + +def side_effect_with_return_value(mock_, default_side_effect): + # This function allows the default side effect to be overridden by setting a + # return value (setting side_effect continues to work as normal) + def side_effect(*args, **kwargs): + ret = mock_._mock_return_value + if mock_._mock_delegate is not None: + 
ret = mock_._mock_delegate.return_value + if ret is not mock.DEFAULT: + return ret + else: + return default_side_effect(*args, **kwargs) + + mock_.side_effect = side_effect + + +def mock_k8s_client(): + client = mock.AsyncMock() + client.__aenter__.return_value = client + + apis = client.apis = collections.defaultdict(mock_k8s_api) + client.api = api_method = mock.Mock() + side_effect_with_return_value(api_method, lambda api, *_, **__: apis[api]) + + return client + + +def mock_k8s_api(): + api = mock.AsyncMock() + resources = api.resources = collections.defaultdict(mock_k8s_resource) + side_effect_with_return_value(api.resource, lambda name, *_, **__: resources[name]) + return api + + +def mock_k8s_resource(): + resource = mock.AsyncMock() + + # list is a synchronous method that returns an async iterator + resource.list = list_method = mock.Mock() + side_effect_with_return_value(list_method, empty_async_iterator) + + return resource + + +def k8s_api_error(status_code, message=None, text=None, json=None): + return easykube.ApiError(httpx_status_error(status_code, message, text, json)) + + +def httpx_status_error(status_code, message=None, text=None, json=None): + request = httpx.Request("GET", "fakeurl") + response = httpx.Response(status_code, text=text, json=json) + message = message or f"error with status {status_code}" + return httpx.HTTPStatusError(message, request=request, response=response) + + +def mock_openstack_cloud(): + cloud = mock.AsyncMock() + cloud.__aenter__.return_value = cloud + + clients = cloud.clients = collections.defaultdict(mock_openstack_client) + cloud.api_client = api_client_method = mock.Mock() + side_effect_with_return_value(api_client_method, lambda api, *_, **__: clients[api]) + + return cloud + + +def mock_openstack_client(): + client = mock.AsyncMock() + + resources = client.resources = collections.defaultdict(mock_openstack_resource) + client.resource = resource_method = mock.Mock() + side_effect_with_return_value( + resource_method, lambda name, *_, **__: resources[name] + ) + + return client + + +def mock_openstack_resource(): + resource = mock.AsyncMock() + + # list is a synchronous method that returns an async iterator + resource.list = list_method = mock.Mock() + side_effect_with_return_value(list_method, empty_async_iterator) + + return resource diff --git a/charts/operator/templates/clusterrole-operator.yaml b/charts/operator/templates/clusterrole-operator.yaml index 5b87900..1a20769 100644 --- a/charts/operator/templates/clusterrole-operator.yaml +++ b/charts/operator/templates/clusterrole-operator.yaml @@ -18,6 +18,10 @@ rules: - apiGroups: ["scheduling.azimuth.stackhpc.com"] resources: ["*"] verbs: ["*"] + # Allow secrets containing credentials to be read and deleted + - apiGroups: [""] + resources: ["secrets"] + verbs: ["get", "delete"] # Allow the managed resources to be deleted by the operator {{- range .Values.managedResources }} - apiGroups: diff --git a/charts/operator/templates/deployment.yaml b/charts/operator/templates/deployment.yaml index 4846c9a..56156f8 100644 --- a/charts/operator/templates/deployment.yaml +++ b/charts/operator/templates/deployment.yaml @@ -24,6 +24,15 @@ spec: securityContext: {{ toYaml .Values.securityContext | nindent 12 }} image: {{ printf "%s:%s" .Values.image.repository (default .Chart.AppVersion .Values.image.tag) }} imagePullPolicy: {{ .Values.image.pullPolicy }} + env: + - name: AZIMUTH_SCHEDULE_CHECK_INTERVAL_SECONDS + value: {{ quote .Values.config.checkInterval }} + - name: 
AZIMUTH_LEASE_CHECK_INTERVAL_SECONDS + value: {{ quote .Values.config.checkInterval }} + - name: AZIMUTH_LEASE_DEFAULT_GRACE_PERIOD_SECONDS + value: {{ quote .Values.config.defaultGracePeriod }} + - name: AZIMUTH_LEASE_BLAZAR_ENABLED + value: {{ quote .Values.config.blazarEnabled }} ports: - name: metrics containerPort: 8080 diff --git a/charts/operator/templates/prometheusrule.yaml b/charts/operator/templates/prometheusrule.yaml index 9fb22b5..c67f7e7 100644 --- a/charts/operator/templates/prometheusrule.yaml +++ b/charts/operator/templates/prometheusrule.yaml @@ -20,4 +20,17 @@ spec: summary: Azimuth schedule has not found its ref for longer than 15 mins. labels: severity: warning + + - alert: AzimuthLeaseNotPendingOrActive + expr: >- + sum(azimuth_lease_phase{phase!~"Pending|Active"}) by(lease_namespace, lease_name) > 0 + for: 1h + annotations: + description: >- + Azimuth lease + {{ "{{" }} $labels.lease_namespace {{ "}}" }}/{{ "{{" }} $labels.lease_name {{ "}}" }} + has been in a state other than Pending or Active for longer than one hour. + summary: Azimuth lease has been in an unstable state for more than one hour. + labels: + severity: warning {{- end }} diff --git a/charts/operator/values.yaml b/charts/operator/values.yaml index 4b4d41d..8baef1b 100644 --- a/charts/operator/values.yaml +++ b/charts/operator/values.yaml @@ -1,3 +1,13 @@ +# Configuration for the operator +config: + # Indicates whether Blazar leases should be used + # Valid values are yes, no or auto, AS STRINGS + blazarEnabled: "auto" + # The check interval for the operator timers + checkInterval: 60 + # The default grace period for leases + defaultGracePeriod: 600 + # The operator image to use image: repository: ghcr.io/azimuth-cloud/azimuth-schedule-operator diff --git a/test-requirements.txt b/test-requirements.txt index 85bfbf4..6e3c73a 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -2,6 +2,7 @@ hacking>=6.1.0 # Apache-2.0 black==24.4.2 coverage>=4.0,!=4.4 # Apache-2.0 +freezegun # Apache-2.0 python-subunit>=0.0.18 # Apache-2.0/BSD oslotest>=1.10.0 # Apache-2.0 stestr>=1.0.0 # Apache-2.0 diff --git a/tools/functional_test.sh b/tools/functional_test.sh index 83c13a2..e5e8b9c 100755 --- a/tools/functional_test.sh +++ b/tools/functional_test.sh @@ -1,6 +1,6 @@ #!/bin/bash -set -ex +set -exo pipefail SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) @@ -8,26 +8,200 @@ SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) # Make sure to use the images that we just built helm upgrade azimuth-schedule-operator ./charts/operator \ --dependency-update \ - --namespace azimuth-schedule-operator \ --create-namespace \ --install \ --wait \ --timeout 10m \ - --set-string image.tag=${GITHUB_SHA::7} \ + --set-string image.tag=${TAG:-"${GITHUB_SHA::7}"} \ + --set-string config.blazarEnabled=no \ + --set config.checkInterval=2 \ + --set config.defaultGracePeriod=0 \ --set-json 'managedResources=[{"apiGroup": "", "resources": ["configmaps"]}]' until [ `kubectl get crds | grep schedules.scheduling.azimuth.stackhpc.com | wc -l` -eq 1 ]; do echo "wait for crds"; sleep 5; done kubectl get crds -export AFTER=$(date --date="-1 hour" +"%Y-%m-%dT%H:%M:%SZ") -envsubst < $SCRIPT_DIR/test_schedule.yaml | kubectl apply -f - +get_date() ( + set +x -e + TZ=UTC date --date="$1" +"%Y-%m-%dT%H:%M:%SZ" +) + +##### +# Test the deprecated schedule CRD +##### +export AFTER="$(get_date "-1 hour")" +envsubst < $SCRIPT_DIR/test_schedule.yaml | kubectl apply -f - kubectl wait 
--for=jsonpath='{.status.refExists}'=true schedule caas-mycluster # ensure updatedAt is written out kubectl get schedule caas-mycluster -o yaml | grep "updatedAt" kubectl wait --for=jsonpath='{.status.refDeleteTriggered}'=true schedule caas-mycluster kubectl get schedule caas-mycluster -o yaml -# for debugging get the logs from the operator -kubectl logs -n azimuth-schedule-operator deployment/azimuth-schedule-operator + +JOB_ID="${GITHUB_JOB_ID:-test}" + + +create_credential_secret() ( + set +x -e + tmpfile="$(mktemp)" + openstack application credential create --unrestricted -f json "az-schedule-$1-$JOB_ID" > $tmpfile + kubectl apply -f - 1>&2 </dev/null 2>&1; then + echo "Application credential $2 still exists" 1>&2 + return 1 + fi + if kubectl get secret $1 >/dev/null 2>&1; then + echo "Kubernetes secret $1 still exists" 1>&2 + return 1 + fi + echo "Credential $1 deleted successfully" +) + + +create_configmap() ( + set +x -e + kubectl create configmap $1 \ + --from-literal=key1=config1 \ + --from-literal=key2=config2 \ + --output go-template='{{.metadata.uid}}' +) + + +verify_configmap_deleted() ( + set +x -e + if kubectl get configmap $1 >/dev/null 2>&1; then + echo "ConfigMap $1 still exists" 1>&2 + return 1 + fi + echo "ConfigMap $1 deleted successfully" +) + + +create_lease() ( + set +x -eo pipefail + LEASE_NAME="$1" \ + OWNER_UID="$2" \ + END_TIME="$3" \ + START_TIME="$4" \ + gomplate < "$SCRIPT_DIR/lease.yaml" | \ + kubectl apply -f - +) + + +check_lease_phase() ( + set +x -e + phase="$(kubectl get lease.scheduling $1 -o go-template='{{.status.phase}}')" + if [ "$phase" == "$2" ]; then + echo "Lease $1 has phase $phase, as expected" + return + else + echo "Lease $1 has phase $phase, not $2" 1>&2 + return 1 + fi +) + + +delete_lease() ( + set +x -e + kubectl delete lease.scheduling $1 +) + + +verify_lease_deleted() ( + set +x -e + if kubectl get lease.scheduling $1 >/dev/null 2>&1; then + echo "Lease $1 still exists" 1>&2 + return 1 + fi + echo "Lease $1 deleted successfully" +) + + +cleanup() { + # When we exit, delete all the leases and configmaps + set +xe + echo "Cleaning up resources..." 
+    kubectl delete lease.scheduling --all
+    # for debugging get the logs from the operator
+    kubectl logs deployment/azimuth-schedule-operator
+}
+trap cleanup EXIT
+
+
+#####
+# Test the lease CRD with no start or end time
+#####
+appcred_id="$(create_credential_secret lease-no-end)"
+create_lease lease-no-end
+# Wait a few seconds then check that the lease has moved to active
+sleep 5
+check_lease_phase lease-no-end Active
+delete_lease lease-no-end
+# Verify that the application credential has been deleted
+verify_credential_deleted lease-no-end $appcred_id
+
+#####
+# Test the lease CRD with an end time but no start time
+#####
+create_credential_secret lease-end-no-start
+# Create the configmap that we will delete
+owner_uid="$(create_configmap lease-end-no-start)"
+# The end time will be one minute in the future
+create_lease lease-end-no-start "$owner_uid" "$(get_date "+1 minute")"
+# Wait for a few seconds, then check that the lease is active
+sleep 5
+check_lease_phase lease-end-no-start Active
+# Wait for another 60 seconds, then verify that the configmap, lease and credential are gone
+sleep 60
+verify_configmap_deleted lease-end-no-start
+verify_lease_deleted lease-end-no-start
+verify_credential_deleted lease-end-no-start
+
+#####
+# Test the lease CRD with a start time and end time
+#####
+create_credential_secret lease-start-end
+# Create the configmap that we will delete
+owner_uid="$(create_configmap lease-start-end)"
+# The start time will be 30s in the future and the end time one minute in the future
+create_lease lease-start-end "$owner_uid" "$(get_date "+1 minute")" "$(get_date "+30 seconds")"
+# Wait for a few seconds and check that the lease is still pending
+sleep 15
+check_lease_phase lease-start-end Pending
+# Wait for another 30s and check that the lease is active
+sleep 30
+check_lease_phase lease-start-end Active
+# Wait for another 20 seconds, then verify that the configmap, lease and credential are gone
+sleep 20
+verify_configmap_deleted lease-start-end
+verify_lease_deleted lease-start-end
+verify_credential_deleted lease-start-end
+
+#####
+# TODO(mkjpryor) Test the lease CRD with Blazar
+#####
diff --git a/tools/lease.yaml b/tools/lease.yaml
new file mode 100644
index 0000000..8d1548c
--- /dev/null
+++ b/tools/lease.yaml
@@ -0,0 +1,23 @@
+apiVersion: scheduling.azimuth.stackhpc.com/v1alpha1
+kind: Lease
+metadata:
+  name: {{ .Env.LEASE_NAME }}
+  {{- if getenv "OWNER_UID" }}
+  ownerReferences:
+    - apiVersion: v1
+      kind: ConfigMap
+      name: {{ .Env.LEASE_NAME }}
+      uid: {{ .Env.OWNER_UID }}
+  {{- end }}
+spec:
+  cloudCredentialsSecretName: {{ .Env.LEASE_NAME }}
+  {{- if getenv "START_TIME" }}
+  startsAt: {{ .Env.START_TIME }}
+  {{- end }}
+  {{- if getenv "END_TIME" }}
+  endsAt: {{ .Env.END_TIME }}
+  {{- end }}
+  resources:
+    machines:
+      - sizeId: d2 # This flavor is valid for devstack
+        count: 2
diff --git a/tox.ini b/tox.ini
index 277a501..453498e 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,7 +4,7 @@ envlist = py3,black,pep8
 skipsdist = True
 
 [testenv]
-basepython = python3.10
+basepython = python3
 usedevelop = True
 setenv =
    PYTHONWARNINGS=default::DeprecationWarning
@@ -67,7 +67,7 @@ commands =
 # E123, E125 skipped as they are invalid PEP-8.
 show-source = True
 # TODO add headers and remove H102
-ignore = E123,E125,H102,W503
+ignore = E123,E125,H102,H301,W503
 builtins = _
 exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build
 # match black