Skip to content

Commit

Permalink
Merge branch 'main' into extra-sans
Browse files Browse the repository at this point in the history
  • Loading branch information
addyess authored Dec 4, 2024
2 parents cd7f9b5 + 5d6af4b commit f703956
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 25 deletions.
1 change: 1 addition & 0 deletions charms/worker/k8s/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ typing_extensions==4.12.2
websocket-client==1.8.0
poetry-core==1.9.1
lightkube==0.15.5
httpx==0.27.2
48 changes: 41 additions & 7 deletions charms/worker/k8s/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import shlex
import socket
import subprocess
from collections import defaultdict
from functools import cached_property
from pathlib import Path
from time import sleep
from typing import Dict, Optional, Union
from typing import Dict, List, Optional, Union
from urllib.parse import urlparse

import charms.contextual_status as status
Expand Down Expand Up @@ -123,6 +124,8 @@ class K8sCharm(ops.CharmBase):
is_worker: true if this is a worker unit
is_control_plane: true if this is a control-plane unit
lead_control_plane: true if this is a control-plane unit and its the leader
is_upgrade_granted: true if the upgrade has been granted
datastore: the datastore used for Kubernetes
"""

_stored = ops.StoredState()
Expand All @@ -139,16 +142,16 @@ def __init__(self, *args):
xcp_relation = "external-cloud-provider" if self.is_control_plane else ""
self.cloud_integration = CloudIntegration(self, self.is_control_plane)
self.xcp = ExternalCloudProvider(self, xcp_relation)
self.cluster_inspector = ClusterInspector(kubeconfig_path=KUBECONFIG)
self.cluster_inspector = ClusterInspector(kubeconfig_path=self._internal_kubeconfig)
self.upgrade = K8sUpgrade(
self,
node_manager=self.cluster_inspector,
cluster_inspector=self.cluster_inspector,
relation_name="upgrade",
substrate="vm",
dependency_model=K8sDependenciesModel(**DEPENDENCIES),
)
self.cos = COSIntegration(self)
self.update_status = update_status.Handler(self)
self.update_status = update_status.Handler(self, self.upgrade)
self.reconciler = Reconciler(
self, self._reconcile, exit_status=self.update_status.active_status
)
Expand All @@ -161,7 +164,8 @@ def __init__(self, *args):
user_label_key="node-labels",
timeout=15,
)
self._stored.set_default(is_dying=False, cluster_name=str())
self._upgrade_snap = False
self._stored.set_default(is_dying=False, cluster_name=str(), upgrade_granted=False)

self.cos_agent = COSAgentProvider(
self,
Expand Down Expand Up @@ -227,6 +231,35 @@ def is_worker(self) -> bool:
"""Returns true if the unit is a worker."""
return self.meta.name == "k8s-worker"

@property
def datastore(self) -> str:
"""Return the datastore type."""
return str(self.config.get("bootstrap-datastore"))

def get_worker_versions(self) -> Dict[str, List[ops.Unit]]:
"""Get the versions of the worker units.
Returns:
Dict[str, List[ops.Unit]]: A dictionary of versions and the units that have them.
"""
if not (relation := self.model.get_relation("k8s-cluster")):
return {}

versions = defaultdict(list)
for unit in relation.units:
if version := relation.data[unit].get("version"):
versions[version].append(unit)
return versions

def grant_upgrade(self):
"""Grant the upgrade to the charm."""
self._upgrade_snap = True

@property
def is_upgrade_granted(self) -> bool:
"""Check if the upgrade has been granted."""
return self._upgrade_snap

def _apply_proxy_environment(self):
"""Apply the proxy settings from environment variables."""
proxy_settings = self._get_proxy_env()
Expand Down Expand Up @@ -701,8 +734,9 @@ def _announce_kubernetes_version(self):
if not unit_version:
raise ReconcilerError(f"Waiting for version from {unit.name}")
if unit_version != local_version:
status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}"))
raise ReconcilerError(f"Version mismatch with {unit.name}")
# NOTE: Add a check to validate if we are doing an upgrade
status.add(ops.WaitingStatus("Upgrading the cluster"))
return
relation.data[self.app]["version"] = local_version

def _get_proxy_env(self) -> Dict[str, str]:
Expand Down
8 changes: 6 additions & 2 deletions charms/worker/k8s/src/events/update_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import reschedule
from protocols import K8sCharmProtocol
from snap import version as snap_version
from upgrade import K8sUpgrade

# Log messages can be retrieved using juju debug-log
log = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,24 +65,27 @@ class Handler(ops.Object):
the unit's status during the update process.
"""

