diff --git a/lib/charms/tempo_k8s/v1/charm_tracing.py b/lib/charms/tempo_k8s/v1/charm_tracing.py new file mode 100644 index 0000000..ddd8551 --- /dev/null +++ b/lib/charms/tempo_k8s/v1/charm_tracing.py @@ -0,0 +1,517 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""This charm library contains utilities to instrument your Charm with opentelemetry tracing data collection. + +(yes! charm code, not workload code!) + +This means that, if your charm is related to, for example, COS' Tempo charm, you will be able to inspect +in real time from the Grafana dashboard the execution flow of your charm. + +To start using this library, you need to do two things: +1) decorate your charm class with + +`@trace_charm(tracing_endpoint="my_tracing_endpoint")` + +2) add to your charm a "my_tracing_endpoint" (you can name this attribute whatever you like) **property** +that returns an otlp http/https endpoint url. If you are using the `TracingEndpointProvider` as +`self.tracing = TracingEndpointProvider(self)`, the implementation could be: + +``` + @property + def my_tracing_endpoint(self) -> Optional[str]: + '''Tempo endpoint for charm tracing''' + if self.tracing.is_ready(): + return self.tracing.otlp_http_endpoint() + else: + return None +``` + +At this point your charm will be automatically instrumented so that: +- charm execution starts a trace, containing + - every event as a span (including custom events) + - every charm method call (except dunders) as a span + +if you wish to add more fine-grained information to the trace, you can do so by getting a hold of the tracer like so: +``` +import opentelemetry +... + @property + def tracer(self) -> opentelemetry.trace.Tracer: + return opentelemetry.trace.get_tracer(type(self).__name__) +``` + +By default, the tracer is named after the charm type. If you wish to override that, you can pass +a different `service_name` argument to `trace_charm`. 
+ +*Upgrading from `v0`:* + +If you are upgrading from `charm_tracing` v0, you need to do the following steps (assuming you already +have the newest version of the library in your charm): +1) If you have a dependency on `opentelemetry-exporter-otlp-proto-grpc` in your project for charm tracing, replace +it with `opentelemetry-exporter-otlp-proto-http>=1.21.0`. +2) Update your `tracing_endpoint` property to point at `self.tracing.otlp_http_endpoint()` +3) If you were passing a certificate using `server_cert`, you need to change it to provide an *absolute* path to +the certificate file. +""" + +import functools +import inspect +import logging +import os +from contextlib import contextmanager +from contextvars import Context, ContextVar, copy_context +from pathlib import Path +from typing import ( + Any, + Callable, + Generator, + Optional, + Sequence, + Type, + TypeVar, + Union, + cast, +) + +import opentelemetry +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import Span, TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import INVALID_SPAN, Tracer +from opentelemetry.trace import get_current_span as otlp_get_current_span +from opentelemetry.trace import ( + get_tracer, + get_tracer_provider, + set_span_in_context, + set_tracer_provider, +) +from ops.charm import CharmBase +from ops.framework import Framework + +# The unique Charmhub library identifier, never change it +LIBID = "cb1705dcd1a14ca09b2e60187d1215c7" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version + +LIBPATCH = 0 + +PYDEPS = ["opentelemetry-exporter-otlp-proto-http>=1.21.0"] + +logger = logging.getLogger("tracing") + +tracer: ContextVar[Tracer] = ContextVar("tracer") 
@contextmanager
def charm_tracing_disabled():
    """Contextmanager to temporarily disable charm tracing.

    For usage in tests.

    Sets the ``CHARM_TRACING_ENABLED`` environment variable to ``"0"`` for the duration of
    the ``with`` block and restores the previous state on exit, also when the block raises.
    """
    # Remember whether the variable was set at all, so that an unset variable goes back
    # to being unset instead of being left behind with an explicit value.
    previous = os.environ.get(CHARM_TRACING_ENABLED)
    os.environ[CHARM_TRACING_ENABLED] = "0"
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions alike: a failing test must not leave
        # tracing disabled for whatever runs next in the same process.
        if previous is None:
            os.environ.pop(CHARM_TRACING_ENABLED, None)
        else:
            os.environ[CHARM_TRACING_ENABLED] = previous
def _get_tracing_endpoint(tracing_endpoint_getter, self, charm):
    """Resolve the url that spans should be exported to for this charm instance.

    :param tracing_endpoint_getter: property, method or callable that, evaluated against the
        charm instance, returns the base url of the tracing endpoint (or None).
    :param self: the charm instance the getter is evaluated against.
    :param charm: the charm type; only used to build log and error messages.
    :return: the endpoint url with ``/v1/traces`` appended, or None if the getter returned None.
    :raises TypeError: if the getter returns something that is neither a string nor None.
    """
    if isinstance(tracing_endpoint_getter, property):
        tracing_endpoint = tracing_endpoint_getter.__get__(self)
    else:  # method or callable
        tracing_endpoint = tracing_endpoint_getter(self)

    if tracing_endpoint is None:
        getter_name = getattr(
            tracing_endpoint_getter, "__qualname__", str(tracing_endpoint_getter)
        )
        logger.warning(
            f"{charm}.{getter_name} "
            f"returned None; continuing with tracing DISABLED."
        )
        return None

    if not isinstance(tracing_endpoint, str):
        raise TypeError(
            f"{charm}.{tracing_endpoint_getter} should return a tempo endpoint (string); "
            f"got {tracing_endpoint} instead."
        )

    # Drop trailing slashes so that a base url such as "http://host:4318/" does not
    # produce "http://host:4318//v1/traces".
    base_url = tracing_endpoint.rstrip("/")
    traces_url = f"{base_url}/v1/traces"
    logger.debug(f"Setting up span exporter to endpoint: {traces_url}")
    return traces_url
def _setup_root_span_initializer(
    charm: Type[CharmBase],
    tracing_endpoint_getter: _GetterType,
    server_cert_getter: Optional[_GetterType],
    service_name: Optional[str] = None,
):
    """Patch the charm's initializer.

    After the original ``__init__`` has run, the patched initializer sets up an
    opentelemetry tracer provider exporting to the configured endpoint, opens a root
    ``"charm exec"`` span, wraps ``framework._event_context`` so that each event gets its
    own span, and wraps ``framework.close`` so that the root span is ended and the
    provider flushed before the framework is closed.

    :param charm: the charm type whose ``__init__`` is replaced.
    :param tracing_endpoint_getter: property/method/callable returning the tracing endpoint
        url; if it returns None, no tracing is set up.
    :param server_cert_getter: optional property/method/callable returning the absolute path
        to a tls certificate.
    :param service_name: service name for the traces; defaults to the juju application name.
    """
    original_init = charm.__init__

    @functools.wraps(original_init)
    def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs):
        original_init(self, framework, *args, **kwargs)
        if not is_enabled():
            logger.info("Tracing DISABLED: skipping root span initialization")
            return

        original_event_context = framework._event_context

        # Log through this module's logger, like every other message in this function.
        logger.debug("Initializing opentelemetry tracer...")
        _service_name = service_name or self.app.name

        resource = Resource.create(
            attributes={
                "service.name": _service_name,
                "compose_service": _service_name,
                "charm_type": type(self).__name__,
                # juju topology
                "juju_unit": self.unit.name,
                "juju_application": self.app.name,
                "juju_model": self.model.name,
                "juju_model_uuid": self.model.uuid,
            }
        )

        tracing_endpoint = _get_tracing_endpoint(tracing_endpoint_getter, self, charm)
        if not tracing_endpoint:
            return

        # Only build the provider once we know there is an endpoint to export to.
        provider = TracerProvider(resource=resource)

        server_cert: Optional[Union[str, Path]] = (
            _get_server_cert(server_cert_getter, self, charm) if server_cert_getter else None
        )

        exporter = OTLPSpanExporter(
            endpoint=tracing_endpoint,
            certificate_file=str(Path(server_cert).absolute()) if server_cert else None,
            timeout=2,
        )

        processor = BatchSpanProcessor(exporter)
        provider.add_span_processor(processor)
        set_tracer_provider(provider)
        _tracer = get_tracer(_service_name)  # type: ignore
        _tracer_token = tracer.set(_tracer)

        dispatch_path = os.getenv("JUJU_DISPATCH_PATH", "")

        # all these shenanigans are to work around the fact that the opentelemetry tracing API is built
        # on the assumption that spans will be used as contextmanagers.
        # Since we don't (as we need to close the span on framework.commit),
        # we need to manually set the root span as current.
        span = _tracer.start_span("charm exec", attributes={"juju.dispatch_path": dispatch_path})
        ctx = set_span_in_context(span)

        # log a trace id so we can look it up in tempo.
        root_trace_id = hex(span.get_span_context().trace_id)[2:]  # strip 0x prefix
        logger.debug(f"Starting root trace with id={root_trace_id!r}.")

        span_token = opentelemetry.context.attach(ctx)  # type: ignore

        @contextmanager
        def wrap_event_context(event_name: str):
            # when the framework enters an event context, we create a span.
            with _span("event: " + event_name) as event_context_span:
                if event_context_span:
                    # todo: figure out how to inject event attrs in here
                    event_context_span.add_event(event_name)
                # NOTE(review): the object returned by the original `_event_context` is
                # yielded here, not entered with `with` -- confirm this is intended.
                yield original_event_context(event_name)

        framework._event_context = wrap_event_context  # type: ignore

        original_close = framework.close

        @functools.wraps(original_close)
        def wrap_close():
            try:
                span.end()
                opentelemetry.context.detach(span_token)  # type: ignore
                tracer.reset(_tracer_token)
                tp = cast(TracerProvider, get_tracer_provider())
                tp.force_flush(timeout_millis=1000)  # don't block for too long
                tp.shutdown()
            finally:
                # A failure while tearing down tracing must not prevent the framework
                # from being closed.
                original_close()

        framework.close = wrap_close

    charm.__init__ = wrap_init
+ + Usage: + >>> from charms.tempo_k8s.v1.charm_tracing import trace_charm + >>> from charms.tempo_k8s.v1.tracing import TracingEndpointProvider + >>> from ops import CharmBase + >>> + >>> @trace_charm( + >>> tracing_endpoint="tempo_otlp_http_endpoint", + >>> ) + >>> class MyCharm(CharmBase): + >>> + >>> def __init__(self, framework: Framework): + >>> ... + >>> self.tracing = TracingEndpointProvider(self) + >>> + >>> @property + >>> def tempo_otlp_http_endpoint(self) -> Optional[str]: + >>> if self.tracing.is_ready(): + >>> return self.tracing.otlp_http_endpoint() + >>> else: + >>> return None + >>> + :param server_cert: method or property on the charm type that returns an + optional absolute path to a tls certificate to be used when sending traces to a remote server. + If it returns None, an _insecure_ connection will be used. + :param tracing_endpoint: name of a property on the charm type that returns an + optional (fully resolvable) tempo url. If None, tracing will be effectively disabled. Else, traces will be + pushed to that endpoint. + :param service_name: service name tag to attach to all traces generated by this charm. + Defaults to the juju application name this charm is deployed under. + :param extra_types: pass any number of types that you also wish to autoinstrument. + For example, charm libs, relation endpoint wrappers, workload abstractions, ... 
+ """ + + def _decorator(charm_type: Type[CharmBase]): + """Autoinstrument the wrapped charmbase type.""" + _autoinstrument( + charm_type, + tracing_endpoint_getter=getattr(charm_type, tracing_endpoint), + server_cert_getter=getattr(charm_type, server_cert) if server_cert else None, + service_name=service_name, + extra_types=extra_types, + ) + return charm_type + + return _decorator + + +def _autoinstrument( + charm_type: Type[CharmBase], + tracing_endpoint_getter: _GetterType, + server_cert_getter: Optional[_GetterType] = None, + service_name: Optional[str] = None, + extra_types: Sequence[type] = (), +) -> Type[CharmBase]: + """Set up tracing on this charm class. + + Use this function to get out-of-the-box traces for all events emitted on this charm and all + method calls on instances of this class. + + Usage: + + >>> from charms.tempo_k8s.v1.charm_tracing import _autoinstrument + >>> from ops.main import main + >>> _autoinstrument( + >>> MyCharm, + >>> tracing_endpoint_getter=MyCharm.tempo_otlp_http_endpoint, + >>> service_name="MyCharm", + >>> extra_types=(Foo, Bar) + >>> ) + >>> main(MyCharm) + + :param charm_type: the CharmBase subclass to autoinstrument. + :param server_cert_getter: method or property on the charm type that returns an + optional absolute path to a tls certificate to be used when sending traces to a remote server. + This needs to be a valid path to a certificate. + :param tracing_endpoint_getter: method or property on the charm type that returns an + optional tempo url. If None, tracing will be effectively disabled. Else, traces will be + pushed to that endpoint. + :param service_name: service name tag to attach to all traces generated by this charm. + Defaults to the juju application name this charm is deployed under. + :param extra_types: pass any number of types that you also wish to autoinstrument. + For example, charm libs, relation endpoint wrappers, workload abstractions, ... 
def trace_type(cls: _T) -> _T:
    """Set up tracing on this class.

    Use this decorator to get out-of-the-box traces for all method calls on instances of this class.
    It assumes that this class is only instantiated after a charm type decorated with `@trace_charm`
    has been instantiated.

    Every function member of ``cls`` (as reported by ``inspect.getmembers``) whose name does
    not start with a double underscore is replaced, on ``cls``, by a traced wrapper.

    :param cls: the class to instrument; it is modified in place.
    :return: the same class object.
    """
    logger.info(f"instrumenting {cls}")
    for name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
        logger.info(f"discovered {method}")

        # Check the attribute name as well as the function's own name: for an aliased
        # method (``bar = foo``) the two differ.
        if name.startswith("__") or method.__name__.startswith("__"):
            logger.info(f"skipping {method} (dunder)")
            continue

        # Look the member up under the name it is bound to on the class. Using
        # ``method.__name__`` would inspect the wrong attribute (or raise AttributeError)
        # whenever the function is bound under a different name.
        isstatic = isinstance(inspect.getattr_static(cls, name), staticmethod)
        setattr(cls, name, trace_method(method, static=isstatic))

    return cls
def trace(obj: Union[Type, Callable]):
    """Trace this object and send the resulting spans to Tempo.

    It will dispatch to ``trace_type`` if the decorated object is a class, otherwise
    ``trace_function``.

    :param obj: the class or callable to instrument.
    :return: the instrumented class, or a traced wrapper around the callable.
    :raises ValueError: if ``obj`` is a ``CharmBase`` subclass (use ``@trace_charm`` for those).
    :raises UntraceableObjectError: if ``obj`` is neither a class nor a callable, or if
        wrapping it fails.
    """
    if isinstance(obj, type):
        if issubclass(obj, CharmBase):
            raise ValueError(
                "cannot use @trace on CharmBase subclasses: use @trace_charm instead "
                "(we need some arguments!)"
            )
        return trace_type(obj)

    # Reject non-callables up front; wrapping one would only fail later, when the
    # wrapper is first called.
    if not callable(obj):
        raise UntraceableObjectError(
            f"cannot create span from {type(obj)}; instrument {obj} manually."
        )

    try:
        return trace_function(obj)
    except Exception as e:
        # Keep the original failure attached as the cause.
        raise UntraceableObjectError(
            f"cannot create span from {type(obj)}; instrument {obj} manually."
        ) from e
+ +## Requirer Library Usage + +Charms seeking to push traces to Tempo must do so using the `TracingEndpointRequirer` +object from this charm library. For the simplest use cases, using the `TracingEndpointRequirer` +object only requires instantiating it, typically in the constructor of your charm. The +`TracingEndpointRequirer` constructor requires the name of the relation over which a tracing endpoint + is exposed by the Tempo charm. This relation must use the +`tracing` interface. + The `TracingEndpointRequirer` object may be instantiated as follows + + from charms.tempo_k8s.v1.tracing import TracingEndpointRequirer + + def __init__(self, *args): + super().__init__(*args) + # ... + self.tracing = TracingEndpointRequirer(self) + # ... + +Note that the first argument (`self`) to `TracingEndpointRequirer` is always a reference to the +parent charm. + +Units of requirer charms obtain the tempo endpoint to which they will push their traces by using one +of these `TracingEndpointRequirer` attributes, depending on which protocol they support: +- otlp_grpc_endpoint +- otlp_http_endpoint +- zipkin_endpoint +- tempo_endpoint + +## Provider Library Usage + +The `TracingEndpointProvider` object may be used by charms to manage relations with their +trace sources. For this purpose, a Tempo-like charm needs to do two things + +1. Instantiate the `TracingEndpointProvider` object by providing it a +reference to the parent (Tempo) charm and optionally the name of the relation that the Tempo charm +uses to interact with its trace sources. This relation must conform to the `tracing` interface +and it is strongly recommended that this relation be named `tracing` which is its +default value. + +For example a Tempo charm may instantiate the `TracingEndpointProvider` in its constructor as +follows + + from charms.tempo_k8s.v1.tracing import TracingEndpointProvider + + def __init__(self, *args): + super().__init__(*args) + # ... + self.tracing = TracingEndpointProvider(self) + # ... 
class DatabagModel(BaseModel):
    """Base databag model.

    Provides ``load``/``dump`` helpers to read a model from, and write it to, a Juju databag
    (a string-to-string mapping). By default every field is stored json-encoded under its own
    key; if the ``_NEST_UNDER`` config key is set, the whole model is stored as a single json
    document under that key instead.
    """

    # Pydantic config.
    model_config = ConfigDict(
        # Allow instantiating this class by field name (instead of forcing alias).
        populate_by_name=True,
        # Custom config key: whether to nest the whole datastructure (as json)
        # under a field or spread it out at the toplevel.
        _NEST_UNDER=None,
    )  # type: ignore

    @classmethod
    def load(cls, databag: MutableMapping):
        """Load this model from a Juju databag.

        :param databag: the databag to read from.
        :return: an instance of this model.
        :raises DataValidationError: (non-nested layout only) if a value is not valid json
            or the decoded data does not validate against this model.
        """
        nest_under = cls.model_config.get("_NEST_UNDER")
        if nest_under:
            return cls.model_validate(json.loads(databag[nest_under]))

        try:
            # Keys that juju itself puts in the databag are not part of the model.
            data = {k: json.loads(v) for k, v in databag.items() if k not in BUILTIN_JUJU_KEYS}
        except json.JSONDecodeError as e:
            msg = f"invalid databag contents: expecting json. {databag}"
            logger.error(msg)
            raise DataValidationError(msg) from e

        try:
            return cls.model_validate_json(json.dumps(data))  # type: ignore
        except pydantic.ValidationError as e:
            msg = f"failed to validate databag: {databag}"
            logger.error(msg, exc_info=True)
            raise DataValidationError(msg) from e

    def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True):
        """Write the contents of this model to Juju databag.

        :param databag: the databag to write the data to. If omitted, a new dict is used.
        :param clear: ensure the databag is cleared before writing it.
        :return: the databag that was written to.
        """
        if clear and databag:
            databag.clear()

        if databag is None:
            databag = {}

        nest_under = self.model_config.get("_NEST_UNDER")
        if nest_under:
            databag[nest_under] = self.model_dump_json()
            # Nested layout: the single json document is all there is to write. Falling
            # through would also write every field at the top level.
            return databag

        dct = self.model_dump()
        for key, field in type(self).model_fields.items():  # type: ignore
            value = dct[key]
            databag[field.alias or key] = json.dumps(value)

        return databag
class RelationRoleMismatchError(Exception):
    """Raised if the relation with the given name has a different role than expected.

    Attributes:
        relation_name: name of the offending relation.
        expected_relation_role: the role the relation was expected to have.
        actual_relation_role: the role the relation actually has.
        message: human-readable description, also passed to ``Exception``.
    """

    def __init__(
        self,
        relation_name: str,
        expected_relation_role: RelationRole,
        actual_relation_role: RelationRole,
    ):
        self.relation_name = relation_name
        self.expected_relation_role = expected_relation_role
        # Misnamed attribute the expected role used to be stored under; kept so that
        # existing code reading it keeps working.
        self.expected_relation_interface = expected_relation_role
        self.actual_relation_role = actual_relation_role
        self.message = "The '{}' relation has role '{}' rather than the expected '{}'".format(
            relation_name, repr(actual_relation_role), repr(expected_relation_role)
        )

        super().__init__(self.message)
class TracingEndpointProvider(Object):
    """Class representing a trace ingester service."""

    def __init__(
        self,
        charm: CharmBase,
        host: str,
        ingesters: List[RawIngester],
        relation_name: str = DEFAULT_RELATION_NAME,
    ):
        """Initialize.

        Args:
            charm: a `CharmBase` instance that manages this instance of the Tempo service.
            host: the host name published to requirers in the application databag.
            ingesters: `(protocol, port)` pairs describing the ingesters published to
                requirers in the application databag.
            relation_name: an optional string name of the relation between `charm`
                and the Tempo charmed service. The default is "tracing".

        Raises:
            RelationNotFoundError: If there is no relation in the charm's metadata.yaml
                with the same name as provided via `relation_name` argument.
            RelationInterfaceMismatchError: The relation with the same name as provided
                via `relation_name` argument does not have the `tracing` relation
                interface.
            RelationRoleMismatchError: If the relation with the same name as provided
                via `relation_name` argument does not have the `RelationRole.provides`
                role.
        """
        _validate_relation_by_interface_and_direction(
            charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides
        )

        super().__init__(charm, relation_name)
        self._charm = charm
        self._host = host
        self._ingesters = ingesters
        self._relation_name = relation_name
        events = self._charm.on[relation_name]
        self.framework.observe(events.relation_created, self._on_relation_event)
        self.framework.observe(events.relation_joined, self._on_relation_event)

    def _on_relation_event(self, _):
        """Publish host and ingesters to the app databag of every relation on this endpoint.

        Only the leader unit writes. A "permission denied" `ModelError` raised while
        accessing the application databag is logged and swallowed; any other
        `ModelError` is re-raised.
        """
        try:
            if not self._charm.unit.is_leader():
                return

            for relation in self._charm.model.relations[self._relation_name]:
                TracingProviderAppData(
                    host=self._host,
                    ingesters=[
                        Ingester(port=port, protocol=protocol)
                        for protocol, port in self._ingesters
                    ],
                ).dump(relation.data[self._charm.app])

        except ModelError as e:
            # args are bytes; guard against an exception carrying no args at all, so that
            # we re-raise the ModelError itself and not an IndexError.
            msg = e.args[0] if e.args else None
            if isinstance(msg, bytes) and msg.startswith(
                b"ERROR cannot read relation application settings: permission denied"
            ):
                logger.error(
                    f"encountered error {e} while attempting to update_relation_data. "
                    f"The relation must be gone."
                )
                return
            raise
+ RelationInterfaceMismatchError: If the relation with the same name as provided + via `relation_name` argument does not have the `tracing` relation + interface. + RelationRoleMismatchError: If the relation with the same name as provided + via `relation_name` argument does not have the `RelationRole.requires` + role. + """ + _validate_relation_by_interface_and_direction( + charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.requires + ) + + super().__init__(charm, relation_name) + + # With `limit: 1` in the charm metadata, `_relation` can pick the relation unambiguously. + self._is_single_endpoint = charm.meta.relations[relation_name].limit == 1 + + self._charm = charm + self._relation_name = relation_name + + events = self._charm.on[self._relation_name] + self.framework.observe(events.relation_changed, self._on_tracing_relation_changed) + self.framework.observe(events.relation_broken, self._on_tracing_relation_broken) + + @property + def relations(self) -> List[Relation]: + """The tracing relations associated with this endpoint.""" + return self._charm.model.relations[self._relation_name] + + @property + def _relation(self) -> Optional[Relation]: + """If this wraps a single endpoint, the relation bound to it, if any. + + Raises: + AmbiguousRelationUsageError: If the endpoint's `limit` in the charm metadata is not 1. + """ + if not self._is_single_endpoint: + objname = type(self).__name__ + raise AmbiguousRelationUsageError( + f"This {objname} wraps a {self._relation_name} endpoint that has " + "limit != 1. We can't determine what relation, of the possibly many, you are " + f"talking about. Please pass a relation instance while calling {objname}, " + "or set limit=1 in the charm metadata." 
+ ) + relations = self.relations + return relations[0] if relations else None + + def is_ready(self, relation: Optional[Relation] = None): + """Is this endpoint ready?""" + relation = relation or self._relation + if not relation: + logger.debug(f"no relation on {self._relation_name !r}: tracing not ready") + return False + if relation.data is None: + logger.error(f"relation data is None for {relation}") + return False + if not relation.app: + logger.error(f"{relation} event received but there is no relation.app") + return False + try: + TracingProviderAppData.load(relation.data[relation.app]) + except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): + logger.info(f"failed validating relation data for {relation}") + return False + return True + + def _on_tracing_relation_changed(self, event): + """Notify the providers that there is new endpoint information available.""" + relation = event.relation + if not self.is_ready(relation): + self.on.endpoint_removed.emit(relation) # type: ignore + return + + data = TracingProviderAppData.load(relation.data[relation.app]) + self.on.endpoint_changed.emit(relation, data.host, [i.dict() for i in data.ingesters]) # type: ignore + + def _on_tracing_relation_broken(self, event: RelationBrokenEvent): + """Notify the providers that the endpoint is broken.""" + relation = event.relation + self.on.endpoint_removed.emit(relation) # type: ignore + + def get_all_endpoints( + self, relation: Optional[Relation] = None + ) -> Optional[TracingProviderAppData]: + """Unmarshalled relation data.""" + if not self.is_ready(relation or self._relation): + return + return TracingProviderAppData.load(relation.data[relation.app]) # type: ignore + + def _get_ingester( + self, relation: Optional[Relation], protocol: IngesterProtocol, ssl: bool = False + ): + ep = self.get_all_endpoints(relation) + if not ep: + return None + try: + ingester: Ingester = next(filter(lambda i: i.protocol == protocol, ep.ingesters)) + if 
ingester.protocol in ["otlp_grpc", "jaeger_grpc"]: + # gRPC endpoints are returned as bare host:port with no URL scheme, so `ssl` has nothing to toggle. + if ssl: + logger.warning("unused ssl argument - was the right protocol called?") + return f"{ep.host}:{ingester.port}" + if ssl: + return f"https://{ep.host}:{ingester.port}" + return f"http://{ep.host}:{ingester.port}" + except StopIteration: + # next() was exhausted: no ingester in the relation data uses the requested protocol. + logger.error(f"no ingester found with protocol={protocol!r}") + return None + + def otlp_grpc_endpoint(self, relation: Optional[Relation] = None) -> Optional[str]: + """Ingester endpoint for the ``otlp_grpc`` protocol.""" + return self._get_ingester(relation or self._relation, protocol="otlp_grpc") + + def otlp_http_endpoint( + self, relation: Optional[Relation] = None, ssl: bool = False + ) -> Optional[str]: + """Ingester endpoint for the ``otlp_http`` protocol. + + The provided endpoint does not contain the endpoint suffix. If the instrumenting library needs the full path, + your endpoint code needs to add the ``/v1/traces`` suffix. + """ + return self._get_ingester(relation or self._relation, protocol="otlp_http", ssl=ssl) + + def zipkin_endpoint( + self, relation: Optional[Relation] = None, ssl: bool = False + ) -> Optional[str]: + """Ingester endpoint for the ``zipkin`` protocol. + + The provided endpoint does not contain the endpoint suffix. If the instrumenting library needs the full path, + your endpoint code needs to add the ``/api/v2/spans`` suffix. + """ + return self._get_ingester(relation or self._relation, protocol="zipkin", ssl=ssl) + + def tempo_endpoint( + self, relation: Optional[Relation] = None, ssl: bool = False + ) -> Optional[str]: + """Ingester endpoint for the ``tempo`` protocol.""" + return self._get_ingester(relation or self._relation, protocol="tempo", ssl=ssl) + + def jaeger_http_thrift_endpoint( + self, relation: Optional[Relation] = None, ssl: bool = False + ) -> Optional[str]: + """Ingester endpoint for the ``jaeger_http_thrift`` protocol. + + The provided endpoint does not contain the endpoint suffix. 
If the instrumenting library needs the full path, + your endpoint code needs to add the ``/api/traces`` suffix. + """ + return self._get_ingester(relation or self._relation, "jaeger_http_thrift", ssl=ssl) + + def jaeger_grpc_endpoint(self, relation: Optional[Relation] = None) -> Optional[str]: + """Ingester endpoint for the ``jaeger_grpc`` protocol.""" + return self._get_ingester(relation or self._relation, "jaeger_grpc") diff --git a/metadata.yaml b/metadata.yaml index 0eaba3d..5968e1d 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -55,6 +55,12 @@ requires: description: | Ingress-per-app, to load-balance across multiple units of the coordinator. + tracing: + interface: tracing + limit: 1 + description: | + Enables sending traces to the tracing backend. + provides: mimir-cluster: interface: mimir_cluster @@ -68,7 +74,3 @@ provides: Forward workers' built-in dashboards to grafana (the coordinator, not the worker, owns all dashboards). - tracing: - interface: tracing - description: | - Send traces to tempo. 
diff --git a/requirements.txt b/requirements.txt index f57ce95..2cb1603 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,7 @@ ops pydantic # crossplane is a package from nginxinc to interact with the Nginx config -crossplane \ No newline at end of file +crossplane + +# lib/charms/tempo_k8s/v1/charm_tracing.py +opentelemetry-exporter-otlp-proto-http \ No newline at end of file diff --git a/src/charm.py b/src/charm.py index c20fcd9..04b3841 100755 --- a/src/charm.py +++ b/src/charm.py @@ -22,6 +22,8 @@ from charms.prometheus_k8s.v0.prometheus_remote_write import ( PrometheusRemoteWriteConsumer, ) +from charms.tempo_k8s.v1.charm_tracing import trace_charm +from charms.tempo_k8s.v1.tracing import TracingEndpointRequirer from mimir_config import BUCKET_NAME, S3_RELATION_NAME, _S3ConfigData from mimir_coordinator import MimirCoordinator from nginx import Nginx @@ -31,6 +33,16 @@ logger = logging.getLogger(__name__) +@trace_charm( + tracing_endpoint="tempo_endpoint", + extra_types=[ + S3Requirer, + MimirClusterProvider, + MimirCoordinator, + Nginx, + ], + # TODO add certificate file once TLS support is merged +) class MimirCoordinatorK8SOperatorCharm(ops.CharmBase): """Charm the service.""" @@ -50,6 +62,8 @@ def __init__(self, framework: ops.Framework): self.coordinator = MimirCoordinator(cluster_provider=self.cluster_provider) self.nginx = Nginx(cluster_provider=self.cluster_provider) + self.tracing = TracingEndpointRequirer(self) + self.framework.observe( self.on.nginx_pebble_ready, # pyright: ignore self._on_nginx_pebble_ready, @@ -185,6 +199,14 @@ def _on_nginx_pebble_ready(self, _event: ops.PebbleReadyEvent) -> None: self._nginx_container.add_layer("nginx", self.nginx.layer, combine=True) self._nginx_container.autostart() + @property + def tempo_endpoint(self) -> Optional[str]: + """Tempo endpoint for charm tracing.""" + if self.tracing.is_ready(): + return self.tracing.otlp_http_endpoint() + else: + return None + if __name__ == "__main__": # pragma: 
nocover ops.main.main(MimirCoordinatorK8SOperatorCharm) diff --git a/src/grafana_dashboards/mimir-top-tenants.json b/src/grafana_dashboards/mimir-top-tenants.json index b43659a..320bc44 100644 --- a/src/grafana_dashboards/mimir-top-tenants.json +++ b/src/grafana_dashboards/mimir-top-tenants.json @@ -35,7 +35,7 @@ "height": "25px", "panels": [ { - "content": "

\n This dashboard shows the top tenants based on multiple selection criterias.\n Rows are collapsed by default to avoid querying all of them.\n Use the templating variable \"limit\" above to select the amount of users to be shown.\n

\n", + "content": "

\n This dashboard shows the top tenants based on multiple selection criteria.\n Rows are collapsed by default to avoid querying all of them.\n Use the templating variable \"limit\" above to select the amount of users to be shown.\n

\n", "datasource": null, "description": "", "id": 1,