Skip to content

Commit

Permalink
Enable scaling query-frontend (#55)
Browse files Browse the repository at this point in the history
* scale frontend query

* fix replication factor + add tests

* fix factor

* reorder integrations

* use ingress hostname

* lint

* add external hostname property

* Update src/tempo.py

Co-authored-by: PietroPasotti <[email protected]>
Signed-off-by: Michael Dmitry <[email protected]>

* remove duplication

* fix tests

* fix UT

---------

Signed-off-by: Michael Dmitry <[email protected]>
Co-authored-by: PietroPasotti <[email protected]>
  • Loading branch information
michaeldmitry and PietroPasotti authored Oct 10, 2024
1 parent 1c411a5 commit fd0e882
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 23 deletions.
7 changes: 7 additions & 0 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

"""Charmed Operator for Tempo; a lightweight object storage based tracing backend."""
import logging
import re
import socket
from pathlib import Path
from subprocess import CalledProcessError, getoutput
Expand Down Expand Up @@ -49,6 +50,7 @@ def __init__(self, *args):
self.tempo = Tempo(
requested_receivers=self._requested_receivers,
retention_period_hours=self._trace_retention_period_hours,
external_hostname=self._external_hostname,
)
# set alert_rules_path="", as we don't want to populate alert rules into the relation databag
# we only need `self._remote_write.endpoints`
Expand Down Expand Up @@ -114,6 +116,11 @@ def __init__(self, *args):
######################
# UTILITY PROPERTIES #
######################
@property
def _external_hostname(self) -> str:
    """Return the external hostname: the external URL with any http(s) scheme stripped."""
    url = self._external_url
    # Drop a leading "http://" or "https://" if present; anything else is returned as-is.
    for prefix in ("https://", "http://"):
        if url.startswith(prefix):
            return url[len(prefix):]
    return url

@property
def hostname(self) -> str:
"""Unit's hostname."""
Expand Down
7 changes: 6 additions & 1 deletion src/nginx_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ def _locations(self, upstream: str, grpc: bool, tls: bool) -> List[Dict[str, Any
{
"directive": "grpc_pass" if grpc else "proxy_pass",
"args": [f"{protocol}://{upstream}"],
}
},
# if a server is down, no need to wait for a long time to pass on the request to the next available server
{
"directive": "proxy_connect_timeout",
"args": ["5s"],
},
],
}
]
Expand Down
36 changes: 22 additions & 14 deletions src/tempo.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ def __init__(
self,
requested_receivers: Callable[[], "Tuple[ReceiverProtocol, ...]"],
retention_period_hours: int,
external_hostname: str,
):
self._receivers_getter = requested_receivers
self._retention_period_hours = retention_period_hours
self._external_hostname = external_hostname

