diff --git a/charms/worker/charmcraft.yaml b/charms/worker/charmcraft.yaml index b567c149..26d308d8 100644 --- a/charms/worker/charmcraft.yaml +++ b/charms/worker/charmcraft.yaml @@ -85,6 +85,14 @@ parts: rm -rf $CRAFT_PRIME/lib $CRAFT_PRIME/templates mv $CRAFT_PRIME/k8s/lib $CRAFT_PRIME/lib mv $CRAFT_PRIME/k8s/templates $CRAFT_PRIME/templates +actions: + pre-upgrade-check: + description: Run necessary pre-upgrade checks before executing a charm upgrade. + +peers: + upgrade: + interface: upgrade + provides: cos-agent: diff --git a/charms/worker/k8s/charmcraft.yaml b/charms/worker/k8s/charmcraft.yaml index 0f555fd0..d64ce509 100644 --- a/charms/worker/k8s/charmcraft.yaml +++ b/charms/worker/k8s/charmcraft.yaml @@ -192,6 +192,9 @@ actions: server: description: Override the server endpoint with this field type: string + pre-upgrade-check: + description: Run necessary pre-upgrade checks before executing a charm upgrade. + parts: charm: @@ -203,6 +206,8 @@ peers: interface: k8s-cluster cos-tokens: interface: cos-k8s-tokens + upgrade: + interface: upgrade provides: cos-agent: diff --git a/charms/worker/k8s/lib/charms/data_platform_libs/v0/upgrade.py b/charms/worker/k8s/lib/charms/data_platform_libs/v0/upgrade.py new file mode 100644 index 00000000..4d909d64 --- /dev/null +++ b/charms/worker/k8s/lib/charms/data_platform_libs/v0/upgrade.py @@ -0,0 +1,1102 @@ +# Copyright 2023 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +r"""Library to manage in-place upgrades for charms running on VMs and K8s. + +This library contains handlers for `upgrade` relation events used to coordinate +between units in an application during a `juju refresh`, as well as `Pydantic` models +for instantiating, validating and comparing dependencies. + +An upgrade on VMs is initiated with the command `juju refresh`. Once executed, the following +events are emitted to each unit at random: + - `upgrade-charm` + - `config-changed` + - `leader-settings-changed` - Non-leader only + +Charm authors can implement the classes defined in this library to streamline the process of +coordinating which unit updates when, achieved through updating of unit-data `state` throughout. + +At a high-level, the upgrade steps are as follows: + - Run pre-checks on the cluster to confirm it is safe to upgrade + - Create stack of unit.ids, to serve as the upgrade order (generally workload leader is last) + - Start the upgrade by issuing a Juju CLI command + - The unit at the top of the stack gets permission to upgrade + - The unit handles the upgrade and restarts their service + - Repeat, until all units have restarted + +### Usage by charm authors + +#### `upgrade` relation + +Charm authors must implement an additional peer-relation. + +As this library uses relation data exchanged between units to coordinate, charm authors +need to add a new relation interface. The relation name does not matter. + +`metadata.yaml` +```yaml +peers: + upgrade: + interface: upgrade +``` + +#### Dependencies JSON/Dict + +Charm authors must implement a dict object tracking current charm versions, requirements + upgradability. + +Many workload versions may be incompatible with older/newer versions. This same idea also can apply to +charm or snap versions. Workloads with required related applications (e.g Kafka + ZooKeeper) also need to +ensure their versions are compatible during an upgrade, to avoid cluster failure. + +As such, it is necessasry to freeze any dependencies within each published charm. An example of this could +be creating a `DEPENDENCIES` dict within the charm code, with the following structure: + +`src/literals.py` +```python +DEPENDENCIES = { + "kafka_charm": { + "dependencies": {"zookeeper": ">50"}, + "name": "kafka", + "upgrade_supported": ">90", + "version": "100", + }, + "kafka_service": { + "dependencies": {"zookeeper": "^3"}, + "name": "kafka", + "upgrade_supported": ">=0.8", + "version": "3.3.2", + }, +} +``` + +The first-level key names are arbitrary labels for tracking what those versions+dependencies are for. +The `dependencies` second-level values are a key-value map of any required external applications, + and the versions this packaged charm can support. +The `upgrade_suppported` second-level values are requirements from which an in-place upgrade can be + supported by the charm. +The `version` second-level values correspond to the current version of this packaged charm. + +Any requirements comply with [`poetry`'s dependency specifications](https://python-poetry.org/docs/dependency-specification/#caret-requirements). + +### Dependency Model + +Charm authors must implement their own class inheriting from `DependencyModel`. + +Using a `Pydantic` model to instantiate the aforementioned `DEPENDENCIES` dict gives stronger type safety and additional +layers of validation. + +Implementation just needs to ensure that the top-level key names from `DEPENDENCIES` are defined as attributed in the model. + +`src/upgrade.py` +```python +from pydantic import BaseModel + +class KafkaDependenciesModel(BaseModel): + kafka_charm: DependencyModel + kafka_service: DependencyModel +``` + +### Overrides for `DataUpgrade` + +Charm authors must define their own class, inheriting from `DataUpgrade`, overriding all required `abstractmethod`s. + +```python +class ZooKeeperUpgrade(DataUpgrade): + def __init__(self, charm: "ZooKeeperUpgrade", **kwargs): + super().__init__(charm, **kwargs) + self.charm = charm +``` + +#### Implementation of `pre_upgrade_check()` + +Before upgrading a cluster, it's a good idea to check that it is stable and healthy before permitting it. +Here, charm authors can validate upgrade safety through API calls, relation-data checks, etc. +If any of these checks fail, raise `ClusterNotReadyError`. + +```python + @override + def pre_upgrade_check(self) -> None: + default_message = "Pre-upgrade check failed and cannot safely upgrade" + try: + if not self.client.members_broadcasting or not len(self.client.server_members) == len( + self.charm.cluster.peer_units + ): + raise ClusterNotReadyError( + message=default_message, + cause="Not all application units are connected and broadcasting in the quorum", + ) + + if self.client.members_syncing: + raise ClusterNotReadyError( + message=default_message, cause="Some quorum members are syncing data" + ) + + if not self.charm.cluster.stable: + raise ClusterNotReadyError( + message=default_message, cause="Charm has not finished initialising" + ) + + except QuorumLeaderNotFoundError: + raise ClusterNotReadyError(message=default_message, cause="Quorum leader not found") + except ConnectionClosedError: + raise ClusterNotReadyError( + message=default_message, cause="Unable to connect to the cluster" + ) +``` + +#### Implementation of `build_upgrade_stack()` - VM ONLY + +Oftentimes, it is necessary to ensure that the workload leader is the last unit to upgrade, +to ensure high-availability during the upgrade process. +Here, charm authors can create a LIFO stack of unit.ids, represented as a list of unit.id strings, +with the leader unit being at i[0]. + +```python +@override +def build_upgrade_stack(self) -> list[int]: + upgrade_stack = [] + for unit in self.charm.cluster.peer_units: + config = self.charm.cluster.unit_config(unit=unit) + + # upgrade quorum leader last + if config["host"] == self.client.leader: + upgrade_stack.insert(0, int(config["unit_id"])) + else: + upgrade_stack.append(int(config["unit_id"])) + + return upgrade_stack +``` + +#### Implementation of `_on_upgrade_granted()` + +On relation-changed events, each unit will check the current upgrade-stack persisted to relation data. +If that unit is at the top of the stack, it will emit an `upgrade-granted` event, which must be handled. +Here, workloads can be re-installed with new versions, checks can be made, data synced etc. +If the new unit successfully rejoined the cluster, call `set_unit_completed()`. +If the new unit failed to rejoin the cluster, call `set_unit_failed()`. + +NOTE - It is essential here to manually call `on_upgrade_changed` if the unit is the current leader. +This ensures that the leader gets it's own relation-changed event, and updates the upgrade-stack for +other units to follow suit. + +```python +@override +def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: + self.charm.snap.stop_snap_service() + + if not self.charm.snap.install(): + logger.error("Unable to install ZooKeeper Snap") + self.set_unit_failed() + return None + + logger.info(f"{self.charm.unit.name} upgrading service...") + self.charm.snap.restart_snap_service() + + try: + logger.debug("Running post-upgrade check...") + self.pre_upgrade_check() + + logger.debug("Marking unit completed...") + self.set_unit_completed() + + # ensures leader gets it's own relation-changed when it upgrades + if self.charm.unit.is_leader(): + logger.debug("Re-emitting upgrade-changed on leader...") + self.on_upgrade_changed(event) + + except ClusterNotReadyError as e: + logger.error(e.cause) + self.set_unit_failed() +``` + +#### Implementation of `log_rollback_instructions()` + +If the upgrade fails, manual intervention may be required for cluster recovery. +Here, charm authors can log out any necessary steps to take to recover from a failed upgrade. +When a unit fails, this library will automatically log out this message. + +```python +@override +def log_rollback_instructions(self) -> None: + logger.error("Upgrade failed. Please run `juju refresh` to previous version.") +``` + +### Instantiating in the charm and deferring events + +Charm authors must add a class attribute for the child class of `DataUpgrade` in the main charm. +They must also ensure that any non-upgrade related events that may be unsafe to handle during +an upgrade, are deferred if the unit is not in the `idle` state - i.e not currently upgrading. + +```python +class ZooKeeperCharm(CharmBase): + def __init__(self, *args): + super().__init__(*args) + self.upgrade = ZooKeeperUpgrade( + self, + relation_name = "upgrade", + substrate = "vm", + dependency_model=ZooKeeperDependencyModel( + **DEPENDENCIES + ), + ) + + def restart(self, event) -> None: + if not self.upgrade.state == "idle": + event.defer() + return None + + self.restart_snap_service() +``` +""" + +import json +import logging +from abc import ABC, abstractmethod +from typing import Dict, List, Literal, Optional, Set, Tuple + +import poetry.core.constraints.version as poetry_version +from ops.charm import ( + ActionEvent, + CharmBase, + CharmEvents, + RelationCreatedEvent, + UpgradeCharmEvent, +) +from ops.framework import EventBase, EventSource, Object +from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, Relation, Unit, WaitingStatus +from pydantic import BaseModel, root_validator, validator + +# The unique Charmhub library identifier, never change it +LIBID = "156258aefb79435a93d933409a8c8684" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 18 + +PYDEPS = ["pydantic>=1.10,<2", "poetry-core"] + +logger = logging.getLogger(__name__) + +# --- DEPENDENCY RESOLUTION FUNCTIONS --- + + +def verify_requirements(version: str, requirement: str) -> bool: + """Verifies a specified version against defined constraint. + + Supports Poetry version constraints + https://python-poetry.org/docs/dependency-specification/#version-constraints + + Args: + version: the version currently in use + requirement: Poetry version constraint + + Returns: + True if `version` meets defined `requirement`. Otherwise False + """ + return poetry_version.parse_constraint(requirement).allows( + poetry_version.Version.parse(version) + ) + + +# --- DEPENDENCY MODEL TYPES --- + + +class DependencyModel(BaseModel): + """Manager for a single dependency. + + To be used as part of another model representing a collection of arbitrary dependencies. + + Example:: + + class KafkaDependenciesModel(BaseModel): + kafka_charm: DependencyModel + kafka_service: DependencyModel + + deps = { + "kafka_charm": { + "dependencies": {"zookeeper": ">5"}, + "name": "kafka", + "upgrade_supported": ">5", + "version": "10", + }, + "kafka_service": { + "dependencies": {"zookeeper": "^3.6"}, + "name": "kafka", + "upgrade_supported": "~3.3", + "version": "3.3.2", + }, + } + + model = KafkaDependenciesModel(**deps) # loading dict in to model + + print(model.dict()) # exporting back validated deps + """ + + dependencies: Dict[str, str] + name: str + upgrade_supported: str + version: str + + @validator("dependencies", "upgrade_supported", each_item=True) + @classmethod + def dependencies_validator(cls, value): + """Validates version constraint.""" + if isinstance(value, dict): + deps = value.values() + else: + deps = [value] + + for dep in deps: + poetry_version.parse_constraint(dep) + + return value + + @root_validator(skip_on_failure=True) + @classmethod + def version_upgrade_supported_validator(cls, values): + """Validates specified `version` meets `upgrade_supported` requirement.""" + if not verify_requirements( + version=values.get("version"), requirement=values.get("upgrade_supported") + ): + raise ValueError( + f"upgrade_supported value {values.get('upgrade_supported')} greater than version value {values.get('version')} for {values.get('name')}." + ) + + return values + + def can_upgrade(self, dependency: "DependencyModel") -> bool: + """Compares two instances of :class:`DependencyModel` for upgradability. + + Args: + dependency: a dependency model to compare this model against + + Returns: + True if current model can upgrade from dependent model. Otherwise False + """ + return verify_requirements(version=self.version, requirement=dependency.upgrade_supported) + + +# --- CUSTOM EXCEPTIONS --- + + +class UpgradeError(Exception): + """Base class for upgrade related exceptions in the module.""" + + def __init__(self, message: str, cause: Optional[str], resolution: Optional[str]): + super().__init__(message) + self.message = message + self.cause = cause or "" + self.resolution = resolution or "" + + def __repr__(self): + """Representation of the UpgradeError class.""" + return f"{type(self).__module__}.{type(self).__name__} - {str(vars(self))}" + + def __str__(self): + """String representation of the UpgradeError class.""" + return repr(self) + + +class ClusterNotReadyError(UpgradeError): + """Exception flagging that the cluster is not ready to start upgrading. + + For example, if the cluster fails :class:`DataUpgrade._on_pre_upgrade_check_action` + + Args: + message: string message to be logged out + cause: short human-readable description of the cause of the error + resolution: short human-readable instructions for manual error resolution (optional) + """ + + def __init__(self, message: str, cause: str, resolution: Optional[str] = None): + super().__init__(message, cause=cause, resolution=resolution) + + +class KubernetesClientError(UpgradeError): + """Exception flagging that a call to Kubernetes API failed. + + For example, if the cluster fails :class:`DataUpgrade._set_rolling_update_partition` + + Args: + message: string message to be logged out + cause: short human-readable description of the cause of the error + resolution: short human-readable instructions for manual error resolution (optional) + """ + + def __init__(self, message: str, cause: str, resolution: Optional[str] = None): + super().__init__(message, cause=cause, resolution=resolution) + + +class VersionError(UpgradeError): + """Exception flagging that the old `version` fails to meet the new `upgrade_supported`s. + + For example, upgrades from version `2.x` --> `4.x`, + but `4.x` only supports upgrading from `3.x` onwards + + Args: + message: string message to be logged out + cause: short human-readable description of the cause of the error + resolution: short human-readable instructions for manual solutions to the error (optional) + """ + + def __init__(self, message: str, cause: str, resolution: Optional[str] = None): + super().__init__(message, cause=cause, resolution=resolution) + + +class DependencyError(UpgradeError): + """Exception flagging that some new `dependency` is not being met. + + For example, new version requires related App version `2.x`, but currently is `1.x` + + Args: + message: string message to be logged out + cause: short human-readable description of the cause of the error + resolution: short human-readable instructions for manual solutions to the error (optional) + """ + + def __init__(self, message: str, cause: str, resolution: Optional[str] = None): + super().__init__(message, cause=cause, resolution=resolution) + + +# --- CUSTOM EVENTS --- + + +class UpgradeGrantedEvent(EventBase): + """Used to tell units that they can process an upgrade.""" + + +class UpgradeFinishedEvent(EventBase): + """Used to tell units that they finished the upgrade.""" + + +class UpgradeEvents(CharmEvents): + """Upgrade events. + + This class defines the events that the lib can emit. + """ + + upgrade_granted = EventSource(UpgradeGrantedEvent) + upgrade_finished = EventSource(UpgradeFinishedEvent) + + +# --- EVENT HANDLER --- + + +class DataUpgrade(Object, ABC): + """Manages `upgrade` relation operations for in-place upgrades.""" + + STATES = ["recovery", "failed", "idle", "ready", "upgrading", "completed"] + + on = UpgradeEvents() # pyright: ignore [reportAssignmentType] + + def __init__( + self, + charm: CharmBase, + dependency_model: BaseModel, + relation_name: str = "upgrade", + substrate: Literal["vm", "k8s"] = "vm", + ): + super().__init__(charm, relation_name) + self.charm = charm + self.dependency_model = dependency_model + self.relation_name = relation_name + self.substrate = substrate + self._upgrade_stack = None + + # events + self.framework.observe( + self.charm.on[relation_name].relation_created, self._on_upgrade_created + ) + self.framework.observe( + self.charm.on[relation_name].relation_changed, self.on_upgrade_changed + ) + self.framework.observe(self.charm.on.upgrade_charm, self._on_upgrade_charm) + self.framework.observe(getattr(self.on, "upgrade_granted"), self._on_upgrade_granted) + self.framework.observe(getattr(self.on, "upgrade_finished"), self._on_upgrade_finished) + + # actions + self.framework.observe( + getattr(self.charm.on, "pre_upgrade_check_action"), self._on_pre_upgrade_check_action + ) + if self.substrate == "k8s": + self.framework.observe( + getattr(self.charm.on, "resume_upgrade_action"), self._on_resume_upgrade_action + ) + + @property + def peer_relation(self) -> Optional[Relation]: + """The upgrade peer relation.""" + return self.charm.model.get_relation(self.relation_name) + + @property + def app_units(self) -> Set[Unit]: + """The peer-related units in the application.""" + if not self.peer_relation: + return set() + + return set([self.charm.unit] + list(self.peer_relation.units)) + + @property + def state(self) -> Optional[str]: + """The unit state from the upgrade peer relation.""" + if not self.peer_relation: + return None + + return self.peer_relation.data[self.charm.unit].get("state", None) + + @property + def stored_dependencies(self) -> Optional[BaseModel]: + """The application dependencies from the upgrade peer relation.""" + if not self.peer_relation: + return None + + if not (deps := self.peer_relation.data[self.charm.app].get("dependencies", "")): + return None + + return type(self.dependency_model)(**json.loads(deps)) + + @property + def upgrade_stack(self) -> Optional[List[int]]: + """Gets the upgrade stack from the upgrade peer relation. + + Unit.ids are ordered Last-In-First-Out (LIFO). + i.e unit.id at index `-1` is the first unit to upgrade. + unit.id at index `0` is the last unit to upgrade. + + Returns: + List of integer unit.ids, ordered in upgrade order in a stack + """ + if not self.peer_relation: + return None + + # lazy-load + if self._upgrade_stack is None: + self._upgrade_stack = ( + json.loads(self.peer_relation.data[self.charm.app].get("upgrade-stack", "[]")) + or None + ) + + return self._upgrade_stack + + @upgrade_stack.setter + def upgrade_stack(self, stack: List[int]) -> None: + """Sets the upgrade stack to the upgrade peer relation. + + Unit.ids are ordered Last-In-First-Out (LIFO). + i.e unit.id at index `-1` is the first unit to upgrade. + unit.id at index `0` is the last unit to upgrade. + """ + if not self.peer_relation: + return + + self.peer_relation.data[self.charm.app].update({"upgrade-stack": json.dumps(stack)}) + self._upgrade_stack = stack + + @property + def other_unit_states(self) -> list: + """Current upgrade state for other units. + + Returns: + Unsorted list of upgrade states for other units. + """ + if not self.peer_relation: + return [] + + return [ + self.peer_relation.data[unit].get("state", "") + for unit in list(self.peer_relation.units) + ] + + @property + def unit_states(self) -> list: + """Current upgrade state for all units. + + Returns: + Unsorted list of upgrade states for all units. + """ + if not self.peer_relation: + return [] + + return [self.peer_relation.data[unit].get("state", "") for unit in self.app_units] + + @property + def cluster_state(self) -> Optional[str]: + """Current upgrade state for cluster units. + + Determined from :class:`DataUpgrade.STATE`, taking the lowest ordinal unit state. + + For example, if units in have states: `["ready", "upgrading", "completed"]`, + the overall state for the cluster is `ready`. + + Returns: + String of upgrade state from the furthest behind unit. + """ + if not self.unit_states: + return None + + try: + return sorted(self.unit_states, key=self.STATES.index)[0] + except (ValueError, KeyError): + return None + + @property + def idle(self) -> Optional[bool]: + """Flag for whether the cluster is in an idle upgrade state. + + Returns: + True if all application units in idle state. Otherwise False + """ + return set(self.unit_states) == {"idle"} + + @abstractmethod + def pre_upgrade_check(self) -> None: + """Runs necessary checks validating the cluster is in a healthy state to upgrade. + + Called by all units during :meth:`_on_pre_upgrade_check_action`. + + Raises: + :class:`ClusterNotReadyError`: if cluster is not ready to upgrade + """ + pass + + def build_upgrade_stack(self) -> List[int]: + """Builds ordered iterable of all application unit.ids to upgrade in. + + Called by leader unit during :meth:`_on_pre_upgrade_check_action`. + + Returns: + Iterable of integer unit.ids, LIFO ordered in upgrade order + i.e `[5, 2, 4, 1, 3]`, unit `3` upgrades first, `5` upgrades last + """ + # don't raise if k8s substrate, uses default statefulset order + if self.substrate == "k8s": + return [] + + raise NotImplementedError + + @abstractmethod + def log_rollback_instructions(self) -> None: + """Sets charm state and logs out rollback instructions. + + Called by all units when `state=failed` found during :meth:`_on_upgrade_changed`. + """ + pass + + def _repair_upgrade_stack(self) -> None: + """Ensures completed units are re-added to the upgrade-stack after failure.""" + # need to update the stack as it was not refreshed by rollback run of pre-upgrade-check + # avoids difficult health check implementation by charm-authors needing to exclude dead units + + # if the first unit in the stack fails, the stack will be the same length as units + # i.e this block not ran + if ( + self.cluster_state in ["failed", "recovery"] + and self.upgrade_stack + and len(self.upgrade_stack) != len(self.app_units) + and self.charm.unit.is_leader() + ): + new_stack = self.upgrade_stack + for unit in self.app_units: + unit_id = int(unit.name.split("/")[1]) + + # if a unit fails, it rolls back first + if unit_id not in new_stack: + new_stack.insert(-1, unit_id) + logger.debug(f"Inserted {unit_id} in to upgrade-stack - {new_stack}") + + self.upgrade_stack = new_stack + + def set_unit_failed(self, cause: Optional[str] = None) -> None: + """Sets unit `state=failed` to the upgrade peer data. + + Args: + cause: short description of cause of failure + """ + if not self.peer_relation: + return None + + # needed to refresh the stack + # now leader pulls a fresh stack from newly updated relation data + if self.charm.unit.is_leader(): + self._upgrade_stack = None + + self.charm.unit.status = BlockedStatus(cause if cause else "") + self.peer_relation.data[self.charm.unit].update({"state": "failed"}) + self.log_rollback_instructions() + + def set_unit_completed(self) -> None: + """Sets unit `state=completed` to the upgrade peer data.""" + if not self.peer_relation: + return None + + # needed to refresh the stack + # now leader pulls a fresh stack from newly updated relation data + if self.charm.unit.is_leader(): + self._upgrade_stack = None + + self.charm.unit.status = MaintenanceStatus("upgrade completed") + self.peer_relation.data[self.charm.unit].update({"state": "completed"}) + + # Emit upgrade_finished event to run unit's post upgrade operations. + if self.substrate == "k8s": + logger.debug( + f"{self.charm.unit.name} has completed the upgrade, emitting `upgrade_finished` event..." + ) + getattr(self.on, "upgrade_finished").emit() + + def _on_upgrade_created(self, event: RelationCreatedEvent) -> None: + """Handler for `upgrade-relation-created` events.""" + if not self.peer_relation: + event.defer() + return + + # setting initial idle state needed to avoid execution on upgrade-changed events + self.peer_relation.data[self.charm.unit].update({"state": "idle"}) + + if self.charm.unit.is_leader(): + logger.debug("Persisting dependencies to upgrade relation data...") + self.peer_relation.data[self.charm.app].update( + {"dependencies": json.dumps(self.dependency_model.dict())} + ) + + def _on_pre_upgrade_check_action(self, event: ActionEvent) -> None: + """Handler for `pre-upgrade-check-action` events.""" + if not self.peer_relation: + event.fail(message="Could not find upgrade relation.") + return + + if not self.charm.unit.is_leader(): + event.fail(message="Action must be ran on the Juju leader.") + return + + if self.cluster_state == "failed": + logger.info("Entering recovery state for rolling-back to previous version...") + self._repair_upgrade_stack() + self.charm.unit.status = BlockedStatus("ready to rollback application") + self.peer_relation.data[self.charm.unit].update({"state": "recovery"}) + return + + # checking if upgrade in progress + if self.cluster_state != "idle": + event.fail("Cannot run pre-upgrade checks, cluster already upgrading.") + return + + try: + logger.info("Running pre-upgrade-check...") + self.pre_upgrade_check() + + if self.substrate == "k8s": + logger.info("Building upgrade-stack for K8s...") + built_upgrade_stack = sorted( + [int(unit.name.split("/")[1]) for unit in self.app_units] + ) + else: + logger.info("Building upgrade-stack for VMs...") + built_upgrade_stack = self.build_upgrade_stack() + + logger.debug(f"Built upgrade stack of {built_upgrade_stack}") + + except ClusterNotReadyError as e: + logger.error(e) + event.fail(message=e.message) + return + except Exception as e: + logger.error(e) + event.fail(message="Unknown error found.") + return + + logger.info("Setting upgrade-stack to relation data...") + self.upgrade_stack = built_upgrade_stack + + def _on_resume_upgrade_action(self, event: ActionEvent) -> None: + """Handle resume upgrade action. + + Continue the upgrade by setting the partition to the next unit. + """ + if not self.peer_relation: + event.fail(message="Could not find upgrade relation.") + return + + if not self.charm.unit.is_leader(): + event.fail(message="Action must be ran on the Juju leader.") + return + + if not self.upgrade_stack: + event.fail(message="Nothing to resume, upgrade stack unset.") + return + + # Check whether this is being run after juju refresh was called + # (the size of the upgrade stack should match the number of total + # unit minus one). + if len(self.upgrade_stack) != len(self.peer_relation.units): + event.fail(message="Upgrade can be resumed only once after juju refresh is called.") + return + + try: + next_partition = self.upgrade_stack[-1] + self._set_rolling_update_partition(partition=next_partition) + event.set_results({"message": f"Upgrade will resume on unit {next_partition}"}) + except KubernetesClientError: + event.fail(message="Cannot set rolling update partition.") + + def _upgrade_supported_check(self) -> None: + """Checks if previous versions can be upgraded to new versions. + + Raises: + :class:`VersionError` if upgrading to existing `version` is not supported + """ + keys = self.dependency_model.__fields__.keys() + + compatible = True + incompatibilities: List[Tuple[str, str, str, str]] = [] + for key in keys: + old_dep: DependencyModel = getattr(self.stored_dependencies, key) + new_dep: DependencyModel = getattr(self.dependency_model, key) + + if not old_dep.can_upgrade(dependency=new_dep): + compatible = False + incompatibilities.append( + (key, old_dep.version, new_dep.version, new_dep.upgrade_supported) + ) + + base_message = "Versions incompatible" + base_cause = "Upgrades only supported for specific versions" + if not compatible: + for incompat in incompatibilities: + base_message += ( + f", {incompat[0]} {incompat[1]} can not be upgraded to {incompat[2]}" + ) + base_cause += f", {incompat[0]} versions satisfying requirement {incompat[3]}" + + raise VersionError( + message=base_message, + cause=base_cause, + ) + + def _on_upgrade_charm(self, event: UpgradeCharmEvent) -> None: + """Handler for `upgrade-charm` events.""" + # defer if not all units have pre-upgraded + if not self.peer_relation: + event.defer() + return + + if not self.upgrade_stack: + logger.error("Cluster upgrade failed, ensure pre-upgrade checks are ran first.") + return + + if self.substrate == "vm": + # for VM run version checks on leader only + if self.charm.unit.is_leader(): + try: + self._upgrade_supported_check() + except VersionError as e: # not ready if not passed check + logger.error(e) + self.set_unit_failed() + return + top_unit_id = self.upgrade_stack[-1] + top_unit = self.charm.model.get_unit(f"{self.charm.app.name}/{top_unit_id}") + if ( + top_unit == self.charm.unit + and self.peer_relation.data[self.charm.unit].get("state") == "recovery" + ): + # While in a rollback and the Juju leader unit is the top unit in the upgrade stack, emit the event + # for this unit to start the rollback. + self.peer_relation.data[self.charm.unit].update({"state": "ready"}) + self.on_upgrade_changed(event) + return + self.charm.unit.status = WaitingStatus("other units upgrading first...") + self.peer_relation.data[self.charm.unit].update({"state": "ready"}) + + if len(self.app_units) == 1: + # single unit upgrade, emit upgrade_granted event right away + getattr(self.on, "upgrade_granted").emit() + + else: + # for k8s run version checks only on highest ordinal unit + if ( + self.charm.unit.name + == f"{self.charm.app.name}/{self.charm.app.planned_units() -1}" + ): + try: + self._upgrade_supported_check() + except VersionError as e: # not ready if not passed check + logger.error(e) + self.set_unit_failed() + return + # On K8s an unit that receives the upgrade-charm event is upgrading + self.charm.unit.status = MaintenanceStatus("upgrading unit") + self.peer_relation.data[self.charm.unit].update({"state": "upgrading"}) + + def on_upgrade_changed(self, event: EventBase) -> None: + """Handler for `upgrade-relation-changed` events.""" + if not self.peer_relation: + return + + # if any other unit failed, don't continue with upgrade + if self.cluster_state == "failed": + logger.debug("Cluster failed to upgrade, exiting...") + return + + if self.substrate == "vm" and self.cluster_state == "recovery": + # skip run while in recovery. The event will be retrigged when the cluster is ready + logger.debug("Cluster in recovery, skip...") + return + + # if all units completed, mark as complete + if not self.upgrade_stack: + if self.state == "completed" and self.cluster_state in ["idle", "completed"]: + logger.info("All units completed upgrade, setting idle upgrade state...") + self.charm.unit.status = ActiveStatus() + self.peer_relation.data[self.charm.unit].update({"state": "idle"}) + + if self.charm.unit.is_leader(): + logger.debug("Persisting new dependencies to upgrade relation data...") + self.peer_relation.data[self.charm.app].update( + {"dependencies": json.dumps(self.dependency_model.dict())} + ) + return + + if self.cluster_state == "idle": + logger.debug("upgrade-changed event handled before pre-checks, exiting...") + return + + logger.debug("Did not find upgrade-stack or completed cluster state, skipping...") + return + + # upgrade ongoing, set status for waiting units + if "upgrading" in self.unit_states and self.state in ["idle", "ready"]: + self.charm.unit.status = WaitingStatus("other units upgrading first...") + + # pop mutates the `upgrade_stack` attr + top_unit_id = self.upgrade_stack.pop() + top_unit = self.charm.model.get_unit(f"{self.charm.app.name}/{top_unit_id}") + top_state = self.peer_relation.data[top_unit].get("state") + + # if top of stack is completed, leader pops it + if self.charm.unit.is_leader() and top_state == "completed": + logger.debug(f"{top_unit} has finished upgrading, updating stack...") + + # writes the mutated attr back to rel data + self.peer_relation.data[self.charm.app].update( + {"upgrade-stack": json.dumps(self.upgrade_stack)} + ) + + # recurse on leader to ensure relation changed event not lost + # in case leader is next or the last unit to complete + self.on_upgrade_changed(event) + + # if unit top of stack and all units ready (i.e stack), emit granted event + if ( + self.charm.unit == top_unit + and top_state in ["ready", "upgrading"] + and self.cluster_state == "ready" + and "upgrading" not in self.other_unit_states + ): + logger.debug( + f"{top_unit.name} is next to upgrade, emitting `upgrade_granted` event and upgrading..." + ) + self.charm.unit.status = MaintenanceStatus("upgrading...") + self.peer_relation.data[self.charm.unit].update({"state": "upgrading"}) + + try: + getattr(self.on, "upgrade_granted").emit() + except DependencyError as e: + logger.error(e) + self.set_unit_failed() + return + + def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: + """Handler for `upgrade-granted` events. + + Handlers of this event must meet the following: + - SHOULD check for related application deps from :class:`DataUpgrade.dependencies` + - MAY raise :class:`DependencyError` if dependency not met + - MUST update unit `state` after validating the success of the upgrade, calling one of: + - :class:`DataUpgrade.set_unit_failed` if the unit upgrade fails + - :class:`DataUpgrade.set_unit_completed` if the unit upgrade succeeds + - MUST call :class:`DataUpgarde.on_upgrade_changed` on exit so event not lost on leader + """ + # don't raise if k8s substrate, only return + if self.substrate == "k8s": + return + + raise NotImplementedError + + def _on_upgrade_finished(self, _) -> None: + """Handler for `upgrade-finished` events.""" + if self.substrate == "vm" or not self.peer_relation: + return + + # Emit the upgrade relation changed event in the leader to update the upgrade_stack. + if self.charm.unit.is_leader(): + self.charm.on[self.relation_name].relation_changed.emit( + self.model.get_relation(self.relation_name) + ) + + # This hook shouldn't run for the last unit (the first that is upgraded). For that unit it + # should be done through an action after the upgrade success on that unit is double-checked. + unit_number = int(self.charm.unit.name.split("/")[1]) + if unit_number == len(self.peer_relation.units): + logger.info( + f"{self.charm.unit.name} unit upgraded. Evaluate and run `resume-upgrade` action to continue upgrade" + ) + return + + # Also, the hook shouldn't run for the first unit (the last that is upgraded). + if unit_number == 0: + logger.info(f"{self.charm.unit.name} unit upgraded. Upgrade is complete") + return + + try: + # Use the unit number instead of the upgrade stack to avoid race conditions + # (i.e. the leader updates the upgrade stack after this hook runs). + next_partition = unit_number - 1 + logger.debug(f"Set rolling update partition to unit {next_partition}") + self._set_rolling_update_partition(partition=next_partition) + except KubernetesClientError: + logger.exception("Cannot set rolling update partition") + self.set_unit_failed() + self.log_rollback_instructions() + + def _set_rolling_update_partition(self, partition: int) -> None: + """Patch the StatefulSet's `spec.updateStrategy.rollingUpdate.partition`. + + Args: + partition: partition to set. + + K8s only. It should decrement the rolling update strategy partition by using a code + like the following: + + from lightkube.core.client import Client + from lightkube.core.exceptions import ApiError + from lightkube.resources.apps_v1 import StatefulSet + + try: + patch = {"spec": {"updateStrategy": {"rollingUpdate": {"partition": partition}}}} + Client().patch(StatefulSet, name=self.charm.model.app.name, namespace=self.charm.model.name, obj=patch) + logger.debug(f"Kubernetes StatefulSet partition set to {partition}") + except ApiError as e: + if e.status.code == 403: + cause = "`juju trust` needed" + else: + cause = str(e) + raise KubernetesClientError("Kubernetes StatefulSet patch failed", cause) + """ + if self.substrate == "vm": + return + + raise NotImplementedError diff --git a/charms/worker/k8s/requirements.txt b/charms/worker/k8s/requirements.txt index 8f2e0805..01a13b16 100644 --- a/charms/worker/k8s/requirements.txt +++ b/charms/worker/k8s/requirements.txt @@ -10,3 +10,5 @@ tomli ==2.1.0 tomli-w == 1.0.0 typing_extensions==4.12.2 websocket-client==1.8.0 +poetry-core==1.9.1 +lightkube==0.15.5 diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index 98a2ccec..1347a1ba 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -56,10 +56,13 @@ from charms.node_base import LabelMaker from charms.reconciler import Reconciler from cos_integration import COSIntegration +from inspector import ClusterInspector +from literals import DEPENDENCIES from snap import management as snap_management from snap import version as snap_version from token_distributor import ClusterTokenType, TokenCollector, TokenDistributor, TokenStrategy from typing_extensions import Literal +from upgrade import K8sDependenciesModel, K8sUpgrade # Log messages can be retrieved using juju debug-log log = logging.getLogger(__name__) @@ -126,6 +129,14 @@ def __init__(self, *args): self.api_manager = K8sdAPIManager(factory) xcp_relation = "external-cloud-provider" if self.is_control_plane else "" self.xcp = ExternalCloudProvider(self, xcp_relation) + self.cluster_inspector = ClusterInspector(kubeconfig_path=KUBECONFIG) + self.upgrade = K8sUpgrade( + self, + node_manager=self.cluster_inspector, + relation_name="upgrade", + substrate="vm", + dependency_model=K8sDependenciesModel(**DEPENDENCIES), + ) self.cos = COSIntegration(self) self.reconciler = Reconciler(self, self._reconcile) self.distributor = TokenDistributor(self, self.get_node_name(), self.api_manager) @@ -293,7 +304,7 @@ def _bootstrap_k8s_snap(self): log.info("K8s cluster already bootstrapped") return - bootstrap_config = BootstrapConfig() + bootstrap_config = BootstrapConfig.construct() self._configure_datastore(bootstrap_config) self._configure_cloud_provider(bootstrap_config) self._configure_cluster_config(bootstrap_config) @@ -633,6 +644,7 @@ def _update_kubernetes_version(self): """ relation = self.model.get_relation("cluster") if not relation: + status.add(ops.BlockedStatus("Missing cluster integration")) raise ReconcilerError("Missing cluster integration") if version := snap_version("k8s"): relation.data[self.unit]["version"] = version diff --git a/charms/worker/k8s/src/inspector.py b/charms/worker/k8s/src/inspector.py new file mode 100644 index 00000000..1b3f1a18 --- /dev/null +++ b/charms/worker/k8s/src/inspector.py @@ -0,0 +1,94 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""A module for inspecting a Kubernetes cluster.""" + +import logging +from pathlib import Path +from typing import List, Optional + +from lightkube import ApiError, Client, KubeConfig +from lightkube.core.client import LabelSelector +from lightkube.resources.core_v1 import Node, Pod + +log = logging.getLogger(__name__) + + +class ClusterInspector: + """A helper class for inspecting a Kubernetes cluster.""" + + class ClusterInspectorError(Exception): + """Base exception for ClusterInspector errors.""" + + def __init__( + self, + kubeconfig_path: Path, + ): + """Initialize the ClusterInspector. + + Args: + kubeconfig_path: The path to the kubeconfig file. + """ + self.kubeconfig_path = kubeconfig_path + # NOTE (mateoflorido): The client is set to None to avoid + # initializing it when the object is created (e.g. during + # the charm install as we don't have the kubeconfig yet). + # The client will be initialized when it's needed using the + # _get_client method. + self.client: Optional[Client] = None + + def _get_client(self) -> Client: + """Return the client instance.""" + if self.client is None: + config = KubeConfig.from_file(str(self.kubeconfig_path)) + self.client = Client(config=config.get()) + return self.client + + def get_nodes(self, labels: LabelSelector) -> Optional[List[Node]]: + """Get nodes from the cluster. + + Args: + labels: A dictionary of labels to filter nodes. + + Returns: + A list of the failed nodes that match the label selector. + + Raises: + ClusterInspectorError: If the nodes cannot be retrieved. + """ + client = self._get_client() + unready_nodes = [] + try: + for node in client.list(Node, labels=labels): + if node.status != "Ready": + unready_nodes.append(node) + except ApiError as e: + raise ClusterInspector.ClusterInspectorError(f"Failed to get nodes: {e}") from e + return unready_nodes or None + + def verify_pods_running(self, namespaces: List[str]) -> Optional[str]: + """Verify that all pods in the specified namespaces are running. + + Args: + namespaces: A list of namespaces to check. + + Returns: + None if all pods are running, otherwise returns a string + containing the namespaces that have pods not running. + + Raises: + ClusterInspectorError: If the pods cannot be retrieved. + """ + client = self._get_client() + + failing_pods = [] + try: + for namespace in namespaces: + for pod in client.list(Pod, namespace=namespace): + if pod.status.phase != "Running": # type: ignore + failing_pods.append(f"{namespace}/{pod.metadata.name}") # type: ignore + if failing_pods: + return ", ".join(failing_pods) + except ApiError as e: + raise ClusterInspector.ClusterInspectorError(f"Failed to get pods: {e}") from e + return None diff --git a/charms/worker/k8s/src/literals.py b/charms/worker/k8s/src/literals.py new file mode 100644 index 00000000..656f95af --- /dev/null +++ b/charms/worker/k8s/src/literals.py @@ -0,0 +1,21 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Literals for the charm.""" + +DEPENDENCIES = { + # NOTE: Update the dependencies for the k8s-charm before releasing. + "k8s_charm": { + "dependencies": {"k8s-worker": ">2"}, + "name": "k8s", + "upgrade_supported": ">=1", + "version": "2", + }, + # NOTE: Update the dependencies for the k8s-service before releasing. + "k8s_service": { + "dependencies": {"k8s-worker": "^1.31.0"}, + "name": "k8s", + "upgrade_supported": "^1.30.0", + "version": "1.31.2", + }, +} diff --git a/charms/worker/k8s/src/token_distributor.py b/charms/worker/k8s/src/token_distributor.py index d507d606..257e1de8 100644 --- a/charms/worker/k8s/src/token_distributor.py +++ b/charms/worker/k8s/src/token_distributor.py @@ -35,9 +35,20 @@ class K8sCharm(Protocol): unit (ops.Unit): The unit object. """ - app: ops.Application - model: ops.Model - unit: ops.Unit + @property + def app(self) -> ops.Application: + """The application object.""" + ... # pylint: disable=unnecessary-ellipsis + + @property + def model(self) -> ops.Model: + """The model object.""" + ... # pylint: disable=unnecessary-ellipsis + + @property + def unit(self) -> ops.Unit: + """The unit object.""" + ... # pylint: disable=unnecessary-ellipsis def get_cluster_name(self) -> str: """Get the cluster name.""" diff --git a/charms/worker/k8s/src/upgrade.py b/charms/worker/k8s/src/upgrade.py new file mode 100644 index 00000000..fc2548f6 --- /dev/null +++ b/charms/worker/k8s/src/upgrade.py @@ -0,0 +1,99 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""A module for upgrading the k8s and k8s-worker charms.""" + +import logging +from typing import List + +from charms.data_platform_libs.v0.upgrade import ClusterNotReadyError, DataUpgrade, DependencyModel +from inspector import ClusterInspector +from pydantic import BaseModel + +log = logging.getLogger(__name__) + + +class K8sDependenciesModel(BaseModel): + """A model for the k8s and k8s-worker charm dependencies. + + Attributes: + k8s_charm: The k8s charm dependency model. + k8s_service: The k8s-service charm dependency model. + """ + + k8s_charm: DependencyModel + k8s_service: DependencyModel + + +class K8sUpgrade(DataUpgrade): + """A helper class for upgrading the k8s and k8s-worker charms.""" + + def __init__(self, charm, node_manager: ClusterInspector, **kwargs): + """Initialize the K8sUpgrade. + + Args: + charm: The charm instance. + node_manager: The ClusterInspector instance. + kwargs: Additional keyword arguments. + """ + super().__init__(charm, **kwargs) + self.charm = charm + self.node_manager = node_manager + + def pre_upgrade_check(self) -> None: + """Check if the cluster is ready for an upgrade. + + It verifies that the cluster nodes are ready before proceeding and + if the pods in the specified namespace are ready. + + Raises: + ClusterNotReadyError: If the cluster is not ready for an upgrade. + """ + try: + nodes = self.node_manager.get_nodes( + labels={"juju-charm": "k8s-worker" if self.charm.is_worker else "k8s"} + ) + except ClusterInspector.ClusterInspectorError as e: + raise ClusterNotReadyError( + message="Cluster is not ready for an upgrade", + cause=str(e), + resolution="""API server may not be reachable. + Please check that the API server is up and running.""", + ) from e + + unready_nodes = nodes or [] + + if unready_nodes: + raise ClusterNotReadyError( + message="Cluster is not ready for an upgrade", + cause=f"Nodes not ready: {', '.join(unready_nodes)}", + resolution="""Node(s) may be in a bad state. + Please check the node(s) for more information.""", + ) + + if failing_pods := self.node_manager.verify_pods_running(["kube-system"]): + raise ClusterNotReadyError( + message="Cluster is not ready", + cause=f"Pods not running in namespace(s): {failing_pods}", + resolution="Check the logs for the failing pods.", + ) + + def build_upgrade_stack(self) -> List[int]: + """Return a list of unit numbers to upgrade in order. + + Returns: + A list of unit numbers to upgrade in order. + """ + relation = self.charm.model.get_relation("cluster") + if not relation: + return [int(self.charm.unit.name.split("/")[-1])] + + return [ + int(unit.name.split("/")[-1]) for unit in ({self.charm.unit} | set(relation.units)) + ] + + def log_rollback_instructions(self) -> None: + """Log instructions for rolling back the upgrade.""" + log.critical( + "To rollback the upgrade, run: `juju refresh` to the previously deployed revision." + ) diff --git a/charms/worker/k8s/tests/unit/test_inspector.py b/charms/worker/k8s/tests/unit/test_inspector.py new file mode 100644 index 00000000..2e532095 --- /dev/null +++ b/charms/worker/k8s/tests/unit/test_inspector.py @@ -0,0 +1,101 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Tests for the inspector module.""" + +import unittest +from pathlib import Path +from typing import List +from unittest.mock import MagicMock + +from inspector import ClusterInspector +from lightkube.core.exceptions import ApiError +from lightkube.resources.core_v1 import Node, Pod + + +class TestClusterInspector(unittest.TestCase): + """Tests for the ClusterInspector class.""" + + def setUp(self): + """Set up common test fixtures.""" + self.inspector = ClusterInspector(Path("/path/to/kubeconfig")) + self.mock_client = MagicMock() + self.inspector.client = self.mock_client + + def test_get_nodes_returns_unready(self): + """Test that get_nodes returns unready nodes.""" + mock_node1 = MagicMock(spec=Node) + mock_node1.status = "Ready" + mock_node1.metadata.name = "node1" + + mock_node2 = MagicMock(spec=Node) + mock_node2.status = "NotReady" + mock_node2.metadata.name = "node2" + + self.mock_client.list.return_value = [mock_node1, mock_node2] + + nodes: List[Node] = self.inspector.get_nodes({"role": "control-plane"}) + + self.mock_client.list.assert_called_once_with(Node, labels={"role": "control-plane"}) + self.assertEqual(len(nodes), 1) + # pylint: disable=unsubscriptable-object + self.assertEqual(nodes[0].metadata.name, "node2") # type: ignore + + def test_get_nodes_api_error(self): + """Test get_nodes handles API errors.""" + self.mock_client.list.side_effect = ApiError(response=MagicMock()) + with self.assertRaises(ClusterInspector.ClusterInspectorError): + self.inspector.get_nodes({"role": "control-plane"}) + + def test_verify_pods_running_failed_pods(self): + """Test verify_pods_running when some pods are not running.""" + mock_pod = MagicMock(spec=Pod) + mock_pod.status.phase = "Running" + mock_pod.metadata.name = "pod1" + + mock_pod2 = MagicMock(spec=Pod) + mock_pod2.status.phase = "Failed" + mock_pod2.metadata.name = "pod2" + + self.mock_client.list.return_value = [mock_pod, mock_pod2] + + result = self.inspector.verify_pods_running(["kube-system"]) + + self.assertEqual(result, "kube-system/pod2") + self.mock_client.list.assert_called_once_with(Pod, namespace="kube-system") + + def test_verify_pods_running_multiple_namespaces(self): + """Test verify_pods_running with multiple namespaces.""" + + def mock_list_pods(_, namespace): + """Mock the list method to return pods in different states. + + Args: + namespace: The namespace to list pods from. + + Returns: + A list of pods in different states. + """ + if namespace == "ns1": + mock_pod = MagicMock(spec=Pod) + mock_pod.status.phase = "Running" + mock_pod.metadata.name = "pod1" + return [mock_pod] + mock_pod = MagicMock(spec=Pod) + mock_pod.status.phase = "Failed" + mock_pod.metadata.name = "pod2" + return [mock_pod] + + self.mock_client.list.side_effect = mock_list_pods + + result = self.inspector.verify_pods_running(["ns1", "ns2"]) + + self.assertEqual(result, "ns2/pod2") + self.assertEqual(self.mock_client.list.call_count, 2) + + def test_verify_pods_running_api_error(self): + """Test verify_pods_running handles API errors.""" + self.mock_client.list.side_effect = ApiError(response=MagicMock()) + + with self.assertRaises(ClusterInspector.ClusterInspectorError): + self.inspector.verify_pods_running(["default"]) diff --git a/charms/worker/k8s/tests/unit/test_upgrade.py b/charms/worker/k8s/tests/unit/test_upgrade.py new file mode 100644 index 00000000..edeb5003 --- /dev/null +++ b/charms/worker/k8s/tests/unit/test_upgrade.py @@ -0,0 +1,119 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Tests for the upgrade module.""" + +import unittest +from unittest.mock import MagicMock + +from charms.data_platform_libs.v0.upgrade import ClusterNotReadyError +from inspector import ClusterInspector +from upgrade import K8sDependenciesModel, K8sUpgrade + + +class TestK8sUpgrade(unittest.TestCase): + """Tests for the K8sUpgrade class.""" + + def setUp(self): + """Set up common test fixtures.""" + self.charm = MagicMock() + self.node_manager = MagicMock(spec=ClusterInspector) + self.upgrade = K8sUpgrade( + self.charm, + node_manager=self.node_manager, + relation_name="upgrade", + substrate="vm", + dependency_model=K8sDependenciesModel( + **{ + "k8s_charm": { + "dependencies": {"k8s-worker": ">50"}, + "name": "k8s", + "upgrade_supported": ">90", + "version": "100", + }, + "k8s_service": { + "dependencies": {"k8s-worker": "^3"}, + "name": "k8s", + "upgrade_supported": ">=0.8", + "version": "1.31.1", + }, + } + ), + ) + + def test_pre_upgrade_check_worker_success(self): + """Test pre_upgrade_check succeeds for worker nodes.""" + self.charm.is_worker = True + self.node_manager.get_nodes.return_value = [] + self.node_manager.verify_pods_running.return_value = None + + self.upgrade.pre_upgrade_check() + + self.node_manager.get_nodes.assert_called_once_with(labels={"juju-charm": "k8s-worker"}) + self.node_manager.verify_pods_running.assert_called_once_with(["kube-system"]) + + def test_pre_upgrade_check_control_plane_success(self): + """Test pre_upgrade_check succeeds for control plane nodes.""" + self.charm.is_worker = False + self.node_manager.get_nodes.return_value = [] + self.node_manager.verify_pods_running.return_value = None + + self.upgrade.pre_upgrade_check() + + self.node_manager.get_nodes.assert_called_once_with(labels={"juju-charm": "k8s"}) + + def test_pre_upgrade_check_unready_nodes(self): + """Test pre_upgrade_check fails when nodes are not ready.""" + self.charm.is_worker = True + self.node_manager.get_nodes.return_value = [ + "worker-1", + "worker-2", + "worker-3", + ] + + with self.assertRaises(ClusterNotReadyError): + self.upgrade.pre_upgrade_check() + + def test_pre_upgrade_check_cluster_inspector_error(self): + """Test pre_upgrade_check handles ClusterInspectorError.""" + self.node_manager.get_nodes.side_effect = ClusterInspector.ClusterInspectorError( + "test error" + ) + + with self.assertRaises(ClusterNotReadyError): + self.upgrade.pre_upgrade_check() + + def test_pre_upgrade_check_pods_not_ready(self): + """Test pre_upgrade_check fails when pods are not ready.""" + self.charm.is_worker = True + self.node_manager.get_nodes.return_value = None + self.node_manager.verify_pods_running.return_value = "kube-system/pod-1" + + with self.assertRaises(ClusterNotReadyError): + self.upgrade.pre_upgrade_check() + + def test_build_upgrade_stack_no_relation(self): + """Test build_upgrade_stack when no cluster relation exists.""" + self.charm.unit.name = "k8s/0" + self.charm.model.get_relation.return_value = None + + result = self.upgrade.build_upgrade_stack() + + self.assertEqual(result, [0]) + self.charm.model.get_relation.assert_called_once_with("cluster") + + def test_build_upgrade_stack_with_relation(self): + """Test build_upgrade_stack with cluster relation.""" + self.charm.unit.name = "k8s/0" + relation = MagicMock() + unit_1 = MagicMock() + unit_1.name = "k8s/1" + unit_2 = MagicMock() + unit_2.name = "k8s/2" + relation.units = {unit_1, unit_2} + self.charm.model.get_relation.return_value = relation + + result = self.upgrade.build_upgrade_stack() + + self.assertEqual(sorted(result), [0, 1, 2]) + self.charm.model.get_relation.assert_called_once_with("cluster")