From d3667446d41e377a58f7f122d47a997019ce7f5e Mon Sep 17 00:00:00 2001 From: michael Date: Mon, 26 Aug 2024 13:16:33 +0300 Subject: [PATCH 1/4] requests patch --- charmcraft.yaml | 12 +++++++ requirements.txt | 4 +-- src/charm.py | 2 ++ tests/scenario/conftest.py | 9 ++++- tests/scenario/test_charm_statuses.py | 52 ++++++++++++++++++++++++++- tests/unit/test_charm.py | 11 +++++- 6 files changed, 85 insertions(+), 5 deletions(-) diff --git a/charmcraft.yaml b/charmcraft.yaml index fd98faa..def4bc1 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -149,3 +149,15 @@ config: description: Force-enable the receiver for the 'jaeger_grpc' protocol in Tempo, even if there is no integration currently requesting it. type: boolean default: false + cpu: + description: | + K8s cpu resource limit, e.g. "1" or "500m". Default is unset (no limit). This value is used + for the "limits" portion of the resource requirements. + See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: string + memory: + description: | + K8s memory resource limit, e.g. "1Gi". Default is unset (no limit). This value is used + for the "limits" portion of the resource requirements. + See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: string \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3834f32..e317799 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ importlib-metadata~=6.0.0 ops crossplane jsonschema==4.17.0 -lightkube==0.11.0 +lightkube>=0.15.4 lightkube-models==1.24.1.4 tenacity==8.2.3 # crossplane is a package from nginxinc to interact with the Nginx config @@ -19,4 +19,4 @@ cryptography # lib/charms/tempo_k8s/v1/tracing.py pydantic>=2 # lib/charms/prometheus_k8s/v0/prometheus_scrape.py -cosl +cosl>=0.0.24 \ No newline at end of file diff --git a/src/charm.py b/src/charm.py index 48f163f..055c55b 100755 --- a/src/charm.py +++ b/src/charm.py @@ -65,6 +65,8 @@ def __init__(self, *args): nginx_config=NginxConfig(server_name=self.hostname).config, workers_config=self.tempo.config, tracing_receivers=self.requested_receivers_urls, + resources_requests={"cpu": "50m", "memory": "100Mi"}, + container_name="charm", ) # configure this tempo as a datasource in grafana diff --git a/tests/scenario/conftest.py b/tests/scenario/conftest.py index 84c387e..d1fa3d8 100644 --- a/tests/scenario/conftest.py +++ b/tests/scenario/conftest.py @@ -1,6 +1,7 @@ from unittest.mock import MagicMock, patch import pytest +from ops import ActiveStatus from scenario import Container, Context, Relation from charm import TempoCoordinatorCharm @@ -15,7 +16,13 @@ def coordinator(): def tempo_charm(): with patch("lightkube.core.client.GenericSyncClient"): with patch("charm.TempoCoordinatorCharm.are_certificates_on_disk", False): - yield TempoCoordinatorCharm + with patch.multiple( + "cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch", + _namespace="test-namespace", + _patch=lambda _: None, + get_status=lambda _: ActiveStatus(""), + ): + yield TempoCoordinatorCharm @pytest.fixture(scope="function") diff --git a/tests/scenario/test_charm_statuses.py b/tests/scenario/test_charm_statuses.py index b623a74..caca3bc 100644 --- a/tests/scenario/test_charm_statuses.py +++ b/tests/scenario/test_charm_statuses.py @@ -1,4 +1,4 @@ -from unittest.mock import patch +from unittest.mock import MagicMock, patch import ops from scenario import PeerRelation, State @@ -65,3 +65,53 @@ def test_happy_status( ), ) assert state_out.unit_status.name == "active" + + +@patch( + "cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch.get_status", + MagicMock(return_value=ops.BlockedStatus("`juju trust` this application")), +) +@patch("charm.TempoCoordinatorCharm.is_workload_ready", return_value=True) +def test_k8s_patch_failed( + workload_ready_mock, + context, + s3, + all_worker, + nginx_container, + nginx_prometheus_exporter_container, +): + state_out = context.run( + "update_status", + State( + relations=[PeerRelation("peers", peers_data={1: {}, 2: {}}), s3, all_worker], + containers=[nginx_container, nginx_prometheus_exporter_container], + unit_status=ops.ActiveStatus(), + leader=True, + ), + ) + assert state_out.unit_status == ops.BlockedStatus("`juju trust` this application") + + +@patch( + "cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch.get_status", + MagicMock(return_value=ops.WaitingStatus("waiting")), +) +@patch("charm.TempoCoordinatorCharm.is_workload_ready", return_value=True) +def test_k8s_patch_waiting( + workload_ready_mock, + context, + s3, + all_worker, + nginx_container, + nginx_prometheus_exporter_container, +): + state_out = context.run( + "config_changed", + State( + relations=[PeerRelation("peers", peers_data={1: {}, 2: {}}), s3, all_worker], + containers=[nginx_container, nginx_prometheus_exporter_container], + unit_status=ops.ActiveStatus(), + leader=True, + ), + ) + assert state_out.unit_status == ops.WaitingStatus("waiting") diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 1da8294..e7e8214 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -2,6 +2,7 @@ # See LICENSE file for licensing details. import unittest +from unittest.mock import patch from ops.testing import Harness @@ -9,9 +10,17 @@ CONTAINER_NAME = "nginx" +k8s_resource_multipatch = patch.multiple( + "cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch", + _namespace="test-namespace", + _patch=lambda _: None, +) + class TestTempoCoordinatorCharm(unittest.TestCase): - def setUp(self): + + @k8s_resource_multipatch + def setUp(self, *_): self.harness = Harness(TempoCoordinatorCharm) self.harness.set_model_name("testmodel") self.addCleanup(self.harness.cleanup) From 98972e8150f52a32d6efdfe39ae141f6b1fbe543 Mon Sep 17 00:00:00 2001 From: michael Date: Mon, 26 Aug 2024 16:53:06 +0300 Subject: [PATCH 2/4] change config option names --- charmcraft.yaml | 4 ++-- src/charm.py | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/charmcraft.yaml b/charmcraft.yaml index def4bc1..fb24508 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -149,13 +149,13 @@ config: description: Force-enable the receiver for the 'jaeger_grpc' protocol in Tempo, even if there is no integration currently requesting it. type: boolean default: false - cpu: + cpu_limit: description: | K8s cpu resource limit, e.g. "1" or "500m". Default is unset (no limit). This value is used for the "limits" portion of the resource requirements. See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: string - memory: + memory_limit: description: | K8s memory resource limit, e.g. "1Gi". Default is unset (no limit). This value is used for the "limits" portion of the resource requirements. diff --git a/src/charm.py b/src/charm.py index 055c55b..a1973ca 100755 --- a/src/charm.py +++ b/src/charm.py @@ -65,7 +65,7 @@ def __init__(self, *args): nginx_config=NginxConfig(server_name=self.hostname).config, workers_config=self.tempo.config, tracing_receivers=self.requested_receivers_urls, - resources_requests={"cpu": "50m", "memory": "100Mi"}, + resources_requests=self.get_resources_requests, container_name="charm", ) @@ -387,6 +387,10 @@ def is_workload_ready(self): return False return out == "ready" + def get_resources_requests(self, _) -> Dict[str, str]: + """Returns a dictionary for the "requests" portion of the resources requirements.""" + return {"cpu": "50m", "memory": "100Mi"} + if __name__ == "__main__": # pragma: nocover main(TempoCoordinatorCharm) From 8c34c824feda4c749860fee6c64abe7067b468ca Mon Sep 17 00:00:00 2001 From: michael Date: Fri, 30 Aug 2024 14:14:53 +0300 Subject: [PATCH 3/4] use latest lib --- requirements.txt | 2 +- src/charm.py | 2 +- src/tempo_config.py | 1 + tests/unit/test_charm.py | 2 ++ tests/unit/test_coherence.py | 2 +- 5 files changed, 6 insertions(+), 3 deletions(-) diff --git a/requirements.txt b/requirements.txt index e317799..c3e8286 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,4 +19,4 @@ cryptography # lib/charms/tempo_k8s/v1/tracing.py pydantic>=2 # lib/charms/prometheus_k8s/v0/prometheus_scrape.py -cosl>=0.0.24 \ No newline at end of file +cosl>=0.0.26 \ No newline at end of file diff --git a/src/charm.py b/src/charm.py index a1973ca..304ea70 100755 --- a/src/charm.py +++ b/src/charm.py @@ -20,7 +20,7 @@ receiver_protocol_to_transport_protocol, ) from charms.traefik_route_k8s.v0.traefik_route import TraefikRouteRequirer -from cosl.coordinated_workers.coordinator import Coordinator, ClusterRolesConfig +from cosl.coordinated_workers.coordinator import ClusterRolesConfig, Coordinator from cosl.coordinated_workers.nginx import CA_CERT_PATH, CERT_PATH, KEY_PATH from ops.charm import CharmBase, RelationEvent from ops.main import main diff --git a/src/tempo_config.py b/src/tempo_config.py index d90be69..7c7ff25 100644 --- a/src/tempo_config.py +++ b/src/tempo_config.py @@ -10,6 +10,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional from urllib.parse import urlparse + from cosl.coordinated_workers.coordinator import ClusterRolesConfig from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index e7e8214..1800ebb 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -15,11 +15,13 @@ _namespace="test-namespace", _patch=lambda _: None, ) +lightkube_client_patch = patch("lightkube.core.client.GenericSyncClient") class TestTempoCoordinatorCharm(unittest.TestCase): @k8s_resource_multipatch + @lightkube_client_patch def setUp(self, *_): self.harness = Harness(TempoCoordinatorCharm) self.harness.set_model_name("testmodel") diff --git a/tests/unit/test_coherence.py b/tests/unit/test_coherence.py index 274cffe..ff7b7aa 100644 --- a/tests/unit/test_coherence.py +++ b/tests/unit/test_coherence.py @@ -6,8 +6,8 @@ from tempo_config import ( MINIMAL_DEPLOYMENT, RECOMMENDED_DEPLOYMENT, - TempoRole, TEMPO_ROLES_CONFIG, + TempoRole, ) From 5ad45ba411ad93dae41e57e7da0001b7b9bef36c Mon Sep 17 00:00:00 2001 From: michael Date: Fri, 30 Aug 2024 14:20:26 +0300 Subject: [PATCH 4/4] Add missing lib --- .../v0/kubernetes_compute_resources_patch.py | 766 ++++++++++++++++++ 1 file changed, 766 insertions(+) create mode 100644 lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py new file mode 100644 index 0000000..34dd026 --- /dev/null +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -0,0 +1,766 @@ +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""# KubernetesComputeResourcesPatch Library. + +This library is designed to enable developers to more simply patch the Kubernetes compute resource +limits and requests created by Juju during the deployment of a charm. + +When initialised, this library binds a handler to the parent charm's `config-changed` event. +The config-changed event is used because it is guaranteed to fire on startup, on upgrade and on +pod churn. Additionally, resource limits may be set by charm config options, which would also be +caught out-of-the-box by this handler. The handler applies the patch to the app's StatefulSet. +This should ensure that the resource limits are correct throughout the charm's life. Additional +optional user-provided events for re-applying the patch are supported but discouraged. + +The constructor takes a reference to the parent charm, a 'limits' and a 'requests' dictionaries +that together define the resource requirements. For information regarding the `lightkube` +`ResourceRequirements` model, please visit the `lightkube` +[docs](https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#resourcerequirements). + + +## Getting Started + +To get started using the library, you just need to fetch the library using `charmcraft`. **Note +that you also need to add `lightkube` and `lightkube-models` to your charm's `requirements.txt`.** + +```shell +cd some-charm +charmcraft fetch-lib charms.observability_libs.v0.kubernetes_compute_resources_patch +cat << EOF >> requirements.txt +lightkube +lightkube-models +EOF +``` + +Then, to initialise the library: + +```python +# ... +from charms.observability_libs.v0.kubernetes_compute_resources_patch import ( + KubernetesComputeResourcesPatch, + ResourceRequirements, +) + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + self.resources_patch = KubernetesComputeResourcesPatch( + self, + "container-name", + resource_reqs_func=lambda: ResourceRequirements( + limits={"cpu": "2"}, requests={"cpu": "1"} + ), + ) + self.framework.observe(self.resources_patch.on.patch_failed, self._on_resource_patch_failed) + + def _on_resource_patch_failed(self, event): + self.unit.status = BlockedStatus(event.message) + # ... +``` + +Or, if, for example, the resource specs are coming from config options: + +```python +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + self.resources_patch = KubernetesComputeResourcesPatch( + self, + "container-name", + resource_reqs_func=self._resource_spec_from_config, + ) + + def _resource_spec_from_config(self) -> ResourceRequirements: + spec = {"cpu": self.model.config.get("cpu"), "memory": self.model.config.get("memory")} + return ResourceRequirements(limits=spec, requests=spec) +``` + +If you wish to pull the state of the resources patch operation and set the charm unit status based on that patch result, +you can achieve that using `get_status()` function. +```python +class SomeCharm(CharmBase): + def __init__(self, *args): + #... + self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status) + #... + def _on_collect_unit_status(self, event: CollectStatusEvent): + event.add_status(self.resources_patch.get_status()) +``` + +Additionally, you may wish to use mocks in your charm's unit testing to ensure that the library +does not try to make any API calls, or open any files during testing that are unlikely to be +present, and could break your tests. The easiest way to do this is during your test `setUp`: + +```python +# ... +from ops import ActiveStatus + +@patch.multiple( + "charm.KubernetesComputeResourcesPatch", + _namespace="test-namespace", + _is_patched=lambda *a, **kw: True, + is_ready=lambda *a, **kw: True, + get_status=lambda _: ActiveStatus(), +) +@patch("lightkube.core.client.GenericSyncClient") +def setUp(self, *unused): + self.harness = Harness(SomeCharm) + # ... +``` + +References: +- https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ +- https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#resourcerequirements +""" + +import decimal +import logging +from decimal import Decimal +from math import ceil, floor +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +import tenacity +from lightkube import ApiError, Client # pyright: ignore +from lightkube.core import exceptions +from lightkube.models.apps_v1 import StatefulSetSpec +from lightkube.models.core_v1 import ( + Container, + PodSpec, + PodTemplateSpec, + ResourceRequirements, +) +from lightkube.resources.apps_v1 import StatefulSet +from lightkube.resources.core_v1 import Pod +from lightkube.types import PatchType +from lightkube.utils.quantity import equals_canonically, parse_quantity +from ops import ActiveStatus, BlockedStatus, WaitingStatus +from ops.charm import CharmBase +from ops.framework import BoundEvent, EventBase, EventSource, Object, ObjectEvents +from ops.model import StatusBase + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "2a6066f701444e8db44ba2f6af28da90" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 8 + + +_Decimal = Union[Decimal, float, str, int] # types that are potentially convertible to Decimal + + +def adjust_resource_requirements( + limits: Optional[Dict[Any, Any]], + requests: Optional[Dict[Any, Any]], + adhere_to_requests: bool = True, +) -> ResourceRequirements: + """Adjust resource limits so that `limits` and `requests` are consistent with each other. + + Args: + limits: the "limits" portion of the resource spec. + requests: the "requests" portion of the resource spec. + adhere_to_requests: a flag indicating which portion should be adjusted when "limits" is + lower than "requests": + - if True, "limits" will be adjusted to max(limits, requests). + - if False, "requests" will be adjusted to min(limits, requests). + + Returns: + An adjusted (limits, requests) 2-tuple. + + >>> adjust_resource_requirements({}, {}) + ResourceRequirements(claims=None, limits={}, requests={}) + >>> adjust_resource_requirements({"cpu": "1"}, {}) + ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'cpu': '1'}) + >>> adjust_resource_requirements({"cpu": "1"}, {"cpu": "2"}, True) + ResourceRequirements(claims=None, limits={'cpu': '2'}, requests={'cpu': '2'}) + >>> adjust_resource_requirements({"cpu": "1"}, {"cpu": "2"}, False) + ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'cpu': '1'}) + >>> adjust_resource_requirements({"cpu": "1"}, {"memory": "1G"}, True) + ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'memory': '1G', 'cpu': '1'}) + >>> adjust_resource_requirements({"cpu": "1"}, {"memory": "1G"}, False) + ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'memory': '1G', 'cpu': '1'}) + >>> adjust_resource_requirements({"cpu": "1", "memory": "1"}, {"memory": "2"}, True) + ResourceRequirements(\ +claims=None, limits={'cpu': '1', 'memory': '2'}, requests={'memory': '2', 'cpu': '1'}) + >>> adjust_resource_requirements({"cpu": "1", "memory": "1"}, {"memory": "1G"}, False) + ResourceRequirements(\ +claims=None, limits={'cpu': '1', 'memory': '1'}, requests={'memory': '1', 'cpu': '1'}) + >>> adjust_resource_requirements({"custom-resource": "1"}, {"custom-resource": "2"}, False) + Traceback (most recent call last): + ... + ValueError: Invalid limits spec: {'custom-resource': '1'} + """ + if not is_valid_spec(limits): + raise ValueError("Invalid limits spec: {}".format(limits)) + if not is_valid_spec(requests): + raise ValueError("Invalid default requests spec: {}".format(requests)) + + limits = sanitize_resource_spec_dict(limits) or {} + requests = sanitize_resource_spec_dict(requests) or {} + + # Make sure we do not modify in-place + limits, requests = limits.copy(), requests.copy() + + # Need to copy key-val pairs from "limits" to "requests", if they are not present in + # "requests". This replicates K8s behavior: + # https://kubernetes.io/docs/concepts/configuration/manage-resources-containers + requests.update({k: limits[k] for k in limits if k not in requests}) + + if adhere_to_requests: + # Keep limits fixed when `limits` is too low + adjusted, fixed = limits, requests + func = max + else: + # Pull down requests when limit is too low + fixed, adjusted = limits, requests + func = min + + # adjusted = {} + for k in adjusted: + if k not in fixed: + # The resource constraint is present in the "adjusted" dict but not in the "fixed" + # dict. Keep the "adjusted" value as is + continue + + adjusted_value = func(parse_quantity(fixed[k]), parse_quantity(adjusted[k])) # type: ignore[type-var] + adjusted[k] = ( + str(adjusted_value.quantize(decimal.Decimal("0.001"), rounding=decimal.ROUND_UP)) # type: ignore[union-attr] + .rstrip("0") + .rstrip(".") + ) + + return ( + ResourceRequirements(limits=adjusted, requests=fixed) + if adhere_to_requests + else ResourceRequirements(limits=fixed, requests=adjusted) + ) + + +def is_valid_spec(spec: Optional[dict], debug=False) -> bool: # noqa: C901 + """Check if the spec dict is valid. + + TODO: generally, the keys can be anything, not just cpu and memory. Perhaps user could pass + list of custom allowed keys in addition to the K8s ones? + """ + if spec is None: + return True + if not isinstance(spec, dict): + if debug: + logger.error("Invalid resource spec type '%s': must be either None or dict.", spec) + return False + + for k, v in spec.items(): + valid_keys = ["cpu", "memory"] # K8s permits custom keys, but we limit here to what we use + if k not in valid_keys: + if debug: + logger.error("Invalid key in resource spec: %s; valid keys: %s.", k, valid_keys) + return False + try: + assert isinstance(v, (str, type(None))) # for type checker + pv = parse_quantity(v) + except ValueError: + if debug: + logger.error("Invalid resource spec entry: {%s: %s}.", k, v) + return False + + if pv and pv < 0: + if debug: + logger.error("Invalid resource spec entry: {%s: %s}; must be non-negative.", k, v) + return False + + return True + + +def sanitize_resource_spec_dict(spec: Optional[dict]) -> Optional[dict]: + """Fix spec values without altering semantics. + + The purpose of this helper function is to correct known issues. + This function is not intended for fixing user mistakes such as incorrect keys present; that is + left for the `is_valid_spec` function. + """ + if not spec: + return spec + + d = spec.copy() + + for k, v in spec.items(): + if not v: + # Need to ignore empty values input, otherwise the StatefulSet will have "0" as the + # setpoint, the pod will not be scheduled and the charm would be stuck in unknown/lost. + # This slightly changes the spec semantics compared to lightkube/k8s: a setpoint of + # `None` would be interpreted here as "no limit". + del d[k] + + # Round up memory to whole bytes. This is need to avoid K8s errors such as: + # fractional byte value "858993459200m" (0.8Gi) is invalid, must be an integer + memory = d.get("memory") + if memory: + as_decimal = parse_quantity(memory) + if as_decimal and as_decimal.remainder_near(floor(as_decimal)): + d["memory"] = str(ceil(as_decimal)) + return d + + +def _retry_on_condition(exception): + """Retry if the exception is an ApiError with a status code != 403. + + Returns: a boolean value to indicate whether to retry or not. + """ + if isinstance(exception, ApiError) and str(exception.status.code) != "403": + return True + if isinstance(exception, exceptions.ConfigError) or isinstance(exception, ValueError): + return True + return False + + +class K8sResourcePatchFailedEvent(EventBase): + """Emitted when patching fails.""" + + def __init__(self, handle, message=None): + super().__init__(handle) + self.message = message + + def snapshot(self) -> Dict: + """Save grafana source information.""" + return {"message": self.message} + + def restore(self, snapshot): + """Restore grafana source information.""" + self.message = snapshot["message"] + + +class K8sResourcePatchEvents(ObjectEvents): + """Events raised by :class:`K8sResourcePatchEvents`.""" + + patch_failed = EventSource(K8sResourcePatchFailedEvent) + + +class ContainerNotFoundError(ValueError): + """Raised when a given container does not exist in the list of containers.""" + + +class ResourcePatcher: + """Helper class for patching a container's resource limits in a given StatefulSet.""" + + def __init__(self, namespace: str, statefulset_name: str, container_name: str): + self.namespace = namespace + self.statefulset_name = statefulset_name + self.container_name = container_name + self.client = Client() # pyright: ignore + + def _patched_delta(self, resource_reqs: ResourceRequirements) -> StatefulSet: + statefulset = self.client.get( + StatefulSet, name=self.statefulset_name, namespace=self.namespace + ) + + return StatefulSet( + spec=StatefulSetSpec( + selector=statefulset.spec.selector, # type: ignore[attr-defined] + serviceName=statefulset.spec.serviceName, # type: ignore[attr-defined] + template=PodTemplateSpec( + spec=PodSpec( + containers=[Container(name=self.container_name, resources=resource_reqs)] + ) + ), + ) + ) + + @classmethod + def _get_container(cls, container_name: str, containers: List[Container]) -> Container: + """Find our container from the container list, assuming list is unique by name. + + Typically, *.spec.containers[0] is the charm container, and [1] is the (only) workload. + + Raises: + ContainerNotFoundError, if the user-provided container name does not exist in the list. + + Returns: + An instance of :class:`Container` whose name matches the given name. + """ + try: + return next(iter(filter(lambda ctr: ctr.name == container_name, containers))) + except StopIteration: + raise ContainerNotFoundError(f"Container '{container_name}' not found") + + def is_patched(self, resource_reqs: ResourceRequirements) -> bool: + """Reports if the resource patch has been applied to the StatefulSet. + + Returns: + bool: A boolean indicating if the service patch has been applied. + """ + return equals_canonically(self.get_templated(), resource_reqs) # pyright: ignore + + def get_templated(self) -> Optional[ResourceRequirements]: + """Returns the resource limits specified in the StatefulSet template.""" + statefulset = self.client.get( + StatefulSet, name=self.statefulset_name, namespace=self.namespace + ) + podspec_tpl = self._get_container( + self.container_name, + statefulset.spec.template.spec.containers, # type: ignore[attr-defined] + ) + return podspec_tpl.resources + + def get_actual(self, pod_name: str) -> Optional[ResourceRequirements]: + """Return the resource limits that are in effect for the container in the given pod.""" + pod = self.client.get(Pod, name=pod_name, namespace=self.namespace) + podspec = self._get_container( + self.container_name, pod.spec.containers # type: ignore[attr-defined] + ) + return podspec.resources + + def is_failed( + self, resource_reqs_func: Callable[[], ResourceRequirements] + ) -> Tuple[bool, str]: + """Returns a tuple indicating whether a patch operation has failed along with a failure message. + + Implementation is based on dry running the patch operation to catch if there would be failures (e.g: Wrong spec and Auth errors). + """ + try: + resource_reqs = resource_reqs_func() + limits = resource_reqs.limits + requests = resource_reqs.requests + except ValueError as e: + msg = f"Failed obtaining resource limit spec: {e}" + logger.error(msg) + return True, msg + + # Dry run does not catch negative values for resource requests and limits. + if not is_valid_spec(limits) or not is_valid_spec(requests): + msg = f"Invalid resource requirements specs: {limits}, {requests}" + logger.error(msg) + return True, msg + + resource_reqs = ResourceRequirements( + limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] + requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type] + ) + + try: + self.apply(resource_reqs, dry_run=True) + except ApiError as e: + if e.status.code == 403: + msg = f"Kubernetes resources patch failed: `juju trust` this application. {e}" + else: + msg = f"Kubernetes resources patch failed: {e}" + return True, msg + except ValueError as e: + msg = f"Kubernetes resources patch failed: {e}" + return True, msg + + return False, "" + + def is_in_progress(self) -> bool: + """Returns a boolean to indicate whether a patch operation is in progress. + + Implementation follows a similar approach to `kubectl rollout status statefulset` to track the progress of a rollout. + Reference: https://github.com/kubernetes/kubectl/blob/kubernetes-1.31.0/pkg/polymorphichelpers/rollout_status.go + """ + try: + sts = self.client.get( + StatefulSet, name=self.statefulset_name, namespace=self.namespace + ) + except (ValueError, ApiError) as e: + # Assumption: if there was a persistent issue, it'd have been caught in `is_failed` + # Wait until next run to try again. + logger.error(f"Failed to fetch statefulset from K8s api: {e}") + return False + + if sts.status is None or sts.spec is None: + logger.debug("status/spec are not yet available") + return False + if sts.status.observedGeneration == 0 or ( + sts.metadata + and sts.status.observedGeneration + and sts.metadata.generation + and sts.metadata.generation > sts.status.observedGeneration + ): + logger.debug("waiting for statefulset spec update to be observed...") + return True + if ( + sts.spec.replicas is not None + and sts.status.readyReplicas is not None + and sts.status.readyReplicas < sts.spec.replicas + ): + logger.debug( + f"Waiting for {sts.spec.replicas-sts.status.readyReplicas} pods to be ready..." + ) + return True + + if ( + sts.spec.updateStrategy + and sts.spec.updateStrategy.type == "rollingUpdate" + and sts.spec.updateStrategy.rollingUpdate is not None + ): + if ( + sts.spec.replicas is not None + and sts.spec.updateStrategy.rollingUpdate.partition is not None + ): + if sts.status.updatedReplicas and sts.status.updatedReplicas < ( + sts.spec.replicas - sts.spec.updateStrategy.rollingUpdate.partition + ): + logger.debug( + f"Waiting for partitioned roll out to finish: {sts.status.updatedReplicas} out of {sts.spec.replicas - sts.spec.updateStrategy.rollingUpdate.partition} new pods have been updated..." + ) + return True + logger.debug( + f"partitioned roll out complete: {sts.status.updatedReplicas} new pods have been updated..." + ) + return False + + if sts.status.updateRevision != sts.status.currentRevision: + logger.debug( + f"waiting for statefulset rolling update to complete {sts.status.updatedReplicas} pods at revision {sts.status.updateRevision}..." + ) + return True + + logger.debug( + f"statefulset rolling update complete pods at revision {sts.status.currentRevision}" + ) + return False + + def is_ready(self, pod_name, resource_reqs: ResourceRequirements): + """Reports if the resource patch has been applied and is in effect. + + Returns: + bool: A boolean indicating if the service patch has been applied and is in effect. + """ + return self.is_patched(resource_reqs) and equals_canonically( # pyright: ignore + resource_reqs, self.get_actual(pod_name) # pyright: ignore + ) + + def apply(self, resource_reqs: ResourceRequirements, dry_run=False) -> None: + """Patch the Kubernetes resources created by Juju to limit cpu or mem.""" + # Need to ignore invalid input, otherwise the StatefulSet gives "FailedCreate" and the + # charm would be stuck in unknown/lost. + if not dry_run and self.is_patched(resource_reqs): + logger.debug(f"Resource requests are already patched: {resource_reqs}") + return + + self.client.patch( + StatefulSet, + self.statefulset_name, + self._patched_delta(resource_reqs), + namespace=self.namespace, + patch_type=PatchType.APPLY, + field_manager=self.__class__.__name__, + dry_run=dry_run, + ) + + +class KubernetesComputeResourcesPatch(Object): + """A utility for patching the Kubernetes compute resources set up by Juju.""" + + on = K8sResourcePatchEvents() # pyright: ignore + PATCH_RETRY_STOP = tenacity.stop_after_delay(20) + PATCH_RETRY_WAIT = tenacity.wait_fixed(5) + PATCH_RETRY_IF = tenacity.retry_if_exception(_retry_on_condition) + + def __init__( + self, + charm: CharmBase, + container_name: str, + *, + resource_reqs_func: Callable[[], ResourceRequirements], + refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None, + ): + """Constructor for KubernetesComputeResourcesPatch. + + References: + - https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + + Args: + charm: the charm that is instantiating the library. + container_name: the container for which to apply the resource limits. + resource_reqs_func: a callable returning a `ResourceRequirements`; if raises, should + only raise ValueError. + refresh_event: an optional bound event or list of bound events which + will be observed to re-apply the patch. + """ + super().__init__(charm, "{}_{}".format(self.__class__.__name__, container_name)) + self._charm = charm + self._container_name = container_name + self.resource_reqs_func = resource_reqs_func + self.patcher = ResourcePatcher(self._namespace, self._app, container_name) + + # Ensure this patch is applied during the 'config-changed' event, which is emitted every + # startup and every upgrade. The config-changed event is a good time to apply this kind of + # patch because it is always emitted after storage-attached, leadership and peer-created, + # all of which only fire after install. Patching the statefulset prematurely could result + # in those events firing without a workload. + self.framework.observe(charm.on.config_changed, self._on_config_changed) + + if not refresh_event: + refresh_event = [] + elif not isinstance(refresh_event, list): + refresh_event = [refresh_event] + for ev in refresh_event: + self.framework.observe(ev, self._on_config_changed) + + def _on_config_changed(self, _): + self._patch() + + def _patch(self) -> None: + """Patch the Kubernetes resources created by Juju to limit cpu or mem. + + This method will keep on retrying to patch the kubernetes resource for a default duration of 20 seconds + if the patching failure is due to a recoverable error (e.g: Network Latency). + """ + try: + resource_reqs = self.resource_reqs_func() + limits = resource_reqs.limits + requests = resource_reqs.requests + except ValueError as e: + msg = f"Failed obtaining resource limit spec: {e}" + logger.error(msg) + self.on.patch_failed.emit(message=msg) + return + + for spec in (limits, requests): + if not is_valid_spec(spec): + msg = f"Invalid resource limit spec: {spec}" + logger.error(msg) + self.on.patch_failed.emit(message=msg) + return + + resource_reqs = ResourceRequirements( + limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] + requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type] + ) + + try: + for attempt in tenacity.Retrying( + retry=self.PATCH_RETRY_IF, + stop=self.PATCH_RETRY_STOP, + wait=self.PATCH_RETRY_WAIT, + # if you don't succeed raise the last caught exception when you're done + reraise=True, + ): + with attempt: + logger.debug( + f"attempt #{attempt.retry_state.attempt_number} to patch resource limits" + ) + self.patcher.apply(resource_reqs) + + except exceptions.ConfigError as e: + msg = f"Error creating k8s client: {e}" + logger.error(msg) + self.on.patch_failed.emit(message=msg) + return + + except ApiError as e: + if e.status.code == 403: + msg = f"Kubernetes resources patch failed: `juju trust` this application. {e}" + + else: + msg = f"Kubernetes resources patch failed: {e}" + + logger.error(msg) + self.on.patch_failed.emit(message=msg) + + except ValueError as e: + msg = f"Kubernetes resources patch failed: {e}" + logger.error(msg) + self.on.patch_failed.emit(message=msg) + + else: + logger.info( + "Kubernetes resources for app '%s', container '%s' patched successfully: %s", + self._app, + self._container_name, + resource_reqs, + ) + + def is_ready(self) -> bool: + """Reports if the resource patch has been applied and is in effect. + + Returns: + bool: A boolean indicating if the service patch has been applied and is in effect. + """ + try: + resource_reqs = self.resource_reqs_func() + limits = resource_reqs.limits + requests = resource_reqs.requests + except ValueError as e: + msg = f"Failed obtaining resource limit spec: {e}" + logger.error(msg) + return False + + if not is_valid_spec(limits) or not is_valid_spec(requests): + logger.error("Invalid resource requirements specs: %s, %s", limits, requests) + return False + + resource_reqs = ResourceRequirements( + limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] + requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type] + ) + + try: + return self.patcher.is_ready(self._pod, resource_reqs) + except (ValueError, ApiError) as e: + msg = f"Failed to apply resource limit patch: {e}" + logger.error(msg) + self.on.patch_failed.emit(message=msg) + return False + + def get_status(self) -> StatusBase: + """Return the status of patching the resource limits in a `StatusBase` format. + + Returns: + StatusBase: There is a 1:1 mapping between the state of the patching operation and a `StatusBase` value that the charm can be set to. + Possible values are: + - ActiveStatus: The patch was applied successfully. + - BlockedStatus: The patch failed and requires a human intervention. + - WaitingStatus: The patch is still in progress. + + Example: + - ActiveStatus("Patch applied successfully") + - BlockedStatus("Failed due to missing permissions") + - WaitingStatus("Patch is in progress") + """ + failed, msg = self.patcher.is_failed(self.resource_reqs_func) + if failed: + return BlockedStatus(msg) + if self.patcher.is_in_progress(): + return WaitingStatus("waiting for resources patch to apply") + # patch successful or nothing has been patched yet + return ActiveStatus() + + @property + def _app(self) -> str: + """Name of the current Juju application. + + Returns: + str: A string containing the name of the current Juju application. + """ + return self._charm.app.name + + @property + def _pod(self) -> str: + """Name of the unit's pod. + + Returns: + str: A string containing the name of the current unit's pod. + """ + return "-".join(self._charm.unit.name.rsplit("/", 1)) + + @property + def _namespace(self) -> str: + """The Kubernetes namespace we're running in. + + If a charm is deployed into the controller model (which certainly could happen as we move + to representing the controller as a charm) then self._charm.model.name !== k8s namespace. + Instead, the model name is controller in Juju and controller- for the + namespace in K8s. + + Returns: + str: A string containing the name of the current Kubernetes namespace. + """ + with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f: + return f.read().strip()