@property
def tempo_http_server_port(self) -> int:
Expand All @@ -79,10 +81,10 @@ def config(
distributor=self._build_distributor_config(
self._receivers_getter(), coordinator.tls_available
),
ingester=self._build_ingester_config(),
ingester=self._build_ingester_config(coordinator.cluster.gather_addresses_by_role()),
memberlist=self._build_memberlist_config(coordinator.cluster.gather_addresses()),
compactor=self._build_compactor_config(),
querier=self._build_querier_config(coordinator.cluster.gather_addresses_by_role()),
querier=self._build_querier_config(self._external_hostname),
storage=self._build_storage_config(coordinator._s3_config),
metrics_generator=self._build_metrics_generator_config(
coordinator.remote_write_endpoints_getter(), coordinator.tls_available # type: ignore
Expand All @@ -109,8 +111,9 @@ def config(
config.metrics_generator_client = tempo_config.Client(
grpc_client_config=tempo_config.ClientTLS(**tls_config)
)
# use ingress hostname here, as the query-frontend worker would be pointing at the ingress url
config.querier.frontend_worker.grpc_client_config = tempo_config.ClientTLS(
**tls_config
**{**tls_config, "tls_server_name": self._external_hostname},
)
config.memberlist = config.memberlist.model_copy(update=tls_config)

Expand Down Expand Up @@ -192,20 +195,15 @@ def _build_storage_config(self, s3_config: dict):
)
return tempo_config.Storage(trace=storage_config)

def _build_querier_config(self, external_hostname: str):
    """Build querier config.

    Use coordinator's external_hostname to loadbalance across query-frontend
    worker instances if any.

    Args:
        external_hostname: ingress hostname the frontend_worker should dial.

    Returns:
        A ``tempo_config.Querier`` pointing the frontend worker at
        ``external_hostname`` on the Tempo gRPC server port.
    """
    return tempo_config.Querier(
        frontend_worker=tempo_config.FrontendWorker(
            frontend_address=f"{external_hostname}:{self.tempo_grpc_server_port}"
        ),
    )

Expand Down Expand Up @@ -233,15 +231,25 @@ def _build_memberlist_config(
join_members=([f"{peer}:{self.memberlist_port}" for peer in peers] if peers else []),
)

def _build_ingester_config(self, roles_addresses: Dict[str, Set[str]]):
    """Build ingester config.

    Args:
        roles_addresses: mapping of Tempo role name to the set of worker
            addresses holding that role.

    Returns:
        A ``tempo_config.Ingester`` whose ring replication factor is 3 when at
        least three ingester addresses are known, else 1.
    """
    ingester_addresses = roles_addresses.get(tempo_config.TempoRole.ingester)
    # the length of time after a trace has not received spans to consider it complete and flush it
    # cut the head block when it hits this number of traces or ...
    # this much time passes
    return tempo_config.Ingester(
        trace_idle_period="10s",
        max_block_bytes=100,
        max_block_duration="30m",
        # replication_factor=3 to ensure that the Tempo cluster can still be
        # functional if one of the ingesters is down.
        lifecycler=tempo_config.Lifecycler(
            ring=tempo_config.Ring(
                replication_factor=(
                    3 if ingester_addresses and len(ingester_addresses) >= 3 else 1
                )
            ),
        ),
    )

def _build_distributor_config(
Expand Down
10 changes: 8 additions & 2 deletions src/tempo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,16 @@ class Kvstore(BaseModel):
class Ring(BaseModel):
    """Ring schema."""

    # Optional: the ingester lifecycler ring only sets replication_factor,
    # so a kvstore section may be absent.
    kvstore: Optional[Kvstore] = None
    replication_factor: int


class Lifecycler(BaseModel):
    """Lifecycler schema.

    Model for an ingester ``lifecycler`` config section (see ``Ingester.lifecycler``).
    """

    # Ring settings (replication_factor, optional kvstore) for this lifecycler.
    ring: Ring


class Memberlist(BaseModel):
"""Memberlist schema."""

Expand Down Expand Up @@ -173,7 +179,7 @@ class Ingester(BaseModel):
trace_idle_period: str
max_block_bytes: int
max_block_duration: str
lifecycler: Optional[Ring] = None
lifecycler: Lifecycler


class FrontendWorker(BaseModel):
Expand Down
9 changes: 5 additions & 4 deletions tests/scenario/test_tempo_clustered.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import json
import socket
from dataclasses import replace
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -39,7 +40,7 @@ def coordinator_with_initial_config():
@pytest.fixture
def all_worker_with_initial_config(all_worker: Relation, coordinator_with_initial_config):

initial_config = Tempo(lambda: ("otlp_http",), 42).config(
initial_config = Tempo(lambda: ("otlp_http",), 42, socket.getfqdn()).config(
coordinator_with_initial_config.return_value
)

Expand Down Expand Up @@ -156,7 +157,7 @@ def test_tempo_restart_on_ingress_v2_changed(
# THEN
# Tempo pushes a new config to the all_worker
new_config = get_tempo_config(state_out)
expected_config = Tempo(lambda: ["otlp_http", requested_protocol], 720).config(
coordinator_with_initial_config.return_value
)
expected_config = Tempo(
lambda: ["otlp_http", requested_protocol], 720, socket.getfqdn()
).config(coordinator_with_initial_config.return_value)
assert new_config == expected_config
31 changes: 29 additions & 2 deletions tests/unit/test_tempo.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@
)
def test_tempo_distributor_config(protocols, use_tls, expected_config):
    """Distributor receivers are built from the requested protocols and TLS flag."""
    assert (
        Tempo(None, 720, "hostname")._build_distributor_config(protocols, use_tls).receivers
        == expected_config
    )


Expand All @@ -107,4 +108,30 @@ def test_tempo_distributor_config(protocols, use_tls, expected_config):
),
)
def test_tempo_memberlist_config(peers, expected_config):
    """Memberlist config lists one join member per peer address."""
    assert Tempo(None, 720, "hostname")._build_memberlist_config(peers) == expected_config


@pytest.mark.parametrize(
    "addresses, expected_replication",
    (
        ({"querier": {"addr1"}, "ingester": {"addr1", "addr2", "addr3"}}, 3),
        ({"querier": {"addr1"}}, 1),
        ({"ingester": {"addr2"}, "querier": {"addr1"}}, 1),
    ),
)
def test_tempo_ingester_config(addresses, expected_replication):
    """Replication factor is 3 only when at least three ingester addresses exist."""
    tempo = Tempo(None, 720, "hostname")
    ingester = tempo._build_ingester_config(addresses)
    assert ingester.lifecycler.ring.replication_factor == expected_replication

0 comments on commit fd0e882

Please sign in to comment.