Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement upgrade-relation for control plane nodes #200

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charms/worker/k8s/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ typing_extensions==4.12.2
websocket-client==1.8.0
poetry-core==1.9.1
lightkube==0.15.5
httpx==0.27.2
39 changes: 34 additions & 5 deletions charms/worker/k8s/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class K8sCharm(ops.CharmBase):
is_worker: true if this is a worker unit
is_control_plane: true if this is a control-plane unit
lead_control_plane: true if this is a control-plane unit and its the leader
is_upgrade_granted: true if the upgrade has been granted
"""

_stored = ops.StoredState()
Expand All @@ -139,7 +140,7 @@ def __init__(self, *args):
xcp_relation = "external-cloud-provider" if self.is_control_plane else ""
self.cloud_integration = CloudIntegration(self, self.is_control_plane)
self.xcp = ExternalCloudProvider(self, xcp_relation)
self.cluster_inspector = ClusterInspector(kubeconfig_path=KUBECONFIG)
self.cluster_inspector = ClusterInspector(kubeconfig_path=self._internal_kubeconfig)
self.upgrade = K8sUpgrade(
self,
node_manager=self.cluster_inspector,
Expand All @@ -148,7 +149,7 @@ def __init__(self, *args):
dependency_model=K8sDependenciesModel(**DEPENDENCIES),
)
self.cos = COSIntegration(self)
self.update_status = update_status.Handler(self)
self.update_status = update_status.Handler(self, self.upgrade)
self.reconciler = Reconciler(
self, self._reconcile, exit_status=self.update_status.active_status
)
Expand All @@ -161,7 +162,7 @@ def __init__(self, *args):
user_label_key="node-labels",
timeout=15,
)
self._stored.set_default(is_dying=False, cluster_name=str())
self._stored.set_default(is_dying=False, cluster_name=str(), upgrade_granted=False)

self.cos_agent = COSAgentProvider(
self,
Expand Down Expand Up @@ -227,6 +228,33 @@ def is_worker(self) -> bool:
"""Returns true if the unit is a worker."""
return self.meta.name == "k8s-worker"

def get_worker_version(self) -> Optional[str]:
    """Look up this unit's version in the ``k8s-cluster`` relation.

    Returns:
        Optional[str]: The ``version`` value stored for the relation unit
            whose name equals this unit's name, or None when the relation
            is absent, no such unit is listed, or no version is set.
    """
    cluster = self.model.get_relation("k8s-cluster")
    if not cluster:
        return None

    # NOTE(review): ops documents ``Relation.units`` as excluding the local
    # unit; if that holds here, no entry can match ``self.unit.name`` and
    # this always returns None -- confirm the intended lookup.
    same_name = (member for member in cluster.units if member.name == self.unit.name)
    found = next(same_name, None)
    if found is None:
        return None
    return cluster.data[found].get("version")

def grant_upgrade(self) -> None:
    """Record in stored state that the upgrade has been granted."""
    stored = self._stored
    stored.upgrade_granted = True

def reset_upgrade(self) -> None:
    """Clear the stored upgrade-granted flag."""
    stored = self._stored
    stored.upgrade_granted = False

@property
def is_upgrade_granted(self) -> bool:
    """Report whether the upgrade has been granted.

    Returns:
        bool: The ``upgrade_granted`` stored-state value coerced to a bool.
    """
    granted = self._stored.upgrade_granted
    return bool(granted)
mateoflorido marked this conversation as resolved.
Show resolved Hide resolved

def _apply_proxy_environment(self):
"""Apply the proxy settings from environment variables."""
proxy_settings = self._get_proxy_env()
Expand Down Expand Up @@ -694,8 +722,9 @@ def _announce_kubernetes_version(self):
if not unit_version:
raise ReconcilerError(f"Waiting for version from {unit.name}")
if unit_version != local_version:
status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}"))
raise ReconcilerError(f"Version mismatch with {unit.name}")
# NOTE: Add a check to validate if we are doing an upgrade
status.add(ops.WaitingStatus("Upgrading the cluster"))
return
Comment on lines -697 to +732
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method, _announce_kubernetes_version, feels similar to get_worker_version above, in that it also reads the version field from the k8s-cluster or cluster relation.

So is a version mismatch now a waiting situation because an upgrade is in-progress? Is that why there's a NOTE here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, I see now that it's because raising the reconciler error prevented the upgrade events from engaging. Whew -- we still need a check to let folks know they're running an out-of-spec version of the applications.

For now, _announce_kubernetes_version is only run on the lead CP. Say you deployed and related a kw 1.35 to a 1.31 cluster. I imagine the 1.35 workers may not join. Should they join? Is the k8s-cp the right place to gripe about it? You're right that we should at least make sure we're not in an upgrade scenario before we raise the reconciler error.

relation.data[self.app]["version"] = local_version

def _get_proxy_env(self) -> Dict[str, str]:
Expand Down
8 changes: 6 additions & 2 deletions charms/worker/k8s/src/events/update_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import reschedule
from protocols import K8sCharmProtocol
from snap import version as snap_version
from upgrade import K8sUpgrade

# Log messages can be retrieved using juju debug-log
log = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,24 +65,27 @@ class Handler(ops.Object):
the unit's status during the update process.
"""

