From 8ae974bfbbfcbedf316df552f98c5b622a20cffe Mon Sep 17 00:00:00 2001 From: PietroPasotti Date: Tue, 10 Dec 2024 10:08:58 +0100 Subject: [PATCH] Datasource exchange (#109) * fixed dependencies * moved cluster over to new dir * ds exchange interface implementation * typing * add datasource exchange to coordinator * utests * fmt * made the endpoints optional * pr comments --- pyproject.toml | 52 +++--- src/cosl/coordinated_workers/coordinator.py | 8 + src/cosl/interfaces/datasource_exchange.py | 144 +++++++++++++++ src/cosl/interfaces/utils.py | 42 +++++ .../test_coordinator.py | 4 + .../test_coordinator_status.py | 4 + tests/test_datasource_exchange.py | 174 ++++++++++++++++++ tox.ini | 6 +- 8 files changed, 407 insertions(+), 27 deletions(-) create mode 100644 src/cosl/interfaces/datasource_exchange.py create mode 100644 tests/test_datasource_exchange.py diff --git a/pyproject.toml b/pyproject.toml index 07f6ae6..79e6a91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,21 +4,23 @@ build-backend = "hatchling.build" [project] name = "cosl" -version = "0.0.44" +version = "0.0.45" authors = [ - { name="sed-i", email="82407168+sed-i@users.noreply.github.com" }, + { name = "sed-i", email = "82407168+sed-i@users.noreply.github.com" }, ] description = "Utils for COS Lite charms" readme = "README.md" -license = {file = "LICENSE"} +license = { file = "LICENSE" } requires-python = ">=3.8" dependencies = [ - "ops", - "pydantic", - "tenacity", - "PyYAML", - "typing-extensions", - "lightkube>=v0.15.4" + "ops", + "pydantic", + "tenacity", + "jsonschema", + "PyYAML", + "typing-extensions", + "lightkube>=v0.15.4", + "charm-relation-interfaces @ git+https://github.com/canonical/charm-relation-interfaces", ] classifiers = [ "Programming Language :: Python :: 3.8", @@ -60,21 +62,21 @@ extend-exclude = ["__pycache__", "*.egg_info"] [tool.ruff.lint] select = ["E", "W", "F", "C", "N", "D", "I001"] extend-ignore = [ - "D203", - "D204", - "D213", - "D215", - "D400", - "D404", - "D406", - "D407", - "D408", - "D409", - "D413", - "E402", + "D203", + "D204", + "D213", + "D215", + "D400", + "D404", + "D406", + "D407", + "D408", + "D409", + "D413", + "E402", ] ignore = ["E501", "D107"] -per-file-ignores = {"tests/*" = ["D100","D101","D102","D103","D104"]} +per-file-ignores = { "tests/*" = ["D100", "D101", "D102", "D103", "D104"] } [tool.ruff.lint.pydocstyle] convention = "google" @@ -88,6 +90,7 @@ classmethod-decorators = ["classmethod", "pydantic.validator"] [tool.pyright] include = ["src"] + extraPaths = ["lib", "src/cosl"] pythonVersion = "3.8" pythonPlatform = "All" @@ -99,3 +102,8 @@ reportTypeCommentUsage = false [tool.codespell] skip = ".git,.tox,build,lib,venv*,.mypy_cache" ignore-words-list = "assertIn" + +[tool.hatch.metadata] +# allow git+ dependencies in pyproject +allow-direct-references = true + diff --git a/src/cosl/coordinated_workers/coordinator.py b/src/cosl/coordinated_workers/coordinator.py index 9c5afae..582f44d 100644 --- a/src/cosl/coordinated_workers/coordinator.py +++ b/src/cosl/coordinated_workers/coordinator.py @@ -39,6 +39,7 @@ ) from cosl.helpers import check_libs_installed from cosl.interfaces.cluster import ClusterProvider, RemoteWriteEndpoint +from cosl.interfaces.datasource_exchange import DatasourceExchange check_libs_installed( "charms.data_platform_libs.v0.s3", @@ -161,6 +162,8 @@ def _validate_container_name( "metrics": str, "charm-tracing": str, "workload-tracing": str, + "send-datasource": Optional[str], + "receive-datasource": Optional[str], "s3": str, }, total=True, @@ -277,6 +280,11 @@ def __init__( ) self.s3_requirer = S3Requirer(self._charm, self._endpoints["s3"]) + self.datasource_exchange = DatasourceExchange( + self._charm, + provider_endpoint=self._endpoints.get("send-datasource", None), + requirer_endpoint=self._endpoints.get("receive-datasource", None), + ) self._grafana_dashboards = GrafanaDashboardProvider( self._charm, relation_name=self._endpoints["grafana-dashboards"] diff --git a/src/cosl/interfaces/datasource_exchange.py b/src/cosl/interfaces/datasource_exchange.py new file mode 100644 index 0000000..9105e44 --- /dev/null +++ b/src/cosl/interfaces/datasource_exchange.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical +# See LICENSE file for licensing details. + +"""Shared utilities for the inter-coordinator "grafana_datasource_exchange" interface. + +See https://github.com/canonical/charm-relation-interfaces/tree/main/interfaces/grafana_datasource_exchange/v0 +for the interface specification. +""" + + +# FIXME: the interfaces import (because it's a git dep perhaps?) +# can't be type-checked, which breaks everything +# pyright: reportMissingImports=false +# pyright: reportUntypedBaseClass=false +# pyright: reportUnknownLambdaType=false +# pyright: reportUnknownMemberType=false +# pyright: reportUnknownVariableType=false +# pyright: reportUnknownArgumentType=false +# pyright: reportUnknownParameterType=false + + +import json +import logging +from typing import ( + Iterable, + List, + Optional, + Tuple, +) + +import ops +from interfaces.grafana_datasource_exchange.v0.schema import ( + GrafanaDatasource, + GrafanaSourceAppData, +) +from ops import CharmBase +from typing_extensions import TypedDict + +import cosl.interfaces.utils +from cosl.interfaces.utils import DataValidationError + +log = logging.getLogger("datasource_exchange") + +DS_EXCHANGE_INTERFACE_NAME = "grafana_datasource_exchange" + + +class DSExchangeAppData(cosl.interfaces.utils.DatabagModelV2, GrafanaSourceAppData): + """App databag schema for both sides of this interface.""" + + +class DatasourceDict(TypedDict): + """Raw datasource information.""" + + type: str + uid: str + grafana_uid: str + + +class EndpointValidationError(ValueError): + """Raised if an endpoint name is invalid.""" + + +def _validate_endpoints( + charm: CharmBase, provider_endpoint: Optional[str], requirer_endpoint: Optional[str] +): + meta = charm.meta + for endpoint, source in ( + (provider_endpoint, meta.provides), + (requirer_endpoint, meta.requires), + ): + if endpoint is None: + continue + if endpoint not in source: + raise EndpointValidationError(f"endpoint {endpoint!r} not declared in charm metadata") + interface_name = source[endpoint].interface_name + if interface_name != DS_EXCHANGE_INTERFACE_NAME: + raise EndpointValidationError( + f"endpoint {endpoint} has unexpected interface {interface_name!r} " + f"(should be {DS_EXCHANGE_INTERFACE_NAME})." + ) + if not provider_endpoint and not requirer_endpoint: + raise EndpointValidationError( + "This charm should implement either a requirer or a provider (or both)" + "endpoint for `grafana-datasource-exchange`." + ) + + +class DatasourceExchange: + """``grafana_datasource_exchange`` interface endpoint wrapper (provider AND requirer).""" + + def __init__( + self, + charm: ops.CharmBase, + *, + provider_endpoint: Optional[str], + requirer_endpoint: Optional[str], + ): + self._charm = charm + _validate_endpoints(charm, provider_endpoint, requirer_endpoint) + + # gather all relations, provider or requirer + all_relations = [] + if provider_endpoint: + all_relations.extend(charm.model.relations.get(provider_endpoint, ())) + if requirer_endpoint: + all_relations.extend(charm.model.relations.get(requirer_endpoint, ())) + + # filter out some common unhappy relation states + self._relations: List[ops.Relation] = [ + rel for rel in all_relations if (rel.app and rel.data) + ] + + def publish(self, datasources: Iterable[DatasourceDict]): + """Submit these datasources to all remotes. + + This operation is leader-only. + """ + # sort by UID to prevent endless relation-changed cascades if this keeps flapping + encoded_datasources = json.dumps(sorted(datasources, key=lambda raw_ds: raw_ds["uid"])) + app_data = DSExchangeAppData( + datasources=encoded_datasources # type: ignore[reportCallIssue] + ) + + for relation in self._relations: + app_data.dump(relation.data[self._charm.app]) + + @property + def received_datasources(self) -> Tuple[GrafanaDatasource, ...]: + """Collect all datasources that the remotes have shared. + + This operation is leader-only. + """ + datasources: List[GrafanaDatasource] = [] + + for relation in self._relations: + try: + datasource = DSExchangeAppData.load(relation.data[relation.app]) + except DataValidationError: + # load() already logs something in this case + continue + + datasources.extend(datasource.datasources) + return tuple(sorted(datasources, key=lambda ds: ds.uid)) diff --git a/src/cosl/interfaces/utils.py b/src/cosl/interfaces/utils.py index 16b1557..40edf51 100644 --- a/src/cosl/interfaces/utils.py +++ b/src/cosl/interfaces/utils.py @@ -81,3 +81,45 @@ def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _Ra dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True) # type: ignore _databag.update({k: json.dumps(v) for k, v in dct.items()}) return _databag + + +# FIXME: in pydantic v2, the json stuff we've been doing is no longer necessary. +# It becomes much easier to work with Json fields and the databagmodel class becomes much simpler. +# We should rewrite the cluster implementation to use this class, +# and replace the original DatabagModel with it +class DatabagModelV2(pydantic.BaseModel): + """Base databag model.""" + + model_config = ConfigDict( + # tolerate additional keys in databag + extra="ignore", + # Allow instantiating this class by field name (instead of forcing alias). + populate_by_name=True, + ) # type: ignore + """Pydantic config.""" + + @classmethod + def load(cls, databag: _RawDatabag): + """Load this model from a Juju databag.""" + try: + return cls.model_validate_json(json.dumps(dict(databag))) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + if databag: + log.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _RawDatabag: + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. + """ + _databag: _RawDatabag = {} if databag is None else databag + + if clear: + _databag.clear() + + dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True, round_trip=True) # type: ignore + _databag.update(dct) + return _databag diff --git a/tests/test_coordinated_workers/test_coordinator.py b/tests/test_coordinated_workers/test_coordinator.py index e79103f..b241c3c 100644 --- a/tests/test_coordinated_workers/test_coordinator.py +++ b/tests/test_coordinated_workers/test_coordinator.py @@ -79,10 +79,12 @@ class MyCoordinator(ops.CharmBase): "my-charm-tracing": {"interface": "tracing", "limit": 1}, "my-workload-tracing": {"interface": "tracing", "limit": 1}, "my-s3": {"interface": "s3"}, + "my-ds-exchange-require": {"interface": "grafana_datasource_exchange"}, }, "provides": { "my-dashboards": {"interface": "grafana_dashboard"}, "my-metrics": {"interface": "prometheus_scrape"}, + "my-ds-exchange-provide": {"interface": "grafana_datasource_exchange"}, }, "containers": { "nginx": {"type": "oci-image"}, @@ -121,6 +123,8 @@ def __init__(self, framework: ops.Framework): "charm-tracing": "my-charm-tracing", "workload-tracing": "my-workload-tracing", "s3": "my-s3", + "send-datasource": "my-ds-exchange-provide", + "receive-datasource": "my-ds-exchange-require", }, nginx_config=lambda coordinator: f"nginx configuration for {coordinator._charm.meta.name}", workers_config=lambda coordinator: f"workers configuration for {coordinator._charm.meta.name}", diff --git a/tests/test_coordinated_workers/test_coordinator_status.py b/tests/test_coordinated_workers/test_coordinator_status.py index 9424ad5..300dafa 100644 --- a/tests/test_coordinated_workers/test_coordinator_status.py +++ b/tests/test_coordinated_workers/test_coordinator_status.py @@ -38,6 +38,8 @@ def __init__(self, framework: ops.Framework): "metrics": "metrics-endpoint", "charm-tracing": "self-charm-tracing", "workload-tracing": "self-workload-tracing", + "send-datasource": None, + "receive-datasource": "my-ds-exchange-require", }, nginx_config=lambda _: "nginx config", workers_config=lambda _: "worker config", @@ -64,11 +66,13 @@ def ctx(coord_charm): "certificates": {"interface": "tls-certificates"}, "self-charm-tracing": {"interface": "tracing", "limit": 1}, "self-workload-tracing": {"interface": "tracing", "limit": 1}, + "my-ds-exchange-require": {"interface": "grafana_datasource_exchange"}, }, "provides": { "cluster": {"interface": "cluster"}, "grafana-dashboard": {"interface": "grafana_dashboard"}, "metrics-endpoint": {"interface": "prometheus_scrape"}, + "my-ds-exchange-provide": {"interface": "grafana_datasource_exchange"}, }, "containers": { "nginx": {"type": "oci-image"}, diff --git a/tests/test_datasource_exchange.py b/tests/test_datasource_exchange.py new file mode 100644 index 0000000..9b6b57a --- /dev/null +++ b/tests/test_datasource_exchange.py @@ -0,0 +1,174 @@ +import json + +import pytest +from interfaces.grafana_datasource_exchange.v0.schema import GrafanaDatasource +from ops import CharmBase, Framework +from scenario import Context, Relation, State +from scenario.errors import UncaughtCharmError + +from cosl.interfaces.datasource_exchange import ( + DatasourceExchange, + DSExchangeAppData, + EndpointValidationError, +) + + +@pytest.mark.parametrize( + "meta, declared", + ( + ( + {}, + (None, None), + ), + ( + { + "requires": {"boo": {"interface": "gibberish"}}, + "provides": {"far": {"interface": "grafana_datasource_exchange"}}, + }, + ("far", "boo"), + ), + ( + { + "requires": {"boo": {"interface": "grafana_datasource_exchange"}}, + "provides": {"goo": {"interface": "grafana_datasource_exchange"}}, + }, + ("far", "boo"), + ), + ), +) +def test_endpoint_validation_failure(meta, declared): + # GIVEN a charm with this metadata and declared provider/requirer endpoints + + class BadCharm(CharmBase): + def __init__(self, framework: Framework): + super().__init__(framework) + prov, req = declared + self.ds_exchange = DatasourceExchange( + self, provider_endpoint=prov, requirer_endpoint=req + ) + + # WHEN any event is processed + with pytest.raises(UncaughtCharmError) as e: + ctx = Context(BadCharm, meta={"name": "bob", **meta}) + ctx.run(ctx.on.update_status(), State()) + + # THEN we raise an EndpointValidationError + assert isinstance(e.value.__cause__, EndpointValidationError) + + +@pytest.mark.parametrize( + "meta, declared", + ( + ( + { + "requires": {"boo": {"interface": "grafana_datasource_exchange"}}, + "provides": {"far": {"interface": "grafana_datasource_exchange"}}, + }, + ("far", "boo"), + ), + ( + { + "provides": {"far": {"interface": "grafana_datasource_exchange"}}, + }, + ("far", None), + ), + ( + { + "requires": {"boo": {"interface": "grafana_datasource_exchange"}}, + }, + (None, "boo"), + ), + ), +) +def test_endpoint_validation_ok(meta, declared): + # GIVEN a charm with this metadata and declared provider/requirer endpoints + class BadCharm(CharmBase): + def __init__(self, framework: Framework): + super().__init__(framework) + prov, req = declared + self.ds_exchange = DatasourceExchange( + self, provider_endpoint=prov, requirer_endpoint=req + ) + + # WHEN any event is processed + ctx = Context(BadCharm, meta={"name": "bob", **meta}) + ctx.run(ctx.on.update_status(), State()) + # THEN no exception is raised + + +def test_ds_publish(): + # GIVEN a charm with a single datasource_exchange relation + class MyCharm(CharmBase): + META = { + "name": "robbie", + "provides": {"foo": {"interface": "grafana_datasource_exchange"}}, + } + + def __init__(self, framework: Framework): + super().__init__(framework) + self.ds_exchange = DatasourceExchange( + self, provider_endpoint="foo", requirer_endpoint=None + ) + self.ds_exchange.publish([{"type": "tempo", "uid": "123", "grafana_uid": "123123"}]) + + ctx = Context(MyCharm, meta=MyCharm.META) + + dse_in = Relation("foo") + state_in = State(relations={dse_in}, leader=True) + + # WHEN we receive any event + state_out = ctx.run(ctx.on.update_status(), state_in) + + # THEN we publish in our app databags any datasources we're aware of + dse_out = state_out.get_relation(dse_in.id) + assert dse_out.local_app_data + data = DSExchangeAppData.load(dse_out.local_app_data) + assert data.datasources[0].type == "tempo" + assert data.datasources[0].uid == "123" + + +def test_ds_receive(): + # GIVEN a charm with a single datasource_exchange relation + class MyCharm(CharmBase): + META = { + "name": "robbie", + "provides": {"foo": {"interface": "grafana_datasource_exchange"}}, + "requires": {"bar": {"interface": "grafana_datasource_exchange"}}, + } + + def __init__(self, framework: Framework): + super().__init__(framework) + self.ds_exchange = DatasourceExchange( + self, provider_endpoint="foo", requirer_endpoint="bar" + ) + + ctx = Context(MyCharm, meta=MyCharm.META) + + ds_requirer_in = [ + {"type": "c", "uid": "3", "grafana_uid": "4"}, + {"type": "a", "uid": "1", "grafana_uid": "5"}, + {"type": "b", "uid": "2", "grafana_uid": "6"}, + ] + ds_provider_in = [{"type": "d", "uid": "4", "grafana_uid": "7"}] + + dse_requirer_in = Relation( + "foo", + remote_app_data=DSExchangeAppData( + datasources=json.dumps(sorted(ds_provider_in, key=lambda raw_ds: raw_ds["uid"])) + ).dump(), + ) + dse_provider_in = Relation( + "bar", + remote_app_data=DSExchangeAppData( + datasources=json.dumps(sorted(ds_requirer_in, key=lambda raw_ds: raw_ds["uid"])) + ).dump(), + ) + state_in = State(relations={dse_requirer_in, dse_provider_in}, leader=True) + + # WHEN we receive any event + with ctx(ctx.on.update_status(), state_in) as mgr: + # THEN we can access all datasources we're given + dss = mgr.charm.ds_exchange.received_datasources + assert [ds.type for ds in dss] == list("abcd") + assert [ds.uid for ds in dss] == list("1234") + assert isinstance(dss[0], GrafanaDatasource) diff --git a/tox.ini b/tox.ini index e579a02..c984ba9 100644 --- a/tox.ini +++ b/tox.ini @@ -70,17 +70,13 @@ commands = [testenv:unit] description = Run unit tests deps = + . deepdiff fs pytest pytest-cov ops[testing] PyYAML - typing_extensions - cryptography - jsonschema - lightkube>=v0.15.4 - lightkube-models==1.24.1.4 allowlist_externals = /usr/bin/env charmcraft