Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datasource exchange #109

Merged
merged 11 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 29 additions & 21 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@ build-backend = "hatchling.build"
name = "cosl"
version = "0.0.44"
authors = [
{ name="sed-i", email="[email protected]" },
{ name = "sed-i", email = "[email protected]" },
PietroPasotti marked this conversation as resolved.
Show resolved Hide resolved
]
description = "Utils for COS Lite charms"
readme = "README.md"
license = {file = "LICENSE"}
license = { file = "LICENSE" }
requires-python = ">=3.8"
dependencies = [
"ops",
"pydantic",
"tenacity",
"PyYAML",
"typing-extensions",
"lightkube>=v0.15.4"
"ops",
"pydantic",
"tenacity",
"jsonschema",
"PyYAML",
"typing-extensions",
"lightkube>=v0.15.4",
"charm-relation-interfaces @ git+https://github.com/canonical/charm-relation-interfaces@grafana_datasource_exchange",
]
classifiers = [
"Programming Language :: Python :: 3.8",
Expand Down Expand Up @@ -60,21 +62,21 @@ extend-exclude = ["__pycache__", "*.egg_info"]
[tool.ruff.lint]
select = ["E", "W", "F", "C", "N", "D", "I001"]
extend-ignore = [
"D203",
"D204",
"D213",
"D215",
"D400",
"D404",
"D406",
"D407",
"D408",
"D409",
"D413",
"E402",
"D203",
"D204",
"D213",
"D215",
"D400",
"D404",
"D406",
"D407",
"D408",
"D409",
"D413",
"E402",
]
ignore = ["E501", "D107"]
per-file-ignores = {"tests/*" = ["D100","D101","D102","D103","D104"]}
per-file-ignores = { "tests/*" = ["D100", "D101", "D102", "D103", "D104"] }

[tool.ruff.lint.pydocstyle]
convention = "google"
Expand All @@ -88,6 +90,7 @@ classmethod-decorators = ["classmethod", "pydantic.validator"]

[tool.pyright]
include = ["src"]

extraPaths = ["lib", "src/cosl"]
pythonVersion = "3.8"
pythonPlatform = "All"
Expand All @@ -99,3 +102,8 @@ reportTypeCommentUsage = false
[tool.codespell]
skip = ".git,.tox,build,lib,venv*,.mypy_cache"
ignore-words-list = "assertIn"

[tool.hatch.metadata]
# allow git+ dependencies in pyproject
allow-direct-references = true
PietroPasotti marked this conversation as resolved.
Show resolved Hide resolved

64 changes: 36 additions & 28 deletions src/cosl/coordinated_workers/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from cosl.helpers import check_libs_installed
from cosl.interfaces.cluster import ClusterProvider, RemoteWriteEndpoint
from cosl.interfaces.datasource_exchange import DatasourceExchange

check_libs_installed(
"charms.data_platform_libs.v0.s3",
Expand Down Expand Up @@ -124,12 +125,12 @@ def __post_init__(self):
is_minimal_valid = set(self.minimal_deployment).issubset(self.roles)
is_recommended_valid = set(self.recommended_deployment).issubset(self.roles)
if not all(
[
are_meta_keys_valid,
are_meta_values_valid,
is_minimal_valid,
is_recommended_valid,
]
[
are_meta_keys_valid,
are_meta_values_valid,
is_minimal_valid,
is_recommended_valid,
]
):
raise ClusterRolesConfigError(
"Invalid ClusterRolesConfig: The configuration is not coherent."
Expand All @@ -141,8 +142,8 @@ def is_coherent_with(self, cluster_roles: Iterable[str]) -> bool:


def _validate_container_name(
container_name: Optional[str],
resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]],
container_name: Optional[str],
resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]],
):
"""Raise `ValueError` if `resources_requests` is not None and `container_name` is None."""
if resources_requests is not None and container_name is None:
Expand All @@ -161,6 +162,8 @@ def _validate_container_name(
"metrics": str,
"charm-tracing": str,
"workload-tracing": str,
"provide-datasource-exchange": str,
"require-datasource-exchange": str,
"s3": str,
},
total=True,
Expand All @@ -185,22 +188,22 @@ class Coordinator(ops.Object):
"""

def __init__(
self,
charm: ops.CharmBase,
roles_config: ClusterRolesConfig,
external_url: str, # the ingressed url if we have ingress, else fqdn
worker_metrics_port: int,
endpoints: _EndpointMapping,
nginx_config: Callable[["Coordinator"], str],
workers_config: Callable[["Coordinator"], str],
nginx_options: Optional[NginxMappingOverrides] = None,
is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
resources_limit_options: Optional[_ResourceLimitOptionsMapping] = None,
resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]] = None,
container_name: Optional[str] = None,
remote_write_endpoints: Optional[Callable[[], List[RemoteWriteEndpoint]]] = None,
workload_tracing_protocols: Optional[List[ReceiverProtocol]] = None,
self,
charm: ops.CharmBase,
roles_config: ClusterRolesConfig,
external_url: str, # the ingressed url if we have ingress, else fqdn
worker_metrics_port: int,
endpoints: _EndpointMapping,
nginx_config: Callable[["Coordinator"], str],
workers_config: Callable[["Coordinator"], str],
nginx_options: Optional[NginxMappingOverrides] = None,
is_coherent: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
is_recommended: Optional[Callable[[ClusterProvider, ClusterRolesConfig], bool]] = None,
resources_limit_options: Optional[_ResourceLimitOptionsMapping] = None,
resources_requests: Optional[Callable[["Coordinator"], Dict[str, str]]] = None,
container_name: Optional[str] = None,
remote_write_endpoints: Optional[Callable[[], List[RemoteWriteEndpoint]]] = None,
workload_tracing_protocols: Optional[List[ReceiverProtocol]] = None,
):
"""Constructor for a Coordinator object.