def __init__(self, charm: K8sCharmProtocol):
def __init__(self, charm: K8sCharmProtocol, upgrade: K8sUpgrade):
"""Initialize the UpdateStatusEvent.
Args:
charm: The charm instance that is instantiating this event.
upgrade: The upgrade instance that handles the upgrade process.
"""
super().__init__(charm, "update_status")
self.charm = charm
self.upgrade = upgrade
self.active_status = DynamicActiveStatus()
self.charm.framework.observe(self.charm.on.update_status, self._on_update_status)

def _on_update_status(self, _event: ops.UpdateStatusEvent):
def _on_update_status(self, event: ops.UpdateStatusEvent):
"""Handle update-status event."""
if not self.charm.reconciler.stored.reconciled:
return

try:
with status.context(self.charm.unit, exit_status=self.active_status):
self.upgrade.set_upgrade_status(event)
self.run()
except status.ReconcilerError:
log.exception("Can't update_status")
Expand Down
16 changes: 15 additions & 1 deletion charms/worker/k8s/src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@

"""Literals for the charm."""

# Name of the snap this charm works with.
SNAP_NAME = "k8s"

# Service names grouped as "common".
# NOTE(review): presumably these are the services passed to the snap
# stop/start helpers during an upgrade -- confirm at the call sites.
K8S_COMMON_SERVICES = ["kubelet", "kube-proxy", "k8sd"]

# Service names grouped as "control plane".
K8S_CONTROL_PLANE_SERVICES = ["kube-apiserver", "kube-controller-manager", "kube-scheduler"]
mateoflorido marked this conversation as resolved.
Show resolved Hide resolved

DEPENDENCIES = {
# NOTE: Update the dependencies for the k8s-charm before releasing.
"k8s_charm": {
Expand All @@ -16,6 +30,6 @@
"dependencies": {"k8s-worker": "^1.30, < 1.32"},
"name": "k8s",
"upgrade_supported": "^1.30, < 1.32",
"version": "1.31.2",
"version": "1.31.3",
},
}
46 changes: 46 additions & 0 deletions charms/worker/k8s/src/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ class K8sCharmProtocol(ops.CharmBase):
kube_control (KubeControlProvides): The kube-control interface.
xcp (ExternalCloudProvider): The external cloud provider interface.
reconciler (Reconciler): The reconciler for the charm
is_upgrade_granted (bool): Whether the upgrade is granted.
lead_control_plane (bool): Whether the charm is the lead control plane.
is_control_plane (bool): Whether the charm is a control plane.
"""

api_manager: K8sdAPIManager
Expand All @@ -33,6 +36,14 @@ def get_cluster_name(self) -> str:
"""
raise NotImplementedError

