diff --git a/lib/charms/observability_libs/v1/cert_handler.py b/lib/charms/observability_libs/v1/cert_handler.py index 4a1940b..26be879 100644 --- a/lib/charms/observability_libs/v1/cert_handler.py +++ b/lib/charms/observability_libs/v1/cert_handler.py @@ -32,6 +32,7 @@ Since this library uses [Juju Secrets](https://juju.is/docs/juju/secret) it requires Juju >= 3.0.3. """ import abc +import hashlib import ipaddress import json import socket @@ -67,7 +68,7 @@ LIBID = "b5cd5cd580f3428fa5f59a8876dcbe6a" LIBAPI = 1 -LIBPATCH = 13 +LIBPATCH = 14 VAULT_SECRET_LABEL = "cert-handler-private-vault" @@ -301,14 +302,11 @@ def __init__( Must match metadata.yaml. cert_subject: Custom subject. Name collisions are under the caller's responsibility. sans: DNS names. If none are given, use FQDN. - refresh_events: an optional list of bound events which - will be observed to replace the current CSR with a new one - if there are changes in the CSR's DNS SANs or IP SANs. - Then, subsequently, replace its corresponding certificate with a new one. + refresh_events: [DEPRECATED]. """ super().__init__(charm, key) # use StoredState to store the hash of the CSR - # to potentially trigger a CSR renewal on `refresh_events` + # to potentially trigger a CSR renewal self._stored.set_default( csr_hash=None, ) @@ -320,8 +318,9 @@ def __init__( # Use fqdn only if no SANs were given, and drop empty/duplicate SANs sans = list(set(filter(None, (sans or [socket.getfqdn()])))) - self.sans_ip = list(filter(is_ip_address, sans)) - self.sans_dns = list(filterfalse(is_ip_address, sans)) + # sort SANS lists to avoid unnecessary csr renewals during reconciliation + self.sans_ip = sorted(filter(is_ip_address, sans)) + self.sans_dns = sorted(filterfalse(is_ip_address, sans)) if self._check_juju_supports_secrets(): vault_backend = _SecretVaultBackend(charm, secret_label=VAULT_SECRET_LABEL) @@ -367,13 +366,15 @@ def __init__( ) if refresh_events: - for ev in refresh_events: - self.framework.observe(ev, self._on_refresh_event) + logger.warn( + "DEPRECATION WARNING. `refresh_events` is now deprecated. CertHandler will automatically refresh the CSR when necessary." + ) - def _on_refresh_event(self, _): - """Replace the latest current CSR with a new one if there are any SANs changes.""" - if self._stored.csr_hash != self._csr_hash: - self._generate_csr(renew=True) + self._reconcile() + + def _reconcile(self): + """Run all logic that is independent of what event we're processing.""" + self._refresh_csr_if_needed() def _on_upgrade_charm(self, _): has_privkey = self.vault.get_value("private-key") @@ -388,6 +389,11 @@ def _on_upgrade_charm(self, _): # this will call `self.private_key` which will generate a new privkey. self._generate_csr(renew=True) + def _refresh_csr_if_needed(self): + """Refresh the current CSR with a new one if there are any SANs changes.""" + if self._stored.csr_hash is not None and self._stored.csr_hash != self._csr_hash: + self._generate_csr(renew=True) + def _migrate_vault(self): peer_backend = _RelationVaultBackend(self.charm, relation_name="peers") @@ -440,13 +446,17 @@ def enabled(self) -> bool: return True @property - def _csr_hash(self) -> int: + def _csr_hash(self) -> str: """A hash of the config that constructs the CSR. Only include here the config options that, should they change, should trigger a renewal of the CSR. """ - return hash( + + def _stable_hash(data): + return hashlib.sha256(str(data).encode()).hexdigest() + + return _stable_hash( ( tuple(self.sans_dns), tuple(self.sans_ip), diff --git a/lib/charms/tempo_k8s/v2/tracing.py b/lib/charms/tempo_k8s/v2/tracing.py index 81bf1f1..ce142b8 100644 --- a/lib/charms/tempo_k8s/v2/tracing.py +++ b/lib/charms/tempo_k8s/v2/tracing.py @@ -1,996 +1,29 @@ -# Copyright 2024 Canonical Ltd. +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. -"""## Overview. -This document explains how to integrate with the Tempo charm for the purpose of pushing traces to a -tracing endpoint provided by Tempo. It also explains how alternative implementations of the Tempo charm -may maintain the same interface and be backward compatible with all currently integrated charms. +"""This charm library has been transferred to the HA version of this charm. -## Requirer Library Usage +The new owner is the `tempo-coordinator-k8s` charm: +- [github](https://github.com/canonical/tempo-coordinator-k8s/) +- [charmhub](https://charmhub.io/tempo-coordinator-k8s/) -Charms seeking to push traces to Tempo, must do so using the `TracingEndpointRequirer` -object from this charm library. For the simplest use cases, using the `TracingEndpointRequirer` -object only requires instantiating it, typically in the constructor of your charm. The -`TracingEndpointRequirer` constructor requires the name of the relation over which a tracing endpoint - is exposed by the Tempo charm, and a list of protocols it intends to send traces with. - This relation must use the `tracing` interface. - The `TracingEndpointRequirer` object may be instantiated as follows +The new library (with its major version reset to 0) can be found at - from charms.tempo_k8s.v2.tracing import TracingEndpointRequirer +https://charmhub.io/tempo-coordinator-k8s/libraries/tracing - def __init__(self, *args): - super().__init__(*args) - # ... - self.tracing = TracingEndpointRequirer(self, - protocols=['otlp_grpc', 'otlp_http', 'jaeger_http_thrift'] - ) - # ... +to install it: -Note that the first argument (`self`) to `TracingEndpointRequirer` is always a reference to the -parent charm. +> charmcraft fetch-lib charms.tempo_coordinator_k8s.v0.tracing -Alternatively to providing the list of requested protocols at init time, the charm can do it at -any point in time by calling the -`TracingEndpointRequirer.request_protocols(*protocol:str, relation:Optional[Relation])` method. -Using this method also allows you to use per-relation protocols. - -Units of provider charms obtain the tempo endpoint to which they will push their traces by calling -`TracingEndpointRequirer.get_endpoint(protocol: str)`, where `protocol` is, for example: -- `otlp_grpc` -- `otlp_http` -- `zipkin` -- `tempo` - -If the `protocol` is not in the list of protocols that the charm requested at endpoint set-up time, -the library will raise an error. - -## Requirer Library Usage - -The `TracingEndpointProvider` object may be used by charms to manage relations with their -trace sources. For this purposes a Tempo-like charm needs to do two things - -1. Instantiate the `TracingEndpointProvider` object by providing it a -reference to the parent (Tempo) charm and optionally the name of the relation that the Tempo charm -uses to interact with its trace sources. This relation must conform to the `tracing` interface -and it is strongly recommended that this relation be named `tracing` which is its -default value. - -For example a Tempo charm may instantiate the `TracingEndpointProvider` in its constructor as -follows - - from charms.tempo_k8s.v2.tracing import TracingEndpointProvider - - def __init__(self, *args): - super().__init__(*args) - # ... - self.tracing = TracingEndpointProvider(self) - # ... - - - -""" # noqa: W505 -import enum -import json -import logging -from pathlib import Path -from typing import ( - TYPE_CHECKING, - Any, - Dict, - List, - Literal, - MutableMapping, - Optional, - Sequence, - Tuple, - Union, - cast, -) - -import pydantic -from ops.charm import ( - CharmBase, - CharmEvents, - RelationBrokenEvent, - RelationEvent, - RelationRole, -) -from ops.framework import EventSource, Object -from ops.model import ModelError, Relation -from pydantic import BaseModel, Field +The API is unchanged, so you can search and replace the path to swap the old lib with the new one. +""" -# The unique Charmhub library identifier, never change it LIBID = "12977e9aa0b34367903d8afeb8c3d85d" - -# Increment this major API version when introducing breaking changes LIBAPI = 2 +LIBPATCH = 11 -# Increment this PATCH version before using `charmcraft publish-lib` or reset -# to 0 if you are raising the major API version -LIBPATCH = 10 - -PYDEPS = ["pydantic"] - -logger = logging.getLogger(__name__) - -DEFAULT_RELATION_NAME = "tracing" -RELATION_INTERFACE_NAME = "tracing" - -# Supported list rationale https://github.com/canonical/tempo-coordinator-k8s-operator/issues/8 -ReceiverProtocol = Literal[ - "zipkin", - "otlp_grpc", - "otlp_http", - "jaeger_grpc", - "jaeger_thrift_http", -] - -RawReceiver = Tuple[ReceiverProtocol, str] -"""Helper type. A raw receiver is defined as a tuple consisting of the protocol name, and the (external, if available), -(secured, if available) resolvable server url. -""" - -BUILTIN_JUJU_KEYS = {"ingress-address", "private-address", "egress-subnets"} - - -class TransportProtocolType(str, enum.Enum): - """Receiver Type.""" - - http = "http" - grpc = "grpc" - - -receiver_protocol_to_transport_protocol: Dict[ReceiverProtocol, TransportProtocolType] = { - "zipkin": TransportProtocolType.http, - "otlp_grpc": TransportProtocolType.grpc, - "otlp_http": TransportProtocolType.http, - "jaeger_thrift_http": TransportProtocolType.http, - "jaeger_grpc": TransportProtocolType.grpc, -} -"""A mapping between telemetry protocols and their corresponding transport protocol. -""" - - -class TracingError(Exception): - """Base class for custom errors raised by this library.""" - - -class NotReadyError(TracingError): - """Raised by the provider wrapper if a requirer hasn't published the required data (yet).""" - - -class ProtocolNotRequestedError(TracingError): - """Raised if the user attempts to obtain an endpoint for a protocol it did not request.""" - - -class DataValidationError(TracingError): - """Raised when data validation fails on IPU relation data.""" - - -class AmbiguousRelationUsageError(TracingError): - """Raised when one wrongly assumes that there can only be one relation on an endpoint.""" - - -if int(pydantic.version.VERSION.split(".")[0]) < 2: - - class DatabagModel(BaseModel): # type: ignore - """Base databag model.""" - - class Config: - """Pydantic config.""" - - # ignore any extra fields in the databag - extra = "ignore" - """Ignore any extra fields in the databag.""" - allow_population_by_field_name = True - """Allow instantiating this class by field name (instead of forcing alias).""" - - _NEST_UNDER = None - - @classmethod - def load(cls, databag: MutableMapping): - """Load this model from a Juju databag.""" - if cls._NEST_UNDER: - return cls.parse_obj(json.loads(databag[cls._NEST_UNDER])) - - try: - data = { - k: json.loads(v) - for k, v in databag.items() - # Don't attempt to parse model-external values - if k in {f.alias for f in cls.__fields__.values()} - } - except json.JSONDecodeError as e: - msg = f"invalid databag contents: expecting json. {databag}" - logger.error(msg) - raise DataValidationError(msg) from e - - try: - return cls.parse_raw(json.dumps(data)) # type: ignore - except pydantic.ValidationError as e: - msg = f"failed to validate databag: {databag}" - logger.debug(msg, exc_info=True) - raise DataValidationError(msg) from e - - def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): - """Write the contents of this model to Juju databag. - - :param databag: the databag to write the data to. - :param clear: ensure the databag is cleared before writing it. - """ - if clear and databag: - databag.clear() - - if databag is None: - databag = {} - - if self._NEST_UNDER: - databag[self._NEST_UNDER] = self.json(by_alias=True) - return databag - - dct = self.dict() - for key, field in self.__fields__.items(): # type: ignore - value = dct[key] - databag[field.alias or key] = json.dumps(value) - - return databag - -else: - from pydantic import ConfigDict - - class DatabagModel(BaseModel): - """Base databag model.""" - - model_config = ConfigDict( - # ignore any extra fields in the databag - extra="ignore", - # Allow instantiating this class by field name (instead of forcing alias). - populate_by_name=True, - # Custom config key: whether to nest the whole datastructure (as json) - # under a field or spread it out at the toplevel. - _NEST_UNDER=None, # type: ignore - ) - """Pydantic config.""" - - @classmethod - def load(cls, databag: MutableMapping): - """Load this model from a Juju databag.""" - nest_under = cls.model_config.get("_NEST_UNDER") # type: ignore - if nest_under: - return cls.model_validate(json.loads(databag[nest_under])) # type: ignore - - try: - data = { - k: json.loads(v) - for k, v in databag.items() - # Don't attempt to parse model-external values - if k in {(f.alias or n) for n, f in cls.__fields__.items()} - } - except json.JSONDecodeError as e: - msg = f"invalid databag contents: expecting json. {databag}" - logger.error(msg) - raise DataValidationError(msg) from e - - try: - return cls.model_validate_json(json.dumps(data)) # type: ignore - except pydantic.ValidationError as e: - msg = f"failed to validate databag: {databag}" - logger.debug(msg, exc_info=True) - raise DataValidationError(msg) from e - - def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): - """Write the contents of this model to Juju databag. - - :param databag: the databag to write the data to. - :param clear: ensure the databag is cleared before writing it. - """ - if clear and databag: - databag.clear() - - if databag is None: - databag = {} - nest_under = self.model_config.get("_NEST_UNDER") - if nest_under: - databag[nest_under] = self.model_dump_json( # type: ignore - by_alias=True, - # skip keys whose values are default - exclude_defaults=True, - ) - return databag - - dct = self.model_dump() # type: ignore - for key, field in self.model_fields.items(): # type: ignore - value = dct[key] - if value == field.default: - continue - databag[field.alias or key] = json.dumps(value) - - return databag - - -# todo use models from charm-relation-interfaces -if int(pydantic.version.VERSION.split(".")[0]) < 2: - - class ProtocolType(BaseModel): # type: ignore - """Protocol Type.""" - - class Config: - """Pydantic config.""" - - use_enum_values = True - """Allow serializing enum values.""" - - name: str = Field( - ..., - description="Receiver protocol name. What protocols are supported (and what they are called) " - "may differ per provider.", - examples=["otlp_grpc", "otlp_http", "tempo_http"], - ) - - type: TransportProtocolType = Field( - ..., - description="The transport protocol used by this receiver.", - examples=["http", "grpc"], - ) - -else: - - class ProtocolType(BaseModel): - """Protocol Type.""" - - model_config = ConfigDict( # type: ignore - # Allow serializing enum values. - use_enum_values=True - ) - """Pydantic config.""" - - name: str = Field( - ..., - description="Receiver protocol name. What protocols are supported (and what they are called) " - "may differ per provider.", - examples=["otlp_grpc", "otlp_http", "tempo_http"], - ) - - type: TransportProtocolType = Field( - ..., - description="The transport protocol used by this receiver.", - examples=["http", "grpc"], - ) - - -class Receiver(BaseModel): - """Specification of an active receiver.""" - - protocol: ProtocolType = Field(..., description="Receiver protocol name and type.") - url: str = Field( - ..., - description="""URL at which the receiver is reachable. If there's an ingress, it would be the external URL. - Otherwise, it would be the service's fqdn or internal IP. - If the protocol type is grpc, the url will not contain a scheme.""", - examples=[ - "http://traefik_address:2331", - "https://traefik_address:2331", - "http://tempo_public_ip:2331", - "https://tempo_public_ip:2331", - "tempo_public_ip:2331", - ], - ) - - -class TracingProviderAppData(DatabagModel): # noqa: D101 - """Application databag model for the tracing provider.""" - - receivers: List[Receiver] = Field( - ..., - description="List of all receivers enabled on the tracing provider.", - ) - - -class TracingRequirerAppData(DatabagModel): # noqa: D101 - """Application databag model for the tracing requirer.""" - - receivers: List[ReceiverProtocol] - """Requested receivers.""" - - -class _AutoSnapshotEvent(RelationEvent): - __args__: Tuple[str, ...] = () - __optional_kwargs__: Dict[str, Any] = {} - - @classmethod - def __attrs__(cls): - return cls.__args__ + tuple(cls.__optional_kwargs__.keys()) - - def __init__(self, handle, relation, *args, **kwargs): - super().__init__(handle, relation) - - if not len(self.__args__) == len(args): - raise TypeError("expected {} args, got {}".format(len(self.__args__), len(args))) - - for attr, obj in zip(self.__args__, args): - setattr(self, attr, obj) - for attr, default in self.__optional_kwargs__.items(): - obj = kwargs.get(attr, default) - setattr(self, attr, obj) - - def snapshot(self) -> dict: - dct = super().snapshot() - for attr in self.__attrs__(): - obj = getattr(self, attr) - try: - dct[attr] = obj - except ValueError as e: - raise ValueError( - "cannot automagically serialize {}: " - "override this method and do it " - "manually.".format(obj) - ) from e - - return dct - - def restore(self, snapshot: dict) -> None: - super().restore(snapshot) - for attr, obj in snapshot.items(): - setattr(self, attr, obj) - - -class RelationNotFoundError(Exception): - """Raised if no relation with the given name is found.""" - - def __init__(self, relation_name: str): - self.relation_name = relation_name - self.message = "No relation named '{}' found".format(relation_name) - super().__init__(self.message) - - -class RelationInterfaceMismatchError(Exception): - """Raised if the relation with the given name has an unexpected interface.""" - - def __init__( - self, - relation_name: str, - expected_relation_interface: str, - actual_relation_interface: str, - ): - self.relation_name = relation_name - self.expected_relation_interface = expected_relation_interface - self.actual_relation_interface = actual_relation_interface - self.message = ( - "The '{}' relation has '{}' as interface rather than the expected '{}'".format( - relation_name, actual_relation_interface, expected_relation_interface - ) - ) - - super().__init__(self.message) - - -class RelationRoleMismatchError(Exception): - """Raised if the relation with the given name has a different role than expected.""" - - def __init__( - self, - relation_name: str, - expected_relation_role: RelationRole, - actual_relation_role: RelationRole, - ): - self.relation_name = relation_name - self.expected_relation_interface = expected_relation_role - self.actual_relation_role = actual_relation_role - self.message = "The '{}' relation has role '{}' rather than the expected '{}'".format( - relation_name, repr(actual_relation_role), repr(expected_relation_role) - ) - - super().__init__(self.message) - - -def _validate_relation_by_interface_and_direction( - charm: CharmBase, - relation_name: str, - expected_relation_interface: str, - expected_relation_role: RelationRole, -): - """Validate a relation. - - Verifies that the `relation_name` provided: (1) exists in metadata.yaml, - (2) declares as interface the interface name passed as `relation_interface` - and (3) has the right "direction", i.e., it is a relation that `charm` - provides or requires. - - Args: - charm: a `CharmBase` object to scan for the matching relation. - relation_name: the name of the relation to be verified. - expected_relation_interface: the interface name to be matched by the - relation named `relation_name`. - expected_relation_role: whether the `relation_name` must be either - provided or required by `charm`. - - Raises: - RelationNotFoundError: If there is no relation in the charm's metadata.yaml - with the same name as provided via `relation_name` argument. - RelationInterfaceMismatchError: The relation with the same name as provided - via `relation_name` argument does not have the same relation interface - as specified via the `expected_relation_interface` argument. - RelationRoleMismatchError: If the relation with the same name as provided - via `relation_name` argument does not have the same role as specified - via the `expected_relation_role` argument. - """ - if relation_name not in charm.meta.relations: - raise RelationNotFoundError(relation_name) - - relation = charm.meta.relations[relation_name] - - # fixme: why do we need to cast here? - actual_relation_interface = cast(str, relation.interface_name) - - if actual_relation_interface != expected_relation_interface: - raise RelationInterfaceMismatchError( - relation_name, expected_relation_interface, actual_relation_interface - ) - - if expected_relation_role is RelationRole.provides: - if relation_name not in charm.meta.provides: - raise RelationRoleMismatchError( - relation_name, RelationRole.provides, RelationRole.requires - ) - elif expected_relation_role is RelationRole.requires: - if relation_name not in charm.meta.requires: - raise RelationRoleMismatchError( - relation_name, RelationRole.requires, RelationRole.provides - ) - else: - raise TypeError("Unexpected RelationDirection: {}".format(expected_relation_role)) - - -class RequestEvent(RelationEvent): - """Event emitted when a remote requests a tracing endpoint.""" - - @property - def requested_receivers(self) -> List[ReceiverProtocol]: - """List of receiver protocols that have been requested.""" - relation = self.relation - app = relation.app - if not app: - raise NotReadyError("relation.app is None") - - return TracingRequirerAppData.load(relation.data[app]).receivers - - -class BrokenEvent(RelationBrokenEvent): - """Event emitted when a relation on tracing is broken.""" - - -class TracingEndpointProviderEvents(CharmEvents): - """TracingEndpointProvider events.""" - - request = EventSource(RequestEvent) - broken = EventSource(BrokenEvent) - - -class TracingEndpointProvider(Object): - """Class representing a trace receiver service.""" - - on = TracingEndpointProviderEvents() # type: ignore - - def __init__( - self, - charm: CharmBase, - external_url: Optional[str] = None, - relation_name: str = DEFAULT_RELATION_NAME, - ): - """Initialize. - - Args: - charm: a `CharmBase` instance that manages this instance of the Tempo service. - external_url: external address of the node hosting the tempo server, - if an ingress is present. - relation_name: an optional string name of the relation between `charm` - and the Tempo charmed service. The default is "tracing". - - Raises: - RelationNotFoundError: If there is no relation in the charm's metadata.yaml - with the same name as provided via `relation_name` argument. - RelationInterfaceMismatchError: The relation with the same name as provided - via `relation_name` argument does not have the `tracing` relation - interface. - RelationRoleMismatchError: If the relation with the same name as provided - via `relation_name` argument does not have the `RelationRole.requires` - role. - """ - _validate_relation_by_interface_and_direction( - charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides - ) - - super().__init__(charm, relation_name + "tracing-provider") - self._charm = charm - self._external_url = external_url - self._relation_name = relation_name - self.framework.observe( - self._charm.on[relation_name].relation_joined, self._on_relation_event - ) - self.framework.observe( - self._charm.on[relation_name].relation_created, self._on_relation_event - ) - self.framework.observe( - self._charm.on[relation_name].relation_changed, self._on_relation_event - ) - self.framework.observe( - self._charm.on[relation_name].relation_broken, self._on_relation_broken_event - ) - - def _on_relation_broken_event(self, e: RelationBrokenEvent): - """Handle relation broken events.""" - self.on.broken.emit(e.relation) - - def _on_relation_event(self, e: RelationEvent): - """Handle relation created/joined/changed events.""" - if self.is_requirer_ready(e.relation): - self.on.request.emit(e.relation) - - def is_requirer_ready(self, relation: Relation): - """Attempt to determine if requirer has already populated app data.""" - try: - self._get_requested_protocols(relation) - except NotReadyError: - return False - return True - - @staticmethod - def _get_requested_protocols(relation: Relation): - app = relation.app - if not app: - raise NotReadyError("relation.app is None") - - try: - databag = TracingRequirerAppData.load(relation.data[app]) - except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): - logger.info(f"relation {relation} is not ready to talk tracing") - raise NotReadyError() - return databag.receivers - - def requested_protocols(self): - """All receiver protocols that have been requested by our related apps.""" - requested_protocols = set() - for relation in self.relations: - try: - protocols = self._get_requested_protocols(relation) - except NotReadyError: - continue - requested_protocols.update(protocols) - return requested_protocols - - @property - def relations(self) -> List[Relation]: - """All relations active on this endpoint.""" - return self._charm.model.relations[self._relation_name] - - def publish_receivers(self, receivers: Sequence[RawReceiver]): - """Let all requirers know that these receivers are active and listening.""" - if not self._charm.unit.is_leader(): - raise RuntimeError("only leader can do this") - - for relation in self.relations: - try: - TracingProviderAppData( - receivers=[ - Receiver( - url=url, - protocol=ProtocolType( - name=protocol, - type=receiver_protocol_to_transport_protocol[protocol], - ), - ) - for protocol, url in receivers - ], - ).dump(relation.data[self._charm.app]) - - except ModelError as e: - # args are bytes - msg = e.args[0] - if isinstance(msg, bytes): - if msg.startswith( - b"ERROR cannot read relation application settings: permission denied" - ): - logger.error( - f"encountered error {e} while attempting to update_relation_data." - f"The relation must be gone." - ) - continue - raise - - -class EndpointRemovedEvent(RelationBrokenEvent): - """Event representing a change in one of the receiver endpoints.""" - - -class EndpointChangedEvent(_AutoSnapshotEvent): - """Event representing a change in one of the receiver endpoints.""" - - __args__ = ("_receivers",) - - if TYPE_CHECKING: - _receivers = [] # type: List[dict] - - @property - def receivers(self) -> List[Receiver]: - """Cast receivers back from dict.""" - return [Receiver(**i) for i in self._receivers] - - -class TracingEndpointRequirerEvents(CharmEvents): - """TracingEndpointRequirer events.""" - - endpoint_changed = EventSource(EndpointChangedEvent) - endpoint_removed = EventSource(EndpointRemovedEvent) - - -class TracingEndpointRequirer(Object): - """A tracing endpoint for Tempo.""" - - on = TracingEndpointRequirerEvents() # type: ignore - - def __init__( - self, - charm: CharmBase, - relation_name: str = DEFAULT_RELATION_NAME, - protocols: Optional[List[ReceiverProtocol]] = None, - ): - """Construct a tracing requirer for a Tempo charm. - - If your application supports pushing traces to a distributed tracing backend, the - `TracingEndpointRequirer` object enables your charm to easily access endpoint information - exchanged over a `tracing` relation interface. - - Args: - charm: a `CharmBase` object that manages this - `TracingEndpointRequirer` object. Typically, this is `self` in the instantiating - class. - relation_name: an optional string name of the relation between `charm` - and the Tempo charmed service. The default is "tracing". It is strongly - advised not to change the default, so that people deploying your charm will have a - consistent experience with all other charms that provide tracing endpoints. - protocols: optional list of protocols that the charm intends to send traces with. - The provider will enable receivers for these and only these protocols, - so be sure to enable all protocols the charm or its workload are going to need. - - Raises: - RelationNotFoundError: If there is no relation in the charm's metadata.yaml - with the same name as provided via `relation_name` argument. - RelationInterfaceMismatchError: The relation with the same name as provided - via `relation_name` argument does not have the `tracing` relation - interface. - RelationRoleMismatchError: If the relation with the same name as provided - via `relation_name` argument does not have the `RelationRole.provides` - role. - """ - _validate_relation_by_interface_and_direction( - charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.requires - ) - - super().__init__(charm, relation_name) - - self._is_single_endpoint = charm.meta.relations[relation_name].limit == 1 - - self._charm = charm - self._relation_name = relation_name - - events = self._charm.on[self._relation_name] - self.framework.observe(events.relation_changed, self._on_tracing_relation_changed) - self.framework.observe(events.relation_broken, self._on_tracing_relation_broken) - - if protocols: - self.request_protocols(protocols) - - def request_protocols( - self, protocols: Sequence[ReceiverProtocol], relation: Optional[Relation] = None - ): - """Publish the list of protocols which the provider should activate.""" - # todo: should we check if _is_single_endpoint and len(self.relations) > 1 and raise, here? - relations = [relation] if relation else self.relations - - if not protocols: - # empty sequence - raise ValueError( - "You need to pass a nonempty sequence of protocols to `request_protocols`." - ) - - try: - if self._charm.unit.is_leader(): - for relation in relations: - TracingRequirerAppData( - receivers=list(protocols), - ).dump(relation.data[self._charm.app]) - - except ModelError as e: - # args are bytes - msg = e.args[0] - if isinstance(msg, bytes): - if msg.startswith( - b"ERROR cannot read relation application settings: permission denied" - ): - logger.error( - f"encountered error {e} while attempting to request_protocols." - f"The relation must be gone." - ) - return - raise - - @property - def relations(self) -> List[Relation]: - """The tracing relations associated with this endpoint.""" - return self._charm.model.relations[self._relation_name] - - @property - def _relation(self) -> Optional[Relation]: - """If this wraps a single endpoint, the relation bound to it, if any.""" - if not self._is_single_endpoint: - objname = type(self).__name__ - raise AmbiguousRelationUsageError( - f"This {objname} wraps a {self._relation_name} endpoint that has " - "limit != 1. We can't determine what relation, of the possibly many, you are " - f"talking about. Please pass a relation instance while calling {objname}, " - "or set limit=1 in the charm metadata." - ) - relations = self.relations - return relations[0] if relations else None - - def is_ready(self, relation: Optional[Relation] = None): - """Is this endpoint ready?""" - relation = relation or self._relation - if not relation: - logger.debug(f"no relation on {self._relation_name !r}: tracing not ready") - return False - if relation.data is None: - logger.error(f"relation data is None for {relation}") - return False - if not relation.app: - logger.error(f"{relation} event received but there is no relation.app") - return False - try: - databag = dict(relation.data[relation.app]) - TracingProviderAppData.load(databag) - - except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): - logger.info(f"failed validating relation data for {relation}") - return False - return True - - def _on_tracing_relation_changed(self, event): - """Notify the providers that there is new endpoint information available.""" - relation = event.relation - if not self.is_ready(relation): - self.on.endpoint_removed.emit(relation) # type: ignore - return - - data = TracingProviderAppData.load(relation.data[relation.app]) - self.on.endpoint_changed.emit(relation, [i.dict() for i in data.receivers]) # type: ignore - - def _on_tracing_relation_broken(self, event: RelationBrokenEvent): - """Notify the providers that the endpoint is broken.""" - relation = event.relation - self.on.endpoint_removed.emit(relation) # type: ignore - - def get_all_endpoints( - self, relation: Optional[Relation] = None - ) -> Optional[TracingProviderAppData]: - """Unmarshalled relation data.""" - relation = relation or self._relation - if not self.is_ready(relation): - return - return TracingProviderAppData.load(relation.data[relation.app]) # type: ignore - - def _get_endpoint( - self, relation: Optional[Relation], protocol: ReceiverProtocol - ) -> Optional[str]: - app_data = self.get_all_endpoints(relation) - if not app_data: - return None - receivers: List[Receiver] = list( - filter(lambda i: i.protocol.name == protocol, app_data.receivers) - ) - if not receivers: - logger.error(f"no receiver found with protocol={protocol!r}") - return - if len(receivers) > 1: - logger.error( - f"too many receivers with protocol={protocol!r}; using first one. Found: {receivers}" - ) - return - - receiver = receivers[0] - return receiver.url - - def get_endpoint( - self, protocol: ReceiverProtocol, relation: Optional[Relation] = None - ) -> Optional[str]: - """Receiver endpoint for the given protocol. - - It could happen that this function gets called before the provider publishes the endpoints. - In such a scenario, if a non-leader unit calls this function, a permission denied exception will be raised due to - restricted access. To prevent this, this function needs to be guarded by the `is_ready` check. - - Raises: - ProtocolNotRequestedError: - If the charm unit is the leader unit and attempts to obtain an endpoint for a protocol it did not request. - """ - endpoint = self._get_endpoint(relation or self._relation, protocol=protocol) - if not endpoint: - requested_protocols = set() - relations = [relation] if relation else self.relations - for relation in relations: - try: - databag = TracingRequirerAppData.load(relation.data[self._charm.app]) - except DataValidationError: - continue - - requested_protocols.update(databag.receivers) - - if protocol not in requested_protocols: - raise ProtocolNotRequestedError(protocol, relation) - - return None - return endpoint - - -def charm_tracing_config( - endpoint_requirer: TracingEndpointRequirer, cert_path: Optional[Union[Path, str]] -) -> Tuple[Optional[str], Optional[str]]: - """Return the charm_tracing config you likely want. - - If no endpoint is provided: - disable charm tracing. - If https endpoint is provided but cert_path is not found on disk: - disable charm tracing. - If https endpoint is provided and cert_path is None: - ERROR - Else: - proceed with charm tracing (with or without tls, as appropriate) - - Usage: - If you are using charm_tracing >= v1.9: - >>> from lib.charms.tempo_k8s.v1.charm_tracing import trace_charm - >>> from lib.charms.tempo_k8s.v2.tracing import charm_tracing_config - >>> @trace_charm(tracing_endpoint="my_endpoint", cert_path="cert_path") - >>> class MyCharm(...): - >>> _cert_path = "/path/to/cert/on/charm/container.crt" - >>> def __init__(self, ...): - >>> self.tracing = TracingEndpointRequirer(...) - >>> self.my_endpoint, self.cert_path = charm_tracing_config( - ... self.tracing, self._cert_path) - - If you are using charm_tracing < v1.9: - >>> from lib.charms.tempo_k8s.v1.charm_tracing import trace_charm - >>> from lib.charms.tempo_k8s.v2.tracing import charm_tracing_config - >>> @trace_charm(tracing_endpoint="my_endpoint", cert_path="cert_path") - >>> class MyCharm(...): - >>> _cert_path = "/path/to/cert/on/charm/container.crt" - >>> def __init__(self, ...): - >>> self.tracing = TracingEndpointRequirer(...) - >>> self._my_endpoint, self._cert_path = charm_tracing_config( - ... self.tracing, self._cert_path) - >>> @property - >>> def my_endpoint(self): - >>> return self._my_endpoint - >>> @property - >>> def cert_path(self): - >>> return self._cert_path - - """ - if not endpoint_requirer.is_ready(): - return None, None - - endpoint = endpoint_requirer.get_endpoint("otlp_http") - if not endpoint: - return None, None - - is_https = endpoint.startswith("https://") - - if is_https: - if cert_path is None: - raise TracingError("Cannot send traces to an https endpoint without a certificate.") - elif not Path(cert_path).exists(): - # if endpoint is https BUT we don't have a server_cert yet: - # disable charm tracing until we do to prevent tls errors - return None, None - return endpoint, str(cert_path) - else: - return endpoint, None +raise DeprecationWarning( + "this charm lib is deprecated; please use charms.tempo_coordinator_k8s.v0.tracing instead. " + "see https://charmhub.io/tempo-coordinator-k8s/libraries/tracing" +)