Enable service graph processor (#77)
* enable svc graph

* remove dup

* Apply suggestions from code review

Co-authored-by: PietroPasotti <[email protected]>
Signed-off-by: Michael Dmitry <[email protected]>

* PR comments

* remove tmp path

* fix flapping

---------

Signed-off-by: Michael Dmitry <[email protected]>
Co-authored-by: michael <[email protected]>
Co-authored-by: Michael Dmitry <[email protected]>
3 people authored Dec 13, 2024
1 parent 3bd31c9 commit 994d813
Showing 5 changed files with 428 additions and 32 deletions.
151 changes: 146 additions & 5 deletions src/charm.py
@@ -3,12 +3,13 @@
# See LICENSE file for licensing details.

"""Charmed Operator for Tempo; a lightweight object storage based tracing backend."""
import json
import logging
import re
import socket
from pathlib import Path
from subprocess import CalledProcessError, getoutput
-from typing import Dict, List, Optional, Set, Tuple, cast, get_args
+from typing import Any, Dict, List, Optional, Set, Tuple, cast, get_args

import ops
from charms.grafana_k8s.v0.grafana_source import GrafanaSourceProvider
@@ -25,8 +26,8 @@
from charms.traefik_k8s.v0.traefik_route import TraefikRouteRequirer
from cosl.coordinated_workers.coordinator import ClusterRolesConfig, Coordinator
from cosl.coordinated_workers.nginx import CA_CERT_PATH, CERT_PATH, KEY_PATH
-from cosl.interfaces.datasource_exchange import DatasourceDict
-from cosl.interfaces.utils import DatabagModel
+from cosl.interfaces.datasource_exchange import DatasourceDict, DSExchangeAppData
+from cosl.interfaces.utils import DatabagModel, DataValidationError
from ops import CollectStatusEvent
from ops.charm import CharmBase

@@ -36,6 +37,7 @@

logger = logging.getLogger(__name__)
PEERS_RELATION_ENDPOINT_NAME = "peers"
PROMETHEUS_DS_TYPE = "prometheus"


class TempoCoordinator(Coordinator):
@@ -132,6 +134,7 @@ def __init__(self, *args):
# or when ingress changes
self.ingress.on.ready,
],
extra_fields=self._build_grafana_source_extra_fields(),
)

# peer
@@ -443,8 +446,8 @@ def get_resources_requests(self, _) -> Dict[str, str]:
return {"cpu": "50m", "memory": "100Mi"}

def remote_write_endpoints(self):
"""Return remote-write endpoints."""
return self._remote_write.endpoints
"""Return a sorted list of remote-write endpoints."""
return sorted(self._remote_write.endpoints, key=lambda x: x["url"])

def _update_source_exchange(self) -> None:
"""Update the grafana-datasource-exchange relations with what we receive from grafana-source."""
@@ -493,6 +496,10 @@ def _update_source_exchange(self) -> None:
# publish() already sorts the data for us, to prevent databag flapping and ensuing event storms
self.coordinator.datasource_exchange.publish(datasources=raw_datasources)

def _update_grafana_source(self) -> None:
"""Update grafana-source relations."""
self.grafana_source_provider.update_source(source_url=self._external_http_server_url)

def _reconcile(self):
# This method contains unconditional update logic, i.e. logic that should be executed
# regardless of the event we are processing.
@@ -501,6 +508,140 @@
self._update_ingress_relation()
self._update_tracing_relations()
self._update_source_exchange()
# reconcile grafana-source databags to update `extra_fields`
# if it gets changed by any other influencing relation.
self._update_grafana_source()

def _get_grafana_source_uids(self) -> Dict[str, Dict[str, str]]:
"""Helper method to retrieve the databags of any grafana-source relations.
Duplicate implementation of GrafanaSourceProvider.get_source_uids(), for use when we
need to access relation data before the GrafanaSourceProvider object is initialised.
"""
uids = {}
for rel in self.model.relations.get("grafana-source", []):
if not rel:
continue
app_databag = rel.data[rel.app]
grafana_uid = app_databag.get("grafana_uid")
if not grafana_uid:
logger.warning(
"remote end is using an old grafana_datasource interface: "
"`grafana_uid` field not found."
)
continue

uids[grafana_uid] = json.loads(app_databag.get("datasource_uids", "{}"))
return uids
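# Example return value (a sketch with hypothetical UIDs; the inner mapping is
# whatever the remote grafana published under `datasource_uids`):
#     {"grafana_1": {"tempo/0": "juju_mymodel_1234_tempo_0"}}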

def _build_service_graph_config(self) -> Dict[str, Any]:
"""Build the service graph config based on matching datasource UIDs.
To enable service graphs, we need the datasource UID of any prometheus/mimir instance such that:
1. Tempo is connected to it over "send-remote-write".
2. It is also connected, over `grafana_datasource`, to at least one of the grafana instances that Tempo is connected to.
If there are multiple datasources that fit this description, we can assume that they are all
equivalent and we can use any of them.
"""

dsx_relations = {
relation.app.name: relation
for relation in self.coordinator.datasource_exchange._relations
}

remote_write_apps = {
relation.app.name
for relation in self.model.relations["send-remote-write"]
if relation.app and relation.data
}

# the list of datasource exchange relations whose remote we're also remote writing to.
remote_write_dsx_relations = [
dsx_relations[app_name]
for app_name in set(dsx_relations).intersection(remote_write_apps)
]

# grafana UIDs that are connected to this Tempo.
grafana_uids = set(self._get_grafana_source_uids())

remote_write_dsx_databags = []
for relation in remote_write_dsx_relations:
try:
datasource = DSExchangeAppData.load(relation.data[relation.app])
remote_write_dsx_databags.append(datasource)
except DataValidationError:
# load() already logs
continue

# filter the datasources in remote_write_dsx_databags, keeping only those that
# are registered with the same grafana instances Tempo is connected to.
matching_datasources = [
datasource
for databag in remote_write_dsx_databags
for datasource in databag.datasources
if datasource.grafana_uid in grafana_uids and datasource.type == PROMETHEUS_DS_TYPE
]

if not matching_datasources:
# take good care of logging exactly why this is happening, as the logic is quite complex and debugging this will be hell
msg = "service graph disabled."
missing_rels = []
if not remote_write_apps:
missing_rels.append("send-remote-write")
if not grafana_uids:
missing_rels.append("grafana-source")
if not dsx_relations:
missing_rels.append("receive-datasource")

if missing_rels:
msg += f" Missing relations: {missing_rels}."

if not remote_write_dsx_relations:
msg += " There are no datasource_exchange relations with a Prometheus/Mimir that we're also remote writing to."
else:
msg += " There are no datasource_exchange relations to a Prometheus/Mimir that are datasources to the same grafana instances Tempo is connected to."

logger.info(msg)
return {}

if len(matching_datasources) > 1:
logger.info(
"there are multiple datasources that could be used to create the service graph. We assume that all are equivalent."
)

# At this point, we can assume any datasource is a valid datasource to use for service graphs.
matching_datasource = matching_datasources[0]
return {
"serviceMap": {
"datasourceUid": matching_datasource.uid,
},
}
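# For illustration only (not part of the charm): a standalone sketch of the
# matching logic above, using hypothetical app names and datasource UIDs.
#
#     dsx_apps = {"mimir", "loki"}       # remotes on datasource-exchange
#     remote_write_apps = {"mimir"}      # remotes on send-remote-write
#     grafana_uids = {"grafana_1"}       # grafanas Tempo is a source for
#     published = [  # what mimir published over datasource-exchange
#         {"uid": "prom_ds_0", "grafana_uid": "grafana_1", "type": "prometheus"},
#     ]
#     matching = [
#         d for d in published
#         if d["grafana_uid"] in grafana_uids and d["type"] == "prometheus"
#     ]
#     # -> service graph config: {"serviceMap": {"datasourceUid": "prom_ds_0"}}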

def _build_grafana_source_extra_fields(self) -> Dict[str, Any]:
"""Extra fields needed for the grafana-source relation, like data correlation config."""
## https://grafana.com/docs/tempo/latest/metrics-generator/service_graphs/enable-service-graphs/
# "httpMethod": "GET",
# "serviceMap": {
# "datasourceUid": "juju_svcgraph_61e32e2f-50ac-40e7-8ee8-1b7297a3e47f_prometheus_0",
# },
# # https://community.grafana.com/t/how-to-jump-from-traces-to-logs/72477/3
# "tracesToLogs": {
# "datasourceUid": "juju_svcgraph_61e32e2f-50ac-40e7-8ee8-1b7297a3e47f_loki_0"
# },
# "lokiSearch": {
# "datasourceUid": "juju_svcgraph_61e32e2f-50ac-40e7-8ee8-1b7297a3e47f_loki_0"
# },

svc_graph_config = self._build_service_graph_config()

if not svc_graph_config:
return {}

return {
"httpMethod": "GET",
**svc_graph_config,
}
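# e.g., when a matching prometheus datasource is found (hypothetical UID):
#     {"httpMethod": "GET", "serviceMap": {"datasourceUid": "prom_ds_0"}}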


if __name__ == "__main__": # pragma: nocover
Expand Down
15 changes: 12 additions & 3 deletions src/tempo.py
@@ -128,10 +128,15 @@ def _build_tls_config(self, workers_addrs: Tuple[str, ...]):
}

def _build_overrides_config(self):
# in order to tell tempo to enable the metrics generator, we need to set
# "processors" to the list of enabled processors in the overrides section
return tempo_config.Overrides(
defaults=tempo_config.Defaults(
metrics_generator=tempo_config.MetricsGeneratorDefaults(
-processors=[tempo_config.MetricsGeneratorProcessor.SPAN_METRICS],
+processors=[
+    tempo_config.MetricsGeneratorProcessorLabel.SPAN_METRICS,
+    tempo_config.MetricsGeneratorProcessorLabel.SERVICE_GRAPHS,
+],
)
)
)
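# The rendered overrides fragment is expected to look roughly like this
# (a sketch, per Tempo's standard-overrides docs):
#     overrides:
#       defaults:
#         metrics_generator:
#           processors: [span-metrics, service-graphs]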
@@ -160,11 +165,15 @@ def _build_metrics_generator_config(
storage=tempo_config.MetricsGeneratorStorage(
path=self.metrics_generator_wal_path,
remote_write=remote_write_instances,
-)
+),
# Adding juju topology will be done on the worker's side
# to populate the correct unit label.
processor=tempo_config.MetricsGeneratorProcessor(
span_metrics=tempo_config.MetricsGeneratorSpanMetricsProcessor(),
service_graphs=tempo_config.MetricsGeneratorServiceGraphsProcessor(),
),
# per-processor configuration should go in here
)

return config
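# The resulting metrics_generator block should render roughly as
# (a sketch, per Tempo's configuration docs):
#     metrics_generator:
#       processor:
#         span_metrics: {}
#         service_graphs: {}
#       storage:
#         path: <wal path>
#         remote_write: [...]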

def _build_server_config(self, use_tls=False):
33 changes: 31 additions & 2 deletions src/tempo_config.py
@@ -9,6 +9,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional

import pydantic
from cosl.coordinated_workers.coordinator import ClusterRolesConfig
from pydantic import BaseModel, ConfigDict, Field

@@ -101,7 +102,7 @@ class ClientAuthTypeEnum(str, enum.Enum):
REQUIRE_AND_VERIFY_CLIENT_CERT = "RequireAndVerifyClientCert"


-class MetricsGeneratorProcessor(str, enum.Enum):
+class MetricsGeneratorProcessorLabel(str, enum.Enum):
"""Metrics generator processors supported values.
Supported values: https://grafana.com/docs/tempo/latest/configuration/#standard-overrides
@@ -301,6 +302,29 @@ class RemoteWrite(BaseModel):
tls_config: Optional[RemoteWriteTLS] = None


class MetricsGeneratorSpanMetricsProcessor(BaseModel):
"""Metrics Generator span_metrics processor configuration schema."""

# see https://grafana.com/docs/tempo/v2.6.x/configuration/#metrics-generator
# for a full list of config options


class MetricsGeneratorServiceGraphsProcessor(BaseModel):
"""Metrics Generator service_graphs processor configuration schema."""

# see https://grafana.com/docs/tempo/v2.6.x/configuration/#metrics-generator
# for a full list of config options


class MetricsGeneratorProcessor(BaseModel):
"""Metrics Generator processor schema."""

span_metrics: MetricsGeneratorSpanMetricsProcessor
service_graphs: MetricsGeneratorServiceGraphsProcessor
# see https://grafana.com/docs/tempo/v2.6.x/configuration/#metrics-generator
# for a full list of config options; could add local_blocks here


class MetricsGeneratorStorage(BaseModel):
"""Metrics Generator storage schema."""

@@ -314,6 +338,9 @@ class MetricsGenerator(BaseModel):
ring: Optional[Ring] = None
storage: MetricsGeneratorStorage

# processor-specific config depends on the processor type
processor: MetricsGeneratorProcessor


class MetricsGeneratorDefaults(BaseModel):
"""Metrics generator defaults schema."""
@@ -323,7 +350,9 @@
use_enum_values=True
)
"""Pydantic config."""
-processors: List[MetricsGeneratorProcessor]
+processors: Optional[List[MetricsGeneratorProcessorLabel]] = pydantic.Field(
+    default_factory=list
+)
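# For example (a sketch; assumes pydantic v2, as implied by ConfigDict):
#     MetricsGeneratorDefaults(
#         processors=[MetricsGeneratorProcessorLabel.SPAN_METRICS]
#     ).model_dump()
#     # -> {"processors": ["span-metrics"]}, given use_enum_values=True and
#     # assuming the enum value is "span-metrics" as in Tempo's docs.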


class Defaults(BaseModel):
2 changes: 1 addition & 1 deletion tests/scenario/conftest.py
@@ -21,7 +21,7 @@ def patch_buffer_file_for_charm_tracing(tmp_path):


@pytest.fixture(autouse=True, scope="session")
-def cleanup_prometheus_alert_rules(tmp_path):
+def cleanup_prometheus_alert_rules():
# some tests trigger the charm to generate prometheus alert rules file in ./src; clean it up
yield
src_path = Path(__file__).parent / "src"