Expand Down Expand Up @@ -277,6 +280,11 @@ def __init__(
)

self.s3_requirer = S3Requirer(self._charm, self._endpoints["s3"])
self.datasource_exchange = DatasourceExchange(
self._charm,
provider_endpoint=self._endpoints["provide-datasource-exchange"],
requirer_endpoint=self._endpoints["require-datasource-exchange"],
)

self._grafana_dashboards = GrafanaDashboardProvider(
self._charm, relation_name=self._endpoints["grafana-dashboards"]
Expand Down Expand Up @@ -428,10 +436,10 @@ def _internal_url(self) -> str:
def tls_available(self) -> bool:
"""Return True if tls is enabled and the necessary certs are found."""
return (
self.cert_handler.enabled
and (self.cert_handler.server_cert is not None)
and (self.cert_handler.private_key is not None) # type: ignore
and (self.cert_handler.ca_cert is not None)
self.cert_handler.enabled
and (self.cert_handler.server_cert is not None)
and (self.cert_handler.private_key is not None) # type: ignore
and (self.cert_handler.ca_cert is not None)
)

@property
Expand Down
137 changes: 137 additions & 0 deletions src/cosl/interfaces/datasource_exchange.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical
# See LICENSE file for licensing details.

"""Shared utilities for the inter-coordinator "grafana_datasource_exchange" interface.

See https://github.com/canonical/charm-relation-interfaces/pull/207 for the interface specification.
PietroPasotti marked this conversation as resolved.
Show resolved Hide resolved
# TODO: update this link once the charm-relation-interfaces PR is merged
"""


# FIXME: the interfaces import (because it's a git dep perhaps?)
# can't be type-checked, which breaks everything
# pyright: reportMissingImports=false
# pyright: reportUntypedBaseClass=false
# pyright: reportUnknownLambdaType=false
# pyright: reportUnknownMemberType=false
# pyright: reportUnknownVariableType=false
# pyright: reportUnknownArgumentType=false
# pyright: reportUnknownParameterType=false


import json
import logging
from itertools import chain
from typing import (
Iterable,
List,
Tuple, Optional,
)

import ops
from interfaces.grafana_datasource_exchange.v0.schema import (
GrafanaDatasource,
GrafanaSourceAppData,
)
from ops import CharmBase
from typing_extensions import TypedDict

import cosl.interfaces.utils
from cosl.interfaces.utils import DataValidationError

# Module logger. The original used the name "_cluster", copy-pasted from the
# cluster interface module; use this module's own name so log records are
# attributable to datasource_exchange.
log = logging.getLogger(__name__)

# Default endpoint names used when the caller does not pass explicit ones.
DEFAULT_PROVIDE_ENDPOINT_NAME = "provide-ds-exchange"
DEFAULT_REQUIRE_ENDPOINT_NAME = "require-ds-exchange"
# Interface name both endpoints must be declared with in charm metadata.
DS_EXCHANGE_INTERFACE_NAME = "grafana_datasource_exchange"


class DSExchangeAppData(cosl.interfaces.utils.DatabagModelV2, GrafanaSourceAppData):
    """App databag schema for both sides of this interface.

    Combines the ``GrafanaSourceAppData`` field schema with the
    ``DatabagModelV2`` load/dump machinery, so instances can be read from and
    written to a Juju application databag.
    """


class DatasourceDict(TypedDict):
    """Raw datasource information.

    Minimal identifying information for one datasource, as accepted by
    ``DatasourceExchange.submit``.
    """

    # datasource type — presumably a Grafana datasource type string such as
    # "prometheus"; confirm against the grafana_datasource_exchange schema
    type: str
    # unique identifier of the datasource; used as the sort key on submit
    uid: str


class EndpointValidationError(ValueError):
    """Raised if an endpoint name is invalid.

    Either the endpoint is not declared in charm metadata, or it is declared
    with an interface other than ``grafana_datasource_exchange``.
    """


def _validate_endpoints(charm: CharmBase, provider_endpoint: str, requirer_endpoint: str):
    """Check both endpoints against the charm's metadata.

    The provider endpoint must appear under ``provides`` and the requirer
    endpoint under ``requires``, each bound to the
    ``grafana_datasource_exchange`` interface.

    Raises:
        EndpointValidationError: if an endpoint is missing or has the wrong
            interface.
    """
    checks = (
        (provider_endpoint, charm.meta.provides),
        (requirer_endpoint, charm.meta.requires),
    )
    for name, declared in checks:
        if name not in declared:
            raise EndpointValidationError(f"endpoint {name!r} not declared in charm metadata")
        actual_interface = declared[name].interface_name
        if actual_interface != DS_EXCHANGE_INTERFACE_NAME:
            raise EndpointValidationError(
                f"endpoint {name} has unexpected interface {actual_interface!r} "
                f"(should be {DS_EXCHANGE_INTERFACE_NAME})."
            )


class DatasourceExchange:
    """``grafana_datasource_exchange`` interface endpoint wrapper (provider AND requirer)."""

    def __init__(
        self,
        charm: ops.CharmBase,
        *,
        provider_endpoint: Optional[str] = None,
        requirer_endpoint: Optional[str] = None,
    ):
        """Wrap both sides of the datasource-exchange interface for this charm.

        Args:
            charm: the charm that owns the endpoints.
            provider_endpoint: name of the ``provides`` endpoint; defaults to
                ``provide-ds-exchange``.
            requirer_endpoint: name of the ``requires`` endpoint; defaults to
                ``require-ds-exchange``.

        Raises:
            EndpointValidationError: if either endpoint is absent from charm
                metadata or bound to the wrong interface.
        """
        self._charm = charm
        provide = provider_endpoint or DEFAULT_PROVIDE_ENDPOINT_NAME
        require = requirer_endpoint or DEFAULT_REQUIRE_ENDPOINT_NAME

        _validate_endpoints(charm, provide, require)

        # Provider-side and requirer-side relations are handled uniformly;
        # drop relations without a remote app or accessible databags.
        candidates = list(charm.model.relations[provide]) + list(charm.model.relations[require])
        self._relations: List[ops.Relation] = [
            relation for relation in candidates if (relation.app and relation.data)
        ]

    def submit(self, raw_datasources: Iterable[DatasourceDict]):
        """Submit these datasources to all remotes.

        This operation is leader-only.
        """
        # Serialize in a stable (uid-sorted) order so re-submitting the same
        # set of datasources does not churn the databag and trigger endless
        # relation-changed cascades.
        ordered = sorted(raw_datasources, key=lambda raw_ds: raw_ds["uid"])
        payload = DSExchangeAppData(
            datasources=json.dumps(ordered)  # type: ignore[reportCallIssue]
        )

        for relation in self._relations:
            payload.dump(relation.data[self._charm.app])

    @property
    def received_datasources(self) -> Tuple[GrafanaDatasource, ...]:
        """Collect all datasources that the remotes have shared.

        This operation is leader-only.
        """
        collected: List[GrafanaDatasource] = []

        for relation in self._relations:
            try:
                remote_data = DSExchangeAppData.load(relation.data[relation.app])
            except DataValidationError:
                # load() already logs something in this case
                continue
            collected.extend(remote_data.datasources)

        # stable ordering for callers, keyed on uid
        return tuple(sorted(collected, key=lambda ds: ds.uid))
42 changes: 42 additions & 0 deletions src/cosl/interfaces/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,45 @@ def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _Ra
dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True) # type: ignore
_databag.update({k: json.dumps(v) for k, v in dct.items()})
return _databag


