From fd0e8827f1e125492ac9cf8d3732e4c03ade7684 Mon Sep 17 00:00:00 2001
From: Michael Dmitry <33381599+michaeldmitry@users.noreply.github.com>
Date: Thu, 10 Oct 2024 08:55:49 +0300
Subject: [PATCH] Enable scaling `query-frontend` (#55)

* scale frontend query

* fix replication factor + add tests

* fix factor

* reorder integrations

* use ingress hostname

* lint

* add external hostname property

* Update src/tempo.py

Co-authored-by: PietroPasotti
Signed-off-by: Michael Dmitry <33381599+michaeldmitry@users.noreply.github.com>

* remove duplication

* fix tests

* fix UT

---------

Signed-off-by: Michael Dmitry <33381599+michaeldmitry@users.noreply.github.com>
Co-authored-by: PietroPasotti
---
 src/charm.py                           |  7 +++++
 src/nginx_config.py                    |  7 ++++-
 src/tempo.py                           | 36 ++++++++++++++++----------
 src/tempo_config.py                    | 10 +++++--
 tests/scenario/test_tempo_clustered.py |  9 ++++---
 tests/unit/test_tempo.py               | 31 ++++++++++++++++++++--
 6 files changed, 77 insertions(+), 23 deletions(-)

diff --git a/src/charm.py b/src/charm.py
index 4082dbc..7b40f2c 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -4,6 +4,7 @@
 """Charmed Operator for Tempo; a lightweight object storage based tracing backend."""

 import logging
+import re
 import socket
 from pathlib import Path
 from subprocess import CalledProcessError, getoutput
@@ -49,6 +50,7 @@ def __init__(self, *args):
         self.tempo = Tempo(
             requested_receivers=self._requested_receivers,
             retention_period_hours=self._trace_retention_period_hours,
+            external_hostname=self._external_hostname,
         )
         # set alert_rules_path="", as we don't want to populate alert rules into the relation databag
         # we only need `self._remote_write.endpoints`
@@ -114,6 +116,11 @@ def __init__(self, *args):
     ######################
     # UTILITY PROPERTIES #
     ######################
+    @property
+    def _external_hostname(self) -> str:
+        """Return the external hostname."""
+        return re.sub(r"^https?:\/\/", "", self._external_url)
+
     @property
     def hostname(self) -> str:
         """Unit's hostname."""
diff --git a/src/nginx_config.py b/src/nginx_config.py
index b9ff7d0..e7e5c9f 100644
--- a/src/nginx_config.py
+++ b/src/nginx_config.py
@@ -152,7 +152,12 @@ def _locations(self, upstream: str, grpc: bool, tls: bool) -> List[Dict[str, Any
                 {
                     "directive": "grpc_pass" if grpc else "proxy_pass",
                     "args": [f"{protocol}://{upstream}"],
-                }
+                },
+                # if a server is down, no need to wait for a long time to pass on the request to the next available server
+                {
+                    "directive": "proxy_connect_timeout",
+                    "args": ["5s"],
+                },
             ],
         }
     ]
diff --git a/src/tempo.py b/src/tempo.py
index 1a7c344..644b217 100644
--- a/src/tempo.py
+++ b/src/tempo.py
@@ -50,9 +50,11 @@ def __init__(
         self,
         requested_receivers: Callable[[], "Tuple[ReceiverProtocol, ...]"],
         retention_period_hours: int,
+        external_hostname: str,
     ):
         self._receivers_getter = requested_receivers
         self._retention_period_hours = retention_period_hours
+        self._external_hostname = external_hostname

     @property
     def tempo_http_server_port(self) -> int:
@@ -79,10 +81,10 @@ def config(
             distributor=self._build_distributor_config(
                 self._receivers_getter(), coordinator.tls_available
             ),
-            ingester=self._build_ingester_config(),
+            ingester=self._build_ingester_config(coordinator.cluster.gather_addresses_by_role()),
             memberlist=self._build_memberlist_config(coordinator.cluster.gather_addresses()),
             compactor=self._build_compactor_config(),
-            querier=self._build_querier_config(coordinator.cluster.gather_addresses_by_role()),
+            querier=self._build_querier_config(self._external_hostname),
             storage=self._build_storage_config(coordinator._s3_config),
             metrics_generator=self._build_metrics_generator_config(
                 coordinator.remote_write_endpoints_getter(), coordinator.tls_available  # type: ignore
@@ -109,8 +111,9 @@ def config(
             config.metrics_generator_client = tempo_config.Client(
                 grpc_client_config=tempo_config.ClientTLS(**tls_config)
             )
+            # use ingress hostname here, as the query-frontend worker would be pointing at the ingress url
             config.querier.frontend_worker.grpc_client_config = tempo_config.ClientTLS(
-                **tls_config
+                **{**tls_config, "tls_server_name": self._external_hostname},
             )
             config.memberlist = config.memberlist.model_copy(update=tls_config)

@@ -192,20 +195,15 @@ def _build_storage_config(self, s3_config: dict):
         )
         return tempo_config.Storage(trace=storage_config)

-    def _build_querier_config(self, roles_addresses: Dict[str, Set[str]]):
-        """Build querier config"""
-        # if distributor and query-frontend have the same address, then the mode of operation is 'all'.
-        query_frontend_addresses = roles_addresses.get(tempo_config.TempoRole.query_frontend)
-        distributor_addresses = roles_addresses.get(tempo_config.TempoRole.distributor)
+    def _build_querier_config(self, external_hostname: str):
+        """Build querier config.

-        if not query_frontend_addresses or query_frontend_addresses == distributor_addresses:
-            addr = "localhost"
-        else:
-            addr = query_frontend_addresses.pop()
+        Use coordinator's external_hostname to loadbalance across query-frontend worker instances if any.
+        """

         return tempo_config.Querier(
             frontend_worker=tempo_config.FrontendWorker(
-                frontend_address=f"{addr}:{self.tempo_grpc_server_port}"
+                frontend_address=f"{external_hostname}:{self.tempo_grpc_server_port}"
             ),
         )

@@ -233,8 +231,9 @@ def _build_memberlist_config(
             join_members=([f"{peer}:{self.memberlist_port}" for peer in peers] if peers else []),
         )

-    def _build_ingester_config(self):
+    def _build_ingester_config(self, roles_addresses: Dict[str, Set[str]]):
         """Build ingester config"""
+        ingester_addresses = roles_addresses.get(tempo_config.TempoRole.ingester)
         # the length of time after a trace has not received spans to consider it complete and flush it
         # cut the head block when it hits this number of traces or ...
         # this much time passes
@@ -242,6 +241,15 @@ def _build_ingester_config(self):
             trace_idle_period="10s",
             max_block_bytes=100,
             max_block_duration="30m",
+            # replication_factor=3 to ensure that the Tempo cluster can still be
+            # functional if one of the ingesters is down.
+            lifecycler=tempo_config.Lifecycler(
+                ring=tempo_config.Ring(
+                    replication_factor=(
+                        3 if ingester_addresses and len(ingester_addresses) >= 3 else 1
+                    )
+                ),
+            ),
         )

     def _build_distributor_config(
diff --git a/src/tempo_config.py b/src/tempo_config.py
index a7a46da..f40098d 100644
--- a/src/tempo_config.py
+++ b/src/tempo_config.py
@@ -127,10 +127,16 @@ class Kvstore(BaseModel):
 class Ring(BaseModel):
     """Ring schema."""

-    kvstore: Kvstore
+    kvstore: Optional[Kvstore] = None
     replication_factor: int


+class Lifecycler(BaseModel):
+    """Lifecycler schema."""
+
+    ring: Ring
+
+
 class Memberlist(BaseModel):
     """Memberlist schema."""

@@ -173,7 +179,7 @@ class Ingester(BaseModel):
     trace_idle_period: str
     max_block_bytes: int
     max_block_duration: str
-    lifecycler: Optional[Ring] = None
+    lifecycler: Lifecycler


 class FrontendWorker(BaseModel):
diff --git a/tests/scenario/test_tempo_clustered.py b/tests/scenario/test_tempo_clustered.py
index d43346d..8597f20 100644
--- a/tests/scenario/test_tempo_clustered.py
+++ b/tests/scenario/test_tempo_clustered.py
@@ -1,5 +1,6 @@
 import datetime
 import json
+import socket
 from dataclasses import replace
 from unittest.mock import MagicMock, patch

@@ -39,7 +40,7 @@ def coordinator_with_initial_config():

 @pytest.fixture
 def all_worker_with_initial_config(all_worker: Relation, coordinator_with_initial_config):
-    initial_config = Tempo(lambda: ("otlp_http",), 42).config(
+    initial_config = Tempo(lambda: ("otlp_http",), 42, socket.getfqdn()).config(
         coordinator_with_initial_config.return_value
     )

@@ -156,7 +157,7 @@ def test_tempo_restart_on_ingress_v2_changed(
     # THEN
     # Tempo pushes a new config to the all_worker
     new_config = get_tempo_config(state_out)
-    expected_config = Tempo(lambda: ["otlp_http", requested_protocol], 720).config(
-        coordinator_with_initial_config.return_value
-    )
+    expected_config = Tempo(
+        lambda: ["otlp_http", requested_protocol], 720, socket.getfqdn()
+    ).config(coordinator_with_initial_config.return_value)
     assert new_config == expected_config
diff --git a/tests/unit/test_tempo.py b/tests/unit/test_tempo.py
index 0a48c51..447f93b 100644
--- a/tests/unit/test_tempo.py
+++ b/tests/unit/test_tempo.py
@@ -83,7 +83,8 @@
 )
 def test_tempo_distributor_config(protocols, use_tls, expected_config):
     assert (
-        Tempo(None, 720)._build_distributor_config(protocols, use_tls).receivers == expected_config
+        Tempo(None, 720, "hostname")._build_distributor_config(protocols, use_tls).receivers
+        == expected_config
     )


@@ -107,4 +108,30 @@ def test_tempo_distributor_config(protocols, use_tls, expected_config):
         ),
     ),
 )
 def test_tempo_memberlist_config(peers, expected_config):
-    assert Tempo(None, 720)._build_memberlist_config(peers) == expected_config
+    assert Tempo(None, 720, "hostname")._build_memberlist_config(peers) == expected_config
+
+
+@pytest.mark.parametrize(
+    "addresses, expected_replication",
+    (
+        (
+            {"querier": {"addr1"}, "ingester": {"addr1", "addr2", "addr3"}},
+            3,
+        ),
+        (
+            {"querier": {"addr1"}},
+            1,
+        ),
+        (
+            {"ingester": {"addr2"}, "querier": {"addr1"}},
+            1,
+        ),
+    ),
+)
+def test_tempo_ingester_config(addresses, expected_replication):
+    assert (
+        Tempo(None, 720, "hostname")
+        ._build_ingester_config(addresses)
+        .lifecycler.ring.replication_factor
+        == expected_replication
+    )
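
Below is a minimal usage sketch, not part of the patch, illustrating the behaviour the new unit tests assert: `Tempo` now takes the coordinator's external hostname as a third constructor argument, the query-frontend worker address is derived from it, and the ingester ring's replication factor only rises to 3 once at least three ingester addresses are known. The import path and the hostname are assumptions for illustration.

# Sketch, assuming `src/` is importable as in the repo's test setup,
# and "tempo.example.com" stands in for the coordinator's ingress hostname.
from tempo import Tempo

tempo = Tempo(None, 720, "tempo.example.com")

# The querier's frontend worker now points at the load-balanced ingress hostname.
querier = tempo._build_querier_config("tempo.example.com")
assert querier.frontend_worker.frontend_address == (
    f"tempo.example.com:{tempo.tempo_grpc_server_port}"
)

# Replication factor scales with the number of known ingesters:
# 3 when there are at least three ingester addresses, otherwise 1.
ingester = tempo._build_ingester_config({"ingester": {"addr1", "addr2", "addr3"}})
assert ingester.lifecycler.ring.replication_factor == 3

ingester = tempo._build_ingester_config({"ingester": {"addr1"}})
assert ingester.lifecycler.ring.replication_factor == 1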