From 32c2da00e4afc8af673041bbc78a14ed8720ba8a Mon Sep 17 00:00:00 2001 From: Mateo Florido <32885896+mateoflorido@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:56:14 -0500 Subject: [PATCH 1/5] Implement `pre-upgrade-check` action (#168) Implement the pre-upgrade-check action in the charm. --- charms/worker/charmcraft.yaml | 8 + charms/worker/k8s/charmcraft.yaml | 5 + .../charms/data_platform_libs/v0/upgrade.py | 1102 +++++++++++++++++ charms/worker/k8s/requirements.txt | 2 + charms/worker/k8s/src/charm.py | 14 +- charms/worker/k8s/src/inspector.py | 94 ++ charms/worker/k8s/src/literals.py | 21 + charms/worker/k8s/src/token_distributor.py | 17 +- charms/worker/k8s/src/upgrade.py | 99 ++ .../worker/k8s/tests/unit/test_inspector.py | 101 ++ charms/worker/k8s/tests/unit/test_upgrade.py | 119 ++ 11 files changed, 1578 insertions(+), 4 deletions(-) create mode 100644 charms/worker/k8s/lib/charms/data_platform_libs/v0/upgrade.py create mode 100644 charms/worker/k8s/src/inspector.py create mode 100644 charms/worker/k8s/src/literals.py create mode 100644 charms/worker/k8s/src/upgrade.py create mode 100644 charms/worker/k8s/tests/unit/test_inspector.py create mode 100644 charms/worker/k8s/tests/unit/test_upgrade.py diff --git a/charms/worker/charmcraft.yaml b/charms/worker/charmcraft.yaml index b567c149..26d308d8 100644 --- a/charms/worker/charmcraft.yaml +++ b/charms/worker/charmcraft.yaml @@ -85,6 +85,14 @@ parts: rm -rf $CRAFT_PRIME/lib $CRAFT_PRIME/templates mv $CRAFT_PRIME/k8s/lib $CRAFT_PRIME/lib mv $CRAFT_PRIME/k8s/templates $CRAFT_PRIME/templates +actions: + pre-upgrade-check: + description: Run necessary pre-upgrade checks before executing a charm upgrade. + +peers: + upgrade: + interface: upgrade + provides: cos-agent: diff --git a/charms/worker/k8s/charmcraft.yaml b/charms/worker/k8s/charmcraft.yaml index b68cb8a3..70ce0156 100644 --- a/charms/worker/k8s/charmcraft.yaml +++ b/charms/worker/k8s/charmcraft.yaml @@ -169,6 +169,9 @@ actions: server: description: Override the server endpoint with this field type: string + pre-upgrade-check: + description: Run necessary pre-upgrade checks before executing a charm upgrade. + parts: charm: @@ -180,6 +183,8 @@ peers: interface: k8s-cluster cos-tokens: interface: cos-k8s-tokens + upgrade: + interface: upgrade provides: cos-agent: diff --git a/charms/worker/k8s/lib/charms/data_platform_libs/v0/upgrade.py b/charms/worker/k8s/lib/charms/data_platform_libs/v0/upgrade.py new file mode 100644 index 00000000..4d909d64 --- /dev/null +++ b/charms/worker/k8s/lib/charms/data_platform_libs/v0/upgrade.py @@ -0,0 +1,1102 @@ +# Copyright 2023 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +r"""Library to manage in-place upgrades for charms running on VMs and K8s. + +This library contains handlers for `upgrade` relation events used to coordinate +between units in an application during a `juju refresh`, as well as `Pydantic` models +for instantiating, validating and comparing dependencies. 
+
+An upgrade on VMs is initiated with the command `juju refresh`. Once executed, the following
+events are emitted to each unit at random:
+    - `upgrade-charm`
+    - `config-changed`
+    - `leader-settings-changed` - Non-leader only
+
+Charm authors can implement the classes defined in this library to streamline the process of
+coordinating which unit updates when, achieved through updating of unit-data `state` throughout.
+
+At a high level, the upgrade steps are as follows:
+    - Run pre-checks on the cluster to confirm it is safe to upgrade
+    - Create stack of unit.ids, to serve as the upgrade order (generally workload leader is last)
+    - Start the upgrade by issuing a Juju CLI command
+    - The unit at the top of the stack gets permission to upgrade
+    - The unit handles the upgrade and restarts its service
+    - Repeat, until all units have restarted
+
+### Usage by charm authors
+
+#### `upgrade` relation
+
+Charm authors must implement an additional peer-relation.
+
+As this library uses relation data exchanged between units to coordinate, charm authors
+need to add a new relation interface. The relation name does not matter.
+
+`metadata.yaml`
+```yaml
+peers:
+  upgrade:
+    interface: upgrade
+```
+
+#### Dependencies JSON/Dict
+
+Charm authors must implement a dict object tracking current charm versions, requirements + upgradability.
+
+Many workload versions may be incompatible with older/newer versions. The same idea can also apply to
+charm or snap versions. Workloads with required related applications (e.g. Kafka + ZooKeeper) also need to
+ensure their versions are compatible during an upgrade, to avoid cluster failure.
+
+As such, it is necessary to freeze any dependencies within each published charm. An example of this could
+be creating a `DEPENDENCIES` dict within the charm code, with the following structure:
+
+`src/literals.py`
+```python
+DEPENDENCIES = {
+    "kafka_charm": {
+        "dependencies": {"zookeeper": ">50"},
+        "name": "kafka",
+        "upgrade_supported": ">90",
+        "version": "100",
+    },
+    "kafka_service": {
+        "dependencies": {"zookeeper": "^3"},
+        "name": "kafka",
+        "upgrade_supported": ">=0.8",
+        "version": "3.3.2",
+    },
+}
+```
+
+The first-level key names are arbitrary labels for tracking what those versions+dependencies are for.
+The `dependencies` second-level values are a key-value map of any required external applications,
+    and the versions this packaged charm can support.
+The `upgrade_supported` second-level values are requirements from which an in-place upgrade can be
+    supported by the charm.
+The `version` second-level values correspond to the current version of this packaged charm.
+
+All requirements must comply with [`poetry`'s dependency specifications](https://python-poetry.org/docs/dependency-specification/#caret-requirements).
+
+### Dependency Model
+
+Charm authors must implement their own class inheriting from `DependencyModel`.
+
+Using a `Pydantic` model to instantiate the aforementioned `DEPENDENCIES` dict gives stronger type safety and additional
+layers of validation.
+
+Implementation just needs to ensure that the top-level key names from `DEPENDENCIES` are defined as attributes in the model.
+
+`src/upgrade.py`
+```python
+from pydantic import BaseModel
+
+class KafkaDependenciesModel(BaseModel):
+    kafka_charm: DependencyModel
+    kafka_service: DependencyModel
+```
+
+### Overrides for `DataUpgrade`
+
+Charm authors must define their own class, inheriting from `DataUpgrade`, overriding all required `abstractmethod`s.
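+
+For the `k8s` substrate, where the default StatefulSet ordering is used and `build_upgrade_stack()`
+does not need to be overridden, a minimal sketch could look like the following (the class name and
+the `workload_healthy()` helper are illustrative only, not part of this library):
+
+```python
+class MyAppUpgrade(DataUpgrade):
+    @override
+    def pre_upgrade_check(self) -> None:
+        # hypothetical charm helper; raise if the workload reports an unhealthy state
+        if not self.charm.workload_healthy():
+            raise ClusterNotReadyError(
+                message="Pre-upgrade check failed and cannot safely upgrade",
+                cause="Workload is not healthy",
+            )
+
+    @override
+    def log_rollback_instructions(self) -> None:
+        logger.error("Upgrade failed. Run `juju refresh` back to the previous revision.")
+```
+
+A fuller, step-by-step example is walked through below: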
+
+```python
+class ZooKeeperUpgrade(DataUpgrade):
+    def __init__(self, charm: "ZooKeeperUpgrade", **kwargs):
+        super().__init__(charm, **kwargs)
+        self.charm = charm
+```
+
+#### Implementation of `pre_upgrade_check()`
+
+Before upgrading a cluster, it's a good idea to check that it is stable and healthy before permitting the upgrade.
+Here, charm authors can validate upgrade safety through API calls, relation-data checks, etc.
+If any of these checks fail, raise `ClusterNotReadyError`.
+
+```python
+    @override
+    def pre_upgrade_check(self) -> None:
+        default_message = "Pre-upgrade check failed and cannot safely upgrade"
+        try:
+            if not self.client.members_broadcasting or not len(self.client.server_members) == len(
+                self.charm.cluster.peer_units
+            ):
+                raise ClusterNotReadyError(
+                    message=default_message,
+                    cause="Not all application units are connected and broadcasting in the quorum",
+                )
+
+            if self.client.members_syncing:
+                raise ClusterNotReadyError(
+                    message=default_message, cause="Some quorum members are syncing data"
+                )
+
+            if not self.charm.cluster.stable:
+                raise ClusterNotReadyError(
+                    message=default_message, cause="Charm has not finished initialising"
+                )
+
+        except QuorumLeaderNotFoundError:
+            raise ClusterNotReadyError(message=default_message, cause="Quorum leader not found")
+        except ConnectionClosedError:
+            raise ClusterNotReadyError(
+                message=default_message, cause="Unable to connect to the cluster"
+            )
+```
+
+#### Implementation of `build_upgrade_stack()` - VM ONLY
+
+Oftentimes, it is necessary to ensure that the workload leader is the last unit to upgrade,
+to preserve high availability during the upgrade process.
+Here, charm authors can create a LIFO stack of unit.ids, represented as a list of integer unit.ids,
+with the leader unit at index 0.
+
+```python
+@override
+def build_upgrade_stack(self) -> list[int]:
+    upgrade_stack = []
+    for unit in self.charm.cluster.peer_units:
+        config = self.charm.cluster.unit_config(unit=unit)
+
+        # upgrade quorum leader last
+        if config["host"] == self.client.leader:
+            upgrade_stack.insert(0, int(config["unit_id"]))
+        else:
+            upgrade_stack.append(int(config["unit_id"]))
+
+    return upgrade_stack
+```
+
+#### Implementation of `_on_upgrade_granted()`
+
+On relation-changed events, each unit will check the current upgrade-stack persisted to relation data.
+If that unit is at the top of the stack, it will emit an `upgrade-granted` event, which must be handled.
+Here, workloads can be re-installed with new versions, checks can be made, data synced etc.
+If the newly upgraded unit successfully rejoined the cluster, call `set_unit_completed()`.
+If the newly upgraded unit failed to rejoin the cluster, call `set_unit_failed()`.
+
+NOTE - It is essential here to manually call `on_upgrade_changed` if the unit is the current leader.
+This ensures that the leader gets its own relation-changed event and updates the upgrade-stack for
+other units to follow suit.
+ +```python +@override +def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: + self.charm.snap.stop_snap_service() + + if not self.charm.snap.install(): + logger.error("Unable to install ZooKeeper Snap") + self.set_unit_failed() + return None + + logger.info(f"{self.charm.unit.name} upgrading service...") + self.charm.snap.restart_snap_service() + + try: + logger.debug("Running post-upgrade check...") + self.pre_upgrade_check() + + logger.debug("Marking unit completed...") + self.set_unit_completed() + + # ensures leader gets it's own relation-changed when it upgrades + if self.charm.unit.is_leader(): + logger.debug("Re-emitting upgrade-changed on leader...") + self.on_upgrade_changed(event) + + except ClusterNotReadyError as e: + logger.error(e.cause) + self.set_unit_failed() +``` + +#### Implementation of `log_rollback_instructions()` + +If the upgrade fails, manual intervention may be required for cluster recovery. +Here, charm authors can log out any necessary steps to take to recover from a failed upgrade. +When a unit fails, this library will automatically log out this message. + +```python +@override +def log_rollback_instructions(self) -> None: + logger.error("Upgrade failed. Please run `juju refresh` to previous version.") +``` + +### Instantiating in the charm and deferring events + +Charm authors must add a class attribute for the child class of `DataUpgrade` in the main charm. +They must also ensure that any non-upgrade related events that may be unsafe to handle during +an upgrade, are deferred if the unit is not in the `idle` state - i.e not currently upgrading. + +```python +class ZooKeeperCharm(CharmBase): + def __init__(self, *args): + super().__init__(*args) + self.upgrade = ZooKeeperUpgrade( + self, + relation_name = "upgrade", + substrate = "vm", + dependency_model=ZooKeeperDependencyModel( + **DEPENDENCIES + ), + ) + + def restart(self, event) -> None: + if not self.upgrade.state == "idle": + event.defer() + return None + + self.restart_snap_service() +``` +""" + +import json +import logging +from abc import ABC, abstractmethod +from typing import Dict, List, Literal, Optional, Set, Tuple + +import poetry.core.constraints.version as poetry_version +from ops.charm import ( + ActionEvent, + CharmBase, + CharmEvents, + RelationCreatedEvent, + UpgradeCharmEvent, +) +from ops.framework import EventBase, EventSource, Object +from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, Relation, Unit, WaitingStatus +from pydantic import BaseModel, root_validator, validator + +# The unique Charmhub library identifier, never change it +LIBID = "156258aefb79435a93d933409a8c8684" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 18 + +PYDEPS = ["pydantic>=1.10,<2", "poetry-core"] + +logger = logging.getLogger(__name__) + +# --- DEPENDENCY RESOLUTION FUNCTIONS --- + + +def verify_requirements(version: str, requirement: str) -> bool: + """Verifies a specified version against defined constraint. + + Supports Poetry version constraints + https://python-poetry.org/docs/dependency-specification/#version-constraints + + Args: + version: the version currently in use + requirement: Poetry version constraint + + Returns: + True if `version` meets defined `requirement`. 
Otherwise False + """ + return poetry_version.parse_constraint(requirement).allows( + poetry_version.Version.parse(version) + ) + + +# --- DEPENDENCY MODEL TYPES --- + + +class DependencyModel(BaseModel): + """Manager for a single dependency. + + To be used as part of another model representing a collection of arbitrary dependencies. + + Example:: + + class KafkaDependenciesModel(BaseModel): + kafka_charm: DependencyModel + kafka_service: DependencyModel + + deps = { + "kafka_charm": { + "dependencies": {"zookeeper": ">5"}, + "name": "kafka", + "upgrade_supported": ">5", + "version": "10", + }, + "kafka_service": { + "dependencies": {"zookeeper": "^3.6"}, + "name": "kafka", + "upgrade_supported": "~3.3", + "version": "3.3.2", + }, + } + + model = KafkaDependenciesModel(**deps) # loading dict in to model + + print(model.dict()) # exporting back validated deps + """ + + dependencies: Dict[str, str] + name: str + upgrade_supported: str + version: str + + @validator("dependencies", "upgrade_supported", each_item=True) + @classmethod + def dependencies_validator(cls, value): + """Validates version constraint.""" + if isinstance(value, dict): + deps = value.values() + else: + deps = [value] + + for dep in deps: + poetry_version.parse_constraint(dep) + + return value + + @root_validator(skip_on_failure=True) + @classmethod + def version_upgrade_supported_validator(cls, values): + """Validates specified `version` meets `upgrade_supported` requirement.""" + if not verify_requirements( + version=values.get("version"), requirement=values.get("upgrade_supported") + ): + raise ValueError( + f"upgrade_supported value {values.get('upgrade_supported')} greater than version value {values.get('version')} for {values.get('name')}." + ) + + return values + + def can_upgrade(self, dependency: "DependencyModel") -> bool: + """Compares two instances of :class:`DependencyModel` for upgradability. + + Args: + dependency: a dependency model to compare this model against + + Returns: + True if current model can upgrade from dependent model. Otherwise False + """ + return verify_requirements(version=self.version, requirement=dependency.upgrade_supported) + + +# --- CUSTOM EXCEPTIONS --- + + +class UpgradeError(Exception): + """Base class for upgrade related exceptions in the module.""" + + def __init__(self, message: str, cause: Optional[str], resolution: Optional[str]): + super().__init__(message) + self.message = message + self.cause = cause or "" + self.resolution = resolution or "" + + def __repr__(self): + """Representation of the UpgradeError class.""" + return f"{type(self).__module__}.{type(self).__name__} - {str(vars(self))}" + + def __str__(self): + """String representation of the UpgradeError class.""" + return repr(self) + + +class ClusterNotReadyError(UpgradeError): + """Exception flagging that the cluster is not ready to start upgrading. + + For example, if the cluster fails :class:`DataUpgrade._on_pre_upgrade_check_action` + + Args: + message: string message to be logged out + cause: short human-readable description of the cause of the error + resolution: short human-readable instructions for manual error resolution (optional) + """ + + def __init__(self, message: str, cause: str, resolution: Optional[str] = None): + super().__init__(message, cause=cause, resolution=resolution) + + +class KubernetesClientError(UpgradeError): + """Exception flagging that a call to Kubernetes API failed. 
+ + For example, if the cluster fails :class:`DataUpgrade._set_rolling_update_partition` + + Args: + message: string message to be logged out + cause: short human-readable description of the cause of the error + resolution: short human-readable instructions for manual error resolution (optional) + """ + + def __init__(self, message: str, cause: str, resolution: Optional[str] = None): + super().__init__(message, cause=cause, resolution=resolution) + + +class VersionError(UpgradeError): + """Exception flagging that the old `version` fails to meet the new `upgrade_supported`s. + + For example, upgrades from version `2.x` --> `4.x`, + but `4.x` only supports upgrading from `3.x` onwards + + Args: + message: string message to be logged out + cause: short human-readable description of the cause of the error + resolution: short human-readable instructions for manual solutions to the error (optional) + """ + + def __init__(self, message: str, cause: str, resolution: Optional[str] = None): + super().__init__(message, cause=cause, resolution=resolution) + + +class DependencyError(UpgradeError): + """Exception flagging that some new `dependency` is not being met. + + For example, new version requires related App version `2.x`, but currently is `1.x` + + Args: + message: string message to be logged out + cause: short human-readable description of the cause of the error + resolution: short human-readable instructions for manual solutions to the error (optional) + """ + + def __init__(self, message: str, cause: str, resolution: Optional[str] = None): + super().__init__(message, cause=cause, resolution=resolution) + + +# --- CUSTOM EVENTS --- + + +class UpgradeGrantedEvent(EventBase): + """Used to tell units that they can process an upgrade.""" + + +class UpgradeFinishedEvent(EventBase): + """Used to tell units that they finished the upgrade.""" + + +class UpgradeEvents(CharmEvents): + """Upgrade events. + + This class defines the events that the lib can emit. 
+ """ + + upgrade_granted = EventSource(UpgradeGrantedEvent) + upgrade_finished = EventSource(UpgradeFinishedEvent) + + +# --- EVENT HANDLER --- + + +class DataUpgrade(Object, ABC): + """Manages `upgrade` relation operations for in-place upgrades.""" + + STATES = ["recovery", "failed", "idle", "ready", "upgrading", "completed"] + + on = UpgradeEvents() # pyright: ignore [reportAssignmentType] + + def __init__( + self, + charm: CharmBase, + dependency_model: BaseModel, + relation_name: str = "upgrade", + substrate: Literal["vm", "k8s"] = "vm", + ): + super().__init__(charm, relation_name) + self.charm = charm + self.dependency_model = dependency_model + self.relation_name = relation_name + self.substrate = substrate + self._upgrade_stack = None + + # events + self.framework.observe( + self.charm.on[relation_name].relation_created, self._on_upgrade_created + ) + self.framework.observe( + self.charm.on[relation_name].relation_changed, self.on_upgrade_changed + ) + self.framework.observe(self.charm.on.upgrade_charm, self._on_upgrade_charm) + self.framework.observe(getattr(self.on, "upgrade_granted"), self._on_upgrade_granted) + self.framework.observe(getattr(self.on, "upgrade_finished"), self._on_upgrade_finished) + + # actions + self.framework.observe( + getattr(self.charm.on, "pre_upgrade_check_action"), self._on_pre_upgrade_check_action + ) + if self.substrate == "k8s": + self.framework.observe( + getattr(self.charm.on, "resume_upgrade_action"), self._on_resume_upgrade_action + ) + + @property + def peer_relation(self) -> Optional[Relation]: + """The upgrade peer relation.""" + return self.charm.model.get_relation(self.relation_name) + + @property + def app_units(self) -> Set[Unit]: + """The peer-related units in the application.""" + if not self.peer_relation: + return set() + + return set([self.charm.unit] + list(self.peer_relation.units)) + + @property + def state(self) -> Optional[str]: + """The unit state from the upgrade peer relation.""" + if not self.peer_relation: + return None + + return self.peer_relation.data[self.charm.unit].get("state", None) + + @property + def stored_dependencies(self) -> Optional[BaseModel]: + """The application dependencies from the upgrade peer relation.""" + if not self.peer_relation: + return None + + if not (deps := self.peer_relation.data[self.charm.app].get("dependencies", "")): + return None + + return type(self.dependency_model)(**json.loads(deps)) + + @property + def upgrade_stack(self) -> Optional[List[int]]: + """Gets the upgrade stack from the upgrade peer relation. + + Unit.ids are ordered Last-In-First-Out (LIFO). + i.e unit.id at index `-1` is the first unit to upgrade. + unit.id at index `0` is the last unit to upgrade. + + Returns: + List of integer unit.ids, ordered in upgrade order in a stack + """ + if not self.peer_relation: + return None + + # lazy-load + if self._upgrade_stack is None: + self._upgrade_stack = ( + json.loads(self.peer_relation.data[self.charm.app].get("upgrade-stack", "[]")) + or None + ) + + return self._upgrade_stack + + @upgrade_stack.setter + def upgrade_stack(self, stack: List[int]) -> None: + """Sets the upgrade stack to the upgrade peer relation. + + Unit.ids are ordered Last-In-First-Out (LIFO). + i.e unit.id at index `-1` is the first unit to upgrade. + unit.id at index `0` is the last unit to upgrade. 
+ """ + if not self.peer_relation: + return + + self.peer_relation.data[self.charm.app].update({"upgrade-stack": json.dumps(stack)}) + self._upgrade_stack = stack + + @property + def other_unit_states(self) -> list: + """Current upgrade state for other units. + + Returns: + Unsorted list of upgrade states for other units. + """ + if not self.peer_relation: + return [] + + return [ + self.peer_relation.data[unit].get("state", "") + for unit in list(self.peer_relation.units) + ] + + @property + def unit_states(self) -> list: + """Current upgrade state for all units. + + Returns: + Unsorted list of upgrade states for all units. + """ + if not self.peer_relation: + return [] + + return [self.peer_relation.data[unit].get("state", "") for unit in self.app_units] + + @property + def cluster_state(self) -> Optional[str]: + """Current upgrade state for cluster units. + + Determined from :class:`DataUpgrade.STATE`, taking the lowest ordinal unit state. + + For example, if units in have states: `["ready", "upgrading", "completed"]`, + the overall state for the cluster is `ready`. + + Returns: + String of upgrade state from the furthest behind unit. + """ + if not self.unit_states: + return None + + try: + return sorted(self.unit_states, key=self.STATES.index)[0] + except (ValueError, KeyError): + return None + + @property + def idle(self) -> Optional[bool]: + """Flag for whether the cluster is in an idle upgrade state. + + Returns: + True if all application units in idle state. Otherwise False + """ + return set(self.unit_states) == {"idle"} + + @abstractmethod + def pre_upgrade_check(self) -> None: + """Runs necessary checks validating the cluster is in a healthy state to upgrade. + + Called by all units during :meth:`_on_pre_upgrade_check_action`. + + Raises: + :class:`ClusterNotReadyError`: if cluster is not ready to upgrade + """ + pass + + def build_upgrade_stack(self) -> List[int]: + """Builds ordered iterable of all application unit.ids to upgrade in. + + Called by leader unit during :meth:`_on_pre_upgrade_check_action`. + + Returns: + Iterable of integer unit.ids, LIFO ordered in upgrade order + i.e `[5, 2, 4, 1, 3]`, unit `3` upgrades first, `5` upgrades last + """ + # don't raise if k8s substrate, uses default statefulset order + if self.substrate == "k8s": + return [] + + raise NotImplementedError + + @abstractmethod + def log_rollback_instructions(self) -> None: + """Sets charm state and logs out rollback instructions. + + Called by all units when `state=failed` found during :meth:`_on_upgrade_changed`. 
+ """ + pass + + def _repair_upgrade_stack(self) -> None: + """Ensures completed units are re-added to the upgrade-stack after failure.""" + # need to update the stack as it was not refreshed by rollback run of pre-upgrade-check + # avoids difficult health check implementation by charm-authors needing to exclude dead units + + # if the first unit in the stack fails, the stack will be the same length as units + # i.e this block not ran + if ( + self.cluster_state in ["failed", "recovery"] + and self.upgrade_stack + and len(self.upgrade_stack) != len(self.app_units) + and self.charm.unit.is_leader() + ): + new_stack = self.upgrade_stack + for unit in self.app_units: + unit_id = int(unit.name.split("/")[1]) + + # if a unit fails, it rolls back first + if unit_id not in new_stack: + new_stack.insert(-1, unit_id) + logger.debug(f"Inserted {unit_id} in to upgrade-stack - {new_stack}") + + self.upgrade_stack = new_stack + + def set_unit_failed(self, cause: Optional[str] = None) -> None: + """Sets unit `state=failed` to the upgrade peer data. + + Args: + cause: short description of cause of failure + """ + if not self.peer_relation: + return None + + # needed to refresh the stack + # now leader pulls a fresh stack from newly updated relation data + if self.charm.unit.is_leader(): + self._upgrade_stack = None + + self.charm.unit.status = BlockedStatus(cause if cause else "") + self.peer_relation.data[self.charm.unit].update({"state": "failed"}) + self.log_rollback_instructions() + + def set_unit_completed(self) -> None: + """Sets unit `state=completed` to the upgrade peer data.""" + if not self.peer_relation: + return None + + # needed to refresh the stack + # now leader pulls a fresh stack from newly updated relation data + if self.charm.unit.is_leader(): + self._upgrade_stack = None + + self.charm.unit.status = MaintenanceStatus("upgrade completed") + self.peer_relation.data[self.charm.unit].update({"state": "completed"}) + + # Emit upgrade_finished event to run unit's post upgrade operations. + if self.substrate == "k8s": + logger.debug( + f"{self.charm.unit.name} has completed the upgrade, emitting `upgrade_finished` event..." 
+ ) + getattr(self.on, "upgrade_finished").emit() + + def _on_upgrade_created(self, event: RelationCreatedEvent) -> None: + """Handler for `upgrade-relation-created` events.""" + if not self.peer_relation: + event.defer() + return + + # setting initial idle state needed to avoid execution on upgrade-changed events + self.peer_relation.data[self.charm.unit].update({"state": "idle"}) + + if self.charm.unit.is_leader(): + logger.debug("Persisting dependencies to upgrade relation data...") + self.peer_relation.data[self.charm.app].update( + {"dependencies": json.dumps(self.dependency_model.dict())} + ) + + def _on_pre_upgrade_check_action(self, event: ActionEvent) -> None: + """Handler for `pre-upgrade-check-action` events.""" + if not self.peer_relation: + event.fail(message="Could not find upgrade relation.") + return + + if not self.charm.unit.is_leader(): + event.fail(message="Action must be ran on the Juju leader.") + return + + if self.cluster_state == "failed": + logger.info("Entering recovery state for rolling-back to previous version...") + self._repair_upgrade_stack() + self.charm.unit.status = BlockedStatus("ready to rollback application") + self.peer_relation.data[self.charm.unit].update({"state": "recovery"}) + return + + # checking if upgrade in progress + if self.cluster_state != "idle": + event.fail("Cannot run pre-upgrade checks, cluster already upgrading.") + return + + try: + logger.info("Running pre-upgrade-check...") + self.pre_upgrade_check() + + if self.substrate == "k8s": + logger.info("Building upgrade-stack for K8s...") + built_upgrade_stack = sorted( + [int(unit.name.split("/")[1]) for unit in self.app_units] + ) + else: + logger.info("Building upgrade-stack for VMs...") + built_upgrade_stack = self.build_upgrade_stack() + + logger.debug(f"Built upgrade stack of {built_upgrade_stack}") + + except ClusterNotReadyError as e: + logger.error(e) + event.fail(message=e.message) + return + except Exception as e: + logger.error(e) + event.fail(message="Unknown error found.") + return + + logger.info("Setting upgrade-stack to relation data...") + self.upgrade_stack = built_upgrade_stack + + def _on_resume_upgrade_action(self, event: ActionEvent) -> None: + """Handle resume upgrade action. + + Continue the upgrade by setting the partition to the next unit. + """ + if not self.peer_relation: + event.fail(message="Could not find upgrade relation.") + return + + if not self.charm.unit.is_leader(): + event.fail(message="Action must be ran on the Juju leader.") + return + + if not self.upgrade_stack: + event.fail(message="Nothing to resume, upgrade stack unset.") + return + + # Check whether this is being run after juju refresh was called + # (the size of the upgrade stack should match the number of total + # unit minus one). + if len(self.upgrade_stack) != len(self.peer_relation.units): + event.fail(message="Upgrade can be resumed only once after juju refresh is called.") + return + + try: + next_partition = self.upgrade_stack[-1] + self._set_rolling_update_partition(partition=next_partition) + event.set_results({"message": f"Upgrade will resume on unit {next_partition}"}) + except KubernetesClientError: + event.fail(message="Cannot set rolling update partition.") + + def _upgrade_supported_check(self) -> None: + """Checks if previous versions can be upgraded to new versions. 
+ + Raises: + :class:`VersionError` if upgrading to existing `version` is not supported + """ + keys = self.dependency_model.__fields__.keys() + + compatible = True + incompatibilities: List[Tuple[str, str, str, str]] = [] + for key in keys: + old_dep: DependencyModel = getattr(self.stored_dependencies, key) + new_dep: DependencyModel = getattr(self.dependency_model, key) + + if not old_dep.can_upgrade(dependency=new_dep): + compatible = False + incompatibilities.append( + (key, old_dep.version, new_dep.version, new_dep.upgrade_supported) + ) + + base_message = "Versions incompatible" + base_cause = "Upgrades only supported for specific versions" + if not compatible: + for incompat in incompatibilities: + base_message += ( + f", {incompat[0]} {incompat[1]} can not be upgraded to {incompat[2]}" + ) + base_cause += f", {incompat[0]} versions satisfying requirement {incompat[3]}" + + raise VersionError( + message=base_message, + cause=base_cause, + ) + + def _on_upgrade_charm(self, event: UpgradeCharmEvent) -> None: + """Handler for `upgrade-charm` events.""" + # defer if not all units have pre-upgraded + if not self.peer_relation: + event.defer() + return + + if not self.upgrade_stack: + logger.error("Cluster upgrade failed, ensure pre-upgrade checks are ran first.") + return + + if self.substrate == "vm": + # for VM run version checks on leader only + if self.charm.unit.is_leader(): + try: + self._upgrade_supported_check() + except VersionError as e: # not ready if not passed check + logger.error(e) + self.set_unit_failed() + return + top_unit_id = self.upgrade_stack[-1] + top_unit = self.charm.model.get_unit(f"{self.charm.app.name}/{top_unit_id}") + if ( + top_unit == self.charm.unit + and self.peer_relation.data[self.charm.unit].get("state") == "recovery" + ): + # While in a rollback and the Juju leader unit is the top unit in the upgrade stack, emit the event + # for this unit to start the rollback. + self.peer_relation.data[self.charm.unit].update({"state": "ready"}) + self.on_upgrade_changed(event) + return + self.charm.unit.status = WaitingStatus("other units upgrading first...") + self.peer_relation.data[self.charm.unit].update({"state": "ready"}) + + if len(self.app_units) == 1: + # single unit upgrade, emit upgrade_granted event right away + getattr(self.on, "upgrade_granted").emit() + + else: + # for k8s run version checks only on highest ordinal unit + if ( + self.charm.unit.name + == f"{self.charm.app.name}/{self.charm.app.planned_units() -1}" + ): + try: + self._upgrade_supported_check() + except VersionError as e: # not ready if not passed check + logger.error(e) + self.set_unit_failed() + return + # On K8s an unit that receives the upgrade-charm event is upgrading + self.charm.unit.status = MaintenanceStatus("upgrading unit") + self.peer_relation.data[self.charm.unit].update({"state": "upgrading"}) + + def on_upgrade_changed(self, event: EventBase) -> None: + """Handler for `upgrade-relation-changed` events.""" + if not self.peer_relation: + return + + # if any other unit failed, don't continue with upgrade + if self.cluster_state == "failed": + logger.debug("Cluster failed to upgrade, exiting...") + return + + if self.substrate == "vm" and self.cluster_state == "recovery": + # skip run while in recovery. 
The event will be retrigged when the cluster is ready + logger.debug("Cluster in recovery, skip...") + return + + # if all units completed, mark as complete + if not self.upgrade_stack: + if self.state == "completed" and self.cluster_state in ["idle", "completed"]: + logger.info("All units completed upgrade, setting idle upgrade state...") + self.charm.unit.status = ActiveStatus() + self.peer_relation.data[self.charm.unit].update({"state": "idle"}) + + if self.charm.unit.is_leader(): + logger.debug("Persisting new dependencies to upgrade relation data...") + self.peer_relation.data[self.charm.app].update( + {"dependencies": json.dumps(self.dependency_model.dict())} + ) + return + + if self.cluster_state == "idle": + logger.debug("upgrade-changed event handled before pre-checks, exiting...") + return + + logger.debug("Did not find upgrade-stack or completed cluster state, skipping...") + return + + # upgrade ongoing, set status for waiting units + if "upgrading" in self.unit_states and self.state in ["idle", "ready"]: + self.charm.unit.status = WaitingStatus("other units upgrading first...") + + # pop mutates the `upgrade_stack` attr + top_unit_id = self.upgrade_stack.pop() + top_unit = self.charm.model.get_unit(f"{self.charm.app.name}/{top_unit_id}") + top_state = self.peer_relation.data[top_unit].get("state") + + # if top of stack is completed, leader pops it + if self.charm.unit.is_leader() and top_state == "completed": + logger.debug(f"{top_unit} has finished upgrading, updating stack...") + + # writes the mutated attr back to rel data + self.peer_relation.data[self.charm.app].update( + {"upgrade-stack": json.dumps(self.upgrade_stack)} + ) + + # recurse on leader to ensure relation changed event not lost + # in case leader is next or the last unit to complete + self.on_upgrade_changed(event) + + # if unit top of stack and all units ready (i.e stack), emit granted event + if ( + self.charm.unit == top_unit + and top_state in ["ready", "upgrading"] + and self.cluster_state == "ready" + and "upgrading" not in self.other_unit_states + ): + logger.debug( + f"{top_unit.name} is next to upgrade, emitting `upgrade_granted` event and upgrading..." + ) + self.charm.unit.status = MaintenanceStatus("upgrading...") + self.peer_relation.data[self.charm.unit].update({"state": "upgrading"}) + + try: + getattr(self.on, "upgrade_granted").emit() + except DependencyError as e: + logger.error(e) + self.set_unit_failed() + return + + def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: + """Handler for `upgrade-granted` events. + + Handlers of this event must meet the following: + - SHOULD check for related application deps from :class:`DataUpgrade.dependencies` + - MAY raise :class:`DependencyError` if dependency not met + - MUST update unit `state` after validating the success of the upgrade, calling one of: + - :class:`DataUpgrade.set_unit_failed` if the unit upgrade fails + - :class:`DataUpgrade.set_unit_completed` if the unit upgrade succeeds + - MUST call :class:`DataUpgarde.on_upgrade_changed` on exit so event not lost on leader + """ + # don't raise if k8s substrate, only return + if self.substrate == "k8s": + return + + raise NotImplementedError + + def _on_upgrade_finished(self, _) -> None: + """Handler for `upgrade-finished` events.""" + if self.substrate == "vm" or not self.peer_relation: + return + + # Emit the upgrade relation changed event in the leader to update the upgrade_stack. 
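+        # The leader's `on_upgrade_changed` handler pops completed units off the upgrade-stack,
+        # keeping the stack in sync as units finish.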
+ if self.charm.unit.is_leader(): + self.charm.on[self.relation_name].relation_changed.emit( + self.model.get_relation(self.relation_name) + ) + + # This hook shouldn't run for the last unit (the first that is upgraded). For that unit it + # should be done through an action after the upgrade success on that unit is double-checked. + unit_number = int(self.charm.unit.name.split("/")[1]) + if unit_number == len(self.peer_relation.units): + logger.info( + f"{self.charm.unit.name} unit upgraded. Evaluate and run `resume-upgrade` action to continue upgrade" + ) + return + + # Also, the hook shouldn't run for the first unit (the last that is upgraded). + if unit_number == 0: + logger.info(f"{self.charm.unit.name} unit upgraded. Upgrade is complete") + return + + try: + # Use the unit number instead of the upgrade stack to avoid race conditions + # (i.e. the leader updates the upgrade stack after this hook runs). + next_partition = unit_number - 1 + logger.debug(f"Set rolling update partition to unit {next_partition}") + self._set_rolling_update_partition(partition=next_partition) + except KubernetesClientError: + logger.exception("Cannot set rolling update partition") + self.set_unit_failed() + self.log_rollback_instructions() + + def _set_rolling_update_partition(self, partition: int) -> None: + """Patch the StatefulSet's `spec.updateStrategy.rollingUpdate.partition`. + + Args: + partition: partition to set. + + K8s only. It should decrement the rolling update strategy partition by using a code + like the following: + + from lightkube.core.client import Client + from lightkube.core.exceptions import ApiError + from lightkube.resources.apps_v1 import StatefulSet + + try: + patch = {"spec": {"updateStrategy": {"rollingUpdate": {"partition": partition}}}} + Client().patch(StatefulSet, name=self.charm.model.app.name, namespace=self.charm.model.name, obj=patch) + logger.debug(f"Kubernetes StatefulSet partition set to {partition}") + except ApiError as e: + if e.status.code == 403: + cause = "`juju trust` needed" + else: + cause = str(e) + raise KubernetesClientError("Kubernetes StatefulSet patch failed", cause) + """ + if self.substrate == "vm": + return + + raise NotImplementedError diff --git a/charms/worker/k8s/requirements.txt b/charms/worker/k8s/requirements.txt index 8f2e0805..01a13b16 100644 --- a/charms/worker/k8s/requirements.txt +++ b/charms/worker/k8s/requirements.txt @@ -10,3 +10,5 @@ tomli ==2.1.0 tomli-w == 1.0.0 typing_extensions==4.12.2 websocket-client==1.8.0 +poetry-core==1.9.1 +lightkube==0.15.5 diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index 8d28884a..d3775be0 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -56,10 +56,13 @@ from charms.node_base import LabelMaker from charms.reconciler import Reconciler from cos_integration import COSIntegration +from inspector import ClusterInspector +from literals import DEPENDENCIES from snap import management as snap_management from snap import version as snap_version from token_distributor import ClusterTokenType, TokenCollector, TokenDistributor, TokenStrategy from typing_extensions import Literal +from upgrade import K8sDependenciesModel, K8sUpgrade # Log messages can be retrieved using juju debug-log log = logging.getLogger(__name__) @@ -126,6 +129,14 @@ def __init__(self, *args): self.api_manager = K8sdAPIManager(factory) xcp_relation = "external-cloud-provider" if self.is_control_plane else "" self.xcp = ExternalCloudProvider(self, xcp_relation) + 
self.cluster_inspector = ClusterInspector(kubeconfig_path=KUBECONFIG) + self.upgrade = K8sUpgrade( + self, + node_manager=self.cluster_inspector, + relation_name="upgrade", + substrate="vm", + dependency_model=K8sDependenciesModel(**DEPENDENCIES), + ) self.cos = COSIntegration(self) self.reconciler = Reconciler(self, self._reconcile) self.distributor = TokenDistributor(self, self.get_node_name(), self.api_manager) @@ -293,7 +304,7 @@ def _bootstrap_k8s_snap(self): log.info("K8s cluster already bootstrapped") return - bootstrap_config = BootstrapConfig() + bootstrap_config = BootstrapConfig.construct() self._configure_datastore(bootstrap_config) self._configure_cloud_provider(bootstrap_config) self._configure_annotations(bootstrap_config) @@ -610,6 +621,7 @@ def _update_kubernetes_version(self): """ relation = self.model.get_relation("cluster") if not relation: + status.add(ops.BlockedStatus("Missing cluster integration")) raise ReconcilerError("Missing cluster integration") if version := snap_version("k8s"): relation.data[self.unit]["version"] = version diff --git a/charms/worker/k8s/src/inspector.py b/charms/worker/k8s/src/inspector.py new file mode 100644 index 00000000..1b3f1a18 --- /dev/null +++ b/charms/worker/k8s/src/inspector.py @@ -0,0 +1,94 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""A module for inspecting a Kubernetes cluster.""" + +import logging +from pathlib import Path +from typing import List, Optional + +from lightkube import ApiError, Client, KubeConfig +from lightkube.core.client import LabelSelector +from lightkube.resources.core_v1 import Node, Pod + +log = logging.getLogger(__name__) + + +class ClusterInspector: + """A helper class for inspecting a Kubernetes cluster.""" + + class ClusterInspectorError(Exception): + """Base exception for ClusterInspector errors.""" + + def __init__( + self, + kubeconfig_path: Path, + ): + """Initialize the ClusterInspector. + + Args: + kubeconfig_path: The path to the kubeconfig file. + """ + self.kubeconfig_path = kubeconfig_path + # NOTE (mateoflorido): The client is set to None to avoid + # initializing it when the object is created (e.g. during + # the charm install as we don't have the kubeconfig yet). + # The client will be initialized when it's needed using the + # _get_client method. + self.client: Optional[Client] = None + + def _get_client(self) -> Client: + """Return the client instance.""" + if self.client is None: + config = KubeConfig.from_file(str(self.kubeconfig_path)) + self.client = Client(config=config.get()) + return self.client + + def get_nodes(self, labels: LabelSelector) -> Optional[List[Node]]: + """Get nodes from the cluster. + + Args: + labels: A dictionary of labels to filter nodes. + + Returns: + A list of the failed nodes that match the label selector. + + Raises: + ClusterInspectorError: If the nodes cannot be retrieved. + """ + client = self._get_client() + unready_nodes = [] + try: + for node in client.list(Node, labels=labels): + if node.status != "Ready": + unready_nodes.append(node) + except ApiError as e: + raise ClusterInspector.ClusterInspectorError(f"Failed to get nodes: {e}") from e + return unready_nodes or None + + def verify_pods_running(self, namespaces: List[str]) -> Optional[str]: + """Verify that all pods in the specified namespaces are running. + + Args: + namespaces: A list of namespaces to check. + + Returns: + None if all pods are running, otherwise returns a string + containing the namespaces that have pods not running. 
+ + Raises: + ClusterInspectorError: If the pods cannot be retrieved. + """ + client = self._get_client() + + failing_pods = [] + try: + for namespace in namespaces: + for pod in client.list(Pod, namespace=namespace): + if pod.status.phase != "Running": # type: ignore + failing_pods.append(f"{namespace}/{pod.metadata.name}") # type: ignore + if failing_pods: + return ", ".join(failing_pods) + except ApiError as e: + raise ClusterInspector.ClusterInspectorError(f"Failed to get pods: {e}") from e + return None diff --git a/charms/worker/k8s/src/literals.py b/charms/worker/k8s/src/literals.py new file mode 100644 index 00000000..656f95af --- /dev/null +++ b/charms/worker/k8s/src/literals.py @@ -0,0 +1,21 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Literals for the charm.""" + +DEPENDENCIES = { + # NOTE: Update the dependencies for the k8s-charm before releasing. + "k8s_charm": { + "dependencies": {"k8s-worker": ">2"}, + "name": "k8s", + "upgrade_supported": ">=1", + "version": "2", + }, + # NOTE: Update the dependencies for the k8s-service before releasing. + "k8s_service": { + "dependencies": {"k8s-worker": "^1.31.0"}, + "name": "k8s", + "upgrade_supported": "^1.30.0", + "version": "1.31.2", + }, +} diff --git a/charms/worker/k8s/src/token_distributor.py b/charms/worker/k8s/src/token_distributor.py index d507d606..257e1de8 100644 --- a/charms/worker/k8s/src/token_distributor.py +++ b/charms/worker/k8s/src/token_distributor.py @@ -35,9 +35,20 @@ class K8sCharm(Protocol): unit (ops.Unit): The unit object. """ - app: ops.Application - model: ops.Model - unit: ops.Unit + @property + def app(self) -> ops.Application: + """The application object.""" + ... # pylint: disable=unnecessary-ellipsis + + @property + def model(self) -> ops.Model: + """The model object.""" + ... # pylint: disable=unnecessary-ellipsis + + @property + def unit(self) -> ops.Unit: + """The unit object.""" + ... # pylint: disable=unnecessary-ellipsis def get_cluster_name(self) -> str: """Get the cluster name.""" diff --git a/charms/worker/k8s/src/upgrade.py b/charms/worker/k8s/src/upgrade.py new file mode 100644 index 00000000..fc2548f6 --- /dev/null +++ b/charms/worker/k8s/src/upgrade.py @@ -0,0 +1,99 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""A module for upgrading the k8s and k8s-worker charms.""" + +import logging +from typing import List + +from charms.data_platform_libs.v0.upgrade import ClusterNotReadyError, DataUpgrade, DependencyModel +from inspector import ClusterInspector +from pydantic import BaseModel + +log = logging.getLogger(__name__) + + +class K8sDependenciesModel(BaseModel): + """A model for the k8s and k8s-worker charm dependencies. + + Attributes: + k8s_charm: The k8s charm dependency model. + k8s_service: The k8s-service charm dependency model. + """ + + k8s_charm: DependencyModel + k8s_service: DependencyModel + + +class K8sUpgrade(DataUpgrade): + """A helper class for upgrading the k8s and k8s-worker charms.""" + + def __init__(self, charm, node_manager: ClusterInspector, **kwargs): + """Initialize the K8sUpgrade. + + Args: + charm: The charm instance. + node_manager: The ClusterInspector instance. + kwargs: Additional keyword arguments. + """ + super().__init__(charm, **kwargs) + self.charm = charm + self.node_manager = node_manager + + def pre_upgrade_check(self) -> None: + """Check if the cluster is ready for an upgrade. 
+ + It verifies that the cluster nodes are ready before proceeding and + if the pods in the specified namespace are ready. + + Raises: + ClusterNotReadyError: If the cluster is not ready for an upgrade. + """ + try: + nodes = self.node_manager.get_nodes( + labels={"juju-charm": "k8s-worker" if self.charm.is_worker else "k8s"} + ) + except ClusterInspector.ClusterInspectorError as e: + raise ClusterNotReadyError( + message="Cluster is not ready for an upgrade", + cause=str(e), + resolution="""API server may not be reachable. + Please check that the API server is up and running.""", + ) from e + + unready_nodes = nodes or [] + + if unready_nodes: + raise ClusterNotReadyError( + message="Cluster is not ready for an upgrade", + cause=f"Nodes not ready: {', '.join(unready_nodes)}", + resolution="""Node(s) may be in a bad state. + Please check the node(s) for more information.""", + ) + + if failing_pods := self.node_manager.verify_pods_running(["kube-system"]): + raise ClusterNotReadyError( + message="Cluster is not ready", + cause=f"Pods not running in namespace(s): {failing_pods}", + resolution="Check the logs for the failing pods.", + ) + + def build_upgrade_stack(self) -> List[int]: + """Return a list of unit numbers to upgrade in order. + + Returns: + A list of unit numbers to upgrade in order. + """ + relation = self.charm.model.get_relation("cluster") + if not relation: + return [int(self.charm.unit.name.split("/")[-1])] + + return [ + int(unit.name.split("/")[-1]) for unit in ({self.charm.unit} | set(relation.units)) + ] + + def log_rollback_instructions(self) -> None: + """Log instructions for rolling back the upgrade.""" + log.critical( + "To rollback the upgrade, run: `juju refresh` to the previously deployed revision." + ) diff --git a/charms/worker/k8s/tests/unit/test_inspector.py b/charms/worker/k8s/tests/unit/test_inspector.py new file mode 100644 index 00000000..2e532095 --- /dev/null +++ b/charms/worker/k8s/tests/unit/test_inspector.py @@ -0,0 +1,101 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Tests for the inspector module.""" + +import unittest +from pathlib import Path +from typing import List +from unittest.mock import MagicMock + +from inspector import ClusterInspector +from lightkube.core.exceptions import ApiError +from lightkube.resources.core_v1 import Node, Pod + + +class TestClusterInspector(unittest.TestCase): + """Tests for the ClusterInspector class.""" + + def setUp(self): + """Set up common test fixtures.""" + self.inspector = ClusterInspector(Path("/path/to/kubeconfig")) + self.mock_client = MagicMock() + self.inspector.client = self.mock_client + + def test_get_nodes_returns_unready(self): + """Test that get_nodes returns unready nodes.""" + mock_node1 = MagicMock(spec=Node) + mock_node1.status = "Ready" + mock_node1.metadata.name = "node1" + + mock_node2 = MagicMock(spec=Node) + mock_node2.status = "NotReady" + mock_node2.metadata.name = "node2" + + self.mock_client.list.return_value = [mock_node1, mock_node2] + + nodes: List[Node] = self.inspector.get_nodes({"role": "control-plane"}) + + self.mock_client.list.assert_called_once_with(Node, labels={"role": "control-plane"}) + self.assertEqual(len(nodes), 1) + # pylint: disable=unsubscriptable-object + self.assertEqual(nodes[0].metadata.name, "node2") # type: ignore + + def test_get_nodes_api_error(self): + """Test get_nodes handles API errors.""" + self.mock_client.list.side_effect = ApiError(response=MagicMock()) + with self.assertRaises(ClusterInspector.ClusterInspectorError): + self.inspector.get_nodes({"role": "control-plane"}) + + def test_verify_pods_running_failed_pods(self): + """Test verify_pods_running when some pods are not running.""" + mock_pod = MagicMock(spec=Pod) + mock_pod.status.phase = "Running" + mock_pod.metadata.name = "pod1" + + mock_pod2 = MagicMock(spec=Pod) + mock_pod2.status.phase = "Failed" + mock_pod2.metadata.name = "pod2" + + self.mock_client.list.return_value = [mock_pod, mock_pod2] + + result = self.inspector.verify_pods_running(["kube-system"]) + + self.assertEqual(result, "kube-system/pod2") + self.mock_client.list.assert_called_once_with(Pod, namespace="kube-system") + + def test_verify_pods_running_multiple_namespaces(self): + """Test verify_pods_running with multiple namespaces.""" + + def mock_list_pods(_, namespace): + """Mock the list method to return pods in different states. + + Args: + namespace: The namespace to list pods from. + + Returns: + A list of pods in different states. + """ + if namespace == "ns1": + mock_pod = MagicMock(spec=Pod) + mock_pod.status.phase = "Running" + mock_pod.metadata.name = "pod1" + return [mock_pod] + mock_pod = MagicMock(spec=Pod) + mock_pod.status.phase = "Failed" + mock_pod.metadata.name = "pod2" + return [mock_pod] + + self.mock_client.list.side_effect = mock_list_pods + + result = self.inspector.verify_pods_running(["ns1", "ns2"]) + + self.assertEqual(result, "ns2/pod2") + self.assertEqual(self.mock_client.list.call_count, 2) + + def test_verify_pods_running_api_error(self): + """Test verify_pods_running handles API errors.""" + self.mock_client.list.side_effect = ApiError(response=MagicMock()) + + with self.assertRaises(ClusterInspector.ClusterInspectorError): + self.inspector.verify_pods_running(["default"]) diff --git a/charms/worker/k8s/tests/unit/test_upgrade.py b/charms/worker/k8s/tests/unit/test_upgrade.py new file mode 100644 index 00000000..edeb5003 --- /dev/null +++ b/charms/worker/k8s/tests/unit/test_upgrade.py @@ -0,0 +1,119 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Tests for the upgrade module.""" + +import unittest +from unittest.mock import MagicMock + +from charms.data_platform_libs.v0.upgrade import ClusterNotReadyError +from inspector import ClusterInspector +from upgrade import K8sDependenciesModel, K8sUpgrade + + +class TestK8sUpgrade(unittest.TestCase): + """Tests for the K8sUpgrade class.""" + + def setUp(self): + """Set up common test fixtures.""" + self.charm = MagicMock() + self.node_manager = MagicMock(spec=ClusterInspector) + self.upgrade = K8sUpgrade( + self.charm, + node_manager=self.node_manager, + relation_name="upgrade", + substrate="vm", + dependency_model=K8sDependenciesModel( + **{ + "k8s_charm": { + "dependencies": {"k8s-worker": ">50"}, + "name": "k8s", + "upgrade_supported": ">90", + "version": "100", + }, + "k8s_service": { + "dependencies": {"k8s-worker": "^3"}, + "name": "k8s", + "upgrade_supported": ">=0.8", + "version": "1.31.1", + }, + } + ), + ) + + def test_pre_upgrade_check_worker_success(self): + """Test pre_upgrade_check succeeds for worker nodes.""" + self.charm.is_worker = True + self.node_manager.get_nodes.return_value = [] + self.node_manager.verify_pods_running.return_value = None + + self.upgrade.pre_upgrade_check() + + self.node_manager.get_nodes.assert_called_once_with(labels={"juju-charm": "k8s-worker"}) + self.node_manager.verify_pods_running.assert_called_once_with(["kube-system"]) + + def test_pre_upgrade_check_control_plane_success(self): + """Test pre_upgrade_check succeeds for control plane nodes.""" + self.charm.is_worker = False + self.node_manager.get_nodes.return_value = [] + self.node_manager.verify_pods_running.return_value = None + + self.upgrade.pre_upgrade_check() + + self.node_manager.get_nodes.assert_called_once_with(labels={"juju-charm": "k8s"}) + + def test_pre_upgrade_check_unready_nodes(self): + """Test pre_upgrade_check fails when nodes are not ready.""" + self.charm.is_worker = True + self.node_manager.get_nodes.return_value = [ + "worker-1", + "worker-2", + "worker-3", + ] + + with self.assertRaises(ClusterNotReadyError): + self.upgrade.pre_upgrade_check() + + def test_pre_upgrade_check_cluster_inspector_error(self): + """Test pre_upgrade_check handles ClusterInspectorError.""" + self.node_manager.get_nodes.side_effect = ClusterInspector.ClusterInspectorError( + "test error" + ) + + with self.assertRaises(ClusterNotReadyError): + self.upgrade.pre_upgrade_check() + + def test_pre_upgrade_check_pods_not_ready(self): + """Test pre_upgrade_check fails when pods are not ready.""" + self.charm.is_worker = True + self.node_manager.get_nodes.return_value = None + self.node_manager.verify_pods_running.return_value = "kube-system/pod-1" + + with self.assertRaises(ClusterNotReadyError): + self.upgrade.pre_upgrade_check() + + def test_build_upgrade_stack_no_relation(self): + """Test build_upgrade_stack when no cluster relation exists.""" + self.charm.unit.name = "k8s/0" + self.charm.model.get_relation.return_value = None + + result = self.upgrade.build_upgrade_stack() + + self.assertEqual(result, [0]) + self.charm.model.get_relation.assert_called_once_with("cluster") + + def test_build_upgrade_stack_with_relation(self): + """Test build_upgrade_stack with cluster relation.""" + self.charm.unit.name = "k8s/0" + relation = MagicMock() + unit_1 = MagicMock() + unit_1.name = "k8s/1" + unit_2 = MagicMock() + unit_2.name = "k8s/2" + relation.units = {unit_1, unit_2} + self.charm.model.get_relation.return_value = relation + + result = self.upgrade.build_upgrade_stack() + + 
self.assertEqual(sorted(result), [0, 1, 2]) + self.charm.model.get_relation.assert_called_once_with("cluster") From 7558411716e841a4035ed02e101d78a2718fbc4d Mon Sep 17 00:00:00 2001 From: Adam Dyess Date: Tue, 19 Nov 2024 15:56:25 -0600 Subject: [PATCH 2/5] Integrate with cloud-provider and test with AWS (#162) * Integrate with cloud-provider and test with AWS * Cloud integrations registered at charm start, in order to react to juju events * Move kube_control relation management to its own module * Testing for cloud integrations * Update charms/worker/k8s/tests/unit/test_cloud_integration.py Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * cloud integration is dependent on the remote cluster-tag --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- charms/worker/charmcraft.yaml | 6 + charms/worker/k8s/charmcraft.yaml | 12 +- .../k8s/lib/charms/k8s/v0/k8sd_api_manager.py | 152 ++++++------ charms/worker/k8s/requirements.txt | 4 + charms/worker/k8s/src/charm.py | 103 +++++--- charms/worker/k8s/src/cloud_integration.py | 159 +++++++++++++ charms/worker/k8s/src/kube_control.py | 72 ++++++ charms/worker/k8s/src/protocols.py | 39 +++ charms/worker/k8s/src/token_distributor.py | 85 ++++--- .../k8s/tests/unit/test_cloud_integration.py | 223 ++++++++++++++++++ pyproject.toml | 3 + 11 files changed, 713 insertions(+), 145 deletions(-) create mode 100644 charms/worker/k8s/src/cloud_integration.py create mode 100644 charms/worker/k8s/src/kube_control.py create mode 100644 charms/worker/k8s/src/protocols.py create mode 100644 charms/worker/k8s/tests/unit/test_cloud_integration.py diff --git a/charms/worker/charmcraft.yaml b/charms/worker/charmcraft.yaml index 26d308d8..326a4b9b 100644 --- a/charms/worker/charmcraft.yaml +++ b/charms/worker/charmcraft.yaml @@ -98,6 +98,10 @@ provides: cos-agent: interface: cos_agent requires: + aws: + interface: aws-integration + azure: + interface: azure-integration cluster: interface: k8s-cluster # interface to connect with the k8s charm to provide @@ -108,3 +112,5 @@ requires: interface: cos-k8s-tokens containerd: interface: containerd + gcp: + interface: gcp-integration diff --git a/charms/worker/k8s/charmcraft.yaml b/charms/worker/k8s/charmcraft.yaml index 70ce0156..df1db8cc 100644 --- a/charms/worker/k8s/charmcraft.yaml +++ b/charms/worker/k8s/charmcraft.yaml @@ -189,17 +189,25 @@ peers: provides: cos-agent: interface: cos_agent - k8s-cluster: - interface: k8s-cluster cos-worker-tokens: interface: cos-k8s-tokens containerd: interface: containerd ceph-k8s-info: interface: kubernetes-info + k8s-cluster: + interface: k8s-cluster + kube-control: + interface: kube-control requires: + aws: + interface: aws-integration + azure: + interface: azure-integration etcd: interface: etcd external-cloud-provider: interface: external_cloud_provider + gcp: + interface: gcp-integration diff --git a/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py b/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py index d175ea97..543e8216 100644 --- a/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py +++ b/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py @@ -45,7 +45,7 @@ class utilises different connection factories (UnixSocketConnectionFactory # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 2 +LIBPATCH = 3 logger = logging.getLogger(__name__) @@ -190,8 +190,8 @@ class 
ClusterMember(BaseModel): name: str address: str - cluster_role: Optional[str] = Field(None, alias="cluster-role") - datastore_role: Optional[str] = Field(None, alias="datastore-role") + cluster_role: Optional[str] = Field(default=None, alias="cluster-role") + datastore_role: Optional[str] = Field(default=None, alias="datastore-role") class DNSConfig(BaseModel): @@ -204,10 +204,10 @@ class DNSConfig(BaseModel): upstream_nameservers: List of upstream nameservers for DNS resolution. """ - enabled: Optional[bool] = Field(None) - cluster_domain: Optional[str] = Field(None, alias="cluster-domain") - service_ip: Optional[str] = Field(None, alias="service-ip") - upstream_nameservers: Optional[List[str]] = Field(None, alias="upstream-nameservers") + enabled: Optional[bool] = Field(default=None) + cluster_domain: Optional[str] = Field(default=None, alias="cluster-domain") + service_ip: Optional[str] = Field(default=None, alias="service-ip") + upstream_nameservers: Optional[List[str]] = Field(default=None, alias="upstream-nameservers") class IngressConfig(BaseModel): @@ -219,9 +219,9 @@ class IngressConfig(BaseModel): enable_proxy_protocol: Optional flag to enable or disable proxy protocol. """ - enabled: Optional[bool] = Field(None) - default_tls_secret: Optional[str] = Field(None, alias="default-tls-secret") - enable_proxy_protocol: Optional[bool] = Field(None, alias="enable-proxy-protocol") + enabled: Optional[bool] = Field(default=None) + default_tls_secret: Optional[str] = Field(default=None, alias="default-tls-secret") + enable_proxy_protocol: Optional[bool] = Field(default=None, alias="enable-proxy-protocol") class LoadBalancerConfig(BaseModel): @@ -239,15 +239,15 @@ class LoadBalancerConfig(BaseModel): bgp_peer_port: The port for BGP peering. """ - enabled: Optional[bool] = Field(None) - cidrs: Optional[List[str]] = Field(None) - l2_enabled: Optional[bool] = Field(None, alias="l2-enabled") - l2_interfaces: Optional[List[str]] = Field(None, alias="l2-interfaces") - bgp_enabled: Optional[bool] = Field(None, alias="bgp-enabled") - bgp_local_asn: Optional[int] = Field(None, alias="bgp-local-asn") - bgp_peer_address: Optional[str] = Field(None, alias="bgp-peer-address") - bgp_peer_asn: Optional[int] = Field(None, alias="bgp-peer-asn") - bgp_peer_port: Optional[int] = Field(None, alias="bgp-peer-port") + enabled: Optional[bool] = Field(default=None) + cidrs: Optional[List[str]] = Field(default=None) + l2_enabled: Optional[bool] = Field(default=None, alias="l2-enabled") + l2_interfaces: Optional[List[str]] = Field(default=None, alias="l2-interfaces") + bgp_enabled: Optional[bool] = Field(default=None, alias="bgp-enabled") + bgp_local_asn: Optional[int] = Field(default=None, alias="bgp-local-asn") + bgp_peer_address: Optional[str] = Field(default=None, alias="bgp-peer-address") + bgp_peer_asn: Optional[int] = Field(default=None, alias="bgp-peer-asn") + bgp_peer_port: Optional[int] = Field(default=None, alias="bgp-peer-port") class LocalStorageConfig(BaseModel): @@ -260,10 +260,10 @@ class LocalStorageConfig(BaseModel): set_default: Optional flag to set this as the default storage option. 
""" - enabled: Optional[bool] = Field(None) - local_path: Optional[str] = Field(None, alias="local-path") - reclaim_policy: Optional[str] = Field(None, alias="reclaim-policy") - set_default: Optional[bool] = Field(None, alias="set-default") + enabled: Optional[bool] = Field(default=None) + local_path: Optional[str] = Field(default=None, alias="local-path") + reclaim_policy: Optional[str] = Field(default=None, alias="reclaim-policy") + set_default: Optional[bool] = Field(default=None, alias="set-default") class NetworkConfig(BaseModel): @@ -273,7 +273,7 @@ class NetworkConfig(BaseModel): enabled: Optional flag which represents the status of Network. """ - enabled: Optional[bool] = Field(None) + enabled: Optional[bool] = Field(default=None) class GatewayConfig(BaseModel): @@ -283,7 +283,7 @@ class GatewayConfig(BaseModel): enabled: Optional flag which represents the status of Gateway. """ - enabled: Optional[bool] = Field(None) + enabled: Optional[bool] = Field(default=None) class MetricsServerConfig(BaseModel): @@ -293,7 +293,7 @@ class MetricsServerConfig(BaseModel): enabled: Optional flag which represents the status of MetricsServer. """ - enabled: Optional[bool] = Field(None) + enabled: Optional[bool] = Field(default=None) class UserFacingClusterConfig(BaseModel, allow_population_by_field_name=True): @@ -311,15 +311,15 @@ class UserFacingClusterConfig(BaseModel, allow_population_by_field_name=True): annotations: Dictionary that can be used to store arbitrary metadata configuration. """ - network: Optional[NetworkConfig] = Field(None) - dns: Optional[DNSConfig] = Field(None) - ingress: Optional[IngressConfig] = Field(None) - load_balancer: Optional[LoadBalancerConfig] = Field(None, alias="load-balancer") - local_storage: Optional[LocalStorageConfig] = Field(None, alias="local-storage") - gateway: Optional[GatewayConfig] = Field(None) - metrics_server: Optional[MetricsServerConfig] = Field(None, alias="metrics-server") - cloud_provider: Optional[str] = Field(None, alias="cloud-provider") - annotations: Optional[Dict[str, str]] = Field(None) + network: Optional[NetworkConfig] = Field(default=None) + dns: Optional[DNSConfig] = Field(default=None) + ingress: Optional[IngressConfig] = Field(default=None) + load_balancer: Optional[LoadBalancerConfig] = Field(default=None, alias="load-balancer") + local_storage: Optional[LocalStorageConfig] = Field(default=None, alias="local-storage") + gateway: Optional[GatewayConfig] = Field(default=None) + metrics_server: Optional[MetricsServerConfig] = Field(default=None, alias="metrics-server") + cloud_provider: Optional[str] = Field(default=None, alias="cloud-provider") + annotations: Optional[Dict[str, str]] = Field(default=None) class UserFacingDatastoreConfig(BaseModel, allow_population_by_field_name=True): @@ -333,11 +333,11 @@ class UserFacingDatastoreConfig(BaseModel, allow_population_by_field_name=True): client_key: client key of the external datastore cluster in PEM format. 
""" - type: Optional[str] = Field(None) - servers: Optional[List[str]] = Field(None) - ca_crt: Optional[str] = Field(None, alias="ca-crt") - client_crt: Optional[str] = Field(None, alias="client-crt") - client_key: Optional[str] = Field(None, alias="client-key") + type: Optional[str] = Field(default=None) + servers: Optional[List[str]] = Field(default=None) + ca_crt: Optional[str] = Field(default=None, alias="ca-crt") + client_crt: Optional[str] = Field(default=None, alias="client-crt") + client_key: Optional[str] = Field(default=None, alias="client-key") class BootstrapConfig(BaseModel): @@ -357,21 +357,25 @@ class BootstrapConfig(BaseModel): datastore_client_cert (str): The client certificate for accessing the datastore. datastore_client_key (str): The client key for accessing the datastore. extra_sans (List[str]): List of extra sans for the self-signed certificates + extra_node_kube_controller_manager_args ([Dict[str,str]]): key-value service args """ - cluster_config: Optional[UserFacingClusterConfig] = Field(None, alias="cluster-config") - control_plane_taints: Optional[List[str]] = Field(None, alias="control-plane-taints") - pod_cidr: Optional[str] = Field(None, alias="pod-cidr") - service_cidr: Optional[str] = Field(None, alias="service-cidr") - disable_rbac: Optional[bool] = Field(None, alias="disable-rbac") - secure_port: Optional[int] = Field(None, alias="secure-port") - k8s_dqlite_port: Optional[int] = Field(None, alias="k8s-dqlite-port") - datastore_type: Optional[str] = Field(None, alias="datastore-type") - datastore_servers: Optional[List[AnyHttpUrl]] = Field(None, alias="datastore-servers") - datastore_ca_cert: Optional[str] = Field(None, alias="datastore-ca-crt") - datastore_client_cert: Optional[str] = Field(None, alias="datastore-client-crt") - datastore_client_key: Optional[str] = Field(None, alias="datastore-client-key") - extra_sans: Optional[List[str]] = Field(None, alias="extra-sans") + cluster_config: Optional[UserFacingClusterConfig] = Field(default=None, alias="cluster-config") + control_plane_taints: Optional[List[str]] = Field(default=None, alias="control-plane-taints") + pod_cidr: Optional[str] = Field(default=None, alias="pod-cidr") + service_cidr: Optional[str] = Field(default=None, alias="service-cidr") + disable_rbac: Optional[bool] = Field(default=None, alias="disable-rbac") + secure_port: Optional[int] = Field(default=None, alias="secure-port") + k8s_dqlite_port: Optional[int] = Field(default=None, alias="k8s-dqlite-port") + datastore_type: Optional[str] = Field(default=None, alias="datastore-type") + datastore_servers: Optional[List[AnyHttpUrl]] = Field(default=None, alias="datastore-servers") + datastore_ca_cert: Optional[str] = Field(default=None, alias="datastore-ca-crt") + datastore_client_cert: Optional[str] = Field(default=None, alias="datastore-client-crt") + datastore_client_key: Optional[str] = Field(default=None, alias="datastore-client-key") + extra_sans: Optional[List[str]] = Field(default=None, alias="extra-sans") + extra_node_kube_controller_manager_args: Optional[Dict[str, str]] = Field( + default=None, alias="extra-node-kube-controller-manager-args" + ) class CreateClusterRequest(BaseModel): @@ -396,8 +400,8 @@ class UpdateClusterConfigRequest(BaseModel): datastore (Optional[UserFacingDatastoreConfig]): The clusters datastore configuration. 
""" - config: Optional[UserFacingClusterConfig] = Field(None) - datastore: Optional[UserFacingDatastoreConfig] = Field(None) + config: Optional[UserFacingClusterConfig] = Field(default=None) + datastore: Optional[UserFacingDatastoreConfig] = Field(default=None) class NodeJoinConfig(BaseModel, allow_population_by_field_name=True): @@ -408,8 +412,8 @@ class NodeJoinConfig(BaseModel, allow_population_by_field_name=True): kubelet_key (str): node's certificate key """ - kubelet_crt: Optional[str] = Field(None, alias="kubelet-crt") - kubelet_key: Optional[str] = Field(None, alias="kubelet-key") + kubelet_crt: Optional[str] = Field(default=None, alias="kubelet-crt") + kubelet_key: Optional[str] = Field(default=None, alias="kubelet-key") class ControlPlaneNodeJoinConfig(NodeJoinConfig, allow_population_by_field_name=True): @@ -423,12 +427,12 @@ class ControlPlaneNodeJoinConfig(NodeJoinConfig, allow_population_by_field_name= front_proxy_client_key (str): front-proxy certificate key """ - extra_sans: Optional[List[str]] = Field(None, alias="extra-sans") + extra_sans: Optional[List[str]] = Field(default=None, alias="extra-sans") - apiserver_crt: Optional[str] = Field(None, alias="apiserver-crt") - apiserver_client_key: Optional[str] = Field(None, alias="apiserver-key") - front_proxy_client_crt: Optional[str] = Field(None, alias="front-proxy-client-crt") - front_proxy_client_key: Optional[str] = Field(None, alias="front-proxy-client-key") + apiserver_crt: Optional[str] = Field(default=None, alias="apiserver-crt") + apiserver_client_key: Optional[str] = Field(default=None, alias="apiserver-key") + front_proxy_client_crt: Optional[str] = Field(default=None, alias="front-proxy-client-crt") + front_proxy_client_key: Optional[str] = Field(default=None, alias="front-proxy-client-key") class JoinClusterRequest(BaseModel, allow_population_by_field_name=True): @@ -444,7 +448,7 @@ class JoinClusterRequest(BaseModel, allow_population_by_field_name=True): name: str address: str token: SecretStr - config: Optional[NodeJoinConfig] = Field(None) + config: Optional[NodeJoinConfig] = Field(default=None) def dict(self, **kwds) -> Dict[Any, Any]: """Render object into a dict. @@ -470,8 +474,8 @@ class DatastoreStatus(BaseModel): servers: (List(str)): list of server addresses of the external datastore cluster. """ - datastore_type: Optional[str] = Field(None, alias="type") - servers: Optional[List[str]] = Field(None, alias="servers") + datastore_type: Optional[str] = Field(default=None, alias="type") + servers: Optional[List[str]] = Field(default=None, alias="servers") class ClusterStatus(BaseModel): @@ -485,9 +489,9 @@ class ClusterStatus(BaseModel): """ ready: bool = Field(False) - members: Optional[List[ClusterMember]] = Field(None) - config: Optional[UserFacingClusterConfig] = Field(None) - datastore: Optional[DatastoreStatus] = Field(None) + members: Optional[List[ClusterMember]] = Field(default=None) + config: Optional[UserFacingClusterConfig] = Field(default=None) + datastore: Optional[DatastoreStatus] = Field(default=None) class ClusterMetadata(BaseModel): @@ -508,7 +512,7 @@ class GetClusterStatusResponse(BaseRequestModel): Can be None if the status is not available. 
""" - metadata: Optional[ClusterMetadata] = Field(None) + metadata: Optional[ClusterMetadata] = Field(default=None) class KubeConfigMetadata(BaseModel): @@ -830,6 +834,16 @@ def request_auth_token(self, username: str, groups: List[str]) -> SecretStr: auth_response = self._send_request(endpoint, "POST", AuthTokenResponse, body) return auth_response.metadata.token + def revoke_auth_token(self, token: str) -> None: + """Revoke a Kubernetes authentication token. + + Args: + token (str): The authentication token. + """ + endpoint = "/1.0/kubernetes/auth/tokens" + body = {"token": token} + self._send_request(endpoint, "DELETE", EmptyResponse, body) + def get_kubeconfig(self, server: Optional[str]) -> str: """Request a Kubernetes admin config. diff --git a/charms/worker/k8s/requirements.txt b/charms/worker/k8s/requirements.txt index 01a13b16..1fd57d98 100644 --- a/charms/worker/k8s/requirements.txt +++ b/charms/worker/k8s/requirements.txt @@ -2,6 +2,10 @@ charm-lib-contextual-status @ git+https://github.com/charmed-kubernetes/charm-li charm-lib-interface-external-cloud-provider @ git+https://github.com/charmed-kubernetes/charm-lib-interface-external-cloud-provider@e1c5fc69e98100a7d43c0ad5a7969bba1ecbcd40 charm-lib-node-base @ git+https://github.com/charmed-kubernetes/layer-kubernetes-node-base@9b212854e768f13c26cc907bed51444e97e51b50#subdirectory=ops charm-lib-reconciler @ git+https://github.com/charmed-kubernetes/charm-lib-reconciler@f818cc30d1a22be43ffdfecf7fbd9c3fd2967502 +ops-interface-kube-control @ git+https://github.com/charmed-kubernetes/interface-kube-control.git@main#subdirectory=ops +ops.interface_aws @ git+https://github.com/charmed-kubernetes/interface-aws-integration@main#subdirectory=ops +ops.interface_gcp @ git+https://github.com/charmed-kubernetes/interface-gcp-integration@main#subdirectory=ops +ops.interface_azure @ git+https://github.com/charmed-kubernetes/interface-azure-integration@main#subdirectory=ops cosl==0.0.42 ops==2.17.0 pydantic==1.10.19 diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index d3775be0..78190174 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -15,12 +15,12 @@ certificate storage. 
""" +import hashlib import logging import os import shlex import socket import subprocess -import uuid from functools import cached_property from pathlib import Path from time import sleep @@ -55,9 +55,12 @@ from charms.kubernetes_libs.v0.etcd import EtcdReactiveRequires from charms.node_base import LabelMaker from charms.reconciler import Reconciler +from cloud_integration import CloudIntegration from cos_integration import COSIntegration from inspector import ClusterInspector +from kube_control import configure as configure_kube_control from literals import DEPENDENCIES +from ops.interface_kube_control import KubeControlProvides from snap import management as snap_management from snap import version as snap_version from token_distributor import ClusterTokenType, TokenCollector, TokenDistributor, TokenStrategy @@ -128,6 +131,7 @@ def __init__(self, *args): factory = UnixSocketConnectionFactory(unix_socket=K8SD_SNAP_SOCKET, timeout=320) self.api_manager = K8sdAPIManager(factory) xcp_relation = "external-cloud-provider" if self.is_control_plane else "" + self.cloud_integration = CloudIntegration(self, self.is_control_plane) self.xcp = ExternalCloudProvider(self, xcp_relation) self.cluster_inspector = ClusterInspector(kubeconfig_path=KUBECONFIG) self.upgrade = K8sUpgrade( @@ -162,6 +166,7 @@ def __init__(self, *args): self.framework.observe(self.on.update_status, self._on_update_status) if self.is_control_plane: self.etcd = EtcdReactiveRequires(self) + self.kube_control = KubeControlProvides(self, endpoint="kube-control") self.framework.observe(self.on.get_kubeconfig_action, self._get_external_kubeconfig) def _k8s_info(self, event: ops.EventBase): @@ -223,6 +228,16 @@ def _apply_proxy_environment(self): with open("/etc/environment", mode="w", encoding="utf-8") as file: file.write("\n".join([f"{k}={v}" for k, v in current_env.items()])) + def _generate_unique_cluster_name(self) -> str: + """Use a stable input to generate a unique cluster name. + + Returns: + str: The unique cluster name. + """ + stable_input = f"{self.app.name}-{self.model.uuid}" + hashed = hashlib.sha256(stable_input.encode()).hexdigest()[:32] + return f"k8s-{hashed}" + def get_cluster_name(self) -> str: """Craft a unique name for the cluster once joined or bootstrapped. 
@@ -233,8 +248,7 @@ def get_cluster_name(self) -> str: """ if self._stored.cluster_name == "": if self.lead_control_plane and self.api_manager.is_cluster_bootstrapped(): - # TODO: replace with API call once available from the snap - self._stored.cluster_name = str(uuid.uuid4()) + self._stored.cluster_name = self._generate_unique_cluster_name() elif not (relation := self.model.get_relation("cluster")): pass elif any( @@ -311,6 +325,9 @@ def _bootstrap_k8s_snap(self): bootstrap_config.service_cidr = str(self.config["service-cidr"]) bootstrap_config.control_plane_taints = str(self.config["register-with-taints"]).split() bootstrap_config.extra_sans = [_get_public_address()] + bootstrap_config.extra_node_kube_controller_manager_args = { + "--cluster-name": self._generate_unique_cluster_name() + } status.add(ops.MaintenanceStatus("Bootstrapping Cluster")) @@ -453,19 +470,20 @@ def _configure_datastore(self, config: Union[BootstrapConfig, UpdateClusterConfi config.datastore_servers = self.etcd.get_connection_string().split(",") log.info("etcd servers: %s", config.datastore_servers) elif isinstance(config, UpdateClusterConfigRequest): - config.datastore = UserFacingDatastoreConfig( - type="external", - servers=self.etcd.get_connection_string().split(","), - ca_crt=etcd_config.get("client_ca", ""), - client_crt=etcd_config.get("client_cert", ""), - client_key=etcd_config.get("client_key", ""), - ) + config.datastore = UserFacingDatastoreConfig() + config.datastore.type = "external" + config.datastore.servers = self.etcd.get_connection_string().split(",") + config.datastore.ca_crt = etcd_config.get("client_ca", "") + config.datastore.client_crt = etcd_config.get("client_cert", "") + config.datastore.client_key = etcd_config.get("client_key", "") log.info("etcd servers: %s", config.datastore.servers) elif datastore == "dqlite": log.info("Using dqlite as datastore") - def _configure_cloud_provider(self, config: BootstrapConfig): + def _configure_cloud_provider( + self, config: Union[BootstrapConfig, UpdateClusterConfigRequest] + ): """Configure the cloud-provider for the Kubernetes cluster. Args: @@ -475,7 +493,14 @@ def _configure_cloud_provider(self, config: BootstrapConfig): """ if self.xcp.has_xcp: log.info("Using external as cloud-provider") - config.cloud_provider = "external" + if isinstance(config, BootstrapConfig): + if not (ufcg := config.cluster_config): + ufcg = config.cluster_config = UserFacingClusterConfig() + elif isinstance(config, UpdateClusterConfigRequest): + if not (ufcg := config.config): + ufcg = config.config = UserFacingClusterConfig() + + ufcg.cloud_provider = "external" def _revoke_cluster_tokens(self, event: ops.EventBase): """Revoke tokens for the units in the cluster and k8s-cluster relations. @@ -588,6 +613,8 @@ def _ensure_cluster_config(self): update_request = UpdateClusterConfigRequest() self._configure_datastore(update_request) + self._configure_cloud_provider(update_request) + configure_kube_control(self) self.api_manager.update_cluster_config(update_request) def _get_scrape_jobs(self): @@ -682,29 +709,45 @@ def _get_proxy_env(self) -> Dict[str, str]: InvalidResponseError, K8sdConnectionError, ) - def _join_cluster(self): - """Retrieve the join token from secret databag and join the cluster.""" + def _join_cluster(self, event: ops.EventBase): + """Retrieve the join token from secret databag and join the cluster. 
+ + Args: + event (ops.EventBase): event triggering the join + """ if not (relation := self.model.get_relation("cluster")): status.add(ops.BlockedStatus("Missing cluster integration")) - assert False, "Missing cluster integration" # nosec + raise ReconcilerError("Missing cluster integration") - if self.get_cluster_name(): + if local_cluster := self.get_cluster_name(): + self.cloud_integration.integrate(local_cluster, event) return status.add(ops.MaintenanceStatus("Joining cluster")) with self.collector.recover_token(relation) as token: - binding = self.model.get_binding(relation.name) - address = binding and binding.network.ingress_address - node_name = self.get_node_name() - cluster_addr = f"{address}:{K8SD_PORT}" - log.info("Joining %s(%s) to %s...", self.unit, node_name, cluster_addr) - request = JoinClusterRequest(name=node_name, address=cluster_addr, token=token) - if self.is_control_plane: - request.config = ControlPlaneNodeJoinConfig() - request.config.extra_sans = [_get_public_address()] - - self.api_manager.join_cluster(request) - log.info("Joined %s(%s)", self.unit, node_name) + remote_cluster = self.collector.cluster_name(relation, False) if relation else "" + self.cloud_integration.integrate(remote_cluster, event) + self._join_with_token(relation, token) + + def _join_with_token(self, relation: ops.Relation, token: str): + """Join the cluster with the given token. + + Args: + relation (ops.Relation): The relation to use for the token. + token (str): The token to use for joining the cluster. + """ + binding = self.model.get_binding(relation.name) + address = binding and binding.network.ingress_address + node_name = self.get_node_name() + cluster_addr = f"{address}:{K8SD_PORT}" + log.info("Joining %s(%s) to %s...", self.unit, node_name, cluster_addr) + request = JoinClusterRequest(name=node_name, address=cluster_addr, token=token) + if self.is_control_plane: + request.config = ControlPlaneNodeJoinConfig() + request.config.extra_sans = [_get_public_address()] + + self.api_manager.join_cluster(request) + log.info("Joined %s(%s)", self.unit, node_name) @on_error(WaitingStatus("Awaiting cluster removal")) def _death_handler(self, event: ops.EventBase): @@ -756,7 +799,7 @@ def _reconcile(self, event: ops.EventBase): self._revoke_cluster_tokens(event) self._ensure_cluster_config() self._announce_kubernetes_version() - self._join_cluster() + self._join_cluster(event) self._config_containerd_registries() self._configure_cos_integration() self._update_status() @@ -958,4 +1001,4 @@ def _get_external_kubeconfig(self, event: ops.ActionEvent): if __name__ == "__main__": # pragma: nocover - ops.main.main(K8sCharm) + ops.main(K8sCharm) diff --git a/charms/worker/k8s/src/cloud_integration.py b/charms/worker/k8s/src/cloud_integration.py new file mode 100644 index 00000000..87b1a5e4 --- /dev/null +++ b/charms/worker/k8s/src/cloud_integration.py @@ -0,0 +1,159 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Cloud Integration for Canonical k8s Operator.""" + +import logging +from typing import Mapping, Optional, Union + +import charms.contextual_status as status +import ops +from ops.interface_aws.requires import AWSIntegrationRequires +from ops.interface_azure.requires import AzureIntegrationRequires +from ops.interface_gcp.requires import GCPIntegrationRequires +from protocols import K8sCharmProtocol + +log = logging.getLogger(__name__) + +CloudSpecificIntegration = Union[ + AWSIntegrationRequires, AzureIntegrationRequires, GCPIntegrationRequires +] + + +class CloudIntegration: + """Utility class that handles the integration with clouds for Canonical k8s. + + This class provides methods to configure instance tags and roles for control-plane + units + + Attributes: + charm (K8sCharm): Reference to the base charm instance. + cloud (CloudSpecificIntegration): Reference to the cloud-specific integration. + """ + + def __init__(self, charm: K8sCharmProtocol, is_control_plane: bool) -> None: + """Integrate with all possible clouds. + + Args: + charm (K8sCharm): Reference to the base charm instance. + is_control_plane (bool): Flag to determine if the unit is a control-plane node. + """ + self.charm = charm + self.is_control_plane = is_control_plane + self.cloud_support: Mapping[str, CloudSpecificIntegration] = { + "aws": AWSIntegrationRequires(charm), + "gce": GCPIntegrationRequires(charm), + "azure": AzureIntegrationRequires(charm), + } + + @property + def cloud(self) -> Optional[CloudSpecificIntegration]: + """Determine if we're integrated with a known cloud.""" + cloud_name = self.charm.get_cloud_name() + if not (cloud := self.cloud_support.get(cloud_name)): + log.warning("Skipping direct cloud integration: cloud %s", cloud_name) + return None + + if not cloud.relation: + log.info( + "Skipping Cloud integration: Needs an active %s relation to integrate.", cloud_name + ) + return None + return cloud + + def _integrate_aws(self, cloud: AWSIntegrationRequires, cluster_tag: str): + """Integrate with AWS cloud. + + Args: + cloud (AWSIntegrationRequires): AWS cloud integration. + cluster_tag (str): Tag to identify the cluster. + """ + aws_cluster_tag = {f"kubernetes.io/cluster/{cluster_tag}": "owned"} + if self.is_control_plane: + # wokeignore:rule=master + cloud.tag_instance({**aws_cluster_tag, "k8s.io/role/master": "true"}) + cloud.tag_instance_security_group(aws_cluster_tag) + cloud.tag_instance_subnet(aws_cluster_tag) + cloud.enable_object_storage_management(["kubernetes-*"]) + cloud.enable_load_balancer_management() + + # Necessary for cloud-provider-aws + cloud.enable_autoscaling_readonly() + cloud.enable_instance_modification() + cloud.enable_region_readonly() + else: + cloud.tag_instance(aws_cluster_tag) + cloud.tag_instance_security_group(aws_cluster_tag) + cloud.tag_instance_subnet(aws_cluster_tag) + cloud.enable_object_storage_management(["kubernetes-*"]) + + def _integrate_gcp(self, cloud: GCPIntegrationRequires, cluster_tag: str): + """Integrate with GCP cloud. + + Args: + cloud (GCPIntegrationRequires): GCP cloud integration. + cluster_tag (str): Tag to identify the cluster. 
+ """ + gcp_cluster_tag = {"k8s-io-cluster-name": cluster_tag} + if self.is_control_plane: + # wokeignore:rule=master + cloud.tag_instance({**gcp_cluster_tag, "k8s-io-role-master": "master"}) + cloud.enable_object_storage_management() + cloud.enable_security_management() + else: + cloud.tag_instance(gcp_cluster_tag) + cloud.enable_object_storage_management() + + def _integrate_azure(self, cloud: AzureIntegrationRequires, cluster_tag: str): + """Integrate with Azure cloud. + + Args: + cloud (AzureIntegrationRequires): Azure cloud integration. + cluster_tag (str): Tag to identify the cluster. + """ + azure_cluster_tag = {"k8s-io-cluster-name": cluster_tag} + if self.is_control_plane: + # wokeignore:rule=master + cloud.tag_instance({**azure_cluster_tag, "k8s-io-role-master": "master"}) + cloud.enable_object_storage_management() + cloud.enable_security_management() + cloud.enable_loadbalancer_management() + else: + cloud.tag_instance(azure_cluster_tag) + cloud.enable_object_storage_management() + + @status.on_error(ops.WaitingStatus("Waiting for cloud-integration")) + def integrate(self, cluster_tag: str, event: ops.EventBase): + """Request tags and permissions for a control-plane node. + + Args: + cluster_tag (str): Tag to identify the integrating cluster. + event (ops.EventBase): Event that triggered the integration + + Raises: + ValueError: If the cloud integration evaluation fails + """ + if not (cloud := self.cloud): + return + + if not cluster_tag: + raise ValueError("Cluster-tag is required for cloud integration") + + cloud_name = self.charm.get_cloud_name() + + status.add(ops.MaintenanceStatus(f"Integrate with {cloud_name}")) + if isinstance(cloud, AWSIntegrationRequires): + self._integrate_aws(cloud, cluster_tag) + elif isinstance(cloud, GCPIntegrationRequires): + self._integrate_gcp(cloud, cluster_tag) + elif isinstance(cloud, AzureIntegrationRequires): + self._integrate_azure(cloud, cluster_tag) + cloud.enable_instance_inspection() + cloud.enable_dns_management() + if self.is_control_plane: + cloud.enable_network_management() + cloud.enable_block_storage_management() + errors = cloud.evaluate_relation(event) + if errors: + log.error("Failed to evaluate cloud integration: %s", errors) + raise ValueError("Failed to evaluate cloud integration") diff --git a/charms/worker/k8s/src/kube_control.py b/charms/worker/k8s/src/kube_control.py new file mode 100644 index 00000000..facb5796 --- /dev/null +++ b/charms/worker/k8s/src/kube_control.py @@ -0,0 +1,72 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Relation kube-control module.""" +import logging +from base64 import b64decode + +import charms.contextual_status as status +import ops +import yaml +from protocols import K8sCharmProtocol + +# Log messages can be retrieved using juju debug-log +log = logging.getLogger(__name__) + + +def configure(charm: K8sCharmProtocol): + """Configure kube-control for the Kubernetes cluster. + + Args: + charm (K8sCharmProtocol): The charm instance. 
+ """ + if not (binding := charm.model.get_binding("kube-control")): + return + + status.add(ops.MaintenanceStatus("Configuring Kube Control")) + ca_cert, endpoints = "", [f"https://{binding.network.bind_address}:6443"] + labels = str(charm.model.config["labels"]) + taints = str(charm.model.config["register-with-taints"]) + if charm._internal_kubeconfig.exists(): + kubeconfig = yaml.safe_load(charm._internal_kubeconfig.read_text()) + cluster = kubeconfig["clusters"][0]["cluster"] + ca_cert_b64 = cluster["certificate-authority-data"] + ca_cert = b64decode(ca_cert_b64).decode("utf-8") + + charm.kube_control.set_api_endpoints(endpoints) + charm.kube_control.set_ca_certificate(ca_cert) + + if ( + (cluster_status := charm.api_manager.get_cluster_status()) + and cluster_status.metadata + and cluster_status.metadata.status.config + and (dns := cluster_status.metadata.status.config.dns) + ): + charm.kube_control.set_dns_address(dns.service_ip or "") + charm.kube_control.set_dns_domain(dns.cluster_domain or "") + charm.kube_control.set_dns_enabled(dns.enabled) + charm.kube_control.set_dns_port(53) + + charm.kube_control.set_default_cni("") + charm.kube_control.set_image_registry("rocks.canonical.com") + + charm.kube_control.set_cluster_name(charm.get_cluster_name()) + charm.kube_control.set_has_external_cloud_provider(charm.xcp.has_xcp) + charm.kube_control.set_labels(labels.split()) + charm.kube_control.set_taints(taints.split()) + + for request in charm.kube_control.auth_requests: + log.info("Signing kube-control request for '%s 'in '%s'", request.user, request.group) + client_token = charm.api_manager.request_auth_token( + username=request.user, groups=[request.group] + ) + charm.kube_control.sign_auth_request( + request, + client_token=client_token.get_secret_value(), + kubelet_token=str(), + proxy_token=str(), + ) + + for user, cred in charm.kube_control.closed_auth_creds(): + log.info("Revoke auth-token for '%s'", user) + charm.api_manager.revoke_auth_token(cred.load_client_token(charm.model, user)) diff --git a/charms/worker/k8s/src/protocols.py b/charms/worker/k8s/src/protocols.py new file mode 100644 index 00000000..f06f05c1 --- /dev/null +++ b/charms/worker/k8s/src/protocols.py @@ -0,0 +1,39 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Protocol definitions module.""" + +import ops +from charms.interface_external_cloud_provider import ExternalCloudProvider +from charms.k8s.v0.k8sd_api_manager import K8sdAPIManager +from ops.interface_kube_control import KubeControlProvides + + +class K8sCharmProtocol(ops.CharmBase): + """Typing for the K8sCharm. + + Attributes: + api_manager (K8sdAPIManager): The API manager for the charm. + kube_control (KubeControlProvides): The kube-control interface. + xcp (ExternalCloudProvider): The external cloud provider interface. + """ + + api_manager: K8sdAPIManager + kube_control: KubeControlProvides + xcp: ExternalCloudProvider + + def get_cluster_name(self) -> str: + """Get the cluster name. + + Raises: + NotImplementedError: If the method is not implemented. + """ + raise NotImplementedError + + def get_cloud_name(self) -> str: + """Get the cloud name. + + Raises: + NotImplementedError: If the method is not implemented. 
+ """ + raise NotImplementedError diff --git a/charms/worker/k8s/src/token_distributor.py b/charms/worker/k8s/src/token_distributor.py index 257e1de8..c3d65c41 100644 --- a/charms/worker/k8s/src/token_distributor.py +++ b/charms/worker/k8s/src/token_distributor.py @@ -7,7 +7,7 @@ import logging import re from enum import Enum, auto -from typing import Dict, Optional, Protocol, Union +from typing import Dict, Generator, Optional, Union import charms.contextual_status as status import ops @@ -17,6 +17,7 @@ K8sdAPIManager, K8sdConnectionError, ) +from protocols import K8sCharmProtocol from pydantic import SecretStr log = logging.getLogger(__name__) @@ -26,35 +27,6 @@ UNIT_RE = re.compile(r"k8s(-worker)?/\d+") -class K8sCharm(Protocol): - """Typing for the K8sCharm. - - Attributes: - app (ops.Application): The application object. - model (ops.Model): The model object. - unit (ops.Unit): The unit object. - """ - - @property - def app(self) -> ops.Application: - """The application object.""" - ... # pylint: disable=unnecessary-ellipsis - - @property - def model(self) -> ops.Model: - """The model object.""" - ... # pylint: disable=unnecessary-ellipsis - - @property - def unit(self) -> ops.Unit: - """The unit object.""" - ... # pylint: disable=unnecessary-ellipsis - - def get_cluster_name(self) -> str: - """Get the cluster name.""" - ... # pylint: disable=unnecessary-ellipsis - - class TokenStrategy(Enum): """Enumeration defining strategy for token creation. @@ -115,12 +87,13 @@ def create(self, name: str, token_type: ClusterTokenType) -> SecretStr: worker = token_type == ClusterTokenType.WORKER return self.api_manager.create_join_token(name, worker=worker) - def revoke(self, name: str, ignore_errors: bool): + def revoke(self, name: str, _secret: Optional[ops.Secret], ignore_errors: bool): """Remove a cluster token. Args: - name (str): The name of the node. - ignore_errors (bool): Whether or not errors can be ignored + name (str): The name of the node. + _secret (Optional[ops.Secret]): The secret to revoke + ignore_errors (bool): Whether or not errors can be ignored Raises: K8sdConnectionError: reraises cluster token revoke failures @@ -128,7 +101,7 @@ def revoke(self, name: str, ignore_errors: bool): try: self.api_manager.remove_node(name) except (K8sdConnectionError, InvalidResponseError) as e: - if ignore_errors or e.code == ErrorCodes.STATUS_NODE_UNAVAILABLE: + if ignore_errors or getattr(e, "code") == ErrorCodes.STATUS_NODE_UNAVAILABLE: # Let's just ignore some of these expected errors: # "Remote end closed connection without response" # "Failed to check if node is control-plane" @@ -174,19 +147,38 @@ def create(self, name: str, token_type: ClusterTokenType) -> SecretStr: username=f"system:cos:{name}", groups=["system:cos"] ) - def revoke(self, name: str, ignore_errors: bool): + def revoke(self, _name: str, secret: Optional[ops.Secret], ignore_errors: bool): """Remove a COS token intentionally left unimplemented. Args: - name (str): The name of the node. - ignore_errors (bool): Whether or not errors can be ignored + _name (str): The name of the node. 
+ secret (Optional[ops.Secret]): The secret to revoke + ignore_errors (bool): Whether or not errors can be ignored + + Raises: + K8sdConnectionError: reraises cluster token revoke failures """ + # pylint: disable=unused-argument + if not secret: + return + content = secret.get_content(refresh=True) + try: + self.api_manager.revoke_auth_token(content["token"]) + except (K8sdConnectionError, InvalidResponseError) as e: + if ignore_errors or getattr(e, "code") == ErrorCodes.STATUS_NODE_UNAVAILABLE: + # Let's just ignore some of these expected errors: + # "Remote end closed connection without response" + # "Failed to check if node is control-plane" + # Removing a node that doesn't exist + log.warning("Revoke_Auth_Token %s: but with an expected error: %s", _name, e) + else: + raise class TokenCollector: """Helper class for collecting tokens for units in a relation.""" - def __init__(self, charm: K8sCharm, node_name: str): + def __init__(self, charm: K8sCharmProtocol, node_name: str): """Initialize a TokenCollector instance. Args: @@ -235,7 +227,7 @@ def cluster_name(self, relation: ops.Relation, local: bool) -> str: return cluster_name or "" @contextlib.contextmanager - def recover_token(self, relation: ops.Relation): + def recover_token(self, relation: ops.Relation) -> Generator[str, None, None]: """Request, recover token, and acknowledge token once used. Args: @@ -271,7 +263,7 @@ def recover_token(self, relation: ops.Relation): class TokenDistributor: """Helper class for distributing tokens to units in a relation.""" - def __init__(self, charm: K8sCharm, node_name: str, api_manager: K8sdAPIManager): + def __init__(self, charm: K8sCharmProtocol, node_name: str, api_manager: K8sdAPIManager): """Initialize a TokenDistributor instance. Args: @@ -286,7 +278,7 @@ def __init__(self, charm: K8sCharm, node_name: str, api_manager: K8sdAPIManager) TokenStrategy.COS: CosTokenManager(api_manager), } - def _get_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> Optional[str]: + def _get_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> Optional[ops.Secret]: """Lookup juju secret offered to a unit on this relation. Args: @@ -294,9 +286,12 @@ def _get_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> Optional[s unit (ops.Unit): The unit the secret is intended for Returns: - secret_id (None | str) if on the relation + secret_id (None | ops.Secret) if on the relation """ - return relation.data[self.charm.unit].get(SECRET_ID.format(unit.name)) + secret_id = SECRET_ID.format(unit.name) + if juju_secret := relation.data[self.charm.unit].get(secret_id): + return self.charm.model.get_secret(id=juju_secret) + return None def _revoke_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> None: """Revoke and remove juju secret offered to a unit on this relation. 
@@ -517,7 +512,9 @@ def revoke_tokens( ignore_errors |= state == "pending" # on pending tokens # if cluster doesn't match ignore_errors |= self.charm.get_cluster_name() != joined_cluster(relation, unit) - self.token_strategies[token_strategy].revoke(node, ignore_errors) + self.token_strategies[token_strategy].revoke( + node, self._get_juju_secret(relation, unit), ignore_errors + ) self.drop_node(relation, unit) self._revoke_juju_secret(relation, unit) diff --git a/charms/worker/k8s/tests/unit/test_cloud_integration.py b/charms/worker/k8s/tests/unit/test_cloud_integration.py new file mode 100644 index 00000000..a8e53ec2 --- /dev/null +++ b/charms/worker/k8s/tests/unit/test_cloud_integration.py @@ -0,0 +1,223 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Unit tests cloud-integration module.""" +from pathlib import Path +from unittest import mock + +import ops +import ops.testing +import pytest +from charm import K8sCharm +from ops.interface_aws.requires import AWSIntegrationRequires +from ops.interface_azure.requires import AzureIntegrationRequires +from ops.interface_gcp.requires import GCPIntegrationRequires + +TEST_CLUSTER_NAME = "my-cluster" + + +@pytest.fixture(autouse=True) +def vendor_name(): + """Mock the ExternalCloudProvider name property.""" + with mock.patch( + "charms.interface_external_cloud_provider.ExternalCloudProvider.name", + new_callable=mock.PropertyMock, + ) as mock_vendor_name: + yield mock_vendor_name + + +@pytest.fixture(params=["worker", "control-plane"]) +def harness(request): + """Craft a ops test harness. + + Args: + request: pytest request object + """ + meta = Path(__file__).parent / "../../charmcraft.yaml" + if request.param == "worker": + meta = Path(__file__).parent / "../../../charmcraft.yaml" + harness = ops.testing.Harness(K8sCharm, meta=meta.read_text()) + harness.begin() + harness.charm.is_worker = request.param == "worker" + with mock.patch.object(harness.charm, "get_cloud_name"): + with mock.patch.object(harness.charm.reconciler, "reconcile"): + yield harness + harness.cleanup() + + +@pytest.mark.parametrize( + "cloud_name, cloud_relation", + [ + ("aws", "aws"), + ("gce", "gcp"), + ("azure", "azure"), + ("unknown", None), + ], + ids=["aws", "gce", "azure", "unknown"], +) +def test_cloud_detection(harness, cloud_name, cloud_relation): + """Test that the cloud property returns the correct integration requires object. + + Args: + harness (ops.testing.Harness): The test harness + cloud_name (str): The name of the cloud + cloud_relation (str): The name of the relation + vendor_name (mock.PropertyMock): The mock for the ExternalCloudProvider name property + """ + harness.charm.get_cloud_name.return_value = cloud_name + integration = harness.charm.cloud_integration + assert integration.cloud is None + if cloud_name != "unknown": + harness.add_relation(cloud_relation, "cloud-integrator") + assert integration.cloud + + +def test_cloud_aws(harness): + """Test that the cloud property returns the correct integration requires object. 
+ + Args: + harness (ops.testing.Harness): The test harness + """ + harness.charm.get_cloud_name.return_value = "aws" + with mock.patch( + "cloud_integration.CloudIntegration.cloud", + new_callable=mock.PropertyMock, + return_value=mock.create_autospec(AWSIntegrationRequires), + ) as mock_property: + mock_cloud = mock_property() + mock_cloud.evaluate_relation.return_value = None + event = mock.MagicMock() + harness.charm.cloud_integration.integrate(TEST_CLUSTER_NAME, event) + if harness.charm.is_worker: + mock_cloud.tag_instance.assert_called_once_with( + {"kubernetes.io/cluster/my-cluster": "owned"} + ) + else: + mock_cloud.tag_instance.assert_called_once_with( + { + "kubernetes.io/cluster/my-cluster": "owned", + "k8s.io/role/master": "true", # wokeignore:rule=master + } + ) + mock_cloud.tag_instance_security_group.assert_called_once_with( + {"kubernetes.io/cluster/my-cluster": "owned"} + ) + mock_cloud.tag_instance_subnet.assert_called_once_with( + {"kubernetes.io/cluster/my-cluster": "owned"} + ) + mock_cloud.enable_object_storage_management.assert_called_once_with(["kubernetes-*"]) + if harness.charm.is_worker: + mock_cloud.enable_load_balancer_management.assert_not_called() + mock_cloud.enable_autoscaling_readonly.assert_not_called() + mock_cloud.enable_instance_modification.assert_not_called() + mock_cloud.enable_region_readonly.assert_not_called() + mock_cloud.enable_network_management.assert_not_called() + mock_cloud.enable_block_storage_management.assert_not_called() + else: + mock_cloud.enable_load_balancer_management.assert_called_once() + mock_cloud.enable_autoscaling_readonly.assert_called_once() + mock_cloud.enable_instance_modification.assert_called_once() + mock_cloud.enable_region_readonly.assert_called_once() + mock_cloud.enable_network_management.assert_called_once() + mock_cloud.enable_block_storage_management.assert_called_once() + mock_cloud.enable_instance_inspection.assert_called_once() + mock_cloud.enable_dns_management.assert_called_once() + mock_cloud.evaluate_relation.assert_called_once_with(event) + + +def test_cloud_gce(harness): + """Test that the cloud property returns the correct integration requires object. 
+ + Args: + harness (ops.testing.Harness): The test harness + """ + harness.charm.get_cloud_name.return_value = "gce" + with mock.patch( + "cloud_integration.CloudIntegration.cloud", + new_callable=mock.PropertyMock, + return_value=mock.create_autospec(GCPIntegrationRequires), + ) as mock_property: + mock_cloud = mock_property() + mock_cloud.evaluate_relation.return_value = None + event = mock.MagicMock() + harness.charm.cloud_integration.integrate(TEST_CLUSTER_NAME, event) + + if harness.charm.is_worker: + mock_cloud.tag_instance.assert_called_once_with({"k8s-io-cluster-name": "my-cluster"}) + else: + mock_cloud.tag_instance.assert_called_once_with( + { + "k8s-io-cluster-name": "my-cluster", + "k8s-io-role-master": "master", # wokeignore:rule=master + } + ) + mock_cloud.enable_object_storage_management.assert_called_once() + if harness.charm.is_worker: + mock_cloud.enable_security_management.assert_not_called() + mock_cloud.enable_network_management.assert_not_called() + mock_cloud.enable_block_storage_management.assert_not_called() + else: + mock_cloud.enable_security_management.assert_called_once() + mock_cloud.enable_network_management.assert_called_once() + mock_cloud.enable_block_storage_management.assert_called_once() + mock_cloud.enable_instance_inspection.assert_called_once() + mock_cloud.enable_dns_management.assert_called_once() + mock_cloud.evaluate_relation.assert_called_once_with(event) + + +def test_cloud_azure(harness): + """Test that the cloud property returns the correct integration requires object. + + Args: + harness (ops.testing.Harness): The test harness + """ + harness.charm.get_cloud_name.return_value = "azure" + with mock.patch( + "cloud_integration.CloudIntegration.cloud", + new_callable=mock.PropertyMock, + return_value=mock.create_autospec(AzureIntegrationRequires), + ) as mock_property: + mock_cloud = mock_property() + mock_cloud.evaluate_relation.return_value = None + event = mock.MagicMock() + harness.charm.cloud_integration.integrate(TEST_CLUSTER_NAME, event) + if harness.charm.is_worker: + mock_cloud.tag_instance.assert_called_once_with({"k8s-io-cluster-name": "my-cluster"}) + else: + mock_cloud.tag_instance.assert_called_once_with( + { + "k8s-io-cluster-name": "my-cluster", + "k8s-io-role-master": "master", # wokeignore:rule=master + } + ) + mock_cloud.enable_object_storage_management.assert_called_once() + if harness.charm.is_worker: + mock_cloud.enable_security_management.assert_not_called() + mock_cloud.enable_loadbalancer_management.assert_not_called() + mock_cloud.enable_network_management.assert_not_called() + mock_cloud.enable_block_storage_management.assert_not_called() + else: + mock_cloud.enable_security_management.assert_called_once() + mock_cloud.enable_loadbalancer_management.assert_called_once() + mock_cloud.enable_network_management.assert_called_once() + mock_cloud.enable_block_storage_management.assert_called_once() + mock_cloud.enable_dns_management.assert_called_once() + mock_cloud.enable_instance_inspection.assert_called_once() + mock_cloud.evaluate_relation.assert_called_once_with(event) + + +def test_cloud_unknown(harness): + """Test that the cloud property returns the correct integration requires object. 
+ + Args: + harness (ops.testing.Harness): The test harness + """ + harness.charm.get_cloud_name.return_value = "unknown" + with mock.patch( + "cloud_integration.CloudIntegration.cloud", + new_callable=mock.PropertyMock, + return_value=None, + ) as mock_property: + event = mock.MagicMock() + harness.charm.cloud_integration.integrate(TEST_CLUSTER_NAME, event) + assert mock_property.called diff --git a/pyproject.toml b/pyproject.toml index 6be5fa48..363ba3ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,8 @@ plugins = "pydantic.mypy" disable = "wrong-import-order,redefined-outer-name,too-many-instance-attributes,too-few-public-methods,no-self-argument,fixme,protected-access" # Ignore Pydantic check: https://github.com/pydantic/pydantic/issues/1961 extension-pkg-whitelist = "pydantic" # wokeignore:rule=whitelist +# Modules can be bigger than 1000 lines +max-module-lines = 1500 [tool.pylint.typecheck] # Ignore typechecking on pylxd manager classes @@ -82,5 +84,6 @@ max-complexity = 10 [tool.codespell] skip = "build,lib,venv,icon.svg,.tox,.git,.mypy_cache,.ruff_cache,.coverage" + [tool.pyright] extraPaths = ["./charms/worker/k8s/lib"] From 0f578ae463940615243f80947a988ebda8d91758 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Tue, 19 Nov 2024 22:39:23 -0600 Subject: [PATCH 3/5] chore(deps): update aquasecurity/trivy-action action to v0.29.0 (#178) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/workflows/trivy.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/trivy.yaml b/.github/workflows/trivy.yaml index fdb7ac6f..dbe031cd 100644 --- a/.github/workflows/trivy.yaml +++ b/.github/workflows/trivy.yaml @@ -40,7 +40,7 @@ jobs: ref: ${{ matrix.branch }} fetch-depth: 0 - name: Run Trivy vulnerability scanner in repo mode - uses: aquasecurity/trivy-action@0.28.0 + uses: aquasecurity/trivy-action@0.29.0 with: scan-type: "fs" ignore-unfixed: true From 5f78ebbaabe85760a33654bf0303ca094f1eab00 Mon Sep 17 00:00:00 2001 From: Benjamin Schimke Date: Wed, 20 Nov 2024 17:25:39 +0100 Subject: [PATCH 4/5] Add Local storage configuration through charm config (#169) --- charms/worker/k8s/charmcraft.yaml | 25 +++++- .../k8s/lib/charms/k8s/v0/k8sd_api_manager.py | 14 ++-- charms/worker/k8s/src/charm.py | 78 ++++++------------- 3 files changed, 53 insertions(+), 64 deletions(-) diff --git a/charms/worker/k8s/charmcraft.yaml b/charms/worker/k8s/charmcraft.yaml index df1db8cc..b138925b 100644 --- a/charms/worker/k8s/charmcraft.yaml +++ b/charms/worker/k8s/charmcraft.yaml @@ -140,7 +140,7 @@ config: type: string description: | Labels can be used to organize and to select subsets of nodes in the - cluster. Declare node labels in key=value format, separated by spaces. + cluster. Declare node labels in key=value format, separated by spaces. register-with-taints: type: string default: "" @@ -161,6 +161,29 @@ config: CIDR to use for Kubernetes services. After deployment it is only possible to increase the size of the IP range. It is not possible to change or shrink the address range after deployment. + local-storage-enabled: + type: boolean + default: true + description: | + Enable local storage provisioning. This will create a storage class + named "local-storage" that uses the hostPath provisioner. This is + useful for development and testing purposes. It is not recommended for + production use. 
+ local-storage-local-path: + type: string + default: "/var/snap/k8s/common/rawfile-storage" + description: | + The path on the host where local storage will be provisioned. This + path must be writable by the kubelet. This is only used if + local-storage.enabled is set to true. + local-storage-reclaim-policy: + type: string + default: Delete + description: | + The reclaim policy for local storage. This can be either "Delete" or + "Retain". If set to "Delete", the storage will be deleted when the + PersistentVolumeClaim is deleted. If set to "Retain", the storage will + be retained when the PersistentVolumeClaim is deleted. actions: get-kubeconfig: diff --git a/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py b/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py index 543e8216..6b0cf534 100644 --- a/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py +++ b/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py @@ -194,7 +194,7 @@ class ClusterMember(BaseModel): datastore_role: Optional[str] = Field(default=None, alias="datastore-role") -class DNSConfig(BaseModel): +class DNSConfig(BaseModel, allow_population_by_field_name=True): """Configuration for the DNS settings of the cluster. Attributes: @@ -210,7 +210,7 @@ class DNSConfig(BaseModel): upstream_nameservers: Optional[List[str]] = Field(default=None, alias="upstream-nameservers") -class IngressConfig(BaseModel): +class IngressConfig(BaseModel, allow_population_by_field_name=True): """Configuration for the ingress settings of the cluster. Attributes: @@ -224,7 +224,7 @@ class IngressConfig(BaseModel): enable_proxy_protocol: Optional[bool] = Field(default=None, alias="enable-proxy-protocol") -class LoadBalancerConfig(BaseModel): +class LoadBalancerConfig(BaseModel, allow_population_by_field_name=True): """Configuration for the load balancer settings of the cluster. Attributes: @@ -250,7 +250,7 @@ class LoadBalancerConfig(BaseModel): bgp_peer_port: Optional[int] = Field(default=None, alias="bgp-peer-port") -class LocalStorageConfig(BaseModel): +class LocalStorageConfig(BaseModel, allow_population_by_field_name=True): """Configuration for the local storage settings of the cluster. Attributes: @@ -266,7 +266,7 @@ class LocalStorageConfig(BaseModel): set_default: Optional[bool] = Field(default=None, alias="set-default") -class NetworkConfig(BaseModel): +class NetworkConfig(BaseModel, allow_population_by_field_name=True): """Configuration for the network settings of the cluster. Attributes: @@ -276,7 +276,7 @@ class NetworkConfig(BaseModel): enabled: Optional[bool] = Field(default=None) -class GatewayConfig(BaseModel): +class GatewayConfig(BaseModel, allow_population_by_field_name=True): """Configuration for the gateway settings of the cluster. Attributes: @@ -286,7 +286,7 @@ class GatewayConfig(BaseModel): enabled: Optional[bool] = Field(default=None) -class MetricsServerConfig(BaseModel): +class MetricsServerConfig(BaseModel, allow_population_by_field_name=True): """Configuration for the metrics server settings of the cluster. 
Attributes: diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index 78190174..162b2716 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -148,7 +148,7 @@ def __init__(self, *args): self.labeller = LabelMaker( self, kubeconfig_path=self._internal_kubeconfig, kubectl=KUBECTL_PATH ) - self._stored.set_default(is_dying=False, cluster_name=str(), annotations={}) + self._stored.set_default(is_dying=False, cluster_name=str()) self.cos_agent = COSAgentProvider( self, @@ -320,8 +320,7 @@ def _bootstrap_k8s_snap(self): bootstrap_config = BootstrapConfig.construct() self._configure_datastore(bootstrap_config) - self._configure_cloud_provider(bootstrap_config) - self._configure_annotations(bootstrap_config) + bootstrap_config.cluster_config = self._assemble_cluster_config() bootstrap_config.service_cidr = str(self.config["service-cidr"]) bootstrap_config.control_plane_taints = str(self.config["register-with-taints"]).split() bootstrap_config.extra_sans = [_get_public_address()] @@ -400,40 +399,29 @@ def _get_valid_annotations(self) -> Optional[dict]: return annotations - def _configure_annotations(self, config: BootstrapConfig): - """Configure the annotations for the Canonical Kubernetes cluster. + def _assemble_cluster_config(self) -> UserFacingClusterConfig: + """Retrieve the cluster config from charm configuration and charm relations. - Args: - config (BootstrapConfig): Configuration object to bootstrap the cluster. + Returns: + UserFacingClusterConfig: The expected cluster configuration. """ - annotations = self._get_valid_annotations() - if annotations is None: - return - - if not config.cluster_config: - config.cluster_config = UserFacingClusterConfig(annotations=annotations) - else: - config.cluster_config.annotations = annotations - - @status.on_error( - ops.BlockedStatus("Invalid Annotations"), - AssertionError, - ) - def _update_annotations(self): - """Update the annotations for the Canonical Kubernetes cluster.""" - annotations = self._get_valid_annotations() - if annotations is None: - return - - status.add(ops.MaintenanceStatus("Updating Annotations")) - log.info("Updating Annotations") + local_storage = LocalStorageConfig( + enabled=self.config.get("local-storage-enabled"), + local_path=self.config.get("local-storage-local-path"), + reclaim_policy=self.config.get("local-storage-reclaim-policy"), + # Note(ben): set_default is intentionally omitted, see: + # https://github.com/canonical/k8s-operator/pull/169/files#r1847378214 + ) - if self._stored.annotations == annotations: - return + cloud_provider = None + if self.xcp.has_xcp: + cloud_provider = "external" - config = UserFacingClusterConfig(annotations=annotations) - update_request = UpdateClusterConfigRequest(config=config) - self.api_manager.update_cluster_config(update_request) + return UserFacingClusterConfig( + local_storage=local_storage, + annotations=self._get_valid_annotations(), + cloud_provider=cloud_provider, + ) def _configure_datastore(self, config: Union[BootstrapConfig, UpdateClusterConfigRequest]): """Configure the datastore for the Kubernetes cluster. @@ -481,27 +469,6 @@ def _configure_datastore(self, config: Union[BootstrapConfig, UpdateClusterConfi elif datastore == "dqlite": log.info("Using dqlite as datastore") - def _configure_cloud_provider( - self, config: Union[BootstrapConfig, UpdateClusterConfigRequest] - ): - """Configure the cloud-provider for the Kubernetes cluster. 
- - Args: - config (BootstrapConfig): The bootstrap configuration object for - the Kubernetes cluster that is being configured. This object - will be modified in-place. - """ - if self.xcp.has_xcp: - log.info("Using external as cloud-provider") - if isinstance(config, BootstrapConfig): - if not (ufcg := config.cluster_config): - ufcg = config.cluster_config = UserFacingClusterConfig() - elif isinstance(config, UpdateClusterConfigRequest): - if not (ufcg := config.config): - ufcg = config.config = UserFacingClusterConfig() - - ufcg.cloud_provider = "external" - def _revoke_cluster_tokens(self, event: ops.EventBase): """Revoke tokens for the units in the cluster and k8s-cluster relations. @@ -613,7 +580,7 @@ def _ensure_cluster_config(self): update_request = UpdateClusterConfigRequest() self._configure_datastore(update_request) - self._configure_cloud_provider(update_request) + update_request.config = self._assemble_cluster_config() configure_kube_control(self) self.api_manager.update_cluster_config(update_request) @@ -792,7 +759,6 @@ def _reconcile(self, event: ops.EventBase): self._k8s_info(event) self._bootstrap_k8s_snap() self._enable_functionalities() - self._update_annotations() self._create_cluster_tokens() self._create_cos_tokens() self._apply_cos_requirements() From bfb31660590a3b587e92b7d599db2a1142632fd2 Mon Sep 17 00:00:00 2001 From: Benjamin Schimke Date: Wed, 20 Nov 2024 20:10:45 +0100 Subject: [PATCH 5/5] Add Gateway charm options (#177) --- charms/worker/k8s/charmcraft.yaml | 5 +++++ charms/worker/k8s/src/charm.py | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/charms/worker/k8s/charmcraft.yaml b/charms/worker/k8s/charmcraft.yaml index b138925b..c5b3ceab 100644 --- a/charms/worker/k8s/charmcraft.yaml +++ b/charms/worker/k8s/charmcraft.yaml @@ -184,6 +184,11 @@ config: "Retain". If set to "Delete", the storage will be deleted when the PersistentVolumeClaim is deleted. If set to "Retain", the storage will be retained when the PersistentVolumeClaim is deleted. + gateway-enabled: + type: boolean + default: false + description: | + Enable/Disable the gateway feature on the cluster. actions: get-kubeconfig: diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index 162b2716..7794ac84 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -41,6 +41,7 @@ ControlPlaneNodeJoinConfig, CreateClusterRequest, DNSConfig, + GatewayConfig, InvalidResponseError, JoinClusterRequest, K8sdAPIManager, @@ -413,12 +414,17 @@ def _assemble_cluster_config(self) -> UserFacingClusterConfig: # https://github.com/canonical/k8s-operator/pull/169/files#r1847378214 ) + gateway = GatewayConfig( + enabled=self.config.get("gateway-enabled"), + ) + cloud_provider = None if self.xcp.has_xcp: cloud_provider = "external" return UserFacingClusterConfig( local_storage=local_storage, + gateway=gateway, annotations=self._get_valid_annotations(), cloud_provider=cloud_provider, )
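
The last two patches in this series fold the charm's local-storage, gateway and cloud-provider settings into a single `UserFacingClusterConfig` built by `_assemble_cluster_config()`, and they rely on the `allow_population_by_field_name=True` change so the pydantic models can be constructed with snake_case field names while still serialising to the dashed keys the k8sd API expects. The sketch below is not part of the patch; it only illustrates that mapping with pydantic v1, using the default values declared in `charmcraft.yaml`, and the printed output is indicative.

```python
from charms.k8s.v0.k8sd_api_manager import (
    GatewayConfig,
    LocalStorageConfig,
    UserFacingClusterConfig,
)

# Populate by field name (made possible by allow_population_by_field_name=True);
# the dashed aliases such as "local-storage" and "reclaim-policy" are what k8sd sees.
cluster_config = UserFacingClusterConfig(
    local_storage=LocalStorageConfig(
        enabled=True,
        local_path="/var/snap/k8s/common/rawfile-storage",
        reclaim_policy="Delete",
    ),
    gateway=GatewayConfig(enabled=False),
    cloud_provider="external",  # only set when an external cloud provider is related
)

# Serialise the way the update/bootstrap requests do: dashed aliases, unset fields omitted.
print(cluster_config.dict(by_alias=True, exclude_none=True))
# e.g. {'local-storage': {'enabled': True, 'local-path': '...', 'reclaim-policy': 'Delete'},
#       'gateway': {'enabled': False}, 'cloud-provider': 'external'}
```

On a deployed model the same knobs are driven through the new charm options, for example `juju config k8s gateway-enabled=true local-storage-reclaim-policy=Retain`; `_ensure_cluster_config()` then reassembles the cluster config and pushes it to k8sd on the next reconcile.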