# FIXME: in pydantic v2, the json stuff we've been doing is no longer necessary.
# It becomes much easier to work with Json fields and the databagmodel class becomes much simpler.
# We should rewrite the cluster implementation to use this class,
# and replace the original DatabagModel with it
class DatabagModelV2(pydantic.BaseModel):
    """Base databag model.

    Pydantic-v2-native (de)serialization helper for Juju databags. Unlike the
    original ``DatabagModel`` in this module, ``dump`` writes the plain
    ``model_dump`` output into the databag without ``json.dumps``-encoding each
    value individually.
    """

    model_config = ConfigDict(
        # tolerate additional keys in databag
        extra="ignore",
        # Allow instantiating this class by field name (instead of forcing alias).
        populate_by_name=True,
    )  # type: ignore
    """Pydantic config."""

    @classmethod
    def load(cls, databag: _RawDatabag):
        """Load this model from a Juju databag.

        :param databag: raw databag (string-to-string mapping) to parse.
        :raises DataValidationError: if the databag contents do not validate
            against this model's schema.
        """
        try:
            # round-trip through json so pydantic parses values that were
            # stored as JSON strings in the databag
            return cls.model_validate_json(json.dumps(dict(databag)))  # type: ignore
        except pydantic.ValidationError as e:
            msg = f"failed to validate databag: {databag}"
            # only log when there was actual content to validate; an empty
            # databag failing validation is the expected "no data yet" case
            if databag:
                log.debug(msg, exc_info=True)
            raise DataValidationError(msg) from e

    def dump(self, databag: Optional[_RawDatabag] = None, clear: bool = True) -> _RawDatabag:
        """Write the contents of this model to Juju databag.

        :param databag: the databag to write the data to.
        :param clear: ensure the databag is cleared before writing it.
        :return: the databag that was written to (a fresh dict if none given).
        """
        _databag: _RawDatabag = {} if databag is None else databag

        if clear:
            _databag.clear()

        # mode="json" yields JSON-compatible values; round_trip=True keeps the
        # output re-parseable by load() above
        dct = self.model_dump(mode="json", by_alias=True, exclude_defaults=True, round_trip=True)  # type: ignore
        _databag.update(dct)
        return _databag
Loading
Loading