def grant_upgrade(self) -> None:
    """Grant the upgrade (interface stub).

    Raises:
        NotImplementedError: Always; this stub has no implementation.
    """
    raise NotImplementedError()

def get_cloud_name(self) -> str:
"""Get the cloud name.

Expand All @@ -48,3 +59,38 @@ def _is_node_ready(self) -> bool:
NotImplementedError: If the method is not implemented.
"""
raise NotImplementedError

@property
def is_upgrade_granted(self) -> bool:
    """Whether the upgrade is granted (interface stub).

    Raises:
        NotImplementedError: Always; this stub has no implementation.
    """
    raise NotImplementedError()

@property
def lead_control_plane(self) -> bool:
    """Whether the charm is the lead control plane (interface stub).

    Raises:
        NotImplementedError: Always; this stub has no implementation.
    """
    raise NotImplementedError()

@property
def is_control_plane(self) -> bool:
    """Whether the charm is a control plane (interface stub).

    Raises:
        NotImplementedError: Always; this stub has no implementation.
    """
    raise NotImplementedError()
mateoflorido marked this conversation as resolved.
Show resolved Hide resolved

def get_worker_version(self) -> str:
    """Get the worker version (interface stub).

    NOTE(review): the charm-side ``get_worker_version`` shown in this change
    is annotated ``Optional[str]``; consider widening this annotation to
    match -- confirm against the implementation.

    Raises:
        NotImplementedError: Always; this stub has no implementation.
    """
    raise NotImplementedError()
43 changes: 41 additions & 2 deletions charms/worker/k8s/src/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import charms.operator_libs_linux.v2.snap as snap_lib
import ops
import yaml
from protocols import K8sCharmProtocol
from pydantic import BaseModel, Field, ValidationError, parse_obj_as, validator
from typing_extensions import Annotated

Expand Down Expand Up @@ -263,7 +264,7 @@ def _parse_management_arguments(charm: ops.CharmBase) -> List[SnapArgument]:
return args


def management(charm: ops.CharmBase) -> None:
def management(charm: K8sCharmProtocol) -> None:
"""Manage snap installations on this machine.

Arguments:
Expand All @@ -272,7 +273,7 @@ def management(charm: ops.CharmBase) -> None:
cache = snap_lib.SnapCache()
for args in _parse_management_arguments(charm):
which = cache[args.name]
if block_refresh(which, args):
if block_refresh(which, args) and not charm.is_upgrade_granted:
mateoflorido marked this conversation as resolved.
Show resolved Hide resolved
continue
install_args = args.dict(exclude_none=True)
if isinstance(args, SnapFileArgument) and which.revision != "x1":
Expand Down Expand Up @@ -342,3 +343,41 @@ def version(snap: str) -> Tuple[Optional[str], bool]:

log.info("Snap k8s not found or no version available.")
return None, overridden


def stop(snap_name: str, services: List[str]) -> None:
    """Stop the given services of a snap on this machine.

    Arguments:
        snap_name: The name of the snap
        services: The services to stop

    Raises:
        SnapError: If the snap isn't installed
    """
    installed = snap_lib.SnapCache()
    if snap_name in installed:
        installed[snap_name].stop(services=services)
        return

    # Log before raising so the failure is recorded even if a caller
    # handles the exception.
    message = f"Snap '{snap_name}' not installed"
    log.error(message)
    raise snap_lib.SnapError(message)


def start(snap_name: str, services: List[str]) -> None:
    """Start the given services of a snap on this machine.

    Arguments:
        snap_name: The name of the snap
        services: The services to start

    Raises:
        SnapError: If the snap isn't installed
    """
    installed = snap_lib.SnapCache()
    if snap_name in installed:
        installed[snap_name].start(services=services)
        return

    # Log before raising so the failure is recorded even if a caller
    # handles the exception.
    message = f"Snap '{snap_name}' not installed"
    log.error(message)
    raise snap_lib.SnapError(message)
Loading
Loading