Improve node removal by validating departed|broken relations (#80)
* Improve node removal by validating departed|broken relations

* Use the proper internal kubectl and kubeconfig

* Fix last-gasp logic to confirm the node is reported down 3 times

* Improve typing, and prepare kubectl and kubectl_get for future use

* Call kubectl_get with positional args
addyess authored Apr 26, 2024
1 parent 13311fe commit 66b4af0
Showing 2 changed files with 107 additions and 22 deletions.
117 changes: 101 additions & 16 deletions charms/worker/k8s/src/charm.py
@@ -55,6 +55,7 @@
 from cos_integration import COSIntegration
 from snap import management as snap_management
 from token_distributor import ClusterTokenType, TokenCollector, TokenDistributor, TokenStrategy
+from typing_extensions import Literal

 # Log messages can be retrieved using juju debug-log
 log = logging.getLogger(__name__)
@@ -78,6 +79,22 @@ def _get_public_address() -> str:
     return subprocess.check_output(cmd).decode("UTF-8").strip()


+def _cluster_departing_unit(event: ops.EventBase) -> Union[Literal[False], ops.Unit]:
+    """Determine if the given event signals the end of the cluster for this unit.
+
+    Args:
+        event (ops.EventBase): event to consider.
+
+    Returns:
+        Literal[False] | ops.Unit - False or the Unit leaving the cluster
+    """
+    return (
+        isinstance(event, ops.RelationDepartedEvent)
+        and event.relation.name in ["k8s-cluster", "cluster"]
+        and event.departing_unit
+    )
+
+
 class K8sCharm(ops.CharmBase):
     """A charm for managing a K8s cluster via the k8s snap.
@@ -323,13 +340,22 @@ def _configure_cloud_provider(self, config: BootstrapConfig):
             log.info("Using external as cloud-provider")
             config.cloud_provider = "external"

-    def _revoke_cluster_tokens(self):
+    def _revoke_cluster_tokens(self, event: ops.EventBase):
         """Revoke tokens for the units in the cluster and k8s-cluster relations.
+
+        if self is dying, only try to remove itself from the cluster
+        if event is relation_departed, remove that unit
+
+        Args:
+            event (ops.Event): event triggering token revocation
+        """
         log.info("Garbage collect cluster tokens")
-        to_remove = {self.unit} if self.is_dying else None
+        to_remove = set()
+        if self.is_dying:
+            to_remove |= {self.unit}
+        if unit := _cluster_departing_unit(event):
+            to_remove |= {unit}

         if peer := self.model.get_relation("cluster"):
             self.distributor.revoke_tokens(
@@ -513,18 +539,20 @@ def _join_cluster(self):
         self.api_manager.join_cluster(request)
         log.info("Success")

-    def _reconcile(self, event):
+    def _reconcile(self, event: ops.EventBase):
         """Reconcile state change events.
+
+        Args:
+            event: ops.EventBase - event that triggered the reconciliation
+        """
         log.info("Reconcile event=%s", event)

         self._evaluate_removal(event)
         if self.is_dying and self.lead_control_plane:
-            self._revoke_cluster_tokens()
+            self._revoke_cluster_tokens(event)
         if self.is_dying:
             self._update_status()
-            self._last_gasp(event)
+            self._last_gasp()
             return

         self._apply_proxy_environment()
@@ -537,7 +565,7 @@ def _reconcile(self, event):
             self._create_cluster_tokens()
             self._create_cos_tokens()
             self._apply_cos_requirements()
-            self._revoke_cluster_tokens()
+            self._revoke_cluster_tokens(event)
             self._ensure_cluster_config()
         self._join_cluster()
         self._configure_cos_integration()
@@ -569,6 +597,10 @@ def _update_status(self):
         if version := self._get_snap_version():
             self.unit.set_workload_version(version)

+        if not self._is_node_ready():
+            status.add(ops.WaitingStatus("Node not yet Ready"))
+            return
+
     def _evaluate_removal(self, event: ops.EventBase):
         """Determine if my unit is being removed.
@@ -577,25 +609,45 @@
         """
         if self.is_dying:
             return
-        if isinstance(event, ops.RelationDepartedEvent):
-            # if a unit is departing, and it's me?
-            self._stored.removing = event.departing_unit == self.unit
+        if unit := _cluster_departing_unit(event):
+            # Juju says I am being removed
+            self._stored.removing = unit == self.unit
         elif isinstance(event, ops.RelationBrokenEvent) and event.relation.name == "cluster":
             # Control-plane never experience RelationBroken on "cluster", it's a peer relation
             # Worker units experience RelationBroken on "cluster" when the relation is removed
             # or this unit is being removed.
             self._stored.removing = self.is_worker
         elif isinstance(event, (ops.RemoveEvent, ops.StopEvent)):
             # If I myself am dying, its me!
             self._stored.removing = True

-    def _last_gasp(self, event):
-        """Busy wait on stop event until the unit isn't clustered anymore.
-
-        Args:
-            event: ops.EventBase - event that triggered charm hook
-        """
-        if not isinstance(event, ops.StopEvent):
-            return
-        busy_wait = 30
+    def _is_node_ready(self, node: str = "") -> bool:
+        """Determine if node is in the kubernetes cluster.
+
+        Args:
+            node (str): name of node
+
+        Returns:
+            bool: True when this unit is marked as Ready
+        """
+        node = node or self.get_node_name()
+        cmd = ["nodes", node, '-o=jsonpath={.status.conditions[?(@.type=="Ready")].status}']
+        try:
+            return self.kubectl_get(*cmd) == "True"
+        except subprocess.CalledProcessError:
+            return False
+
+    def _last_gasp(self):
+        """Busy wait on stop event until the unit isn't clustered anymore."""
+        busy_wait, reported_down = 30, 0
         status.add(ops.MaintenanceStatus("Ensuring cluster removal"))
-        while busy_wait and self.api_manager.is_cluster_bootstrapped():
+        while busy_wait and reported_down != 3:
             log.info("Waiting for this unit to uncluster")
+            if self._is_node_ready() or self.api_manager.is_cluster_bootstrapped():
+                log.info("Node is still reportedly clustered")
+                reported_down = 0
+            else:
+                reported_down += 1
             sleep(1)
             busy_wait -= 1
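The reworked `_last_gasp` above only treats the unit as unclustered after three consecutive checks report it down; seeing a Ready node or a bootstrapped cluster resets the count. A condensed, standalone sketch of that counting loop, with a hypothetical `still_clustered` probe standing in for the kubectl and API checks:

import time


def wait_for_uncluster(still_clustered, budget: int = 30, required_down: int = 3) -> bool:
    """Return True once the probe reports the node down `required_down` times in a row."""
    reported_down = 0
    while budget and reported_down != required_down:
        if still_clustered():
            reported_down = 0  # any sighting of a live node resets the streak
        else:
            reported_down += 1
        time.sleep(1)
        budget -= 1
    return reported_down == required_down


# e.g. wait_for_uncluster(lambda: False) returns True after three one-second checks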

@@ -620,6 +672,39 @@ def _on_update_status(self, _event: ops.UpdateStatusEvent):
         except status.ReconcilerError:
             log.exception("Can't update_status")

+    def kubectl(self, *args) -> str:
+        """Run kubectl command.
+
+        Arguments:
+            args: arguments passed to kubectl
+
+        Returns:
+            string response
+
+        Raises:
+            CalledProcessError: in the event of a failed kubectl
+        """
+        cmd = [KUBECTL_PATH, f"--kubeconfig={self._internal_kubeconfig}", *args]
+        log.info("Executing %s", cmd)
+        try:
+            return subprocess.check_output(cmd, text=True)
+        except subprocess.CalledProcessError as e:
+            log.error(
+                "Command failed: %s\nreturncode: %s\nstdout: %s", cmd, e.returncode, e.output
+            )
+            raise
+
+    def kubectl_get(self, *args) -> str:
+        """Run kubectl get command.
+
+        Arguments:
+            args: arguments passed to kubectl get
+
+        Returns:
+            string response
+        """
+        return self.kubectl("get", *args)
+
     @property
     def _internal_kubeconfig(self) -> Path:
         """Return the highest authority kube config for this unit."""
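Because `kubectl_get` forwards its positional arguments straight through to `kubectl`, callers pass the command as separate tokens rather than one shell string. A standalone sketch of the wrapper pair; the `KUBECTL` path and kubeconfig location here are placeholder assumptions, not the charm's actual `KUBECTL_PATH` or `_internal_kubeconfig`:

import subprocess

KUBECTL = "kubectl"  # stand-in for the charm's KUBECTL_PATH
KUBECONFIG = "/path/to/internal/kubeconfig"  # stand-in for self._internal_kubeconfig


def kubectl(*args: str) -> str:
    cmd = [KUBECTL, f"--kubeconfig={KUBECONFIG}", *args]
    return subprocess.check_output(cmd, text=True)


def kubectl_get(*args: str) -> str:
    # positional args flow straight through, e.g. kubectl_get("nodes", name, "-o=jsonpath=...")
    return kubectl("get", *args)


if __name__ == "__main__":
    print(kubectl_get(
        "nodes",
        "example-node",
        '-o=jsonpath={.status.conditions[?(@.type=="Ready")].status}',
    ))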
12 changes: 6 additions & 6 deletions charms/worker/k8s/templates/snap_installation.yaml
@@ -2,10 +2,10 @@
 # See LICENSE file for licensing details.

 amd64:
-- name: k8s
-  install-type: store
-  channel: edge
+  - name: k8s
+    install-type: store
+    channel: edge
 arm64:
-- name: k8s
-  install-type: store
-  channel: edge
+  - name: k8s
+    install-type: store
+    channel: edge
