Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement upgrade-relation for control plane nodes #200

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charms/worker/k8s/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ typing_extensions==4.12.2
websocket-client==1.8.0
poetry-core==1.9.1
lightkube==0.15.5
httpx==0.27.2
48 changes: 41 additions & 7 deletions charms/worker/k8s/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import shlex
import socket
import subprocess
from collections import defaultdict
from functools import cached_property
from pathlib import Path
from time import sleep
from typing import Dict, Optional, Union
from typing import Dict, List, Optional, Union
from urllib.parse import urlparse

import charms.contextual_status as status
Expand Down Expand Up @@ -123,6 +124,8 @@ class K8sCharm(ops.CharmBase):
is_worker: true if this is a worker unit
is_control_plane: true if this is a control-plane unit
lead_control_plane: true if this is a control-plane unit and its the leader
is_upgrade_granted: true if the upgrade has been granted
datastore: the datastore used for Kubernetes
"""

_stored = ops.StoredState()
Expand All @@ -139,16 +142,16 @@ def __init__(self, *args):
xcp_relation = "external-cloud-provider" if self.is_control_plane else ""
self.cloud_integration = CloudIntegration(self, self.is_control_plane)
self.xcp = ExternalCloudProvider(self, xcp_relation)
self.cluster_inspector = ClusterInspector(kubeconfig_path=KUBECONFIG)
self.cluster_inspector = ClusterInspector(kubeconfig_path=self._internal_kubeconfig)
self.upgrade = K8sUpgrade(
self,
node_manager=self.cluster_inspector,
cluster_inspector=self.cluster_inspector,
relation_name="upgrade",
substrate="vm",
dependency_model=K8sDependenciesModel(**DEPENDENCIES),
)
self.cos = COSIntegration(self)
self.update_status = update_status.Handler(self)
self.update_status = update_status.Handler(self, self.upgrade)
self.reconciler = Reconciler(
self, self._reconcile, exit_status=self.update_status.active_status
)
Expand All @@ -161,7 +164,8 @@ def __init__(self, *args):
user_label_key="node-labels",
timeout=15,
)
self._stored.set_default(is_dying=False, cluster_name=str())
self._upgrade_snap = False
self._stored.set_default(is_dying=False, cluster_name=str(), upgrade_granted=False)

self.cos_agent = COSAgentProvider(
self,
Expand Down Expand Up @@ -227,6 +231,35 @@ def is_worker(self) -> bool:
"""Returns true if the unit is a worker."""
return self.meta.name == "k8s-worker"

@property
def datastore(self) -> str:
"""Return the datastore type."""
return str(self.config.get("bootstrap-datastore"))

def get_worker_versions(self) -> Dict[str, List[ops.Unit]]:
"""Get the versions of the worker units.

Returns:
Dict[str, List[ops.Unit]]: A dictionary of versions and the units that have them.
"""
if not (relation := self.model.get_relation("k8s-cluster")):
return {}

versions = defaultdict(list)
for unit in relation.units:
if version := relation.data[unit].get("version"):
versions[version].append(unit)
return versions

def grant_upgrade(self):
"""Grant the upgrade to the charm."""
self._upgrade_snap = True
eaudetcobello marked this conversation as resolved.
Show resolved Hide resolved

@property
def is_upgrade_granted(self) -> bool:
"""Check if the upgrade has been granted."""
return self._upgrade_snap
eaudetcobello marked this conversation as resolved.
Show resolved Hide resolved

def _apply_proxy_environment(self):
"""Apply the proxy settings from environment variables."""
proxy_settings = self._get_proxy_env()
Expand Down Expand Up @@ -694,8 +727,9 @@ def _announce_kubernetes_version(self):
if not unit_version:
raise ReconcilerError(f"Waiting for version from {unit.name}")
if unit_version != local_version:
status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}"))
raise ReconcilerError(f"Version mismatch with {unit.name}")
# NOTE: Add a check to validate if we are doing an upgrade
status.add(ops.WaitingStatus("Upgrading the cluster"))
return
Comment on lines -697 to +732
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method _announce_kubernetes_version feels similar to the above get_worker_version in some ways that it's reading the version field from the k8s-cluster or cluster relation.

So is a version mismatch now a waiting situation because an upgrade is in-progress? Is that why there's a NOTE here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, i see now that it's because the raising of the reconciler error prevented the upgrade events from engaging. Whew -- we still need a check to let folks know they're running on out-of-spec-version of the applications

For now _announce_kubernetes_version is only run on the lead CP. say you deployed and related a kw 1.35 to a 1.31 cluster. I imagine the 1.35 workers may not join. Should they join? Is the k8s-cp the right place to gripe about it? You're right that we should at least make sure we're not in an upgrade scenario before we raise the reconciler error.

relation.data[self.app]["version"] = local_version

def _get_proxy_env(self) -> Dict[str, str]:
Expand Down
8 changes: 6 additions & 2 deletions charms/worker/k8s/src/events/update_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import reschedule
from protocols import K8sCharmProtocol
from snap import version as snap_version
from upgrade import K8sUpgrade

# Log messages can be retrieved using juju debug-log
log = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,24 +65,27 @@ class Handler(ops.Object):
the unit's status during the update process.
"""

