yanked some more, fixed unittests

PietroPasotti committed Jun 19, 2024
1 parent 79e4775 commit ada59ac
Showing 25 changed files with 338 additions and 1,070 deletions.
18 changes: 6 additions & 12 deletions charmcraft.yaml
@@ -1,6 +1,6 @@
 # Copyright 2022 Canonical Ltd.
 # See LICENSE file for licensing details.
-name: tempo-k8s
+name: tempo-coordinator-k8s
 type: charm
 
 assumes:
@@ -17,20 +17,14 @@ summary: |
   Tempo is a distributed tracing backend by Grafana.
 links:
-  documentation: https://discourse.charmhub.io/t/tempo-k8s-docs-index/14005
+  # FIXME: create docs tree root
+  documentation: https://discourse.charmhub.io/t/tempo-coordinator-k8s-docs-index
   website:
-    - https://charmhub.io/tempo-k8s
+    - https://charmhub.io/tempo-coordinator-k8s
   source:
-    - https://github.com/canonical/tempo-k8s-operator
+    - https://github.com/canonical/tempo-coordinator-k8s-operator
   issues:
-    - https://github.com/canonical/tempo-k8s-operator/issues
-
-containers:
-  tempo:
-    resource: tempo-image
-    mounts:
-      - storage: data
-        location: /tmp/tempo
+    - https://github.com/canonical/tempo-coordinator-k8s-operator/issues
 
 provides:
   tempo-cluster:
181 changes: 80 additions & 101 deletions src/charm.py
@@ -5,30 +5,16 @@
 """Charmed Operator for Tempo; a lightweight object storage based tracing backend."""
 import json
 import logging
-import re
 import socket
 from pathlib import Path
-from typing import Dict, List
-from typing import Optional, Set, Tuple
+from typing import Dict, List, Optional, Set, Tuple
 
 import ops
-from ops.charm import (
-    CharmBase,
-    CollectStatusEvent,
-    PebbleNoticeEvent,
-    RelationEvent,
-    WorkloadEvent,
-)
-from ops.main import main
-from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, WaitingStatus
-from ops.model import Relation
 
 from charms.data_platform_libs.v0.s3 import S3Requirer
 from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider
 from charms.grafana_k8s.v0.grafana_source import GrafanaSourceProvider
-from charms.loki_k8s.v0.loki_push_api import LogProxyConsumer
-from charms.observability_libs.v0.kubernetes_service_patch import KubernetesServicePatch
-from charms.observability_libs.v1.cert_handler import CertHandler, VAULT_SECRET_LABEL
+from charms.observability_libs.v1.cert_handler import VAULT_SECRET_LABEL, CertHandler
 from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider
 from charms.tempo_k8s.v1.charm_tracing import trace_charm
 from charms.tempo_k8s.v2.tracing import (
@@ -37,19 +23,27 @@
     TracingEndpointProvider,
 )
 from charms.traefik_route_k8s.v0.traefik_route import TraefikRouteRequirer
+from ops.charm import CharmBase, CollectStatusEvent, RelationEvent
+from ops.main import main
+from ops.model import ActiveStatus, BlockedStatus, Relation, WaitingStatus
 
+from coordinator import TempoCoordinator
 from tempo import Tempo
 from tempo_cluster import TempoClusterProvider
 
 logger = logging.getLogger(__name__)
 
 
+class S3NotFoundError(Exception):
+    """Raised when the s3 integration is not present or not ready."""
+
+
 @trace_charm(
     tracing_endpoint="tempo_otlp_http_endpoint",
     server_cert="server_cert",
     extra_types=(Tempo, TracingEndpointProvider),
 )
-class TempoCharm(CharmBase):
+class TempoCoordinatorCharm(CharmBase):
     """Charmed Operator for Tempo; a distributed tracing backend."""
 
     def __init__(self, *args):
@@ -58,6 +52,13 @@ def __init__(self, *args):
         self.tempo_cluster = TempoClusterProvider(self)
         self.coordinator = TempoCoordinator(self.tempo_cluster)
 
+        # keep this above Tempo instantiation, as we need it in self.tls_available
+        self.cert_handler = CertHandler(
+            self,
+            key="tempo-server-cert",
+            sans=[self.hostname],
+        )
+
         self.tempo = tempo = Tempo(
             external_host=self.hostname,
             # we need otlp_http receiver for charm_tracing
@@ -66,12 +67,6 @@ def __init__(self, *args):
             use_tls=self.tls_available,
         )
 
-        self.cert_handler = CertHandler(
-            self,
-            key="tempo-server-cert",
-            sans=[self.hostname],
-        )
-
         self.s3_requirer = S3Requirer(self, Tempo.s3_relation_name, Tempo.s3_bucket_name)
 
         # configure this tempo as a datasource in grafana
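Note: this hunk and the previous one are two halves of a single move. The CertHandler block now precedes the Tempo instantiation because Tempo's constructor reads self.tls_available, which dereferences self.cert_handler. A minimal self-contained sketch of why the assignment order matters (names stand in for the charm's real objects):

class InitOrderSketch:
    """Assign a dependency before anything whose construction reads it."""

    def __init__(self):
        # must come first: tls_available dereferences self.cert_handler
        self.cert_handler = object()  # stands in for CertHandler
        self.use_tls = self.tls_available  # stands in for Tempo(use_tls=...)

    @property
    def tls_available(self) -> bool:
        # would raise AttributeError if evaluated before
        # self.cert_handler is assigned in __init__
        return self.cert_handler is not None


assert InitOrderSketch().use_tls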
@@ -95,34 +90,31 @@ def __init__(self, *args):
             relation_name="metrics-endpoint",
             jobs=[{"static_configs": [{"targets": [f"*:{tempo.tempo_http_server_port}"]}]}],
         )
-        # Enable log forwarding for Loki and other charms that implement loki_push_api
-        self._logging = LogProxyConsumer(
-            self, relation_name="logging", log_files=[self.tempo.log_path], container_name="tempo"
-        )
         self._grafana_dashboards = GrafanaDashboardProvider(
             self, relation_name="grafana-dashboard"
         )
 
         self.tracing = TracingEndpointProvider(self, external_url=self._external_url)
         self._inconsistencies = self.coordinator.get_deployment_inconsistencies(
-            has_s3=self.is_s3_ready
+            has_s3=self.s3_ready
         )
         self._is_consistent = not self._inconsistencies
 
+        # We always listen to collect-status
+        self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status)
+
         if not self._is_consistent:
             logger.error(
                 f"Inconsistent deployment. {self.unit.name} will be shutting down. "
                 "This likely means you need to add an s3 integration. "
                 "This charm will be unresponsive and refuse to handle any event until "
                 "the situation is resolved by the cloud admin, to avoid data loss."
             )
-            self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status)
             return  # refuse to handle any other event as we can't possibly know what to do.
 
         # lifecycle
         self.framework.observe(self.on.leader_elected, self._on_leader_elected)
         self.framework.observe(self.on.update_status, self._on_update_status)
-        self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status)
         self.framework.observe(self.on.config_changed, self._on_config_changed)
         self.framework.observe(self.on.list_receivers_action, self._on_list_receivers_action)
@@ -163,15 +155,6 @@ def is_clustered(self) -> bool:
         """Check whether this Tempo is a coordinator and has worker nodes connected to it."""
         return self.tempo_cluster.has_workers
 
-    @property
-    def is_s3_ready(self) -> bool:
-        # we have an s3 config
-
-        # we cannot check for self.tempo.can_scale() here, because if we are handling a s3-changed
-        # event, it may be that we have a s3 config which is not yet on disk (it will be put there by
-        # our on_s3_changed event handler momentarily).
-        return bool(self._s3_config)
-
     @property
     def hostname(self) -> str:
         """Unit's hostname."""
@@ -207,26 +190,61 @@ def _internal_url(self) -> str:
     def tls_available(self) -> bool:
         """Return True if tls is enabled and the necessary certs are found."""
         return (
-                self.cert_handler.enabled
-                and (self.cert_handler.server_cert is not None)
-                and (self.cert_handler.private_key is not None)
-                and (self.cert_handler.ca_cert is not None)
+            self.cert_handler.enabled
+            and (self.cert_handler.server_cert is not None)
+            and (self.cert_handler.private_key is not None)
+            and (self.cert_handler.ca_cert is not None)
         )
 
     @property
-    def _s3_config(self) -> Optional[dict]:
-        if not self.s3_requirer.relations:
-            return None
+    def _s3_config(self) -> dict:
         s3_config = self.s3_requirer.get_s3_connection_info()
         if (
-                s3_config
-                and "bucket" in s3_config
-                and "endpoint" in s3_config
-                and "access-key" in s3_config
-                and "secret-key" in s3_config
+            s3_config
+            and "bucket" in s3_config
+            and "endpoint" in s3_config
+            and "access-key" in s3_config
+            and "secret-key" in s3_config
         ):
             return s3_config
-        return None
+        raise S3NotFoundError("s3 integration inactive")
+
+    @property
+    def s3_ready(self) -> bool:
+        """Check whether s3 is configured."""
+        try:
+            return bool(self._s3_config)
+        except S3NotFoundError:
+            return False
+
+    @property
+    def peer_addresses(self) -> List[str]:
+        peers = self._peers
+        relation = self.model.get_relation("tempo-peers")
+        # get unit addresses for all the other units from a databag
+        if peers and relation:
+            addresses = [relation.data[unit].get("local-ip") for unit in peers]
+            addresses = list(filter(None, addresses))
+        else:
+            addresses = []
+
+        # add own address
+        if self._local_ip:
+            addresses.append(self._local_ip)
+
+        return addresses
+
+    @property
+    def _local_ip(self) -> Optional[str]:
+        try:
+            return str(self.model.get_binding("tempo-peers").network.bind_address)
+        except (ops.ModelError, KeyError) as e:
+            logger.debug("failed to obtain local ip from tempo-peers binding", exc_info=True)
+            logger.error(
+                f"unable to get local IP at this time: failed with {type(e)}; "
+                f"see debug log for more info"
+            )
+            return None
 
     ##################
     # EVENT HANDLERS #
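The _s3_config refactor swaps an Optional return for an exception: the property now raises S3NotFoundError when the integration is missing or incomplete, and the new s3_ready property folds that back into a boolean for callers that only need a yes/no. A self-contained sketch of the same pattern over a plain dict (the config data is fabricated for illustration):

from typing import Dict


class S3NotFoundError(Exception):
    """Raised when the s3 integration is not present or not ready."""


REQUIRED_KEYS = ("bucket", "endpoint", "access-key", "secret-key")


def s3_config(raw: Dict[str, str]) -> Dict[str, str]:
    """Return a complete s3 config, or raise S3NotFoundError."""
    if raw and all(key in raw for key in REQUIRED_KEYS):
        return raw
    raise S3NotFoundError("s3 integration inactive")


def s3_ready(raw: Dict[str, str]) -> bool:
    """Boolean view over s3_config for callers that only need a check."""
    try:
        return bool(s3_config(raw))
    except S3NotFoundError:
        return False


assert not s3_ready({})
assert s3_ready({"bucket": "b", "endpoint": "e", "access-key": "a", "secret-key": "s"})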
@@ -289,33 +307,6 @@ def _on_tempo_peers_relation_created(self, event: ops.RelationCreatedEvent):
     def _on_tempo_peers_relation_changed(self, _):
         self._update_tempo_cluster()
 
-    @property
-    def peer_addresses(self) -> List[str]:
-        peers = self._peers
-        relation = self.model.get_relation("tempo-peers")
-        # get unit addresses for all the other units from a databag
-        if peers and relation:
-            addresses = [relation.data[unit].get("local-ip") for unit in peers]
-            addresses = list(filter(None, addresses))
-        else:
-            addresses = []
-
-        # add own address
-        if self._local_ip:
-            addresses.append(self._local_ip)
-
-        return addresses
-
-    @property
-    def _local_ip(self) -> Optional[str]:
-        try:
-            return str(self.model.get_binding('tempo-peers').network.bind_address)
-        except (ops.ModelError, KeyError) as e:
-            logger.debug("failed to obtain local ip from tempo-peers binding", exc_info=True)
-            logger.error(f"unable to get local IP at this time: failed with {type(e)}; "
-                         f"see debug log for more info")
-            return None
-
     def _on_config_changed(self, _):
         # check if certificate files haven't disappeared and recreate them if needed
         self._update_tempo_cluster()
@@ -343,24 +334,14 @@ def _on_list_receivers_action(self, event: ops.ActionEvent):
     def _on_collect_unit_status(self, e: CollectStatusEvent):
         # todo add [nginx.workload] statuses
 
-        if not self.tempo.is_ready():
+        if not self.tempo.is_ready:
            e.add_status(WaitingStatus("[workload.tempo] Tempo API not ready just yet..."))
 
-        # todo: how to surface this inconsistent state?
-        # if not self.tempo.can_scale() and self.is_s3_ready):
-        #     e.add_status(BlockedStatus("[s3] s3 ready but tempo not configured."))
-
         # TODO: should we set these statuses on the leader only, or on all units?
         if issues := self._inconsistencies:
             for issue in issues:
-                e.add_status(
-                    BlockedStatus("[consistency.issues]" + issue)
-                )
-            e.add_status(
-                BlockedStatus(
-                    "[consistency] Unit *disabled*."
-                )
-            )
+                e.add_status(BlockedStatus("[consistency.issues]" + issue))
+            e.add_status(BlockedStatus("[consistency] Unit *disabled*."))
         else:
             if self.is_clustered:
                 # no issues: tempo is consistent
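Background on why this handler can add several statuses at once: with ops' collect-status mechanism, each add_status call registers a candidate and the framework then sets the unit to the most severe status collected (blocked outranks waiting, which outranks active). A sketch, assuming ops 2.x semantics:

import ops


class StatusPrioritySketch(ops.CharmBase):
    def __init__(self, *args):
        super().__init__(*args)
        self.framework.observe(self.on.collect_unit_status, self._on_collect_status)

    def _on_collect_status(self, event: ops.CollectStatusEvent):
        # all three are registered; ops keeps the most severe one,
        # so the unit ends up blocked, not active
        event.add_status(ops.ActiveStatus())
        event.add_status(ops.WaitingStatus("[workload.tempo] API not ready"))
        event.add_status(ops.BlockedStatus("[consistency] missing s3 integration"))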
@@ -450,9 +431,6 @@ def _peers(self) -> Optional[Set[ops.model.Unit]]:
         # self is not included in relation.units
         return relation.units
 
-    def _is_s3_ready(self) -> bool:
-        return bool(self._s3_config)
-
     @property
     def loki_endpoints_by_unit(self) -> Dict[str, str]:
         """Loki endpoints from relation data in the format needed for Pebble log forwarding.
@@ -480,24 +458,25 @@ def loki_endpoints_by_unit(self) -> Dict[str, str]:
 
     def _update_tempo_cluster(self):
         """Build the config and publish everything to the application databag."""
-        if not self.coordinator.is_coherent:
+        if not self._is_consistent:
            logger.error("skipped tempo cluster update: inconsistent state")
            return
 
        kwargs = {}
 
        if self.tls_available:
            # we share the certs in plaintext as they're not sensitive information
-            kwargs['ca_cert'] = self.cert_handler.ca_cert
-            kwargs['server_cert'] = self.cert_handler.server_cert
-            kwargs['privkey_secret_id'] = self.tempo_cluster.publish_privkey(VAULT_SECRET_LABEL)
+            kwargs["ca_cert"] = self.cert_handler.ca_cert
+            kwargs["server_cert"] = self.cert_handler.server_cert
+            kwargs["privkey_secret_id"] = self.tempo_cluster.publish_privkey(VAULT_SECRET_LABEL)
 
        # On every function call, we always publish everything to the databag; however, if there
        # are no changes, Juju will notice there's no delta and do nothing
        self.tempo_cluster.publish_data(
            tempo_config=self.tempo.generate_config(self._requested_receivers(), self._s3_config),
+            loki_endpoints=self.loki_endpoints_by_unit,
            # TODO tempo receiver for charm tracing
-            **kwargs
+            **kwargs,
        )
 
     @property
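The publish-everything comment relies on Juju relation databags being last-write-wins: writing an identical payload produces no delta, so the other side sees no relation-changed event and unconditional republishing stays cheap. A sketch of that idempotency with a plain dict standing in for the databag (names hypothetical):

from typing import Dict


def publish(databag: Dict[str, str], **data: str) -> bool:
    """Write everything; report whether anything actually changed."""
    changed = False
    for key, value in data.items():
        if databag.get(key) != value:
            databag[key] = value
            changed = True  # Juju would emit relation-changed for this delta
    return changed


bag: Dict[str, str] = {}
assert publish(bag, tempo_config="cfg-v1")      # first write: there is a delta
assert not publish(bag, tempo_config="cfg-v1")  # same payload: no delta, no event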
@@ -543,4 +522,4 @@ def _ingress_config(self) -> dict:
 
 
 if __name__ == "__main__":  # pragma: nocover
-    main(TempoCharm)
+    main(TempoCoordinatorCharm)