diff --git a/INTEGRATING.md b/INTEGRATING.md
index fe14c485..d36b67ba 100644
--- a/INTEGRATING.md
+++ b/INTEGRATING.md
@@ -38,3 +38,52 @@ configuration is generated.
 4. [Traefik Charmed Operator](https://charmhub.io/traefik-k8s), over the `ingress_per_unit` interface, so Prometheus may be reached from outside the Kubernetes cluster it is running on.
 5. [Catalogue Charmed Operator](https://charmhub.io/catalogue-k8s), over the `catalogue` interface, so Prometheus can be published in the service catalogue web page.
+
+## Deployment scenarios
+
+### Tiered Prometheus deployments (local-remote on-call teams)
+
+```mermaid
+graph LR
+subgraph COS1
+  am1[Alertmanager]
+  prom1[Prometheus]
+  am1 --- prom1
+end
+subgraph COS1.2
+  am1.2[Alertmanager]
+  prom1.2[Prometheus]
+  am1.2 --- prom1.2
+end
+subgraph COS2
+  am2[Alertmanager]
+  prom2[Prometheus]
+  am2 --- prom2
+end
+subgraph COS[Main COS]
+  am[Alertmanager]
+  prom[Prometheus]
+  prom --- am
+end
+pd1[PagerDuty] --- am1
+pd2[PagerDuty] --- am2
+am --- pd[PagerDuty]
+prom1 --- |remote write, rules| prom1.2
+prom1.2 --- |remote write, rules| prom
+prom2 --- |remote write, rules| prom
+```
+
+Main COS holds the combined rule set of COS1, COS2 and Main COS, so it can track the health of the entire system.
+Prometheus in COS1 scrapes a local application and sends its metrics and alert rules to Prometheus in COS1.2,
+which in turn sends them on to Prometheus in Main COS. As a result, Prometheus in Main COS ends up with the alert
+rules defined in the application, while each team's Alertmanager notifies its own PagerDuty service.
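+
+To wire up such a chain with Juju, each downstream Prometheus offers the
+`receive-remote-write` endpoint and each upstream one is related to it over
+`send-remote-write`. A sketch, assuming the applications are deployed as
+`prom-cos1`, `prom-cos1-2` and `prom-main`:
+
+```shell
+juju relate prom-cos1:send-remote-write prom-cos1-2:receive-remote-write
+juju relate prom-cos1-2:send-remote-write prom-main:receive-remote-write
+```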
diff --git a/lib/charms/prometheus_k8s/v0/prometheus_remote_write.py b/lib/charms/prometheus_k8s/v0/prometheus_remote_write.py
index 074b3289..374347cd 100644
--- a/lib/charms/prometheus_k8s/v0/prometheus_remote_write.py
+++ b/lib/charms/prometheus_k8s/v0/prometheus_remote_write.py
@@ -19,7 +19,7 @@
 import subprocess
 import tempfile
 from pathlib import Path
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Callable, Dict, List, Optional, Tuple, Union
 
 import yaml
 from charms.observability_libs.v0.juju_topology import JujuTopology
@@ -42,7 +42,7 @@
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 8
+LIBPATCH = 9
 
 logger = logging.getLogger(__name__)
 
@@ -595,6 +595,8 @@ def __init__(
         charm: CharmBase,
         relation_name: str = DEFAULT_CONSUMER_NAME,
         alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
+        *,
+        extra_alerts_callables: Optional[List[Callable[[], dict]]] = None,
     ):
         """API to manage a required relation with the `prometheus_remote_write` interface.
@@ -630,6 +632,7 @@
         self._charm = charm
         self._relation_name = relation_name
         self._alert_rules_path = alert_rules_path
+        self.extra_alerts_callables = extra_alerts_callables or []
 
         self.topology = JujuTopology.from_charm(charm)
@@ -681,9 +684,42 @@ def _push_alerts_to_relation_databag(self, relation: Relation) -> None:
 
         alert_rules_as_dict = alert_rules.as_dict()
 
+        alert_rules_as_dict = self._parse_and_add_extra_alerts(alert_rules_as_dict)
+
         if alert_rules_as_dict:
             relation.data[self._charm.app]["alert_rules"] = json.dumps(alert_rules_as_dict)
 
+    def _parse_and_add_extra_alerts(self, alert_rules_as_dict: dict) -> dict:
+        """Collect alerts from `self.extra_alerts_callables`.
+
+        Call each entry in `self.extra_alerts_callables`, parse the alert rule
+        groups it returns and extend (mutate) `alert_rules_as_dict` with them.
+
+        Returns:
+            (dict) The original `alert_rules_as_dict` with the alert rule groups
+            returned by the `extra_alerts_callables` merged in.
+        """
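+        # Each callable is expected to return a mapping whose values are rule-file
+        # dicts (i.e. each value carries a "groups" list); those groups are appended
+        # to any groups already present in `alert_rules_as_dict`.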
+        for extra_func in self.extra_alerts_callables:
+            if callable(extra_func):
+                extra_alerts_list = []
+                extra_alerts = extra_func()
+                if extra_alerts:
+                    for alert_rule_groups in extra_alerts.values():
+                        extra_alerts_list.extend(alert_rule_groups.get("groups", []))
+
+                alert_rules_as_dict["groups"] = (
+                    alert_rules_as_dict.get("groups", []) + extra_alerts_list
+                )
+
+                logger.debug(
+                    "%s extra alert rule groups were added to the alert rules.",
+                    len(extra_alerts_list),
+                )
+        return alert_rules_as_dict
+
     def reload_alerts(self) -> None:
         """Reload alert rules from disk and push to relation data."""
         self._push_alerts_to_all_relation_databags(None)
diff --git a/metadata.yaml b/metadata.yaml
index c67f58f1..95bef526 100644
--- a/metadata.yaml
+++ b/metadata.yaml
@@ -49,6 +49,8 @@ requires:
     limit: 1
   catalogue:
     interface: catalogue
+  send-remote-write:
+    interface: prometheus_remote_write
 
 peers:
   prometheus-peers:
diff --git a/src/charm.py b/src/charm.py
index 5143788b..1df64dd6 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -8,7 +8,7 @@
 import os
 import re
 import socket
-from typing import Dict, Optional, cast
+from typing import Dict, List, Optional, cast
 from urllib.parse import urlparse
 
 import yaml
@@ -27,6 +27,7 @@
     DEFAULT_RELATION_NAME as DEFAULT_REMOTE_WRITE_RELATION_NAME,
 )
 from charms.prometheus_k8s.v0.prometheus_remote_write import (
+    PrometheusRemoteWriteConsumer,
     PrometheusRemoteWriteProvider,
 )
 from charms.prometheus_k8s.v0.prometheus_scrape import (
@@ -42,7 +43,7 @@
 from lightkube import Client
 from lightkube.core.exceptions import ApiError as LightkubeApiError
 from lightkube.resources.core_v1 import PersistentVolumeClaim, Pod
-from ops.charm import ActionEvent, CharmBase
+from ops.charm import ActionEvent, CharmBase, RelationChangedEvent
 from ops.framework import StoredState
 from ops.main import main
 from ops.model import (
@@ -131,6 +132,14 @@ def __init__(self, *args):
             endpoint_path=f"{external_url.path}/api/v1/write",
         )
 
+        self._remote_write_consumer = PrometheusRemoteWriteConsumer(
+            self,
+            extra_alerts_callables=[
+                self.metrics_consumer.alerts,
+                self.remote_write_provider.alerts,
+            ],
+        )
+
         self.grafana_source_provider = GrafanaSourceProvider(
             charm=self,
             source_type="prometheus",
@@ -160,17 +169,36 @@ def __init__(self, *args):
         self.framework.observe(self.on.prometheus_pebble_ready, self._on_pebble_ready)
         self.framework.observe(self.on.config_changed, self._configure)
         self.framework.observe(self.on.upgrade_charm, self._configure)
+        self.framework.observe(self.on.upgrade_charm, self._push_alerts_to_remote_write)
         self.framework.observe(self.on.update_status, self._update_status)
         self.framework.observe(self.ingress.on.ready_for_unit, self._on_ingress_ready)
         self.framework.observe(self.ingress.on.revoked_for_unit, self._on_ingress_revoked)
         self.framework.observe(self.on.receive_remote_write_relation_created, self._configure)
         self.framework.observe(self.on.receive_remote_write_relation_changed, self._configure)
+        self.framework.observe(
+            self.on.receive_remote_write_relation_changed, self._push_alerts_to_remote_write
+        )
         self.framework.observe(self.on.receive_remote_write_relation_broken, self._configure)
+        self.framework.observe(self.on.send_remote_write_relation_broken, self._configure)
+        self.framework.observe(
+            self._remote_write_consumer.on.endpoints_changed,
+            self._on_remote_write_endpoints_changed,
+        )
+        self.framework.observe(
+            self._remote_write_consumer.on.endpoints_changed, self._push_alerts_to_remote_write
+        )
+        self.framework.observe(
+            self.metrics_consumer.on.targets_changed, self._push_alerts_to_remote_write
+        )
         self.framework.observe(self.metrics_consumer.on.targets_changed, self._configure)
         self.framework.observe(self.alertmanager_consumer.on.cluster_changed, self._configure)
         self.framework.observe(self.resources_patch.on.patch_failed, self._on_k8s_patch_failed)
         self.framework.observe(self.on.validate_configuration_action, self._on_validate_config)
 
+    def _push_alerts_to_remote_write(self, event) -> None:
+        """Reload alert rules, including the extra callables, and push them to relation data."""
+        self._remote_write_consumer.reload_alerts()
+
     @property
     def metrics_path(self):
         """The metrics path, adjusted by ingress path (if any)."""
@@ -302,6 +330,11 @@ def _on_ingress_revoked(self, event: IngressPerUnitRevokedForUnitEvent):
     def _on_k8s_patch_failed(self, event: K8sResourcePatchFailedEvent):
         self.unit.status = BlockedStatus(event.message)
 
+    def _on_remote_write_endpoints_changed(self, event: RelationChangedEvent) -> None:
+        """Event handler for the remote write endpoints changed event."""
+        logger.debug("Remote write endpoints were changed")
+        self._configure(event)
+
     def _configure(self, _):
         """Reconfigure and either reload or restart Prometheus.
@@ -708,6 +741,23 @@ def _alerting_config(self) -> dict:
         )
         return alerting_config
 
+    def _remote_write_config(self) -> List[Dict[str, str]]:
+        """Construct the Prometheus remote write endpoints configuration.
+
+        See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write
+
+        Returns:
+            A list of remote write endpoint configuration dicts for Prometheus.
+        """
+        endpoints = self._remote_write_consumer.endpoints
+
+        if not endpoints:
+            logger.debug("No remote write endpoints available")
+        else:
+            logger.debug("Remote write endpoints are: %s", endpoints)
+
+        return endpoints
+
     def _generate_prometheus_config(self, container) -> bool:
         """Construct Prometheus configuration and write to filesystem.
@@ -723,6 +773,9 @@ def _generate_prometheus_config(self, container) -> bool:
         if alerting_config:
             prometheus_config["alerting"] = alerting_config
 
+        if remote_write_config := self._remote_write_config():
+            prometheus_config["remote_write"] = remote_write_config
+
         prometheus_config["scrape_configs"].append(self._default_config)  # type: ignore
         certs = {}
         scrape_jobs = self.metrics_consumer.jobs()
diff --git a/tests/integration/test_receive_remote_write.py b/tests/integration/test_receive_remote_write.py
new file mode 100644
index 00000000..3e0ea7fe
--- /dev/null
+++ b/tests/integration/test_receive_remote_write.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python3
+# Copyright 2021 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+import asyncio
+import logging
+
+import pytest
+from helpers import (
+    check_prometheus_is_ready,
+    get_prometheus_rules,
+    has_metric,
+    oci_image,
+)
+from pytest_operator.plugin import OpsTest
+
+logger = logging.getLogger(__name__)
+
+avalanche = "avalanche"
+# prometheus that will consume metrics from the app and write them to the remote API
+prom_send = "prometheus-sender"
+prom_receiver_sender = "prometheus-receiver-sender"
+prom_receive = "prometheus-receiver"  # prometheus that provides the `/api/v1/write` API endpoint
+local_apps = [avalanche, prom_receive, prom_send, prom_receiver_sender]
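+
+# The chain under test: avalanche -> prometheus-sender -> prometheus-receiver-sender
+# -> prometheus-receiver; metrics and alert rules should propagate across both hops.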
+
+
+@pytest.mark.abort_on_fail
+async def test_receive_remote_write(ops_test: OpsTest, prometheus_charm):
+    """Test chaining via the `receive-remote-write` relation.
+
+    When two Prometheuses are related to one another over `receive-remote-write`,
+    all the alerts from the first Prometheus should be forwarded to the second.
+
+    The Prometheus that provides the `receive-remote-write` relation (prometheus-receiver)
+    exposes the `/api/v1/write` API endpoint, which is consumed by the Prometheus that
+    requires the `send-remote-write` relation (prometheus-sender). The sender then writes
+    all the data it receives from applications to the receiver's API endpoint.
+
+    """
+    await asyncio.gather(
+        ops_test.model.deploy(
+            prometheus_charm,
+            resources={"prometheus-image": oci_image("./metadata.yaml", "prometheus-image")},
+            application_name=prom_send,
+            trust=True,  # otherwise errors on GitHub workflows (persistentvolumeclaims ... is forbidden)
+            series="focal",
+        ),
+        ops_test.model.deploy(
+            prometheus_charm,
+            resources={"prometheus-image": oci_image("./metadata.yaml", "prometheus-image")},
+            application_name=prom_receive,
+            trust=True,  # otherwise errors on GitHub workflows (persistentvolumeclaims ... is forbidden)
+            series="focal",
+        ),
+        ops_test.model.deploy(
+            prometheus_charm,
+            resources={"prometheus-image": oci_image("./metadata.yaml", "prometheus-image")},
+            application_name=prom_receiver_sender,
+            trust=True,  # otherwise errors on GitHub workflows (persistentvolumeclaims ... is forbidden)
+            series="focal",
+        ),
+        ops_test.model.deploy(
+            "avalanche-k8s", channel="edge", application_name=avalanche, series="focal"
+        ),
+    )
+
+    await ops_test.model.wait_for_idle(status="active", wait_for_units=1, raise_on_error=False)
+    assert await check_prometheus_is_ready(ops_test, prom_send, 0)
+    assert await check_prometheus_is_ready(ops_test, prom_receive, 0)
+    assert await check_prometheus_is_ready(ops_test, prom_receiver_sender, 0)
+
+    await asyncio.gather(
+        ops_test.model.add_relation(
+            f"{avalanche}:metrics-endpoint", f"{prom_send}:metrics-endpoint"
+        ),
+        ops_test.model.add_relation(
+            f"{prom_send}:send-remote-write", f"{prom_receiver_sender}:receive-remote-write"
+        ),
+        ops_test.model.add_relation(
+            f"{prom_receiver_sender}:send-remote-write", f"{prom_receive}:receive-remote-write"
+        ),
+    )
+
+    await ops_test.model.wait_for_idle(apps=local_apps, status="active", idle_period=90)
+
+    # Check that all three Prometheuses have the avalanche metrics and fire the avalanche alert.
+    for app in [prom_send, prom_receiver_sender, prom_receive]:
+        assert await has_metric(
+            ops_test,
+            f'up{{juju_model="{ops_test.model_name}",juju_application="{avalanche}"}}',
+            app,
+        )
+
+        # Note: the following depends on an avalanche alert coming from the avalanche charm
+        # https://github.com/canonical/avalanche-k8s-operator/blob/main/src/prometheus_alert_rules
+        prom_rules_list = await get_prometheus_rules(ops_test, app, 0)
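+        # The `else` clauses below run only when their loop completes without `break`,
+        # i.e. only if the "AlwaysFiringDueToNumericValue" rule was not found.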
+        for rules_dict in prom_rules_list:
+            if rules_list := rules_dict.get("rules", []):
+                for rule in rules_list:
+                    if rule["name"] == "AlwaysFiringDueToNumericValue":
+                        assert rule["state"] == "firing"
+                        break
+                else:
+                    # "AlwaysFiringDueToNumericValue" was not found, go to the next rules_dict
+                    continue
+                break
+        else:
+            raise AssertionError(
+                f"The 'AlwaysFiringDueToNumericValue' avalanche alert was not found in prometheus '{app}'"
+            )
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index aaa6e55c..0c714645 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -10,6 +10,7 @@
 
 import ops
 import yaml
+from charms.prometheus_k8s.v0.prometheus_remote_write import DEFAULT_CONSUMER_NAME
 from helpers import cli_arg, k8s_resource_multipatch, prom_multipatch
 from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus
 from ops.testing import Harness
@@ -232,6 +233,28 @@ def test_honor_labels_is_always_set_in_scrape_configs(self, *unused):
             self.assertIn("honor_labels", job)
             self.assertTrue(job["honor_labels"])
 
+    @k8s_resource_multipatch
+    def test_send_remote_write_endpoints(self, *unused):
+        rel_id = self.harness.add_relation(DEFAULT_CONSUMER_NAME, "prometheus-receiver")
+        unit_name = "prometheus-receiver/0"
+        self.harness.add_relation_unit(rel_id, unit_name)
+
+        self.harness.update_relation_data(
+            rel_id,
+            unit_name,
+            {"remote_write": json.dumps({"url": "http://1.1.1.1:9090/api/v1/write"})},
+        )
+
+        container = self.harness.charm.unit.get_container(self.harness.charm._name)
+        config = container.pull(PROMETHEUS_CONFIG)
+        prometheus_config = yaml.safe_load(config)
+
+        self.assertIn("remote_write", prometheus_config)
+        self.assertEqual(
+            prometheus_config["remote_write"],
+            [{"url": "http://1.1.1.1:9090/api/v1/write"}],
+        )
+
     @k8s_resource_multipatch
     @patch("lightkube.core.client.GenericSyncClient")
     @patch("prometheus_server.Prometheus.reload_configuration")