diff --git a/charmcraft.yaml b/charmcraft.yaml index f82ceff..fdcbc28 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -16,6 +16,22 @@ description: | summary: | Tempo is a distributed tracing backend by Grafana. +containers: + nginx: + resource: nginx-image + nginx-prometheus-exporter: + resource: nginx-prometheus-exporter-image + +resources: + nginx-image: + type: oci-image + description: OCI image for nginx + upstream-source: ubuntu/nginx:1.24-24.04_beta + nginx-prometheus-exporter-image: + type: oci-image + description: OCI image for nginx-prometheus-exporter + upstream-source: nginx/nginx-prometheus-exporter:1.1.0 + links: # FIXME: create docs tree root documentation: https://discourse.charmhub.io/t/tempo-coordinator-k8s-docs-index diff --git a/requirements.txt b/requirements.txt index 0b4d4a0..3834f32 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,8 @@ jsonschema==4.17.0 lightkube==0.11.0 lightkube-models==1.24.1.4 tenacity==8.2.3 +# crossplane is a package from nginxinc to interact with the Nginx config +crossplane # PYDEPS # lib/charms/tempo_k8s/v1/charm_tracing.py diff --git a/src/charm.py b/src/charm.py index 9dbb12d..276f4f1 100755 --- a/src/charm.py +++ b/src/charm.py @@ -13,7 +13,6 @@ from charms.data_platform_libs.v0.s3 import S3Requirer from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider from charms.grafana_k8s.v0.grafana_source import GrafanaSourceProvider -from charms.observability_libs.v0.kubernetes_service_patch import KubernetesServicePatch from charms.observability_libs.v1.cert_handler import VAULT_SECRET_LABEL, CertHandler from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider from charms.tempo_k8s.v1.charm_tracing import trace_charm @@ -28,6 +27,8 @@ from ops.model import ActiveStatus, BlockedStatus, Relation, WaitingStatus from coordinator import TempoCoordinator +from nginx import Nginx +from nginx_prometheus_exporter import NginxPrometheusExporter from tempo import Tempo from tempo_cluster import TempoClusterProvider @@ -48,6 +49,7 @@ class TempoCoordinatorCharm(CharmBase): def __init__(self, *args): super().__init__(*args) + self.ingress = TraefikRouteRequirer(self, self.model.get_relation("ingress"), "ingress") # type: ignore self.tempo_cluster = TempoClusterProvider(self) self.coordinator = TempoCoordinator(self.tempo_cluster) @@ -66,6 +68,13 @@ def __init__(self, *args): self.s3_requirer = S3Requirer(self, Tempo.s3_relation_name, Tempo.s3_bucket_name) + self.nginx = Nginx( + self, + cluster_provider=self.tempo_cluster, + server_name=self.hostname, + ) + self.nginx_prometheus_exporter = NginxPrometheusExporter(self) + # configure this tempo as a datasource in grafana self.grafana_source_provider = GrafanaSourceProvider( self, @@ -79,8 +88,8 @@ def __init__(self, *args): ], ) # # Patch the juju-created Kubernetes service to contain the right ports - external_ports = tempo.get_external_ports(self.app.name) - self._service_patcher = KubernetesServicePatch(self, external_ports) + self.unit.set_ports(*self.tempo.all_ports.values()) + # Provide ability for Tempo to be scraped by Prometheus using prometheus_scrape self._scraping = MetricsEndpointProvider( self, @@ -115,6 +124,13 @@ def __init__(self, *args): self.framework.observe(self.on.config_changed, self._on_config_changed) self.framework.observe(self.on.list_receivers_action, self._on_list_receivers_action) + # nginx + self.framework.observe(self.on.nginx_pebble_ready, self._on_nginx_pebble_ready) + self.framework.observe( + 
self.on.nginx_prometheus_exporter_pebble_ready, + self._on_nginx_prometheus_exporter_pebble_ready, + ) + # ingress ingress = self.on["ingress"] self.framework.observe(ingress.relation_created, self._on_ingress_relation_created) @@ -139,6 +155,9 @@ def __init__(self, *args): # cluster self.framework.observe(self.tempo_cluster.on.changed, self._on_tempo_cluster_changed) + for evt in self.on.events().values(): + self.framework.observe(evt, self._on_event) # type: ignore + ###################### # UTILITY PROPERTIES # ###################### @@ -272,8 +291,14 @@ def _on_tracing_broken(self, _): def _on_cert_handler_changed(self, _): if self.tls_available: logger.debug("enabling TLS") + self.nginx.configure_tls( + server_cert=self.cert_handler.server_cert, # type: ignore + ca_cert=self.cert_handler.ca_cert, # type: ignore + private_key=self.cert_handler.private_key, # type: ignore + ) else: logger.debug("disabling TLS") + self.nginx.delete_certificates() # tls readiness change means config change. # sync scheme change with traefik and related consumers @@ -368,6 +393,26 @@ def _on_collect_unit_status(self, e: CollectStatusEvent): else: e.add_status(ActiveStatus()) + def _on_nginx_pebble_ready(self, _) -> None: + self.nginx.configure_pebble_layer() + + def _on_nginx_prometheus_exporter_pebble_ready(self, _) -> None: + self.nginx_prometheus_exporter.configure_pebble_layer() + + def _on_event(self, event) -> None: + """A set of common configuration actions that should happen on every event.""" + if isinstance(event, CollectStatusEvent): + return + # plan layers + self.nginx.configure_pebble_layer() + self.nginx_prometheus_exporter.configure_pebble_layer() + # configure ingress + self._configure_ingress() + # update cluster relations + self._update_tempo_cluster() + # update tracing relations + self._update_tracing_relations() + ################### # UTILITY METHODS # ################### @@ -386,7 +431,7 @@ def _configure_ingress(self) -> None: # notify the cluster self._update_tempo_cluster() - def _update_tracing_relations(self): + def _update_tracing_relations(self) -> None: tracing_relations = self.model.relations["tracing"] if not tracing_relations: # todo: set waiting status and configure tempo to run without receivers if possible, @@ -414,12 +459,12 @@ def _requested_receivers(self) -> Tuple[ReceiverProtocol, ...]: requested_receivers = requested_protocols.intersection(set(self.tempo.receiver_ports)) return tuple(requested_receivers) - def server_cert(self): + def server_cert(self) -> str: """For charm tracing.""" self._update_server_cert() return self.tempo.server_cert_path - def _update_server_cert(self): + def _update_server_cert(self) -> None: """Server certificate for charm tracing tls, if tls is enabled.""" server_cert = Path(self.tempo.server_cert_path) if self.tls_available: @@ -473,7 +518,7 @@ def loki_endpoints_by_unit(self) -> Dict[str, str]: return endpoints - def _update_tempo_cluster(self): + def _update_tempo_cluster(self) -> None: """Build the config and publish everything to the application databag.""" if not self._is_consistent: logger.error("skipped tempo cluster update: inconsistent state") diff --git a/src/nginx.py b/src/nginx.py index 7f9613e..b706f9a 100644 --- a/src/nginx.py +++ b/src/nginx.py @@ -7,181 +7,20 @@ import crossplane from ops import CharmBase -from ops.pebble import Layer +from ops.pebble import Layer, PathError, ProtocolError -from tempo_cluster import TempoClusterProvider +from tempo import Tempo +from tempo_cluster import TempoClusterProvider, TempoRole 
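# The rewritten module below assembles the nginx configuration as a list of
# directive dicts and renders it with crossplane.build(). A minimal standalone
# sketch of that rendering step follows; only crossplane.build() is assumed from
# the library, and the upstream name, address and port are illustrative rather
# than taken from the charm:
import crossplane

payload = [
    {
        "directive": "upstream",
        "args": ["otlp-http"],
        "block": [{"directive": "server", "args": ["10.1.0.7:4318"]}],
    },
    {
        "directive": "server",
        "args": [],
        "block": [
            {"directive": "listen", "args": ["4318"]},
            {
                "directive": "location",
                "args": ["/"],
                "block": [{"directive": "proxy_pass", "args": ["http://otlp-http"]}],
            },
        ],
    },
]

rendered = crossplane.build(payload)
# `rendered` is plain nginx.conf text, roughly:
#   upstream otlp-http { server 10.1.0.7:4318; }
#   server { listen 4318; location / { proxy_pass http://otlp-http; } }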
logger = logging.getLogger(__name__) NGINX_DIR = "/etc/nginx" NGINX_CONFIG = f"{NGINX_DIR}/nginx.conf" -NGINX_PORT = "8080" KEY_PATH = f"{NGINX_DIR}/certs/server.key" CERT_PATH = f"{NGINX_DIR}/certs/server.cert" CA_CERT_PATH = f"{NGINX_DIR}/certs/ca.cert" -LOCATIONS_DISTRIBUTOR: List[Dict[str, Any]] = [ - { - "directive": "location", - "args": ["/distributor"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://distributor"], - }, - ], - }, - { - "directive": "location", - "args": ["/api/v1/push"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://distributor"], - }, - ], - }, - { - "directive": "location", - "args": ["/otlp/v1/metrics"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://distributor"], - }, - ], - }, -] -LOCATIONS_ALERTMANAGER: List[Dict] = [ - { - "directive": "location", - "args": ["/alertmanager"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://alertmanager"], - }, - ], - }, - { - "directive": "location", - "args": ["/multitenant_alertmanager/status"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://alertmanager"], - }, - ], - }, - { - "directive": "location", - "args": ["/api/v1/alerts"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://alertmanager"], - }, - ], - }, -] -LOCATIONS_RULER: List[Dict] = [ - { - "directive": "location", - "args": ["/prometheus/config/v1/rules"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://ruler"], - }, - ], - }, - { - "directive": "location", - "args": ["/prometheus/api/v1/rules"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://ruler"], - }, - ], - }, - { - "directive": "location", - "args": ["/prometheus/api/v1/alerts"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://ruler"], - }, - ], - }, - { - "directive": "location", - "args": ["=", "/ruler/ring"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://ruler"], - }, - ], - }, -] -LOCATIONS_QUERY_FRONTEND: List[Dict] = [ - { - "directive": "location", - "args": ["/prometheus"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://query-frontend"], - }, - ], - }, - # Buildinfo endpoint can go to any component - { - "directive": "location", - "args": ["=", "/api/v1/status/buildinfo"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://query-frontend"], - }, - ], - }, -] -LOCATIONS_COMPACTOR: List[Dict] = [ - # Compactor endpoint for uploading blocks - { - "directive": "location", - "args": ["=", "/api/v1/upload/block/"], - "block": [ - { - "directive": "proxy_pass", - "args": ["http://compactor"], - }, - ], - }, -] - -LOCATIONS_BASIC: List[Dict] = [ - { - "directive": "location", - "args": ["=", "/"], - "block": [ - {"directive": "return", "args": ["200", "'OK'"]}, - {"directive": "auth_basic", "args": ["off"]}, - ], - }, - { # Location to be used by nginx-prometheus-exporter - "directive": "location", - "args": ["=", "/status"], - "block": [ - {"directive": "stub_status", "args": []}, - ], - }, -] - class Nginx: """Helper class to manage the nginx workload.""" @@ -194,20 +33,27 @@ def __init__(self, charm: CharmBase, cluster_provider: TempoClusterProvider, ser self.server_name = server_name self._container = self._charm.unit.get_container("nginx") - def configure_pebble_layer(self, tls: bool) -> None: + def configure_pebble_layer(self) -> None: """Configure pebble layer.""" + new_config: str = self.config() + should_restart: bool = self._has_config_changed(new_config) if self._container.can_connect(): 
- self._container.push( - self.config_path, self.config(tls=tls), make_dirs=True # type: ignore - ) + self._container.push(self.config_path, new_config, make_dirs=True) # type: ignore self._container.add_layer("nginx", self.layer, combine=True) self._container.autostart() - def config(self, tls: bool = False) -> str: + if should_restart: + logger.info("new nginx config: reloading the service") + self.reload() + + def config(self) -> str: """Build and return the Nginx configuration.""" + full_config = self._prepare_config() + return crossplane.build(full_config) + + def _prepare_config(self) -> List[dict]: log_level = "error" addresses_by_role = self.cluster_provider.gather_addresses_by_role() - # build the complete configuration full_config = [ {"directive": "worker_processes", "args": ["5"]}, @@ -256,12 +102,33 @@ def config(self, tls: bool = False) -> str: }, {"directive": "proxy_read_timeout", "args": ["300"]}, # server block - self._server(addresses_by_role, tls), + *self._servers(addresses_by_role), ], }, ] + return full_config - return crossplane.build(full_config) + def _has_config_changed(self, new_config: str) -> bool: + """Return True if the passed config differs from the one on disk.""" + if not self._container.can_connect(): + logger.debug("Could not connect to Nginx container") + return False + + try: + current_config = self._container.pull(self.config_path).read() + except (ProtocolError, PathError) as e: + logger.warning( + "Could not check the current nginx configuration due to " + "a failure in retrieving the file: %s", + e, + ) + return False + + return current_config != new_config + + def reload(self) -> None: + """Reload the nginx config without restarting the service.""" + self._container.exec(["nginx", "-s", "reload"]) @property def layer(self) -> Layer: @@ -274,7 +141,7 @@ def layer(self) -> Layer: "nginx": { "override": "replace", "summary": "nginx", - "command": "nginx", + "command": "nginx -g 'daemon off;'", "startup": "enabled", } }, @@ -297,35 +164,78 @@ def _log_verbose(self, verbose: bool = True) -> List[Dict[str, Any]]: ] def _upstreams(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any]]: + addresses_mapped_to_upstreams = {} nginx_upstreams = [] - for role, address_set in addresses_by_role.items(): - nginx_upstreams.append( - { - "directive": "upstream", - "args": [role], - "block": [ - {"directive": "server", "args": [f"{addr}:{NGINX_PORT}"]} - for addr in address_set - ], - } + addresses_mapped_to_upstreams = addresses_by_role.copy() + if TempoRole.all in addresses_by_role.keys(): + # for all, we add addresses to existing upstreams for distributor / query_frontend or create the set + if TempoRole.distributor in addresses_mapped_to_upstreams: + addresses_mapped_to_upstreams[TempoRole.distributor] = ( + addresses_mapped_to_upstreams[TempoRole.distributor].union( + addresses_by_role[TempoRole.all] + ) + ) + else: + addresses_mapped_to_upstreams[TempoRole.distributor] = addresses_by_role[ + TempoRole.all + ] + if TempoRole.query_frontend in addresses_mapped_to_upstreams: + addresses_mapped_to_upstreams[TempoRole.query_frontend] = ( + addresses_mapped_to_upstreams[TempoRole.query_frontend].union( + addresses_by_role[TempoRole.all] + ) + ) + else: + addresses_mapped_to_upstreams[TempoRole.query_frontend] = addresses_by_role[ + TempoRole.all + ] + if TempoRole.distributor in addresses_mapped_to_upstreams.keys(): + nginx_upstreams.extend( + self._distributor_upstreams(addresses_mapped_to_upstreams[TempoRole.distributor]) + ) + if 
TempoRole.query_frontend in addresses_mapped_to_upstreams.keys(): + nginx_upstreams.extend( + self._query_frontend_upstreams( + addresses_mapped_to_upstreams[TempoRole.query_frontend] + ) ) return nginx_upstreams - def _locations(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any]]: - nginx_locations = LOCATIONS_BASIC.copy() - roles = addresses_by_role.keys() + def _distributor_upstreams(self, address_set: Set[str]) -> List[Dict[str, Any]]: + upstreams = [] + for protocol, port in Tempo.receiver_ports.items(): + upstreams.append(self._upstream(protocol.replace("_", "-"), address_set, port)) + return upstreams - if "distributor" in roles: - nginx_locations.extend(LOCATIONS_DISTRIBUTOR) - if "alertmanager" in roles: - nginx_locations.extend(LOCATIONS_ALERTMANAGER) - if "ruler" in roles: - nginx_locations.extend(LOCATIONS_RULER) - if "query-frontend" in roles: - nginx_locations.extend(LOCATIONS_QUERY_FRONTEND) - if "compactor" in roles: - nginx_locations.extend(LOCATIONS_COMPACTOR) + def _query_frontend_upstreams(self, address_set: Set[str]) -> List[Dict[str, Any]]: + upstreams = [] + for protocol, port in Tempo.server_ports.items(): + upstreams.append(self._upstream(protocol.replace("_", "-"), address_set, port)) + return upstreams + + def _upstream(self, role: str, address_set: Set[str], port: int) -> Dict[str, Any]: + return { + "directive": "upstream", + "args": [role], + "block": [{"directive": "server", "args": [f"{addr}:{port}"]} for addr in address_set], + } + + def _locations(self, upstream: str, grpc: bool, tls: bool) -> List[Dict[str, Any]]: + s = "s" if tls else "" + protocol = f"grpc{s}" if grpc else f"http{s}" + nginx_locations = [ + { + "directive": "location", + "args": ["/"], + "block": [ + { + "directive": "grpc_pass" if grpc else "proxy_pass", + "args": [f"{protocol}://{upstream}"], + } + ], + } + ] return nginx_locations def _resolver(self, custom_resolver: Optional[List[Any]] = None) -> List[Dict[str, Any]]: @@ -344,16 +254,51 @@ def _basic_auth(self, enabled: bool) -> List[Optional[Dict[str, Any]]]: ] return [] - def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> Dict[str, Any]: + def _listen(self, port: int, ssl: bool, http2: bool) -> List[Dict[str, Any]]: + directives = [] + directives.append( + {"directive": "listen", "args": self._listen_args(port, False, ssl, http2)} + ) + directives.append( + {"directive": "listen", "args": self._listen_args(port, True, ssl, http2)} + ) + return directives + + def _listen_args(self, port: int, ipv6: bool, ssl: bool, http2: bool) -> List[str]: + args = [] + if ipv6: + args.append(f"[::]:{port}") + else: + args.append(f"{port}") + if ssl: + args.append("ssl") + if http2: + args.append("http2") + return args + + def _servers(self, addresses_by_role: Dict[str, Set[str]]) -> List[Dict[str, Any]]: + servers = [] + roles = addresses_by_role.keys() + + if TempoRole.distributor.value in roles or TempoRole.all.value in roles: + for protocol, port in Tempo.receiver_ports.items(): + servers.append(self._server(port, protocol.replace("_", "-"), "grpc" in protocol)) + if TempoRole.query_frontend.value in roles or TempoRole.all.value in roles: + for protocol, port in Tempo.server_ports.items(): + servers.append(self._server(port, protocol.replace("_", "-"), "grpc" in protocol)) + return servers + + def _server(self, port: int, upstream: str, grpc: bool = False) -> Dict[str, Any]: auth_enabled = False + tls = self.tls_ready + if tls: return { "directive": "server", "args": [], "block": [ - 
{"directive": "listen", "args": ["443", "ssl"]}, - {"directive": "listen", "args": ["[::]:443", "ssl"]}, + *self._listen(port, ssl=True, http2=grpc), *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", @@ -365,7 +310,7 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> {"directive": "ssl_certificate_key", "args": [KEY_PATH]}, {"directive": "ssl_protocols", "args": ["TLSv1", "TLSv1.1", "TLSv1.2"]}, {"directive": "ssl_ciphers", "args": ["HIGH:!aNULL:!MD5"]}, # codespell:ignore - *self._locations(addresses_by_role), + *self._locations(upstream, grpc, tls), ], } @@ -373,13 +318,36 @@ def _server(self, addresses_by_role: Dict[str, Set[str]], tls: bool = False) -> "directive": "server", "args": [], "block": [ - {"directive": "listen", "args": [NGINX_PORT]}, - {"directive": "listen", "args": [f"[::]:{NGINX_PORT}"]}, + *self._listen(port, ssl=False, http2=grpc), *self._basic_auth(auth_enabled), { "directive": "proxy_set_header", "args": ["X-Scope-OrgID", "$ensured_x_scope_orgid"], }, - *self._locations(addresses_by_role), + {"directive": "server_name", "args": [self.server_name]}, + *self._locations(upstream, grpc, tls), ], } + + @property + def tls_ready(self) -> bool: + """Whether cert, key, and ca paths are found on disk and Nginx is ready to use tls.""" + if not self._container.can_connect(): + return False + return all( + self._container.exists(tls_path) for tls_path in (KEY_PATH, CERT_PATH, CA_CERT_PATH) + ) + + def configure_tls(self, private_key: str, server_cert: str, ca_cert: str) -> None: + """Save the certificates file to disk and run update-ca-certificates.""" + if self._container.can_connect(): + self._container.push(KEY_PATH, private_key, make_dirs=True) + self._container.push(CERT_PATH, server_cert, make_dirs=True) + self._container.push(CA_CERT_PATH, ca_cert, make_dirs=True) + + def delete_certificates(self) -> None: + """Delete the certificate files from disk and run update-ca-certificates.""" + if self._container.can_connect(): + self._container.remove_path(CERT_PATH, recursive=True) + self._container.remove_path(KEY_PATH, recursive=True) + self._container.remove_path(CA_CERT_PATH, recursive=True) diff --git a/src/nginx_prometheus_exporter.py b/src/nginx_prometheus_exporter.py index 6d516a7..7ce7b2c 100644 --- a/src/nginx_prometheus_exporter.py +++ b/src/nginx_prometheus_exporter.py @@ -7,8 +7,6 @@ from ops import CharmBase from ops.pebble import Layer -from nginx import NGINX_PORT - logger = logging.getLogger(__name__) NGINX_PROMETHEUS_EXPORTER_PORT = "9113" @@ -29,7 +27,7 @@ def configure_pebble_layer(self) -> None: @property def layer(self) -> Layer: """Return the Pebble layer for Nginx Prometheus exporter.""" - scheme = "https" if self._charm._is_cert_available else "http" # type: ignore + scheme = "https" if self._charm.tls_available else "http" # type: ignore return Layer( { "summary": "nginx prometheus exporter layer", @@ -38,7 +36,7 @@ def layer(self) -> Layer: "nginx": { "override": "replace", "summary": "nginx prometheus exporter", - "command": f"nginx-prometheus-exporter --no-nginx.ssl-verify --web.listen-address=:{NGINX_PROMETHEUS_EXPORTER_PORT} --nginx.scrape-uri={scheme}://127.0.0.1:{NGINX_PORT}/status", + "command": f"nginx-prometheus-exporter --no-nginx.ssl-verify --web.listen-address=:{NGINX_PROMETHEUS_EXPORTER_PORT} --nginx.scrape-uri={scheme}://127.0.0.1:3200/status", "startup": "enabled", } }, diff --git a/src/tempo.py b/src/tempo.py index 91419cf..fd17ab2 100644 --- a/src/tempo.py +++ b/src/tempo.py @@ -42,7 
+42,7 @@ class Tempo: memberlist_port = 7946 - server_ports = { + server_ports: Dict[str, int] = { "tempo_http": 3200, "tempo_grpc": 9096, # default grpc listen port is 9095, but that conflicts with promtail. } @@ -207,7 +207,9 @@ def _build_storage_config(self, s3_config: dict): endpoint=s3_config["endpoint"], secret_key=s3_config["secret-key"], ), - block=tempo_config.Block(version="v2"), + # starting from Tempo 2.4, we need to use at least parquet v3 to have search capabilities (Grafana support) + # https://grafana.com/docs/tempo/latest/release-notes/v2-4/#vparquet3-is-now-the-default-block-format + block=tempo_config.Block(version="vParquet3"), ) return tempo_config.Storage(trace=storage_config) @@ -250,7 +252,6 @@ def _build_compactor_config(self): # total trace retention block_retention="720h", compacted_block_retention="1h", - v2_out_buffer_bytes=5242880, ) ) diff --git a/src/tempo_config.py b/src/tempo_config.py index 3d0d7c3..1c2a186 100644 --- a/src/tempo_config.py +++ b/src/tempo_config.py @@ -137,7 +137,6 @@ class Compaction(BaseModel): max_compaction_objects: int block_retention: str compacted_block_retention: str - v2_out_buffer_bytes: int class Compactor(BaseModel): diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1accd8d..fd3d087 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -5,7 +5,6 @@ import shutil from pathlib import Path -import yaml from pytest import fixture from pytest_operator.plugin import OpsTest @@ -19,16 +18,6 @@ async def tempo_charm(ops_test: OpsTest): return charm -@fixture(scope="module") -def tempo_metadata(ops_test: OpsTest): - return yaml.safe_load(Path("./metadata.yaml").read_text()) - - -@fixture(scope="module") -def tempo_oci_image(ops_test: OpsTest, tempo_metadata): - return tempo_metadata["resources"]["tempo-image"]["upstream-source"] - - @fixture(scope="module", autouse=True) def copy_charm_libs_into_tester_charm(ops_test): """Ensure the tester charm has the libraries it uses.""" diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index aa7bb55..f28a3ea 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -33,6 +33,9 @@ async def test_build_and_deploy(ops_test: OpsTest): {mc.name}: charm: {charm} trust: true + resources: + nginx-image: {METADATA["resources"]["nginx-image"]["upstream-source"]} + nginx-prometheus-exporter-image: {METADATA["resources"]["nginx-prometheus-exporter-image"]["upstream-source"]} scale: 1 loki: charm: loki-k8s diff --git a/tests/integration/test_scaling_monolithic.py b/tests/integration/test_scaling_monolithic.py index d41b25d..ed76714 100644 --- a/tests/integration/test_scaling_monolithic.py +++ b/tests/integration/test_scaling_monolithic.py @@ -25,7 +25,13 @@ @pytest.mark.abort_on_fail async def test_deploy_tempo(ops_test: OpsTest): tempo_charm = await ops_test.build_charm(".") - await ops_test.model.deploy(tempo_charm, application_name=APP_NAME) + resources = { + "nginx-image": METADATA["resources"]["nginx-image"]["upstream-source"], + "nginx-prometheus-exporter-image": METADATA["resources"][ + "nginx-prometheus-exporter-image" + ]["upstream-source"], + } + await ops_test.model.deploy(tempo_charm, resources=resources, application_name=APP_NAME) await ops_test.model.wait_for_idle( apps=[APP_NAME], diff --git a/tests/integration/test_self_monitoring.py b/tests/integration/test_self_monitoring.py index 74ee598..5bdf0e7 100644 --- a/tests/integration/test_self_monitoring.py +++ 
b/tests/integration/test_self_monitoring.py @@ -35,6 +35,9 @@ async def test_build_and_deploy(ops_test: OpsTest): charm: {charm} trust: true scale: 1 + resources: + nginx-image: {METADATA["resources"]["nginx-image"]["upstream-source"]} + nginx-prometheus-exporter-image: {METADATA["resources"]["nginx-prometheus-exporter-image"]["upstream-source"]} prom: charm: prometheus-k8s channel: edge diff --git a/tests/scenario/conftest.py b/tests/scenario/conftest.py index f629faa..9fe9ebb 100644 --- a/tests/scenario/conftest.py +++ b/tests/scenario/conftest.py @@ -1,7 +1,7 @@ from unittest.mock import patch import pytest -from scenario import Context, Relation +from scenario import Container, Context, Relation from charm import TempoCoordinatorCharm from tempo_cluster import TempoClusterRequirerAppData, TempoRole @@ -9,10 +9,9 @@ @pytest.fixture def tempo_charm(): - with patch("charm.KubernetesServicePatch"): - with patch("lightkube.core.client.GenericSyncClient"): - with patch("charm.TempoCoordinatorCharm._update_server_cert"): - yield TempoCoordinatorCharm + with patch("lightkube.core.client.GenericSyncClient"): + with patch("charm.TempoCoordinatorCharm._update_server_cert"): + yield TempoCoordinatorCharm @pytest.fixture(scope="function") @@ -45,3 +44,19 @@ def all_worker(): "tempo-cluster", remote_app_data=TempoClusterRequirerAppData(role=TempoRole.all).dump(), ) + + +@pytest.fixture(scope="function") +def nginx_container(): + return Container( + "nginx", + can_connect=True, + ) + + +@pytest.fixture(scope="function") +def nginx_prometheus_exporter_container(): + return Container( + "nginx-prometheus-exporter", + can_connect=True, + ) diff --git a/tests/scenario/test_charm_statuses.py b/tests/scenario/test_charm_statuses.py index 5c88235..5c16e6b 100644 --- a/tests/scenario/test_charm_statuses.py +++ b/tests/scenario/test_charm_statuses.py @@ -7,7 +7,7 @@ def test_monolithic_status_no_s3_no_workers(context): - state_out = context.run("start", State(unit_status=ops.ActiveStatus())) + state_out = context.run("start", State(unit_status=ops.ActiveStatus(), leader=True)) assert state_out.unit_status.name == "blocked" @@ -33,24 +33,32 @@ def test_scaled_status_no_workers(context, all_worker): assert state_out.unit_status.name == "blocked" -def test_scaled_status_with_s3_and_workers(context, s3, all_worker): +def test_scaled_status_with_s3_and_workers( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): state_out = context.run( "start", State( relations=[PeerRelation("peers", peers_data={1: {}, 2: {}}), s3, all_worker], + containers=[nginx_container, nginx_prometheus_exporter_container], unit_status=ops.ActiveStatus(), + leader=True, ), ) assert state_out.unit_status.name == "active" @patch.object(Tempo, "is_ready", new=True) -def test_happy_status(context, s3, all_worker): +def test_happy_status( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): state_out = context.run( "start", State( relations=[PeerRelation("peers", peers_data={1: {}, 2: {}}), s3, all_worker], + containers=[nginx_container, nginx_prometheus_exporter_container], unit_status=ops.ActiveStatus(), + leader=True, ), ) assert state_out.unit_status.name == "active" diff --git a/tests/scenario/test_config.py b/tests/scenario/test_config.py index e933a15..3532fb5 100644 --- a/tests/scenario/test_config.py +++ b/tests/scenario/test_config.py @@ -4,7 +4,9 @@ from tempo_cluster import TempoClusterRequirerUnitData -def test_memberlist_multiple_members(context, all_worker, s3): +def 
test_memberlist_multiple_members( + context, all_worker, s3, nginx_container, nginx_prometheus_exporter_container +): workers_no = 3 all_worker = all_worker.replace( remote_units_data={ @@ -23,7 +25,11 @@ def test_memberlist_multiple_members(context, all_worker, s3): for worker_idx in range(workers_no) }, ) - state = State(leader=True, relations=[all_worker, s3]) + state = State( + leader=True, + relations=[all_worker, s3], + containers=[nginx_container, nginx_prometheus_exporter_container], + ) with context.manager(all_worker.changed_event, state) as mgr: charm: TempoCoordinatorCharm = mgr.charm assert charm.tempo_cluster.gather_addresses() == set( diff --git a/tests/scenario/test_enabled_receivers.py b/tests/scenario/test_enabled_receivers.py index cbc6a20..c77d8b0 100644 --- a/tests/scenario/test_enabled_receivers.py +++ b/tests/scenario/test_enabled_receivers.py @@ -12,19 +12,31 @@ from charm import TempoCoordinatorCharm -def test_receivers_with_no_relations_or_config(context, s3, all_worker): +def test_receivers_with_no_relations_or_config( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): - state = State(leader=True, relations=[s3, all_worker]) + state = State( + leader=True, + relations=[s3, all_worker], + containers=[nginx_container, nginx_prometheus_exporter_container], + ) state_out = context.run_action("list-receivers", state) assert state_out.results == {"otlp-http": f"http://{socket.getfqdn()}:4318"} -def test_receivers_with_relations(context, s3, all_worker): +def test_receivers_with_relations( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): tracing = Relation( "tracing", remote_app_data=TracingRequirerAppData(receivers=["otlp_grpc"]).dump(), ) - state = State(leader=True, relations=[s3, all_worker, tracing]) + state = State( + leader=True, + relations=[s3, all_worker, tracing], + containers=[nginx_container, nginx_prometheus_exporter_container], + ) with context.manager(tracing.changed_event, state) as mgr: charm: TempoCoordinatorCharm = mgr.charm # extra receivers should only include default otlp_http @@ -45,7 +57,9 @@ def test_receivers_with_relations(context, s3, all_worker): } -def test_receivers_with_relations_and_config(context, s3, all_worker): +def test_receivers_with_relations_and_config( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): tracing = Relation( "tracing", local_app_data=TracingProviderAppData( @@ -64,7 +78,10 @@ def test_receivers_with_relations_and_config(context, s3, all_worker): ) # start with a state that has config changed state = State( - config={"always_enable_zipkin": True}, leader=True, relations=[s3, all_worker, tracing] + config={"always_enable_zipkin": True}, + leader=True, + relations=[s3, all_worker, tracing], + containers=[nginx_container, nginx_prometheus_exporter_container], ) with context.manager("config-changed", state) as mgr: charm: TempoCoordinatorCharm = mgr.charm diff --git a/tests/scenario/test_ingressed_tracing.py b/tests/scenario/test_ingressed_tracing.py index 12805f4..ec9c850 100644 --- a/tests/scenario/test_ingressed_tracing.py +++ b/tests/scenario/test_ingressed_tracing.py @@ -9,8 +9,11 @@ @pytest.fixture -def base_state(): - return State(leader=True) +def base_state(nginx_container, nginx_prometheus_exporter_container): + return State( + leader=True, + containers=[nginx_container, nginx_prometheus_exporter_container], + ) def test_external_url_present(context, base_state, s3, all_worker): diff --git 
a/tests/scenario/test_nginx.py b/tests/scenario/test_nginx.py new file mode 100644 index 0000000..29811a0 --- /dev/null +++ b/tests/scenario/test_nginx.py @@ -0,0 +1,163 @@ +import logging +from typing import List +from unittest.mock import MagicMock + +import pytest + +from nginx import Nginx +from tempo import Tempo +from tempo_cluster import TempoClusterProvider + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def tempo_cluster_provider(): + cluster_mock = MagicMock() + return TempoClusterProvider(cluster_mock) + + +def test_nginx_config_is_list_before_crossplane(context, nginx_container, tempo_cluster_provider): + unit = MagicMock() + unit.get_container = nginx_container + tempo_charm = MagicMock() + tempo_charm.unit = MagicMock(return_value=unit) + + nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") + + prepared_config = nginx._prepare_config() + assert isinstance(prepared_config, List) + + +def test_nginx_config_is_parsed_by_crossplane(context, nginx_container, tempo_cluster_provider): + unit = MagicMock() + unit.get_container = nginx_container + tempo_charm = MagicMock() + tempo_charm.unit = MagicMock(return_value=unit) + + nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") + logger.info(nginx._prepare_config()) + + prepared_config = nginx.config() + assert isinstance(prepared_config, str) + + +@pytest.mark.parametrize( + "addresses", + ( + {}, + {"all": {"1.2.3.4"}}, + {"all": {"1.2.3.4", "1.2.3.5"}}, + { + "all": {"1.2.3.4"}, + "distributor": {"1.2.3.5"}, + "ingester": {"1.2.3.6"}, + "querier": {"1.2.4.7"}, + "query_frontend": {"1.2.5.1"}, + "compactor": {"1.2.6.6"}, + "metrics_generator": {"1.2.8.4"}, + }, + { + "distributor": {"1.2.3.5"}, + "ingester": {"1.2.3.6"}, + "querier": {"1.2.4.7"}, + "query_frontend": {"1.2.5.1"}, + "compactor": {"1.2.6.6"}, + "metrics_generator": {"1.2.8.4"}, + }, + { + "distributor": {"1.2.3.5"}, + "ingester": {"1.2.3.6"}, + "querier": {"1.2.4.7"}, + "query_frontend": {"1.2.5.1"}, + "compactor": {"1.2.6.6"}, + "metrics_generator": {"1.2.8.4"}, + }, + { + "distributor": {"1.2.3.5", "1.2.3.7"}, + "ingester": {"1.2.3.6", "1.2.3.8"}, + "querier": {"1.2.4.7", "1.2.4.9"}, + "query_frontend": {"1.2.5.1", "1.2.5.2"}, + "compactor": {"1.2.6.6", "1.2.6.7"}, + "metrics_generator": {"1.2.8.4", "1.2.8.5"}, + }, + ), +) +def test_nginx_config_is_parsed_with_workers( + context, nginx_container, tempo_cluster_provider, addresses +): + tempo_cluster_provider.gather_addresses_by_role = MagicMock(return_value=addresses) + + unit = MagicMock() + unit.get_container = nginx_container + tempo_charm = MagicMock() + tempo_charm.unit = MagicMock(return_value=unit) + + nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") + + prepared_config = nginx.config() + assert isinstance(prepared_config, str) + + +@pytest.mark.parametrize( + "addresses", + ( + {"all": {"1.2.3.4"}}, + {"all": {"1.2.3.4", "1.2.3.5"}}, + { + "all": {"1.2.3.4"}, + "distributor": {"1.2.3.5"}, + "ingester": {"1.2.3.6"}, + "querier": {"1.2.4.7"}, + "query_frontend": {"1.2.5.1"}, + "compactor": {"1.2.6.6"}, + "metrics_generator": {"1.2.8.4"}, + }, + { + "distributor": {"1.2.3.5"}, + "ingester": {"1.2.3.6"}, + "querier": {"1.2.4.7"}, + "query_frontend": {"1.2.5.1"}, + "compactor": {"1.2.6.6"}, + "metrics_generator": {"1.2.8.4"}, + }, + ), +) +def test_nginx_config_contains_upstreams_and_proxy_pass( + context, nginx_container, tempo_cluster_provider, addresses +): + tempo_cluster_provider.gather_addresses_by_role = MagicMock(return_value=addresses) + + unit = 
MagicMock() + unit.get_container = nginx_container + tempo_charm = MagicMock() + tempo_charm.unit = MagicMock(return_value=unit) + + nginx = Nginx(tempo_charm, tempo_cluster_provider, "lolcathost") + + prepared_config = nginx.config() + + for role, addresses in addresses.items(): + for address in addresses: + if role == "all": + _assert_config_per_role(Tempo.all_ports, address, prepared_config) + if role == "distributor": + _assert_config_per_role(Tempo.receiver_ports, address, prepared_config) + if role == "query-frontend": + _assert_config_per_role(Tempo.server_ports, address, prepared_config) + + +def _assert_config_per_role(source_dict, address, prepared_config): + # as entire config is in a format that's hard to parse (and crossplane returns a string), we look for servers, + # upstreams and correct proxy/grpc_pass instructions. + for port in source_dict.values(): + assert f"server {address}:{port};" in prepared_config + assert f"listen {port}" in prepared_config + assert f"listen [::]:{port}" in prepared_config + for protocol in source_dict.keys(): + sanitised_protocol = protocol.replace("_", "-") + assert f"upstream {sanitised_protocol}" in prepared_config + if "grpc" in protocol: + assert f"grpc_pass grpcs://{sanitised_protocol}" in prepared_config + else: + assert f"proxy_pass https://{sanitised_protocol}" in prepared_config diff --git a/tests/scenario/test_tempo_clustered.py b/tests/scenario/test_tempo_clustered.py index 49e157b..b7eb75c 100644 --- a/tests/scenario/test_tempo_clustered.py +++ b/tests/scenario/test_tempo_clustered.py @@ -63,10 +63,16 @@ def patch_certs(): @pytest.fixture -def state_with_certs(context, s3, certs_relation): +def state_with_certs( + context, s3, certs_relation, nginx_container, nginx_prometheus_exporter_container +): return context.run( certs_relation.joined_event, - scenario.State(leader=True, relations=[s3, certs_relation]), + scenario.State( + leader=True, + relations=[s3, certs_relation], + containers=[nginx_container, nginx_prometheus_exporter_container], + ), ) @@ -96,7 +102,14 @@ def test_cluster_relation(context, state_with_certs, all_worker): @pytest.mark.parametrize("requested_protocol", ("otlp_grpc", "zipkin")) def test_tempo_restart_on_ingress_v2_changed( - context, tmp_path, requested_protocol, s3, s3_config, all_worker_with_initial_config + context, + tmp_path, + requested_protocol, + s3, + s3_config, + all_worker_with_initial_config, + nginx_container, + nginx_prometheus_exporter_container, ): # GIVEN # the remote end requests an otlp_grpc endpoint @@ -107,7 +120,11 @@ def test_tempo_restart_on_ingress_v2_changed( # WHEN # the charm receives a tracing(v2) relation-changed requesting an otlp_grpc receiver - state = State(leader=True, relations=[tracing, s3, all_worker_with_initial_config]) + state = State( + leader=True, + relations=[tracing, s3, all_worker_with_initial_config], + containers=[nginx_container, nginx_prometheus_exporter_container], + ) state_out = context.run(tracing.changed_event, state) # THEN diff --git a/tests/scenario/test_tls.py b/tests/scenario/test_tls.py index defb543..8021733 100644 --- a/tests/scenario/test_tls.py +++ b/tests/scenario/test_tls.py @@ -10,7 +10,7 @@ @pytest.fixture -def base_state(): +def base_state(nginx_container, nginx_prometheus_exporter_container): return State( leader=True, secrets=[ @@ -23,6 +23,7 @@ def base_state(): contents={0: {"foo": "bar"}}, ) ], + containers=[nginx_container, nginx_prometheus_exporter_container], ) diff --git a/tests/scenario/test_tracing_legacy.py 
b/tests/scenario/test_tracing_legacy.py index 0f3fe07..619769f 100644 --- a/tests/scenario/test_tracing_legacy.py +++ b/tests/scenario/test_tracing_legacy.py @@ -6,8 +6,11 @@ @pytest.fixture -def base_state(): - return State(leader=True) +def base_state(nginx_container, nginx_prometheus_exporter_container): + return State( + leader=True, + containers=[nginx_container, nginx_prometheus_exporter_container], + ) @pytest.mark.parametrize("evt_name", ("changed", "created", "joined")) diff --git a/tests/scenario/test_tracing_provider.py b/tests/scenario/test_tracing_provider.py index 9aece32..8630bf8 100644 --- a/tests/scenario/test_tracing_provider.py +++ b/tests/scenario/test_tracing_provider.py @@ -3,7 +3,9 @@ from scenario import Relation, State -def test_receivers_removed_on_relation_broken(context, s3, all_worker): +def test_receivers_removed_on_relation_broken( + context, s3, all_worker, nginx_container, nginx_prometheus_exporter_container +): tracing_grpc = Relation( "tracing", remote_app_data={"receivers": '["otlp_grpc"]'}, @@ -24,6 +26,7 @@ def test_receivers_removed_on_relation_broken(context, s3, all_worker): state = State( leader=True, relations=[tracing_grpc, tracing_http, s3, all_worker], + containers=[nginx_container, nginx_prometheus_exporter_container], ) with charm_tracing_disabled(): diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index bdc9b33..9a74dac 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -2,23 +2,23 @@ # See LICENSE file for licensing details. import unittest -from unittest.mock import patch from ops.testing import Harness from charm import TempoCoordinatorCharm -CONTAINER_NAME = "tempo" +CONTAINER_NAME = "nginx" class TestTempoCoordinatorCharm(unittest.TestCase): - @patch("charm.KubernetesServicePatch", lambda x, y: None) def setUp(self): self.harness = Harness(TempoCoordinatorCharm) self.harness.set_model_name("testmodel") self.addCleanup(self.harness.cleanup) self.harness.set_leader(True) self.harness.begin_with_initial_hooks() + self.harness.add_relation("s3", "s3-integrator") + self.harness.add_relation("tempo-cluster", "tempo-worker-k8s") self.maxDiff = None # we're comparing big traefik configs in tests def test_entrypoints_are_generated_with_sanitized_names(self):
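    # A standalone sketch, separate from the (truncated) test above: with the
    # workload now split into an "nginx" container, a Harness-based test can
    # check that pebble-ready plans the nginx service defined in src/nginx.py.
    # The test name is illustrative and assumes the charm's other observers
    # tolerate the minimal relations created in setUp.
    def test_nginx_pebble_ready_plans_nginx_service(self):
        self.harness.container_pebble_ready(CONTAINER_NAME)
        plan = self.harness.get_container_pebble_plan(CONTAINER_NAME)
        # the layer in src/nginx.py runs nginx in the foreground
        self.assertEqual(plan.services["nginx"].command, "nginx -g 'daemon off;'")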