From f1cbe0ac9bfe2440aeccd3dd60429fc5bec7d683 Mon Sep 17 00:00:00 2001
From: Pietro Pasotti
Date: Tue, 3 Dec 2024 12:04:35 +0100
Subject: [PATCH 1/9] fixed dependencies

---
 pyproject.toml | 49 ++++++++++++++++++++++++++++---------------------
 tox.ini        |  6 +-----
 2 files changed, 29 insertions(+), 26 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 07f6ae6..41a48f2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,19 +6,21 @@ build-backend = "hatchling.build"
 name = "cosl"
 version = "0.0.44"
 authors = [
-    { name="sed-i", email="82407168+sed-i@users.noreply.github.com" },
+    { name = "sed-i", email = "82407168+sed-i@users.noreply.github.com" },
 ]
 description = "Utils for COS Lite charms"
 readme = "README.md"
-license = {file = "LICENSE"}
+license = { file = "LICENSE" }
 requires-python = ">=3.8"
 dependencies = [
-    "ops",
-    "pydantic",
-    "tenacity",
-    "PyYAML",
-    "typing-extensions",
-    "lightkube>=v0.15.4"
+    "ops",
+    "pydantic",
+    "tenacity",
+    "jsonschema",
+    "PyYAML",
+    "typing-extensions",
+    "lightkube>=v0.15.4",
+    "charm-relation-interfaces @ git+https://github.com/canonical/charm-relation-interfaces@grafana_datasource_exchange",
 ]
 classifiers = [
     "Programming Language :: Python :: 3.8",
@@ -60,21 +62,21 @@ extend-exclude = ["__pycache__", "*.egg_info"]
 [tool.ruff.lint]
 select = ["E", "W", "F", "C", "N", "D", "I001"]
 extend-ignore = [
-    "D203",
-    "D204",
-    "D213",
-    "D215",
-    "D400",
-    "D404",
-    "D406",
-    "D407",
-    "D408",
-    "D409",
-    "D413",
-    "E402",
+    "D203",
+    "D204",
+    "D213",
+    "D215",
+    "D400",
+    "D404",
+    "D406",
+    "D407",
+    "D408",
+    "D409",
+    "D413",
+    "E402",
 ]
 ignore = ["E501", "D107"]
-per-file-ignores = {"tests/*" = ["D100","D101","D102","D103","D104"]}
+per-file-ignores = { "tests/*" = ["D100", "D101", "D102", "D103", "D104"] }
 [tool.ruff.lint.pydocstyle]
 convention = "google"
@@ -99,3 +101,8 @@ reportTypeCommentUsage = false
 [tool.codespell]
 skip = ".git,.tox,build,lib,venv*,.mypy_cache"
 ignore-words-list = "assertIn"
+
+[tool.hatch.metadata]
+# allow git+ dependencies in pyproject
+allow-direct-references = true
+
diff --git a/tox.ini b/tox.ini
index e579a02..c984ba9 100644
--- a/tox.ini
+++ b/tox.ini
@@ -70,17 +70,13 @@ commands =
 [testenv:unit]
 description = Run unit tests
 deps =
+    .
     deepdiff
     fs
     pytest
     pytest-cov
     ops[testing]
     PyYAML
-    typing_extensions
-    cryptography
-    jsonschema
-    lightkube>=v0.15.4
-    lightkube-models==1.24.1.4
 allowlist_externals =
     /usr/bin/env
     charmcraft

From f58071cc69d3a654e38e89563e10bc0532207eae Mon Sep 17 00:00:00 2001
From: Pietro Pasotti
Date: Tue, 3 Dec 2024 12:23:43 +0100
Subject: [PATCH 2/9] moved cluster over to new dir

---
 src/cosl/coordinated_workers/coordinator.py |   2 +-
 src/cosl/coordinated_workers/interface.py   | 601 +-----------------
 src/cosl/coordinated_workers/worker.py      |   2 +-
 src/cosl/interfaces/__init__.py             |   5 +
 src/cosl/interfaces/cluster.py              | 533 ++++++++++++++++
 src/cosl/interfaces/utils.py                |  83 +++
 .../test_coordinator.py                     |   2 +-
 .../test_coordinator_status.py              |   2 +-
 .../test_interface.py                       |   2 +-
 .../test_worker_status.py                   |   2 +-
 tests/test_databag_model.py                 |   2 +-
 11 files changed, 639 insertions(+), 597 deletions(-)
 create mode 100644 src/cosl/interfaces/__init__.py
 create mode 100644 src/cosl/interfaces/cluster.py
 create mode 100644 src/cosl/interfaces/utils.py

diff --git a/src/cosl/coordinated_workers/coordinator.py b/src/cosl/coordinated_workers/coordinator.py
index 5d0df29..9c5afae 100644
--- a/src/cosl/coordinated_workers/coordinator.py
+++ b/src/cosl/coordinated_workers/coordinator.py
@@ -32,13 +32,13 @@
 import cosl
 from cosl.coordinated_workers import worker
-from cosl.coordinated_workers.interface import ClusterProvider, RemoteWriteEndpoint
 from cosl.coordinated_workers.nginx import (
     Nginx,
     NginxMappingOverrides,
     NginxPrometheusExporter,
 )
 from cosl.helpers import check_libs_installed
+from cosl.interfaces.cluster import ClusterProvider, RemoteWriteEndpoint

 check_libs_installed(
     "charms.data_platform_libs.v0.s3",
diff --git a/src/cosl/coordinated_workers/interface.py b/src/cosl/coordinated_workers/interface.py
index 45188cb..4cc8b54 100644
--- a/src/cosl/coordinated_workers/interface.py
+++ b/src/cosl/coordinated_workers/interface.py
@@ -1,599 +1,20 @@
 #!/usr/bin/env python3
-# Copyright 2024 Canonical
+# Copyright 2024 Canonical Ltd.
 # See LICENSE file for licensing details.
-"""Shared utilities for the coordinator -> worker "cluster" interface.
+"""This module is only here for backwards compatibility.
-As this relation is cluster-internal and not intended for third-party charms to interact with
-`-coordinator-k8s`, its only user will be the -worker-k8s charm. As such,
-it does not live in a charm lib as most other relation endpoint wrappers do.
+Its contents have been moved over to cosl.interfaces.cluster and cosl.interfaces.utils.
 """
-import collections
-import json
-import logging
-from typing import (
-    Any,
-    Counter,
-    Dict,
-    FrozenSet,
-    Iterable,
-    List,
-    Mapping,
-    MutableMapping,
-    NamedTuple,
-    Optional,
-    Set,
-    Tuple,
-)
-from urllib.parse import urlparse
-
-import ops
-import pydantic
-import yaml
-from ops import EventSource, Object, ObjectEvents, RelationCreatedEvent
-from pydantic import ConfigDict
-from typing_extensions import TypedDict
-
-import cosl
-
-log = logging.getLogger("_cluster")
-
-DEFAULT_ENDPOINT_NAME = "-cluster"
-BUILTIN_JUJU_KEYS = {"ingress-address", "private-address", "egress-subnets"}
-
-# =================
-# | Databag Model |
-# =================
-
-# Note: MutableMapping is imported from the typing module and not collections.abc
-# because subscripting collections.abc.MutableMapping was added in python 3.9, but
-# most of our charms are based on 20.04, which has python 3.8.
- -_RawDatabag = MutableMapping[str, str] - - -class DataValidationError(Exception): - """Raised when relation databag validation fails.""" - - -class DatabagModel(pydantic.BaseModel): - """Base databag model.""" - - model_config = ConfigDict( - # tolerate additional keys in databag - extra="ignore", - # Allow instantiating this class by field name (instead of forcing alias). - populate_by_name=True, - ) # type: ignore - """Pydantic config.""" - - @classmethod - def load(cls, databag: _RawDatabag): - """Load this model from a Juju databag.""" - try: - data = { - k: json.loads(v) - for k, v in databag.items() - # Don't attempt to parse model-external values - if k in {(f.alias or n) for n, f in cls.__fields__.items()} # type: ignore - } - except json.JSONDecodeError as e: - msg = f"invalid databag contents: expecting json. {databag}" - log.error(msg) - raise DataValidationError(msg) from e - - try: - return cls.model_validate_json(json.dumps(data)) # type: ignore - except pydantic.ValidationError as e: - msg = f"failed to validate databag: {databag}" - if databag: - log.debug(msg, exc_info=True) - raise DataValidationError(msg) from e - - def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _RawDatabag: - """Write the contents of this model to Juju databag. - - :param databag: the databag to write the data to. - :param clear: ensure the databag is cleared before writing it. - """ - _databag: _RawDatabag = {} if databag is None else databag - - if clear: - _databag.clear() - - dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True) # type: ignore - _databag.update({k: json.dumps(v) for k, v in dct.items()}) - return _databag - - -# ============= -# | Interface | -# ============= - - -class RemoteWriteEndpoint(TypedDict): - """Type of the remote write endpoints to be passed to the worker through cluster relation data.""" - - url: str - - -class ConfigReceivedEvent(ops.EventBase): - """Event emitted when the "-cluster" provider has shared a new config.""" - - config: Dict[str, Any] - """The config.""" - - def __init__(self, handle: ops.framework.Handle, config: Dict[str, Any]): - super().__init__(handle) - self.config = config - - def snapshot(self) -> Dict[str, Any]: - """Used by the framework to serialize the event to disk. - - Not meant to be called by charm code. - """ - return {"config": json.dumps(self.config)} - - def restore(self, snapshot: Dict[str, Any]): - """Used by the framework to deserialize the event from disk. - - Not meant to be called by charm code. - """ - self.relation = json.loads(snapshot["config"]) # noqa - - -class ClusterError(Exception): - """Base class for exceptions raised by this module.""" - - -class DatabagAccessPermissionError(ClusterError): - """Raised when a follower attempts to write leader settings.""" - - -class _Topology(pydantic.BaseModel): - """Juju topology information.""" - - application: str - unit: str - charm_name: str - - -class ClusterRequirerAppData(DatabagModel): - """App data that the worker sends to the coordinator.""" - - role: str - - -class ClusterRequirerUnitData(DatabagModel): - """Unit data the worker sends to the coordinator.""" - - juju_topology: _Topology - address: str - - -class ClusterProviderAppData(DatabagModel): - """App data that the coordinator sends to the worker.""" - - ### worker node configuration - worker_config: str - """The whole worker workload configuration, whatever it is. E.g. 
yaml-encoded things.""" - - ### self-monitoring stuff - loki_endpoints: Optional[Dict[str, str]] = None - """Endpoints to which the workload (and the worker charm) can push logs to.""" - charm_tracing_receivers: Optional[Dict[str, str]] = None - """Endpoints to which the the worker charm can push charm traces to.""" - workload_tracing_receivers: Optional[Dict[str, str]] = None - """Endpoints to which the the worker can push workload traces to.""" - remote_write_endpoints: Optional[List[RemoteWriteEndpoint]] = None - """Endpoints to which the workload (and the worker charm) can push metrics to.""" - - ### TLS stuff - ca_cert: Optional[str] = None - server_cert: Optional[str] = None - privkey_secret_id: Optional[str] = None - s3_tls_ca_chain: Optional[str] = None - - -class TLSData(NamedTuple): - """Section of the cluster data that concerns TLS information.""" - - ca_cert: Optional[str] - server_cert: Optional[str] - privkey_secret_id: Optional[str] - s3_tls_ca_chain: Optional[str] - - -class ClusterChangedEvent(ops.EventBase): - """Event emitted when any "-cluster" relation event fires.""" - - -class ClusterRemovedEvent(ops.EventBase): - """Event emitted when the relation with the "-cluster" provider has been severed. - - Or when the relation data has been wiped. - """ - - -class ClusterProviderEvents(ObjectEvents): - """Events emitted by the ClusterProvider "-cluster" endpoint wrapper.""" - changed = EventSource(ClusterChangedEvent) +from cosl.interfaces.utils import DatabagModel, DataValidationError # noqa # type:ignore +from cosl.interfaces.cluster import * # noqa # type:ignore +from logging import getLogger -class ClusterRequirerEvents(ObjectEvents): - """Events emitted by the ClusterRequirer "-cluster" endpoint wrapper.""" - - config_received = EventSource(ConfigReceivedEvent) - created = EventSource(RelationCreatedEvent) - removed = EventSource(ClusterRemovedEvent) - - -class ClusterProvider(Object): - """``-cluster`` provider endpoint wrapper.""" - - on = ClusterProviderEvents() # type: ignore - - def __init__( - self, - charm: ops.CharmBase, - roles: FrozenSet[str], - meta_roles: Optional[Mapping[str, Iterable[str]]] = None, - key: Optional[str] = None, - endpoint: str = DEFAULT_ENDPOINT_NAME, - ): - super().__init__(charm, key or endpoint) - self._charm = charm - self._roles = roles - self._meta_roles = meta_roles or {} - self.juju_topology = cosl.JujuTopology.from_charm(self._charm) - - # filter out common unhappy relation states - self._relations: List[ops.Relation] = [ - rel for rel in self.model.relations[endpoint] if (rel.app and rel.data) - ] - - # we coalesce all -cluster-relation-* events into a single cluster-changed API. - # the coordinator uses a common exit hook reconciler, that's why. 
- self.framework.observe(self._charm.on[endpoint].relation_created, self._on_cluster_changed) - self.framework.observe(self._charm.on[endpoint].relation_joined, self._on_cluster_changed) - self.framework.observe(self._charm.on[endpoint].relation_changed, self._on_cluster_changed) - self.framework.observe( - self._charm.on[endpoint].relation_departed, self._on_cluster_changed - ) - self.framework.observe(self._charm.on[endpoint].relation_broken, self._on_cluster_changed) - - def _on_cluster_changed(self, _: ops.EventBase) -> None: - self.on.changed.emit() - - def grant_privkey(self, label: str) -> str: - """Grant the secret containing the privkey to all relations, and return the secret ID.""" - secret = self.model.get_secret(label=label) - for relation in self._relations: - secret.grant(relation) - # can't return secret.id because secret was obtained by label, and so - # we don't have an ID unless we fetch it - return secret.get_info().id - - def publish_data( - self, - worker_config: str, - ca_cert: Optional[str] = None, - server_cert: Optional[str] = None, - s3_tls_ca_chain: Optional[str] = None, - privkey_secret_id: Optional[str] = None, - loki_endpoints: Optional[Dict[str, str]] = None, - charm_tracing_receivers: Optional[Dict[str, str]] = None, - workload_tracing_receivers: Optional[Dict[str, str]] = None, - remote_write_endpoints: Optional[List[RemoteWriteEndpoint]] = None, - ) -> None: - """Publish the config to all related worker clusters.""" - for relation in self._relations: - if relation and self._remote_data_ready(relation): - local_app_databag = ClusterProviderAppData( - worker_config=worker_config, - loki_endpoints=loki_endpoints, - ca_cert=ca_cert, - server_cert=server_cert, - privkey_secret_id=privkey_secret_id, - charm_tracing_receivers=charm_tracing_receivers, - workload_tracing_receivers=workload_tracing_receivers, - remote_write_endpoints=remote_write_endpoints, - s3_tls_ca_chain=s3_tls_ca_chain, - ) - local_app_databag.dump(relation.data[self.model.app]) - - @property - def has_workers(self) -> bool: - """Return True if the coordinator is connected to any worker.""" - # we use the presence of relations instead of addresses, because we want this - # check to fail early - return bool(self._relations) - - def _expand_roles(self, role_string: str) -> Set[str]: - """Expand the meta-roles from a comma-separated list of roles.""" - expanded_roles: Set[str] = set() - for role in role_string.split(","): - if role in self._meta_roles: - expanded_roles.update(self._meta_roles[role]) - else: - expanded_roles.update({role}) - return expanded_roles - - def gather_addresses_by_role(self) -> Dict[str, Set[str]]: - """Go through the worker's unit databags to collect all the addresses published by the units, by role.""" - data: Dict[str, Set[str]] = collections.defaultdict(set) - for relation in self._relations: - - if not relation.app: - log.debug(f"skipped {relation} as .app is None") - continue - - try: - worker_app_data = ClusterRequirerAppData.load(relation.data[relation.app]) - except DataValidationError as e: - log.info(f"invalid databag contents: {e}") - continue - - for worker_unit in relation.units: - try: - worker_data = ClusterRequirerUnitData.load(relation.data[worker_unit]) - unit_address = worker_data.address - for role in self._expand_roles(worker_app_data.role): - data[role].add(unit_address) - except DataValidationError as e: - log.info(f"invalid databag contents: {e}") - continue - return data - - def gather_addresses(self) -> Tuple[str, ...]: - """Go through the 
worker's unit databags to collect all the addresses published by the units.""" - data: Set[str] = set() - addresses_by_role = self.gather_addresses_by_role() - for _, address_set in addresses_by_role.items(): - data.update(address_set) - - return tuple(sorted(data)) - - def gather_roles(self) -> Dict[str, int]: - """Go through the worker's app databags and sum the available application roles.""" - data: Counter[str] = collections.Counter() - for relation in self._relations: - if relation.app: - remote_app_databag = relation.data[relation.app] - try: - worker_role: str = ClusterRequirerAppData.load(remote_app_databag).role - except DataValidationError as e: - log.error(f"invalid databag contents: {e}") - continue - - # the number of units with each role is the number of remote units - role_n = len(relation.units) # exclude this unit - for role in self._expand_roles(worker_role): - data[role] += role_n - - dct = dict(data) - return dct - - def gather_topology(self) -> List[Dict[str, str]]: - """Gather Juju topology by unit.""" - data: List[Dict[str, str]] = [] - for relation in self._relations: - if not relation.app: - continue - - for worker_unit in relation.units: - try: - worker_data = ClusterRequirerUnitData.load(relation.data[worker_unit]) - unit_address = worker_data.address - except DataValidationError as e: - log.info(f"invalid databag contents: {e}") - continue - worker_topology = { - "address": unit_address, - "application": worker_data.juju_topology.application, - "unit": worker_data.juju_topology.unit, - "charm_name": worker_data.juju_topology.charm_name, - } - data.append(worker_topology) - - return data - - def get_address_from_role(self, role: str) -> Optional[str]: - """Get datasource address.""" - addresses_by_role = self.gather_addresses_by_role() - if address_set := addresses_by_role.get(role, None): - return address_set.pop() - return None - - def _remote_data_ready(self, relation: ops.Relation) -> bool: - """Verify that each worker unit and the worker leader have published their data to the cluster relation. 
- - - unit address is published - - roles are published - """ - if not relation.app or not relation.units or not relation.data: - return False - - # check if unit data is published - for worker_unit in relation.units: - try: - ClusterRequirerUnitData.load(relation.data[worker_unit]) - except DataValidationError: - return False - - # check if app data is published - if self._charm.unit.is_leader(): - try: - ClusterRequirerAppData.load(relation.data[relation.app]) - except DataValidationError: - return False - - return True - - -class ClusterRequirer(Object): - """``-cluster`` requirer endpoint wrapper.""" - - on = ClusterRequirerEvents() # type: ignore - - def __init__( - self, - charm: ops.CharmBase, - key: Optional[str] = None, - endpoint: str = DEFAULT_ENDPOINT_NAME, - ): - super().__init__(charm, key or endpoint) - self._charm = charm - self.juju_topology = cosl.JujuTopology.from_charm(self._charm) - - relation = self.model.get_relation(endpoint) - self.relation: Optional[ops.Relation] = ( - relation if relation and relation.app and relation.data else None - ) - - self.framework.observe( - self._charm.on[endpoint].relation_changed, self._on_cluster_relation_changed # type: ignore - ) - self.framework.observe( - self._charm.on[endpoint].relation_created, self._on_cluster_relation_created # type: ignore - ) - self.framework.observe( - self._charm.on[endpoint].relation_broken, self._on_cluster_relation_broken # type: ignore - ) - - def _on_cluster_relation_broken(self, _event: ops.RelationBrokenEvent): - self.on.removed.emit() - - def _on_cluster_relation_created(self, event: ops.RelationCreatedEvent): - self.on.created.emit(relation=event.relation, app=event.app, unit=event.unit) - - def _on_cluster_relation_changed(self, _event: ops.RelationChangedEvent): - # to prevent the event from firing if the relation is in an unhealthy state (breaking...) - if self.relation: - new_config = self.get_worker_config() - if new_config: - self.on.config_received.emit(new_config) - - # if we have published our data, but we receive an empty/invalid config, - # then the remote end must have removed it. - elif self.is_published(): - self.on.removed.emit() - - def is_published(self): - """Verify that the local side has done all they need to do. 
- - - unit address is published - - roles are published - """ - relation = self.relation - if not relation: - return False - - unit_data = relation.data[self._charm.unit] - app_data = relation.data[self._charm.app] - - try: - ClusterRequirerUnitData.load(unit_data) - if self._charm.unit.is_leader(): - ClusterRequirerAppData.load(app_data) - except DataValidationError as e: - log.info(f"invalid databag contents: {e}") - return False - return True - - def publish_unit_address(self, url: str): - """Publish this unit's URL via the unit databag.""" - try: - urlparse(url) - except Exception as e: - raise ValueError(f"{url} is an invalid url") from e - - databag_model = ClusterRequirerUnitData( - juju_topology=dict(self.juju_topology.as_dict()), # type: ignore - address=url, - ) - relation = self.relation - if relation: - unit_databag = relation.data[self.model.unit] # type: ignore # all checks are done in __init__ - databag_model.dump(unit_databag) - - def publish_app_roles(self, roles: Iterable[str]): - """Publish this application's roles via the application databag.""" - if not self._charm.unit.is_leader(): - raise DatabagAccessPermissionError("only the leader unit can publish roles.") - - relation = self.relation - if relation: - databag_model = ClusterRequirerAppData(role=",".join(roles)) - databag_model.dump(relation.data[self.model.app]) - - def _get_data_from_coordinator(self) -> Optional[ClusterProviderAppData]: - """Fetch the contents of the doordinator databag.""" - data: Optional[ClusterProviderAppData] = None - relation = self.relation - # TODO: does this need a leader guard ??? maybe? - if relation: - try: - databag = relation.data[relation.app] # type: ignore # all checks are done in __init__ - coordinator_databag = ClusterProviderAppData.load(databag) - data = coordinator_databag - except DataValidationError as e: - log.info(f"invalid databag contents: {e}") - return None # explicit is better than implicit - - return data - - def get_worker_config(self) -> Dict[str, Any]: - """Fetch the worker config from the coordinator databag.""" - data = self._get_data_from_coordinator() - if data: - return yaml.safe_load(data.worker_config) - return {} - - def get_loki_endpoints(self) -> Dict[str, str]: - """Fetch the loki endpoints from the coordinator databag.""" - data = self._get_data_from_coordinator() - if data: - return data.loki_endpoints or {} - return {} - - def get_tls_data(self, allow_none: bool = False) -> Optional[TLSData]: - """Fetch certificates and the private key secrets id for the worker config.""" - data = self._get_data_from_coordinator() - if not data: - return None - - if ( - not data.ca_cert or not data.server_cert or not data.privkey_secret_id - ) and not allow_none: - return None - - return TLSData( - ca_cert=data.ca_cert, - server_cert=data.server_cert, - privkey_secret_id=data.privkey_secret_id, - s3_tls_ca_chain=data.s3_tls_ca_chain, - ) - - def get_charm_tracing_receivers(self) -> Dict[str, str]: - """Fetch the charm tracing receivers from the coordinator databag.""" - data = self._get_data_from_coordinator() - if data: - return data.charm_tracing_receivers or {} - return {} - - def get_workload_tracing_receivers(self) -> Dict[str, str]: - """Fetch the workload tracing receivers from the coordinator databag.""" - data = self._get_data_from_coordinator() - if data: - return data.workload_tracing_receivers or {} - return {} - - def get_remote_write_endpoints(self) -> List[RemoteWriteEndpoint]: - """Fetch the remote write endpoints from the coordinator databag.""" - 
data = self._get_data_from_coordinator() - if data: - return data.remote_write_endpoints or [] - return [] +logger = getLogger("interface-deprecated") +logger.warning( + "this module has been deprecated and may be removed in a future version: " + "please use cosl.interfaces.cluster and cosl.interfaces.utils instead." +) diff --git a/src/cosl/coordinated_workers/worker.py b/src/cosl/coordinated_workers/worker.py index 571889a..2c0cebd 100644 --- a/src/cosl/coordinated_workers/worker.py +++ b/src/cosl/coordinated_workers/worker.py @@ -22,8 +22,8 @@ from ops.pebble import Check, Layer, PathError, Plan, ProtocolError from cosl import JujuTopology -from cosl.coordinated_workers.interface import ClusterRequirer, TLSData from cosl.helpers import check_libs_installed +from cosl.interfaces.cluster import ClusterRequirer, TLSData check_libs_installed( "charms.loki_k8s.v1.loki_push_api", diff --git a/src/cosl/interfaces/__init__.py b/src/cosl/interfaces/__init__.py new file mode 100644 index 0000000..39356be --- /dev/null +++ b/src/cosl/interfaces/__init__.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical +# See LICENSE file for licensing details. + +"""Interfaces module.""" diff --git a/src/cosl/interfaces/cluster.py b/src/cosl/interfaces/cluster.py new file mode 100644 index 0000000..abb9bdb --- /dev/null +++ b/src/cosl/interfaces/cluster.py @@ -0,0 +1,533 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical +# See LICENSE file for licensing details. + +"""Shared utilities for the coordinator -> worker "cluster" interface. + +As this relation is cluster-internal and not intended for third-party charms to interact with +`-coordinator-k8s`, its only user will be the -worker-k8s charm. As such, +it does not live in a charm lib as most other relation endpoint wrappers do. +""" + +import collections +import json +import logging +from typing import ( + Any, + Counter, + Dict, + FrozenSet, + Iterable, + List, + Mapping, + NamedTuple, + Optional, + Set, + Tuple, +) +from urllib.parse import urlparse + +import ops +import pydantic +import yaml +from ops import EventSource, Object, ObjectEvents, RelationCreatedEvent +from typing_extensions import TypedDict + +import cosl +import cosl.interfaces.utils + +log = logging.getLogger("_cluster") + +DEFAULT_ENDPOINT_NAME = "-cluster" + + +# ============= +# | Interface | +# ============= + + +class RemoteWriteEndpoint(TypedDict): + """Type of the remote write endpoints to be passed to the worker through cluster relation data.""" + + url: str + + +class ConfigReceivedEvent(ops.EventBase): + """Event emitted when the "-cluster" provider has shared a new config.""" + + config: Dict[str, Any] + """The config.""" + + def __init__(self, handle: ops.framework.Handle, config: Dict[str, Any]): + super().__init__(handle) + self.config = config + + def snapshot(self) -> Dict[str, Any]: + """Used by the framework to serialize the event to disk. + + Not meant to be called by charm code. + """ + return {"config": json.dumps(self.config)} + + def restore(self, snapshot: Dict[str, Any]): + """Used by the framework to deserialize the event from disk. + + Not meant to be called by charm code. 
+ """ + self.relation = json.loads(snapshot["config"]) # noqa + + +class ClusterError(Exception): + """Base class for exceptions raised by this module.""" + + +class DatabagAccessPermissionError(ClusterError): + """Raised when a follower attempts to write leader settings.""" + + +class _Topology(pydantic.BaseModel): + """Juju topology information.""" + + application: str + unit: str + charm_name: str + + +class ClusterRequirerAppData(cosl.interfaces.utils.DatabagModel): + """App data that the worker sends to the coordinator.""" + + role: str + + +class ClusterRequirerUnitData(cosl.interfaces.utils.DatabagModel): + """Unit data the worker sends to the coordinator.""" + + juju_topology: _Topology + address: str + + +class ClusterProviderAppData(cosl.interfaces.utils.DatabagModel): + """App data that the coordinator sends to the worker.""" + + ### worker node configuration + worker_config: str + """The whole worker workload configuration, whatever it is. E.g. yaml-encoded things.""" + + ### self-monitoring stuff + loki_endpoints: Optional[Dict[str, str]] = None + """Endpoints to which the workload (and the worker charm) can push logs to.""" + charm_tracing_receivers: Optional[Dict[str, str]] = None + """Endpoints to which the the worker charm can push charm traces to.""" + workload_tracing_receivers: Optional[Dict[str, str]] = None + """Endpoints to which the the worker can push workload traces to.""" + remote_write_endpoints: Optional[List[RemoteWriteEndpoint]] = None + """Endpoints to which the workload (and the worker charm) can push metrics to.""" + + ### TLS stuff + ca_cert: Optional[str] = None + server_cert: Optional[str] = None + privkey_secret_id: Optional[str] = None + s3_tls_ca_chain: Optional[str] = None + + +class TLSData(NamedTuple): + """Section of the cluster data that concerns TLS information.""" + + ca_cert: Optional[str] + server_cert: Optional[str] + privkey_secret_id: Optional[str] + s3_tls_ca_chain: Optional[str] + + +class ClusterChangedEvent(ops.EventBase): + """Event emitted when any "-cluster" relation event fires.""" + + +class ClusterRemovedEvent(ops.EventBase): + """Event emitted when the relation with the "-cluster" provider has been severed. + + Or when the relation data has been wiped. + """ + + +class ClusterProviderEvents(ObjectEvents): + """Events emitted by the ClusterProvider "-cluster" endpoint wrapper.""" + + changed = EventSource(ClusterChangedEvent) + + +class ClusterRequirerEvents(ObjectEvents): + """Events emitted by the ClusterRequirer "-cluster" endpoint wrapper.""" + + config_received = EventSource(ConfigReceivedEvent) + created = EventSource(RelationCreatedEvent) + removed = EventSource(ClusterRemovedEvent) + + +class ClusterProvider(Object): + """``-cluster`` provider endpoint wrapper.""" + + on = ClusterProviderEvents() # type: ignore + + def __init__( + self, + charm: ops.CharmBase, + roles: FrozenSet[str], + meta_roles: Optional[Mapping[str, Iterable[str]]] = None, + key: Optional[str] = None, + endpoint: str = DEFAULT_ENDPOINT_NAME, + ): + super().__init__(charm, key or endpoint) + self._charm = charm + self._roles = roles + self._meta_roles = meta_roles or {} + self.juju_topology = cosl.JujuTopology.from_charm(self._charm) + + # filter out common unhappy relation states + self._relations: List[ops.Relation] = [ + rel for rel in self.model.relations[endpoint] if (rel.app and rel.data) + ] + + # we coalesce all -cluster-relation-* events into a single cluster-changed API. + # the coordinator uses a common exit hook reconciler, that's why. 
+ self.framework.observe(self._charm.on[endpoint].relation_created, self._on_cluster_changed) + self.framework.observe(self._charm.on[endpoint].relation_joined, self._on_cluster_changed) + self.framework.observe(self._charm.on[endpoint].relation_changed, self._on_cluster_changed) + self.framework.observe( + self._charm.on[endpoint].relation_departed, self._on_cluster_changed + ) + self.framework.observe(self._charm.on[endpoint].relation_broken, self._on_cluster_changed) + + def _on_cluster_changed(self, _: ops.EventBase) -> None: + self.on.changed.emit() + + def grant_privkey(self, label: str) -> str: + """Grant the secret containing the privkey to all relations, and return the secret ID.""" + secret = self.model.get_secret(label=label) + for relation in self._relations: + secret.grant(relation) + # can't return secret.id because secret was obtained by label, and so + # we don't have an ID unless we fetch it + return secret.get_info().id + + def publish_data( + self, + worker_config: str, + ca_cert: Optional[str] = None, + server_cert: Optional[str] = None, + s3_tls_ca_chain: Optional[str] = None, + privkey_secret_id: Optional[str] = None, + loki_endpoints: Optional[Dict[str, str]] = None, + charm_tracing_receivers: Optional[Dict[str, str]] = None, + workload_tracing_receivers: Optional[Dict[str, str]] = None, + remote_write_endpoints: Optional[List[RemoteWriteEndpoint]] = None, + ) -> None: + """Publish the config to all related worker clusters.""" + for relation in self._relations: + if relation and self._remote_data_ready(relation): + local_app_databag = ClusterProviderAppData( + worker_config=worker_config, + loki_endpoints=loki_endpoints, + ca_cert=ca_cert, + server_cert=server_cert, + privkey_secret_id=privkey_secret_id, + charm_tracing_receivers=charm_tracing_receivers, + workload_tracing_receivers=workload_tracing_receivers, + remote_write_endpoints=remote_write_endpoints, + s3_tls_ca_chain=s3_tls_ca_chain, + ) + local_app_databag.dump(relation.data[self.model.app]) + + @property + def has_workers(self) -> bool: + """Return True if the coordinator is connected to any worker.""" + # we use the presence of relations instead of addresses, because we want this + # check to fail early + return bool(self._relations) + + def _expand_roles(self, role_string: str) -> Set[str]: + """Expand the meta-roles from a comma-separated list of roles.""" + expanded_roles: Set[str] = set() + for role in role_string.split(","): + if role in self._meta_roles: + expanded_roles.update(self._meta_roles[role]) + else: + expanded_roles.update({role}) + return expanded_roles + + def gather_addresses_by_role(self) -> Dict[str, Set[str]]: + """Go through the worker's unit databags to collect all the addresses published by the units, by role.""" + data: Dict[str, Set[str]] = collections.defaultdict(set) + for relation in self._relations: + + if not relation.app: + log.debug(f"skipped {relation} as .app is None") + continue + + try: + worker_app_data = ClusterRequirerAppData.load(relation.data[relation.app]) + except cosl.interfaces.utils.DataValidationError as e: + log.info(f"invalid databag contents: {e}") + continue + + for worker_unit in relation.units: + try: + worker_data = ClusterRequirerUnitData.load(relation.data[worker_unit]) + unit_address = worker_data.address + for role in self._expand_roles(worker_app_data.role): + data[role].add(unit_address) + except cosl.interfaces.utils.DataValidationError as e: + log.info(f"invalid databag contents: {e}") + continue + return data + + def gather_addresses(self) 
-> Tuple[str, ...]: + """Go through the worker's unit databags to collect all the addresses published by the units.""" + data: Set[str] = set() + addresses_by_role = self.gather_addresses_by_role() + for _, address_set in addresses_by_role.items(): + data.update(address_set) + + return tuple(sorted(data)) + + def gather_roles(self) -> Dict[str, int]: + """Go through the worker's app databags and sum the available application roles.""" + data: Counter[str] = collections.Counter() + for relation in self._relations: + if relation.app: + remote_app_databag = relation.data[relation.app] + try: + worker_role: str = ClusterRequirerAppData.load(remote_app_databag).role + except cosl.interfaces.utils.DataValidationError as e: + log.error(f"invalid databag contents: {e}") + continue + + # the number of units with each role is the number of remote units + role_n = len(relation.units) # exclude this unit + for role in self._expand_roles(worker_role): + data[role] += role_n + + dct = dict(data) + return dct + + def gather_topology(self) -> List[Dict[str, str]]: + """Gather Juju topology by unit.""" + data: List[Dict[str, str]] = [] + for relation in self._relations: + if not relation.app: + continue + + for worker_unit in relation.units: + try: + worker_data = ClusterRequirerUnitData.load(relation.data[worker_unit]) + unit_address = worker_data.address + except cosl.interfaces.utils.DataValidationError as e: + log.info(f"invalid databag contents: {e}") + continue + worker_topology = { + "address": unit_address, + "application": worker_data.juju_topology.application, + "unit": worker_data.juju_topology.unit, + "charm_name": worker_data.juju_topology.charm_name, + } + data.append(worker_topology) + + return data + + def get_address_from_role(self, role: str) -> Optional[str]: + """Get datasource address.""" + addresses_by_role = self.gather_addresses_by_role() + if address_set := addresses_by_role.get(role, None): + return address_set.pop() + return None + + def _remote_data_ready(self, relation: ops.Relation) -> bool: + """Verify that each worker unit and the worker leader have published their data to the cluster relation. 
+ + - unit address is published + - roles are published + """ + if not relation.app or not relation.units or not relation.data: + return False + + # check if unit data is published + for worker_unit in relation.units: + try: + ClusterRequirerUnitData.load(relation.data[worker_unit]) + except cosl.interfaces.utils.DataValidationError: + return False + + # check if app data is published + if self._charm.unit.is_leader(): + try: + ClusterRequirerAppData.load(relation.data[relation.app]) + except cosl.interfaces.utils.DataValidationError: + return False + + return True + + +class ClusterRequirer(Object): + """``-cluster`` requirer endpoint wrapper.""" + + on = ClusterRequirerEvents() # type: ignore + + def __init__( + self, + charm: ops.CharmBase, + key: Optional[str] = None, + endpoint: str = DEFAULT_ENDPOINT_NAME, + ): + super().__init__(charm, key or endpoint) + self._charm = charm + self.juju_topology = cosl.JujuTopology.from_charm(self._charm) + + relation = self.model.get_relation(endpoint) + self.relation: Optional[ops.Relation] = ( + relation if relation and relation.app and relation.data else None + ) + + self.framework.observe( + self._charm.on[endpoint].relation_changed, self._on_cluster_relation_changed # type: ignore + ) + self.framework.observe( + self._charm.on[endpoint].relation_created, self._on_cluster_relation_created # type: ignore + ) + self.framework.observe( + self._charm.on[endpoint].relation_broken, self._on_cluster_relation_broken # type: ignore + ) + + def _on_cluster_relation_broken(self, _event: ops.RelationBrokenEvent): + self.on.removed.emit() + + def _on_cluster_relation_created(self, event: ops.RelationCreatedEvent): + self.on.created.emit(relation=event.relation, app=event.app, unit=event.unit) + + def _on_cluster_relation_changed(self, _event: ops.RelationChangedEvent): + # to prevent the event from firing if the relation is in an unhealthy state (breaking...) + if self.relation: + new_config = self.get_worker_config() + if new_config: + self.on.config_received.emit(new_config) + + # if we have published our data, but we receive an empty/invalid config, + # then the remote end must have removed it. + elif self.is_published(): + self.on.removed.emit() + + def is_published(self): + """Verify that the local side has done all they need to do. 
+ + - unit address is published + - roles are published + """ + relation = self.relation + if not relation: + return False + + unit_data = relation.data[self._charm.unit] + app_data = relation.data[self._charm.app] + + try: + ClusterRequirerUnitData.load(unit_data) + if self._charm.unit.is_leader(): + ClusterRequirerAppData.load(app_data) + except cosl.interfaces.utils.DataValidationError as e: + log.info(f"invalid databag contents: {e}") + return False + return True + + def publish_unit_address(self, url: str): + """Publish this unit's URL via the unit databag.""" + try: + urlparse(url) + except Exception as e: + raise ValueError(f"{url} is an invalid url") from e + + databag_model = ClusterRequirerUnitData( + juju_topology=dict(self.juju_topology.as_dict()), # type: ignore + address=url, + ) + relation = self.relation + if relation: + unit_databag = relation.data[self.model.unit] # type: ignore # all checks are done in __init__ + databag_model.dump(unit_databag) + + def publish_app_roles(self, roles: Iterable[str]): + """Publish this application's roles via the application databag.""" + if not self._charm.unit.is_leader(): + raise DatabagAccessPermissionError("only the leader unit can publish roles.") + + relation = self.relation + if relation: + databag_model = ClusterRequirerAppData(role=",".join(roles)) + databag_model.dump(relation.data[self.model.app]) + + def _get_data_from_coordinator(self) -> Optional[ClusterProviderAppData]: + """Fetch the contents of the doordinator databag.""" + data: Optional[ClusterProviderAppData] = None + relation = self.relation + # TODO: does this need a leader guard ??? maybe? + if relation: + try: + databag = relation.data[relation.app] # type: ignore # all checks are done in __init__ + coordinator_databag = ClusterProviderAppData.load(databag) + data = coordinator_databag + except cosl.interfaces.utils.DataValidationError as e: + log.info(f"invalid databag contents: {e}") + return None # explicit is better than implicit + + return data + + def get_worker_config(self) -> Dict[str, Any]: + """Fetch the worker config from the coordinator databag.""" + data = self._get_data_from_coordinator() + if data: + return yaml.safe_load(data.worker_config) + return {} + + def get_loki_endpoints(self) -> Dict[str, str]: + """Fetch the loki endpoints from the coordinator databag.""" + data = self._get_data_from_coordinator() + if data: + return data.loki_endpoints or {} + return {} + + def get_tls_data(self, allow_none: bool = False) -> Optional[TLSData]: + """Fetch certificates and the private key secrets id for the worker config.""" + data = self._get_data_from_coordinator() + if not data: + return None + + if ( + not data.ca_cert or not data.server_cert or not data.privkey_secret_id + ) and not allow_none: + return None + + return TLSData( + ca_cert=data.ca_cert, + server_cert=data.server_cert, + privkey_secret_id=data.privkey_secret_id, + s3_tls_ca_chain=data.s3_tls_ca_chain, + ) + + def get_charm_tracing_receivers(self) -> Dict[str, str]: + """Fetch the charm tracing receivers from the coordinator databag.""" + data = self._get_data_from_coordinator() + if data: + return data.charm_tracing_receivers or {} + return {} + + def get_workload_tracing_receivers(self) -> Dict[str, str]: + """Fetch the workload tracing receivers from the coordinator databag.""" + data = self._get_data_from_coordinator() + if data: + return data.workload_tracing_receivers or {} + return {} + + def get_remote_write_endpoints(self) -> List[RemoteWriteEndpoint]: + """Fetch the remote write 
endpoints from the coordinator databag.""" + data = self._get_data_from_coordinator() + if data: + return data.remote_write_endpoints or [] + return [] diff --git a/src/cosl/interfaces/utils.py b/src/cosl/interfaces/utils.py new file mode 100644 index 0000000..16b1557 --- /dev/null +++ b/src/cosl/interfaces/utils.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical +# See LICENSE file for licensing details. + +"""Shared utilities for the cosl interfaces.""" + +import json +import logging +from typing import ( + MutableMapping, + Optional, +) + +import pydantic +from pydantic import ConfigDict + +log = logging.getLogger("utils") + +BUILTIN_JUJU_KEYS = {"ingress-address", "private-address", "egress-subnets"} + +# ================= +# | Databag Model | +# ================= + +# Note: MutableMapping is imported from the typing module and not collections.abc +# because subscripting collections.abc.MutableMapping was added in python 3.9, but +# most of our charms are based on 20.04, which has python 3.8. + +_RawDatabag = MutableMapping[str, str] + + +class DataValidationError(Exception): + """Raised when relation databag validation fails.""" + + +class DatabagModel(pydantic.BaseModel): + """Base databag model.""" + + model_config = ConfigDict( + # tolerate additional keys in databag + extra="ignore", + # Allow instantiating this class by field name (instead of forcing alias). + populate_by_name=True, + ) # type: ignore + """Pydantic config.""" + + @classmethod + def load(cls, databag: _RawDatabag): + """Load this model from a Juju databag.""" + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {(f.alias or n) for n, f in cls.__fields__.items()} # type: ignore + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + log.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.model_validate_json(json.dumps(data)) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + if databag: + log.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _RawDatabag: + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. 
+ """ + _databag: _RawDatabag = {} if databag is None else databag + + if clear: + _databag.clear() + + dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True) # type: ignore + _databag.update({k: json.dumps(v) for k, v in dct.items()}) + return _databag diff --git a/tests/test_coordinated_workers/test_coordinator.py b/tests/test_coordinated_workers/test_coordinator.py index ca4c466..e79103f 100644 --- a/tests/test_coordinated_workers/test_coordinator.py +++ b/tests/test_coordinated_workers/test_coordinator.py @@ -10,7 +10,7 @@ Coordinator, S3NotFoundError, ) -from src.cosl.coordinated_workers.interface import ClusterRequirerAppData +from src.cosl.interfaces.cluster import ClusterRequirerAppData @pytest.fixture diff --git a/tests/test_coordinated_workers/test_coordinator_status.py b/tests/test_coordinated_workers/test_coordinator_status.py index 63232c2..9424ad5 100644 --- a/tests/test_coordinated_workers/test_coordinator_status.py +++ b/tests/test_coordinated_workers/test_coordinator_status.py @@ -9,7 +9,7 @@ from ops import testing from cosl.coordinated_workers.coordinator import ClusterRolesConfig, Coordinator -from cosl.coordinated_workers.interface import ClusterProviderAppData, ClusterRequirerAppData +from cosl.interfaces.cluster import ClusterProviderAppData, ClusterRequirerAppData from tests.test_coordinated_workers.test_worker_status import k8s_patch my_roles = ClusterRolesConfig( diff --git a/tests/test_coordinated_workers/test_interface.py b/tests/test_coordinated_workers/test_interface.py index b9f19cd..08e7b2f 100644 --- a/tests/test_coordinated_workers/test_interface.py +++ b/tests/test_coordinated_workers/test_interface.py @@ -1,4 +1,4 @@ -from cosl.coordinated_workers.interface import DatabagModel +from cosl.interfaces.utils import DatabagModel def test_databag_dump_update(): diff --git a/tests/test_coordinated_workers/test_worker_status.py b/tests/test_coordinated_workers/test_worker_status.py index 3ffe71f..77912da 100644 --- a/tests/test_coordinated_workers/test_worker_status.py +++ b/tests/test_coordinated_workers/test_worker_status.py @@ -8,11 +8,11 @@ import tenacity from ops import testing -from cosl.coordinated_workers.interface import ClusterProviderAppData from cosl.coordinated_workers.worker import ( NoReadinessCheckEndpointConfiguredError, Worker, ) +from cosl.interfaces.cluster import ClusterProviderAppData @pytest.fixture(params=[True, False]) diff --git a/tests/test_databag_model.py b/tests/test_databag_model.py index dcf17c1..cb967b7 100644 --- a/tests/test_databag_model.py +++ b/tests/test_databag_model.py @@ -2,7 +2,7 @@ import pytest -from cosl.coordinated_workers.interface import DatabagModel, DataValidationError +from cosl.interfaces.utils import DatabagModel, DataValidationError class MyDataModel(DatabagModel): From 220ecf569dc475ca886f3b8ba35bda52522b60eb Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Tue, 3 Dec 2024 15:14:30 +0100 Subject: [PATCH 3/9] ds exchange interface implementation --- src/cosl/interfaces/datasource_exchange.py | 116 +++++++++++++++++++++ src/cosl/interfaces/utils.py | 42 ++++++++ tests/test_datasource_exchange.py | 114 ++++++++++++++++++++ 3 files changed, 272 insertions(+) create mode 100644 src/cosl/interfaces/datasource_exchange.py create mode 100644 tests/test_datasource_exchange.py diff --git a/src/cosl/interfaces/datasource_exchange.py b/src/cosl/interfaces/datasource_exchange.py new file mode 100644 index 0000000..9d545ad --- /dev/null +++ b/src/cosl/interfaces/datasource_exchange.py @@ -0,0 +1,116 
@@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical +# See LICENSE file for licensing details. + +"""Shared utilities for the inter-coordinator "grafana_datasource_exchange" interface. + +See https://github.com/canonical/charm-relation-interfaces/pull/207 for the interface specification. +# TODO update when pr merged +""" + +import collections +import json +import logging +from itertools import chain +from typing import ( + Iterable, + List, + Tuple, +) + +import ops +from interfaces.grafana_datasource_exchange.v0.schema import GrafanaSourceAppData, GrafanaDatasource +from ops import CharmBase +from typing_extensions import TypedDict + +import cosl.interfaces.utils +from cosl.interfaces.utils import DataValidationError + +log = logging.getLogger("_cluster") + +DEFAULT_PROVIDE_ENDPOINT_NAME = "provide-ds-exchange" +DEFAULT_REQUIRE_ENDPOINT_NAME = "require-ds-exchange" +DS_EXCHANGE_INTERFACE_NAME = "grafana_datasource_exchange" + + +class DSExchangeAppData(cosl.interfaces.utils._DatabagModelV2, GrafanaSourceAppData): + """App databag schema for both sides of this interface.""" + + +class DatasourceDict(TypedDict): + """Raw datasource information.""" + type: str + uid: str + + +class EndpointValidationError(ValueError): + """Raised if an endpoint name is invalid.""" + + +def _validate_endpoints(charm: CharmBase, provider_endpoint: str, requirer_endpoint: str): + meta = charm.meta + for endpoint, source in ((provider_endpoint, meta.provides), + (requirer_endpoint, meta.requires)): + if endpoint not in source: + raise EndpointValidationError(f"endpoint {endpoint!r} not declared in charm metadata") + interface_name = source[endpoint].interface_name + if interface_name != DS_EXCHANGE_INTERFACE_NAME: + raise EndpointValidationError( + f"endpoint {endpoint} has unexpected interface {interface_name!r} " + f"(should be {DS_EXCHANGE_INTERFACE_NAME})." + ) + + +class DatasourceExchange: + """``grafana_datasource_exchange`` interface endpoint wrapper (provider AND requirer).""" + + def __init__( + self, + charm: ops.CharmBase, + *, + provider_endpoint: str = DEFAULT_PROVIDE_ENDPOINT_NAME, + requirer_endpoint: str = DEFAULT_REQUIRE_ENDPOINT_NAME, + ): + self._charm = charm + _validate_endpoints(charm, provider_endpoint, requirer_endpoint) + + # gather all relations, provider or requirer + all_relations = chain( + charm.model.relations[provider_endpoint], + charm.model.relations[requirer_endpoint] + ) + + # filter out some common unhappy relation states + self._relations: List[ops.Relation] = [ + rel for rel in all_relations if (rel.app and rel.data) + ] + + def submit(self, raw_datasources: Iterable[DatasourceDict]): + """Submit these datasources to all remotes. + + This operation is leader-only. + """ + # sort by UID to prevent endless relation-changed cascades if this keeps flapping + app_data = DSExchangeAppData(datasources=json.dumps(sorted(raw_datasources, key=lambda raw_ds: raw_ds['uid']))) + + for relation in self._relations: + app_data.dump(relation.data[self._charm.app]) + + @property + def received_datasources(self) -> Tuple[GrafanaDatasource, ...]: + """Collect all datasources that the remotes have shared. + + This operation is leader-only. 
+ """ + + datasources: List[GrafanaDatasource] = [] + + for relation in self._relations: + try: + datasource = DSExchangeAppData.load(relation.data[relation.app]) + except DataValidationError: + # load() already logs something in this case + continue + + datasources.extend(datasource.datasources) + return tuple(sorted(datasources, key=lambda ds: ds.uid)) diff --git a/src/cosl/interfaces/utils.py b/src/cosl/interfaces/utils.py index 16b1557..f15a1c9 100644 --- a/src/cosl/interfaces/utils.py +++ b/src/cosl/interfaces/utils.py @@ -81,3 +81,45 @@ def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _Ra dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True) # type: ignore _databag.update({k: json.dumps(v) for k, v in dct.items()}) return _databag + + +# FIXME: in pydantic v2, the json stuff we've been doing is no longer necessary. +# It becomes much easier to work with Json fields and the databagmodel class becomes much simpler. +# We should rewrite the cluster implementation to use this class, +# and replace the original DatabagModel with it +class _DatabagModelV2(pydantic.BaseModel): + """Base databag model.""" + + model_config = ConfigDict( + # tolerate additional keys in databag + extra="ignore", + # Allow instantiating this class by field name (instead of forcing alias). + populate_by_name=True, + ) # type: ignore + """Pydantic config.""" + + @classmethod + def load(cls, databag: _RawDatabag): + """Load this model from a Juju databag.""" + try: + return cls.model_validate_json(json.dumps(dict(databag))) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + if databag: + log.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _RawDatabag: + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. 
+ """ + _databag: _RawDatabag = {} if databag is None else databag + + if clear: + _databag.clear() + + dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True, round_trip=True) # type: ignore + _databag.update(dct) + return _databag diff --git a/tests/test_datasource_exchange.py b/tests/test_datasource_exchange.py new file mode 100644 index 0000000..aa6af5e --- /dev/null +++ b/tests/test_datasource_exchange.py @@ -0,0 +1,114 @@ +import json + +import pytest +from interfaces.grafana_datasource_exchange.v0.schema import GrafanaDatasource +from ops import CharmBase, Framework +from scenario import Context, State, Relation +from scenario.errors import UncaughtCharmError + +from cosl.interfaces.datasource_exchange import DatasourceExchange, DSExchangeAppData + + +@pytest.mark.parametrize("meta, invalid_reason", ( + ({ + "requires": {"boo": {"interface": "gibberish"}}, + "provides": {"far": {"interface": "grafana_datasource_exchange"}}, + }, "unexpected interface 'gibberish'"), + ({ + "requires": {"boo": {"interface": "grafana_datasource_exchange"}}, + "provides": {"goo": {"interface": "grafana_datasource_exchange"}}, + }, "endpoint 'far' not declared"), +)) +def test_endpoint_validation(meta, invalid_reason): + class BadCharm(CharmBase): + def __init__(self, framework: Framework): + super().__init__(framework) + self.ds_exchange = DatasourceExchange( + self, + provider_endpoint='far', + requirer_endpoint='boo' + ) + + with pytest.raises(UncaughtCharmError, match=invalid_reason) as e: + ctx = Context(BadCharm, meta={"name": "bob", **meta}) + ctx.run(ctx.on.update_status(), State()) + + +def test_ds_submit(): + # GIVEN a charm with a single datasource_exchange relation + class MyCharm(CharmBase): + META = { + 'name': 'robbie', + "provides": {"foo": {"interface": "grafana_datasource_exchange"}}, + "requires": {"bar": {"interface": "grafana_datasource_exchange"}}, + } + + def __init__(self, framework: Framework): + super().__init__(framework) + self.ds_exchange = DatasourceExchange( + self, + provider_endpoint='foo', + requirer_endpoint='bar' + ) + self.ds_exchange.submit([{'type': 'tempo', 'uid': '123'}]) + + ctx = Context(MyCharm, meta=MyCharm.META) + + dse_in = Relation("foo") + state_in = State(relations={dse_in}, leader=True) + + # WHEN we receive any event + state_out = ctx.run(ctx.on.update_status(), state_in) + + # THEN we publish in our app databags any datasources we're aware of + dse_out = state_out.get_relation(dse_in.id) + assert dse_out.local_app_data + data = DSExchangeAppData.load(dse_out.local_app_data) + assert data.datasources[0].type == 'tempo' + assert data.datasources[0].uid == '123' + + +def test_ds_receive(): + # GIVEN a charm with a single datasource_exchange relation + class MyCharm(CharmBase): + META = { + 'name': 'robbie', + "provides": {"foo": {"interface": "grafana_datasource_exchange"}}, + "requires": {"bar": {"interface": "grafana_datasource_exchange"}}, + } + + def __init__(self, framework: Framework): + super().__init__(framework) + self.ds_exchange = DatasourceExchange( + self, + provider_endpoint='foo', + requirer_endpoint='bar' + ) + + ctx = Context(MyCharm, meta=MyCharm.META) + + ds_requirer_in = [{"type": "c", "uid": "3"}, {"type": "a", "uid": "1"}, {"type": "b", "uid": "2"}] + ds_provider_in = [{"type": "d", "uid": "4"}] + + dse_requirer_in = Relation( + "foo", + remote_app_data=DSExchangeAppData( + datasources=json.dumps(sorted(ds_provider_in, key=lambda raw_ds: raw_ds['uid'])) + ).dump() + ) + dse_provider_in = Relation( + "bar", + 
remote_app_data=DSExchangeAppData( + datasources=json.dumps(sorted(ds_requirer_in, key=lambda raw_ds: raw_ds['uid'])) + ).dump() + ) + state_in = State(relations={dse_requirer_in, dse_provider_in}, leader=True) + + # WHEN we receive any event + with ctx(ctx.on.update_status(), state_in) as mgr: + # THEN we can access all datasources we're given + dss = mgr.charm.ds_exchange.received_datasources + assert [ds.type for ds in dss] == list("abcd") + assert [ds.uid for ds in dss] == list("1234") + assert isinstance(dss[0], GrafanaDatasource) + From d6777d33df9d8664ec1be9d3a6797557c30d30a4 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Tue, 3 Dec 2024 15:33:18 +0100 Subject: [PATCH 4/9] typing --- pyproject.toml | 1 + src/cosl/interfaces/datasource_exchange.py | 46 +++++++++----- src/cosl/interfaces/utils.py | 2 +- tests/test_datasource_exchange.py | 70 ++++++++++++---------- 4 files changed, 72 insertions(+), 47 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 41a48f2..5dc3ea6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,6 +90,7 @@ classmethod-decorators = ["classmethod", "pydantic.validator"] [tool.pyright] include = ["src"] + extraPaths = ["lib", "src/cosl"] pythonVersion = "3.8" pythonPlatform = "All" diff --git a/src/cosl/interfaces/datasource_exchange.py b/src/cosl/interfaces/datasource_exchange.py index 9d545ad..817d442 100644 --- a/src/cosl/interfaces/datasource_exchange.py +++ b/src/cosl/interfaces/datasource_exchange.py @@ -8,7 +8,18 @@ # TODO update when pr merged """ -import collections + +# FIXME: the interfaces import (because it's a git dep perhaps?) +# can't be type-checked, which breaks everything +# pyright: reportMissingImports=false +# pyright: reportUntypedBaseClass=false +# pyright: reportUnknownLambdaType=false +# pyright: reportUnknownMemberType=false +# pyright: reportUnknownVariableType=false +# pyright: reportUnknownArgumentType=false +# pyright: reportUnknownParameterType=false + + import json import logging from itertools import chain @@ -19,7 +30,10 @@ ) import ops -from interfaces.grafana_datasource_exchange.v0.schema import GrafanaSourceAppData, GrafanaDatasource +from interfaces.grafana_datasource_exchange.v0.schema import ( + GrafanaDatasource, + GrafanaSourceAppData, +) from ops import CharmBase from typing_extensions import TypedDict @@ -33,12 +47,13 @@ DS_EXCHANGE_INTERFACE_NAME = "grafana_datasource_exchange" -class DSExchangeAppData(cosl.interfaces.utils._DatabagModelV2, GrafanaSourceAppData): +class DSExchangeAppData(cosl.interfaces.utils.DatabagModelV2, GrafanaSourceAppData): """App databag schema for both sides of this interface.""" class DatasourceDict(TypedDict): """Raw datasource information.""" + type: str uid: str @@ -49,8 +64,10 @@ class EndpointValidationError(ValueError): def _validate_endpoints(charm: CharmBase, provider_endpoint: str, requirer_endpoint: str): meta = charm.meta - for endpoint, source in ((provider_endpoint, meta.provides), - (requirer_endpoint, meta.requires)): + for endpoint, source in ( + (provider_endpoint, meta.provides), + (requirer_endpoint, meta.requires), + ): if endpoint not in source: raise EndpointValidationError(f"endpoint {endpoint!r} not declared in charm metadata") interface_name = source[endpoint].interface_name @@ -65,19 +82,18 @@ class DatasourceExchange: """``grafana_datasource_exchange`` interface endpoint wrapper (provider AND requirer).""" def __init__( - self, - charm: ops.CharmBase, - *, - provider_endpoint: str = DEFAULT_PROVIDE_ENDPOINT_NAME, - requirer_endpoint: str = 
DEFAULT_REQUIRE_ENDPOINT_NAME, + self, + charm: ops.CharmBase, + *, + provider_endpoint: str = DEFAULT_PROVIDE_ENDPOINT_NAME, + requirer_endpoint: str = DEFAULT_REQUIRE_ENDPOINT_NAME, ): self._charm = charm _validate_endpoints(charm, provider_endpoint, requirer_endpoint) # gather all relations, provider or requirer all_relations = chain( - charm.model.relations[provider_endpoint], - charm.model.relations[requirer_endpoint] + charm.model.relations[provider_endpoint], charm.model.relations[requirer_endpoint] ) # filter out some common unhappy relation states @@ -91,7 +107,10 @@ def submit(self, raw_datasources: Iterable[DatasourceDict]): This operation is leader-only. """ # sort by UID to prevent endless relation-changed cascades if this keeps flapping - app_data = DSExchangeAppData(datasources=json.dumps(sorted(raw_datasources, key=lambda raw_ds: raw_ds['uid']))) + encoded_datasources = json.dumps(sorted(raw_datasources, key=lambda raw_ds: raw_ds["uid"])) + app_data = DSExchangeAppData( + datasources=encoded_datasources # type: ignore[reportCallIssue] + ) for relation in self._relations: app_data.dump(relation.data[self._charm.app]) @@ -102,7 +121,6 @@ def received_datasources(self) -> Tuple[GrafanaDatasource, ...]: This operation is leader-only. """ - datasources: List[GrafanaDatasource] = [] for relation in self._relations: diff --git a/src/cosl/interfaces/utils.py b/src/cosl/interfaces/utils.py index f15a1c9..40edf51 100644 --- a/src/cosl/interfaces/utils.py +++ b/src/cosl/interfaces/utils.py @@ -87,7 +87,7 @@ def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _Ra # It becomes much easier to work with Json fields and the databagmodel class becomes much simpler. # We should rewrite the cluster implementation to use this class, # and replace the original DatabagModel with it -class _DatabagModelV2(pydantic.BaseModel): +class DatabagModelV2(pydantic.BaseModel): """Base databag model.""" model_config = ConfigDict( diff --git a/tests/test_datasource_exchange.py b/tests/test_datasource_exchange.py index aa6af5e..b64e23f 100644 --- a/tests/test_datasource_exchange.py +++ b/tests/test_datasource_exchange.py @@ -3,33 +3,40 @@ import pytest from interfaces.grafana_datasource_exchange.v0.schema import GrafanaDatasource from ops import CharmBase, Framework -from scenario import Context, State, Relation +from scenario import Context, Relation, State from scenario.errors import UncaughtCharmError from cosl.interfaces.datasource_exchange import DatasourceExchange, DSExchangeAppData -@pytest.mark.parametrize("meta, invalid_reason", ( - ({ - "requires": {"boo": {"interface": "gibberish"}}, - "provides": {"far": {"interface": "grafana_datasource_exchange"}}, - }, "unexpected interface 'gibberish'"), - ({ - "requires": {"boo": {"interface": "grafana_datasource_exchange"}}, - "provides": {"goo": {"interface": "grafana_datasource_exchange"}}, - }, "endpoint 'far' not declared"), -)) +@pytest.mark.parametrize( + "meta, invalid_reason", + ( + ( + { + "requires": {"boo": {"interface": "gibberish"}}, + "provides": {"far": {"interface": "grafana_datasource_exchange"}}, + }, + "unexpected interface 'gibberish'", + ), + ( + { + "requires": {"boo": {"interface": "grafana_datasource_exchange"}}, + "provides": {"goo": {"interface": "grafana_datasource_exchange"}}, + }, + "endpoint 'far' not declared", + ), + ), +) def test_endpoint_validation(meta, invalid_reason): class BadCharm(CharmBase): def __init__(self, framework: Framework): super().__init__(framework) self.ds_exchange = 
DatasourceExchange( - self, - provider_endpoint='far', - requirer_endpoint='boo' + self, provider_endpoint="far", requirer_endpoint="boo" ) - with pytest.raises(UncaughtCharmError, match=invalid_reason) as e: + with pytest.raises(UncaughtCharmError, match=invalid_reason): ctx = Context(BadCharm, meta={"name": "bob", **meta}) ctx.run(ctx.on.update_status(), State()) @@ -38,7 +45,7 @@ def test_ds_submit(): # GIVEN a charm with a single datasource_exchange relation class MyCharm(CharmBase): META = { - 'name': 'robbie', + "name": "robbie", "provides": {"foo": {"interface": "grafana_datasource_exchange"}}, "requires": {"bar": {"interface": "grafana_datasource_exchange"}}, } @@ -46,11 +53,9 @@ class MyCharm(CharmBase): def __init__(self, framework: Framework): super().__init__(framework) self.ds_exchange = DatasourceExchange( - self, - provider_endpoint='foo', - requirer_endpoint='bar' + self, provider_endpoint="foo", requirer_endpoint="bar" ) - self.ds_exchange.submit([{'type': 'tempo', 'uid': '123'}]) + self.ds_exchange.submit([{"type": "tempo", "uid": "123"}]) ctx = Context(MyCharm, meta=MyCharm.META) @@ -64,15 +69,15 @@ def __init__(self, framework: Framework): dse_out = state_out.get_relation(dse_in.id) assert dse_out.local_app_data data = DSExchangeAppData.load(dse_out.local_app_data) - assert data.datasources[0].type == 'tempo' - assert data.datasources[0].uid == '123' + assert data.datasources[0].type == "tempo" + assert data.datasources[0].uid == "123" def test_ds_receive(): # GIVEN a charm with a single datasource_exchange relation class MyCharm(CharmBase): META = { - 'name': 'robbie', + "name": "robbie", "provides": {"foo": {"interface": "grafana_datasource_exchange"}}, "requires": {"bar": {"interface": "grafana_datasource_exchange"}}, } @@ -80,27 +85,29 @@ class MyCharm(CharmBase): def __init__(self, framework: Framework): super().__init__(framework) self.ds_exchange = DatasourceExchange( - self, - provider_endpoint='foo', - requirer_endpoint='bar' + self, provider_endpoint="foo", requirer_endpoint="bar" ) ctx = Context(MyCharm, meta=MyCharm.META) - ds_requirer_in = [{"type": "c", "uid": "3"}, {"type": "a", "uid": "1"}, {"type": "b", "uid": "2"}] + ds_requirer_in = [ + {"type": "c", "uid": "3"}, + {"type": "a", "uid": "1"}, + {"type": "b", "uid": "2"}, + ] ds_provider_in = [{"type": "d", "uid": "4"}] dse_requirer_in = Relation( "foo", remote_app_data=DSExchangeAppData( - datasources=json.dumps(sorted(ds_provider_in, key=lambda raw_ds: raw_ds['uid'])) - ).dump() + datasources=json.dumps(sorted(ds_provider_in, key=lambda raw_ds: raw_ds["uid"])) + ).dump(), ) dse_provider_in = Relation( "bar", remote_app_data=DSExchangeAppData( - datasources=json.dumps(sorted(ds_requirer_in, key=lambda raw_ds: raw_ds['uid'])) - ).dump() + datasources=json.dumps(sorted(ds_requirer_in, key=lambda raw_ds: raw_ds["uid"])) + ).dump(), ) state_in = State(relations={dse_requirer_in, dse_provider_in}, leader=True) @@ -111,4 +118,3 @@ def __init__(self, framework: Framework): assert [ds.type for ds in dss] == list("abcd") assert [ds.uid for ds in dss] == list("1234") assert isinstance(dss[0], GrafanaDatasource) - From 374693255a0b6fd74c98ffb64bdd45ce9ac30edf Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Wed, 4 Dec 2024 12:26:16 +0100 Subject: [PATCH 5/9] add datasource exchange to coordinator --- src/cosl/coordinated_workers/coordinator.py | 64 ++++++++++++--------- src/cosl/interfaces/datasource_exchange.py | 9 ++- 2 files changed, 42 insertions(+), 31 deletions(-) diff --git 
a/src/cosl/coordinated_workers/coordinator.py b/src/cosl/coordinated_workers/coordinator.py index 9c5afae..a70bb0d 100644 --- a/src/cosl/coordinated_workers/coordinator.py +++ b/src/cosl/coordinated_workers/coordinator.py @@ -39,6 +39,7 @@ ) from cosl.helpers import check_libs_installed from cosl.interfaces.cluster import ClusterProvider, RemoteWriteEndpoint +from cosl.interfaces.datasource_exchange import DatasourceExchange check_libs_installed( "charms.data_platform_libs.v0.s3", @@ -124,12 +125,12 @@ def __post_init__(self): is_minimal_valid = set(self.minimal_deployment).issubset(self.roles) is_recommended_valid = set(self.recommended_deployment).issubset(self.roles) if not all( - [ - are_meta_keys_valid, - are_meta_values_valid, - is_minimal_valid, - is_recommended_valid, - ] + [ + are_meta_keys_valid, + are_meta_values_valid, + is_minimal_valid, + is_recommended_valid, + ] ): raise ClusterRolesConfigError( "Invalid ClusterRolesConfig: The configuration is not coherent." @@ -141,8 +142,8 @@ def is_coherent_with(self, cluster_roles: Iterable[str]) -> bool: def _validate_container_name( - container_name: Optional[str], - resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]], + container_name: Optional[str], + resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]], ): """Raise `ValueError` if `resources_requests` is not None and `container_name` is None.""" if resources_requests is not None and container_name is None: @@ -161,6 +162,8 @@ def _validate_container_name( "metrics": str, "charm-tracing": str, "workload-tracing": str, + "provide-datasource-exchange": str, + "require-datasource-exchange": str, "s3": str, }, total=True, @@ -185,22 +188,22 @@ class Coordinator(ops.Object): """ def __init__( - self, - charm: ops.CharmBase, - roles_config: ClusterRolesConfig, - external_url: str, # the ingressed url if we have ingress, else fqdn - worker_metrics_port: int, - endpoints: _EndpointMapping, - nginx_config: Callable[["Coordinator"], str], - workers_config: Callable[["Coordinator"], str], - nginx_options: Optional[NginxMappingOverrides] = None, - is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None, - is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None, - resources_limit_options: Optional[_ResourceLimitOptionsMapping] = None, - resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]] = None, - container_name: Optional[str] = None, - remote_write_endpoints: Optional[Callable[[], List[RemoteWriteEndpoint]]] = None, - workload_tracing_protocols: Optional[List[ReceiverProtocol]] = None, + self, + charm: ops.CharmBase, + roles_config: ClusterRolesConfig, + external_url: str, # the ingressed url if we have ingress, else fqdn + worker_metrics_port: int, + endpoints: _EndpointMapping, + nginx_config: Callable[["Coordinator"], str], + workers_config: Callable[["Coordinator"], str], + nginx_options: Optional[NginxMappingOverrides] = None, + is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None, + is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None, + resources_limit_options: Optional[_ResourceLimitOptionsMapping] = None, + resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]] = None, + container_name: Optional[str] = None, + remote_write_endpoints: Optional[Callable[[], List[RemoteWriteEndpoint]]] = None, + workload_tracing_protocols: Optional[List[ReceiverProtocol]] = None, ): """Constructor for a 
Coordinator object. @@ -277,6 +280,11 @@ def __init__( ) self.s3_requirer = S3Requirer(self._charm, self._endpoints["s3"]) + self.datasource_exchange = DatasourceExchange( + self._charm, + provider_endpoint=self._endpoints["provide-datasource-exchange"], + requirer_endpoint=self._endpoints["require-datasource-exchange"], + ) self._grafana_dashboards = GrafanaDashboardProvider( self._charm, relation_name=self._endpoints["grafana-dashboards"] @@ -428,10 +436,10 @@ def _internal_url(self) -> str: def tls_available(self) -> bool: """Return True if tls is enabled and the necessary certs are found.""" return ( - self.cert_handler.enabled - and (self.cert_handler.server_cert is not None) - and (self.cert_handler.private_key is not None) # type: ignore - and (self.cert_handler.ca_cert is not None) + self.cert_handler.enabled + and (self.cert_handler.server_cert is not None) + and (self.cert_handler.private_key is not None) # type: ignore + and (self.cert_handler.ca_cert is not None) ) @property diff --git a/src/cosl/interfaces/datasource_exchange.py b/src/cosl/interfaces/datasource_exchange.py index 817d442..fab8417 100644 --- a/src/cosl/interfaces/datasource_exchange.py +++ b/src/cosl/interfaces/datasource_exchange.py @@ -26,7 +26,7 @@ from typing import ( Iterable, List, - Tuple, + Tuple, Optional, ) import ops @@ -85,10 +85,13 @@ def __init__( self, charm: ops.CharmBase, *, - provider_endpoint: str = DEFAULT_PROVIDE_ENDPOINT_NAME, - requirer_endpoint: str = DEFAULT_REQUIRE_ENDPOINT_NAME, + provider_endpoint: Optional[str] = None, + requirer_endpoint: Optional[str] = None, ): self._charm = charm + provider_endpoint = provider_endpoint or DEFAULT_PROVIDE_ENDPOINT_NAME + requirer_endpoint = requirer_endpoint or DEFAULT_REQUIRE_ENDPOINT_NAME + _validate_endpoints(charm, provider_endpoint, requirer_endpoint) # gather all relations, provider or requirer From f17b4dd98e0efcba1c336a78ecdc81e2515dad32 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Wed, 4 Dec 2024 13:57:33 +0100 Subject: [PATCH 6/9] utests --- pyproject.toml | 2 +- src/cosl/interfaces/datasource_exchange.py | 6 +++--- tests/test_coordinated_workers/test_coordinator.py | 4 ++++ tests/test_coordinated_workers/test_coordinator_status.py | 4 ++++ tests/test_datasource_exchange.py | 2 +- 5 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5dc3ea6..7d91f8d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ "PyYAML", "typing-extensions", "lightkube>=v0.15.4", - "charm-relation-interfaces @ git+https://github.com/canonical/charm-relation-interfaces@grafana_datasource_exchange", + "charm-relation-interfaces @ git+https://github.com/canonical/charm-relation-interfaces", ] classifiers = [ "Programming Language :: Python :: 3.8", diff --git a/src/cosl/interfaces/datasource_exchange.py b/src/cosl/interfaces/datasource_exchange.py index fab8417..64342a4 100644 --- a/src/cosl/interfaces/datasource_exchange.py +++ b/src/cosl/interfaces/datasource_exchange.py @@ -40,7 +40,7 @@ import cosl.interfaces.utils from cosl.interfaces.utils import DataValidationError -log = logging.getLogger("_cluster") +log = logging.getLogger("datasource_exchange") DEFAULT_PROVIDE_ENDPOINT_NAME = "provide-ds-exchange" DEFAULT_REQUIRE_ENDPOINT_NAME = "require-ds-exchange" @@ -104,13 +104,13 @@ def __init__( rel for rel in all_relations if (rel.app and rel.data) ] - def submit(self, raw_datasources: Iterable[DatasourceDict]): + def publish(self, datasources: Iterable[DatasourceDict]): """Submit 
these datasources to all remotes. This operation is leader-only. """ # sort by UID to prevent endless relation-changed cascades if this keeps flapping - encoded_datasources = json.dumps(sorted(raw_datasources, key=lambda raw_ds: raw_ds["uid"])) + encoded_datasources = json.dumps(sorted(datasources, key=lambda raw_ds: raw_ds["uid"])) app_data = DSExchangeAppData( datasources=encoded_datasources # type: ignore[reportCallIssue] ) diff --git a/tests/test_coordinated_workers/test_coordinator.py b/tests/test_coordinated_workers/test_coordinator.py index e79103f..cd44789 100644 --- a/tests/test_coordinated_workers/test_coordinator.py +++ b/tests/test_coordinated_workers/test_coordinator.py @@ -79,10 +79,12 @@ class MyCoordinator(ops.CharmBase): "my-charm-tracing": {"interface": "tracing", "limit": 1}, "my-workload-tracing": {"interface": "tracing", "limit": 1}, "my-s3": {"interface": "s3"}, + "my-ds-exchange-require": {"interface": "grafana_datasource_exchange"}, }, "provides": { "my-dashboards": {"interface": "grafana_dashboard"}, "my-metrics": {"interface": "prometheus_scrape"}, + "my-ds-exchange-provide": {"interface": "grafana_datasource_exchange"}, }, "containers": { "nginx": {"type": "oci-image"}, @@ -121,6 +123,8 @@ def __init__(self, framework: ops.Framework): "charm-tracing": "my-charm-tracing", "workload-tracing": "my-workload-tracing", "s3": "my-s3", + "provide-datasource-exchange": "my-ds-exchange-provide", + "require-datasource-exchange": "my-ds-exchange-require", }, nginx_config=lambda coordinator: f"nginx configuration for {coordinator._charm.meta.name}", workers_config=lambda coordinator: f"workers configuration for {coordinator._charm.meta.name}", diff --git a/tests/test_coordinated_workers/test_coordinator_status.py b/tests/test_coordinated_workers/test_coordinator_status.py index 9424ad5..56ad0eb 100644 --- a/tests/test_coordinated_workers/test_coordinator_status.py +++ b/tests/test_coordinated_workers/test_coordinator_status.py @@ -38,6 +38,8 @@ def __init__(self, framework: ops.Framework): "metrics": "metrics-endpoint", "charm-tracing": "self-charm-tracing", "workload-tracing": "self-workload-tracing", + "provide-datasource-exchange": "my-ds-exchange-provide", + "require-datasource-exchange": "my-ds-exchange-require", }, nginx_config=lambda _: "nginx config", workers_config=lambda _: "worker config", @@ -64,11 +66,13 @@ def ctx(coord_charm): "certificates": {"interface": "tls-certificates"}, "self-charm-tracing": {"interface": "tracing", "limit": 1}, "self-workload-tracing": {"interface": "tracing", "limit": 1}, + "my-ds-exchange-require": {"interface": "grafana_datasource_exchange"}, }, "provides": { "cluster": {"interface": "cluster"}, "grafana-dashboard": {"interface": "grafana_dashboard"}, "metrics-endpoint": {"interface": "prometheus_scrape"}, + "my-ds-exchange-provide": {"interface": "grafana_datasource_exchange"}, }, "containers": { "nginx": {"type": "oci-image"}, diff --git a/tests/test_datasource_exchange.py b/tests/test_datasource_exchange.py index b64e23f..7f6e223 100644 --- a/tests/test_datasource_exchange.py +++ b/tests/test_datasource_exchange.py @@ -55,7 +55,7 @@ def __init__(self, framework: Framework): self.ds_exchange = DatasourceExchange( self, provider_endpoint="foo", requirer_endpoint="bar" ) - self.ds_exchange.submit([{"type": "tempo", "uid": "123"}]) + self.ds_exchange.publish([{"type": "tempo", "uid": "123"}]) ctx = Context(MyCharm, meta=MyCharm.META) From 95b3d1099604da4dc8ccc49bcf9a340567d2bcc7 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Wed, 
4 Dec 2024 13:57:52 +0100 Subject: [PATCH 7/9] fmt --- src/cosl/coordinated_workers/coordinator.py | 56 ++++++++++----------- src/cosl/interfaces/datasource_exchange.py | 3 +- 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/src/cosl/coordinated_workers/coordinator.py b/src/cosl/coordinated_workers/coordinator.py index a70bb0d..640309e 100644 --- a/src/cosl/coordinated_workers/coordinator.py +++ b/src/cosl/coordinated_workers/coordinator.py @@ -125,12 +125,12 @@ def __post_init__(self): is_minimal_valid = set(self.minimal_deployment).issubset(self.roles) is_recommended_valid = set(self.recommended_deployment).issubset(self.roles) if not all( - [ - are_meta_keys_valid, - are_meta_values_valid, - is_minimal_valid, - is_recommended_valid, - ] + [ + are_meta_keys_valid, + are_meta_values_valid, + is_minimal_valid, + is_recommended_valid, + ] ): raise ClusterRolesConfigError( "Invalid ClusterRolesConfig: The configuration is not coherent." @@ -142,8 +142,8 @@ def is_coherent_with(self, cluster_roles: Iterable[str]) -> bool: def _validate_container_name( - container_name: Optional[str], - resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]], + container_name: Optional[str], + resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]], ): """Raise `ValueError` if `resources_requests` is not None and `container_name` is None.""" if resources_requests is not None and container_name is None: @@ -188,22 +188,22 @@ class Coordinator(ops.Object): """ def __init__( - self, - charm: ops.CharmBase, - roles_config: ClusterRolesConfig, - external_url: str, # the ingressed url if we have ingress, else fqdn - worker_metrics_port: int, - endpoints: _EndpointMapping, - nginx_config: Callable[["Coordinator"], str], - workers_config: Callable[["Coordinator"], str], - nginx_options: Optional[NginxMappingOverrides] = None, - is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None, - is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None, - resources_limit_options: Optional[_ResourceLimitOptionsMapping] = None, - resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]] = None, - container_name: Optional[str] = None, - remote_write_endpoints: Optional[Callable[[], List[RemoteWriteEndpoint]]] = None, - workload_tracing_protocols: Optional[List[ReceiverProtocol]] = None, + self, + charm: ops.CharmBase, + roles_config: ClusterRolesConfig, + external_url: str, # the ingressed url if we have ingress, else fqdn + worker_metrics_port: int, + endpoints: _EndpointMapping, + nginx_config: Callable[["Coordinator"], str], + workers_config: Callable[["Coordinator"], str], + nginx_options: Optional[NginxMappingOverrides] = None, + is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None, + is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None, + resources_limit_options: Optional[_ResourceLimitOptionsMapping] = None, + resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]] = None, + container_name: Optional[str] = None, + remote_write_endpoints: Optional[Callable[[], List[RemoteWriteEndpoint]]] = None, + workload_tracing_protocols: Optional[List[ReceiverProtocol]] = None, ): """Constructor for a Coordinator object. 
@@ -436,10 +436,10 @@ def _internal_url(self) -> str: def tls_available(self) -> bool: """Return True if tls is enabled and the necessary certs are found.""" return ( - self.cert_handler.enabled - and (self.cert_handler.server_cert is not None) - and (self.cert_handler.private_key is not None) # type: ignore - and (self.cert_handler.ca_cert is not None) + self.cert_handler.enabled + and (self.cert_handler.server_cert is not None) + and (self.cert_handler.private_key is not None) # type: ignore + and (self.cert_handler.ca_cert is not None) ) @property diff --git a/src/cosl/interfaces/datasource_exchange.py b/src/cosl/interfaces/datasource_exchange.py index 64342a4..87b1547 100644 --- a/src/cosl/interfaces/datasource_exchange.py +++ b/src/cosl/interfaces/datasource_exchange.py @@ -26,7 +26,8 @@ from typing import ( Iterable, List, - Tuple, Optional, + Optional, + Tuple, ) import ops From 153d2d18a3f701ed99cc8e3e5ab5cda1420d4507 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Mon, 9 Dec 2024 11:55:30 +0100 Subject: [PATCH 8/9] made the endpoints optional --- src/cosl/coordinated_workers/coordinator.py | 8 +- src/cosl/interfaces/datasource_exchange.py | 24 ++++-- .../test_coordinator.py | 4 +- .../test_coordinator_status.py | 4 +- tests/test_datasource_exchange.py | 82 +++++++++++++++---- 5 files changed, 91 insertions(+), 31 deletions(-) diff --git a/src/cosl/coordinated_workers/coordinator.py b/src/cosl/coordinated_workers/coordinator.py index 640309e..582f44d 100644 --- a/src/cosl/coordinated_workers/coordinator.py +++ b/src/cosl/coordinated_workers/coordinator.py @@ -162,8 +162,8 @@ def _validate_container_name( "metrics": str, "charm-tracing": str, "workload-tracing": str, - "provide-datasource-exchange": str, - "require-datasource-exchange": str, + "send-datasource": Optional[str], + "receive-datasource": Optional[str], "s3": str, }, total=True, @@ -282,8 +282,8 @@ def __init__( self.s3_requirer = S3Requirer(self._charm, self._endpoints["s3"]) self.datasource_exchange = DatasourceExchange( self._charm, - provider_endpoint=self._endpoints["provide-datasource-exchange"], - requirer_endpoint=self._endpoints["require-datasource-exchange"], + provider_endpoint=self._endpoints.get("send-datasource", None), + requirer_endpoint=self._endpoints.get("receive-datasource", None), ) self._grafana_dashboards = GrafanaDashboardProvider( diff --git a/src/cosl/interfaces/datasource_exchange.py b/src/cosl/interfaces/datasource_exchange.py index 87b1547..80dc9dd 100644 --- a/src/cosl/interfaces/datasource_exchange.py +++ b/src/cosl/interfaces/datasource_exchange.py @@ -43,8 +43,6 @@ log = logging.getLogger("datasource_exchange") -DEFAULT_PROVIDE_ENDPOINT_NAME = "provide-ds-exchange" -DEFAULT_REQUIRE_ENDPOINT_NAME = "require-ds-exchange" DS_EXCHANGE_INTERFACE_NAME = "grafana_datasource_exchange" @@ -57,18 +55,23 @@ class DatasourceDict(TypedDict): type: str uid: str + grafana_uid: str class EndpointValidationError(ValueError): """Raised if an endpoint name is invalid.""" -def _validate_endpoints(charm: CharmBase, provider_endpoint: str, requirer_endpoint: str): +def _validate_endpoints( + charm: CharmBase, provider_endpoint: Optional[str], requirer_endpoint: Optional[str] +): meta = charm.meta for endpoint, source in ( (provider_endpoint, meta.provides), (requirer_endpoint, meta.requires), ): + if endpoint is None: + continue if endpoint not in source: raise EndpointValidationError(f"endpoint {endpoint!r} not declared in charm metadata") interface_name = source[endpoint].interface_name @@ -77,6 +80,11 
@@ def _validate_endpoints(charm: CharmBase, provider_endpoint: str, requirer_endpo f"endpoint {endpoint} has unexpected interface {interface_name!r} " f"(should be {DS_EXCHANGE_INTERFACE_NAME})." ) + if not provider_endpoint and not requirer_endpoint: + raise EndpointValidationError( + "This charm should implement either a requirer or a provider (or both)" + "endpoint for `grafana-datasource-exchange`." + ) class DatasourceExchange: @@ -86,18 +94,16 @@ def __init__( self, charm: ops.CharmBase, *, - provider_endpoint: Optional[str] = None, - requirer_endpoint: Optional[str] = None, + provider_endpoint: Optional[str], + requirer_endpoint: Optional[str], ): self._charm = charm - provider_endpoint = provider_endpoint or DEFAULT_PROVIDE_ENDPOINT_NAME - requirer_endpoint = requirer_endpoint or DEFAULT_REQUIRE_ENDPOINT_NAME - _validate_endpoints(charm, provider_endpoint, requirer_endpoint) # gather all relations, provider or requirer all_relations = chain( - charm.model.relations[provider_endpoint], charm.model.relations[requirer_endpoint] + charm.model.relations.get(provider_endpoint, ()), + charm.model.relations.get(requirer_endpoint, ()), ) # filter out some common unhappy relation states diff --git a/tests/test_coordinated_workers/test_coordinator.py b/tests/test_coordinated_workers/test_coordinator.py index cd44789..b241c3c 100644 --- a/tests/test_coordinated_workers/test_coordinator.py +++ b/tests/test_coordinated_workers/test_coordinator.py @@ -123,8 +123,8 @@ def __init__(self, framework: ops.Framework): "charm-tracing": "my-charm-tracing", "workload-tracing": "my-workload-tracing", "s3": "my-s3", - "provide-datasource-exchange": "my-ds-exchange-provide", - "require-datasource-exchange": "my-ds-exchange-require", + "send-datasource": "my-ds-exchange-provide", + "receive-datasource": "my-ds-exchange-require", }, nginx_config=lambda coordinator: f"nginx configuration for {coordinator._charm.meta.name}", workers_config=lambda coordinator: f"workers configuration for {coordinator._charm.meta.name}", diff --git a/tests/test_coordinated_workers/test_coordinator_status.py b/tests/test_coordinated_workers/test_coordinator_status.py index 56ad0eb..300dafa 100644 --- a/tests/test_coordinated_workers/test_coordinator_status.py +++ b/tests/test_coordinated_workers/test_coordinator_status.py @@ -38,8 +38,8 @@ def __init__(self, framework: ops.Framework): "metrics": "metrics-endpoint", "charm-tracing": "self-charm-tracing", "workload-tracing": "self-workload-tracing", - "provide-datasource-exchange": "my-ds-exchange-provide", - "require-datasource-exchange": "my-ds-exchange-require", + "send-datasource": None, + "receive-datasource": "my-ds-exchange-require", }, nginx_config=lambda _: "nginx config", workers_config=lambda _: "worker config", diff --git a/tests/test_datasource_exchange.py b/tests/test_datasource_exchange.py index 7f6e223..45cc135 100644 --- a/tests/test_datasource_exchange.py +++ b/tests/test_datasource_exchange.py @@ -6,40 +6,95 @@ from scenario import Context, Relation, State from scenario.errors import UncaughtCharmError -from cosl.interfaces.datasource_exchange import DatasourceExchange, DSExchangeAppData +from cosl.interfaces.datasource_exchange import ( + DatasourceExchange, + DSExchangeAppData, + EndpointValidationError, +) @pytest.mark.parametrize( - "meta, invalid_reason", + "meta, declared", ( + ( + {}, + (None, None), + ), ( { "requires": {"boo": {"interface": "gibberish"}}, "provides": {"far": {"interface": "grafana_datasource_exchange"}}, }, - "unexpected interface 
'gibberish'", + ("far", "boo"), ), ( { "requires": {"boo": {"interface": "grafana_datasource_exchange"}}, "provides": {"goo": {"interface": "grafana_datasource_exchange"}}, }, - "endpoint 'far' not declared", + ("far", "boo"), ), ), ) -def test_endpoint_validation(meta, invalid_reason): +def test_endpoint_validation_failure(meta, declared): + # GIVEN a charm with this metadata and declared provider/requirer endpoints + class BadCharm(CharmBase): def __init__(self, framework: Framework): super().__init__(framework) + prov, req = declared self.ds_exchange = DatasourceExchange( - self, provider_endpoint="far", requirer_endpoint="boo" + self, provider_endpoint=prov, requirer_endpoint=req ) - with pytest.raises(UncaughtCharmError, match=invalid_reason): + # WHEN any event is processed + with pytest.raises(UncaughtCharmError) as e: ctx = Context(BadCharm, meta={"name": "bob", **meta}) ctx.run(ctx.on.update_status(), State()) + # THEN we raise an EndpointValidationError + assert isinstance(e.value.__cause__, EndpointValidationError) + + +@pytest.mark.parametrize( + "meta, declared", + ( + ( + { + "requires": {"boo": {"interface": "grafana_datasource_exchange"}}, + "provides": {"far": {"interface": "grafana_datasource_exchange"}}, + }, + ("far", "boo"), + ), + ( + { + "provides": {"far": {"interface": "grafana_datasource_exchange"}}, + }, + ("far", None), + ), + ( + { + "requires": {"boo": {"interface": "grafana_datasource_exchange"}}, + }, + (None, "boo"), + ), + ), +) +def test_endpoint_validation_ok(meta, declared): + # GIVEN a charm with this metadata and declared provider/requirer endpoints + class BadCharm(CharmBase): + def __init__(self, framework: Framework): + super().__init__(framework) + prov, req = declared + self.ds_exchange = DatasourceExchange( + self, provider_endpoint=prov, requirer_endpoint=req + ) + + # WHEN any event is processed + ctx = Context(BadCharm, meta={"name": "bob", **meta}) + ctx.run(ctx.on.update_status(), State()) + # THEN no exception is raised + def test_ds_submit(): # GIVEN a charm with a single datasource_exchange relation @@ -47,15 +102,14 @@ class MyCharm(CharmBase): META = { "name": "robbie", "provides": {"foo": {"interface": "grafana_datasource_exchange"}}, - "requires": {"bar": {"interface": "grafana_datasource_exchange"}}, } def __init__(self, framework: Framework): super().__init__(framework) self.ds_exchange = DatasourceExchange( - self, provider_endpoint="foo", requirer_endpoint="bar" + self, provider_endpoint="foo", requirer_endpoint=None ) - self.ds_exchange.publish([{"type": "tempo", "uid": "123"}]) + self.ds_exchange.publish([{"type": "tempo", "uid": "123", "grafana_uid": "123123"}]) ctx = Context(MyCharm, meta=MyCharm.META) @@ -91,11 +145,11 @@ def __init__(self, framework: Framework): ctx = Context(MyCharm, meta=MyCharm.META) ds_requirer_in = [ - {"type": "c", "uid": "3"}, - {"type": "a", "uid": "1"}, - {"type": "b", "uid": "2"}, + {"type": "c", "uid": "3", "grafana_uid": "4"}, + {"type": "a", "uid": "1", "grafana_uid": "5"}, + {"type": "b", "uid": "2", "grafana_uid": "6"}, ] - ds_provider_in = [{"type": "d", "uid": "4"}] + ds_provider_in = [{"type": "d", "uid": "4", "grafana_uid": "7"}] dse_requirer_in = Relation( "foo", From 64ee92860f1bb84fbed4129c068ce87edbdaec49 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Tue, 10 Dec 2024 09:58:47 +0100 Subject: [PATCH 9/9] pr comments --- pyproject.toml | 2 +- src/cosl/interfaces/datasource_exchange.py | 14 +++++++------- tests/test_datasource_exchange.py | 2 +- 3 files changed, 9 insertions(+), 9 
deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7d91f8d..79e6a91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "cosl" -version = "0.0.44" +version = "0.0.45" authors = [ { name = "sed-i", email = "82407168+sed-i@users.noreply.github.com" }, ] diff --git a/src/cosl/interfaces/datasource_exchange.py b/src/cosl/interfaces/datasource_exchange.py index 80dc9dd..9105e44 100644 --- a/src/cosl/interfaces/datasource_exchange.py +++ b/src/cosl/interfaces/datasource_exchange.py @@ -4,8 +4,8 @@ """Shared utilities for the inter-coordinator "grafana_datasource_exchange" interface. -See https://github.com/canonical/charm-relation-interfaces/pull/207 for the interface specification. -# TODO update when pr merged +See https://github.com/canonical/charm-relation-interfaces/tree/main/interfaces/grafana_datasource_exchange/v0 +for the interface specification. """ @@ -22,7 +22,6 @@ import json import logging -from itertools import chain from typing import ( Iterable, List, @@ -101,10 +100,11 @@ def __init__( _validate_endpoints(charm, provider_endpoint, requirer_endpoint) # gather all relations, provider or requirer - all_relations = chain( - charm.model.relations.get(provider_endpoint, ()), - charm.model.relations.get(requirer_endpoint, ()), - ) + all_relations = [] + if provider_endpoint: + all_relations.extend(charm.model.relations.get(provider_endpoint, ())) + if requirer_endpoint: + all_relations.extend(charm.model.relations.get(requirer_endpoint, ())) # filter out some common unhappy relation states self._relations: List[ops.Relation] = [ diff --git a/tests/test_datasource_exchange.py b/tests/test_datasource_exchange.py index 45cc135..9b6b57a 100644 --- a/tests/test_datasource_exchange.py +++ b/tests/test_datasource_exchange.py @@ -96,7 +96,7 @@ def __init__(self, framework: Framework): # THEN no exception is raised -def test_ds_submit(): +def test_ds_publish(): # GIVEN a charm with a single datasource_exchange relation class MyCharm(CharmBase): META = {