Skip to content

Commit

Permalink
load-balance between scaled-up coordinator units (#59)
Browse files Browse the repository at this point in the history
* add peers

* update lib

* add datamodel

* make private

* nits

* add recomm note
  • Loading branch information
michaeldmitry authored Oct 16, 2024
1 parent 279ef68 commit 87d7d9b
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 53 deletions.
5 changes: 4 additions & 1 deletion lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ def my_tracing_endpoint(self) -> Optional[str]:
- every event as a span (including custom events)
- every charm method call (except dunders) as a span
We recommend that you scale up your tracing provider and relate it to an ingress so that your tracing requests
go through the ingress and get load balanced across all units. Otherwise, if the provider's leader goes down, your tracing goes down.
## TLS support
If your charm integrates with a TLS provider which is also trusted by the tracing provider (the Tempo charm),
Expand Down Expand Up @@ -269,7 +272,7 @@ def _remove_stale_otel_sdk_packages():
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version

LIBPATCH = 2
LIBPATCH = 3

PYDEPS = ["opentelemetry-exporter-otlp-proto-http==1.21.0"]

Expand Down
9 changes: 6 additions & 3 deletions lib/charms/tempo_coordinator_k8s/v0/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, *args):
`TracingEndpointRequirer.request_protocols(*protocol:str, relation:Optional[Relation])` method.
Using this method also allows you to use per-relation protocols.
Units of provider charms obtain the tempo endpoint to which they will push their traces by calling
Units of requirer charms obtain the tempo endpoint to which they will push their traces by calling
`TracingEndpointRequirer.get_endpoint(protocol: str)`, where `protocol` is, for example:
- `otlp_grpc`
- `otlp_http`
Expand All @@ -44,7 +44,10 @@ def __init__(self, *args):
If the `protocol` is not in the list of protocols that the charm requested at endpoint set-up time,
the library will raise an error.
## Requirer Library Usage
We recommend that you scale up your tracing provider and relate it to an ingress so that your tracing requests
go through the ingress and get load balanced across all units. Otherwise, if the provider's leader goes down, your tracing goes down.
## Provider Library Usage
The `TracingEndpointProvider` object may be used by charms to manage relations with their
trace sources. For this purposes a Tempo-like charm needs to do two things
Expand Down Expand Up @@ -107,7 +110,7 @@ def __init__(self, *args):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 2
LIBPATCH = 3

PYDEPS = ["pydantic"]

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ cryptography
# lib/charms/tempo_coordinator_k8s/v1/tracing.py
pydantic>=2
# lib/charms/prometheus_k8s/v0/prometheus_scrape.py
cosl>=0.0.41
cosl>=0.0.42
66 changes: 59 additions & 7 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import socket
from pathlib import Path
from subprocess import CalledProcessError, getoutput
from typing import Dict, Optional, Set, Tuple, cast, get_args
from typing import Dict, List, Optional, Set, Tuple, cast, get_args

import ops
from charms.grafana_k8s.v0.grafana_source import GrafanaSourceProvider
Expand All @@ -24,6 +24,7 @@
)
from charms.traefik_k8s.v0.traefik_route import TraefikRouteRequirer
from cosl.coordinated_workers.coordinator import ClusterRolesConfig, Coordinator
from cosl.coordinated_workers.interface import DatabagModel
from cosl.coordinated_workers.nginx import CA_CERT_PATH, CERT_PATH, KEY_PATH
from ops import CollectStatusEvent
from ops.charm import CharmBase
Expand All @@ -33,6 +34,14 @@
from tempo_config import TEMPO_ROLES_CONFIG

logger = logging.getLogger(__name__)
PEERS_RELATION_ENDPOINT_NAME = "peers"


class PeerData(DatabagModel):
    """Schema of the per-unit databag shared over the "peers" relation between coordinator units."""

    fqdn: str
    """Fully qualified domain name of this coordinator unit."""


@trace_charm(
Expand All @@ -50,7 +59,6 @@ def __init__(self, *args):
self.tempo = Tempo(
requested_receivers=self._requested_receivers,
retention_period_hours=self._trace_retention_period_hours,
external_hostname=self._external_hostname,
)
# set alert_rules_path="", as we don't want to populate alert rules into the relation databag
# we only need `self._remote_write.endpoints`
Expand Down Expand Up @@ -97,6 +105,11 @@ def __init__(self, *args):
],
)

# peer
self.framework.observe(
self.on[PEERS_RELATION_ENDPOINT_NAME].relation_created, self._on_peers_relation_created
)

# refuse to handle any other event as we can't possibly know what to do.
if not self.coordinator.can_handle_events:
# logging is handled by the Coordinator object
Expand All @@ -116,6 +129,11 @@ def __init__(self, *args):
######################
# UTILITY PROPERTIES #
######################
@property
def peers(self):
"""Fetch the "peers" peer relation."""
return self.model.get_relation(PEERS_RELATION_ENDPOINT_NAME)

