Datasource exchange (#109)
* fixed dependencies

* moved cluster over to new dir

* ds exchange interface implementation

* typing

* add datasource exchange to coordinator

* utests

* fmt

* made the endpoints optional

* pr comments
PietroPasotti authored Dec 10, 2024
1 parent eedfe50 commit 8ae974b
Showing 8 changed files with 407 additions and 27 deletions.
52 changes: 30 additions & 22 deletions pyproject.toml
@@ -4,21 +4,23 @@ build-backend = "hatchling.build"
 
 [project]
 name = "cosl"
-version = "0.0.44"
+version = "0.0.45"
 authors = [
-    { name="sed-i", email="[email protected]" },
+    { name = "sed-i", email = "[email protected]" },
 ]
 description = "Utils for COS Lite charms"
 readme = "README.md"
-license = {file = "LICENSE"}
+license = { file = "LICENSE" }
 requires-python = ">=3.8"
 dependencies = [
-    "ops",
-    "pydantic",
-    "tenacity",
-    "PyYAML",
-    "typing-extensions",
-    "lightkube>=v0.15.4"
+    "ops",
+    "pydantic",
+    "tenacity",
+    "jsonschema",
+    "PyYAML",
+    "typing-extensions",
+    "lightkube>=v0.15.4",
+    "charm-relation-interfaces @ git+https://github.com/canonical/charm-relation-interfaces",
 ]
 classifiers = [
     "Programming Language :: Python :: 3.8",
@@ -60,21 +62,21 @@ extend-exclude = ["__pycache__", "*.egg_info"]
 [tool.ruff.lint]
 select = ["E", "W", "F", "C", "N", "D", "I001"]
 extend-ignore = [
-    "D203",
-    "D204",
-    "D213",
-    "D215",
-    "D400",
-    "D404",
-    "D406",
-    "D407",
-    "D408",
-    "D409",
-    "D413",
-    "E402",
+    "D203",
+    "D204",
+    "D213",
+    "D215",
+    "D400",
+    "D404",
+    "D406",
+    "D407",
+    "D408",
+    "D409",
+    "D413",
+    "E402",
 ]
 ignore = ["E501", "D107"]
-per-file-ignores = {"tests/*" = ["D100","D101","D102","D103","D104"]}
+per-file-ignores = { "tests/*" = ["D100", "D101", "D102", "D103", "D104"] }
 
 [tool.ruff.lint.pydocstyle]
 convention = "google"
@@ -88,6 +90,7 @@ classmethod-decorators = ["classmethod", "pydantic.validator"]
 
 [tool.pyright]
 include = ["src"]
+
 extraPaths = ["lib", "src/cosl"]
 pythonVersion = "3.8"
 pythonPlatform = "All"
@@ -99,3 +102,8 @@ reportTypeCommentUsage = false
 [tool.codespell]
 skip = ".git,.tox,build,lib,venv*,.mypy_cache"
 ignore-words-list = "assertIn"
+
+[tool.hatch.metadata]
+# allow git+ dependencies in pyproject
+allow-direct-references = true
+
8 changes: 8 additions & 0 deletions src/cosl/coordinated_workers/coordinator.py
@@ -39,6 +39,7 @@
 )
 from cosl.helpers import check_libs_installed
 from cosl.interfaces.cluster import ClusterProvider, RemoteWriteEndpoint
+from cosl.interfaces.datasource_exchange import DatasourceExchange
 
 check_libs_installed(
     "charms.data_platform_libs.v0.s3",
@@ -161,6 +162,8 @@ def _validate_container_name(
         "metrics": str,
         "charm-tracing": str,
         "workload-tracing": str,
+        "send-datasource": Optional[str],
+        "receive-datasource": Optional[str],
         "s3": str,
     },
     total=True,
@@ -277,6 +280,11 @@ def __init__(
         )
 
         self.s3_requirer = S3Requirer(self._charm, self._endpoints["s3"])
+        self.datasource_exchange = DatasourceExchange(
+            self._charm,
+            provider_endpoint=self._endpoints.get("send-datasource", None),
+            requirer_endpoint=self._endpoints.get("receive-datasource", None),
+        )
 
         self._grafana_dashboards = GrafanaDashboardProvider(
             self._charm, relation_name=self._endpoints["grafana-dashboards"]
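A minimal usage sketch of the new attribute (not part of this diff): it assumes a charm that already built a `Coordinator` with the new optional `send-datasource`/`receive-datasource` endpoint names, as the tests further down do, and the datasource values are invented.

```python
# Sketch: inside an event handler of a charm whose `self.coordinator` was built with
# "send-datasource"/"receive-datasource" endpoint names (either one may be omitted).
if self.unit.is_leader():
    self.coordinator.datasource_exchange.publish(
        [{"type": "prometheus", "uid": "prom-ds-uid", "grafana_uid": "grafana-app-uid"}]
    )
# Datasources advertised by remote coordinators related over either endpoint:
remote_datasources = self.coordinator.datasource_exchange.received_datasources
```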
144 changes: 144 additions & 0 deletions src/cosl/interfaces/datasource_exchange.py
@@ -0,0 +1,144 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical
# See LICENSE file for licensing details.

"""Shared utilities for the inter-coordinator "grafana_datasource_exchange" interface.

See https://github.com/canonical/charm-relation-interfaces/tree/main/interfaces/grafana_datasource_exchange/v0
for the interface specification.
"""


# FIXME: the interfaces import (because it's a git dep perhaps?)
# can't be type-checked, which breaks everything
# pyright: reportMissingImports=false
# pyright: reportUntypedBaseClass=false
# pyright: reportUnknownLambdaType=false
# pyright: reportUnknownMemberType=false
# pyright: reportUnknownVariableType=false
# pyright: reportUnknownArgumentType=false
# pyright: reportUnknownParameterType=false


import json
import logging
from typing import (
    Iterable,
    List,
    Optional,
    Tuple,
)

import ops
from interfaces.grafana_datasource_exchange.v0.schema import (
    GrafanaDatasource,
    GrafanaSourceAppData,
)
from ops import CharmBase
from typing_extensions import TypedDict

import cosl.interfaces.utils
from cosl.interfaces.utils import DataValidationError

log = logging.getLogger("datasource_exchange")

DS_EXCHANGE_INTERFACE_NAME = "grafana_datasource_exchange"


class DSExchangeAppData(cosl.interfaces.utils.DatabagModelV2, GrafanaSourceAppData):
    """App databag schema for both sides of this interface."""


class DatasourceDict(TypedDict):
    """Raw datasource information."""

    type: str
    uid: str
    grafana_uid: str


class EndpointValidationError(ValueError):
    """Raised if an endpoint name is invalid."""


def _validate_endpoints(
    charm: CharmBase, provider_endpoint: Optional[str], requirer_endpoint: Optional[str]
):
    meta = charm.meta
    for endpoint, source in (
        (provider_endpoint, meta.provides),
        (requirer_endpoint, meta.requires),
    ):
        if endpoint is None:
            continue
        if endpoint not in source:
            raise EndpointValidationError(f"endpoint {endpoint!r} not declared in charm metadata")
        interface_name = source[endpoint].interface_name
        if interface_name != DS_EXCHANGE_INTERFACE_NAME:
            raise EndpointValidationError(
                f"endpoint {endpoint} has unexpected interface {interface_name!r} "
                f"(should be {DS_EXCHANGE_INTERFACE_NAME})."
            )
    if not provider_endpoint and not requirer_endpoint:
        raise EndpointValidationError(
            "This charm should implement either a requirer or a provider (or both) "
            "endpoint for `grafana-datasource-exchange`."
        )


class DatasourceExchange:
    """``grafana_datasource_exchange`` interface endpoint wrapper (provider AND requirer)."""

    def __init__(
        self,
        charm: ops.CharmBase,
        *,
        provider_endpoint: Optional[str],
        requirer_endpoint: Optional[str],
    ):
        self._charm = charm
        _validate_endpoints(charm, provider_endpoint, requirer_endpoint)

        # gather all relations, provider or requirer
        all_relations = []
        if provider_endpoint:
            all_relations.extend(charm.model.relations.get(provider_endpoint, ()))
        if requirer_endpoint:
            all_relations.extend(charm.model.relations.get(requirer_endpoint, ()))

        # filter out some common unhappy relation states
        self._relations: List[ops.Relation] = [
            rel for rel in all_relations if (rel.app and rel.data)
        ]

    def publish(self, datasources: Iterable[DatasourceDict]):
        """Submit these datasources to all remotes.

        This operation is leader-only.
        """
        # sort by UID to prevent endless relation-changed cascades if this keeps flapping
        encoded_datasources = json.dumps(sorted(datasources, key=lambda raw_ds: raw_ds["uid"]))
        app_data = DSExchangeAppData(
            datasources=encoded_datasources  # type: ignore[reportCallIssue]
        )

        for relation in self._relations:
            app_data.dump(relation.data[self._charm.app])

    @property
    def received_datasources(self) -> Tuple[GrafanaDatasource, ...]:
        """Collect all datasources that the remotes have shared.

        This operation is leader-only.
        """
        datasources: List[GrafanaDatasource] = []

        for relation in self._relations:
            try:
                datasource = DSExchangeAppData.load(relation.data[relation.app])
            except DataValidationError:
                # load() already logs something in this case
                continue

            datasources.extend(datasource.datasources)
        return tuple(sorted(datasources, key=lambda ds: ds.uid))
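For reference, a minimal sketch of how a charm might drive this wrapper directly; the charm class, endpoint names, and datasource values below are invented for illustration, and both endpoints must be declared in the charm's metadata with the `grafana_datasource_exchange` interface.

```python
from typing import List

import ops

from cosl.interfaces.datasource_exchange import DatasourceDict, DatasourceExchange


class ExampleCoordinatorCharm(ops.CharmBase):  # hypothetical charm
    def __init__(self, framework: ops.Framework):
        super().__init__(framework)
        # either endpoint may be None, but not both
        self.ds_exchange = DatasourceExchange(
            self,
            provider_endpoint="send-datasource",
            requirer_endpoint="receive-datasource",
        )
        self.framework.observe(self.on.update_status, self._on_update_status)

    def _on_update_status(self, _: ops.EventBase):
        if self.unit.is_leader():
            # advertise the datasources this stack registered in grafana
            owned: List[DatasourceDict] = [
                {"type": "loki", "uid": "loki-ds-uid", "grafana_uid": "grafana-app-uid"}
            ]
            self.ds_exchange.publish(owned)
        # everything the related coordinators have shared with us, sorted by uid
        peer_datasources = self.ds_exchange.received_datasources
```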
42 changes: 42 additions & 0 deletions src/cosl/interfaces/utils.py
@@ -81,3 +81,45 @@ def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _RawDatabag:
         dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True)  # type: ignore
         _databag.update({k: json.dumps(v) for k, v in dct.items()})
         return _databag
+
+
+# FIXME: in pydantic v2, the json stuff we've been doing is no longer necessary.
+# It becomes much easier to work with Json fields and the databagmodel class becomes much simpler.
+# We should rewrite the cluster implementation to use this class,
+# and replace the original DatabagModel with it
+class DatabagModelV2(pydantic.BaseModel):
+    """Base databag model."""
+
+    model_config = ConfigDict(
+        # tolerate additional keys in databag
+        extra="ignore",
+        # Allow instantiating this class by field name (instead of forcing alias).
+        populate_by_name=True,
+    )  # type: ignore
+    """Pydantic config."""
+
+    @classmethod
+    def load(cls, databag: _RawDatabag):
+        """Load this model from a Juju databag."""
+        try:
+            return cls.model_validate_json(json.dumps(dict(databag)))  # type: ignore
+        except pydantic.ValidationError as e:
+            msg = f"failed to validate databag: {databag}"
+            if databag:
+                log.debug(msg, exc_info=True)
+            raise DataValidationError(msg) from e
+
+    def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _RawDatabag:
+        """Write the contents of this model to Juju databag.
+
+        :param databag: the databag to write the data to.
+        :param clear: ensure the databag is cleared before writing it.
+        """
+        _databag: _RawDatabag = {} if databag is None else databag
+
+        if clear:
+            _databag.clear()
+
+        dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True, round_trip=True)  # type: ignore
+        _databag.update(dct)
+        return _databag
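A quick sketch of the intended usage; the model and values here are invented. Since Juju databags only hold strings, fields are typically declared as `pydantic.Json[...]`, and `round_trip=True` keeps them serialized as JSON strings on dump.

```python
from typing import List

from pydantic import Json

from cosl.interfaces.utils import DatabagModelV2


class ExampleAppData(DatabagModelV2):
    """Hypothetical schema with a single JSON-encoded field."""

    endpoints: Json[List[str]]


databag = {}  # stand-in for relation.data[self.app]
ExampleAppData(endpoints='["http://a:3100", "http://b:3100"]').dump(databag)
assert isinstance(databag["endpoints"], str)  # round_trip keeps Json fields as JSON strings
loaded = ExampleAppData.load(databag)
assert loaded.endpoints == ["http://a:3100", "http://b:3100"]
```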
4 changes: 4 additions & 0 deletions tests/test_coordinated_workers/test_coordinator.py
@@ -79,10 +79,12 @@ class MyCoordinator(ops.CharmBase):
             "my-charm-tracing": {"interface": "tracing", "limit": 1},
             "my-workload-tracing": {"interface": "tracing", "limit": 1},
             "my-s3": {"interface": "s3"},
+            "my-ds-exchange-require": {"interface": "grafana_datasource_exchange"},
         },
         "provides": {
             "my-dashboards": {"interface": "grafana_dashboard"},
             "my-metrics": {"interface": "prometheus_scrape"},
+            "my-ds-exchange-provide": {"interface": "grafana_datasource_exchange"},
         },
         "containers": {
             "nginx": {"type": "oci-image"},
@@ -121,6 +123,8 @@ def __init__(self, framework: ops.Framework):
                 "charm-tracing": "my-charm-tracing",
                 "workload-tracing": "my-workload-tracing",
                 "s3": "my-s3",
+                "send-datasource": "my-ds-exchange-provide",
+                "receive-datasource": "my-ds-exchange-require",
             },
             nginx_config=lambda coordinator: f"nginx configuration for {coordinator._charm.meta.name}",
             workers_config=lambda coordinator: f"workers configuration for {coordinator._charm.meta.name}",
4 changes: 4 additions & 0 deletions tests/test_coordinated_workers/test_coordinator_status.py
@@ -38,6 +38,8 @@ def __init__(self, framework: ops.Framework):
                 "metrics": "metrics-endpoint",
                 "charm-tracing": "self-charm-tracing",
                 "workload-tracing": "self-workload-tracing",
+                "send-datasource": None,
+                "receive-datasource": "my-ds-exchange-require",
             },
             nginx_config=lambda _: "nginx config",
             workers_config=lambda _: "worker config",
@@ -64,11 +66,13 @@ def ctx(coord_charm):
                 "certificates": {"interface": "tls-certificates"},
                 "self-charm-tracing": {"interface": "tracing", "limit": 1},
                 "self-workload-tracing": {"interface": "tracing", "limit": 1},
+                "my-ds-exchange-require": {"interface": "grafana_datasource_exchange"},
             },
             "provides": {
                 "cluster": {"interface": "cluster"},
                 "grafana-dashboard": {"interface": "grafana_dashboard"},
                 "metrics-endpoint": {"interface": "prometheus_scrape"},
+                "my-ds-exchange-provide": {"interface": "grafana_datasource_exchange"},
             },
             "containers": {
                 "nginx": {"type": "oci-image"},