From eedfe504d43fef3a916d23dcb9df43e5493e85af Mon Sep 17 00:00:00 2001 From: PietroPasotti Date: Wed, 4 Dec 2024 12:04:03 +0100 Subject: [PATCH] moved cluster over to new dir (#108) --- src/cosl/coordinated_workers/coordinator.py | 2 +- src/cosl/coordinated_workers/interface.py | 601 +----------------- src/cosl/coordinated_workers/worker.py | 2 +- src/cosl/interfaces/__init__.py | 5 + src/cosl/interfaces/cluster.py | 533 ++++++++++++++++ src/cosl/interfaces/utils.py | 83 +++ .../test_coordinator.py | 2 +- .../test_coordinator_status.py | 2 +- .../test_interface.py | 2 +- .../test_worker_status.py | 2 +- tests/test_databag_model.py | 2 +- 11 files changed, 639 insertions(+), 597 deletions(-) create mode 100644 src/cosl/interfaces/__init__.py create mode 100644 src/cosl/interfaces/cluster.py create mode 100644 src/cosl/interfaces/utils.py diff --git a/src/cosl/coordinated_workers/coordinator.py b/src/cosl/coordinated_workers/coordinator.py index 5d0df29..9c5afae 100644 --- a/src/cosl/coordinated_workers/coordinator.py +++ b/src/cosl/coordinated_workers/coordinator.py @@ -32,13 +32,13 @@ import cosl from cosl.coordinated_workers import worker -from cosl.coordinated_workers.interface import ClusterProvider, RemoteWriteEndpoint from cosl.coordinated_workers.nginx import ( Nginx, NginxMappingOverrides, NginxPrometheusExporter, ) from cosl.helpers import check_libs_installed +from cosl.interfaces.cluster import ClusterProvider, RemoteWriteEndpoint check_libs_installed( "charms.data_platform_libs.v0.s3", diff --git a/src/cosl/coordinated_workers/interface.py b/src/cosl/coordinated_workers/interface.py index 45188cb..4cc8b54 100644 --- a/src/cosl/coordinated_workers/interface.py +++ b/src/cosl/coordinated_workers/interface.py @@ -1,599 +1,20 @@ #!/usr/bin/env python3 -# Copyright 2024 Canonical +# Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -"""Shared utilities for the coordinator -> worker "cluster" interface. +"""This module is only here for backwards compatibility. -As this relation is cluster-internal and not intended for third-party charms to interact with -`-coordinator-k8s`, its only user will be the -worker-k8s charm. As such, -it does not live in a charm lib as most other relation endpoint wrappers do. +Its contents have been moved over to cosl.interfaces.cluster and cosl.interfaces.utils. """ -import collections -import json -import logging -from typing import ( - Any, - Counter, - Dict, - FrozenSet, - Iterable, - List, - Mapping, - MutableMapping, - NamedTuple, - Optional, - Set, - Tuple, -) -from urllib.parse import urlparse - -import ops -import pydantic -import yaml -from ops import EventSource, Object, ObjectEvents, RelationCreatedEvent -from pydantic import ConfigDict -from typing_extensions import TypedDict - -import cosl - -log = logging.getLogger("_cluster") - -DEFAULT_ENDPOINT_NAME = "-cluster" -BUILTIN_JUJU_KEYS = {"ingress-address", "private-address", "egress-subnets"} - -# ================= -# | Databag Model | -# ================= - -# Note: MutableMapping is imported from the typing module and not collections.abc -# because subscripting collections.abc.MutableMapping was added in python 3.9, but -# most of our charms are based on 20.04, which has python 3.8. 
- -_RawDatabag = MutableMapping[str, str] - - -class DataValidationError(Exception): - """Raised when relation databag validation fails.""" - - -class DatabagModel(pydantic.BaseModel): - """Base databag model.""" - - model_config = ConfigDict( - # tolerate additional keys in databag - extra="ignore", - # Allow instantiating this class by field name (instead of forcing alias). - populate_by_name=True, - ) # type: ignore - """Pydantic config.""" - - @classmethod - def load(cls, databag: _RawDatabag): - """Load this model from a Juju databag.""" - try: - data = { - k: json.loads(v) - for k, v in databag.items() - # Don't attempt to parse model-external values - if k in {(f.alias or n) for n, f in cls.__fields__.items()} # type: ignore - } - except json.JSONDecodeError as e: - msg = f"invalid databag contents: expecting json. {databag}" - log.error(msg) - raise DataValidationError(msg) from e - - try: - return cls.model_validate_json(json.dumps(data)) # type: ignore - except pydantic.ValidationError as e: - msg = f"failed to validate databag: {databag}" - if databag: - log.debug(msg, exc_info=True) - raise DataValidationError(msg) from e - - def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _RawDatabag: - """Write the contents of this model to Juju databag. - - :param databag: the databag to write the data to. - :param clear: ensure the databag is cleared before writing it. - """ - _databag: _RawDatabag = {} if databag is None else databag - - if clear: - _databag.clear() - - dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True) # type: ignore - _databag.update({k: json.dumps(v) for k, v in dct.items()}) - return _databag - - -# ============= -# | Interface | -# ============= - - -class RemoteWriteEndpoint(TypedDict): - """Type of the remote write endpoints to be passed to the worker through cluster relation data.""" - - url: str - - -class ConfigReceivedEvent(ops.EventBase): - """Event emitted when the "-cluster" provider has shared a new config.""" - - config: Dict[str, Any] - """The config.""" - - def __init__(self, handle: ops.framework.Handle, config: Dict[str, Any]): - super().__init__(handle) - self.config = config - - def snapshot(self) -> Dict[str, Any]: - """Used by the framework to serialize the event to disk. - - Not meant to be called by charm code. - """ - return {"config": json.dumps(self.config)} - - def restore(self, snapshot: Dict[str, Any]): - """Used by the framework to deserialize the event from disk. - - Not meant to be called by charm code. - """ - self.relation = json.loads(snapshot["config"]) # noqa - - -class ClusterError(Exception): - """Base class for exceptions raised by this module.""" - - -class DatabagAccessPermissionError(ClusterError): - """Raised when a follower attempts to write leader settings.""" - - -class _Topology(pydantic.BaseModel): - """Juju topology information.""" - - application: str - unit: str - charm_name: str - - -class ClusterRequirerAppData(DatabagModel): - """App data that the worker sends to the coordinator.""" - - role: str - - -class ClusterRequirerUnitData(DatabagModel): - """Unit data the worker sends to the coordinator.""" - - juju_topology: _Topology - address: str - - -class ClusterProviderAppData(DatabagModel): - """App data that the coordinator sends to the worker.""" - - ### worker node configuration - worker_config: str - """The whole worker workload configuration, whatever it is. E.g. 
yaml-encoded things.""" - - ### self-monitoring stuff - loki_endpoints: Optional[Dict[str, str]] = None - """Endpoints to which the workload (and the worker charm) can push logs to.""" - charm_tracing_receivers: Optional[Dict[str, str]] = None - """Endpoints to which the the worker charm can push charm traces to.""" - workload_tracing_receivers: Optional[Dict[str, str]] = None - """Endpoints to which the the worker can push workload traces to.""" - remote_write_endpoints: Optional[List[RemoteWriteEndpoint]] = None - """Endpoints to which the workload (and the worker charm) can push metrics to.""" - - ### TLS stuff - ca_cert: Optional[str] = None - server_cert: Optional[str] = None - privkey_secret_id: Optional[str] = None - s3_tls_ca_chain: Optional[str] = None - - -class TLSData(NamedTuple): - """Section of the cluster data that concerns TLS information.""" - - ca_cert: Optional[str] - server_cert: Optional[str] - privkey_secret_id: Optional[str] - s3_tls_ca_chain: Optional[str] - - -class ClusterChangedEvent(ops.EventBase): - """Event emitted when any "-cluster" relation event fires.""" - - -class ClusterRemovedEvent(ops.EventBase): - """Event emitted when the relation with the "-cluster" provider has been severed. - - Or when the relation data has been wiped. - """ - - -class ClusterProviderEvents(ObjectEvents): - """Events emitted by the ClusterProvider "-cluster" endpoint wrapper.""" - changed = EventSource(ClusterChangedEvent) +from cosl.interfaces.utils import DatabagModel, DataValidationError # noqa # type:ignore +from cosl.interfaces.cluster import * # noqa # type:ignore +from logging import getLogger -class ClusterRequirerEvents(ObjectEvents): - """Events emitted by the ClusterRequirer "-cluster" endpoint wrapper.""" - - config_received = EventSource(ConfigReceivedEvent) - created = EventSource(RelationCreatedEvent) - removed = EventSource(ClusterRemovedEvent) - - -class ClusterProvider(Object): - """``-cluster`` provider endpoint wrapper.""" - - on = ClusterProviderEvents() # type: ignore - - def __init__( - self, - charm: ops.CharmBase, - roles: FrozenSet[str], - meta_roles: Optional[Mapping[str, Iterable[str]]] = None, - key: Optional[str] = None, - endpoint: str = DEFAULT_ENDPOINT_NAME, - ): - super().__init__(charm, key or endpoint) - self._charm = charm - self._roles = roles - self._meta_roles = meta_roles or {} - self.juju_topology = cosl.JujuTopology.from_charm(self._charm) - - # filter out common unhappy relation states - self._relations: List[ops.Relation] = [ - rel for rel in self.model.relations[endpoint] if (rel.app and rel.data) - ] - - # we coalesce all -cluster-relation-* events into a single cluster-changed API. - # the coordinator uses a common exit hook reconciler, that's why. 
- self.framework.observe(self._charm.on[endpoint].relation_created, self._on_cluster_changed) - self.framework.observe(self._charm.on[endpoint].relation_joined, self._on_cluster_changed) - self.framework.observe(self._charm.on[endpoint].relation_changed, self._on_cluster_changed) - self.framework.observe( - self._charm.on[endpoint].relation_departed, self._on_cluster_changed - ) - self.framework.observe(self._charm.on[endpoint].relation_broken, self._on_cluster_changed) - - def _on_cluster_changed(self, _: ops.EventBase) -> None: - self.on.changed.emit() - - def grant_privkey(self, label: str) -> str: - """Grant the secret containing the privkey to all relations, and return the secret ID.""" - secret = self.model.get_secret(label=label) - for relation in self._relations: - secret.grant(relation) - # can't return secret.id because secret was obtained by label, and so - # we don't have an ID unless we fetch it - return secret.get_info().id - - def publish_data( - self, - worker_config: str, - ca_cert: Optional[str] = None, - server_cert: Optional[str] = None, - s3_tls_ca_chain: Optional[str] = None, - privkey_secret_id: Optional[str] = None, - loki_endpoints: Optional[Dict[str, str]] = None, - charm_tracing_receivers: Optional[Dict[str, str]] = None, - workload_tracing_receivers: Optional[Dict[str, str]] = None, - remote_write_endpoints: Optional[List[RemoteWriteEndpoint]] = None, - ) -> None: - """Publish the config to all related worker clusters.""" - for relation in self._relations: - if relation and self._remote_data_ready(relation): - local_app_databag = ClusterProviderAppData( - worker_config=worker_config, - loki_endpoints=loki_endpoints, - ca_cert=ca_cert, - server_cert=server_cert, - privkey_secret_id=privkey_secret_id, - charm_tracing_receivers=charm_tracing_receivers, - workload_tracing_receivers=workload_tracing_receivers, - remote_write_endpoints=remote_write_endpoints, - s3_tls_ca_chain=s3_tls_ca_chain, - ) - local_app_databag.dump(relation.data[self.model.app]) - - @property - def has_workers(self) -> bool: - """Return True if the coordinator is connected to any worker.""" - # we use the presence of relations instead of addresses, because we want this - # check to fail early - return bool(self._relations) - - def _expand_roles(self, role_string: str) -> Set[str]: - """Expand the meta-roles from a comma-separated list of roles.""" - expanded_roles: Set[str] = set() - for role in role_string.split(","): - if role in self._meta_roles: - expanded_roles.update(self._meta_roles[role]) - else: - expanded_roles.update({role}) - return expanded_roles - - def gather_addresses_by_role(self) -> Dict[str, Set[str]]: - """Go through the worker's unit databags to collect all the addresses published by the units, by role.""" - data: Dict[str, Set[str]] = collections.defaultdict(set) - for relation in self._relations: - - if not relation.app: - log.debug(f"skipped {relation} as .app is None") - continue - - try: - worker_app_data = ClusterRequirerAppData.load(relation.data[relation.app]) - except DataValidationError as e: - log.info(f"invalid databag contents: {e}") - continue - - for worker_unit in relation.units: - try: - worker_data = ClusterRequirerUnitData.load(relation.data[worker_unit]) - unit_address = worker_data.address - for role in self._expand_roles(worker_app_data.role): - data[role].add(unit_address) - except DataValidationError as e: - log.info(f"invalid databag contents: {e}") - continue - return data - - def gather_addresses(self) -> Tuple[str, ...]: - """Go through the 
worker's unit databags to collect all the addresses published by the units.""" - data: Set[str] = set() - addresses_by_role = self.gather_addresses_by_role() - for _, address_set in addresses_by_role.items(): - data.update(address_set) - - return tuple(sorted(data)) - - def gather_roles(self) -> Dict[str, int]: - """Go through the worker's app databags and sum the available application roles.""" - data: Counter[str] = collections.Counter() - for relation in self._relations: - if relation.app: - remote_app_databag = relation.data[relation.app] - try: - worker_role: str = ClusterRequirerAppData.load(remote_app_databag).role - except DataValidationError as e: - log.error(f"invalid databag contents: {e}") - continue - - # the number of units with each role is the number of remote units - role_n = len(relation.units) # exclude this unit - for role in self._expand_roles(worker_role): - data[role] += role_n - - dct = dict(data) - return dct - - def gather_topology(self) -> List[Dict[str, str]]: - """Gather Juju topology by unit.""" - data: List[Dict[str, str]] = [] - for relation in self._relations: - if not relation.app: - continue - - for worker_unit in relation.units: - try: - worker_data = ClusterRequirerUnitData.load(relation.data[worker_unit]) - unit_address = worker_data.address - except DataValidationError as e: - log.info(f"invalid databag contents: {e}") - continue - worker_topology = { - "address": unit_address, - "application": worker_data.juju_topology.application, - "unit": worker_data.juju_topology.unit, - "charm_name": worker_data.juju_topology.charm_name, - } - data.append(worker_topology) - - return data - - def get_address_from_role(self, role: str) -> Optional[str]: - """Get datasource address.""" - addresses_by_role = self.gather_addresses_by_role() - if address_set := addresses_by_role.get(role, None): - return address_set.pop() - return None - - def _remote_data_ready(self, relation: ops.Relation) -> bool: - """Verify that each worker unit and the worker leader have published their data to the cluster relation. 
- - - unit address is published - - roles are published - """ - if not relation.app or not relation.units or not relation.data: - return False - - # check if unit data is published - for worker_unit in relation.units: - try: - ClusterRequirerUnitData.load(relation.data[worker_unit]) - except DataValidationError: - return False - - # check if app data is published - if self._charm.unit.is_leader(): - try: - ClusterRequirerAppData.load(relation.data[relation.app]) - except DataValidationError: - return False - - return True - - -class ClusterRequirer(Object): - """``-cluster`` requirer endpoint wrapper.""" - - on = ClusterRequirerEvents() # type: ignore - - def __init__( - self, - charm: ops.CharmBase, - key: Optional[str] = None, - endpoint: str = DEFAULT_ENDPOINT_NAME, - ): - super().__init__(charm, key or endpoint) - self._charm = charm - self.juju_topology = cosl.JujuTopology.from_charm(self._charm) - - relation = self.model.get_relation(endpoint) - self.relation: Optional[ops.Relation] = ( - relation if relation and relation.app and relation.data else None - ) - - self.framework.observe( - self._charm.on[endpoint].relation_changed, self._on_cluster_relation_changed # type: ignore - ) - self.framework.observe( - self._charm.on[endpoint].relation_created, self._on_cluster_relation_created # type: ignore - ) - self.framework.observe( - self._charm.on[endpoint].relation_broken, self._on_cluster_relation_broken # type: ignore - ) - - def _on_cluster_relation_broken(self, _event: ops.RelationBrokenEvent): - self.on.removed.emit() - - def _on_cluster_relation_created(self, event: ops.RelationCreatedEvent): - self.on.created.emit(relation=event.relation, app=event.app, unit=event.unit) - - def _on_cluster_relation_changed(self, _event: ops.RelationChangedEvent): - # to prevent the event from firing if the relation is in an unhealthy state (breaking...) - if self.relation: - new_config = self.get_worker_config() - if new_config: - self.on.config_received.emit(new_config) - - # if we have published our data, but we receive an empty/invalid config, - # then the remote end must have removed it. - elif self.is_published(): - self.on.removed.emit() - - def is_published(self): - """Verify that the local side has done all they need to do. 
- - - unit address is published - - roles are published - """ - relation = self.relation - if not relation: - return False - - unit_data = relation.data[self._charm.unit] - app_data = relation.data[self._charm.app] - - try: - ClusterRequirerUnitData.load(unit_data) - if self._charm.unit.is_leader(): - ClusterRequirerAppData.load(app_data) - except DataValidationError as e: - log.info(f"invalid databag contents: {e}") - return False - return True - - def publish_unit_address(self, url: str): - """Publish this unit's URL via the unit databag.""" - try: - urlparse(url) - except Exception as e: - raise ValueError(f"{url} is an invalid url") from e - - databag_model = ClusterRequirerUnitData( - juju_topology=dict(self.juju_topology.as_dict()), # type: ignore - address=url, - ) - relation = self.relation - if relation: - unit_databag = relation.data[self.model.unit] # type: ignore # all checks are done in __init__ - databag_model.dump(unit_databag) - - def publish_app_roles(self, roles: Iterable[str]): - """Publish this application's roles via the application databag.""" - if not self._charm.unit.is_leader(): - raise DatabagAccessPermissionError("only the leader unit can publish roles.") - - relation = self.relation - if relation: - databag_model = ClusterRequirerAppData(role=",".join(roles)) - databag_model.dump(relation.data[self.model.app]) - - def _get_data_from_coordinator(self) -> Optional[ClusterProviderAppData]: - """Fetch the contents of the doordinator databag.""" - data: Optional[ClusterProviderAppData] = None - relation = self.relation - # TODO: does this need a leader guard ??? maybe? - if relation: - try: - databag = relation.data[relation.app] # type: ignore # all checks are done in __init__ - coordinator_databag = ClusterProviderAppData.load(databag) - data = coordinator_databag - except DataValidationError as e: - log.info(f"invalid databag contents: {e}") - return None # explicit is better than implicit - - return data - - def get_worker_config(self) -> Dict[str, Any]: - """Fetch the worker config from the coordinator databag.""" - data = self._get_data_from_coordinator() - if data: - return yaml.safe_load(data.worker_config) - return {} - - def get_loki_endpoints(self) -> Dict[str, str]: - """Fetch the loki endpoints from the coordinator databag.""" - data = self._get_data_from_coordinator() - if data: - return data.loki_endpoints or {} - return {} - - def get_tls_data(self, allow_none: bool = False) -> Optional[TLSData]: - """Fetch certificates and the private key secrets id for the worker config.""" - data = self._get_data_from_coordinator() - if not data: - return None - - if ( - not data.ca_cert or not data.server_cert or not data.privkey_secret_id - ) and not allow_none: - return None - - return TLSData( - ca_cert=data.ca_cert, - server_cert=data.server_cert, - privkey_secret_id=data.privkey_secret_id, - s3_tls_ca_chain=data.s3_tls_ca_chain, - ) - - def get_charm_tracing_receivers(self) -> Dict[str, str]: - """Fetch the charm tracing receivers from the coordinator databag.""" - data = self._get_data_from_coordinator() - if data: - return data.charm_tracing_receivers or {} - return {} - - def get_workload_tracing_receivers(self) -> Dict[str, str]: - """Fetch the workload tracing receivers from the coordinator databag.""" - data = self._get_data_from_coordinator() - if data: - return data.workload_tracing_receivers or {} - return {} - - def get_remote_write_endpoints(self) -> List[RemoteWriteEndpoint]: - """Fetch the remote write endpoints from the coordinator databag.""" - 
data = self._get_data_from_coordinator() - if data: - return data.remote_write_endpoints or [] - return [] +logger = getLogger("interface-deprecated") +logger.warning( + "this module has been deprecated and may be removed in a future version: " + "please use cosl.interfaces.cluster and cosl.interfaces.utils instead." +) diff --git a/src/cosl/coordinated_workers/worker.py b/src/cosl/coordinated_workers/worker.py index 571889a..2c0cebd 100644 --- a/src/cosl/coordinated_workers/worker.py +++ b/src/cosl/coordinated_workers/worker.py @@ -22,8 +22,8 @@ from ops.pebble import Check, Layer, PathError, Plan, ProtocolError from cosl import JujuTopology -from cosl.coordinated_workers.interface import ClusterRequirer, TLSData from cosl.helpers import check_libs_installed +from cosl.interfaces.cluster import ClusterRequirer, TLSData check_libs_installed( "charms.loki_k8s.v1.loki_push_api", diff --git a/src/cosl/interfaces/__init__.py b/src/cosl/interfaces/__init__.py new file mode 100644 index 0000000..39356be --- /dev/null +++ b/src/cosl/interfaces/__init__.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical +# See LICENSE file for licensing details. + +"""Interfaces module.""" diff --git a/src/cosl/interfaces/cluster.py b/src/cosl/interfaces/cluster.py new file mode 100644 index 0000000..abb9bdb --- /dev/null +++ b/src/cosl/interfaces/cluster.py @@ -0,0 +1,533 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical +# See LICENSE file for licensing details. + +"""Shared utilities for the coordinator -> worker "cluster" interface. + +As this relation is cluster-internal and not intended for third-party charms to interact with +`-coordinator-k8s`, its only user will be the -worker-k8s charm. As such, +it does not live in a charm lib as most other relation endpoint wrappers do. +""" + +import collections +import json +import logging +from typing import ( + Any, + Counter, + Dict, + FrozenSet, + Iterable, + List, + Mapping, + NamedTuple, + Optional, + Set, + Tuple, +) +from urllib.parse import urlparse + +import ops +import pydantic +import yaml +from ops import EventSource, Object, ObjectEvents, RelationCreatedEvent +from typing_extensions import TypedDict + +import cosl +import cosl.interfaces.utils + +log = logging.getLogger("_cluster") + +DEFAULT_ENDPOINT_NAME = "-cluster" + + +# ============= +# | Interface | +# ============= + + +class RemoteWriteEndpoint(TypedDict): + """Type of the remote write endpoints to be passed to the worker through cluster relation data.""" + + url: str + + +class ConfigReceivedEvent(ops.EventBase): + """Event emitted when the "-cluster" provider has shared a new config.""" + + config: Dict[str, Any] + """The config.""" + + def __init__(self, handle: ops.framework.Handle, config: Dict[str, Any]): + super().__init__(handle) + self.config = config + + def snapshot(self) -> Dict[str, Any]: + """Used by the framework to serialize the event to disk. + + Not meant to be called by charm code. + """ + return {"config": json.dumps(self.config)} + + def restore(self, snapshot: Dict[str, Any]): + """Used by the framework to deserialize the event from disk. + + Not meant to be called by charm code. 
+ """ + self.relation = json.loads(snapshot["config"]) # noqa + + +class ClusterError(Exception): + """Base class for exceptions raised by this module.""" + + +class DatabagAccessPermissionError(ClusterError): + """Raised when a follower attempts to write leader settings.""" + + +class _Topology(pydantic.BaseModel): + """Juju topology information.""" + + application: str + unit: str + charm_name: str + + +class ClusterRequirerAppData(cosl.interfaces.utils.DatabagModel): + """App data that the worker sends to the coordinator.""" + + role: str + + +class ClusterRequirerUnitData(cosl.interfaces.utils.DatabagModel): + """Unit data the worker sends to the coordinator.""" + + juju_topology: _Topology + address: str + + +class ClusterProviderAppData(cosl.interfaces.utils.DatabagModel): + """App data that the coordinator sends to the worker.""" + + ### worker node configuration + worker_config: str + """The whole worker workload configuration, whatever it is. E.g. yaml-encoded things.""" + + ### self-monitoring stuff + loki_endpoints: Optional[Dict[str, str]] = None + """Endpoints to which the workload (and the worker charm) can push logs to.""" + charm_tracing_receivers: Optional[Dict[str, str]] = None + """Endpoints to which the the worker charm can push charm traces to.""" + workload_tracing_receivers: Optional[Dict[str, str]] = None + """Endpoints to which the the worker can push workload traces to.""" + remote_write_endpoints: Optional[List[RemoteWriteEndpoint]] = None + """Endpoints to which the workload (and the worker charm) can push metrics to.""" + + ### TLS stuff + ca_cert: Optional[str] = None + server_cert: Optional[str] = None + privkey_secret_id: Optional[str] = None + s3_tls_ca_chain: Optional[str] = None + + +class TLSData(NamedTuple): + """Section of the cluster data that concerns TLS information.""" + + ca_cert: Optional[str] + server_cert: Optional[str] + privkey_secret_id: Optional[str] + s3_tls_ca_chain: Optional[str] + + +class ClusterChangedEvent(ops.EventBase): + """Event emitted when any "-cluster" relation event fires.""" + + +class ClusterRemovedEvent(ops.EventBase): + """Event emitted when the relation with the "-cluster" provider has been severed. + + Or when the relation data has been wiped. + """ + + +class ClusterProviderEvents(ObjectEvents): + """Events emitted by the ClusterProvider "-cluster" endpoint wrapper.""" + + changed = EventSource(ClusterChangedEvent) + + +class ClusterRequirerEvents(ObjectEvents): + """Events emitted by the ClusterRequirer "-cluster" endpoint wrapper.""" + + config_received = EventSource(ConfigReceivedEvent) + created = EventSource(RelationCreatedEvent) + removed = EventSource(ClusterRemovedEvent) + + +class ClusterProvider(Object): + """``-cluster`` provider endpoint wrapper.""" + + on = ClusterProviderEvents() # type: ignore + + def __init__( + self, + charm: ops.CharmBase, + roles: FrozenSet[str], + meta_roles: Optional[Mapping[str, Iterable[str]]] = None, + key: Optional[str] = None, + endpoint: str = DEFAULT_ENDPOINT_NAME, + ): + super().__init__(charm, key or endpoint) + self._charm = charm + self._roles = roles + self._meta_roles = meta_roles or {} + self.juju_topology = cosl.JujuTopology.from_charm(self._charm) + + # filter out common unhappy relation states + self._relations: List[ops.Relation] = [ + rel for rel in self.model.relations[endpoint] if (rel.app and rel.data) + ] + + # we coalesce all -cluster-relation-* events into a single cluster-changed API. + # the coordinator uses a common exit hook reconciler, that's why. 
+ self.framework.observe(self._charm.on[endpoint].relation_created, self._on_cluster_changed) + self.framework.observe(self._charm.on[endpoint].relation_joined, self._on_cluster_changed) + self.framework.observe(self._charm.on[endpoint].relation_changed, self._on_cluster_changed) + self.framework.observe( + self._charm.on[endpoint].relation_departed, self._on_cluster_changed + ) + self.framework.observe(self._charm.on[endpoint].relation_broken, self._on_cluster_changed) + + def _on_cluster_changed(self, _: ops.EventBase) -> None: + self.on.changed.emit() + + def grant_privkey(self, label: str) -> str: + """Grant the secret containing the privkey to all relations, and return the secret ID.""" + secret = self.model.get_secret(label=label) + for relation in self._relations: + secret.grant(relation) + # can't return secret.id because secret was obtained by label, and so + # we don't have an ID unless we fetch it + return secret.get_info().id + + def publish_data( + self, + worker_config: str, + ca_cert: Optional[str] = None, + server_cert: Optional[str] = None, + s3_tls_ca_chain: Optional[str] = None, + privkey_secret_id: Optional[str] = None, + loki_endpoints: Optional[Dict[str, str]] = None, + charm_tracing_receivers: Optional[Dict[str, str]] = None, + workload_tracing_receivers: Optional[Dict[str, str]] = None, + remote_write_endpoints: Optional[List[RemoteWriteEndpoint]] = None, + ) -> None: + """Publish the config to all related worker clusters.""" + for relation in self._relations: + if relation and self._remote_data_ready(relation): + local_app_databag = ClusterProviderAppData( + worker_config=worker_config, + loki_endpoints=loki_endpoints, + ca_cert=ca_cert, + server_cert=server_cert, + privkey_secret_id=privkey_secret_id, + charm_tracing_receivers=charm_tracing_receivers, + workload_tracing_receivers=workload_tracing_receivers, + remote_write_endpoints=remote_write_endpoints, + s3_tls_ca_chain=s3_tls_ca_chain, + ) + local_app_databag.dump(relation.data[self.model.app]) + + @property + def has_workers(self) -> bool: + """Return True if the coordinator is connected to any worker.""" + # we use the presence of relations instead of addresses, because we want this + # check to fail early + return bool(self._relations) + + def _expand_roles(self, role_string: str) -> Set[str]: + """Expand the meta-roles from a comma-separated list of roles.""" + expanded_roles: Set[str] = set() + for role in role_string.split(","): + if role in self._meta_roles: + expanded_roles.update(self._meta_roles[role]) + else: + expanded_roles.update({role}) + return expanded_roles + + def gather_addresses_by_role(self) -> Dict[str, Set[str]]: + """Go through the worker's unit databags to collect all the addresses published by the units, by role.""" + data: Dict[str, Set[str]] = collections.defaultdict(set) + for relation in self._relations: + + if not relation.app: + log.debug(f"skipped {relation} as .app is None") + continue + + try: + worker_app_data = ClusterRequirerAppData.load(relation.data[relation.app]) + except cosl.interfaces.utils.DataValidationError as e: + log.info(f"invalid databag contents: {e}") + continue + + for worker_unit in relation.units: + try: + worker_data = ClusterRequirerUnitData.load(relation.data[worker_unit]) + unit_address = worker_data.address + for role in self._expand_roles(worker_app_data.role): + data[role].add(unit_address) + except cosl.interfaces.utils.DataValidationError as e: + log.info(f"invalid databag contents: {e}") + continue + return data + + def gather_addresses(self) 
-> Tuple[str, ...]: + """Go through the worker's unit databags to collect all the addresses published by the units.""" + data: Set[str] = set() + addresses_by_role = self.gather_addresses_by_role() + for _, address_set in addresses_by_role.items(): + data.update(address_set) + + return tuple(sorted(data)) + + def gather_roles(self) -> Dict[str, int]: + """Go through the worker's app databags and sum the available application roles.""" + data: Counter[str] = collections.Counter() + for relation in self._relations: + if relation.app: + remote_app_databag = relation.data[relation.app] + try: + worker_role: str = ClusterRequirerAppData.load(remote_app_databag).role + except cosl.interfaces.utils.DataValidationError as e: + log.error(f"invalid databag contents: {e}") + continue + + # the number of units with each role is the number of remote units + role_n = len(relation.units) # exclude this unit + for role in self._expand_roles(worker_role): + data[role] += role_n + + dct = dict(data) + return dct + + def gather_topology(self) -> List[Dict[str, str]]: + """Gather Juju topology by unit.""" + data: List[Dict[str, str]] = [] + for relation in self._relations: + if not relation.app: + continue + + for worker_unit in relation.units: + try: + worker_data = ClusterRequirerUnitData.load(relation.data[worker_unit]) + unit_address = worker_data.address + except cosl.interfaces.utils.DataValidationError as e: + log.info(f"invalid databag contents: {e}") + continue + worker_topology = { + "address": unit_address, + "application": worker_data.juju_topology.application, + "unit": worker_data.juju_topology.unit, + "charm_name": worker_data.juju_topology.charm_name, + } + data.append(worker_topology) + + return data + + def get_address_from_role(self, role: str) -> Optional[str]: + """Get datasource address.""" + addresses_by_role = self.gather_addresses_by_role() + if address_set := addresses_by_role.get(role, None): + return address_set.pop() + return None + + def _remote_data_ready(self, relation: ops.Relation) -> bool: + """Verify that each worker unit and the worker leader have published their data to the cluster relation. 
+ + - unit address is published + - roles are published + """ + if not relation.app or not relation.units or not relation.data: + return False + + # check if unit data is published + for worker_unit in relation.units: + try: + ClusterRequirerUnitData.load(relation.data[worker_unit]) + except cosl.interfaces.utils.DataValidationError: + return False + + # check if app data is published + if self._charm.unit.is_leader(): + try: + ClusterRequirerAppData.load(relation.data[relation.app]) + except cosl.interfaces.utils.DataValidationError: + return False + + return True + + +class ClusterRequirer(Object): + """``-cluster`` requirer endpoint wrapper.""" + + on = ClusterRequirerEvents() # type: ignore + + def __init__( + self, + charm: ops.CharmBase, + key: Optional[str] = None, + endpoint: str = DEFAULT_ENDPOINT_NAME, + ): + super().__init__(charm, key or endpoint) + self._charm = charm + self.juju_topology = cosl.JujuTopology.from_charm(self._charm) + + relation = self.model.get_relation(endpoint) + self.relation: Optional[ops.Relation] = ( + relation if relation and relation.app and relation.data else None + ) + + self.framework.observe( + self._charm.on[endpoint].relation_changed, self._on_cluster_relation_changed # type: ignore + ) + self.framework.observe( + self._charm.on[endpoint].relation_created, self._on_cluster_relation_created # type: ignore + ) + self.framework.observe( + self._charm.on[endpoint].relation_broken, self._on_cluster_relation_broken # type: ignore + ) + + def _on_cluster_relation_broken(self, _event: ops.RelationBrokenEvent): + self.on.removed.emit() + + def _on_cluster_relation_created(self, event: ops.RelationCreatedEvent): + self.on.created.emit(relation=event.relation, app=event.app, unit=event.unit) + + def _on_cluster_relation_changed(self, _event: ops.RelationChangedEvent): + # to prevent the event from firing if the relation is in an unhealthy state (breaking...) + if self.relation: + new_config = self.get_worker_config() + if new_config: + self.on.config_received.emit(new_config) + + # if we have published our data, but we receive an empty/invalid config, + # then the remote end must have removed it. + elif self.is_published(): + self.on.removed.emit() + + def is_published(self): + """Verify that the local side has done all they need to do. 
+ + - unit address is published + - roles are published + """ + relation = self.relation + if not relation: + return False + + unit_data = relation.data[self._charm.unit] + app_data = relation.data[self._charm.app] + + try: + ClusterRequirerUnitData.load(unit_data) + if self._charm.unit.is_leader(): + ClusterRequirerAppData.load(app_data) + except cosl.interfaces.utils.DataValidationError as e: + log.info(f"invalid databag contents: {e}") + return False + return True + + def publish_unit_address(self, url: str): + """Publish this unit's URL via the unit databag.""" + try: + urlparse(url) + except Exception as e: + raise ValueError(f"{url} is an invalid url") from e + + databag_model = ClusterRequirerUnitData( + juju_topology=dict(self.juju_topology.as_dict()), # type: ignore + address=url, + ) + relation = self.relation + if relation: + unit_databag = relation.data[self.model.unit] # type: ignore # all checks are done in __init__ + databag_model.dump(unit_databag) + + def publish_app_roles(self, roles: Iterable[str]): + """Publish this application's roles via the application databag.""" + if not self._charm.unit.is_leader(): + raise DatabagAccessPermissionError("only the leader unit can publish roles.") + + relation = self.relation + if relation: + databag_model = ClusterRequirerAppData(role=",".join(roles)) + databag_model.dump(relation.data[self.model.app]) + + def _get_data_from_coordinator(self) -> Optional[ClusterProviderAppData]: + """Fetch the contents of the doordinator databag.""" + data: Optional[ClusterProviderAppData] = None + relation = self.relation + # TODO: does this need a leader guard ??? maybe? + if relation: + try: + databag = relation.data[relation.app] # type: ignore # all checks are done in __init__ + coordinator_databag = ClusterProviderAppData.load(databag) + data = coordinator_databag + except cosl.interfaces.utils.DataValidationError as e: + log.info(f"invalid databag contents: {e}") + return None # explicit is better than implicit + + return data + + def get_worker_config(self) -> Dict[str, Any]: + """Fetch the worker config from the coordinator databag.""" + data = self._get_data_from_coordinator() + if data: + return yaml.safe_load(data.worker_config) + return {} + + def get_loki_endpoints(self) -> Dict[str, str]: + """Fetch the loki endpoints from the coordinator databag.""" + data = self._get_data_from_coordinator() + if data: + return data.loki_endpoints or {} + return {} + + def get_tls_data(self, allow_none: bool = False) -> Optional[TLSData]: + """Fetch certificates and the private key secrets id for the worker config.""" + data = self._get_data_from_coordinator() + if not data: + return None + + if ( + not data.ca_cert or not data.server_cert or not data.privkey_secret_id + ) and not allow_none: + return None + + return TLSData( + ca_cert=data.ca_cert, + server_cert=data.server_cert, + privkey_secret_id=data.privkey_secret_id, + s3_tls_ca_chain=data.s3_tls_ca_chain, + ) + + def get_charm_tracing_receivers(self) -> Dict[str, str]: + """Fetch the charm tracing receivers from the coordinator databag.""" + data = self._get_data_from_coordinator() + if data: + return data.charm_tracing_receivers or {} + return {} + + def get_workload_tracing_receivers(self) -> Dict[str, str]: + """Fetch the workload tracing receivers from the coordinator databag.""" + data = self._get_data_from_coordinator() + if data: + return data.workload_tracing_receivers or {} + return {} + + def get_remote_write_endpoints(self) -> List[RemoteWriteEndpoint]: + """Fetch the remote write 
endpoints from the coordinator databag.""" + data = self._get_data_from_coordinator() + if data: + return data.remote_write_endpoints or [] + return [] diff --git a/src/cosl/interfaces/utils.py b/src/cosl/interfaces/utils.py new file mode 100644 index 0000000..16b1557 --- /dev/null +++ b/src/cosl/interfaces/utils.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical +# See LICENSE file for licensing details. + +"""Shared utilities for the cosl interfaces.""" + +import json +import logging +from typing import ( + MutableMapping, + Optional, +) + +import pydantic +from pydantic import ConfigDict + +log = logging.getLogger("utils") + +BUILTIN_JUJU_KEYS = {"ingress-address", "private-address", "egress-subnets"} + +# ================= +# | Databag Model | +# ================= + +# Note: MutableMapping is imported from the typing module and not collections.abc +# because subscripting collections.abc.MutableMapping was added in python 3.9, but +# most of our charms are based on 20.04, which has python 3.8. + +_RawDatabag = MutableMapping[str, str] + + +class DataValidationError(Exception): + """Raised when relation databag validation fails.""" + + +class DatabagModel(pydantic.BaseModel): + """Base databag model.""" + + model_config = ConfigDict( + # tolerate additional keys in databag + extra="ignore", + # Allow instantiating this class by field name (instead of forcing alias). + populate_by_name=True, + ) # type: ignore + """Pydantic config.""" + + @classmethod + def load(cls, databag: _RawDatabag): + """Load this model from a Juju databag.""" + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {(f.alias or n) for n, f in cls.__fields__.items()} # type: ignore + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + log.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.model_validate_json(json.dumps(data)) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + if databag: + log.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _RawDatabag: + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. 
+ """ + _databag: _RawDatabag = {} if databag is None else databag + + if clear: + _databag.clear() + + dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True) # type: ignore + _databag.update({k: json.dumps(v) for k, v in dct.items()}) + return _databag diff --git a/tests/test_coordinated_workers/test_coordinator.py b/tests/test_coordinated_workers/test_coordinator.py index ca4c466..e79103f 100644 --- a/tests/test_coordinated_workers/test_coordinator.py +++ b/tests/test_coordinated_workers/test_coordinator.py @@ -10,7 +10,7 @@ Coordinator, S3NotFoundError, ) -from src.cosl.coordinated_workers.interface import ClusterRequirerAppData +from src.cosl.interfaces.cluster import ClusterRequirerAppData @pytest.fixture diff --git a/tests/test_coordinated_workers/test_coordinator_status.py b/tests/test_coordinated_workers/test_coordinator_status.py index 63232c2..9424ad5 100644 --- a/tests/test_coordinated_workers/test_coordinator_status.py +++ b/tests/test_coordinated_workers/test_coordinator_status.py @@ -9,7 +9,7 @@ from ops import testing from cosl.coordinated_workers.coordinator import ClusterRolesConfig, Coordinator -from cosl.coordinated_workers.interface import ClusterProviderAppData, ClusterRequirerAppData +from cosl.interfaces.cluster import ClusterProviderAppData, ClusterRequirerAppData from tests.test_coordinated_workers.test_worker_status import k8s_patch my_roles = ClusterRolesConfig( diff --git a/tests/test_coordinated_workers/test_interface.py b/tests/test_coordinated_workers/test_interface.py index b9f19cd..08e7b2f 100644 --- a/tests/test_coordinated_workers/test_interface.py +++ b/tests/test_coordinated_workers/test_interface.py @@ -1,4 +1,4 @@ -from cosl.coordinated_workers.interface import DatabagModel +from cosl.interfaces.utils import DatabagModel def test_databag_dump_update(): diff --git a/tests/test_coordinated_workers/test_worker_status.py b/tests/test_coordinated_workers/test_worker_status.py index 3ffe71f..77912da 100644 --- a/tests/test_coordinated_workers/test_worker_status.py +++ b/tests/test_coordinated_workers/test_worker_status.py @@ -8,11 +8,11 @@ import tenacity from ops import testing -from cosl.coordinated_workers.interface import ClusterProviderAppData from cosl.coordinated_workers.worker import ( NoReadinessCheckEndpointConfiguredError, Worker, ) +from cosl.interfaces.cluster import ClusterProviderAppData @pytest.fixture(params=[True, False]) diff --git a/tests/test_databag_model.py b/tests/test_databag_model.py index dcf17c1..cb967b7 100644 --- a/tests/test_databag_model.py +++ b/tests/test_databag_model.py @@ -2,7 +2,7 @@ import pytest -from cosl.coordinated_workers.interface import DatabagModel, DataValidationError +from cosl.interfaces.utils import DatabagModel, DataValidationError class MyDataModel(DatabagModel):