@property
def _external_hostname(self) -> str:
"""Return the external hostname."""
Expand Down Expand Up @@ -148,13 +166,17 @@ def _external_url(self) -> str:
return self._internal_url

@property
def _internal_url(self) -> str:
"""Returns workload's FQDN."""
def _scheme(self) -> str:
"""Return the URI scheme that should be used when communicating with this unit."""
scheme = "http"
if self.are_certificates_on_disk:
scheme = "https"
return scheme

return f"{scheme}://{self.hostname}"
@property
def _internal_url(self) -> str:
"""Return the locally addressable, FQDN based unit address."""
return f"{self._scheme}://{self.hostname}"

@property
def are_certificates_on_disk(self) -> bool:
Expand Down Expand Up @@ -186,6 +208,8 @@ def enabled_receivers(self) -> Set[str]:
##################
# EVENT HANDLERS #
##################
def _on_peers_relation_created(self, _: ops.RelationCreatedEvent):
self.update_peer_data()

def _on_cert_handler_changed(self, _: ops.RelationChangedEvent):
# sync the server CA cert with the charm container.
Expand Down Expand Up @@ -213,6 +237,19 @@ def _on_collect_status(self, e: CollectStatusEvent):
###################
# UTILITY METHODS #
###################

def update_peer_data(self) -> None:
"""Update peer unit data bucket with this unit's hostname."""
if self.peers and self.peers.data:
PeerData(fqdn=self.hostname).dump(self.peers.data[self.unit])

def get_peer_data(self, unit: ops.Unit) -> Optional[PeerData]:
"""Get peer data from a given unit data bucket."""
if not (self.peers and self.peers.data):
return None

return PeerData.load(self.peers.data.get(unit, {}))

def _update_ingress_relation(self) -> None:
"""Make sure the traefik route is up-to-date."""
if not self.unit.is_leader():
Expand Down Expand Up @@ -318,20 +355,35 @@ def _ingress_config(self) -> dict:
# see https://doc.traefik.io/traefik/v2.0/user-guides/grpc/#with-http-h2c
http_services[
f"juju-{self.model.name}-{self.model.app.name}-service-{sanitized_protocol}"
] = {"loadBalancer": {"servers": [{"url": f"h2c://{self.hostname}:{port}"}]}}
] = {"loadBalancer": {"servers": self._build_lb_server_config("h2c", port)}}
else:
# anything else, including secured GRPC, can use _internal_url
# ref https://doc.traefik.io/traefik/v2.0/user-guides/grpc/#with-https
http_services[
f"juju-{self.model.name}-{self.model.app.name}-service-{sanitized_protocol}"
] = {"loadBalancer": {"servers": [{"url": f"{self._internal_url}:{port}"}]}}
] = {"loadBalancer": {"servers": self._build_lb_server_config(self._scheme, port)}}
return {
"http": {
"routers": http_routers,
"services": http_services,
},
}

def _build_lb_server_config(self, scheme: str, port: int) -> List[Dict[str, str]]:
"""build the server portion of the loadbalancer config of Traefik ingress."""

def to_url(fqdn: str):
return {"url": f"{scheme}://{fqdn}:{port}"}

urls = [to_url(self.hostname)]
if self.peers:
for peer in self.peers.units:
peer_data = self.get_peer_data(peer)
if peer_data:
urls.append(to_url(peer_data.fqdn))

return urls

def get_receiver_url(self, protocol: ReceiverProtocol):
"""Return the receiver endpoint URL based on the protocol.
Expand Down
56 changes: 36 additions & 20 deletions src/tempo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

"""Tempo workload configuration and client."""
import logging
import re
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple

import yaml
Expand Down Expand Up @@ -50,11 +51,9 @@ def __init__(
self,
requested_receivers: Callable[[], "Tuple[ReceiverProtocol, ...]"],
retention_period_hours: int,
external_hostname: str,
):
self._receivers_getter = requested_receivers
self._retention_period_hours = retention_period_hours
self._external_hostname = external_hostname

@property
def tempo_http_server_port(self) -> int:
Expand All @@ -74,7 +73,6 @@ def config(
Only activate the provided receivers.
"""

