diff --git a/azimuth_capi/config.py b/azimuth_capi/config.py
index 93ac89b..46f5b57 100644
--- a/azimuth_capi/config.py
+++ b/azimuth_capi/config.py
@@ -52,7 +52,7 @@ class CAPIHelmConfig(Section):
     #: The name of the CAPI Helm chart to use to deploy clusters
     chart_name: constr(min_length = 1) = "openstack-cluster"
     #: The version of the CAPI Helm chart to use to deploy clusters
-    chart_version: SemVerVersion = "0.1.2-dev.0.main.51"
+    chart_version: SemVerVersion = "0.1.2-dev.0.main.55"
     #: The default values to use for all clusters
     #: Values defined in templates take precedence
     default_values: t.Dict[str, t.Any] = Field(default_factory = dict)
@@ -199,6 +199,9 @@ class Config:
     #: The number of seconds to wait between timer executions
     timer_interval: conint(gt = 0) = 60
 
+    #: The length of the random suffix for clusters
+    suffix_length: conint(gt = 0) = 5
+
     #: The field manager name to use for server-side apply
     easykube_field_manager: constr(min_length = 1) = "azimuth-capi-operator"
 
diff --git a/azimuth_capi/models/v1alpha1/cluster.py b/azimuth_capi/models/v1alpha1/cluster.py
index 0435a17..b31f3f1 100644
--- a/azimuth_capi/models/v1alpha1/cluster.py
+++ b/azimuth_capi/models/v1alpha1/cluster.py
@@ -333,6 +333,10 @@ class Config:
         None,
         description = "The name of the secret containing the kubeconfig file, if known."
     )
+    helm_release_name: t.Optional[str] = Field(
+        None,
+        description = "The name of the Helm release for the cluster."
+    )
     phase: ClusterPhase = Field(
         ClusterPhase.UNKNOWN.value,
         description = "The overall phase of the cluster."
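The new `suffix_length` setting and `helm_release_name` status field work together: the operator.py changes below generate a random suffix of that length and record the resulting release name in the cluster status. A minimal sketch of the generation logic, using only the standard library (the `random_suffix` helper is illustrative, not part of the diff):

```python
import secrets
import string

def random_suffix(length: int = 5) -> str:
    """Return a DNS-label-safe suffix of lowercase letters and digits."""
    chars = string.ascii_lowercase + string.digits
    return "".join(secrets.choice(chars) for _ in range(length))

# e.g. "demo" -> "demo-x7k2q"
release_name = f"demo-{random_suffix()}"
```

With the default length of 5 there are 36**5 (about 60 million) possible suffixes, so collisions within a namespace are negligible while names stay well inside Kubernetes' 63-character limits; `secrets` rather than `random` also makes the suffixes unpredictable.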
diff --git a/azimuth_capi/operator.py b/azimuth_capi/operator.py
index d9d7828..5fa8acf 100644
--- a/azimuth_capi/operator.py
+++ b/azimuth_capi/operator.py
@@ -4,7 +4,9 @@
 import json
 import logging
 import pathlib
+import secrets
 import ssl
+import string
 import sys
 
 import kopf
@@ -255,6 +257,20 @@ async def on_cluster_create(instance, name, namespace, patch, **kwargs):
     # Then fetch the template
     ekresource = await ekresource_for_model(api.ClusterTemplate)
     template = api.ClusterTemplate.parse_obj(await ekresource.fetch(instance.spec.template_name))
+    # Get the name for the Helm release
+    if not instance.status.helm_release_name:
+        try:
+            # Check if a Helm release already exists with the cluster name
+            _ = await helm_client.get_current_revision(name, namespace = namespace)
+        except helm_errors.ReleaseNotFoundError:
+            # This means that we have an opportunity to use a random suffix
+            chars = string.ascii_lowercase + string.digits
+            suffix = "".join(secrets.choice(chars) for _ in range(settings.suffix_length))
+            instance.status.helm_release_name = f"{name}-{suffix}"
+        else:
+            # This means that the cluster was created prior to the random suffixes
+            instance.status.helm_release_name = name
+        await save_cluster_status(instance)
     # Generate the Helm values for the release
     helm_values = mergeconcat(
         settings.capi_helm.default_values,
@@ -268,7 +284,7 @@
     )
     # Use Helm to install or upgrade the release
     _ = await helm_client.install_or_upgrade_release(
-        name,
+        instance.status.helm_release_name,
         await helm_client.get_chart(
             settings.capi_helm.chart_name,
             repo = settings.capi_helm.chart_repository,
@@ -281,30 +297,32 @@
     )
     # Ensure that a Zenith operator instance exists for the cluster
     if settings.zenith.enabled:
-        operator_resources = await zenith_operator_resources(name, namespace, secret)
+        operator_resources = await zenith_operator_resources(instance, secret)
         for resource in operator_resources:
             kopf.adopt(resource, instance.dict(by_alias = True))
             await ekclient.apply_object(resource, force = True)
-    # Patch the labels to include the cluster template
-    # This is used by the admission webhook to search for clusters using a template in
-    # order to prevent deletion of cluster templates that are in use
-    labels = patch.setdefault("metadata", {}).setdefault("labels", {})
-    labels[f"{settings.api_group}/cluster-template"] = instance.spec.template_name
+    # Patch the labels to include the cluster template and the release name
+    # These are used to search for the cluster later
+    patch.setdefault("metadata", {}).setdefault("labels", {}).update({
+        f"{settings.api_group}/cluster-template": instance.spec.template_name,
+        f"{settings.api_group}/release-name": instance.status.helm_release_name,
+    })
 
 
 @model_handler(api.Cluster, kopf.on.delete)
-async def on_cluster_delete(name, namespace, **kwargs):
+async def on_cluster_delete(instance, name, namespace, **kwargs):
     """
     Executes whenever a cluster is deleted.
     """
     # Delete the corresponding Helm release
+    release_name = instance.status.helm_release_name or name
     try:
-        await helm_client.uninstall_release(name, namespace = namespace)
+        await helm_client.uninstall_release(release_name, namespace = namespace)
     except helm_errors.ReleaseNotFoundError:
         pass
     # Wait until the associated CAPI cluster no longer exists
     ekresource = await ekclient.api(CLUSTER_API_VERSION).resource("clusters")
-    cluster, stream = await ekresource.watch_one(name, namespace = namespace)
+    cluster, stream = await ekresource.watch_one(release_name, namespace = namespace)
     if cluster:
         async with stream as events:
             async for event in events:
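Existing clusters keep their unsuffixed release name: `on_cluster_create` only generates a suffix when no release with the bare cluster name exists, and `on_cluster_delete` falls back to the cluster name when the status field was never set. A sketch of that decision as a pure function, with `release_exists` and `make_suffix` as hypothetical stand-ins for `helm_client.get_current_revision` and the `secrets`-based generator:

```python
def choose_release_name(status_name, cluster_name, release_exists, make_suffix):
    # A name already recorded in the status always wins, keeping retries idempotent
    if status_name:
        return status_name
    # A cluster created before suffixes has a release under its own name
    if release_exists(cluster_name):
        return cluster_name
    # Otherwise this is a new cluster and gets a suffixed release name
    return f"{cluster_name}-{make_suffix()}"
```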
""" @@ -322,7 +340,8 @@ async def on_cluster_resume(instance, name, namespace, **kwargs): # However if CAPI objects have been deleted while the operator was down, we will # not receive delete events for those # So on resume we remove any items in the status that no longer exist - labels = { "capi.stackhpc.com/cluster": name } + release_name = instance.status.helm_release_name or name + labels = { "capi.stackhpc.com/cluster": release_name } # Get easykube resources for the Cluster API types ekcapi = ekclient.api(CLUSTER_API_VERSION) ekclusters = await ekcapi.resource("clusters") @@ -360,16 +379,10 @@ async def on_cluster_resume(instance, name, namespace, **kwargs): instance, [ addon - async for addon in ekhelmreleases.list( - namespace = namespace, - labels = { "capi.stackhpc.com/cluster": name } - ) + async for addon in ekhelmreleases.list(labels = labels, namespace = namespace) ] + [ addon - async for addon in ekmanifests.list( - namespace = namespace, - labels = { "capi.stackhpc.com/cluster": name } - ) + async for addon in ekmanifests.list(labels = labels, namespace = namespace) ] ) await save_cluster_status(instance) @@ -390,24 +403,37 @@ async def wrapper(**inner): # Retry the fetch and updating of the state until it succeeds without conflict # kopf retry logic does not apply to events while True: - try: - cluster = api.Cluster.parse_obj( - await ekclusters.fetch( - inner["labels"][cluster_label], - namespace = inner["namespace"] + # Try to find the cluster with the name in the cluster label + # We use the release name label first, falling back to by name + # This is because all of these objects are created by the Helm release + cluster_name = inner["labels"][cluster_label] + cluster_namespace = inner["namespace"] + cluster = await ekclusters.first( + labels = { f"{settings.api_group}/release-name": cluster_name }, + namespace = cluster_namespace + ) + if not cluster: + try: + cluster = await ekclusters.fetch( + cluster_name, + namespace = cluster_namespace ) - ) + except ApiError as exc: + if exc.status_code == 404: + # We couldn't find the cluster - let's give up for now + break + else: + raise + cluster = api.Cluster.parse_obj(cluster) + try: await func(cluster = cluster, **inner) await save_cluster_status(cluster) except ApiError as exc: - # On a 404, don't bother trying again as the cluster is gone - if exc.status_code == 404: - break - # On a conflict response, go round again - elif exc.status_code == 409: + if exc.status_code == 409: + # On a conflict response, go round again continue - # Any other error should be bubbled up else: + # Any other error should be bubbled up raise else: # On success, we can break the loop @@ -614,8 +640,7 @@ async def on_kubernetes_app_event( async def annotate_addon_for_reservation( - cluster_name, - cluster_namespace, + instance, reservation, service_name, service_status = None @@ -624,6 +649,8 @@ async def annotate_addon_for_reservation( Annotates the addon for the reservation, if one exists, with information about the reservation. 
""" + cluster_name = instance.status.helm_release_name or instance.metadata.name + cluster_namespace = instance.metadata.namespace # If the reservation is not part of a Helm release, it isn't part of an addon annotations = reservation["metadata"].get("annotations", {}) release_namespace = annotations.get("meta.helm.sh/release-namespace") @@ -698,19 +725,20 @@ def get_service_status(reservation): @model_handler( api.Cluster, kopf.daemon, - include_instance = False, cancellation_timeout = 1 ) -async def monitor_cluster_services(name, namespace, **kwargs): +async def monitor_cluster_services(instance, name, namespace, **kwargs): """ Daemon that monitors Zenith reservations """ if not settings.zenith.enabled: return + if not instance.status.kubeconfig_secret_name: + raise kopf.TemporaryError("kubeconfig for cluster not set yet") eksecrets = await ekclient.api("v1").resource("secrets") try: kubeconfig = await eksecrets.fetch( - f"{name}-kubeconfig", + instance.status.kubeconfig_secret_name, namespace = namespace ) except ApiError as exc: @@ -739,8 +767,7 @@ async def monitor_cluster_services(name, namespace, **kwargs): service_name = get_service_name(reservation) service_status = get_service_status(reservation) addon = await annotate_addon_for_reservation( - name, - namespace, + instance, reservation, service_name, service_status @@ -769,8 +796,7 @@ async def monitor_cluster_services(name, namespace, **kwargs): if reservation.get("status", {}).get("phase", "Unknown") == "Ready": service_status = get_service_status(reservation) addon = await annotate_addon_for_reservation( - name, - namespace, + instance, reservation, service_name, service_status @@ -800,8 +826,7 @@ async def monitor_cluster_services(name, namespace, **kwargs): namespace = namespace ) await annotate_addon_for_reservation( - name, - namespace, + instance, reservation, service_name ) diff --git a/azimuth_capi/zenith.py b/azimuth_capi/zenith.py index 28ea2d1..6b698ab 100644 --- a/azimuth_capi/zenith.py +++ b/azimuth_capi/zenith.py @@ -243,7 +243,7 @@ async def zenith_values(client, cluster, addons): ) -async def zenith_operator_resources(name, namespace, cloud_credentials_secret): +async def zenith_operator_resources(instance, cloud_credentials_secret): """ Returns the resources required to enable the Zenith operator for the given cluster. """ @@ -266,12 +266,15 @@ async def zenith_operator_resources(name, namespace, cloud_credentials_secret): repo = settings.zenith.chart_repository, version = settings.zenith.chart_version ), - name, + instance.metadata.name, mergeconcat( settings.zenith.operator_defaults, { "kubeconfigSecret": { - "name": f"{name}-kubeconfig", + "name": ( + instance.status.kubeconfig_secret_name or + f"{instance.status.helm_release_name}-kubeconfig" + ), "key": "value", }, "config": { @@ -284,6 +287,6 @@ async def zenith_operator_resources(name, namespace, cloud_credentials_secret): }, } ), - namespace = namespace + namespace = instance.metadata.namespace ) )