diff --git a/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py b/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py index 1e7ff84..3aea50f 100644 --- a/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py +++ b/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py @@ -69,6 +69,9 @@ def my_tracing_endpoint(self) -> Optional[str]: - every event as a span (including custom events) - every charm method call (except dunders) as a span +We recommend that you scale up your tracing provider and relate it to an ingress so that your tracing requests +go through the ingress and get load balanced across all units. Otherwise, if the provider's leader goes down, your tracing goes down. + ## TLS support If your charm integrates with a TLS provider which is also trusted by the tracing provider (the Tempo charm), @@ -269,7 +272,7 @@ def _remove_stale_otel_sdk_packages(): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 2 +LIBPATCH = 3 PYDEPS = ["opentelemetry-exporter-otlp-proto-http==1.21.0"] diff --git a/lib/charms/tempo_coordinator_k8s/v0/tracing.py b/lib/charms/tempo_coordinator_k8s/v0/tracing.py index 1f92867..2035dff 100644 --- a/lib/charms/tempo_coordinator_k8s/v0/tracing.py +++ b/lib/charms/tempo_coordinator_k8s/v0/tracing.py @@ -34,7 +34,7 @@ def __init__(self, *args): `TracingEndpointRequirer.request_protocols(*protocol:str, relation:Optional[Relation])` method. Using this method also allows you to use per-relation protocols. -Units of provider charms obtain the tempo endpoint to which they will push their traces by calling +Units of requirer charms obtain the tempo endpoint to which they will push their traces by calling `TracingEndpointRequirer.get_endpoint(protocol: str)`, where `protocol` is, for example: - `otlp_grpc` - `otlp_http` @@ -44,7 +44,10 @@ def __init__(self, *args): If the `protocol` is not in the list of protocols that the charm requested at endpoint set-up time, the library will raise an error. -## Requirer Library Usage +We recommend that you scale up your tracing provider and relate it to an ingress so that your tracing requests +go through the ingress and get load balanced across all units. Otherwise, if the provider's leader goes down, your tracing goes down. + +## Provider Library Usage The `TracingEndpointProvider` object may be used by charms to manage relations with their trace sources. For this purposes a Tempo-like charm needs to do two things @@ -107,7 +110,7 @@ def __init__(self, *args): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 2 +LIBPATCH = 3 PYDEPS = ["pydantic"] diff --git a/requirements.txt b/requirements.txt index d5ff0f7..37eaa6c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,4 +19,4 @@ cryptography # lib/charms/tempo_coordinator_k8s/v1/tracing.py pydantic>=2 # lib/charms/prometheus_k8s/v0/prometheus_scrape.py -cosl>=0.0.41 +cosl>=0.0.42 diff --git a/src/charm.py b/src/charm.py index 7b40f2c..7404d2b 100755 --- a/src/charm.py +++ b/src/charm.py @@ -8,7 +8,7 @@ import socket from pathlib import Path from subprocess import CalledProcessError, getoutput -from typing import Dict, Optional, Set, Tuple, cast, get_args +from typing import Dict, List, Optional, Set, Tuple, cast, get_args import ops from charms.grafana_k8s.v0.grafana_source import GrafanaSourceProvider @@ -24,6 +24,7 @@ ) from charms.traefik_k8s.v0.traefik_route import TraefikRouteRequirer from cosl.coordinated_workers.coordinator import ClusterRolesConfig, Coordinator +from cosl.coordinated_workers.interface import DatabagModel from cosl.coordinated_workers.nginx import CA_CERT_PATH, CERT_PATH, KEY_PATH from ops import CollectStatusEvent from ops.charm import CharmBase @@ -33,6 +34,14 @@ from tempo_config import TEMPO_ROLES_CONFIG logger = logging.getLogger(__name__) +PEERS_RELATION_ENDPOINT_NAME = "peers" + + +class PeerData(DatabagModel): + """Databag model for the "peers" relation between coordinator units.""" + + fqdn: str + """FQDN hostname of this coordinator unit.""" @trace_charm( @@ -50,7 +59,6 @@ def __init__(self, *args): self.tempo = Tempo( requested_receivers=self._requested_receivers, retention_period_hours=self._trace_retention_period_hours, - external_hostname=self._external_hostname, ) # set alert_rules_path="", as we don't want to populate alert rules into the relation databag # we only need `self._remote_write.endpoints` @@ -97,6 +105,11 @@ def __init__(self, *args): ], ) + # peer + self.framework.observe( + self.on[PEERS_RELATION_ENDPOINT_NAME].relation_created, self._on_peers_relation_created + ) + # refuse to handle any other event as we can't possibly know what to do. if not self.coordinator.can_handle_events: # logging is handled by the Coordinator object @@ -116,6 +129,11 @@ def __init__(self, *args): ###################### # UTILITY PROPERTIES # ###################### + @property + def peers(self): + """Fetch the "peers" peer relation.""" + return self.model.get_relation(PEERS_RELATION_ENDPOINT_NAME) + @property def _external_hostname(self) -> str: """Return the external hostname.""" @@ -148,13 +166,17 @@ def _external_url(self) -> str: return self._internal_url @property - def _internal_url(self) -> str: - """Returns workload's FQDN.""" + def _scheme(self) -> str: + """Return the URI scheme that should be used when communicating with this unit.""" scheme = "http" if self.are_certificates_on_disk: scheme = "https" + return scheme - return f"{scheme}://{self.hostname}" + @property + def _internal_url(self) -> str: + """Return the locally addressable, FQDN based unit address.""" + return f"{self._scheme}://{self.hostname}" @property def are_certificates_on_disk(self) -> bool: @@ -186,6 +208,8 @@ def enabled_receivers(self) -> Set[str]: ################## # EVENT HANDLERS # ################## + def _on_peers_relation_created(self, _: ops.RelationCreatedEvent): + self.update_peer_data() def _on_cert_handler_changed(self, _: ops.RelationChangedEvent): # sync the server CA cert with the charm container. @@ -213,6 +237,19 @@ def _on_collect_status(self, e: CollectStatusEvent): ################### # UTILITY METHODS # ################### + + def update_peer_data(self) -> None: + """Update peer unit data bucket with this unit's hostname.""" + if self.peers and self.peers.data: + PeerData(fqdn=self.hostname).dump(self.peers.data[self.unit]) + + def get_peer_data(self, unit: ops.Unit) -> Optional[PeerData]: + """Get peer data from a given unit data bucket.""" + if not (self.peers and self.peers.data): + return None + + return PeerData.load(self.peers.data.get(unit, {})) + def _update_ingress_relation(self) -> None: """Make sure the traefik route is up-to-date.""" if not self.unit.is_leader(): @@ -318,13 +355,13 @@ def _ingress_config(self) -> dict: # see https://doc.traefik.io/traefik/v2.0/user-guides/grpc/#with-http-h2c http_services[ f"juju-{self.model.name}-{self.model.app.name}-service-{sanitized_protocol}" - ] = {"loadBalancer": {"servers": [{"url": f"h2c://{self.hostname}:{port}"}]}} + ] = {"loadBalancer": {"servers": self._build_lb_server_config("h2c", port)}} else: # anything else, including secured GRPC, can use _internal_url # ref https://doc.traefik.io/traefik/v2.0/user-guides/grpc/#with-https http_services[ f"juju-{self.model.name}-{self.model.app.name}-service-{sanitized_protocol}" - ] = {"loadBalancer": {"servers": [{"url": f"{self._internal_url}:{port}"}]}} + ] = {"loadBalancer": {"servers": self._build_lb_server_config(self._scheme, port)}} return { "http": { "routers": http_routers, @@ -332,6 +369,21 @@ def _ingress_config(self) -> dict: }, } + def _build_lb_server_config(self, scheme: str, port: int) -> List[Dict[str, str]]: + """build the server portion of the loadbalancer config of Traefik ingress.""" + + def to_url(fqdn: str): + return {"url": f"{scheme}://{fqdn}:{port}"} + + urls = [to_url(self.hostname)] + if self.peers: + for peer in self.peers.units: + peer_data = self.get_peer_data(peer) + if peer_data: + urls.append(to_url(peer_data.fqdn)) + + return urls + def get_receiver_url(self, protocol: ReceiverProtocol): """Return the receiver endpoint URL based on the protocol. diff --git a/src/tempo.py b/src/tempo.py index 644b217..0839bc1 100644 --- a/src/tempo.py +++ b/src/tempo.py @@ -4,6 +4,7 @@ """Tempo workload configuration and client.""" import logging +import re from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple import yaml @@ -50,11 +51,9 @@ def __init__( self, requested_receivers: Callable[[], "Tuple[ReceiverProtocol, ...]"], retention_period_hours: int, - external_hostname: str, ): self._receivers_getter = requested_receivers self._retention_period_hours = retention_period_hours - self._external_hostname = external_hostname @property def tempo_http_server_port(self) -> int: @@ -74,7 +73,6 @@ def config( Only activate the provided receivers. """ - config = tempo_config.TempoConfig( auth_enabled=False, server=self._build_server_config(coordinator.tls_available), @@ -84,7 +82,7 @@ def config( ingester=self._build_ingester_config(coordinator.cluster.gather_addresses_by_role()), memberlist=self._build_memberlist_config(coordinator.cluster.gather_addresses()), compactor=self._build_compactor_config(), - querier=self._build_querier_config(self._external_hostname), + querier=self._build_querier_config(coordinator.cluster.gather_addresses_by_role()), storage=self._build_storage_config(coordinator._s3_config), metrics_generator=self._build_metrics_generator_config( coordinator.remote_write_endpoints_getter(), coordinator.tls_available # type: ignore @@ -95,30 +93,40 @@ def config( config.overrides = self._build_overrides_config() if coordinator.tls_available: - # cfr: - # https://grafana.com/docs/tempo/latest/configuration/network/tls/#client-configuration - tls_config = { - "tls_enabled": True, - "tls_cert_path": self.tls_cert_path, - "tls_key_path": self.tls_key_path, - "tls_ca_path": self.tls_ca_path, - # try with fqdn? - "tls_server_name": coordinator.hostname, - } + + tls_config = self._build_tls_config(coordinator.cluster.gather_addresses()) + config.ingester_client = tempo_config.Client( grpc_client_config=tempo_config.ClientTLS(**tls_config) ) config.metrics_generator_client = tempo_config.Client( grpc_client_config=tempo_config.ClientTLS(**tls_config) ) - # use ingress hostname here, as the query-frontend worker would be pointing at the ingress url + config.querier.frontend_worker.grpc_client_config = tempo_config.ClientTLS( - **{**tls_config, "tls_server_name": self._external_hostname}, + **tls_config, ) + config.memberlist = config.memberlist.model_copy(update=tls_config) return yaml.dump(config.model_dump(mode="json", by_alias=True, exclude_none=True)) + def _build_tls_config(self, workers_addrs: Tuple[str, ...]): + """Build TLS config to be used by Tempo's internal clients to communicate with each other.""" + + # cfr: + # https://grafana.com/docs/tempo/latest/configuration/network/tls/#client-configuration + return { + "tls_enabled": True, + "tls_cert_path": self.tls_cert_path, + "tls_key_path": self.tls_key_path, + "tls_ca_path": self.tls_ca_path, + # Tempo's internal components contact each other using their IPs not their DNS names + # and we don't provide IP sans to Tempo's certificate. So, we need to provide workers' DNS names + # as tls_server_name to verify the certificate against this name not against the IP. + "tls_server_name": workers_addrs[0] if len(workers_addrs) > 0 else "", + } + def _build_overrides_config(self): return tempo_config.Overrides( defaults=tempo_config.Defaults( @@ -195,15 +203,23 @@ def _build_storage_config(self, s3_config: dict): ) return tempo_config.Storage(trace=storage_config) - def _build_querier_config(self, external_hostname: str): + def _build_querier_config(self, roles_addresses: Dict[str, Set[str]]): """Build querier config. - Use coordinator's external_hostname to loadbalance across query-frontend worker instances if any. + Use query-frontend workers' service fqdn to loadbalance across query-frontend worker instances if any. """ - + query_frontend_addresses = roles_addresses.get(tempo_config.TempoRole.query_frontend) + if not query_frontend_addresses: + svc_addr = "localhost" + else: + addresses = sorted(query_frontend_addresses) + query_frontend_addr = next(iter(addresses)) + # remove "tempo-worker-0." from "tempo-worker-0.tempo-endpoints.cluster.local.svc" + # to extract the worker's headless service + svc_addr = re.sub(r"^[^.]+\.", "", query_frontend_addr) return tempo_config.Querier( frontend_worker=tempo_config.FrontendWorker( - frontend_address=f"{external_hostname}:{self.tempo_grpc_server_port}" + frontend_address=f"{svc_addr}:{self.tempo_grpc_server_port}" ), ) diff --git a/tests/scenario/conftest.py b/tests/scenario/conftest.py index 7ce008d..1785fe6 100644 --- a/tests/scenario/conftest.py +++ b/tests/scenario/conftest.py @@ -3,9 +3,9 @@ import pytest from ops import ActiveStatus -from scenario import Container, Context, Relation +from scenario import Container, Context, PeerRelation, Relation -from charm import TempoCoordinatorCharm +from charm import PEERS_RELATION_ENDPOINT_NAME, TempoCoordinatorCharm @pytest.fixture() @@ -56,6 +56,14 @@ def all_worker(): return Relation( "tempo-cluster", remote_app_data={"role": '"all"'}, + remote_units_data={ + 0: { + "address": json.dumps("localhost"), + "juju_topology": json.dumps( + {"application": "worker", "unit": "worker/0", "charm_name": "tempo"} + ), + } + }, ) @@ -69,6 +77,13 @@ def remote_write(): ) +@pytest.fixture(scope="function") +def peer(): + return PeerRelation( + endpoint=PEERS_RELATION_ENDPOINT_NAME, peers_data={1: {"fqdn": json.dumps("1.2.3.4")}} + ) + + @pytest.fixture(scope="function") def nginx_container(): return Container( diff --git a/tests/scenario/test_ingressed_tracing.py b/tests/scenario/test_ingressed_tracing.py index 7ac93ff..652dd71 100644 --- a/tests/scenario/test_ingressed_tracing.py +++ b/tests/scenario/test_ingressed_tracing.py @@ -31,11 +31,20 @@ def test_external_url_present(context, base_state, s3, all_worker): } +@pytest.mark.parametrize( + "add_peer, expected_servers_count", + ((True, 2),), +) @patch("socket.getfqdn", lambda: "1.2.3.4") -def test_ingress_relation_set_with_dynamic_config(context, base_state, s3, all_worker): +def test_ingress_relation_set_with_dynamic_config( + add_peer, expected_servers_count, context, base_state, s3, all_worker, peer +): # WHEN ingress is related with external_host ingress = Relation("ingress", remote_app_data={"external_host": "1.2.3.4", "scheme": "http"}) + state = replace(base_state, relations=[ingress, s3, all_worker]) + if add_peer: + state = replace(base_state, relations=[ingress, s3, all_worker, peer]) with patch("charm.TempoCoordinatorCharm.is_workload_ready", lambda _: False): out = context.run(context.on.relation_joined(ingress), state) @@ -83,25 +92,60 @@ def test_ingress_relation_set_with_dynamic_config(context, base_state, s3, all_w }, "services": { f"juju-{state.model.name}-{charm_name}-service-jaeger-thrift-http": { - "loadBalancer": {"servers": [{"url": "http://1.2.3.4:14268"}]} + "loadBalancer": { + "servers": [ + {"url": "http://1.2.3.4:14268"} + for server in range(expected_servers_count) + ] + } }, f"juju-{state.model.name}-{charm_name}-service-otlp-http": { - "loadBalancer": {"servers": [{"url": "http://1.2.3.4:4318"}]} + "loadBalancer": { + "servers": [ + {"url": "http://1.2.3.4:4318"} + for server in range(expected_servers_count) + ] + } }, f"juju-{state.model.name}-{charm_name}-service-tempo-http": { - "loadBalancer": {"servers": [{"url": "http://1.2.3.4:3200"}]} + "loadBalancer": { + "servers": [ + {"url": "http://1.2.3.4:3200"} + for server in range(expected_servers_count) + ] + } }, f"juju-{state.model.name}-{charm_name}-service-zipkin": { - "loadBalancer": {"servers": [{"url": "http://1.2.3.4:9411"}]} + "loadBalancer": { + "servers": [ + {"url": "http://1.2.3.4:9411"} + for server in range(expected_servers_count) + ] + } }, f"juju-{state.model.name}-{charm_name}-service-otlp-grpc": { - "loadBalancer": {"servers": [{"url": "h2c://1.2.3.4:4317"}]}, + "loadBalancer": { + "servers": [ + {"url": "h2c://1.2.3.4:4317"} + for server in range(expected_servers_count) + ] + }, }, f"juju-{state.model.name}-{charm_name}-service-tempo-grpc": { - "loadBalancer": {"servers": [{"url": "h2c://1.2.3.4:9096"}]} + "loadBalancer": { + "servers": [ + {"url": "h2c://1.2.3.4:9096"} + for server in range(expected_servers_count) + ] + } }, f"juju-{state.model.name}-{charm_name}-service-jaeger-grpc": { - "loadBalancer": {"servers": [{"url": "h2c://1.2.3.4:14250"}]} + "loadBalancer": { + "servers": [ + {"url": "h2c://1.2.3.4:14250"} + for server in range(expected_servers_count) + ] + } }, }, }, diff --git a/tests/scenario/test_tempo_clustered.py b/tests/scenario/test_tempo_clustered.py index 8597f20..f3cf000 100644 --- a/tests/scenario/test_tempo_clustered.py +++ b/tests/scenario/test_tempo_clustered.py @@ -1,6 +1,5 @@ import datetime import json -import socket from dataclasses import replace from unittest.mock import MagicMock, patch @@ -40,7 +39,7 @@ def coordinator_with_initial_config(): @pytest.fixture def all_worker_with_initial_config(all_worker: Relation, coordinator_with_initial_config): - initial_config = Tempo(lambda: ("otlp_http",), 42, socket.getfqdn()).config( + initial_config = Tempo(lambda: ("otlp_http",), 720).config( coordinator_with_initial_config.return_value ) @@ -157,7 +156,7 @@ def test_tempo_restart_on_ingress_v2_changed( # THEN # Tempo pushes a new config to the all_worker new_config = get_tempo_config(state_out) - expected_config = Tempo( - lambda: ["otlp_http", requested_protocol], 720, socket.getfqdn() - ).config(coordinator_with_initial_config.return_value) + expected_config = Tempo(lambda: ["otlp_http", requested_protocol], 720).config( + coordinator_with_initial_config.return_value + ) assert new_config == expected_config diff --git a/tests/unit/test_tempo.py b/tests/unit/test_tempo.py index 447f93b..862331a 100644 --- a/tests/unit/test_tempo.py +++ b/tests/unit/test_tempo.py @@ -83,8 +83,7 @@ ) def test_tempo_distributor_config(protocols, use_tls, expected_config): assert ( - Tempo(None, 720, "hostname")._build_distributor_config(protocols, use_tls).receivers - == expected_config + Tempo(None, 720)._build_distributor_config(protocols, use_tls).receivers == expected_config ) @@ -108,7 +107,7 @@ def test_tempo_distributor_config(protocols, use_tls, expected_config): ), ) def test_tempo_memberlist_config(peers, expected_config): - assert Tempo(None, 720, "hostname")._build_memberlist_config(peers) == expected_config + assert Tempo(None, 720)._build_memberlist_config(peers) == expected_config @pytest.mark.parametrize( @@ -130,8 +129,6 @@ def test_tempo_memberlist_config(peers, expected_config): ) def test_tempo_ingester_config(addresses, expected_replication): assert ( - Tempo(None, 720, "hostname") - ._build_ingester_config(addresses) - .lifecycler.ring.replication_factor + Tempo(None, 720)._build_ingester_config(addresses).lifecycler.ring.replication_factor == expected_replication )