def __init__(self, charm: K8sCharmProtocol):
def __init__(self, charm: K8sCharmProtocol, upgrade: K8sUpgrade):
"""Initialize the UpdateStatusEvent.
Args:
charm: The charm instance that is instantiating this event.
upgrade: The upgrade instance that handles the upgrade process.
"""
super().__init__(charm, "update_status")
self.charm = charm
self.upgrade = upgrade
self.active_status = DynamicActiveStatus()
self.charm.framework.observe(self.charm.on.update_status, self._on_update_status)

def _on_update_status(self, _event: ops.UpdateStatusEvent):
def _on_update_status(self, event: ops.UpdateStatusEvent):
"""Handle update-status event."""
if not self.charm.reconciler.stored.reconciled:
return

try:
with status.context(self.charm.unit, exit_status=self.active_status):
self.upgrade.set_upgrade_status(event)
self.run()
except status.ReconcilerError:
log.exception("Can't update_status")
Expand Down
23 changes: 22 additions & 1 deletion charms/worker/k8s/src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,27 @@

"""Literals for the charm."""

SNAP_NAME = "k8s"

K8S_COMMON_SERVICES = [
"kubelet",
"kube-proxy",
"k8sd",
]

K8S_DQLITE_SERVICE = "k8s-dqlite"

K8S_CONTROL_PLANE_SERVICES = [
"kube-apiserver",
K8S_DQLITE_SERVICE,
"kube-controller-manager",
"kube-scheduler",
]

K8S_WORKER_SERVICES = [
"k8s-apiserver-proxy",
]

DEPENDENCIES = {
# NOTE: Update the dependencies for the k8s-charm before releasing.
"k8s_charm": {
Expand All @@ -16,6 +37,6 @@
"dependencies": {"k8s-worker": "^1.30, < 1.32"},
"name": "k8s",
"upgrade_supported": "^1.30, < 1.32",
"version": "1.31.2",
"version": "1.31.3",
},
}
28 changes: 28 additions & 0 deletions charms/worker/k8s/src/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

"""Protocol definitions module."""

from typing import Dict, List