def __init__(self, charm: K8sCharmProtocol):
def __init__(self, charm: K8sCharmProtocol, upgrade: K8sUpgrade):
"""Initialize the UpdateStatusEvent.

Args:
charm: The charm instance that is instantiating this event.
upgrade: The upgrade instance that handles the upgrade process.
"""
super().__init__(charm, "update_status")
self.charm = charm
self.upgrade = upgrade
self.active_status = DynamicActiveStatus()
self.charm.framework.observe(self.charm.on.update_status, self._on_update_status)

def _on_update_status(self, _event: ops.UpdateStatusEvent):
def _on_update_status(self, event: ops.UpdateStatusEvent):
"""Handle update-status event."""
if not self.charm.reconciler.stored.reconciled:
return

try:
with status.context(self.charm.unit, exit_status=self.active_status):
self.upgrade.set_upgrade_status(event)
self.run()
except status.ReconcilerError:
log.exception("Can't update_status")
Expand Down
23 changes: 22 additions & 1 deletion charms/worker/k8s/src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,27 @@

"""Literals for the charm."""

SNAP_NAME = "k8s"

K8S_COMMON_SERVICES = [
"kubelet",
"kube-proxy",
"k8sd",
]

K8S_DQLITE_SERVICE = "k8s-dqlite"

K8S_CONTROL_PLANE_SERVICES = [
"kube-apiserver",
K8S_DQLITE_SERVICE,
"kube-controller-manager",
"kube-scheduler",
]

K8S_WORKER_SERVICES = [
"k8s-apiserver-proxy",
]

DEPENDENCIES = {
# NOTE: Update the dependencies for the k8s-charm before releasing.
"k8s_charm": {
Expand All @@ -16,6 +37,6 @@
"dependencies": {"k8s-worker": "^1.30, < 1.32"},
"name": "k8s",
"upgrade_supported": "^1.30, < 1.32",
"version": "1.31.2",
"version": "1.31.3",
},
}
28 changes: 28 additions & 0 deletions charms/worker/k8s/src/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

"""Protocol definitions module."""

from typing import Dict, List

import ops
from charms.interface_external_cloud_provider import ExternalCloudProvider
from charms.k8s.v0.k8sd_api_manager import K8sdAPIManager
Expand All @@ -18,12 +20,22 @@ class K8sCharmProtocol(ops.CharmBase):
kube_control (KubeControlProvides): The kube-control interface.
xcp (ExternalCloudProvider): The external cloud provider interface.
reconciler (Reconciler): The reconciler for the charm
is_upgrade_granted (bool): Whether the upgrade is granted.
lead_control_plane (bool): Whether the charm is the lead control plane.
is_control_plane (bool): Whether the charm is a control plane.
is_worker (bool): Whether the charm is a worker.
datastore (str): The datastore for Kubernetes.
"""

api_manager: K8sdAPIManager
kube_control: KubeControlProvides
xcp: ExternalCloudProvider
reconciler: Reconciler
is_upgrade_granted: bool
lead_control_plane: bool
is_control_plane: bool
is_worker: bool
datastore: str

def get_cluster_name(self) -> str:
"""Get the cluster name.
Expand All @@ -33,6 +45,14 @@ def get_cluster_name(self) -> str:
"""
raise NotImplementedError

def grant_upgrade(self) -> None:
"""Grant the upgrade.

