From 79e4775585ff6499b6f3f35c5f71338d54281761 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Wed, 19 Jun 2024 11:50:29 +0200 Subject: [PATCH] yanked workload --- charmcraft.yaml | 22 --- src/charm.py | 179 +++--------------------- src/coordinator.py | 20 +-- src/tempo.py | 238 +++----------------------------- src/tempo_coordinator.py | 286 --------------------------------------- 5 files changed, 34 insertions(+), 711 deletions(-) delete mode 100644 src/tempo_coordinator.py diff --git a/charmcraft.yaml b/charmcraft.yaml index 6832689..09e9403 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -32,14 +32,6 @@ containers: - storage: data location: /tmp/tempo -resources: - tempo-image: - type: oci-image - description: OCI image for Tempo - # Included for simplicity in integration tests - # see https://hub.docker.com/r/grafana/tempo/tags - upstream-source: grafana/tempo:2.4.0 - provides: tempo-cluster: interface: tempo_cluster @@ -103,20 +95,6 @@ peers: description: | peer relation for internal coordination -config: - options: - coordinator_runs_workload_when_clustered: - type: boolean - default: true - description: | - Whether this charm should also run a worker node when related - to any number of specialized worker applications. - - Set it to ``false`` if you want this charm to stop running `Tempo` - as soon as you integrate it with a tempo-worker-k8s-charm instance. - In this case, tempo-k8s will only act as coordinator - (and reverse proxy) for the tempo cluster. - bases: - build-on: diff --git a/src/charm.py b/src/charm.py index 1787004..111425c 100755 --- a/src/charm.py +++ b/src/charm.py @@ -56,15 +56,14 @@ def __init__(self, *args): super().__init__(*args) self.ingress = TraefikRouteRequirer(self, self.model.get_relation("ingress"), "ingress") # type: ignore self.tempo_cluster = TempoClusterProvider(self) - self.coordinator = TempoCoordinator(self.tempo_cluster, - is_worker=self.is_worker_node) + self.coordinator = TempoCoordinator(self.tempo_cluster) self.tempo = tempo = Tempo( - self.unit.get_container("tempo"), external_host=self.hostname, # we need otlp_http receiver for charm_tracing + # TODO add any extra receivers enabled manually via config enable_receivers=["otlp_http"], - run_worker_node=self.is_worker_node + use_tls=self.tls_available, ) self.cert_handler = CertHandler( @@ -106,10 +105,6 @@ def __init__(self, *args): self.tracing = TracingEndpointProvider(self, external_url=self._external_url) self._inconsistencies = self.coordinator.get_deployment_inconsistencies( - clustered=self.is_clustered, - scaled=self.is_scaled, - has_workers=self.tempo_cluster.has_workers, - is_worker_node=self.is_worker_node, has_s3=self.is_s3_ready ) self._is_consistent = not self._inconsistencies @@ -122,15 +117,10 @@ def __init__(self, *args): "the situation is resolved by the cloud admin, to avoid data loss." ) self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status) - - if self.tempo.is_tempo_service_defined: - self.tempo.shutdown() - return # refuse to handle any other event as we can't possibly know what to do. # lifecycle self.framework.observe(self.on.leader_elected, self._on_leader_elected) - self.framework.observe(self.on.leader_settings_changed, self._on_leader_settings_changed) self.framework.observe(self.on.update_status, self._on_update_status) self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status) self.framework.observe(self.on.config_changed, self._on_config_changed) @@ -142,12 +132,6 @@ def __init__(self, *args): self.framework.observe(ingress.relation_joined, self._on_ingress_relation_joined) self.framework.observe(self.ingress.on.ready, self._on_ingress_ready) - # workload - self.framework.observe(self.on.tempo_pebble_ready, self._on_tempo_pebble_ready) - self.framework.observe( - self.on.tempo_pebble_custom_notice, self._on_tempo_pebble_custom_notice - ) - # s3 self.framework.observe( self.s3_requirer.on.credentials_changed, self._on_s3_credentials_changed @@ -174,22 +158,6 @@ def __init__(self, *args): # UTILITY PROPERTIES # ###################### - @property - def is_worker_node(self) -> bool: - """Check whether this Tempo charm is configured to run a worker node.""" - if self.is_clustered: - return self.config.get("coordinator_runs_workload_when_clustered", True) - return True - - @property - def is_scaled(self) -> bool: - """Check whether Tempo is deployed with scale > 1.""" - relation = self.model.get_relation("tempo-peers") - if not relation: - return False - # does not include self - return len(relation.units) > 0 - @property def is_clustered(self) -> bool: """Check whether this Tempo is a coordinator and has worker nodes connected to it.""" @@ -268,27 +236,14 @@ def _on_tracing_broken(self, _): self._update_tracing_relations() def _on_cert_handler_changed(self, _): - was_ready = self.tempo.tls_ready - if self.tls_available: logger.debug("enabling TLS") - self.tempo.configure_tls( - cert=self.cert_handler.server_cert, # type: ignore - key=self.cert_handler.private_key, # type: ignore - ca=self.cert_handler.ca_cert, # type: ignore - ) else: logger.debug("disabling TLS") - self.tempo.clear_tls_config() - - if was_ready != self.tempo.tls_ready: - # tls readiness change means config change. - self._update_tempo_config() - # sync scheme change with traefik and related consumers - self._configure_ingress() - if self.tempo.is_tempo_service_defined: - self.tempo.restart() + # tls readiness change means config change. + # sync scheme change with traefik and related consumers + self._configure_ingress() # sync the server cert with the charm container. # technically, because of charm tracing, this will be called first thing on each event @@ -314,12 +269,6 @@ def _on_ingress_relation_created(self, _: RelationEvent): def _on_ingress_relation_joined(self, _: RelationEvent): self._configure_ingress() - def _on_leader_settings_changed(self, _: ops.LeaderSettingsChangedEvent): - if not self.is_s3_ready: - logger.error( - "Losing leadership without s3. " "This unit will soon be in an inconsistent state." - ) - def _on_leader_elected(self, _: ops.LeaderElectedEvent): # as traefik_route goes through app data, we need to take lead of traefik_route if our leader dies. self._configure_ingress() @@ -331,19 +280,6 @@ def _on_s3_credentials_gone(self, _): self._on_s3_changed() def _on_s3_changed(self): - could_scale_before = self.tempo.can_scale() - - self._update_tempo_config() - - can_scale_now = self.tempo.can_scale() - # if we had s3, and we don't anymore, we need to replan from 'scaling-monolithic' to 'all' - # if we didn't have s3, and now we do, we can replan from 'all' to 'scaling-monolithic' - if could_scale_before != can_scale_now: - if not self.tempo.is_tempo_service_defined: - # TODO: should we be deferring this to the next pebble-ready instead? - logger.debug("tempo was not running! Starting it now") - self.tempo.plan() - self._update_tempo_cluster() def _on_tempo_peers_relation_created(self, event: ops.RelationCreatedEvent): @@ -351,11 +287,11 @@ def _on_tempo_peers_relation_created(self, event: ops.RelationCreatedEvent): event.relation.data[self.unit]["local-ip"] = self._local_ip def _on_tempo_peers_relation_changed(self, _): - if self._update_tempo_config(): - self.tempo.restart() + self._update_tempo_cluster() - def _update_tempo_config(self) -> bool: - peers = self.peers() + @property + def peer_addresses(self) -> List[str]: + peers = self._peers relation = self.model.get_relation("tempo-peers") # get unit addresses for all the other units from a databag if peers and relation: @@ -368,7 +304,7 @@ def _update_tempo_config(self) -> bool: if self._local_ip: addresses.append(self._local_ip) - return self.tempo.update_config(self._requested_receivers(), self._s3_config, addresses) + return addresses @property def _local_ip(self) -> Optional[str]: @@ -382,36 +318,10 @@ def _local_ip(self) -> Optional[str]: def _on_config_changed(self, _): # check if certificate files haven't disappeared and recreate them if needed - if self.tls_available and not self.tempo.tls_ready: - logger.debug("enabling TLS") - self.tempo.configure_tls( - cert=self.cert_handler.server_cert, # type: ignore - key=self.cert_handler.private_key, # type: ignore - ca=self.cert_handler.ca_cert, # type: ignore - ) - self._update_tempo_cluster() - def _on_tempo_pebble_custom_notice(self, event: PebbleNoticeEvent): - if event.notice.key == self.tempo.tempo_ready_notice_key: - logger.debug("pebble api reported ready") - # collect-unit-status should do the rest and report that pebble is ready. - self.tempo.receive_tempo_ready_notice() - - def _on_tempo_pebble_ready(self, event: WorkloadEvent): - if not self.tempo.container.can_connect(): - logger.warning("container not ready, cannot configure; will retry soon") - return event.defer() - - self.tempo.update_config(self._requested_receivers(), self._s3_config) - self.tempo.plan() - - self.unit.set_workload_version(self.version) - self.unit.status = ActiveStatus() - def _on_update_status(self, _): """Update the status of the application.""" - self.unit.set_workload_version(self.version) def _on_ingress_ready(self, _event): # whenever there's a change in ingress, we need to update all tracing relations @@ -433,8 +343,6 @@ def _on_list_receivers_action(self, event: ops.ActionEvent): def _on_collect_unit_status(self, e: CollectStatusEvent): # todo add [nginx.workload] statuses - if not self.tempo.container.can_connect(): - e.add_status(WaitingStatus("[workload.tempo] Tempo container not ready")) if not self.tempo.is_ready(): e.add_status(WaitingStatus("[workload.tempo] Tempo API not ready just yet...")) @@ -478,6 +386,9 @@ def _configure_ingress(self) -> None: if self.ingress.external_host: self._update_tracing_relations() + # notify the cluster + self._update_tempo_cluster() + def _update_tracing_relations(self): tracing_relations = self.model.relations["tracing"] if not tracing_relations: @@ -493,22 +404,8 @@ def _update_tracing_relations(self): [(p, self.tempo.get_receiver_url(p, self.ingress)) for p in requested_receivers] ) - self._restart_if_receivers_changed() - self._update_tempo_cluster() - def _restart_if_receivers_changed(self): - # if the receivers have changed, we need to reconfigure tempo - self.unit.status = MaintenanceStatus("reconfiguring Tempo...") - updated = self._update_tempo_config() - if not updated: - logger.debug("Config not updated; skipping tempo restart") - if updated: - restarted = self.tempo.is_tempo_service_defined and self.tempo.restart() - if not restarted: - # assume that this will be handled at the next pebble-ready - logger.debug("Cannot reconfigure/restart tempo at this time.") - def _requested_receivers(self) -> Tuple[ReceiverProtocol, ...]: """List what receivers we should activate, based on the active tracing relations.""" # we start with the sum of the requested endpoints from the requirers @@ -519,51 +416,6 @@ def _requested_receivers(self) -> Tuple[ReceiverProtocol, ...]: requested_receivers.update(self.tempo.enabled_receivers) return tuple(requested_receivers) - @property - def version(self) -> str: - """Reports the current Tempo version.""" - container = self.unit.get_container("tempo") - if container.can_connect() and container.get_services("tempo"): - try: - return self._get_version() or "" - # Catching Exception is not ideal, but we don't care much for the error here, and just - # default to setting a blank version since there isn't much the admin can do! - except Exception as e: - logger.warning("unable to get version from API: %s", str(e)) - logger.debug(e, exc_info=True) - return "" - return "" - - def _get_version(self) -> Optional[str]: - """Fetch the version from the running workload using the Tempo CLI. - - Helper function. - """ - container = self.unit.get_container("tempo") - proc = container.exec(["/tempo", "-version"]) - out, err = proc.wait_output() - - # example output: - # / # /tempo --version - # tempo, version (branch: HEAD, revision: fd5743d5d) - # build user: - # build date: - # go version: go1.18.5 - # platform: linux/amd64 - - if version_head := re.search(r"tempo, version (.*) \(branch: (.*), revision: (.*)\)", out): - v_head, b_head, r_head = version_head.groups() - version = f"{v_head}:{b_head}/{r_head}" - elif version_headless := re.search(r"tempo, version (\S+)", out): - version = version_headless.groups()[0] - else: - logger.warning( - f"unable to determine tempo workload version: output {out} " - f"does not match any known pattern" - ) - return - return version - def server_cert(self): """For charm tracing.""" self._update_server_cert() @@ -589,7 +441,8 @@ def tempo_otlp_http_endpoint(self) -> Optional[str]: return None - def peers(self) -> Optional[Set[ops.model.Unit]]: + @property + def _peers(self) -> Optional[Set[ops.model.Unit]]: relation = self.model.get_relation("tempo-peers") if not relation: return None diff --git a/src/coordinator.py b/src/coordinator.py index cc87926..68d2ccf 100644 --- a/src/coordinator.py +++ b/src/coordinator.py @@ -66,17 +66,13 @@ def _is_recommended(): # python>=3.11 would support roles >= RECOMMENDED_DEPLOYMENT def get_deployment_inconsistencies( - self, clustered: bool, scaled: bool, has_workers: bool, is_worker_node: bool, has_s3: bool + self, has_s3: bool ) -> List[str]: """Determine whether the deployment as a whole is consistent. Return a list of failed consistency checks. """ return self._get_deployment_inconsistencies( - clustered=clustered, - scaled=scaled, - has_workers=has_workers, - is_worker_node=is_worker_node, has_s3=has_s3, coherent=self.is_coherent, missing_roles=self.missing_roles, @@ -84,10 +80,6 @@ def get_deployment_inconsistencies( @staticmethod def _get_deployment_inconsistencies( - clustered: bool, - scaled: bool, - has_workers: bool, - is_worker_node: bool, has_s3: bool, coherent: bool, missing_roles: Set[TempoRole] = None, @@ -96,17 +88,9 @@ def _get_deployment_inconsistencies( Return a list of failed consistency checks. """ - failures = [] - # is_monolith = not (scaled or clustered or has_workers) and is_worker_node - - if not is_worker_node and not has_workers: - failures.append("Tempo must either be a worker node or have some workers.") if not has_s3: - if scaled: - failures.append("Tempo is scaled but has no s3 integration.") - if clustered: - failures.append("Tempo is clustered but has no s3 integration.") + failures.append("Tempo has no s3 integration.") elif not coherent: failures.append(f"Incoherent coordinator: missing roles: {missing_roles}.") return failures diff --git a/src/tempo.py b/src/tempo.py index f7d5989..eaa28ee 100644 --- a/src/tempo.py +++ b/src/tempo.py @@ -6,28 +6,21 @@ import logging import re import socket -from pathlib import Path from subprocess import CalledProcessError, getoutput from typing import Dict, List, Optional, Sequence, Tuple from urllib.parse import urlparse -import ops -import tenacity -import yaml from charms.tempo_k8s.v2.tracing import ( ReceiverProtocol, receiver_protocol_to_transport_protocol, ) from charms.traefik_route_k8s.v0.traefik_route import TraefikRouteRequirer -from ops import ModelError -from ops.pebble import Layer logger = logging.getLogger(__name__) class Tempo: """Class representing the Tempo client workload configuration.""" - config_path = "/etc/tempo/tempo.yaml" # cert path on charm container @@ -72,19 +65,17 @@ class Tempo: all_ports = {**server_ports, **receiver_ports} def __init__( - self, - container: ops.Container, - external_host: Optional[str] = None, - enable_receivers: Optional[Sequence[ReceiverProtocol]] = None, - run_worker_node: bool = True, + self, + external_host: Optional[str] = None, + enable_receivers: Optional[Sequence[ReceiverProtocol]] = None, + use_tls: bool = False ): # ports source: https://github.com/grafana/tempo/blob/main/example/docker-compose/local/docker-compose.yaml # fqdn, if an ingress is not available, else the ingress address. self._external_hostname = external_host or socket.getfqdn() - self.container = container - self.run_worker_node = run_worker_node self.enabled_receivers = enable_receivers or [] + self.use_tls = use_tls @property def tempo_http_server_port(self) -> int: @@ -139,157 +130,6 @@ def get_receiver_url(self, protocol: ReceiverProtocol, ingress: TraefikRouteRequ return f"{url}:{receiver_port}" - def plan(self): - """Update pebble plan and start the tempo-ready service.""" - if not self.run_worker_node: - logger.info("will not plan tempo container: this node is a coordinator") - return - - self.container.add_layer("tempo", self.pebble_layer, combine=True) - self.container.add_layer("tempo-ready", self.tempo_ready_layer, combine=True) - try: - self.container.replan() - # is not autostart-enabled, we just run it once on pebble-ready. - self.container.start("tempo-ready") - except ops.pebble.ChangeError: - # replan failed likely because address was still in use. try to (re)start tempo with backoff as a fallback - restart_result = self.restart() - if not restart_result: - logger.exception( - "Starting tempo failed with a ChangeError and restart attempts didn't resolve the issue" - ) - - def update_config( - self, - requested_receivers: Sequence[ReceiverProtocol], - s3_config: Optional[dict] = None, - peers: Optional[List[str]] = None, - ) -> bool: - """Generate a config and push it to the container it if necessary.""" - container = self.container - if not container.can_connect(): - logger.debug("Container can't connect: config update skipped.") - return False - - new_config = self.generate_config(requested_receivers, s3_config, peers) - - if self.get_current_config() != new_config: - logger.debug("Pushing new config to container...") - container.push( - self.config_path, - yaml.safe_dump(new_config), - make_dirs=True, - ) - return True - return False - - @property - def is_tempo_service_defined(self) -> bool: - """Check that the tempo service is present in the plan.""" - try: - self.container.get_service("tempo") - return True - except (ModelError, ops.pebble.ConnectionError): - return False - - def restart(self) -> bool: - """Try to restart the tempo service.""" - # restarting tempo can cause errors such as: - # Could not bind to :3200 - Address in use - # probably because of some lag with releasing the port. We restart tempo 'too quickly' - # and it fails to start. As a workaround, see the @retry logic below. - if not self.run_worker_node: - return False - - if not self.is_tempo_service_defined: - self.plan() - - return self._restart() - - @tenacity.retry( - # if restart FAILS (this function returns False) - retry=tenacity.retry_if_result(lambda r: r is False), - # we wait 3, 9, 27... up to 40 seconds between tries - wait=tenacity.wait_exponential(multiplier=3, min=1, max=40), - # we give up after 20 attempts - stop=tenacity.stop_after_attempt(20), - # if there's any exceptions throughout, raise them - reraise=True, - ) - def _restart(self) -> bool: - if not self.container.can_connect(): - return False - - try: - is_started = self.is_running - except ModelError: - is_started = False - - # verify if tempo is already inactive, then try to start a new instance - if is_started: - try: - self.container.restart("tempo") - except ops.pebble.ChangeError: - # if tempo fails to start, we'll try again after some backoff - return False - else: - try: - self.container.start("tempo") - except ops.pebble.ChangeError: - # if tempo fails to start, we'll try again after retry backoff - return False - - # set the notice to start checking for tempo server readiness so we don't have to - # wait for an update-status - self.container.start("tempo-ready") - return True - - def shutdown(self): - """Gracefully shutdown the tempo process.""" - for service in ["tempo", "tempo-ready"]: - if self.container.get_service(service).is_running(): - self.container.stop(service) - logger.info(f"stopped {service}") - - @property - def is_running(self) -> bool: - return self.container.get_service("tempo").is_running() - - def get_current_config(self) -> Optional[dict]: - """Fetch the current configuration from the container.""" - if not self.container.can_connect(): - return None - try: - return yaml.safe_load(self.container.pull(self.config_path)) - except ops.pebble.PathError: - return None - - def configure_tls(self, *, cert: str, key: str, ca: str): - """Push cert, key and CA to the tempo container.""" - # we save the cacert in the charm container too (for notices) - Path(self.server_cert_path).write_text(ca) - - self.container.push(self.tls_cert_path, cert, make_dirs=True) - self.container.push(self.tls_key_path, key, make_dirs=True) - self.container.push(self.tls_ca_path, ca, make_dirs=True) - self.container.exec(["update-ca-certificates"]) - - def clear_tls_config(self): - """Remove cert, key and CA files from the tempo container.""" - self.container.remove_path(self.tls_cert_path, recursive=True) - self.container.remove_path(self.tls_key_path, recursive=True) - self.container.remove_path(self.tls_ca_path, recursive=True) - - @property - def tls_ready(self) -> bool: - """Whether cert, key, and ca paths are found on disk and Tempo is ready to use tls.""" - if not self.container.can_connect(): - return False - return all( - self.container.exists(tls_path) - for tls_path in (self.tls_cert_path, self.tls_key_path, self.tls_ca_path) - ) - def _build_server_config(self): server_config = { "http_listen_port": self.tempo_http_server_port, @@ -297,7 +137,7 @@ def _build_server_config(self): # otherwise it will default to 9595 and make promtail bork "grpc_listen_port": self.tempo_grpc_server_port, } - if self.tls_ready: + if self.use_tls: for cfg in ("http_tls_config", "grpc_tls_config"): server_config[cfg] = { # type: ignore "cert_file": str(self.tls_cert_path), @@ -310,10 +150,10 @@ def _build_server_config(self): return server_config def generate_config( - self, - receivers: Sequence[ReceiverProtocol], - s3_config: Optional[dict] = None, - peers: Optional[List[str]] = None, + self, + receivers: Sequence[ReceiverProtocol], + s3_config: Optional[dict] = None, + peers: Optional[List[str]] = None, ) -> dict: """Generate the Tempo configuration. @@ -360,7 +200,7 @@ def generate_config( "storage": self._build_storage_config(s3_config), } - if self.tls_ready: + if self.use_tls: # cfr: # https://grafana.com/docs/tempo/latest/configuration/network/tls/#client-configuration tls_config = { @@ -374,7 +214,9 @@ def generate_config( config["ingester_client"] = {"grpc_client_config": tls_config} config["metrics_generator_client"] = {"grpc_client_config": tls_config} - config["querier"]["frontend_worker"].update({"grpc_client_config": tls_config}) + config["querier"]["frontend_worker"].update( + {"grpc_client_config": tls_config} + ) # this is not an error. config["memberlist"].update(tls_config) @@ -422,52 +264,9 @@ def _build_storage_config(self, s3_config: Optional[dict] = None): ) return {"trace": storage_config} - def can_scale(self) -> bool: - """Return whether this tempo instance can scale, i.e., whether s3 is configured.""" - config = self.get_current_config() - if not config: - return False - return config["storage"]["trace"]["backend"] == "s3" - - @property - def pebble_layer(self) -> Layer: - """Generate the pebble layer for the Tempo container.""" - target = "scalable-single-binary" if self.can_scale() else "all" - return Layer( - { - "services": { - "tempo": { - "override": "replace", - "summary": "Main Tempo layer", - "command": f'/bin/sh -c "/tempo -config.file={self.config_path} -target {target} | tee {self.log_path}"', - "startup": "enabled", - } - }, - } - ) - - @property - def tempo_ready_layer(self) -> Layer: - """Generate the pebble layer to fire the tempo-ready custom notice.""" - s = "s" if self.tls_ready else "" - return Layer( - { - "services": { - "tempo-ready": { - "override": "replace", - "summary": "Notify charm when tempo is ready", - "command": f"""watch -n 5 '[ $(wget -q -O- --no-check-certificate http{s}://localhost:{self.tempo_http_server_port}/ready) = "ready" ] && - ( /charm/bin/pebble notify {self.tempo_ready_notice_key} ) || - ( echo "tempo not ready" )'""", - "startup": "disabled", - } - }, - } - ) - def is_ready(self): """Whether the tempo built-in readiness check reports 'ready'.""" - if self.tls_ready: + if self.use_tls: tls, s = f" --cacert {self.server_cert_path}", "s" else: tls = s = "" @@ -490,7 +289,7 @@ def _build_receivers_config(self, receivers: Sequence[ReceiverProtocol]): # noq if not receivers_set: logger.warning("No receivers set. Tempo will be up but not functional.") - if self.tls_ready: + if self.use_tls: receiver_config = { "tls": { "ca_file": str(self.tls_ca_path), @@ -530,8 +329,3 @@ def _build_receivers_config(self, receivers: Sequence[ReceiverProtocol]): # noq config["jaeger"] = {"protocols": jaeger_config} return config - - def receive_tempo_ready_notice(self): - """Handle the tempo-ready-pebble-notice event by turning off the notice itself.""" - if self.container.get_services("tempo-ready"): - self.container.stop("tempo-ready") diff --git a/src/tempo_coordinator.py b/src/tempo_coordinator.py deleted file mode 100644 index 01b60ee..0000000 --- a/src/tempo_coordinator.py +++ /dev/null @@ -1,286 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical -# See LICENSE file for licensing details. - -"""Tempo coordinator.""" - -import logging -from collections import Counter -from pathlib import Path -from typing import Any, Dict, Iterable, Optional, Set - -from tempo_cluster import ( - TEMPO_CERT_FILE, - TEMPO_CLIENT_CA_FILE, - TEMPO_KEY_FILE, - TempoClusterProvider, - TempoRole, -) -from tempo_config import _S3ConfigData - -logger = logging.getLogger(__name__) - -MINIMAL_DEPLOYMENT = { - # from official docs: - TempoRole.compactor: 1, - TempoRole.distributor: 1, - TempoRole.ingester: 1, - TempoRole.querier: 1, - TempoRole.query_frontend: 1, - TempoRole.query_scheduler: 1, - TempoRole.store_gateway: 1, - # we add: - TempoRole.ruler: 1, - TempoRole.alertmanager: 1, -} -"""The minimal set of roles that need to be allocated for the -deployment to be considered consistent (otherwise we set blocked). On top of what tempo itself lists as required, -we add alertmanager.""" - -RECOMMENDED_DEPLOYMENT = Counter( - { - TempoRole.ingester: 3, - TempoRole.querier: 2, - TempoRole.query_scheduler: 2, - TempoRole.alertmanager: 1, - TempoRole.query_frontend: 1, - TempoRole.ruler: 1, - TempoRole.store_gateway: 1, - TempoRole.compactor: 1, - TempoRole.distributor: 1, - } -) -"""The set of roles that need to be allocated for the -deployment to be considered robust according to the official recommendations/guidelines.""" - -# The minimum number of workers per role to enable replication -REPLICATION_MIN_WORKERS = 3 -# The default amount of replicas to set when there are enough workers per role; -# otherwise, replicas will be "disabled" by setting the amount to 1 -DEFAULT_REPLICATION = 3 - - -class TempoCoordinator: - """Tempo coordinator.""" - - def __init__( - self, - cluster_provider: TempoClusterProvider, - # TODO: use and import tls requirer obj - tls_requirer: Any = None, - # TODO: use and import s3 requirer obj - s3_requirer: Any = None, - # root and recovery data need to be in distinct directories - root_data_dir: Path = Path("/data"), - recovery_data_dir: Path = Path("/recovery-data"), - ): - self._cluster_provider = cluster_provider - self._s3_requirer = s3_requirer # type: ignore - self._tls_requirer = tls_requirer # type: ignore - self._root_data_dir = root_data_dir - self._recovery_data_dir = recovery_data_dir - - def is_coherent(self) -> bool: - """Return True if the roles list makes up a coherent tempo deployment.""" - roles: Iterable[TempoRole] = self._cluster_provider.gather_roles().keys() - return set(roles).issuperset(MINIMAL_DEPLOYMENT) - - def missing_roles(self) -> Set[TempoRole]: - """If the coordinator is incoherent, return the roles that are missing for it to become so.""" - roles: Iterable[TempoRole] = self._cluster_provider.gather_roles().keys() - return set(MINIMAL_DEPLOYMENT).difference(roles) - - def is_recommended(self) -> bool: - """Return True if is a superset of the minimal deployment. - - I.E. If all required roles are assigned, and each role has the recommended amount of units. - """ - roles: Dict[TempoRole, int] = self._cluster_provider.gather_roles() - # python>=3.11 would support roles >= RECOMMENDED_DEPLOYMENT - for role, min_n in RECOMMENDED_DEPLOYMENT.items(): - if roles.get(role, 0) < min_n: - return False - return True - - def build_config( - self, s3_config_data: Optional[_S3ConfigData], tls_enabled: bool = False - ) -> Dict[str, Any]: - """Generate shared config file for tempo. - - Reference: https://grafana.com/docs/tempo/latest/configure/ - """ - tempo_config: Dict[str, Any] = { - "common": {}, - "alertmanager": self._build_alertmanager_config(), - "alertmanager_storage": self._build_alertmanager_storage_config(), - "compactor": self._build_compactor_config(), - "ingester": self._build_ingester_config(), - "ruler": self._build_ruler_config(), - "ruler_storage": self._build_ruler_storage_config(), - "store_gateway": self._build_store_gateway_config(), - "blocks_storage": self._build_blocks_storage_config(), - "memberlist": self._build_memberlist_config(), - } - - if s3_config_data: - tempo_config["common"]["storage"] = self._build_s3_storage_config(s3_config_data) - self._update_s3_storage_config(tempo_config["blocks_storage"], "blocks") - self._update_s3_storage_config(tempo_config["ruler_storage"], "rules") - self._update_s3_storage_config(tempo_config["alertmanager_storage"], "alerts") - - # todo: TLS config for memberlist - if tls_enabled: - tempo_config["server"] = self._build_tls_config() - - return tempo_config - - def _build_tls_config(self) -> Dict[str, Any]: - tls_config = { - "cert_file": TEMPO_CERT_FILE, - "key_file": TEMPO_KEY_FILE, - "client_ca_file": TEMPO_CLIENT_CA_FILE, - "client_auth_type": "RequestClientCert", - } - return { - "http_tls_config": tls_config, - "grpc_tls_config": tls_config, - } - - # data_dir: - # The Tempo Alertmanager stores the alerts state on local disk at the location configured using -alertmanager.storage.path. - # Should be persisted if not replicated - - # sharding_ring.replication_factor: int - # (advanced) The replication factor to use when sharding the alertmanager. - def _build_alertmanager_config(self) -> Dict[str, Any]: - alertmanager_scale = len( - self._cluster_provider.gather_addresses_by_role().get(TempoRole.alertmanager, []) - ) - return { - "data_dir": str(self._root_data_dir / "data-alertmanager"), - "sharding_ring": { - "replication_factor": ( - 1 if alertmanager_scale < REPLICATION_MIN_WORKERS else DEFAULT_REPLICATION - ) - }, - } - - # filesystem: dir - # The Tempo Alertmanager also periodically stores the alert state in the storage backend configured with -alertmanager-storage.backend (For Recovery) - def _build_alertmanager_storage_config(self) -> Dict[str, Any]: - return { - "filesystem": { - "dir": str(self._recovery_data_dir / "data-alertmanager"), - }, - } - - # data_dir: - # Directory to temporarily store blocks during compaction. - # This directory is not required to be persisted between restarts. - def _build_compactor_config(self) -> Dict[str, Any]: - return { - "data_dir": str(self._root_data_dir / "data-compactor"), - } - - # ring.replication_factor: int - # Number of ingesters that each time series is replicated to. This option - # needs be set on ingesters, distributors, queriers and rulers when running in - # microservices mode. - def _build_ingester_config(self) -> Dict[str, Any]: - ingester_scale = len( - self._cluster_provider.gather_addresses_by_role().get(TempoRole.ingester, []) - ) - return { - "ring": { - "replication_factor": ( - 1 if ingester_scale < REPLICATION_MIN_WORKERS else DEFAULT_REPLICATION - ) - } - } - - # rule_path: - # Directory to store temporary rule files loaded by the Prometheus rule managers. - # This directory is not required to be persisted between restarts. - def _build_ruler_config(self) -> Dict[str, Any]: - return { - "rule_path": str(self._root_data_dir / "data-ruler"), - } - - # sharding_ring.replication_factor: - # (advanced) The replication factor to use when sharding blocks. This option - # needs be set both on the store-gateway, querier and ruler when running in - # microservices mode. - def _build_store_gateway_config(self) -> Dict[str, Any]: - store_gateway_scale = len( - self._cluster_provider.gather_addresses_by_role().get(TempoRole.store_gateway, []) - ) - return { - "sharding_ring": { - "replication_factor": ( - 1 if store_gateway_scale < REPLICATION_MIN_WORKERS else DEFAULT_REPLICATION - ) - } - } - - # filesystem: dir - # Storage backend reads Prometheus recording rules from the local filesystem. - # The ruler looks for tenant rules in the self._root_data_dir/rules/ directory. The ruler requires rule files to be in the Prometheus format. - def _build_ruler_storage_config(self) -> Dict[str, Any]: - return { - "filesystem": { - "dir": str(self._root_data_dir / "rules"), - }, - } - - # bucket_store: sync_dir - # Directory to store synchronized TSDB index headers. This directory is not - # required to be persisted between restarts, but it's highly recommended - - # filesystem: dir - # Tempo upload blocks (of metrics) to the object storage at period interval. - - # tsdb: dir - # Directory to store TSDBs (including WAL) in the ingesters. - # This directory is required to be persisted between restarts. - - # The TSDB dir is used by ingesters, while the filesystem: dir is the "object storage" - # Ingesters are expected to upload TSDB blocks to filesystem: dir every 2h. - def _build_blocks_storage_config(self) -> Dict[str, Any]: - return { - "bucket_store": { - "sync_dir": str(self._root_data_dir / "tsdb-sync"), - }, - "filesystem": { - "dir": str(self._root_data_dir / "blocks"), - }, - "tsdb": { - "dir": str(self._root_data_dir / "tsdb"), - }, - } - - def _build_s3_storage_config(self, s3_config_data: _S3ConfigData) -> Dict[str, Any]: - return { - "backend": "s3", - "s3": s3_config_data.model_dump(), - } - - def _update_s3_storage_config(self, storage_config: Dict[str, Any], prefix_name: str) -> None: - """Update S3 storage configuration in `storage_config`. - - If the key 'filesystem' is present in `storage_config`, remove it and add a new key - 'storage_prefix' with the value of `prefix_name` for the S3 bucket. - """ - if "filesystem" in storage_config: - storage_config.pop("filesystem") - storage_config["storage_prefix"] = prefix_name - - # cluster_label: - # (advanced) The cluster label is an optional string to include in outbound - # packets and gossip streams. Other members in the memberlist cluster will - # discard any message whose label doesn't match the configured one, unless the - def _build_memberlist_config(self) -> Dict[str, Any]: - coordinator = self._cluster_provider._charm - return { - "cluster_label": f"{coordinator.model.name}_{coordinator.model.uuid}_{coordinator.app.name}", - "join_members": list(self._cluster_provider.gather_addresses()), - }