From 49a9ee0e50b318276699e0a560ba6cd5b8d71a5b Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Thu, 20 Jun 2024 19:25:06 +0200 Subject: [PATCH 01/19] [wip] Use and configure nginx --- charmcraft.yaml | 16 +++++ requirements.txt | 2 + src/charm.py | 23 +++++++ src/nginx.py | 103 ++++++++++++------------------- src/nginx_prometheus_exporter.py | 4 +- 5 files changed, 82 insertions(+), 66 deletions(-) diff --git a/charmcraft.yaml b/charmcraft.yaml index f82ceff..1055a84 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -16,6 +16,22 @@ description: | summary: | Tempo is a distributed tracing backend by Grafana. +containers: + nginx: + resource: nginx-image + nginx-prometheus-exporter: + resource: nginx-prometheus-exporter-image + +resources: + nginx-image: + type: oci-image + description: OCI image for nginx + upstream-source: ubuntu/nginx:1.18-22.04_beta + nginx-prometheus-exporter-image: + type: oci-image + description: OCI image for nginx-prometheus-exporter + upstream-source: nginx/nginx-prometheus-exporter:1.1.0 + links: # FIXME: create docs tree root documentation: https://discourse.charmhub.io/t/tempo-coordinator-k8s-docs-index diff --git a/requirements.txt b/requirements.txt index 0b4d4a0..3834f32 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,8 @@ jsonschema==4.17.0 lightkube==0.11.0 lightkube-models==1.24.1.4 tenacity==8.2.3 +# crossplane is a package from nginxinc to interact with the Nginx config +crossplane # PYDEPS # lib/charms/tempo_k8s/v1/charm_tracing.py diff --git a/src/charm.py b/src/charm.py index 9dbb12d..8395b6f 100755 --- a/src/charm.py +++ b/src/charm.py @@ -28,6 +28,8 @@ from ops.model import ActiveStatus, BlockedStatus, Relation, WaitingStatus from coordinator import TempoCoordinator +from nginx import CA_CERT_PATH, CERT_PATH, KEY_PATH, Nginx +from nginx_prometheus_exporter import NGINX_PROMETHEUS_EXPORTER_PORT, NginxPrometheusExporter from tempo import Tempo from tempo_cluster import TempoClusterProvider @@ -48,6 
+50,7 @@ class TempoCoordinatorCharm(CharmBase): def __init__(self, *args): super().__init__(*args) + self.ingress = TraefikRouteRequirer(self, self.model.get_relation("ingress"), "ingress") # type: ignore self.tempo_cluster = TempoClusterProvider(self) self.coordinator = TempoCoordinator(self.tempo_cluster) @@ -66,6 +69,13 @@ def __init__(self, *args): self.s3_requirer = S3Requirer(self, Tempo.s3_relation_name, Tempo.s3_bucket_name) + self.nginx = Nginx( + self, + cluster_provider=self.tempo_cluster, + server_name=self.hostname, + ) + self.nginx_prometheus_exporter = NginxPrometheusExporter(self) + # configure this tempo as a datasource in grafana self.grafana_source_provider = GrafanaSourceProvider( self, @@ -115,6 +125,13 @@ def __init__(self, *args): self.framework.observe(self.on.config_changed, self._on_config_changed) self.framework.observe(self.on.list_receivers_action, self._on_list_receivers_action) + # nginx + self.framework.observe(self.on.nginx_pebble_ready, self._on_nginx_pebble_ready) + self.framework.observe( + self.on.nginx_prometheus_exporter_pebble_ready, + self._on_nginx_prometheus_exporter_pebble_ready, + ) + # ingress ingress = self.on["ingress"] self.framework.observe(ingress.relation_created, self._on_ingress_relation_created) @@ -368,6 +385,12 @@ def _on_collect_unit_status(self, e: CollectStatusEvent): else: e.add_status(ActiveStatus()) + def _on_nginx_pebble_ready(self, _) -> None: + self.nginx.configure_pebble_layer(tls=self.tls_available) + + def _on_nginx_prometheus_exporter_pebble_ready(self, _) -> None: + self.nginx_prometheus_exporter.configure_pebble_layer() + ################### # UTILITY METHODS # ################### diff --git a/src/nginx.py b/src/nginx.py index 7f9613e..fb19cb7 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -16,7 +16,6 @@ NGINX_DIR = "/etc/nginx" NGINX_CONFIG = f"{NGINX_DIR}/nginx.conf" -NGINX_PORT = "8080" KEY_PATH = f"{NGINX_DIR}/certs/server.key" CERT_PATH = f"{NGINX_DIR}/certs/server.cert" CA_CERT_PATH = 
f"{NGINX_DIR}/certs/ca.cert" @@ -32,9 +31,10 @@ }, ], }, + # OTLP/HTTP ingestion { "directive": "location", - "args": ["/api/v1/push"], + "args": ["/v1/traces"], "block": [ { "directive": "proxy_pass", @@ -42,9 +42,10 @@ }, ], }, + # Zipkin ingestion { "directive": "location", - "args": ["/otlp/v1/metrics"], + "args": ["/api/v2/spans"], "block": [ { "directive": "proxy_pass", @@ -53,84 +54,60 @@ ], }, ] -LOCATIONS_ALERTMANAGER: List[Dict] = [ - { - "directive": "location", - "args": ["/alertmanager"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://alertmanager"], - }, - ], - }, - { - "directive": "location", - "args": ["/multitenant_alertmanager/status"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://alertmanager"], - }, - ], - }, +LOCATIONS_QUERY_FRONTEND: List[Dict] = [ { "directive": "location", - "args": ["/api/v1/alerts"], + "args": ["/prometheus"], "block": [ { "directive": "proxy_pass", - "args": ["http://alertmanager"], + "args": ["http://query-frontend"], }, ], }, -] -LOCATIONS_RULER: List[Dict] = [ { "directive": "location", - "args": ["/prometheus/config/v1/rules"], + "args": ["/api/echo"], "block": [ { "directive": "proxy_pass", - "args": ["http://ruler"], + "args": ["http://query-frontend"], }, ], }, { "directive": "location", - "args": ["/prometheus/api/v1/rules"], + "args": ["/api/traces"], "block": [ { "directive": "proxy_pass", - "args": ["http://ruler"], + "args": ["http://query-frontend"], }, ], }, { "directive": "location", - "args": ["/prometheus/api/v1/alerts"], + "args": ["/api/search"], "block": [ { "directive": "proxy_pass", - "args": ["http://ruler"], + "args": ["http://query-frontend"], }, ], }, { "directive": "location", - "args": ["=", "/ruler/ring"], + "args": ["/api/v2/search"], "block": [ { "directive": "proxy_pass", - "args": ["http://ruler"], + "args": ["http://query-frontend"], }, ], }, -] -LOCATIONS_QUERY_FRONTEND: List[Dict] = [ { "directive": "location", - "args": ["/prometheus"], + "args": 
["/api/overrides"], "block": [ { "directive": "proxy_pass", @@ -141,7 +118,7 @@ # Buildinfo endpoint can go to any component { "directive": "location", - "args": ["=", "/api/v1/status/buildinfo"], + "args": ["=", "/api/status/buildinfo"], "block": [ { "directive": "proxy_pass", @@ -150,19 +127,6 @@ ], }, ] -LOCATIONS_COMPACTOR: List[Dict] = [ - # Compactor endpoint for uploading blocks - { - "directive": "location", - "args": ["=", "/api/v1/upload/block/"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://compactor"], - }, - ], - }, -] LOCATIONS_BASIC: List[Dict] = [ { @@ -318,14 +282,9 @@ def _locations(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, A if "distributor" in roles: nginx_locations.extend(LOCATIONS_DISTRIBUTOR) - if "alertmanager" in roles: - nginx_locations.extend(LOCATIONS_ALERTMANAGER) - if "ruler" in roles: - nginx_locations.extend(LOCATIONS_RULER) + # TODO do we need ingester, querier, compactor here? they aren't an entrypoint from outside if "query-frontend" in roles: nginx_locations.extend(LOCATIONS_QUERY_FRONTEND) - if "compactor" in roles: - nginx_locations.extend(LOCATIONS_COMPACTOR) return nginx_locations def _resolver(self, custom_resolver: Optional[List[Any]] = None) -> List[Dict[str, Any]]: @@ -352,8 +311,18 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "server", "args": [], "block": [ - {"directive": "listen", "args": ["443", "ssl"]}, - {"directive": "listen", "args": ["[::]:443", "ssl"]}, + {"directive": "listen", "args": ["3200", "ssl"]}, + {"directive": "listen", "args": ["[::]:3200", "ssl"]}, + {"directive": "listen", "args": ["4317", "ssl"]}, + {"directive": "listen", "args": ["[::]:4317", "ssl"]}, + {"directive": "listen", "args": ["4318", "ssl"]}, + {"directive": "listen", "args": ["[::]:4318", "ssl"]}, + {"directive": "listen", "args": ["9411", "ssl"]}, + {"directive": "listen", "args": ["[::]:9411", "ssl"]}, + {"directive": "listen", "args": 
["9096", "ssl"]}, + {"directive": "listen", "args": ["[::]:9096", "ssl"]}, + {"directive": "listen", "args": ["14268", "ssl"]}, + {"directive": "listen", "args": ["[::]:14268", "ssl"]}, *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", @@ -373,8 +342,16 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "server", "args": [], "block": [ - {"directive": "listen", "args": [NGINX_PORT]}, - {"directive": "listen", "args": [f"[::]:{NGINX_PORT}"]}, + {"directive": "listen", "args": ["3200"]}, + {"directive": "listen", "args": ["[::]:3200"]}, + {"directive": "listen", "args": ["4317"]}, + {"directive": "listen", "args": ["[::]:4318"]}, + {"directive": "listen", "args": ["9411"]}, + {"directive": "listen", "args": ["[::]:9411"]}, + {"directive": "listen", "args": ["9096"]}, + {"directive": "listen", "args": ["[::]:9096"]}, + {"directive": "listen", "args": ["14268"]}, + {"directive": "listen", "args": ["[::]:14268"]}, *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", diff --git a/src/nginx_prometheus_exporter.py b/src/nginx_prometheus_exporter.py index 6d516a7..539bd01 100644 --- a/src/nginx_prometheus_exporter.py +++ b/src/nginx_prometheus_exporter.py @@ -7,8 +7,6 @@ from ops import CharmBase from ops.pebble import Layer -from nginx import NGINX_PORT - logger = logging.getLogger(__name__) NGINX_PROMETHEUS_EXPORTER_PORT = "9113" @@ -38,7 +36,7 @@ def layer(self) -> Layer: "nginx": { "override": "replace", "summary": "nginx prometheus exporter", - "command": f"nginx-prometheus-exporter --no-nginx.ssl-verify --web.listen-address=:{NGINX_PROMETHEUS_EXPORTER_PORT} --nginx.scrape-uri={scheme}://127.0.0.1:{NGINX_PORT}/status", + "command": f"nginx-prometheus-exporter --no-nginx.ssl-verify --web.listen-address=:{NGINX_PROMETHEUS_EXPORTER_PORT} --nginx.scrape-uri={scheme}://127.0.0.1:3200/status", "startup": "enabled", } }, From a04e87a9c9ab701984ccbab31a40ff3ac78a0c25 Mon Sep 17 00:00:00 2001 From: 
Mateusz Kulewicz Date: Mon, 24 Jun 2024 10:45:28 +0200 Subject: [PATCH 02/19] generate directives per role, first tests around config generation --- src/charm.py | 18 ++++++++++ src/nginx.py | 52 +++++++++++++-------------- src/nginx_prometheus_exporter.py | 2 +- tests/scenario/conftest.py | 10 +++++- tests/scenario/test_charm_statuses.py | 7 ++-- tests/scenario/test_nginx.py | 30 ++++++++++++++++ 6 files changed, 87 insertions(+), 32 deletions(-) create mode 100644 tests/scenario/test_nginx.py diff --git a/src/charm.py b/src/charm.py index 8395b6f..bb523c0 100755 --- a/src/charm.py +++ b/src/charm.py @@ -156,6 +156,9 @@ def __init__(self, *args): # cluster self.framework.observe(self.tempo_cluster.on.changed, self._on_tempo_cluster_changed) + for evt in self.on.events().values(): + self.framework.observe(evt, self._on_event) + ###################### # UTILITY PROPERTIES # ###################### @@ -391,6 +394,21 @@ def _on_nginx_pebble_ready(self, _) -> None: def _on_nginx_prometheus_exporter_pebble_ready(self, _) -> None: self.nginx_prometheus_exporter.configure_pebble_layer() + def _on_event(self, event): + """A set of common configuration actions that should happen on every event.""" + if isinstance(event, CollectStatusEvent): + return + # plan layers + self.nginx.configure_pebble_layer(tls=self.tls_available) + self.nginx_prometheus_exporter.configure_pebble_layer() + # configure ingress + self._configure_ingress() + # update cluster relations + self._update_tempo_cluster() + # update tracing relations + self._update_tracing_relations() + + ################### # UTILITY METHODS # ################### diff --git a/src/nginx.py b/src/nginx.py index fb19cb7..4146016 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -9,7 +9,8 @@ from ops import CharmBase from ops.pebble import Layer -from tempo_cluster import TempoClusterProvider +from tempo import Tempo +from tempo_cluster import TempoClusterProvider, TempoRole logger = logging.getLogger(__name__) @@ -169,9 +170,13 
@@ def configure_pebble_layer(self, tls: bool) -> None: def config(self, tls: bool = False) -> str: """Build and return the Nginx configuration.""" + full_config = self._prepare_config(tls) + + return crossplane.build(full_config) + + def _prepare_config(self, tls: bool = False) -> List[dict]: log_level = "error" addresses_by_role = self.cluster_provider.gather_addresses_by_role() - # build the complete configuration full_config = [ {"directive": "worker_processes", "args": ["5"]}, @@ -224,8 +229,7 @@ def config(self, tls: bool = False) -> str: ], }, ] - - return crossplane.build(full_config) + return full_config @property def layer(self) -> Layer: @@ -266,9 +270,9 @@ def _upstreams(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, A nginx_upstreams.append( { "directive": "upstream", - "args": [role], + "args": [str(role)], "block": [ - {"directive": "server", "args": [f"{addr}:{NGINX_PORT}"]} + [directive for directive in self._get_server_by_role(addr, role)] for addr in address_set ], } @@ -276,6 +280,16 @@ def _upstreams(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, A return nginx_upstreams + def _get_server_by_role(self, addr, role): + directives = [] + if role == TempoRole.distributor.value or TempoRole.all.value: + for port in Tempo.receiver_ports.values(): + directives.append({"directive": "server", "args": [f"{addr}:{port}"]}) + if role == TempoRole.query_frontend.value or TempoRole.all.value: + for port in Tempo.server_ports.values(): + directives.append({"directive": "server", "args": [f"{addr}:{port}"]}) + return directives + def _locations(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any]]: nginx_locations = LOCATIONS_BASIC.copy() roles = addresses_by_role.keys() @@ -311,18 +325,8 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "server", "args": [], "block": [ - {"directive": "listen", "args": ["3200", "ssl"]}, - {"directive": "listen", "args": 
["[::]:3200", "ssl"]}, - {"directive": "listen", "args": ["4317", "ssl"]}, - {"directive": "listen", "args": ["[::]:4317", "ssl"]}, - {"directive": "listen", "args": ["4318", "ssl"]}, - {"directive": "listen", "args": ["[::]:4318", "ssl"]}, - {"directive": "listen", "args": ["9411", "ssl"]}, - {"directive": "listen", "args": ["[::]:9411", "ssl"]}, - {"directive": "listen", "args": ["9096", "ssl"]}, - {"directive": "listen", "args": ["[::]:9096", "ssl"]}, - {"directive": "listen", "args": ["14268", "ssl"]}, - {"directive": "listen", "args": ["[::]:14268", "ssl"]}, + [{"directive": "listen", "args": [f"{port}", "ssl"]} for port in Tempo.all_ports.values()], + [{"directive": "listen", "args": [f"[::]:{port}", "ssl"]} for port in Tempo.all_ports.values()], *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", @@ -342,16 +346,8 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "server", "args": [], "block": [ - {"directive": "listen", "args": ["3200"]}, - {"directive": "listen", "args": ["[::]:3200"]}, - {"directive": "listen", "args": ["4317"]}, - {"directive": "listen", "args": ["[::]:4318"]}, - {"directive": "listen", "args": ["9411"]}, - {"directive": "listen", "args": ["[::]:9411"]}, - {"directive": "listen", "args": ["9096"]}, - {"directive": "listen", "args": ["[::]:9096"]}, - {"directive": "listen", "args": ["14268"]}, - {"directive": "listen", "args": ["[::]:14268"]}, + [{"directive": "listen", "args": [f"{port}"]} for port in Tempo.all_ports.values()], + [{"directive": "listen", "args": [f"[::]:{port}"]} for port in Tempo.all_ports.values()], *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", diff --git a/src/nginx_prometheus_exporter.py b/src/nginx_prometheus_exporter.py index 539bd01..7ce7b2c 100644 --- a/src/nginx_prometheus_exporter.py +++ b/src/nginx_prometheus_exporter.py @@ -27,7 +27,7 @@ def configure_pebble_layer(self) -> None: @property def layer(self) -> Layer: """Return 
the Pebble layer for Nginx Prometheus exporter.""" - scheme = "https" if self._charm._is_cert_available else "http" # type: ignore + scheme = "https" if self._charm.tls_available else "http" # type: ignore return Layer( { "summary": "nginx prometheus exporter layer", diff --git a/tests/scenario/conftest.py b/tests/scenario/conftest.py index f629faa..84f9d16 100644 --- a/tests/scenario/conftest.py +++ b/tests/scenario/conftest.py @@ -1,7 +1,7 @@ from unittest.mock import patch import pytest -from scenario import Context, Relation +from scenario import Container, Context, Relation from charm import TempoCoordinatorCharm from tempo_cluster import TempoClusterRequirerAppData, TempoRole @@ -45,3 +45,11 @@ def all_worker(): "tempo-cluster", remote_app_data=TempoClusterRequirerAppData(role=TempoRole.all).dump(), ) + + +@pytest.fixture(scope="function") +def nginx_container(): + return Container( + "nginx", + can_connect=True, + ) diff --git a/tests/scenario/test_charm_statuses.py b/tests/scenario/test_charm_statuses.py index 5c88235..c6e3601 100644 --- a/tests/scenario/test_charm_statuses.py +++ b/tests/scenario/test_charm_statuses.py @@ -1,6 +1,7 @@ from unittest.mock import patch import ops + from scenario import PeerRelation, State from tempo import Tempo @@ -33,11 +34,12 @@ def test_scaled_status_no_workers(context, all_worker): assert state_out.unit_status.name == "blocked" -def test_scaled_status_with_s3_and_workers(context, s3, all_worker): +def test_scaled_status_with_s3_and_workers(context, s3, all_worker, nginx_container): state_out = context.run( "start", State( relations=[PeerRelation("peers", peers_data={1: {}, 2: {}}), s3, all_worker], + containers=[nginx_container], unit_status=ops.ActiveStatus(), ), ) @@ -45,11 +47,12 @@ def test_scaled_status_with_s3_and_workers(context, s3, all_worker): @patch.object(Tempo, "is_ready", new=True) -def test_happy_status(context, s3, all_worker): +def test_happy_status(context, s3, all_worker, nginx_container): state_out = 
context.run( "start", State( relations=[PeerRelation("peers", peers_data={1: {}, 2: {}}), s3, all_worker], + containers=[nginx_container], unit_status=ops.ActiveStatus(), ), ) diff --git a/tests/scenario/test_nginx.py b/tests/scenario/test_nginx.py new file mode 100644 index 0000000..e8b2d98 --- /dev/null +++ b/tests/scenario/test_nginx.py @@ -0,0 +1,30 @@ +from typing import List +from unittest.mock import MagicMock, patch + +import logging +import ops +import pytest +from scenario import PeerRelation, State + +from nginx import Nginx +from tempo_cluster import TempoClusterProvider + + +logger = logging.getLogger(__name__) + +@pytest.fixture +def tempo_cluster_provider(): + cluster_mock = MagicMock() + return TempoClusterProvider(cluster_mock) + + +def test_nginx_config_is_list_before_crossplane(context, nginx_container, tempo_cluster_provider): + unit = MagicMock() + unit.get_container = nginx_container + tempo_charm = MagicMock() + tempo_charm.unit = MagicMock(return_value=unit) + + nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") + + prepared_config = nginx._prepare_config(tls=False) + assert isinstance(prepared_config, List) From 4a28a07ff59aac899882ce843a74caa5ad7cabd1 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Mon, 24 Jun 2024 16:47:51 +0200 Subject: [PATCH 03/19] fix errors in generating nginx config --- src/nginx.py | 26 ++++++---- tests/scenario/conftest.py | 8 +++ tests/scenario/test_charm_statuses.py | 8 +-- tests/scenario/test_nginx.py | 70 +++++++++++++++++++++++++-- 4 files changed, 96 insertions(+), 16 deletions(-) diff --git a/src/nginx.py b/src/nginx.py index 4146016..0015f79 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -171,7 +171,6 @@ def configure_pebble_layer(self, tls: bool) -> None: def config(self, tls: bool = False) -> str: """Build and return the Nginx configuration.""" full_config = self._prepare_config(tls) - return crossplane.build(full_config) def _prepare_config(self, tls: bool = False) -> List[dict]: @@ 
-271,15 +270,19 @@ def _upstreams(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, A { "directive": "upstream", "args": [str(role)], - "block": [ - [directive for directive in self._get_server_by_role(addr, role)] - for addr in address_set - ], + "block": self._upstream_servers(role, address_set), } ) return nginx_upstreams + def _upstream_servers(self, role, address_set): + servers = [] + for addr in address_set: + servers.extend(self._get_server_by_role(addr, role)) + return servers + + def _get_server_by_role(self, addr, role): directives = [] if role == TempoRole.distributor.value or TempoRole.all.value: @@ -317,6 +320,13 @@ def _basic_auth(self, enabled: bool) -> List[Optional[Dict[str, Any]]]: ] return [] + def _listen(self, ssl): + directives = [] + for port in Tempo.all_ports.values(): + directives.append({"directive": "listen", "args": [f"{port}", "ssl"] if ssl else [f"{port}"]}) + directives.append({"directive": "listen", "args": [f"[::]:{port}", "ssl"] if ssl else [f"[::]:{port}"]}) + return directives + def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> Dict[str, Any]: auth_enabled = False @@ -325,8 +335,7 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "server", "args": [], "block": [ - [{"directive": "listen", "args": [f"{port}", "ssl"]} for port in Tempo.all_ports.values()], - [{"directive": "listen", "args": [f"[::]:{port}", "ssl"]} for port in Tempo.all_ports.values()], + *self._listen(ssl=True), *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", @@ -346,8 +355,7 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "server", "args": [], "block": [ - [{"directive": "listen", "args": [f"{port}"]} for port in Tempo.all_ports.values()], - [{"directive": "listen", "args": [f"[::]:{port}"]} for port in Tempo.all_ports.values()], + *self._listen(ssl=False), *self._basic_auth(auth_enabled), { "directive": 
"proxy_set_header", diff --git a/tests/scenario/conftest.py b/tests/scenario/conftest.py index 84f9d16..91d499d 100644 --- a/tests/scenario/conftest.py +++ b/tests/scenario/conftest.py @@ -53,3 +53,11 @@ def nginx_container(): "nginx", can_connect=True, ) + + +@pytest.fixture(scope="function") +def nginx_prometheus_exporter_container(): + return Container( + "nginx-prometheus-exporter", + can_connect=True, + ) \ No newline at end of file diff --git a/tests/scenario/test_charm_statuses.py b/tests/scenario/test_charm_statuses.py index c6e3601..4b8a765 100644 --- a/tests/scenario/test_charm_statuses.py +++ b/tests/scenario/test_charm_statuses.py @@ -34,12 +34,12 @@ def test_scaled_status_no_workers(context, all_worker): assert state_out.unit_status.name == "blocked" -def test_scaled_status_with_s3_and_workers(context, s3, all_worker, nginx_container): +def test_scaled_status_with_s3_and_workers(context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container): state_out = context.run( "start", State( relations=[PeerRelation("peers", peers_data={1: {}, 2: {}}), s3, all_worker], - containers=[nginx_container], + containers=[nginx_container, nginx_prometheus_exporter_container], unit_status=ops.ActiveStatus(), ), ) @@ -47,12 +47,12 @@ def test_scaled_status_with_s3_and_workers(context, s3, all_worker, nginx_contai @patch.object(Tempo, "is_ready", new=True) -def test_happy_status(context, s3, all_worker, nginx_container): +def test_happy_status(context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container): state_out = context.run( "start", State( relations=[PeerRelation("peers", peers_data={1: {}, 2: {}}), s3, all_worker], - containers=[nginx_container], + containers=[nginx_container, nginx_prometheus_exporter_container], unit_status=ops.ActiveStatus(), ), ) diff --git a/tests/scenario/test_nginx.py b/tests/scenario/test_nginx.py index e8b2d98..0ec1140 100644 --- a/tests/scenario/test_nginx.py +++ b/tests/scenario/test_nginx.py @@ -1,10 +1,8 
@@ from typing import List -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import logging -import ops import pytest -from scenario import PeerRelation, State from nginx import Nginx from tempo_cluster import TempoClusterProvider @@ -28,3 +26,69 @@ def test_nginx_config_is_list_before_crossplane(context, nginx_container, tempo_ prepared_config = nginx._prepare_config(tls=False) assert isinstance(prepared_config, List) + +def test_nginx_config_is_parsed_by_crossplane(context, nginx_container, tempo_cluster_provider): + unit = MagicMock() + unit.get_container = nginx_container + tempo_charm = MagicMock() + tempo_charm.unit = MagicMock(return_value=unit) + + nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") + + prepared_config = nginx.config(tls=False) + assert isinstance(prepared_config, str) + + +@pytest.mark.parametrize( + "addresses", + ( + {}, + {"all": {"1.2.3.4"}}, + {"all": {"1.2.3.4", "1.2.3.5"}}, + { + "all": {"1.2.3.4"}, + "distributor": {"1.2.3.5"}, + "ingester": {"1.2.3.6"}, + "querier": {"1.2.4.7"}, + "query_frontend": {"1.2.5.1"}, + "compactor": {"1.2.6.6"}, + "metrics_generator": {"1.2.8.4"} + }, + { + "distributor": {"1.2.3.5"}, + "ingester": {"1.2.3.6"}, + "querier": {"1.2.4.7"}, + "query_frontend": {"1.2.5.1"}, + "compactor": {"1.2.6.6"}, + "metrics_generator": {"1.2.8.4"} + }, + { + "distributor": {"1.2.3.5"}, + "ingester": {"1.2.3.6"}, + "querier": {"1.2.4.7"}, + "query_frontend": {"1.2.5.1"}, + "compactor": {"1.2.6.6"}, + "metrics_generator": {"1.2.8.4"} + }, + { + "distributor": {"1.2.3.5", "1.2.3.7"}, + "ingester": {"1.2.3.6", "1.2.3.8"}, + "querier": {"1.2.4.7", "1.2.4.9"}, + "query_frontend": {"1.2.5.1", "1.2.5.2"}, + "compactor": {"1.2.6.6", "1.2.6.7"}, + "metrics_generator": {"1.2.8.4", "1.2.8.5"} + } + ), +) +def test_nginx_config_is_parsed_with_workers(context, nginx_container, tempo_cluster_provider, addresses): + tempo_cluster_provider.gather_addresses_by_role = 
MagicMock(return_value=addresses) + + unit = MagicMock() + unit.get_container = nginx_container + tempo_charm = MagicMock() + tempo_charm.unit = MagicMock(return_value=unit) + + nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") + + prepared_config = nginx.config(tls=False) + assert isinstance(prepared_config, str) \ No newline at end of file From b394023cb7fb2532824abbe3e6375edb9614dff2 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Tue, 25 Jun 2024 14:47:36 +0200 Subject: [PATCH 04/19] Separate upstream for each port --- src/nginx.py | 80 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/src/nginx.py b/src/nginx.py index 0015f79..46f44fc 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -39,7 +39,7 @@ "block": [ { "directive": "proxy_pass", - "args": ["http://distributor"], + "args": ["http://otlp-http"], }, ], }, @@ -50,11 +50,23 @@ "block": [ { "directive": "proxy_pass", - "args": ["http://distributor"], + "args": ["http://zipkin"], }, ], }, + # # Jaeger thrift HTTP ingestion + # { + # "directive": "location", + # "args": ["/api/traces"], + # "block": [ + # { + # "directive": "proxy_pass", + # "args": ["http://jaeger-thrift-http"], + # }, + # ], + # }, ] +# TODO add GRPC locations - perhaps as a separate server section? 
LOCATIONS_QUERY_FRONTEND: List[Dict] = [ { "directive": "location", @@ -264,43 +276,57 @@ def _log_verbose(self, verbose: bool = True) -> List[Dict[str, Any]]: ] def _upstreams(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any]]: + addresses_mapped_to_upstreams = {} nginx_upstreams = [] - for role, address_set in addresses_by_role.items(): - nginx_upstreams.append( - { - "directive": "upstream", - "args": [str(role)], - "block": self._upstream_servers(role, address_set), - } - ) + if TempoRole.distributor in addresses_by_role.keys(): + addresses_mapped_to_upstreams["distributor"] = addresses_by_role[TempoRole.distributor] + if TempoRole.query_frontend in addresses_by_role.keys(): + addresses_mapped_to_upstreams["query_frontend"] = addresses_by_role[TempoRole.query_frontend] + if TempoRole.all in addresses_by_role.keys(): + # for all, we add addresses to existing upstreams for distributor / query_frontend or create the set + if "distributor" in addresses_mapped_to_upstreams: + addresses_mapped_to_upstreams["distributor"] = addresses_mapped_to_upstreams["distributor"].union(addresses_by_role[TempoRole.all]) + else: + addresses_mapped_to_upstreams["distributor"] = addresses_by_role[TempoRole.all] + if "query_frontend" in addresses_mapped_to_upstreams: + addresses_mapped_to_upstreams["query_frontend"] = addresses_mapped_to_upstreams["query_frontend"].union(addresses_by_role[TempoRole.all]) + else: + addresses_mapped_to_upstreams["query_frontend"] = addresses_by_role[TempoRole.all] + if "distributor" in addresses_mapped_to_upstreams.keys(): + nginx_upstreams.extend(self._distributor_upstreams(addresses_mapped_to_upstreams["distributor"])) + if "query_frontend" in addresses_mapped_to_upstreams.keys(): + nginx_upstreams.extend(self._query_frontend_upstreams(addresses_mapped_to_upstreams["query_frontend"])) return nginx_upstreams - def _upstream_servers(self, role, address_set): - servers = [] - for addr in address_set: - 
servers.extend(self._get_server_by_role(addr, role)) - return servers + def _distributor_upstreams(self, address_set): + return [ + self._upstream("distributor", address_set, Tempo.server_ports["tempo_http"]), + self._upstream("otlp-http", address_set, Tempo.receiver_ports["otlp_http"]), + self._upstream("zipkin", address_set, Tempo.receiver_ports["zipkin"]), + self._upstream("jaeger-thrift-http", address_set, Tempo.receiver_ports["jaeger_thrift_http"]), + ] + def _query_frontend_upstreams(self, address_set): + return [ + self._upstream("query-frontend", address_set, Tempo.server_ports["tempo_http"]) + ] - def _get_server_by_role(self, addr, role): - directives = [] - if role == TempoRole.distributor.value or TempoRole.all.value: - for port in Tempo.receiver_ports.values(): - directives.append({"directive": "server", "args": [f"{addr}:{port}"]}) - if role == TempoRole.query_frontend.value or TempoRole.all.value: - for port in Tempo.server_ports.values(): - directives.append({"directive": "server", "args": [f"{addr}:{port}"]}) - return directives + def _upstream(self, role, address_set, port): + return { + "directive": "upstream", + "args": [role], + "block": [{"directive": "server", "args": [f"{addr}:{port}"]} for addr in address_set], + } def _locations(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any]]: nginx_locations = LOCATIONS_BASIC.copy() roles = addresses_by_role.keys() - if "distributor" in roles: + if "distributor" in roles or "all" in roles: + # TODO split locations for every port nginx_locations.extend(LOCATIONS_DISTRIBUTOR) - # TODO do we need ingester, querier, compactor here? 
they aren't an entrypoint from outside - if "query-frontend" in roles: + if "query-frontend" in roles or "all" in roles: nginx_locations.extend(LOCATIONS_QUERY_FRONTEND) return nginx_locations From 8a79a1f82ba3f7a5defcbbd0349d117ca6bed754 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Tue, 25 Jun 2024 15:19:01 +0200 Subject: [PATCH 05/19] Add proxy header to nginx endpoints --- src/nginx.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/nginx.py b/src/nginx.py index 46f44fc..8eedb76 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -367,6 +367,10 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "proxy_set_header", "args": ["X-Scope-OrgID", "$ensured_x_scope_orgid"], }, + { + "directive": "proxy_set_header", + "args": ["Host", "$host:$server_port"] + }, # FIXME: use a suitable SERVER_NAME {"directive": "server_name", "args": [self.server_name]}, {"directive": "ssl_certificate", "args": [CERT_PATH]}, @@ -387,6 +391,10 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "proxy_set_header", "args": ["X-Scope-OrgID", "$ensured_x_scope_orgid"], }, + { + "directive": "proxy_set_header", + "args": ["Host", "$host:$server_port"] + }, *self._locations(addresses_by_role), ], } From f6b9ada9ed5b2d969778e8c319632aa20dad2cfa Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Mon, 1 Jul 2024 11:55:42 +0200 Subject: [PATCH 06/19] Use set_ports instead of KubernetesServicePatch --- charmcraft.yaml | 2 +- src/charm.py | 5 +++-- src/nginx.py | 19 +++++++++---------- tests/unit/test_charm.py | 4 +++- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/charmcraft.yaml b/charmcraft.yaml index 1055a84..fdcbc28 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -26,7 +26,7 @@ resources: nginx-image: type: oci-image description: OCI image for nginx - upstream-source: ubuntu/nginx:1.18-22.04_beta + upstream-source: ubuntu/nginx:1.24-24.04_beta 
nginx-prometheus-exporter-image: type: oci-image description: OCI image for nginx-prometheus-exporter diff --git a/src/charm.py b/src/charm.py index bb523c0..ce9ab72 100755 --- a/src/charm.py +++ b/src/charm.py @@ -89,8 +89,9 @@ def __init__(self, *args): ], ) # # Patch the juju-created Kubernetes service to contain the right ports - external_ports = tempo.get_external_ports(self.app.name) - self._service_patcher = KubernetesServicePatch(self, external_ports) + self.unit.set_ports(*self.tempo.all_ports.values()) + + # self._service_patcher = KubernetesServicePatch(self, external_ports) # Provide ability for Tempo to be scraped by Prometheus using prometheus_scrape self._scraping = MetricsEndpointProvider( self, diff --git a/src/nginx.py b/src/nginx.py index 8eedb76..6163e89 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -186,7 +186,8 @@ def config(self, tls: bool = False) -> str: return crossplane.build(full_config) def _prepare_config(self, tls: bool = False) -> List[dict]: - log_level = "error" + # TODO remember to put it back to error + log_level = "debug" addresses_by_role = self.cluster_provider.gather_addresses_by_role() # build the complete configuration full_config = [ @@ -253,7 +254,7 @@ def layer(self) -> Layer: "nginx": { "override": "replace", "summary": "nginx", - "command": "nginx", + "command": "nginx -g 'daemon off;'", "startup": "enabled", } }, @@ -351,6 +352,7 @@ def _listen(self, ssl): for port in Tempo.all_ports.values(): directives.append({"directive": "listen", "args": [f"{port}", "ssl"] if ssl else [f"{port}"]}) directives.append({"directive": "listen", "args": [f"[::]:{port}", "ssl"] if ssl else [f"[::]:{port}"]}) + return directives return directives def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> Dict[str, Any]: @@ -367,10 +369,6 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "proxy_set_header", "args": ["X-Scope-OrgID", "$ensured_x_scope_orgid"], }, - { - 
"directive": "proxy_set_header", - "args": ["Host", "$host:$server_port"] - }, # FIXME: use a suitable SERVER_NAME {"directive": "server_name", "args": [self.server_name]}, {"directive": "ssl_certificate", "args": [CERT_PATH]}, @@ -391,10 +389,11 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "proxy_set_header", "args": ["X-Scope-OrgID", "$ensured_x_scope_orgid"], }, - { - "directive": "proxy_set_header", - "args": ["Host", "$host:$server_port"] - }, + # { + # "directive": "proxy_set_header", + # "args": ["Host", "$host:$server_port"] + # }, + {"directive": "server_name", "args": [self.server_name]}, *self._locations(addresses_by_role), ], } diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index bdc9b33..72485d6 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -8,7 +8,7 @@ from charm import TempoCoordinatorCharm -CONTAINER_NAME = "tempo" +CONTAINER_NAME = "nginx" class TestTempoCoordinatorCharm(unittest.TestCase): @@ -19,6 +19,8 @@ def setUp(self): self.addCleanup(self.harness.cleanup) self.harness.set_leader(True) self.harness.begin_with_initial_hooks() + self.harness.add_relation("s3", "s3-integrator") + self.harness.add_relation("tempo-cluster", "tempo-worker-k8s") self.maxDiff = None # we're comparing big traefik configs in tests def test_entrypoints_are_generated_with_sanitized_names(self): From cdcd0ab226b345ee66db41bcb923168476f8aabd Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Tue, 2 Jul 2024 15:21:19 +0200 Subject: [PATCH 07/19] Separate servers for each port, route everything to right upstream --- src/nginx.py | 70 +++++++++++++++++++++--------------- tests/scenario/test_nginx.py | 1 + 2 files changed, 43 insertions(+), 28 deletions(-) diff --git a/src/nginx.py b/src/nginx.py index 6163e89..c2f0ec6 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -186,8 +186,7 @@ def config(self, tls: bool = False) -> str: return crossplane.build(full_config) def 
_prepare_config(self, tls: bool = False) -> List[dict]: - # TODO remember to put it back to error - log_level = "debug" + log_level = "error" addresses_by_role = self.cluster_provider.gather_addresses_by_role() # build the complete configuration full_config = [ @@ -237,7 +236,7 @@ def _prepare_config(self, tls: bool = False) -> List[dict]: }, {"directive": "proxy_read_timeout", "args": ["300"]}, # server block - self._server(addresses_by_role, tls), + *self._servers(addresses_by_role, tls), ], }, ] @@ -279,6 +278,7 @@ def _log_verbose(self, verbose: bool = True) -> List[Dict[str, Any]]: def _upstreams(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any]]: addresses_mapped_to_upstreams = {} nginx_upstreams = [] + # TODO this code might be unnecessarily complex. Can we simplify it? if TempoRole.distributor in addresses_by_role.keys(): addresses_mapped_to_upstreams["distributor"] = addresses_by_role[TempoRole.distributor] if TempoRole.query_frontend in addresses_by_role.keys(): @@ -302,15 +302,16 @@ def _upstreams(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, A def _distributor_upstreams(self, address_set): return [ - self._upstream("distributor", address_set, Tempo.server_ports["tempo_http"]), self._upstream("otlp-http", address_set, Tempo.receiver_ports["otlp_http"]), + self._upstream("otlp-grpc", address_set, Tempo.receiver_ports["otlp_grpc"]), self._upstream("zipkin", address_set, Tempo.receiver_ports["zipkin"]), self._upstream("jaeger-thrift-http", address_set, Tempo.receiver_ports["jaeger_thrift_http"]), ] def _query_frontend_upstreams(self, address_set): return [ - self._upstream("query-frontend", address_set, Tempo.server_ports["tempo_http"]) + self._upstream("tempo-http", address_set, Tempo.server_ports["tempo_http"]), + self._upstream("tempo-grpc", address_set, Tempo.server_ports["tempo_grpc"]), ] def _upstream(self, role, address_set, port): @@ -320,15 +321,20 @@ def _upstream(self, role, address_set, port): "block": 
[{"directive": "server", "args": [f"{addr}:{port}"]} for addr in address_set], } - def _locations(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any]]: - nginx_locations = LOCATIONS_BASIC.copy() - roles = addresses_by_role.keys() - - if "distributor" in roles or "all" in roles: - # TODO split locations for every port - nginx_locations.extend(LOCATIONS_DISTRIBUTOR) - if "query-frontend" in roles or "all" in roles: - nginx_locations.extend(LOCATIONS_QUERY_FRONTEND) + def _locations(self, upstream: str, grpc: bool) -> List[Dict[str, Any]]: + protocol = "grpc" if grpc else "http" + nginx_locations = [ + { + "directive": "location", + "args": ["/"], + "block": [ + { + "directive": "grpc_pass" if grpc else "proxy_pass", + "args": [f"{protocol}://{upstream}"], + } + ], + } + ] return nginx_locations def _resolver(self, custom_resolver: Optional[List[Any]] = None) -> List[Dict[str, Any]]: @@ -347,15 +353,27 @@ def _basic_auth(self, enabled: bool) -> List[Optional[Dict[str, Any]]]: ] return [] - def _listen(self, ssl): + def _listen(self, port, ssl): directives = [] - for port in Tempo.all_ports.values(): - directives.append({"directive": "listen", "args": [f"{port}", "ssl"] if ssl else [f"{port}"]}) - directives.append({"directive": "listen", "args": [f"[::]:{port}", "ssl"] if ssl else [f"[::]:{port}"]}) - return directives + directives.append({"directive": "listen", "args": [f"{port}", "ssl"] if ssl else [f"{port}"]}) + directives.append({"directive": "listen", "args": [f"[::]:{port}", "ssl"] if ssl else [f"[::]:{port}"]}) return directives - def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> Dict[str, Any]: + def _servers(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> List[Dict[str, Any]]: + servers = [] + roles = addresses_by_role.keys() + + if "distributor" in roles or "all" in roles: + servers.append(self._server(Tempo.receiver_ports["otlp_http"], "otlp-http", False, tls)) + 
servers.append(self._server(Tempo.receiver_ports["zipkin"], "zipkin", False, tls)) + servers.append(self._server(Tempo.receiver_ports["jaeger_thrift_http"], "jaeger-thrift-http", False, tls)) + servers.append(self._server(Tempo.receiver_ports["otlp_grpc"], "otlp-grpc", True, tls)) + if "query-frontend" in roles or "all" in roles: + servers.append(self._server(Tempo.server_ports["tempo_http"], "tempo-http", False, tls)) + servers.append(self._server(Tempo.server_ports["tempo_grpc"], "tempo-grpc", True, tls)) + return servers + + def _server(self, port: int, upstream: str, grpc: bool = False, tls: bool = False) -> Dict[str, Any]: auth_enabled = False if tls: @@ -363,7 +381,7 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "server", "args": [], "block": [ - *self._listen(ssl=True), + *self._listen(port, ssl=True), *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", @@ -375,7 +393,7 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> {"directive": "ssl_certificate_key", "args": [KEY_PATH]}, {"directive": "ssl_protocols", "args": ["TLSv1", "TLSv1.1", "TLSv1.2"]}, {"directive": "ssl_ciphers", "args": ["HIGH:!aNULL:!MD5"]}, # codespell:ignore - *self._locations(addresses_by_role), + *self._locations(upstream, grpc), ], } @@ -383,17 +401,13 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "server", "args": [], "block": [ - *self._listen(ssl=False), + *self._listen(port, ssl=False), *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", "args": ["X-Scope-OrgID", "$ensured_x_scope_orgid"], }, - # { - # "directive": "proxy_set_header", - # "args": ["Host", "$host:$server_port"] - # }, {"directive": "server_name", "args": [self.server_name]}, - *self._locations(addresses_by_role), + *self._locations(upstream, grpc), ], } diff --git a/tests/scenario/test_nginx.py b/tests/scenario/test_nginx.py index 0ec1140..409683f 100644 
--- a/tests/scenario/test_nginx.py +++ b/tests/scenario/test_nginx.py @@ -34,6 +34,7 @@ def test_nginx_config_is_parsed_by_crossplane(context, nginx_container, tempo_cl tempo_charm.unit = MagicMock(return_value=unit) nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") + logger.info(nginx._prepare_config(tls=False)) prepared_config = nginx.config(tls=False) assert isinstance(prepared_config, str) From a7e6d2547fa87b3a14c989b6916ff152dd24fabd Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Wed, 3 Jul 2024 16:32:45 +0200 Subject: [PATCH 08/19] Use http2 for grpc communication --- src/nginx.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/nginx.py b/src/nginx.py index c2f0ec6..2409e5c 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -353,12 +353,24 @@ def _basic_auth(self, enabled: bool) -> List[Optional[Dict[str, Any]]]: ] return [] - def _listen(self, port, ssl): + def _listen(self, port, ssl, http2): directives = [] - directives.append({"directive": "listen", "args": [f"{port}", "ssl"] if ssl else [f"{port}"]}) - directives.append({"directive": "listen", "args": [f"[::]:{port}", "ssl"] if ssl else [f"[::]:{port}"]}) + directives.append({"directive": "listen", "args": self._listen_args(port, False, ssl, http2)}) + directives.append({"directive": "listen", "args": self._listen_args(port, True, ssl, http2)}) return directives + def _listen_args(self, port, ipv6, ssl, http2): + args = [] + if ipv6: + args.append(f"[::]:{port}") + else: + args.append(f"{port}") + if ssl: + args.append("ssl") + if http2: + args.append("http2") + return args + def _servers(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> List[Dict[str, Any]]: servers = [] roles = addresses_by_role.keys() @@ -381,7 +393,7 @@ def _server(self, port: int, upstream: str, grpc: bool = False, tls: bool = Fals "directive": "server", "args": [], "block": [ - *self._listen(port, ssl=True), + *self._listen(port, ssl=True, 
http2=grpc), *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", @@ -401,7 +413,7 @@ def _server(self, port: int, upstream: str, grpc: bool = False, tls: bool = Fals "directive": "server", "args": [], "block": [ - *self._listen(port, ssl=False), + *self._listen(port, ssl=False, http2=grpc), *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", From 10ad79c41d24edffd7c5c06db10186c2a8a3a1e7 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Wed, 3 Jul 2024 17:23:29 +0200 Subject: [PATCH 09/19] test fixes and formatting --- src/charm.py | 6 +- src/nginx.py | 192 +++++------------------ tests/scenario/conftest.py | 9 +- tests/scenario/test_charm_statuses.py | 13 +- tests/scenario/test_enabled_receivers.py | 29 +++- tests/scenario/test_ingressed_tracing.py | 7 +- tests/scenario/test_nginx.py | 21 +-- tests/scenario/test_tempo_clustered.py | 25 ++- tests/scenario/test_tls.py | 3 +- tests/scenario/test_tracing_legacy.py | 7 +- tests/scenario/test_tracing_provider.py | 5 +- tests/unit/test_charm.py | 2 - 12 files changed, 128 insertions(+), 191 deletions(-) diff --git a/src/charm.py b/src/charm.py index ce9ab72..c96dcbc 100755 --- a/src/charm.py +++ b/src/charm.py @@ -13,7 +13,6 @@ from charms.data_platform_libs.v0.s3 import S3Requirer from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider from charms.grafana_k8s.v0.grafana_source import GrafanaSourceProvider -from charms.observability_libs.v0.kubernetes_service_patch import KubernetesServicePatch from charms.observability_libs.v1.cert_handler import VAULT_SECRET_LABEL, CertHandler from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider from charms.tempo_k8s.v1.charm_tracing import trace_charm @@ -28,8 +27,8 @@ from ops.model import ActiveStatus, BlockedStatus, Relation, WaitingStatus from coordinator import TempoCoordinator -from nginx import CA_CERT_PATH, CERT_PATH, KEY_PATH, Nginx -from nginx_prometheus_exporter import 
NGINX_PROMETHEUS_EXPORTER_PORT, NginxPrometheusExporter +from nginx import Nginx +from nginx_prometheus_exporter import NginxPrometheusExporter from tempo import Tempo from tempo_cluster import TempoClusterProvider @@ -409,7 +408,6 @@ def _on_event(self, event): # update tracing relations self._update_tracing_relations() - ################### # UTILITY METHODS # ################### diff --git a/src/nginx.py b/src/nginx.py index 2409e5c..10aca14 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -21,144 +21,6 @@ CERT_PATH = f"{NGINX_DIR}/certs/server.cert" CA_CERT_PATH = f"{NGINX_DIR}/certs/ca.cert" -LOCATIONS_DISTRIBUTOR: List[Dict[str, Any]] = [ - { - "directive": "location", - "args": ["/distributor"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://distributor"], - }, - ], - }, - # OTLP/HTTP ingestion - { - "directive": "location", - "args": ["/v1/traces"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://otlp-http"], - }, - ], - }, - # Zipkin ingestion - { - "directive": "location", - "args": ["/api/v2/spans"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://zipkin"], - }, - ], - }, - # # Jaeger thrift HTTP ingestion - # { - # "directive": "location", - # "args": ["/api/traces"], - # "block": [ - # { - # "directive": "proxy_pass", - # "args": ["http://jaeger-thrift-http"], - # }, - # ], - # }, -] -# TODO add GRPC locations - perhaps as a separate server section? 
-LOCATIONS_QUERY_FRONTEND: List[Dict] = [ - { - "directive": "location", - "args": ["/prometheus"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://query-frontend"], - }, - ], - }, - { - "directive": "location", - "args": ["/api/echo"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://query-frontend"], - }, - ], - }, - { - "directive": "location", - "args": ["/api/traces"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://query-frontend"], - }, - ], - }, - { - "directive": "location", - "args": ["/api/search"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://query-frontend"], - }, - ], - }, - { - "directive": "location", - "args": ["/api/v2/search"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://query-frontend"], - }, - ], - }, - { - "directive": "location", - "args": ["/api/overrides"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://query-frontend"], - }, - ], - }, - # Buildinfo endpoint can go to any component - { - "directive": "location", - "args": ["=", "/api/status/buildinfo"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://query-frontend"], - }, - ], - }, -] - -LOCATIONS_BASIC: List[Dict] = [ - { - "directive": "location", - "args": ["=", "/"], - "block": [ - {"directive": "return", "args": ["200", "'OK'"]}, - {"directive": "auth_basic", "args": ["off"]}, - ], - }, - { # Location to be used by nginx-prometheus-exporter - "directive": "location", - "args": ["=", "/status"], - "block": [ - {"directive": "stub_status", "args": []}, - ], - }, -] - class Nginx: """Helper class to manage the nginx workload.""" @@ -282,21 +144,31 @@ def _upstreams(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, A if TempoRole.distributor in addresses_by_role.keys(): addresses_mapped_to_upstreams["distributor"] = addresses_by_role[TempoRole.distributor] if TempoRole.query_frontend in addresses_by_role.keys(): - 
addresses_mapped_to_upstreams["query_frontend"] = addresses_by_role[TempoRole.query_frontend] + addresses_mapped_to_upstreams["query_frontend"] = addresses_by_role[ + TempoRole.query_frontend + ] if TempoRole.all in addresses_by_role.keys(): # for all, we add addresses to existing upstreams for distributor / query_frontend or create the set if "distributor" in addresses_mapped_to_upstreams: - addresses_mapped_to_upstreams["distributor"] = addresses_mapped_to_upstreams["distributor"].union(addresses_by_role[TempoRole.all]) + addresses_mapped_to_upstreams["distributor"] = addresses_mapped_to_upstreams[ + "distributor" + ].union(addresses_by_role[TempoRole.all]) else: addresses_mapped_to_upstreams["distributor"] = addresses_by_role[TempoRole.all] if "query_frontend" in addresses_mapped_to_upstreams: - addresses_mapped_to_upstreams["query_frontend"] = addresses_mapped_to_upstreams["query_frontend"].union(addresses_by_role[TempoRole.all]) + addresses_mapped_to_upstreams["query_frontend"] = addresses_mapped_to_upstreams[ + "query_frontend" + ].union(addresses_by_role[TempoRole.all]) else: addresses_mapped_to_upstreams["query_frontend"] = addresses_by_role[TempoRole.all] if "distributor" in addresses_mapped_to_upstreams.keys(): - nginx_upstreams.extend(self._distributor_upstreams(addresses_mapped_to_upstreams["distributor"])) + nginx_upstreams.extend( + self._distributor_upstreams(addresses_mapped_to_upstreams["distributor"]) + ) if "query_frontend" in addresses_mapped_to_upstreams.keys(): - nginx_upstreams.extend(self._query_frontend_upstreams(addresses_mapped_to_upstreams["query_frontend"])) + nginx_upstreams.extend( + self._query_frontend_upstreams(addresses_mapped_to_upstreams["query_frontend"]) + ) return nginx_upstreams @@ -305,7 +177,9 @@ def _distributor_upstreams(self, address_set): self._upstream("otlp-http", address_set, Tempo.receiver_ports["otlp_http"]), self._upstream("otlp-grpc", address_set, Tempo.receiver_ports["otlp_grpc"]), self._upstream("zipkin", 
address_set, Tempo.receiver_ports["zipkin"]), - self._upstream("jaeger-thrift-http", address_set, Tempo.receiver_ports["jaeger_thrift_http"]), + self._upstream( + "jaeger-thrift-http", address_set, Tempo.receiver_ports["jaeger_thrift_http"] + ), ] def _query_frontend_upstreams(self, address_set): @@ -355,8 +229,12 @@ def _basic_auth(self, enabled: bool) -> List[Optional[Dict[str, Any]]]: def _listen(self, port, ssl, http2): directives = [] - directives.append({"directive": "listen", "args": self._listen_args(port, False, ssl, http2)}) - directives.append({"directive": "listen", "args": self._listen_args(port, True, ssl, http2)}) + directives.append( + {"directive": "listen", "args": self._listen_args(port, False, ssl, http2)} + ) + directives.append( + {"directive": "listen", "args": self._listen_args(port, True, ssl, http2)} + ) return directives def _listen_args(self, port, ipv6, ssl, http2): @@ -371,21 +249,33 @@ def _listen_args(self, port, ipv6, ssl, http2): args.append("http2") return args - def _servers(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> List[Dict[str, Any]]: + def _servers( + self, addresses_by_role: Dict[str, Set[str]], tls: bool = False + ) -> List[Dict[str, Any]]: servers = [] roles = addresses_by_role.keys() if "distributor" in roles or "all" in roles: - servers.append(self._server(Tempo.receiver_ports["otlp_http"], "otlp-http", False, tls)) + servers.append( + self._server(Tempo.receiver_ports["otlp_http"], "otlp-http", False, tls) + ) servers.append(self._server(Tempo.receiver_ports["zipkin"], "zipkin", False, tls)) - servers.append(self._server(Tempo.receiver_ports["jaeger_thrift_http"], "jaeger-thrift-http", False, tls)) + servers.append( + self._server( + Tempo.receiver_ports["jaeger_thrift_http"], "jaeger-thrift-http", False, tls + ) + ) servers.append(self._server(Tempo.receiver_ports["otlp_grpc"], "otlp-grpc", True, tls)) if "query-frontend" in roles or "all" in roles: - 
servers.append(self._server(Tempo.server_ports["tempo_http"], "tempo-http", False, tls)) + servers.append( + self._server(Tempo.server_ports["tempo_http"], "tempo-http", False, tls) + ) servers.append(self._server(Tempo.server_ports["tempo_grpc"], "tempo-grpc", True, tls)) return servers - def _server(self, port: int, upstream: str, grpc: bool = False, tls: bool = False) -> Dict[str, Any]: + def _server( + self, port: int, upstream: str, grpc: bool = False, tls: bool = False + ) -> Dict[str, Any]: auth_enabled = False if tls: diff --git a/tests/scenario/conftest.py b/tests/scenario/conftest.py index 91d499d..9fe9ebb 100644 --- a/tests/scenario/conftest.py +++ b/tests/scenario/conftest.py @@ -9,10 +9,9 @@ @pytest.fixture def tempo_charm(): - with patch("charm.KubernetesServicePatch"): - with patch("lightkube.core.client.GenericSyncClient"): - with patch("charm.TempoCoordinatorCharm._update_server_cert"): - yield TempoCoordinatorCharm + with patch("lightkube.core.client.GenericSyncClient"): + with patch("charm.TempoCoordinatorCharm._update_server_cert"): + yield TempoCoordinatorCharm @pytest.fixture(scope="function") @@ -60,4 +59,4 @@ def nginx_prometheus_exporter_container(): return Container( "nginx-prometheus-exporter", can_connect=True, - ) \ No newline at end of file + ) diff --git a/tests/scenario/test_charm_statuses.py b/tests/scenario/test_charm_statuses.py index 4b8a765..5c16e6b 100644 --- a/tests/scenario/test_charm_statuses.py +++ b/tests/scenario/test_charm_statuses.py @@ -1,14 +1,13 @@ from unittest.mock import patch import ops - from scenario import PeerRelation, State from tempo import Tempo def test_monolithic_status_no_s3_no_workers(context): - state_out = context.run("start", State(unit_status=ops.ActiveStatus())) + state_out = context.run("start", State(unit_status=ops.ActiveStatus(), leader=True)) assert state_out.unit_status.name == "blocked" @@ -34,26 +33,32 @@ def test_scaled_status_no_workers(context, all_worker): assert 
state_out.unit_status.name == "blocked" -def test_scaled_status_with_s3_and_workers(context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container): +def test_scaled_status_with_s3_and_workers( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): state_out = context.run( "start", State( relations=[PeerRelation("peers", peers_data={1: {}, 2: {}}), s3, all_worker], containers=[nginx_container, nginx_prometheus_exporter_container], unit_status=ops.ActiveStatus(), + leader=True, ), ) assert state_out.unit_status.name == "active" @patch.object(Tempo, "is_ready", new=True) -def test_happy_status(context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container): +def test_happy_status( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): state_out = context.run( "start", State( relations=[PeerRelation("peers", peers_data={1: {}, 2: {}}), s3, all_worker], containers=[nginx_container, nginx_prometheus_exporter_container], unit_status=ops.ActiveStatus(), + leader=True, ), ) assert state_out.unit_status.name == "active" diff --git a/tests/scenario/test_enabled_receivers.py b/tests/scenario/test_enabled_receivers.py index cbc6a20..c77d8b0 100644 --- a/tests/scenario/test_enabled_receivers.py +++ b/tests/scenario/test_enabled_receivers.py @@ -12,19 +12,31 @@ from charm import TempoCoordinatorCharm -def test_receivers_with_no_relations_or_config(context, s3, all_worker): +def test_receivers_with_no_relations_or_config( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): - state = State(leader=True, relations=[s3, all_worker]) + state = State( + leader=True, + relations=[s3, all_worker], + containers=[nginx_container, nginx_prometheus_exporter_container], + ) state_out = context.run_action("list-receivers", state) assert state_out.results == {"otlp-http": f"http://{socket.getfqdn()}:4318"} -def test_receivers_with_relations(context, s3, all_worker): +def 
test_receivers_with_relations( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): tracing = Relation( "tracing", remote_app_data=TracingRequirerAppData(receivers=["otlp_grpc"]).dump(), ) - state = State(leader=True, relations=[s3, all_worker, tracing]) + state = State( + leader=True, + relations=[s3, all_worker, tracing], + containers=[nginx_container, nginx_prometheus_exporter_container], + ) with context.manager(tracing.changed_event, state) as mgr: charm: TempoCoordinatorCharm = mgr.charm # extra receivers should only include default otlp_http @@ -45,7 +57,9 @@ def test_receivers_with_relations(context, s3, all_worker): } -def test_receivers_with_relations_and_config(context, s3, all_worker): +def test_receivers_with_relations_and_config( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): tracing = Relation( "tracing", local_app_data=TracingProviderAppData( @@ -64,7 +78,10 @@ def test_receivers_with_relations_and_config(context, s3, all_worker): ) # start with a state that has config changed state = State( - config={"always_enable_zipkin": True}, leader=True, relations=[s3, all_worker, tracing] + config={"always_enable_zipkin": True}, + leader=True, + relations=[s3, all_worker, tracing], + containers=[nginx_container, nginx_prometheus_exporter_container], ) with context.manager("config-changed", state) as mgr: charm: TempoCoordinatorCharm = mgr.charm diff --git a/tests/scenario/test_ingressed_tracing.py b/tests/scenario/test_ingressed_tracing.py index 12805f4..ec9c850 100644 --- a/tests/scenario/test_ingressed_tracing.py +++ b/tests/scenario/test_ingressed_tracing.py @@ -9,8 +9,11 @@ @pytest.fixture -def base_state(): - return State(leader=True) +def base_state(nginx_container, nginx_prometheus_exporter_container): + return State( + leader=True, + containers=[nginx_container, nginx_prometheus_exporter_container], + ) def test_external_url_present(context, base_state, s3, all_worker): diff --git 
a/tests/scenario/test_nginx.py b/tests/scenario/test_nginx.py index 409683f..c8409fc 100644 --- a/tests/scenario/test_nginx.py +++ b/tests/scenario/test_nginx.py @@ -1,15 +1,15 @@ +import logging from typing import List from unittest.mock import MagicMock -import logging import pytest from nginx import Nginx from tempo_cluster import TempoClusterProvider - logger = logging.getLogger(__name__) + @pytest.fixture def tempo_cluster_provider(): cluster_mock = MagicMock() @@ -27,6 +27,7 @@ def test_nginx_config_is_list_before_crossplane(context, nginx_container, tempo_ prepared_config = nginx._prepare_config(tls=False) assert isinstance(prepared_config, List) + def test_nginx_config_is_parsed_by_crossplane(context, nginx_container, tempo_cluster_provider): unit = MagicMock() unit.get_container = nginx_container @@ -53,7 +54,7 @@ def test_nginx_config_is_parsed_by_crossplane(context, nginx_container, tempo_cl "querier": {"1.2.4.7"}, "query_frontend": {"1.2.5.1"}, "compactor": {"1.2.6.6"}, - "metrics_generator": {"1.2.8.4"} + "metrics_generator": {"1.2.8.4"}, }, { "distributor": {"1.2.3.5"}, @@ -61,7 +62,7 @@ def test_nginx_config_is_parsed_by_crossplane(context, nginx_container, tempo_cl "querier": {"1.2.4.7"}, "query_frontend": {"1.2.5.1"}, "compactor": {"1.2.6.6"}, - "metrics_generator": {"1.2.8.4"} + "metrics_generator": {"1.2.8.4"}, }, { "distributor": {"1.2.3.5"}, @@ -69,7 +70,7 @@ def test_nginx_config_is_parsed_by_crossplane(context, nginx_container, tempo_cl "querier": {"1.2.4.7"}, "query_frontend": {"1.2.5.1"}, "compactor": {"1.2.6.6"}, - "metrics_generator": {"1.2.8.4"} + "metrics_generator": {"1.2.8.4"}, }, { "distributor": {"1.2.3.5", "1.2.3.7"}, @@ -77,11 +78,13 @@ def test_nginx_config_is_parsed_by_crossplane(context, nginx_container, tempo_cl "querier": {"1.2.4.7", "1.2.4.9"}, "query_frontend": {"1.2.5.1", "1.2.5.2"}, "compactor": {"1.2.6.6", "1.2.6.7"}, - "metrics_generator": {"1.2.8.4", "1.2.8.5"} - } + "metrics_generator": {"1.2.8.4", "1.2.8.5"}, + }, ), 
) -def test_nginx_config_is_parsed_with_workers(context, nginx_container, tempo_cluster_provider, addresses): +def test_nginx_config_is_parsed_with_workers( + context, nginx_container, tempo_cluster_provider, addresses +): tempo_cluster_provider.gather_addresses_by_role = MagicMock(return_value=addresses) unit = MagicMock() @@ -92,4 +95,4 @@ def test_nginx_config_is_parsed_with_workers(context, nginx_container, tempo_clu nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") prepared_config = nginx.config(tls=False) - assert isinstance(prepared_config, str) \ No newline at end of file + assert isinstance(prepared_config, str) diff --git a/tests/scenario/test_tempo_clustered.py b/tests/scenario/test_tempo_clustered.py index 49e157b..b7eb75c 100644 --- a/tests/scenario/test_tempo_clustered.py +++ b/tests/scenario/test_tempo_clustered.py @@ -63,10 +63,16 @@ def patch_certs(): @pytest.fixture -def state_with_certs(context, s3, certs_relation): +def state_with_certs( + context, s3, certs_relation, nginx_container, nginx_prometheus_exporter_container +): return context.run( certs_relation.joined_event, - scenario.State(leader=True, relations=[s3, certs_relation]), + scenario.State( + leader=True, + relations=[s3, certs_relation], + containers=[nginx_container, nginx_prometheus_exporter_container], + ), ) @@ -96,7 +102,14 @@ def test_cluster_relation(context, state_with_certs, all_worker): @pytest.mark.parametrize("requested_protocol", ("otlp_grpc", "zipkin")) def test_tempo_restart_on_ingress_v2_changed( - context, tmp_path, requested_protocol, s3, s3_config, all_worker_with_initial_config + context, + tmp_path, + requested_protocol, + s3, + s3_config, + all_worker_with_initial_config, + nginx_container, + nginx_prometheus_exporter_container, ): # GIVEN # the remote end requests an otlp_grpc endpoint @@ -107,7 +120,11 @@ def test_tempo_restart_on_ingress_v2_changed( # WHEN # the charm receives a tracing(v2) relation-changed requesting an otlp_grpc receiver - 
state = State(leader=True, relations=[tracing, s3, all_worker_with_initial_config]) + state = State( + leader=True, + relations=[tracing, s3, all_worker_with_initial_config], + containers=[nginx_container, nginx_prometheus_exporter_container], + ) state_out = context.run(tracing.changed_event, state) # THEN diff --git a/tests/scenario/test_tls.py b/tests/scenario/test_tls.py index defb543..8021733 100644 --- a/tests/scenario/test_tls.py +++ b/tests/scenario/test_tls.py @@ -10,7 +10,7 @@ @pytest.fixture -def base_state(): +def base_state(nginx_container, nginx_prometheus_exporter_container): return State( leader=True, secrets=[ @@ -23,6 +23,7 @@ def base_state(): contents={0: {"foo": "bar"}}, ) ], + containers=[nginx_container, nginx_prometheus_exporter_container], ) diff --git a/tests/scenario/test_tracing_legacy.py b/tests/scenario/test_tracing_legacy.py index 0f3fe07..619769f 100644 --- a/tests/scenario/test_tracing_legacy.py +++ b/tests/scenario/test_tracing_legacy.py @@ -6,8 +6,11 @@ @pytest.fixture -def base_state(): - return State(leader=True) +def base_state(nginx_container, nginx_prometheus_exporter_container): + return State( + leader=True, + containers=[nginx_container, nginx_prometheus_exporter_container], + ) @pytest.mark.parametrize("evt_name", ("changed", "created", "joined")) diff --git a/tests/scenario/test_tracing_provider.py b/tests/scenario/test_tracing_provider.py index 9aece32..8630bf8 100644 --- a/tests/scenario/test_tracing_provider.py +++ b/tests/scenario/test_tracing_provider.py @@ -3,7 +3,9 @@ from scenario import Relation, State -def test_receivers_removed_on_relation_broken(context, s3, all_worker): +def test_receivers_removed_on_relation_broken( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): tracing_grpc = Relation( "tracing", remote_app_data={"receivers": '["otlp_grpc"]'}, @@ -24,6 +26,7 @@ def test_receivers_removed_on_relation_broken(context, s3, all_worker): state = State( leader=True, 
relations=[tracing_grpc, tracing_http, s3, all_worker], + containers=[nginx_container, nginx_prometheus_exporter_container], ) with charm_tracing_disabled(): diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 72485d6..9a74dac 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -2,7 +2,6 @@ # See LICENSE file for licensing details. import unittest -from unittest.mock import patch from ops.testing import Harness @@ -12,7 +11,6 @@ class TestTempoCoordinatorCharm(unittest.TestCase): - @patch("charm.KubernetesServicePatch", lambda x, y: None) def setUp(self): self.harness = Harness(TempoCoordinatorCharm) self.harness.set_model_name("testmodel") From 8765fbb505955331460de12463e3f9735b4c33c5 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Fri, 5 Jul 2024 13:07:54 +0200 Subject: [PATCH 10/19] Reload nginx whenever config change is detected --- src/nginx.py | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/nginx.py b/src/nginx.py index 10aca14..82050b2 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -7,7 +7,8 @@ import crossplane from ops import CharmBase -from ops.pebble import Layer +from ops.pebble import Layer, PathError, ProtocolError + from tempo import Tempo from tempo_cluster import TempoClusterProvider, TempoRole @@ -35,6 +36,8 @@ def __init__(self, charm: CharmBase, cluster_provider: TempoClusterProvider, ser def configure_pebble_layer(self, tls: bool) -> None: """Configure pebble layer.""" + new_config: str = self.config(tls) + should_restart: bool = self._has_config_changed(new_config) if self._container.can_connect(): self._container.push( self.config_path, self.config(tls=tls), make_dirs=True # type: ignore @@ -42,6 +45,10 @@ def configure_pebble_layer(self, tls: bool) -> None: self._container.add_layer("nginx", self.layer, combine=True) self._container.autostart() + if should_restart: + logger.info("new nginx config: restarting the service") + self.reload() + def 
config(self, tls: bool = False) -> str: """Build and return the Nginx configuration.""" full_config = self._prepare_config(tls) @@ -104,6 +111,28 @@ def _prepare_config(self, tls: bool = False) -> List[dict]: ] return full_config + def _has_config_changed(self, new_config: str) -> bool: + """Return True if the passed config differs from the one on disk.""" + if not self._container.can_connect(): + logger.debug("Could not connect to Nginx container") + return False + + try: + current_config = self._container.pull(self.config_path).read() + except (ProtocolError, PathError) as e: + logger.warning( + "Could not check the current nginx configuration due to " + "a failure in retrieving the file: %s", + e, + ) + return False + + return current_config != new_config + + def reload(self) -> None: + """Reload the nginx config without restarting the service.""" + self._container.exec(["nginx", "-s", "reload"]) + @property def layer(self) -> Layer: """Return the Pebble layer for Nginx.""" From bd60cea534de17dacecb164d64bc6b4f2f664789 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Mon, 8 Jul 2024 14:13:47 +0200 Subject: [PATCH 11/19] Use Parquet v3 so Grafana is able to query for traces --- src/tempo.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/tempo.py b/src/tempo.py index 91419cf..bdd42b3 100644 --- a/src/tempo.py +++ b/src/tempo.py @@ -207,7 +207,9 @@ def _build_storage_config(self, s3_config: dict): endpoint=s3_config["endpoint"], secret_key=s3_config["secret-key"], ), - block=tempo_config.Block(version="v2"), + # starting from Tempo 2.4, we need to use at least parquet v3 to have search capabilities (Grafana support) + # https://grafana.com/docs/tempo/latest/release-notes/v2-4/#vparquet3-is-now-the-default-block-format + block=tempo_config.Block(version="vParquet3"), ) return tempo_config.Storage(trace=storage_config) From 0834ab618e1c94840560d4fdfa6436ff4a6e62a0 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Mon, 8 Jul 2024 
16:41:07 +0200 Subject: [PATCH 12/19] typing fixes --- src/charm.py | 12 ++++++------ src/nginx.py | 11 +++++------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/charm.py b/src/charm.py index c96dcbc..1dee8cc 100755 --- a/src/charm.py +++ b/src/charm.py @@ -157,7 +157,7 @@ def __init__(self, *args): self.framework.observe(self.tempo_cluster.on.changed, self._on_tempo_cluster_changed) for evt in self.on.events().values(): - self.framework.observe(evt, self._on_event) + self.framework.observe(evt, self._on_event) # type: ignore ###################### # UTILITY PROPERTIES # @@ -394,7 +394,7 @@ def _on_nginx_pebble_ready(self, _) -> None: def _on_nginx_prometheus_exporter_pebble_ready(self, _) -> None: self.nginx_prometheus_exporter.configure_pebble_layer() - def _on_event(self, event): + def _on_event(self, event) -> None: """A set of common configuration actions that should happen on every event.""" if isinstance(event, CollectStatusEvent): return @@ -426,7 +426,7 @@ def _configure_ingress(self) -> None: # notify the cluster self._update_tempo_cluster() - def _update_tracing_relations(self): + def _update_tracing_relations(self) -> None: tracing_relations = self.model.relations["tracing"] if not tracing_relations: # todo: set waiting status and configure tempo to run without receivers if possible, @@ -454,12 +454,12 @@ def _requested_receivers(self) -> Tuple[ReceiverProtocol, ...]: requested_receivers = requested_protocols.intersection(set(self.tempo.receiver_ports)) return tuple(requested_receivers) - def server_cert(self): + def server_cert(self) -> str: """For charm tracing.""" self._update_server_cert() return self.tempo.server_cert_path - def _update_server_cert(self): + def _update_server_cert(self) -> None: """Server certificate for charm tracing tls, if tls is enabled.""" server_cert = Path(self.tempo.server_cert_path) if self.tls_available: @@ -513,7 +513,7 @@ def loki_endpoints_by_unit(self) -> Dict[str, str]: return endpoints - def 
_update_tempo_cluster(self): + def _update_tempo_cluster(self) -> None: """Build the config and publish everything to the application databag.""" if not self._is_consistent: logger.error("skipped tempo cluster update: inconsistent state") diff --git a/src/nginx.py b/src/nginx.py index 82050b2..60feb42 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -9,7 +9,6 @@ from ops import CharmBase from ops.pebble import Layer, PathError, ProtocolError - from tempo import Tempo from tempo_cluster import TempoClusterProvider, TempoRole @@ -201,7 +200,7 @@ def _upstreams(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, A return nginx_upstreams - def _distributor_upstreams(self, address_set): + def _distributor_upstreams(self, address_set: Set[str]) -> List[Dict[str, Any]]: return [ self._upstream("otlp-http", address_set, Tempo.receiver_ports["otlp_http"]), self._upstream("otlp-grpc", address_set, Tempo.receiver_ports["otlp_grpc"]), @@ -211,13 +210,13 @@ def _distributor_upstreams(self, address_set): ), ] - def _query_frontend_upstreams(self, address_set): + def _query_frontend_upstreams(self, address_set: Set[str]) -> List[Dict[str, Any]]: return [ self._upstream("tempo-http", address_set, Tempo.server_ports["tempo_http"]), self._upstream("tempo-grpc", address_set, Tempo.server_ports["tempo_grpc"]), ] - def _upstream(self, role, address_set, port): + def _upstream(self, role: str, address_set: Set[str], port: int) -> Dict[str, Any]: return { "directive": "upstream", "args": [role], @@ -256,7 +255,7 @@ def _basic_auth(self, enabled: bool) -> List[Optional[Dict[str, Any]]]: ] return [] - def _listen(self, port, ssl, http2): + def _listen(self, port: int, ssl: bool, http2: bool) -> List[Dict[str, Any]]: directives = [] directives.append( {"directive": "listen", "args": self._listen_args(port, False, ssl, http2)} @@ -266,7 +265,7 @@ def _listen(self, port, ssl, http2): ) return directives - def _listen_args(self, port, ipv6, ssl, http2): + def _listen_args(self, 
port: int, ipv6: bool, ssl: bool, http2: bool) -> List[str]: args = [] if ipv6: args.append(f"[::]:{port}") From 8069abe2716dd473a430bf4bc161f8921be3ccc8 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Mon, 8 Jul 2024 17:58:31 +0200 Subject: [PATCH 13/19] nginx-image and nginx-prometheus-exporter-image in integration tests --- tests/integration/conftest.py | 10 ---------- tests/integration/test_charm.py | 3 +++ tests/integration/test_scaling_monolithic.py | 6 +++++- tests/integration/test_self_monitoring.py | 3 +++ 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1accd8d..2fb6544 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -19,16 +19,6 @@ async def tempo_charm(ops_test: OpsTest): return charm -@fixture(scope="module") -def tempo_metadata(ops_test: OpsTest): - return yaml.safe_load(Path("./metadata.yaml").read_text()) - - -@fixture(scope="module") -def tempo_oci_image(ops_test: OpsTest, tempo_metadata): - return tempo_metadata["resources"]["tempo-image"]["upstream-source"] - - @fixture(scope="module", autouse=True) def copy_charm_libs_into_tester_charm(ops_test): """Ensure the tester charm has the libraries it uses.""" diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index aa7bb55..f28a3ea 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -33,6 +33,9 @@ async def test_build_and_deploy(ops_test: OpsTest): {mc.name}: charm: {charm} trust: true + resources: + nginx-image: {METADATA["resources"]["nginx-image"]["upstream-source"]} + nginx-prometheus-exporter-image: {METADATA["resources"]["nginx-prometheus-exporter-image"]["upstream-source"]} scale: 1 loki: charm: loki-k8s diff --git a/tests/integration/test_scaling_monolithic.py b/tests/integration/test_scaling_monolithic.py index d41b25d..17554e2 100644 --- a/tests/integration/test_scaling_monolithic.py +++ 
b/tests/integration/test_scaling_monolithic.py @@ -25,7 +25,11 @@ @pytest.mark.abort_on_fail async def test_deploy_tempo(ops_test: OpsTest): tempo_charm = await ops_test.build_charm(".") - await ops_test.model.deploy(tempo_charm, application_name=APP_NAME) + resources = { + "nginx-image": METADATA["resources"]["nginx-image"]["upstream-source"], + "nginx-prometheus-exporter-image": METADATA["resources"]["nginx-prometheus-exporter-image"]["upstream-source"], + } + await ops_test.model.deploy(tempo_charm, resources=resources, application_name=APP_NAME) await ops_test.model.wait_for_idle( apps=[APP_NAME], diff --git a/tests/integration/test_self_monitoring.py b/tests/integration/test_self_monitoring.py index 74ee598..5bdf0e7 100644 --- a/tests/integration/test_self_monitoring.py +++ b/tests/integration/test_self_monitoring.py @@ -35,6 +35,9 @@ async def test_build_and_deploy(ops_test: OpsTest): charm: {charm} trust: true scale: 1 + resources: + nginx-image: {METADATA["resources"]["nginx-image"]["upstream-source"]} + nginx-prometheus-exporter-image: {METADATA["resources"]["nginx-prometheus-exporter-image"]["upstream-source"]} prom: charm: prometheus-k8s channel: edge From fc57ef9d5bcde5fcd809ff7d8bb75f5f7ea843a4 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Mon, 8 Jul 2024 17:59:58 +0200 Subject: [PATCH 14/19] lint --- tests/integration/conftest.py | 1 - tests/integration/test_scaling_monolithic.py | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 2fb6544..fd3d087 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -5,7 +5,6 @@ import shutil from pathlib import Path -import yaml from pytest import fixture from pytest_operator.plugin import OpsTest diff --git a/tests/integration/test_scaling_monolithic.py b/tests/integration/test_scaling_monolithic.py index 17554e2..ed76714 100644 --- a/tests/integration/test_scaling_monolithic.py +++ 
b/tests/integration/test_scaling_monolithic.py @@ -27,7 +27,9 @@ async def test_deploy_tempo(ops_test: OpsTest): tempo_charm = await ops_test.build_charm(".") resources = { "nginx-image": METADATA["resources"]["nginx-image"]["upstream-source"], - "nginx-prometheus-exporter-image": METADATA["resources"]["nginx-prometheus-exporter-image"]["upstream-source"], + "nginx-prometheus-exporter-image": METADATA["resources"][ + "nginx-prometheus-exporter-image" + ]["upstream-source"], } await ops_test.model.deploy(tempo_charm, resources=resources, application_name=APP_NAME) From 41d076bb597d1cbaf451d8afd37a4ae7a021d6f5 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Tue, 9 Jul 2024 11:11:57 +0200 Subject: [PATCH 15/19] Add containers to new config test --- tests/scenario/test_config.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/scenario/test_config.py b/tests/scenario/test_config.py index e933a15..3532fb5 100644 --- a/tests/scenario/test_config.py +++ b/tests/scenario/test_config.py @@ -4,7 +4,9 @@ from tempo_cluster import TempoClusterRequirerUnitData -def test_memberlist_multiple_members(context, all_worker, s3): +def test_memberlist_multiple_members( + context, all_worker, s3, nginx_container, nginx_prometheus_exporter_container +): workers_no = 3 all_worker = all_worker.replace( remote_units_data={ @@ -23,7 +25,11 @@ def test_memberlist_multiple_members(context, all_worker, s3): for worker_idx in range(workers_no) }, ) - state = State(leader=True, relations=[all_worker, s3]) + state = State( + leader=True, + relations=[all_worker, s3], + containers=[nginx_container, nginx_prometheus_exporter_container], + ) with context.manager(all_worker.changed_event, state) as mgr: charm: TempoCoordinatorCharm = mgr.charm assert charm.tempo_cluster.gather_addresses() == set( From 6f617656d83366e9237e50f2e42b9bc2cd6c0867 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Wed, 10 Jul 2024 19:31:32 +0200 Subject: [PATCH 16/19] Review 
remarks: proper cert support in nginx -> workers --- src/charm.py | 10 ++++- src/nginx.py | 76 ++++++++++++++++++++++-------------- src/tempo.py | 1 - src/tempo_config.py | 1 - tests/scenario/test_nginx.py | 8 ++-- 5 files changed, 58 insertions(+), 38 deletions(-) diff --git a/src/charm.py b/src/charm.py index 1dee8cc..8ef5447 100755 --- a/src/charm.py +++ b/src/charm.py @@ -292,8 +292,14 @@ def _on_tracing_broken(self, _): def _on_cert_handler_changed(self, _): if self.tls_available: logger.debug("enabling TLS") + self.nginx.configure_tls( + server_cert=self.cert_handler.server_cert, # type: ignore + ca_cert=self.cert_handler.ca_cert, # type: ignore + private_key=self.cert_handler.private_key, # type: ignore + ) else: logger.debug("disabling TLS") + self.nginx.delete_certificates() # tls readiness change means config change. # sync scheme change with traefik and related consumers @@ -389,7 +395,7 @@ def _on_collect_unit_status(self, e: CollectStatusEvent): e.add_status(ActiveStatus()) def _on_nginx_pebble_ready(self, _) -> None: - self.nginx.configure_pebble_layer(tls=self.tls_available) + self.nginx.configure_pebble_layer() def _on_nginx_prometheus_exporter_pebble_ready(self, _) -> None: self.nginx_prometheus_exporter.configure_pebble_layer() @@ -399,7 +405,7 @@ def _on_event(self, event) -> None: if isinstance(event, CollectStatusEvent): return # plan layers - self.nginx.configure_pebble_layer(tls=self.tls_available) + self.nginx.configure_pebble_layer() self.nginx_prometheus_exporter.configure_pebble_layer() # configure ingress self._configure_ingress() diff --git a/src/nginx.py b/src/nginx.py index 60feb42..4be3a10 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -33,27 +33,25 @@ def __init__(self, charm: CharmBase, cluster_provider: TempoClusterProvider, ser self.server_name = server_name self._container = self._charm.unit.get_container("nginx") - def configure_pebble_layer(self, tls: bool) -> None: + def configure_pebble_layer(self) -> None: """Configure 
pebble layer.""" - new_config: str = self.config(tls) + new_config: str = self.config() should_restart: bool = self._has_config_changed(new_config) if self._container.can_connect(): - self._container.push( - self.config_path, self.config(tls=tls), make_dirs=True # type: ignore - ) + self._container.push(self.config_path, new_config, make_dirs=True) # type: ignore self._container.add_layer("nginx", self.layer, combine=True) self._container.autostart() if should_restart: - logger.info("new nginx config: restarting the service") + logger.info("new nginx config: reloading the service") self.reload() - def config(self, tls: bool = False) -> str: + def config(self) -> str: """Build and return the Nginx configuration.""" - full_config = self._prepare_config(tls) + full_config = self._prepare_config() return crossplane.build(full_config) - def _prepare_config(self, tls: bool = False) -> List[dict]: + def _prepare_config(self) -> List[dict]: log_level = "error" addresses_by_role = self.cluster_provider.gather_addresses_by_role() # build the complete configuration @@ -104,7 +102,7 @@ def _prepare_config(self, tls: bool = False) -> List[dict]: }, {"directive": "proxy_read_timeout", "args": ["300"]}, # server block - *self._servers(addresses_by_role, tls), + *self._servers(addresses_by_role), ], }, ] @@ -223,8 +221,9 @@ def _upstream(self, role: str, address_set: Set[str], port: int) -> Dict[str, An "block": [{"directive": "server", "args": [f"{addr}:{port}"]} for addr in address_set], } - def _locations(self, upstream: str, grpc: bool) -> List[Dict[str, Any]]: - protocol = "grpc" if grpc else "http" + def _locations(self, upstream: str, grpc: bool, tls: bool) -> List[Dict[str, Any]]: + s = "s" if tls else "" + protocol = f"grpc{s}" if grpc else f"http{s}" nginx_locations = [ { "directive": "location", @@ -277,35 +276,29 @@ def _listen_args(self, port: int, ipv6: bool, ssl: bool, http2: bool) -> List[st args.append("http2") return args - def _servers( - self, 
addresses_by_role: Dict[str, Set[str]], tls: bool = False - ) -> List[Dict[str, Any]]: + def _servers(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any]]: servers = [] roles = addresses_by_role.keys() if "distributor" in roles or "all" in roles: - servers.append( - self._server(Tempo.receiver_ports["otlp_http"], "otlp-http", False, tls) - ) - servers.append(self._server(Tempo.receiver_ports["zipkin"], "zipkin", False, tls)) + servers.append(self._server(Tempo.receiver_ports["otlp_http"], "otlp-http", False)) + servers.append(self._server(Tempo.receiver_ports["zipkin"], "zipkin", False)) servers.append( self._server( - Tempo.receiver_ports["jaeger_thrift_http"], "jaeger-thrift-http", False, tls + Tempo.receiver_ports["jaeger_thrift_http"], "jaeger-thrift-http", False ) ) - servers.append(self._server(Tempo.receiver_ports["otlp_grpc"], "otlp-grpc", True, tls)) + servers.append(self._server(Tempo.receiver_ports["otlp_grpc"], "otlp-grpc", True)) if "query-frontend" in roles or "all" in roles: - servers.append( - self._server(Tempo.server_ports["tempo_http"], "tempo-http", False, tls) - ) - servers.append(self._server(Tempo.server_ports["tempo_grpc"], "tempo-grpc", True, tls)) + servers.append(self._server(Tempo.server_ports["tempo_http"], "tempo-http", False)) + servers.append(self._server(Tempo.server_ports["tempo_grpc"], "tempo-grpc", True)) return servers - def _server( - self, port: int, upstream: str, grpc: bool = False, tls: bool = False - ) -> Dict[str, Any]: + def _server(self, port: int, upstream: str, grpc: bool = False) -> Dict[str, Any]: auth_enabled = False + tls = self.tls_ready + if tls: return { "directive": "server", @@ -323,7 +316,7 @@ def _server( {"directive": "ssl_certificate_key", "args": [KEY_PATH]}, {"directive": "ssl_protocols", "args": ["TLSv1", "TLSv1.1", "TLSv1.2"]}, {"directive": "ssl_ciphers", "args": ["HIGH:!aNULL:!MD5"]}, # codespell:ignore - *self._locations(upstream, grpc), + *self._locations(upstream, grpc, tls), ], 
} @@ -338,6 +331,29 @@ def _server( "args": ["X-Scope-OrgID", "$ensured_x_scope_orgid"], }, {"directive": "server_name", "args": [self.server_name]}, - *self._locations(upstream, grpc), + *self._locations(upstream, grpc, tls), ], } + + @property + def tls_ready(self) -> bool: + """Whether cert, key, and ca paths are found on disk and Nginx is ready to use tls.""" + if not self._container.can_connect(): + return False + return all( + self._container.exists(tls_path) for tls_path in (KEY_PATH, CERT_PATH, CA_CERT_PATH) + ) + + def configure_tls(self, private_key: str, server_cert: str, ca_cert: str) -> None: + """Save the certificate files to disk, where Nginx can load them.""" + if self._container.can_connect(): + self._container.push(KEY_PATH, private_key, make_dirs=True) + self._container.push(CERT_PATH, server_cert, make_dirs=True) + self._container.push(CA_CERT_PATH, ca_cert, make_dirs=True) + + def delete_certificates(self) -> None: + """Delete the certificate files from disk.""" + if self._container.can_connect(): + self._container.remove_path(CERT_PATH, recursive=True) + self._container.remove_path(KEY_PATH, recursive=True) + self._container.remove_path(CA_CERT_PATH, recursive=True) diff --git a/src/tempo.py b/src/tempo.py index bdd42b3..b44d1f3 100644 --- a/src/tempo.py +++ b/src/tempo.py @@ -252,7 +252,6 @@ def _build_compactor_config(self): # total trace retention block_retention="720h", compacted_block_retention="1h", - v2_out_buffer_bytes=5242880, ) ) diff --git a/src/tempo_config.py b/src/tempo_config.py index 3d0d7c3..1c2a186 100644 --- a/src/tempo_config.py +++ b/src/tempo_config.py @@ -137,7 +137,6 @@ class Compaction(BaseModel): max_compaction_objects: int block_retention: str compacted_block_retention: str - v2_out_buffer_bytes: int class Compactor(BaseModel): diff --git a/tests/scenario/test_nginx.py b/tests/scenario/test_nginx.py index c8409fc..9deee5d 100644 --- a/tests/scenario/test_nginx.py +++
b/tests/scenario/test_nginx.py @@ -24,7 +24,7 @@ def test_nginx_config_is_list_before_crossplane(context, nginx_container, tempo_ nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") - prepared_config = nginx._prepare_config(tls=False) + prepared_config = nginx._prepare_config() assert isinstance(prepared_config, List) @@ -35,9 +35,9 @@ def test_nginx_config_is_parsed_by_crossplane(context, nginx_container, tempo_cl tempo_charm.unit = MagicMock(return_value=unit) nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") - logger.info(nginx._prepare_config(tls=False)) + logger.info(nginx._prepare_config()) - prepared_config = nginx.config(tls=False) + prepared_config = nginx.config() assert isinstance(prepared_config, str) @@ -94,5 +94,5 @@ def test_nginx_config_is_parsed_with_workers( nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") - prepared_config = nginx.config(tls=False) + prepared_config = nginx.config() assert isinstance(prepared_config, str) From 68bcfab70e62821db41c39e6d2642ec795ea79f2 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Thu, 11 Jul 2024 12:35:34 +0200 Subject: [PATCH 17/19] Simpler handling of upstream and server creation, less magic strings --- src/nginx.py | 84 ++++++++++++++++++++++++---------------------------- src/tempo.py | 2 +- 2 files changed, 40 insertions(+), 46 deletions(-) diff --git a/src/nginx.py b/src/nginx.py index 4be3a10..b706f9a 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -166,53 +166,53 @@ def _log_verbose(self, verbose: bool = True) -> List[Dict[str, Any]]: def _upstreams(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any]]: addresses_mapped_to_upstreams = {} nginx_upstreams = [] - # TODO this code might be unnecessarily complex. Can we simplify it? 
- if TempoRole.distributor in addresses_by_role.keys(): - addresses_mapped_to_upstreams["distributor"] = addresses_by_role[TempoRole.distributor] - if TempoRole.query_frontend in addresses_by_role.keys(): - addresses_mapped_to_upstreams["query_frontend"] = addresses_by_role[ - TempoRole.query_frontend - ] + addresses_mapped_to_upstreams = addresses_by_role.copy() if TempoRole.all in addresses_by_role.keys(): # for all, we add addresses to existing upstreams for distributor / query_frontend or create the set - if "distributor" in addresses_mapped_to_upstreams: - addresses_mapped_to_upstreams["distributor"] = addresses_mapped_to_upstreams[ - "distributor" - ].union(addresses_by_role[TempoRole.all]) + if TempoRole.distributor in addresses_mapped_to_upstreams: + addresses_mapped_to_upstreams[TempoRole.distributor] = ( + addresses_mapped_to_upstreams[TempoRole.distributor].union( + addresses_by_role[TempoRole.all] + ) + ) else: - addresses_mapped_to_upstreams["distributor"] = addresses_by_role[TempoRole.all] - if "query_frontend" in addresses_mapped_to_upstreams: - addresses_mapped_to_upstreams["query_frontend"] = addresses_mapped_to_upstreams[ - "query_frontend" - ].union(addresses_by_role[TempoRole.all]) + addresses_mapped_to_upstreams[TempoRole.distributor] = addresses_by_role[ + TempoRole.all + ] + if TempoRole.query_frontend in addresses_mapped_to_upstreams: + addresses_mapped_to_upstreams[TempoRole.query_frontend] = ( + addresses_mapped_to_upstreams[TempoRole.query_frontend].union( + addresses_by_role[TempoRole.all] + ) + ) else: - addresses_mapped_to_upstreams["query_frontend"] = addresses_by_role[TempoRole.all] - if "distributor" in addresses_mapped_to_upstreams.keys(): + addresses_mapped_to_upstreams[TempoRole.query_frontend] = addresses_by_role[ + TempoRole.all + ] + if TempoRole.distributor in addresses_mapped_to_upstreams.keys(): nginx_upstreams.extend( - self._distributor_upstreams(addresses_mapped_to_upstreams["distributor"]) + 
self._distributor_upstreams(addresses_mapped_to_upstreams[TempoRole.distributor]) ) - if "query_frontend" in addresses_mapped_to_upstreams.keys(): + if TempoRole.query_frontend in addresses_mapped_to_upstreams.keys(): nginx_upstreams.extend( - self._query_frontend_upstreams(addresses_mapped_to_upstreams["query_frontend"]) + self._query_frontend_upstreams( + addresses_mapped_to_upstreams[TempoRole.query_frontend] + ) ) return nginx_upstreams def _distributor_upstreams(self, address_set: Set[str]) -> List[Dict[str, Any]]: - return [ - self._upstream("otlp-http", address_set, Tempo.receiver_ports["otlp_http"]), - self._upstream("otlp-grpc", address_set, Tempo.receiver_ports["otlp_grpc"]), - self._upstream("zipkin", address_set, Tempo.receiver_ports["zipkin"]), - self._upstream( - "jaeger-thrift-http", address_set, Tempo.receiver_ports["jaeger_thrift_http"] - ), - ] + upstreams = [] + for protocol, port in Tempo.receiver_ports.items(): + upstreams.append(self._upstream(protocol.replace("_", "-"), address_set, port)) + return upstreams def _query_frontend_upstreams(self, address_set: Set[str]) -> List[Dict[str, Any]]: - return [ - self._upstream("tempo-http", address_set, Tempo.server_ports["tempo_http"]), - self._upstream("tempo-grpc", address_set, Tempo.server_ports["tempo_grpc"]), - ] + upstreams = [] + for protocol, port in Tempo.server_ports.items(): + upstreams.append(self._upstream(protocol.replace("_", "-"), address_set, port)) + return upstreams def _upstream(self, role: str, address_set: Set[str], port: int) -> Dict[str, Any]: return { @@ -280,18 +280,12 @@ def _servers(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any servers = [] roles = addresses_by_role.keys() - if "distributor" in roles or "all" in roles: - servers.append(self._server(Tempo.receiver_ports["otlp_http"], "otlp-http", False)) - servers.append(self._server(Tempo.receiver_ports["zipkin"], "zipkin", False)) - servers.append( - self._server( - 
Tempo.receiver_ports["jaeger_thrift_http"], "jaeger-thrift-http", False - ) - ) - servers.append(self._server(Tempo.receiver_ports["otlp_grpc"], "otlp-grpc", True)) - if "query-frontend" in roles or "all" in roles: - servers.append(self._server(Tempo.server_ports["tempo_http"], "tempo-http", False)) - servers.append(self._server(Tempo.server_ports["tempo_grpc"], "tempo-grpc", True)) + if TempoRole.distributor.value in roles or TempoRole.all.value in roles: + for protocol, port in Tempo.receiver_ports.items(): + servers.append(self._server(port, protocol.replace("_", "-"), "grpc" in protocol)) + if TempoRole.query_frontend.value in roles or TempoRole.all.value in roles: + for protocol, port in Tempo.server_ports.items(): + servers.append(self._server(port, protocol.replace("_", "-"), "grpc" in protocol)) return servers def _server(self, port: int, upstream: str, grpc: bool = False) -> Dict[str, Any]: diff --git a/src/tempo.py b/src/tempo.py index b44d1f3..fd17ab2 100644 --- a/src/tempo.py +++ b/src/tempo.py @@ -42,7 +42,7 @@ class Tempo: memberlist_port = 7946 - server_ports = { + server_ports: Dict[str, int] = { "tempo_http": 3200, "tempo_grpc": 9096, # default grpc listen port is 9095, but that conflicts with promtail. 
} From 7ab3dff1edd9c64999ba22b08f2e7a8cbcf07ceb Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Fri, 12 Jul 2024 17:24:50 +0200 Subject: [PATCH 18/19] Test created nginx configuration contents --- tests/scenario/test_nginx.py | 65 ++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/tests/scenario/test_nginx.py b/tests/scenario/test_nginx.py index 9deee5d..29811a0 100644 --- a/tests/scenario/test_nginx.py +++ b/tests/scenario/test_nginx.py @@ -5,6 +5,7 @@ import pytest from nginx import Nginx +from tempo import Tempo from tempo_cluster import TempoClusterProvider logger = logging.getLogger(__name__) @@ -96,3 +97,67 @@ def test_nginx_config_is_parsed_with_workers( prepared_config = nginx.config() assert isinstance(prepared_config, str) + + +@pytest.mark.parametrize( + "addresses", + ( + {"all": {"1.2.3.4"}}, + {"all": {"1.2.3.4", "1.2.3.5"}}, + { + "all": {"1.2.3.4"}, + "distributor": {"1.2.3.5"}, + "ingester": {"1.2.3.6"}, + "querier": {"1.2.4.7"}, + "query_frontend": {"1.2.5.1"}, + "compactor": {"1.2.6.6"}, + "metrics_generator": {"1.2.8.4"}, + }, + { + "distributor": {"1.2.3.5"}, + "ingester": {"1.2.3.6"}, + "querier": {"1.2.4.7"}, + "query_frontend": {"1.2.5.1"}, + "compactor": {"1.2.6.6"}, + "metrics_generator": {"1.2.8.4"}, + }, + ), +) +def test_nginx_config_contains_upstreams_and_proxy_pass( + context, nginx_container, tempo_cluster_provider, addresses +): + tempo_cluster_provider.gather_addresses_by_role = MagicMock(return_value=addresses) + + unit = MagicMock() + unit.get_container = nginx_container + tempo_charm = MagicMock() + tempo_charm.unit = MagicMock(return_value=unit) + + nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") + + prepared_config = nginx.config() + + for role, addresses in addresses.items(): + for address in addresses: + if role == "all": + _assert_config_per_role(Tempo.all_ports, address, prepared_config) + if role == "distributor": + _assert_config_per_role(Tempo.receiver_ports, 
address, prepared_config) + if role == "query-frontend": + _assert_config_per_role(Tempo.server_ports, address, prepared_config) + + +def _assert_config_per_role(source_dict, address, prepared_config): + # as entire config is in a format that's hard to parse (and crossplane returns a string), we look for servers, + # upstreams and correct proxy/grpc_pass instructions. + for port in source_dict.values(): + assert f"server {address}:{port};" in prepared_config + assert f"listen {port}" in prepared_config + assert f"listen [::]:{port}" in prepared_config + for protocol in source_dict.keys(): + sanitised_protocol = protocol.replace("_", "-") + assert f"upstream {sanitised_protocol}" in prepared_config + if "grpc" in protocol: + assert f"grpc_pass grpcs://{sanitised_protocol}" in prepared_config + else: + assert f"proxy_pass https://{sanitised_protocol}" in prepared_config From 33d7968006713f14ef426cc872ac4c71a0a72ce6 Mon Sep 17 00:00:00 2001 From: Mateusz Kulewicz Date: Mon, 15 Jul 2024 11:34:07 +0200 Subject: [PATCH 19/19] remove commented out line --- src/charm.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/charm.py b/src/charm.py index 8ef5447..276f4f1 100755 --- a/src/charm.py +++ b/src/charm.py @@ -90,7 +90,6 @@ def __init__(self, *args): # # Patch the juju-created Kubernetes service to contain the right ports self.unit.set_ports(*self.tempo.all_ports.values()) - # self._service_patcher = KubernetesServicePatch(self, external_ports) # Provide ability for Tempo to be scraped by Prometheus using prometheus_scrape self._scraping = MetricsEndpointProvider( self,