config = tempo_config.TempoConfig(
auth_enabled=False,
server=self._build_server_config(coordinator.tls_available),
Expand All @@ -84,7 +82,7 @@ def config(
ingester=self._build_ingester_config(coordinator.cluster.gather_addresses_by_role()),
memberlist=self._build_memberlist_config(coordinator.cluster.gather_addresses()),
compactor=self._build_compactor_config(),
querier=self._build_querier_config(self._external_hostname),
querier=self._build_querier_config(coordinator.cluster.gather_addresses_by_role()),
storage=self._build_storage_config(coordinator._s3_config),
metrics_generator=self._build_metrics_generator_config(
coordinator.remote_write_endpoints_getter(), coordinator.tls_available # type: ignore
Expand All @@ -95,30 +93,40 @@ def config(
config.overrides = self._build_overrides_config()

if coordinator.tls_available:
# cfr:
# https://grafana.com/docs/tempo/latest/configuration/network/tls/#client-configuration
tls_config = {
"tls_enabled": True,
"tls_cert_path": self.tls_cert_path,
"tls_key_path": self.tls_key_path,
"tls_ca_path": self.tls_ca_path,
# try with fqdn?
"tls_server_name": coordinator.hostname,
}

tls_config = self._build_tls_config(coordinator.cluster.gather_addresses())

config.ingester_client = tempo_config.Client(
grpc_client_config=tempo_config.ClientTLS(**tls_config)
)
config.metrics_generator_client = tempo_config.Client(
grpc_client_config=tempo_config.ClientTLS(**tls_config)
)
# use ingress hostname here, as the query-frontend worker would be pointing at the ingress url

config.querier.frontend_worker.grpc_client_config = tempo_config.ClientTLS(
**{**tls_config, "tls_server_name": self._external_hostname},
**tls_config,
)

config.memberlist = config.memberlist.model_copy(update=tls_config)

return yaml.dump(config.model_dump(mode="json", by_alias=True, exclude_none=True))

def _build_tls_config(self, workers_addrs: Tuple[str, ...]):
"""Build TLS config to be used by Tempo's internal clients to communicate with each other."""

# cfr:
# https://grafana.com/docs/tempo/latest/configuration/network/tls/#client-configuration
return {
"tls_enabled": True,
"tls_cert_path": self.tls_cert_path,
"tls_key_path": self.tls_key_path,
"tls_ca_path": self.tls_ca_path,
# Tempo's internal components contact each other using their IPs not their DNS names
# and we don't provide IP sans to Tempo's certificate. So, we need to provide workers' DNS names
# as tls_server_name to verify the certificate against this name not against the IP.
"tls_server_name": workers_addrs[0] if len(workers_addrs) > 0 else "",
}

def _build_overrides_config(self):
return tempo_config.Overrides(
defaults=tempo_config.Defaults(
Expand Down Expand Up @@ -195,15 +203,23 @@ def _build_storage_config(self, s3_config: dict):
)
return tempo_config.Storage(trace=storage_config)

def _build_querier_config(self, external_hostname: str):
def _build_querier_config(self, roles_addresses: Dict[str, Set[str]]):
"""Build querier config.
Use coordinator's external_hostname to loadbalance across query-frontend worker instances if any.
Use query-frontend workers' service fqdn to loadbalance across query-frontend worker instances if any.
"""

query_frontend_addresses = roles_addresses.get(tempo_config.TempoRole.query_frontend)
if not query_frontend_addresses:
svc_addr = "localhost"
else:
addresses = sorted(query_frontend_addresses)
query_frontend_addr = next(iter(addresses))
# remove "tempo-worker-0." from "tempo-worker-0.tempo-endpoints.cluster.local.svc"
# to extract the worker's headless service
svc_addr = re.sub(r"^[^.]+\.", "", query_frontend_addr)
return tempo_config.Querier(
frontend_worker=tempo_config.FrontendWorker(
frontend_address=f"{external_hostname}:{self.tempo_grpc_server_port}"
frontend_address=f"{svc_addr}:{self.tempo_grpc_server_port}"
),
)

Expand Down
19 changes: 17 additions & 2 deletions tests/scenario/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import pytest
from ops import ActiveStatus
from scenario import Container, Context, Relation
from scenario import Container, Context, PeerRelation, Relation

from charm import TempoCoordinatorCharm
from charm import PEERS_RELATION_ENDPOINT_NAME, TempoCoordinatorCharm


@pytest.fixture()
Expand Down Expand Up @@ -56,6 +56,14 @@ def all_worker():
return Relation(
"tempo-cluster",
remote_app_data={"role": '"all"'},
remote_units_data={
0: {
"address": json.dumps("localhost"),
"juju_topology": json.dumps(
{"application": "worker", "unit": "worker/0", "charm_name": "tempo"}
),
}
},
)


Expand All @@ -69,6 +77,13 @@ def remote_write():
)


@pytest.fixture(scope="function")
def peer():
    """PeerRelation on the coordinator "peers" endpoint, with one remote unit publishing its fqdn."""
    remote_units_databags = {1: {"fqdn": json.dumps("1.2.3.4")}}
    return PeerRelation(endpoint=PEERS_RELATION_ENDPOINT_NAME, peers_data=remote_units_databags)


@pytest.fixture(scope="function")
def nginx_container():
return Container(
Expand Down
Loading

0 comments on commit 87d7d9b

Please sign in to comment.