From 1562054221d51382a249b90075f670c89c1c39d9 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Tue, 17 Dec 2024 08:51:24 -0500 Subject: [PATCH] Allow `k8s` to integrate with multiple worker units (#221) --- charms/worker/k8s/src/charm.py | 69 ++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index 53c74a51..382c22de 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -83,6 +83,7 @@ SUPPORTED_DATASTORES, ) from ops.interface_kube_control import KubeControlProvides +from pydantic import SecretStr from snap import management as snap_management from snap import version as snap_version from token_distributor import ClusterTokenType, TokenCollector, TokenDistributor, TokenStrategy @@ -249,13 +250,14 @@ def get_worker_versions(self) -> Dict[str, List[ops.Unit]]: Returns: Dict[str, List[ops.Unit]]: A dictionary of versions and the units that have them. """ - if not (relation := self.model.get_relation(CLUSTER_WORKER_RELATION)): + if not (relations := self.model.relations.get(CLUSTER_WORKER_RELATION)): return {} versions = defaultdict(list) - for unit in relation.units: - if version := relation.data[unit].get("version"): - versions[version].append(unit) + for relation in relations: + for unit in relation.units: + if version := relation.data[unit].get("version"): + versions[version].append(unit) return versions def grant_upgrade(self): @@ -416,16 +418,16 @@ def _bootstrap_k8s_snap(self): def _config_containerd_registries(self): """Apply containerd custom registries.""" registries, config = [], "" - containerd_relation = self.model.get_relation(CONTAINERD_RELATION) - if self.is_control_plane: - config = str(self.config["containerd-custom-registries"]) - registries = containerd.parse_registries(config) - else: - registries = containerd.recover(containerd_relation) - self.unit.status = ops.MaintenanceStatus("Ensuring containerd registries") - containerd.ensure_registry_configs(registries) - if self.lead_control_plane: - containerd.share(config, self.app, containerd_relation) + for relation in self.model.relations.get(CONTAINERD_RELATION, []): + if self.is_control_plane: + config = str(self.config["containerd-custom-registries"]) + registries = containerd.parse_registries(config) + else: + registries = containerd.recover(relation) + self.unit.status = ops.MaintenanceStatus("Ensuring containerd registries") + containerd.ensure_registry_configs(registries) + if self.lead_control_plane: + containerd.share(config, self.app, relation) def _configure_cos_integration(self): """Retrieve the join token from secret databag and join the cluster.""" @@ -607,9 +609,9 @@ def _revoke_cluster_tokens(self, event: ops.EventBase): to_remove=to_remove, ) - if workers := self.model.get_relation(CLUSTER_WORKER_RELATION): + for relation in self.model.relations.get(CLUSTER_WORKER_RELATION, []): self.distributor.revoke_tokens( - relation=workers, + relation=relation, token_strategy=TokenStrategy.CLUSTER, token_type=ClusterTokenType.WORKER, to_remove=to_remove, @@ -625,9 +627,9 @@ def _create_cluster_tokens(self): token_type=ClusterTokenType.CONTROL_PLANE, ) - if workers := self.model.get_relation(CLUSTER_WORKER_RELATION): + for relation in self.model.relations.get(CLUSTER_WORKER_RELATION, []): self.distributor.allocate_tokens( - relation=workers, + relation=relation, token_strategy=TokenStrategy.CLUSTER, token_type=ClusterTokenType.WORKER, ) @@ -729,26 +731,27 @@ def _announce_kubernetes_version(self): if not local_version: raise ReconcilerError("k8s-snap is not installed") - relation_config = { - "peer": self.model.get_relation(CLUSTER_RELATION), - "worker": self.model.get_relation(CLUSTER_WORKER_RELATION), + relation_config: Dict[str, List[ops.Relation]] = { + "peer": self.model.relations.get(CLUSTER_RELATION, []), + "worker": self.model.relations.get(CLUSTER_WORKER_RELATION, []), } waiting_units = {role: 0 for role in relation_config} - for role, relation in relation_config.items(): - if not relation: - continue + for role, relations in relation_config.items(): + for relation in relations: + if not relation.units: + continue - units = (unit for unit in relation.units if unit.name != self.unit.name) - for unit in units: - unit_version = relation.data[unit].get("version") - if not unit_version: - raise ReconcilerError(f"Waiting for version from {unit.name}") - if unit_version != local_version: - waiting_units[role] += 1 + units = (unit for unit in relation.units if unit.name != self.unit.name) + for unit in units: + unit_version = relation.data[unit].get("version") + if not unit_version: + raise ReconcilerError(f"Waiting for version from {unit.name}") + if unit_version != local_version: + waiting_units[role] += 1 - relation.data[self.app]["version"] = local_version + relation.data[self.app]["version"] = local_version if not any(waiting_units.values()): return @@ -828,7 +831,7 @@ def _join_with_token(self, relation: ops.Relation, token: str, cluster_name: str node_name = self.get_node_name() cluster_addr = f"{address}:{K8SD_PORT}" log.info("Joining %s(%s) to %s...", self.unit, node_name, cluster_name) - request = JoinClusterRequest(name=node_name, address=cluster_addr, token=token) + request = JoinClusterRequest(name=node_name, address=cluster_addr, token=SecretStr(token)) if self.is_control_plane: request.config = ControlPlaneNodeJoinConfig() request.config.extra_sans = self._get_extra_sans()