Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send-remote-write feature #411

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions INTEGRATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,42 @@ configuration is generated.
4. [Traefik Charmed Operator](https://charmhub.io/traefik-k8s), over the `ingress_per_unit` interface, so Prometheus may be reached from outside the Kubernetes cluster it is running on.

5. [Catalogue Charmed Operator](https://charmhub.io/catalogue-k8s), over the `catalogue` interface, so Prometheus can be published in the service catalogue web page.

## Deployment scenarios

### Tiered Prometheus deployments (local-remote on-call teams)

```mermaid
graph LR
subgraph COS1
am1[Alertmanager]
prom1[Prometheus]
am1 --- prom1
end
subgraph COS1.2
am1.2[Alertmanager]
prom1.2[Prometheus]
am1.2 --- prom1.2
end
subgraph COS2
am2[Alertmanager]
prom2[Prometheus]
am2 --- prom2
end
subgraph COS[Main COS]
am[Alertmanager]
prom[Prometheus]
prom --- am
end
pd1[PagerDuty] --- am1
pd2[PagerDuty] --- am2
am --- pd[PagerDuty]
prom1 --- |remote write, rules| prom1.2
prom1.2 --- |remote write, rules| prom
prom2 --- |remote write, rules| prom
```

Main COS will have a combination of the rules from COS1, COS2 and Main COS, allowing it to track the health of the entire system.
Prometheus in COS1 scrapes an application and sends its metrics and alert rules to Prometheus in COS1.2,
which in turn sends metrics and alert rules to Prometheus in Main COS. As a result, Prometheus in Main COS will have the alert rules
defined in the application.
37 changes: 35 additions & 2 deletions lib/charms/prometheus_k8s/v0/prometheus_remote_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from typing import Callable, Dict, List, Optional, Tuple, Union

import yaml
from charms.observability_libs.v0.juju_topology import JujuTopology
Expand All @@ -42,7 +42,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 8
LIBPATCH = 9


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -595,6 +595,8 @@ def __init__(
charm: CharmBase,
relation_name: str = DEFAULT_CONSUMER_NAME,
alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
*,
extra_alerts_callables: List[Optional[Callable[[], dict]]] = [],
):
"""API to manage a required relation with the `prometheus_remote_write` interface.

Expand Down Expand Up @@ -630,6 +632,7 @@ def __init__(
self._charm = charm
self._relation_name = relation_name
self._alert_rules_path = alert_rules_path
self.extra_alerts_callables = extra_alerts_callables

self.topology = JujuTopology.from_charm(charm)

Expand Down Expand Up @@ -681,9 +684,39 @@ def _push_alerts_to_relation_databag(self, relation: Relation) -> None:

alert_rules_as_dict = alert_rules.as_dict()

alert_rules_as_dict = self._parse_and_add_extra_alerts(alert_rules_as_dict)

if alert_rules_as_dict:
relation.data[self._charm.app]["alert_rules"] = json.dumps(alert_rules_as_dict)

def _parse_and_add_extra_alerts(self, alert_rules_as_dict: dict) -> dict:
"""Parse alerts from `self.extra_alerts_callable`.

Call `self.extra_alerts_callable`, parse alert groups and
extend (mutate) `alert_rules_as_dict` with new alerts.

Returns:
(dict) The original `alert_rules_as_dict` with the return value of
`PrometheusRemoteWriteConsumer.extra_alerts_callable` merged into the dict.
"""
for extra_func in self.extra_alerts_callables:
if callable(extra_func) and alert_rules_as_dict:
extra_alerts_list = []
extra_alerts = extra_func()
if extra_alerts:
for alert_rule_groups in extra_alerts.values():
extra_alerts_list.extend(alert_rule_groups.get("groups", []))

alert_rules_as_dict["groups"] = (
alert_rules_as_dict.get("groups", []) + extra_alerts_list
)

logger.debug(
"%s extra rules were pushed to remote write endpoint.",
len(extra_alerts_list),
)
return alert_rules_as_dict

def reload_alerts(self) -> None:
    """Reload alert rules from disk and push them to relation data.

    Pushing re-evaluates `self.extra_alerts_callables` as well (they are merged
    in by `_parse_and_add_extra_alerts` during the push), so callers can use
    this to propagate alerts that changed outside the charm's own rules files.
    """
    self._push_alerts_to_all_relation_databags(None)
Expand Down
2 changes: 2 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ requires:
limit: 1
catalogue:
interface: catalogue
send-remote-write:
interface: prometheus_remote_write

peers:
prometheus-peers:
Expand Down
57 changes: 55 additions & 2 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import re
import socket
from typing import Dict, Optional, cast
from typing import Dict, List, Optional, cast
from urllib.parse import urlparse

import yaml
Expand All @@ -27,6 +27,7 @@
DEFAULT_RELATION_NAME as DEFAULT_REMOTE_WRITE_RELATION_NAME,
)
from charms.prometheus_k8s.v0.prometheus_remote_write import (
PrometheusRemoteWriteConsumer,
PrometheusRemoteWriteProvider,
)
from charms.prometheus_k8s.v0.prometheus_scrape import (
Expand All @@ -42,7 +43,7 @@
from lightkube import Client
from lightkube.core.exceptions import ApiError as LightkubeApiError
from lightkube.resources.core_v1 import PersistentVolumeClaim, Pod
from ops.charm import ActionEvent, CharmBase
from ops.charm import ActionEvent, CharmBase, RelationChangedEvent
from ops.framework import StoredState
from ops.main import main
from ops.model import (
Expand Down Expand Up @@ -131,6 +132,14 @@ def __init__(self, *args):
endpoint_path=f"{external_url.path}/api/v1/write",
)

self._remote_write_consumer = PrometheusRemoteWriteConsumer(
self,
extra_alerts_callables=[
self.metrics_consumer.alerts,
self.remote_write_provider.alerts,
],
)

self.grafana_source_provider = GrafanaSourceProvider(
charm=self,
source_type="prometheus",
Expand Down Expand Up @@ -160,17 +169,36 @@ def __init__(self, *args):
self.framework.observe(self.on.prometheus_pebble_ready, self._on_pebble_ready)
self.framework.observe(self.on.config_changed, self._configure)
self.framework.observe(self.on.upgrade_charm, self._configure)
self.framework.observe(self.on.upgrade_charm, self._push_alerts_to_remote_write)
self.framework.observe(self.on.update_status, self._update_status)
self.framework.observe(self.ingress.on.ready_for_unit, self._on_ingress_ready)
self.framework.observe(self.ingress.on.revoked_for_unit, self._on_ingress_revoked)
self.framework.observe(self.on.receive_remote_write_relation_created, self._configure)
self.framework.observe(self.on.receive_remote_write_relation_changed, self._configure)
self.framework.observe(
self.on.receive_remote_write_relation_changed, self._push_alerts_to_remote_write
)
self.framework.observe(self.on.receive_remote_write_relation_broken, self._configure)
self.framework.observe(self.on.send_remote_write_relation_broken, self._configure)
self.framework.observe(
self._remote_write_consumer.on.endpoints_changed,
self._on_remote_write_endpoints_changed,
)
self.framework.observe(
self._remote_write_consumer.on.endpoints_changed, self._push_alerts_to_remote_write
)
self.framework.observe(
self.metrics_consumer.on.targets_changed, self._push_alerts_to_remote_write
)
self.framework.observe(self.metrics_consumer.on.targets_changed, self._configure)
self.framework.observe(self.alertmanager_consumer.on.cluster_changed, self._configure)
self.framework.observe(self.resources_patch.on.patch_failed, self._on_k8s_patch_failed)
self.framework.observe(self.on.validate_configuration_action, self._on_validate_config)

def _push_alerts_to_remote_write(self, event) -> None:
    """Re-read alert rules and push them over the `send-remote-write` relation.

    Delegates to the consumer's `reload_alerts`, which re-reads rules from disk
    and re-invokes the extra-alerts callables. The triggering `event` payload is
    ignored; any observed event simply causes a full reload.
    """
    self._remote_write_consumer.reload_alerts()

@property
def metrics_path(self):
"""The metrics path, adjusted by ingress path (if any)."""
Expand Down Expand Up @@ -302,6 +330,11 @@ def _on_ingress_revoked(self, event: IngressPerUnitRevokedForUnitEvent):
def _on_k8s_patch_failed(self, event: K8sResourcePatchFailedEvent):
self.unit.status = BlockedStatus(event.message)

def _on_remote_write_endpoints_changed(self, event: RelationChangedEvent) -> None:
    """Event handler for the remote write endpoints changed event.

    Re-runs `_configure` so the updated endpoints are rendered into the
    Prometheus configuration (see `_remote_write_config`).
    """
    logger.debug("Remote write endpoints were changed")
    self._configure(event)

def _configure(self, _):
"""Reconfigure and either reload or restart Prometheus.

Expand Down Expand Up @@ -708,6 +741,23 @@ def _alerting_config(self) -> dict:
)
return alerting_config

def _remote_write_config(self) -> List[Dict[str, str]]:
    """Build the `remote_write` section of the Prometheus configuration.

    Endpoints are taken from the `PrometheusRemoteWriteConsumer`, i.e. from the
    data of any active `send-remote-write` relations.

    See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write

    Returns:
        A list of endpoint dicts suitable for the `remote_write` config key;
        empty when no remote write providers are related.
    """
    endpoints = self._remote_write_consumer.endpoints

    if endpoints:
        logger.debug("Remote write endpoints are: %s", endpoints)
    else:
        logger.debug("No remote write endpoints available")

    return endpoints

def _generate_prometheus_config(self, container) -> bool:
"""Construct Prometheus configuration and write to filesystem.

Expand All @@ -723,6 +773,9 @@ def _generate_prometheus_config(self, container) -> bool:
if alerting_config:
prometheus_config["alerting"] = alerting_config

if remote_write_config := self._remote_write_config():
prometheus_config["remote_write"] = remote_write_config

prometheus_config["scrape_configs"].append(self._default_config) # type: ignore
certs = {}
scrape_jobs = self.metrics_consumer.jobs()
Expand Down
110 changes: 110 additions & 0 deletions tests/integration/test_receive_remote_write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
# See LICENSE file for licensing details.

import asyncio
import logging

import pytest
from helpers import (
check_prometheus_is_ready,
get_prometheus_rules,
has_metric,
oci_image,
)
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)

# Application names used throughout this module.
avalanche = "avalanche"  # metrics-generating workload scraped by prometheus-sender
# prometheus that will consume from the app and write to the remote API
prom_send = "prometheus-sender"
# middle link of the chain: receives remote write from the sender and forwards it onward
prom_receiver_sender = "prometheus-receiver-sender"
prom_receive = "prometheus-receiver"  # prometheus that provides `/api/v1/write` API endpoint
local_apps = [avalanche, prom_receive, prom_send, prom_receiver_sender]  # everything this test deploys


@pytest.mark.abort_on_fail
async def test_receive_remote_write(ops_test: OpsTest, prometheus_charm):
    """Test chaining Prometheuses via the remote-write relations.

    When one Prometheus is related to another via `send-remote-write` /
    `receive-remote-write`, the metrics and alerts of the first should be
    forwarded to the second.

    The Prometheus providing `receive-remote-write` (prometheus-receiver)
    exposes a `/api/v1/write` API endpoint, which is consumed by the Prometheus
    requiring `send-remote-write` (prometheus-sender). The sender then writes
    all the data it receives from applications to that endpoint. With a middle
    hop (prometheus-receiver-sender) in between, every Prometheus in the chain
    should end up with the avalanche metrics and the avalanche alert.
    """
    await asyncio.gather(
        ops_test.model.deploy(
            prometheus_charm,
            resources={"prometheus-image": oci_image("./metadata.yaml", "prometheus-image")},
            application_name=prom_send,
            trust=True,  # otherwise errors on ghwf (persistentvolumeclaims ... is forbidden)
            series="focal",
        ),
        ops_test.model.deploy(
            prometheus_charm,
            resources={"prometheus-image": oci_image("./metadata.yaml", "prometheus-image")},
            application_name=prom_receive,
            trust=True,  # otherwise errors on ghwf (persistentvolumeclaims ... is forbidden)
            series="focal",
        ),
        ops_test.model.deploy(
            prometheus_charm,
            resources={"prometheus-image": oci_image("./metadata.yaml", "prometheus-image")},
            application_name=prom_receiver_sender,
            trust=True,  # otherwise errors on ghwf (persistentvolumeclaims ... is forbidden)
            series="focal",
        ),
        ops_test.model.deploy(
            "avalanche-k8s", channel="edge", application_name=avalanche, series="focal"
        ),
    )

    await ops_test.model.wait_for_idle(status="active", wait_for_units=1, raise_on_error=False)
    assert await check_prometheus_is_ready(ops_test, prom_send, 0)
    assert await check_prometheus_is_ready(ops_test, prom_receive, 0)
    assert await check_prometheus_is_ready(ops_test, prom_receiver_sender, 0)

    # Wire up the chain: avalanche -> sender -> receiver-sender -> receiver.
    await asyncio.gather(
        ops_test.model.add_relation(
            f"{avalanche}:metrics-endpoint", f"{prom_send}:metrics-endpoint"
        ),
        ops_test.model.add_relation(
            f"{prom_send}:send-remote-write", f"{prom_receiver_sender}:receive-remote-write"
        ),
        ops_test.model.add_relation(
            f"{prom_receiver_sender}:send-remote-write", f"{prom_receive}:receive-remote-write"
        ),
    )

    await ops_test.model.wait_for_idle(apps=local_apps, status="active", idle_period=90)

    # check that all three Prometheuses have avalanche metrics and fire the avalanche alert
    for app in [prom_send, prom_receiver_sender, prom_receive]:
        assert await has_metric(
            ops_test,
            f'up{{juju_model="{ops_test.model_name}",juju_application="{avalanche}"}}',
            app,
        )

        # Note: the following depends on an avalanche alert coming from the avalanche charm
        # https://github.com/canonical/avalanche-k8s-operator/blob/main/src/prometheus_alert_rules
        prom_rules_list = await get_prometheus_rules(ops_test, app, 0)
        for rules_dict in prom_rules_list:
            if rules_list := rules_dict.get("rules", []):
                for rule in rules_list:
                    if rule["name"] == "AlwaysFiringDueToNumericValue":
                        assert rule["state"] == "firing"
                        break
                else:
                    # "AlwaysFiringDueToNumericValue" was not found, go to next rules_dict
                    continue
                # The alert was found and asserted; stop scanning rule groups for this app.
                break
        else:
            # Outer for/else fires only when no `break` happened, i.e. the alert is absent.
            raise AssertionError(
                f"The 'AlwaysFiringDueToNumericValue' avalanche alert was not found in prometheus '{app}'"
            )
23 changes: 23 additions & 0 deletions tests/unit/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import ops
import yaml
from charms.prometheus_k8s.v0.prometheus_remote_write import DEFAULT_CONSUMER_NAME
from helpers import cli_arg, k8s_resource_multipatch, prom_multipatch
from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus
from ops.testing import Harness
Expand Down Expand Up @@ -232,6 +233,28 @@ def test_honor_labels_is_always_set_in_scrape_configs(self, *unused):
self.assertIn("honor_labels", job)
self.assertTrue(job["honor_labels"])

@k8s_resource_multipatch
def test_send_remote_write_endpoints(self, *unused):
    """Endpoints from `send-remote-write` relation data are rendered into the config."""
    endpoint_url = "http://1.1.1.1:9090/api/v1/write"
    remote_unit = "prometheus-receiver/0"

    # Relate to a remote-write provider and publish its endpoint in unit data.
    relation_id = self.harness.add_relation(DEFAULT_CONSUMER_NAME, "prometheus-receiver")
    self.harness.add_relation_unit(relation_id, remote_unit)
    self.harness.update_relation_data(
        relation_id,
        remote_unit,
        {"remote_write": json.dumps({"url": endpoint_url})},
    )

    # Pull the rendered Prometheus config back out of the workload container.
    charm = self.harness.charm
    container = charm.unit.get_container(charm._name)
    rendered_config = yaml.safe_load(container.pull(PROMETHEUS_CONFIG))

    self.assertIn("remote_write", rendered_config)
    self.assertEqual(rendered_config["remote_write"], [{"url": endpoint_url}])

@k8s_resource_multipatch
@patch("lightkube.core.client.GenericSyncClient")
@patch("prometheus_server.Prometheus.reload_configuration")
Expand Down