Skip to content

Commit

Permalink
Allow k8s to integrate with multiple worker units (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateoflorido authored Dec 17, 2024
1 parent f83d7c5 commit 1562054
Showing 1 changed file with 36 additions and 33 deletions.
69 changes: 36 additions & 33 deletions charms/worker/k8s/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
SUPPORTED_DATASTORES,
)
from ops.interface_kube_control import KubeControlProvides
from pydantic import SecretStr
from snap import management as snap_management
from snap import version as snap_version
from token_distributor import ClusterTokenType, TokenCollector, TokenDistributor, TokenStrategy
Expand Down Expand Up @@ -249,13 +250,14 @@ def get_worker_versions(self) -> Dict[str, List[ops.Unit]]:
Returns:
Dict[str, List[ops.Unit]]: A dictionary of versions and the units that have them.
"""
if not (relation := self.model.get_relation(CLUSTER_WORKER_RELATION)):
if not (relations := self.model.relations.get(CLUSTER_WORKER_RELATION)):
return {}

versions = defaultdict(list)
for unit in relation.units:
if version := relation.data[unit].get("version"):
versions[version].append(unit)
for relation in relations:
for unit in relation.units:
if version := relation.data[unit].get("version"):
versions[version].append(unit)
return versions

def grant_upgrade(self):
Expand Down Expand Up @@ -416,16 +418,16 @@ def _bootstrap_k8s_snap(self):
def _config_containerd_registries(self):
"""Apply containerd custom registries."""
registries, config = [], ""
containerd_relation = self.model.get_relation(CONTAINERD_RELATION)
if self.is_control_plane:
config = str(self.config["containerd-custom-registries"])
registries = containerd.parse_registries(config)
else:
registries = containerd.recover(containerd_relation)
self.unit.status = ops.MaintenanceStatus("Ensuring containerd registries")
containerd.ensure_registry_configs(registries)
if self.lead_control_plane:
containerd.share(config, self.app, containerd_relation)
for relation in self.model.relations.get(CONTAINERD_RELATION, []):
if self.is_control_plane:
config = str(self.config["containerd-custom-registries"])
registries = containerd.parse_registries(config)
else:
registries = containerd.recover(relation)
self.unit.status = ops.MaintenanceStatus("Ensuring containerd registries")
containerd.ensure_registry_configs(registries)
if self.lead_control_plane:
containerd.share(config, self.app, relation)

def _configure_cos_integration(self):
"""Retrieve the join token from secret databag and join the cluster."""
Expand Down Expand Up @@ -607,9 +609,9 @@ def _revoke_cluster_tokens(self, event: ops.EventBase):
to_remove=to_remove,
)

if workers := self.model.get_relation(CLUSTER_WORKER_RELATION):
for relation in self.model.relations.get(CLUSTER_WORKER_RELATION, []):
self.distributor.revoke_tokens(
relation=workers,
relation=relation,
token_strategy=TokenStrategy.CLUSTER,
token_type=ClusterTokenType.WORKER,
to_remove=to_remove,
Expand All @@ -625,9 +627,9 @@ def _create_cluster_tokens(self):
token_type=ClusterTokenType.CONTROL_PLANE,
)

if workers := self.model.get_relation(CLUSTER_WORKER_RELATION):
for relation in self.model.relations.get(CLUSTER_WORKER_RELATION, []):
self.distributor.allocate_tokens(
relation=workers,
relation=relation,
token_strategy=TokenStrategy.CLUSTER,
token_type=ClusterTokenType.WORKER,
)
Expand Down Expand Up @@ -729,26 +731,27 @@ def _announce_kubernetes_version(self):
if not local_version:
raise ReconcilerError("k8s-snap is not installed")

relation_config = {
"peer": self.model.get_relation(CLUSTER_RELATION),
"worker": self.model.get_relation(CLUSTER_WORKER_RELATION),
relation_config: Dict[str, List[ops.Relation]] = {
"peer": self.model.relations.get(CLUSTER_RELATION, []),
"worker": self.model.relations.get(CLUSTER_WORKER_RELATION, []),
}

waiting_units = {role: 0 for role in relation_config}

for role, relation in relation_config.items():
if not relation:
continue
for role, relations in relation_config.items():
for relation in relations:
if not relation.units:
continue

units = (unit for unit in relation.units if unit.name != self.unit.name)
for unit in units:
unit_version = relation.data[unit].get("version")
if not unit_version:
raise ReconcilerError(f"Waiting for version from {unit.name}")
if unit_version != local_version:
waiting_units[role] += 1
units = (unit for unit in relation.units if unit.name != self.unit.name)
for unit in units:
unit_version = relation.data[unit].get("version")
if not unit_version:
raise ReconcilerError(f"Waiting for version from {unit.name}")
if unit_version != local_version:
waiting_units[role] += 1

relation.data[self.app]["version"] = local_version
relation.data[self.app]["version"] = local_version

if not any(waiting_units.values()):
return
Expand Down Expand Up @@ -828,7 +831,7 @@ def _join_with_token(self, relation: ops.Relation, token: str, cluster_name: str
node_name = self.get_node_name()
cluster_addr = f"{address}:{K8SD_PORT}"
log.info("Joining %s(%s) to %s...", self.unit, node_name, cluster_name)
request = JoinClusterRequest(name=node_name, address=cluster_addr, token=token)
request = JoinClusterRequest(name=node_name, address=cluster_addr, token=SecretStr(token))
if self.is_control_plane:
request.config = ControlPlaneNodeJoinConfig()
request.config.extra_sans = self._get_extra_sans()
Expand Down

0 comments on commit 1562054

Please sign in to comment.