Skip to content

Commit

Permalink
Improving typing, and prepare kubectl and kubectl_get for future
Browse files Browse the repository at this point in the history
  • Loading branch information
addyess committed Apr 25, 2024
1 parent 29f2376 commit 1183125
Showing 1 changed file with 38 additions and 5 deletions.
43 changes: 38 additions & 5 deletions charms/worker/k8s/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from cos_integration import COSIntegration
from snap import management as snap_management
from token_distributor import ClusterTokenType, TokenCollector, TokenDistributor, TokenStrategy
from typing_extensions import Literal

# Log messages can be retrieved using juju debug-log
log = logging.getLogger(__name__)
Expand All @@ -78,14 +79,14 @@ def _get_public_address() -> str:
return subprocess.check_output(cmd).decode("UTF-8").strip()


def _cluster_departing_unit(event: ops.EventBase) -> Union[bool, ops.Unit]:
def _cluster_departing_unit(event: ops.EventBase) -> Union[Literal[False], ops.Unit]:
"""Determine if the given event signals the end of the cluster for this unit.
Args:
event (ops.EventBase): event to consider.
Returns:
Optional[ops.Unit] Unit leaving the cluster
Literal[False] | ops.Unit - False or the Unit leaving the cluster
"""
return (
isinstance(event, ops.RelationDepartedEvent)
Expand Down Expand Up @@ -628,10 +629,9 @@ def _is_node_ready(self, node: str = "") -> bool:
bool: True when this unit is marked as Ready
"""
node = node or self.get_node_name()
run = f"{KUBECTL_PATH} --kubeconfig {self._internal_kubeconfig} get nodes {node}"
cmd = shlex.split(run) + ['-o=jsonpath={.status.conditions[?(@.type=="Ready")].status}']
cmd = ["nodes", node, '-o=jsonpath={.status.conditions[?(@.type=="Ready")].status}']
try:
return subprocess.check_output(cmd) == b"True"
return self.kubectl_get(cmd) == "True"
except subprocess.CalledProcessError:
return False

Expand Down Expand Up @@ -670,6 +670,39 @@ def _on_update_status(self, _event: ops.UpdateStatusEvent):
except status.ReconcilerError:
log.exception("Can't update_status")

def kubectl(self, *args) -> str:
"""Run kubectl command.
Arguments:
args: arguments passed to kubectl
Returns:
string response
Raises:
CalledProcessError: in the event of a failed kubectl
"""
cmd = [KUBECTL_PATH, f"--kubeconfig={self._internal_kubeconfig}", *args]
log.info("Executing %s", cmd)
try:
return subprocess.check_output(cmd, text=True)
except subprocess.CalledProcessError as e:
log.error(
"Command failed: %s}\nreturncode: %s\nstdout: %s", cmd, e.returncode, e.output
)
raise

def kubectl_get(self, *args) -> str:
"""Run kubectl get command.
Arguments:
args: arguments passed to kubectl get
Returns:
string response
"""
return self.kubectl("get", *args)

@property
def _internal_kubeconfig(self) -> Path:
"""Return the highest authority kube config for this unit."""
Expand Down

0 comments on commit 1183125

Please sign in to comment.