import ops
from charms.interface_external_cloud_provider import ExternalCloudProvider
from charms.k8s.v0.k8sd_api_manager import K8sdAPIManager
Expand All @@ -18,12 +20,22 @@ class K8sCharmProtocol(ops.CharmBase):
kube_control (KubeControlProvides): The kube-control interface.
xcp (ExternalCloudProvider): The external cloud provider interface.
reconciler (Reconciler): The reconciler for the charm
is_upgrade_granted (bool): Whether the upgrade is granted.
lead_control_plane (bool): Whether the charm is the lead control plane.
is_control_plane (bool): Whether the charm is a control plane.
is_worker (bool): Whether the charm is a worker.
datastore (str): The datastore for Kubernetes.
"""

api_manager: K8sdAPIManager
kube_control: KubeControlProvides
xcp: ExternalCloudProvider
reconciler: Reconciler
is_upgrade_granted: bool
lead_control_plane: bool
is_control_plane: bool
is_worker: bool
datastore: str

def get_cluster_name(self) -> str:
"""Get the cluster name.
Expand All @@ -33,6 +45,14 @@ def get_cluster_name(self) -> str:
"""
raise NotImplementedError

def grant_upgrade(self) -> None:
"""Grant the upgrade.
Raises:
NotImplementedError: If the method is not implemented.
"""
raise NotImplementedError

def get_cloud_name(self) -> str:
"""Get the cloud name.
Expand All @@ -48,3 +68,11 @@ def _is_node_ready(self) -> bool:
NotImplementedError: If the method is not implemented.
"""
raise NotImplementedError

def get_worker_versions(self) -> Dict[str, List[ops.Unit]]:
"""Get the worker versions.
Raises:
NotImplementedError: If the method is not implemented.
"""
raise NotImplementedError
49 changes: 46 additions & 3 deletions charms/worker/k8s/src/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import charms.operator_libs_linux.v2.snap as snap_lib
import ops
import yaml
from protocols import K8sCharmProtocol
from pydantic import BaseModel, Field, ValidationError, parse_obj_as, validator
from typing_extensions import Annotated

Expand Down Expand Up @@ -263,7 +264,7 @@ def _parse_management_arguments(charm: ops.CharmBase) -> List[SnapArgument]:
return args


def management(charm: ops.CharmBase) -> None:
def management(charm: K8sCharmProtocol) -> None:
"""Manage snap installations on this machine.
Arguments:
Expand All @@ -272,7 +273,7 @@ def management(charm: ops.CharmBase) -> None:
cache = snap_lib.SnapCache()
for args in _parse_management_arguments(charm):
which = cache[args.name]
if block_refresh(which, args):
if block_refresh(which, args, charm.is_upgrade_granted):
continue
install_args = args.dict(exclude_none=True)
if isinstance(args, SnapFileArgument) and which.revision != "x1":
Expand All @@ -287,12 +288,13 @@ def management(charm: ops.CharmBase) -> None:
which.ensure(**install_args)


def block_refresh(which: snap_lib.Snap, args: SnapArgument) -> bool:
def block_refresh(which: snap_lib.Snap, args: SnapArgument, upgrade_granted: bool = False) -> bool:
"""Block snap refreshes if the snap is in a specific state.
Arguments:
which: The snap to check
args: The snap arguments
upgrade_granted: If the upgrade is granted
Returns:
bool: True if the snap should be blocked from refreshing
Expand All @@ -303,6 +305,9 @@ def block_refresh(which: snap_lib.Snap, args: SnapArgument) -> bool:
if _overridden_snap_installation().exists():
log.info("Allowing %s snap refresh due to snap installation override", args.name)
return False
if upgrade_granted:
log.info("Allowing %s snap refresh due to upgrade-granted", args.name)
return False
if isinstance(args, SnapStoreArgument) and args.revision:
if block := which.revision != args.revision:
log.info("Blocking %s snap refresh to revision=%s", args.name, args.revision)
Expand Down Expand Up @@ -342,3 +347,41 @@ def version(snap: str) -> Tuple[Optional[str], bool]:

log.info("Snap k8s not found or no version available.")
return None, overridden


def stop(snap_name: str, services: List[str]) -> None:
"""Stop the services of the snap on this machine.
Arguments:
snap_name: The name of the snap
services: The services to stop
Raises:
SnapError: If the snap isn't installed
"""
cache = snap_lib.SnapCache()
if snap_name not in cache:
message = f"Snap '{snap_name}' not installed"
log.error(message)
raise snap_lib.SnapError(message)
snap = cache[snap_name]
snap.stop(services=services)


def start(snap_name: str, services: List[str]) -> None:
"""Start the services of the snap on this machine.
Arguments:
snap_name: The name of the snap
services: The services to start
Raises:
SnapError: If the snap isn't installed
"""
cache = snap_lib.SnapCache()
if snap_name not in cache:
message = f"Snap '{snap_name}' not installed"
log.error(message)
raise snap_lib.SnapError(message)
snap = cache[snap_name]
snap.start(services=services)
Loading

0 comments on commit f703956

Please sign in to comment.