From 5d6af4b6f05b4d1ee3926df39433f1a898d1c1b0 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Wed, 4 Dec 2024 09:08:12 -0500 Subject: [PATCH] Implement `upgrade-relation` for control plane nodes (#200) This commit implements the upgrades for control plane nodes --- charms/worker/k8s/requirements.txt | 1 + charms/worker/k8s/src/charm.py | 48 +++++- charms/worker/k8s/src/events/update_status.py | 8 +- charms/worker/k8s/src/literals.py | 23 ++- charms/worker/k8s/src/protocols.py | 28 +++ charms/worker/k8s/src/snap.py | 49 +++++- charms/worker/k8s/src/upgrade.py | 159 +++++++++++++++++- .../k8s/templates/snap_installation.yaml | 2 +- charms/worker/k8s/tests/unit/test_upgrade.py | 31 +++- 9 files changed, 324 insertions(+), 25 deletions(-) diff --git a/charms/worker/k8s/requirements.txt b/charms/worker/k8s/requirements.txt index 3c639667..140861f3 100644 --- a/charms/worker/k8s/requirements.txt +++ b/charms/worker/k8s/requirements.txt @@ -16,3 +16,4 @@ typing_extensions==4.12.2 websocket-client==1.8.0 poetry-core==1.9.1 lightkube==0.15.5 +httpx==0.27.2 diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index 3e5310be..a3d5206a 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -21,10 +21,11 @@ import shlex import socket import subprocess +from collections import defaultdict from functools import cached_property from pathlib import Path from time import sleep -from typing import Dict, Optional, Union +from typing import Dict, List, Optional, Union from urllib.parse import urlparse import charms.contextual_status as status @@ -123,6 +124,8 @@ class K8sCharm(ops.CharmBase): is_worker: true if this is a worker unit is_control_plane: true if this is a control-plane unit lead_control_plane: true if this is a control-plane unit and its the leader + is_upgrade_granted: true if the upgrade has been granted + datastore: the datastore used for Kubernetes """ _stored = ops.StoredState() @@ -139,16 +142,16 @@ def 
__init__(self, *args): xcp_relation = "external-cloud-provider" if self.is_control_plane else "" self.cloud_integration = CloudIntegration(self, self.is_control_plane) self.xcp = ExternalCloudProvider(self, xcp_relation) - self.cluster_inspector = ClusterInspector(kubeconfig_path=KUBECONFIG) + self.cluster_inspector = ClusterInspector(kubeconfig_path=self._internal_kubeconfig) self.upgrade = K8sUpgrade( self, - node_manager=self.cluster_inspector, + cluster_inspector=self.cluster_inspector, relation_name="upgrade", substrate="vm", dependency_model=K8sDependenciesModel(**DEPENDENCIES), ) self.cos = COSIntegration(self) - self.update_status = update_status.Handler(self) + self.update_status = update_status.Handler(self, self.upgrade) self.reconciler = Reconciler( self, self._reconcile, exit_status=self.update_status.active_status ) @@ -161,7 +164,8 @@ def __init__(self, *args): user_label_key="node-labels", timeout=15, ) - self._stored.set_default(is_dying=False, cluster_name=str()) + self._upgrade_snap = False + self._stored.set_default(is_dying=False, cluster_name=str(), upgrade_granted=False) self.cos_agent = COSAgentProvider( self, @@ -227,6 +231,35 @@ def is_worker(self) -> bool: """Returns true if the unit is a worker.""" return self.meta.name == "k8s-worker" + @property + def datastore(self) -> str: + """Return the datastore type.""" + return str(self.config.get("bootstrap-datastore")) + + def get_worker_versions(self) -> Dict[str, List[ops.Unit]]: + """Get the versions of the worker units. + + Returns: + Dict[str, List[ops.Unit]]: A dictionary of versions and the units that have them. 
+ """ + if not (relation := self.model.get_relation("k8s-cluster")): + return {} + + versions = defaultdict(list) + for unit in relation.units: + if version := relation.data[unit].get("version"): + versions[version].append(unit) + return versions + + def grant_upgrade(self): + """Grant the upgrade to the charm.""" + self._upgrade_snap = True + + @property + def is_upgrade_granted(self) -> bool: + """Check if the upgrade has been granted.""" + return self._upgrade_snap + def _apply_proxy_environment(self): """Apply the proxy settings from environment variables.""" proxy_settings = self._get_proxy_env() @@ -694,8 +727,9 @@ def _announce_kubernetes_version(self): if not unit_version: raise ReconcilerError(f"Waiting for version from {unit.name}") if unit_version != local_version: - status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}")) - raise ReconcilerError(f"Version mismatch with {unit.name}") + # NOTE: Add a check to validate if we are doing an upgrade + status.add(ops.WaitingStatus("Upgrading the cluster")) + return relation.data[self.app]["version"] = local_version def _get_proxy_env(self) -> Dict[str, str]: diff --git a/charms/worker/k8s/src/events/update_status.py b/charms/worker/k8s/src/events/update_status.py index 9f67c1c1..6a480846 100644 --- a/charms/worker/k8s/src/events/update_status.py +++ b/charms/worker/k8s/src/events/update_status.py @@ -15,6 +15,7 @@ import reschedule from protocols import K8sCharmProtocol from snap import version as snap_version +from upgrade import K8sUpgrade # Log messages can be retrieved using juju debug-log log = logging.getLogger(__name__) @@ -64,24 +65,27 @@ class Handler(ops.Object): the unit's status during the update process. """ - def __init__(self, charm: K8sCharmProtocol): + def __init__(self, charm: K8sCharmProtocol, upgrade: K8sUpgrade): """Initialize the UpdateStatusEvent. Args: charm: The charm instance that is instantiating this event. + upgrade: The upgrade instance that handles the upgrade process. 
""" super().__init__(charm, "update_status") self.charm = charm + self.upgrade = upgrade self.active_status = DynamicActiveStatus() self.charm.framework.observe(self.charm.on.update_status, self._on_update_status) - def _on_update_status(self, _event: ops.UpdateStatusEvent): + def _on_update_status(self, event: ops.UpdateStatusEvent): """Handle update-status event.""" if not self.charm.reconciler.stored.reconciled: return try: with status.context(self.charm.unit, exit_status=self.active_status): + self.upgrade.set_upgrade_status(event) self.run() except status.ReconcilerError: log.exception("Can't update_status") diff --git a/charms/worker/k8s/src/literals.py b/charms/worker/k8s/src/literals.py index 950b9edd..2b102d4d 100644 --- a/charms/worker/k8s/src/literals.py +++ b/charms/worker/k8s/src/literals.py @@ -3,6 +3,27 @@ """Literals for the charm.""" +SNAP_NAME = "k8s" + +K8S_COMMON_SERVICES = [ + "kubelet", + "kube-proxy", + "k8sd", +] + +K8S_DQLITE_SERVICE = "k8s-dqlite" + +K8S_CONTROL_PLANE_SERVICES = [ + "kube-apiserver", + K8S_DQLITE_SERVICE, + "kube-controller-manager", + "kube-scheduler", +] + +K8S_WORKER_SERVICES = [ + "k8s-apiserver-proxy", +] + DEPENDENCIES = { # NOTE: Update the dependencies for the k8s-charm before releasing. "k8s_charm": { @@ -16,6 +37,6 @@ "dependencies": {"k8s-worker": "^1.30, < 1.32"}, "name": "k8s", "upgrade_supported": "^1.30, < 1.32", - "version": "1.31.2", + "version": "1.31.3", }, } diff --git a/charms/worker/k8s/src/protocols.py b/charms/worker/k8s/src/protocols.py index 0ca5fc64..dbf97318 100644 --- a/charms/worker/k8s/src/protocols.py +++ b/charms/worker/k8s/src/protocols.py @@ -3,6 +3,8 @@ """Protocol definitions module.""" +from typing import Dict, List + import ops from charms.interface_external_cloud_provider import ExternalCloudProvider from charms.k8s.v0.k8sd_api_manager import K8sdAPIManager @@ -18,12 +20,22 @@ class K8sCharmProtocol(ops.CharmBase): kube_control (KubeControlProvides): The kube-control interface. 
xcp (ExternalCloudProvider): The external cloud provider interface. reconciler (Reconciler): The reconciler for the charm + is_upgrade_granted (bool): Whether the upgrade is granted. + lead_control_plane (bool): Whether the charm is the lead control plane. + is_control_plane (bool): Whether the charm is a control plane. + is_worker (bool): Whether the charm is a worker. + datastore (str): The datastore for Kubernetes. """ api_manager: K8sdAPIManager kube_control: KubeControlProvides xcp: ExternalCloudProvider reconciler: Reconciler + is_upgrade_granted: bool + lead_control_plane: bool + is_control_plane: bool + is_worker: bool + datastore: str def get_cluster_name(self) -> str: """Get the cluster name. @@ -33,6 +45,14 @@ def get_cluster_name(self) -> str: """ raise NotImplementedError + def grant_upgrade(self) -> None: + """Grant the upgrade. + + Raises: + NotImplementedError: If the method is not implemented. + """ + raise NotImplementedError + def get_cloud_name(self) -> str: """Get the cloud name. @@ -48,3 +68,11 @@ def _is_node_ready(self) -> bool: NotImplementedError: If the method is not implemented. """ raise NotImplementedError + + def get_worker_versions(self) -> Dict[str, List[ops.Unit]]: + """Get the worker versions. + + Raises: + NotImplementedError: If the method is not implemented. 
+ """ + raise NotImplementedError diff --git a/charms/worker/k8s/src/snap.py b/charms/worker/k8s/src/snap.py index e78291b4..0f19a7c5 100644 --- a/charms/worker/k8s/src/snap.py +++ b/charms/worker/k8s/src/snap.py @@ -19,6 +19,7 @@ import charms.operator_libs_linux.v2.snap as snap_lib import ops import yaml +from protocols import K8sCharmProtocol from pydantic import BaseModel, Field, ValidationError, parse_obj_as, validator from typing_extensions import Annotated @@ -263,7 +264,7 @@ def _parse_management_arguments(charm: ops.CharmBase) -> List[SnapArgument]: return args -def management(charm: ops.CharmBase) -> None: +def management(charm: K8sCharmProtocol) -> None: """Manage snap installations on this machine. Arguments: @@ -272,7 +273,7 @@ def management(charm: ops.CharmBase) -> None: cache = snap_lib.SnapCache() for args in _parse_management_arguments(charm): which = cache[args.name] - if block_refresh(which, args): + if block_refresh(which, args, charm.is_upgrade_granted): continue install_args = args.dict(exclude_none=True) if isinstance(args, SnapFileArgument) and which.revision != "x1": @@ -287,12 +288,13 @@ def management(charm: ops.CharmBase) -> None: which.ensure(**install_args) -def block_refresh(which: snap_lib.Snap, args: SnapArgument) -> bool: +def block_refresh(which: snap_lib.Snap, args: SnapArgument, upgrade_granted: bool = False) -> bool: """Block snap refreshes if the snap is in a specific state. 
Arguments: which: The snap to check args: The snap arguments + upgrade_granted: If the upgrade is granted Returns: bool: True if the snap should be blocked from refreshing @@ -303,6 +305,9 @@ def block_refresh(which: snap_lib.Snap, args: SnapArgument) -> bool: if _overridden_snap_installation().exists(): log.info("Allowing %s snap refresh due to snap installation override", args.name) return False + if upgrade_granted: + log.info("Allowing %s snap refresh due to upgrade-granted", args.name) + return False if isinstance(args, SnapStoreArgument) and args.revision: if block := which.revision != args.revision: log.info("Blocking %s snap refresh to revision=%s", args.name, args.revision) @@ -342,3 +347,41 @@ def version(snap: str) -> Tuple[Optional[str], bool]: log.info("Snap k8s not found or no version available.") return None, overridden + + +def stop(snap_name: str, services: List[str]) -> None: + """Stop the services of the snap on this machine. + + Arguments: + snap_name: The name of the snap + services: The services to stop + + Raises: + SnapError: If the snap isn't installed + """ + cache = snap_lib.SnapCache() + if snap_name not in cache: + message = f"Snap '{snap_name}' not installed" + log.error(message) + raise snap_lib.SnapError(message) + snap = cache[snap_name] + snap.stop(services=services) + + +def start(snap_name: str, services: List[str]) -> None: + """Start the services of the snap on this machine. 
+ + Arguments: + snap_name: The name of the snap + services: The services to start + + Raises: + SnapError: If the snap isn't installed + """ + cache = snap_lib.SnapCache() + if snap_name not in cache: + message = f"Snap '{snap_name}' not installed" + log.error(message) + raise snap_lib.SnapError(message) + snap = cache[snap_name] + snap.start(services=services) diff --git a/charms/worker/k8s/src/upgrade.py b/charms/worker/k8s/src/upgrade.py index 4c8efef1..9038e262 100644 --- a/charms/worker/k8s/src/upgrade.py +++ b/charms/worker/k8s/src/upgrade.py @@ -4,11 +4,32 @@ """A module for upgrading the k8s and k8s-worker charms.""" import logging -from typing import List - -from charms.data_platform_libs.v0.upgrade import ClusterNotReadyError, DataUpgrade, DependencyModel +from typing import List, Union + +import charms.contextual_status as status +import ops +import reschedule +from charms.data_platform_libs.v0.upgrade import ( + ClusterNotReadyError, + DataUpgrade, + DependencyModel, + UpgradeGrantedEvent, + verify_requirements, +) +from charms.operator_libs_linux.v2.snap import SnapError from inspector import ClusterInspector +from literals import ( + K8S_COMMON_SERVICES, + K8S_CONTROL_PLANE_SERVICES, + K8S_DQLITE_SERVICE, + K8S_WORKER_SERVICES, + SNAP_NAME, +) +from protocols import K8sCharmProtocol from pydantic import BaseModel +from snap import management as snap_management +from snap import start, stop +from snap import version as snap_version log = logging.getLogger(__name__) @@ -28,17 +49,36 @@ class K8sDependenciesModel(BaseModel): class K8sUpgrade(DataUpgrade): """A helper class for upgrading the k8s and k8s-worker charms.""" - def __init__(self, charm, node_manager: ClusterInspector, **kwargs): + def __init__(self, charm: K8sCharmProtocol, cluster_inspector: ClusterInspector, **kwargs): """Initialize the K8sUpgrade. Args: charm: The charm instance. - node_manager: The ClusterInspector instance. + cluster_inspector: The ClusterInspector instance. 
kwargs: Additional keyword arguments. """ super().__init__(charm, **kwargs) self.charm = charm - self.node_manager = node_manager + self.cluster_inspector = cluster_inspector + + def set_upgrade_status(self, event: ops.UpdateStatusEvent) -> None: + """Set the Juju upgrade status. + + Args: + event: The UpdateStatusEvent instance. + """ + upgrade_status = self.state + if not upgrade_status: + return + if upgrade_status == "upgrading": + if not self.charm.is_upgrade_granted: + self._upgrade(event) + elif upgrade_status == "recovery": + status.add(ops.MaintenanceStatus("Charm is in recovery mode. Please check the logs.")) + return + elif upgrade_status == "failed": + status.add(ops.BlockedStatus("Upgrade Failed. Please check the logs.")) + return def pre_upgrade_check(self) -> None: """Check if the cluster is ready for an upgrade. @@ -50,9 +90,10 @@ def pre_upgrade_check(self) -> None: ClusterNotReadyError: If the cluster is not ready for an upgrade. """ try: - nodes = self.node_manager.get_nodes( + nodes = self.cluster_inspector.get_nodes( labels={"juju-charm": "k8s-worker" if self.charm.is_worker else "k8s"} ) + failing_pods = self.cluster_inspector.verify_pods_running(["kube-system"]) except ClusterInspector.ClusterInspectorError as e: raise ClusterNotReadyError( message="Cluster is not ready for an upgrade", @@ -71,13 +112,115 @@ def pre_upgrade_check(self) -> None: Please check the node(s) for more information.""", ) - if failing_pods := self.node_manager.verify_pods_running(["kube-system"]): + if failing_pods: raise ClusterNotReadyError( message="Cluster is not ready", cause=f"Pods not running in namespace(s): {failing_pods}", resolution="Check the logs for the failing pods.", ) + def _verify_worker_versions(self) -> bool: + """Verify that the k8s-worker charm versions meet the requirements. + + This method verifies that all applications related to the cluster relation + satisfy the requirements of the k8s-worker charm. 
+ + Returns: + bool: True if all worker versions meet the requirements, False otherwise. + """ + worker_versions = self.charm.get_worker_versions() + if not worker_versions: + return True + dependency_model: DependencyModel = getattr(self.dependency_model, "k8s_service") + + incompatible = { + version: units + for version, units in worker_versions.items() + if not verify_requirements( + version=version, requirement=dependency_model.dependencies["k8s-worker"] + ) + } + + if incompatible: + units_str = "\n".join( + f"[{v}]: {', '.join(u.name for u in units)}" for v, units in incompatible.items() + ) + log.error( + "k8s worker charm version requirements not met. Incompatible units: %s", units_str + ) + return False + + return True + + def _perform_upgrade(self, services: List[str]) -> None: + """Perform the upgrade. + + Args: + services: The services to stop and start during the upgrade. + """ + status.add(ops.MaintenanceStatus("Stopping the K8s services")) + stop(SNAP_NAME, services) + status.add(ops.MaintenanceStatus("Upgrading the k8s snap.")) + snap_management(self.charm) + status.add(ops.MaintenanceStatus("Starting the K8s services")) + start(SNAP_NAME, services) + + def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: + """Handle the upgrade granted event. + + Args: + event: The UpgradeGrantedEvent instance. 
+ """ + with status.context(self.charm.unit, exit_status=ops.ActiveStatus("Ready")): + self._upgrade(event) + + def _upgrade(self, event: Union[ops.EventBase, ops.HookEvent]) -> None: + """Upgrade the snap workload.""" + trigger = reschedule.PeriodicEvent(self.charm) + current_version, _ = snap_version("k8s") + + status.add(ops.MaintenanceStatus("Verifying the cluster is ready for an upgrade.")) + if not current_version: + log.error("Failed to get the version of the k8s snap.") + self.set_unit_failed(cause="Failed to get the version of the k8s snap.") + status.add(ops.BlockedStatus("Failed to get the version of the k8s snap.")) + return + + status.add(ops.MaintenanceStatus("Upgrading the charm.")) + + if self.charm.lead_control_plane: + if not self._verify_worker_versions(): + self.set_unit_failed( + cause="The k8s worker charm version does not meet the requirements." + ) + trigger.cancel() + return + + self.charm.grant_upgrade() + + services = ( + K8S_CONTROL_PLANE_SERVICES + K8S_COMMON_SERVICES + if self.charm.is_control_plane + else K8S_COMMON_SERVICES + K8S_WORKER_SERVICES + ) + + if K8S_DQLITE_SERVICE in services and self.charm.datastore == "dqlite": + services.remove(K8S_DQLITE_SERVICE) + + try: + self._perform_upgrade(services=services) + self.set_unit_completed() + + if self.charm.unit.is_leader(): + self.on_upgrade_changed(event) + + trigger.cancel() + except SnapError: + status.add(ops.WaitingStatus("Waiting for the snap to be installed.")) + log.exception("Failed to upgrade the snap. Will retry...") + trigger.create(reschedule.Period(seconds=30)) + return + def build_upgrade_stack(self) -> List[int]: """Return a list of unit numbers to upgrade in order. 
diff --git a/charms/worker/k8s/templates/snap_installation.yaml b/charms/worker/k8s/templates/snap_installation.yaml index f8528062..828d2da3 100644 --- a/charms/worker/k8s/templates/snap_installation.yaml +++ b/charms/worker/k8s/templates/snap_installation.yaml @@ -10,4 +10,4 @@ arm64: - name: k8s install-type: store channel: edge - classic: true \ No newline at end of file + classic: true diff --git a/charms/worker/k8s/tests/unit/test_upgrade.py b/charms/worker/k8s/tests/unit/test_upgrade.py index 60e66bb4..e55f214c 100644 --- a/charms/worker/k8s/tests/unit/test_upgrade.py +++ b/charms/worker/k8s/tests/unit/test_upgrade.py @@ -6,6 +6,7 @@ import unittest from unittest.mock import MagicMock +import ops from charms.data_platform_libs.v0.upgrade import ClusterNotReadyError from inspector import ClusterInspector from lightkube.models.core_v1 import Node @@ -22,7 +23,7 @@ def setUp(self): self.node_manager = MagicMock(spec=ClusterInspector) self.upgrade = K8sUpgrade( self.charm, - node_manager=self.node_manager, + cluster_inspector=self.node_manager, relation_name="upgrade", substrate="vm", dependency_model=K8sDependenciesModel( @@ -34,9 +35,9 @@ def setUp(self): "version": "100", }, "k8s_service": { - "dependencies": {"k8s-worker": "^3"}, + "dependencies": {"k8s-worker": "^1.30, < 1.32"}, "name": "k8s", - "upgrade_supported": ">=0.8", + "upgrade_supported": "^1.30, < 1.32", "version": "1.31.1", }, } @@ -119,3 +120,27 @@ def test_build_upgrade_stack_with_relation(self): self.assertEqual(sorted(result), [0, 1, 2]) self.charm.model.get_relation.assert_called_once_with("cluster") + + def test_verify_worker_versions_compatible(self): + """Test _verify_worker_versions returns True when worker versions are compatible.""" + unit_1 = MagicMock(spec=ops.Unit) + unit_1.name = "k8s-worker/0" + unit_2 = MagicMock(spec=ops.Unit) + unit_2.name = "k8s-worker/1" + self.charm.get_worker_versions.return_value = {"1.31.0": [unit_1], "1.31.5": [unit_2]} + + result = 
self.upgrade._verify_worker_versions() + + self.assertTrue(result) + + def test_verify_worker_versions_incompatible(self): + """Test _verify_worker_versions returns False when worker versions are incompatible.""" + unit_1 = MagicMock(spec=ops.Unit) + unit_1.name = "k8s-worker/0" + unit_2 = MagicMock(spec=ops.Unit) + unit_2.name = "k8s-worker/1" + self.charm.get_worker_versions.return_value = {"1.32.0": [unit_1], "1.33.0": [unit_2]} + + result = self.upgrade._verify_worker_versions() + + self.assertFalse(result)