Enable service graph processor #77

Merged: 7 commits, Dec 13, 2024
139 changes: 136 additions & 3 deletions src/charm.py
@@ -3,12 +3,13 @@
# See LICENSE file for licensing details.

"""Charmed Operator for Tempo; a lightweight object storage based tracing backend."""
import json
import logging
import re
import socket
from pathlib import Path
from subprocess import CalledProcessError, getoutput
from typing import Dict, List, Optional, Set, Tuple, cast, get_args
from typing import Any, Dict, List, Optional, Set, Tuple, cast, get_args

import ops
from charms.grafana_k8s.v0.grafana_source import GrafanaSourceProvider
@@ -25,8 +26,8 @@
from charms.traefik_k8s.v0.traefik_route import TraefikRouteRequirer
from cosl.coordinated_workers.coordinator import ClusterRolesConfig, Coordinator
from cosl.coordinated_workers.nginx import CA_CERT_PATH, CERT_PATH, KEY_PATH
from cosl.interfaces.datasource_exchange import DatasourceDict
from cosl.interfaces.utils import DatabagModel
from cosl.interfaces.datasource_exchange import DatasourceDict, DSExchangeAppData
from cosl.interfaces.utils import DatabagModel, DataValidationError
from ops import CollectStatusEvent
from ops.charm import CharmBase

@@ -36,6 +37,7 @@

logger = logging.getLogger(__name__)
PEERS_RELATION_ENDPOINT_NAME = "peers"
PROMETHEUS_DS_TYPE = "prometheus"


class TempoCoordinator(Coordinator):
@@ -132,6 +134,7 @@ def __init__(self, *args):
# or when ingress changes
self.ingress.on.ready,
],
extra_fields=self._grafana_source_extra_fields,
)

# peer
@@ -153,6 +156,25 @@
######################
# UTILITY PROPERTIES #
######################
@property
def _grafana_source_extra_fields(self) -> Dict[str, Any]:
"""Extra fields needed for the grafana-source relation, like data correlation config."""
## https://grafana.com/docs/tempo/latest/metrics-generator/service_graphs/enable-service-graphs/
# "httpMethod": "GET",
# "serviceMap": {
# "datasourceUid": "juju_svcgraph_61e32e2f-50ac-40e7-8ee8-1b7297a3e47f_prometheus_0",
# },
# # https://community.grafana.com/t/how-to-jump-from-traces-to-logs/72477/3
# "tracesToLogs": {
# "datasourceUid": "juju_svcgraph_61e32e2f-50ac-40e7-8ee8-1b7297a3e47f_loki_0"
# },
# "lokiSearch": {
# "datasourceUid": "juju_svcgraph_61e32e2f-50ac-40e7-8ee8-1b7297a3e47f_loki_0"
# },

service_graph_config = self._build_service_graph_config()
return service_graph_config

@property
def peers(self):
"""Fetch the "peers" peer relation."""
@@ -493,6 +515,10 @@ def _update_source_exchange(self) -> None:
# publish() already sorts the data for us, to prevent databag flapping and ensuing event storms
self.coordinator.datasource_exchange.publish(datasources=raw_datasources)

def _update_grafana_source(self) -> None:
"""Update grafana-source relations."""
self.grafana_source_provider.update_source(source_url=self._external_http_server_url)

def _reconcile(self):
# This method contains unconditional update logic, i.e. logic that should be executed
# regardless of the event we are processing.
@@ -501,6 +527,113 @@ def _reconcile(self):
self._update_ingress_relation()
self._update_tracing_relations()
self._update_source_exchange()
self._update_grafana_source()

def _get_grafana_source_uids(self) -> Dict[str, Dict[str, str]]:
"""Helper method to retrieve grafana source UIDs from remote databags using raw relations.
michaeldmitry marked this conversation as resolved.
Show resolved Hide resolved

Duplicate implementation of GrafanaSourceProvider.get_source_uids() to use in the
situation where we want to access relation data when the GrafanaSourceProvider object
is not yet initialised.
"""
uids = {}
for rel in self.model.relations.get("grafana-source", []):
if not rel:
continue
app_databag = rel.data[rel.app]
grafana_uid = app_databag.get("grafana_uid")
if not grafana_uid:
logger.warning(
"remote end is using an old grafana_datasource interface: "
"`grafana_uid` field not found."
)
continue

uids[grafana_uid] = json.loads(app_databag.get("datasource_uids", "{}"))
return uids
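
# For illustration only (hypothetical values): with a single related grafana
# exposing one Tempo datasource, the returned mapping would look like
# {"<grafana_uid>": {"tempo/0": "juju_<model-uuid>_tempo_0"}}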

def _build_service_graph_config(self) -> Dict[str, Any]:
"""Build the service graph config based on matching datasource UIDs.

To enable service graphs, we need the datasource UID of a prometheus/mimir instance that:
1. Tempo is connected to over the "send-remote-write" relation; and
2. is also connected, as a datasource, to the same grafana instance(s) Tempo is connected to.

If there are multiple datasources that fit this description, we can assume that they are all
equivalent and we can use any of them.
"""

dsx_relations = {
relation.app.name: relation
for relation in self.coordinator.datasource_exchange._relations
}

remote_write_apps = {
relation.app.name
for relation in self.model.relations["send-remote-write"]
if relation.app and relation.data
}

# the datasource exchange relations whose remote app we're also remote-writing to.
remote_write_dsx_relations = [
dsx_relations[app_name]
for app_name in set(dsx_relations).intersection(remote_write_apps)
]

# grafana UIDs that are connected to this Tempo.
grafana_uids = set(self._get_grafana_source_uids())

remote_write_dsx_databags = []
for relation in remote_write_dsx_relations:
try:
datasource = DSExchangeAppData.load(relation.data[relation.app])
remote_write_dsx_databags.append(datasource)
except DataValidationError:
# load() already logs
continue

# keep only the datasources that are registered with the same grafana instances Tempo is connected to.
matching_datasources = [
datasource
for databag in remote_write_dsx_databags
for datasource in databag.datasources
if datasource.grafana_uid in grafana_uids and datasource.type == PROMETHEUS_DS_TYPE
]

if not matching_datasources:
msg = "service graph disabled."
missing_rels = []
if not remote_write_apps:
missing_rels.append("send-remote-write")
if not grafana_uids:
missing_rels.append("grafana-source")
if not dsx_relations:
missing_rels.append("receive-datasource")

if missing_rels:
msg += f" Missing relations: {missing_rels}."

if not remote_write_dsx_relations:
msg += " There are no datasource_exchange relations with a Prometheus/Mimir that we're also remote writing to."
else:
msg += " There are no datasource_exchange relations to a Prometheus/Mimir that are datasources to the same grafana instances Tempo is connected to."

logger.info(msg)
return {}

if len(matching_datasources) > 1:
logger.info(
"there are multiple datasources that could be used to create the service graph. We assume that all are equivalent."
)

# At this point, we can assume any datasource is a valid datasource to use for service graphs.
matching_datasource = matching_datasources[0]
return {
"httpMethod": "GET",
"serviceMap": {
"datasourceUid": matching_datasource.uid,
},
}


if __name__ == "__main__": # pragma: nocover
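The datasource-matching logic in _build_service_graph_config boils down to a set intersection followed by a filter. Here is a minimal standalone sketch of the same idea, using simplified stand-in data structures rather than the charm's real relation objects:

from typing import Dict, List, Set

def pick_service_graph_datasource(
    dsx_apps: Dict[str, List[dict]],  # app name -> datasources it published over datasource-exchange
    remote_write_apps: Set[str],      # apps Tempo remote-writes to
    grafana_uids: Set[str],           # grafana instances Tempo is a datasource for
) -> dict:
    candidates = [
        ds
        for app in set(dsx_apps) & remote_write_apps
        for ds in dsx_apps[app]
        if ds["grafana_uid"] in grafana_uids and ds["type"] == "prometheus"
    ]
    # all candidates are assumed equivalent; take the first one, if any
    return {"serviceMap": {"datasourceUid": candidates[0]["uid"]}} if candidates else {}

# one mimir app both receives remote-write and is a datasource for the same grafana:
print(
    pick_service_graph_datasource(
        dsx_apps={"mimir": [{"grafana_uid": "g1", "type": "prometheus", "uid": "mimir-ds-0"}]},
        remote_write_apps={"mimir"},
        grafana_uids={"g1"},
    )
)  # -> {'serviceMap': {'datasourceUid': 'mimir-ds-0'}}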
15 changes: 12 additions & 3 deletions src/tempo.py
@@ -128,10 +128,15 @@ def _build_tls_config(self, workers_addrs: Tuple[str, ...]):
}

def _build_overrides_config(self):
# To tell Tempo to enable the metrics generator, we need to set "processors"
# (the list of enabled processors) in the overrides section.
return tempo_config.Overrides(
defaults=tempo_config.Defaults(
metrics_generator=tempo_config.MetricsGeneratorDefaults(
processors=[tempo_config.MetricsGeneratorProcessor.SPAN_METRICS],
processors=[
tempo_config.MetricsGeneratorProcessorLabel.SPAN_METRICS,
tempo_config.MetricsGeneratorProcessorLabel.SERVICE_GRAPHS,
],
)
)
)
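
# Rendered into Tempo's YAML config, this overrides section comes out roughly as
# (assuming the enum labels serialize to Tempo's processor names):
#
# overrides:
#   defaults:
#     metrics_generator:
#       processors: [span-metrics, service-graphs]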
@@ -160,11 +165,15 @@ def _build_metrics_generator_config(
storage=tempo_config.MetricsGeneratorStorage(
path=self.metrics_generator_wal_path,
remote_write=remote_write_instances,
)
),
# Adding juju topology will be done on the worker's side
# to populate the correct unit label.
processor=tempo_config.MetricsGeneratorProcessor(
span_metrics=tempo_config.MetricsGeneratorSpanMetricsProcessor(),
service_graphs=tempo_config.MetricsGeneratorServiceGraphsProcessor(),
),
# per-processor configuration should go in here
)

return config
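
# For reference, the generated metrics_generator section renders to YAML roughly
# as follows (illustrative WAL path and remote-write URL):
#
# metrics_generator:
#   storage:
#     path: /var/tempo/metrics-generator-wal
#     remote_write:
#       - url: http://prometheus:9090/api/v1/write
#   processor:
#     span_metrics: {}
#     service_graphs: {}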

def _build_server_config(self, use_tls=False):
33 changes: 31 additions & 2 deletions src/tempo_config.py
@@ -9,6 +9,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional

import pydantic
from cosl.coordinated_workers.coordinator import ClusterRolesConfig
from pydantic import BaseModel, ConfigDict, Field

@@ -101,7 +102,7 @@ class ClientAuthTypeEnum(str, enum.Enum):
REQUIRE_AND_VERIFY_CLIENT_CERT = "RequireAndVerifyClientCert"


class MetricsGeneratorProcessor(str, enum.Enum):
class MetricsGeneratorProcessorLabel(str, enum.Enum):
"""Metrics generator processors supported values.

Supported values: https://grafana.com/docs/tempo/latest/configuration/#standard-overrides
@@ -301,6 +302,29 @@ class RemoteWrite(BaseModel):
tls_config: Optional[RemoteWriteTLS] = None


class MetricsGeneratorSpanMetricsProcessor(BaseModel):
"""Metrics Generator span_metrics processor configuration schema."""

# see https://grafana.com/docs/tempo/v2.6.x/configuration/#metrics-generator
# for a full list of config options


class MetricsGeneratorServiceGraphsProcessor(BaseModel):
"""Metrics Generator service_graphs processor configuration schema."""

# see https://grafana.com/docs/tempo/v2.6.x/configuration/#metrics-generator
# for a full list of config options


class MetricsGeneratorProcessor(BaseModel):
"""Metrics Generator processor schema."""

span_metrics: MetricsGeneratorSpanMetricsProcessor
service_graphs: MetricsGeneratorServiceGraphsProcessor
# see https://grafana.com/docs/tempo/v2.6.x/configuration/#metrics-generator
# for a full list of config options; could add local_blocks here


class MetricsGeneratorStorage(BaseModel):
"""Metrics Generator storage schema."""

@@ -314,6 +338,9 @@ class MetricsGenerator(BaseModel):
ring: Optional[Ring] = None
storage: MetricsGeneratorStorage

# processor-specific config depends on the processor type
processor: MetricsGeneratorProcessor


class MetricsGeneratorDefaults(BaseModel):
"""Metrics generator defaults schema."""
@@ -323,7 +350,9 @@ class MetricsGeneratorDefaults(BaseModel):
use_enum_values=True
)
"""Pydantic config."""
processors: List[MetricsGeneratorProcessor]
processors: Optional[List[MetricsGeneratorProcessorLabel]] = pydantic.Field(
default_factory=list
)
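
# Quick sanity check of the relaxed field (assuming Tempo's processor names are
# the enum values; they are stored as plain strings thanks to use_enum_values=True):
#
# MetricsGeneratorDefaults()                                        # processors == []
# MetricsGeneratorDefaults(processors=["span-metrics", "service-graphs"])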


class Defaults(BaseModel):