Raises:
NotImplementedError: If the method is not implemented.
"""
raise NotImplementedError

def get_cloud_name(self) -> str:
"""Get the cloud name.

Expand All @@ -48,3 +68,11 @@ def _is_node_ready(self) -> bool:
NotImplementedError: If the method is not implemented.
"""
raise NotImplementedError

def get_worker_versions(self) -> Dict[str, List[ops.Unit]]:
"""Get the worker versions.

Raises:
NotImplementedError: If the method is not implemented.
"""
raise NotImplementedError
49 changes: 46 additions & 3 deletions charms/worker/k8s/src/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import charms.operator_libs_linux.v2.snap as snap_lib
import ops
import yaml
from protocols import K8sCharmProtocol
from pydantic import BaseModel, Field, ValidationError, parse_obj_as, validator
from typing_extensions import Annotated

Expand Down Expand Up @@ -263,7 +264,7 @@ def _parse_management_arguments(charm: ops.CharmBase) -> List[SnapArgument]:
return args


def management(charm: ops.CharmBase) -> None:
def management(charm: K8sCharmProtocol) -> None:
"""Manage snap installations on this machine.

Arguments:
Expand All @@ -272,7 +273,7 @@ def management(charm: ops.CharmBase) -> None:
cache = snap_lib.SnapCache()
for args in _parse_management_arguments(charm):
which = cache[args.name]
if block_refresh(which, args):
if block_refresh(which, args, charm.is_upgrade_granted):
continue
install_args = args.dict(exclude_none=True)
if isinstance(args, SnapFileArgument) and which.revision != "x1":
Expand All @@ -287,12 +288,13 @@ def management(charm: ops.CharmBase) -> None:
which.ensure(**install_args)


def block_refresh(which: snap_lib.Snap, args: SnapArgument) -> bool:
def block_refresh(which: snap_lib.Snap, args: SnapArgument, upgrade_granted: bool = False) -> bool:
"""Block snap refreshes if the snap is in a specific state.

Arguments:
which: The snap to check
args: The snap arguments
upgrade_granted: If the upgrade is granted

Returns:
bool: True if the snap should be blocked from refreshing
Expand All @@ -303,6 +305,9 @@ def block_refresh(which: snap_lib.Snap, args: SnapArgument) -> bool:
if _overridden_snap_installation().exists():
log.info("Allowing %s snap refresh due to snap installation override", args.name)
return False
if upgrade_granted:
log.info("Allowing %s snap refresh due to upgrade-granted", args.name)
return False
if isinstance(args, SnapStoreArgument) and args.revision:
if block := which.revision != args.revision:
log.info("Blocking %s snap refresh to revision=%s", args.name, args.revision)
Expand Down Expand Up @@ -342,3 +347,41 @@ def version(snap: str) -> Tuple[Optional[str], bool]:

log.info("Snap k8s not found or no version available.")
return None, overridden


def stop(snap_name: str, services: List[str]) -> None:
"""Stop the services of the snap on this machine.

Arguments:
snap_name: The name of the snap
services: The services to stop

Raises:
SnapError: If the snap isn't installed
"""
cache = snap_lib.SnapCache()
if snap_name not in cache:
message = f"Snap '{snap_name}' not installed"
log.error(message)
raise snap_lib.SnapError(message)
snap = cache[snap_name]
snap.stop(services=services)


def start(snap_name: str, services: List[str]) -> None:
"""Start the services of the snap on this machine.

Arguments:
snap_name: The name of the snap
services: The services to start

Raises:
SnapError: If the snap isn't installed
"""
cache = snap_lib.SnapCache()
if snap_name not in cache:
message = f"Snap '{snap_name}' not installed"
log.error(message)
raise snap_lib.SnapError(message)
snap = cache[snap_name]
snap.start(services=services)
Loading
Loading