Skip to content

Commit

Permalink
yanked workload
Browse files Browse the repository at this point in the history
  • Loading branch information
PietroPasotti committed Jun 19, 2024
1 parent 316e4d0 commit 79e4775
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 711 deletions.
22 changes: 0 additions & 22 deletions charmcraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ containers:
- storage: data
location: /tmp/tempo

resources:
tempo-image:
type: oci-image
description: OCI image for Tempo
# Included for simplicity in integration tests
# see https://hub.docker.com/r/grafana/tempo/tags
upstream-source: grafana/tempo:2.4.0

provides:
tempo-cluster:
interface: tempo_cluster
Expand Down Expand Up @@ -103,20 +95,6 @@ peers:
description: |
peer relation for internal coordination
config:
options:
coordinator_runs_workload_when_clustered:
type: boolean
default: true
description: |
Whether this charm should also run a worker node when related
to any number of specialized worker applications.
Set it to ``false`` if you want this charm to stop running `Tempo`
as soon as you integrate it with a tempo-worker-k8s-charm instance.
In this case, tempo-k8s will only act as coordinator
(and reverse proxy) for the tempo cluster.
bases:
- build-on:
Expand Down
179 changes: 16 additions & 163 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,14 @@ def __init__(self, *args):
super().__init__(*args)
self.ingress = TraefikRouteRequirer(self, self.model.get_relation("ingress"), "ingress") # type: ignore
self.tempo_cluster = TempoClusterProvider(self)
self.coordinator = TempoCoordinator(self.tempo_cluster,
is_worker=self.is_worker_node)
self.coordinator = TempoCoordinator(self.tempo_cluster)

self.tempo = tempo = Tempo(
self.unit.get_container("tempo"),
external_host=self.hostname,
# we need otlp_http receiver for charm_tracing
# TODO add any extra receivers enabled manually via config
enable_receivers=["otlp_http"],
run_worker_node=self.is_worker_node
use_tls=self.tls_available,
)

self.cert_handler = CertHandler(
Expand Down Expand Up @@ -106,10 +105,6 @@ def __init__(self, *args):

self.tracing = TracingEndpointProvider(self, external_url=self._external_url)
self._inconsistencies = self.coordinator.get_deployment_inconsistencies(
clustered=self.is_clustered,
scaled=self.is_scaled,
has_workers=self.tempo_cluster.has_workers,
is_worker_node=self.is_worker_node,
has_s3=self.is_s3_ready
)
self._is_consistent = not self._inconsistencies
Expand All @@ -122,15 +117,10 @@ def __init__(self, *args):
"the situation is resolved by the cloud admin, to avoid data loss."
)
self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status)

if self.tempo.is_tempo_service_defined:
self.tempo.shutdown()

return # refuse to handle any other event as we can't possibly know what to do.

# lifecycle
self.framework.observe(self.on.leader_elected, self._on_leader_elected)
self.framework.observe(self.on.leader_settings_changed, self._on_leader_settings_changed)
self.framework.observe(self.on.update_status, self._on_update_status)
self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status)
self.framework.observe(self.on.config_changed, self._on_config_changed)
Expand All @@ -142,12 +132,6 @@ def __init__(self, *args):
self.framework.observe(ingress.relation_joined, self._on_ingress_relation_joined)
self.framework.observe(self.ingress.on.ready, self._on_ingress_ready)

# workload
self.framework.observe(self.on.tempo_pebble_ready, self._on_tempo_pebble_ready)
self.framework.observe(
self.on.tempo_pebble_custom_notice, self._on_tempo_pebble_custom_notice
)

# s3
self.framework.observe(
self.s3_requirer.on.credentials_changed, self._on_s3_credentials_changed
Expand All @@ -174,22 +158,6 @@ def __init__(self, *args):
# UTILITY PROPERTIES #
######################

@property
def is_worker_node(self) -> bool:
"""Check whether this Tempo charm is configured to run a worker node."""
if self.is_clustered:
return self.config.get("coordinator_runs_workload_when_clustered", True)
return True

@property
def is_scaled(self) -> bool:
"""Check whether Tempo is deployed with scale > 1."""
relation = self.model.get_relation("tempo-peers")
if not relation:
return False
# does not include self
return len(relation.units) > 0

@property
def is_clustered(self) -> bool:
"""Check whether this Tempo is a coordinator and has worker nodes connected to it."""
Expand Down Expand Up @@ -268,27 +236,14 @@ def _on_tracing_broken(self, _):
self._update_tracing_relations()

def _on_cert_handler_changed(self, _):
was_ready = self.tempo.tls_ready

if self.tls_available:
logger.debug("enabling TLS")
self.tempo.configure_tls(
cert=self.cert_handler.server_cert, # type: ignore
key=self.cert_handler.private_key, # type: ignore
ca=self.cert_handler.ca_cert, # type: ignore
)
else:
logger.debug("disabling TLS")
self.tempo.clear_tls_config()

if was_ready != self.tempo.tls_ready:
# tls readiness change means config change.
self._update_tempo_config()
# sync scheme change with traefik and related consumers
self._configure_ingress()

if self.tempo.is_tempo_service_defined:
self.tempo.restart()
# tls readiness change means config change.
# sync scheme change with traefik and related consumers
self._configure_ingress()

# sync the server cert with the charm container.
# technically, because of charm tracing, this will be called first thing on each event
Expand All @@ -314,12 +269,6 @@ def _on_ingress_relation_created(self, _: RelationEvent):
def _on_ingress_relation_joined(self, _: RelationEvent):
self._configure_ingress()

def _on_leader_settings_changed(self, _: ops.LeaderSettingsChangedEvent):
if not self.is_s3_ready:
logger.error(
"Losing leadership without s3. " "This unit will soon be in an inconsistent state."
)

def _on_leader_elected(self, _: ops.LeaderElectedEvent):
# as traefik_route goes through app data, we need to take lead of traefik_route if our leader dies.
self._configure_ingress()
Expand All @@ -331,31 +280,18 @@ def _on_s3_credentials_gone(self, _):
self._on_s3_changed()

def _on_s3_changed(self):
could_scale_before = self.tempo.can_scale()

self._update_tempo_config()

can_scale_now = self.tempo.can_scale()
# if we had s3, and we don't anymore, we need to replan from 'scaling-monolithic' to 'all'
# if we didn't have s3, and now we do, we can replan from 'all' to 'scaling-monolithic'
if could_scale_before != can_scale_now:
if not self.tempo.is_tempo_service_defined:
# TODO: should we be deferring this to the next pebble-ready instead?
logger.debug("tempo was not running! Starting it now")
self.tempo.plan()

self._update_tempo_cluster()

def _on_tempo_peers_relation_created(self, event: ops.RelationCreatedEvent):
if self._local_ip:
event.relation.data[self.unit]["local-ip"] = self._local_ip

def _on_tempo_peers_relation_changed(self, _):
if self._update_tempo_config():
self.tempo.restart()
self._update_tempo_cluster()

def _update_tempo_config(self) -> bool:
peers = self.peers()
@property
def peer_addresses(self) -> List[str]:
peers = self._peers
relation = self.model.get_relation("tempo-peers")
# get unit addresses for all the other units from a databag
if peers and relation:
Expand All @@ -368,7 +304,7 @@ def _update_tempo_config(self) -> bool:
if self._local_ip:
addresses.append(self._local_ip)

return self.tempo.update_config(self._requested_receivers(), self._s3_config, addresses)
return addresses

@property
def _local_ip(self) -> Optional[str]:
Expand All @@ -382,36 +318,10 @@ def _local_ip(self) -> Optional[str]:

def _on_config_changed(self, _):
# check if certificate files haven't disappeared and recreate them if needed
if self.tls_available and not self.tempo.tls_ready:
logger.debug("enabling TLS")
self.tempo.configure_tls(
cert=self.cert_handler.server_cert, # type: ignore
key=self.cert_handler.private_key, # type: ignore
ca=self.cert_handler.ca_cert, # type: ignore
)

self._update_tempo_cluster()

def _on_tempo_pebble_custom_notice(self, event: PebbleNoticeEvent):
if event.notice.key == self.tempo.tempo_ready_notice_key:
logger.debug("pebble api reported ready")
# collect-unit-status should do the rest and report that pebble is ready.
self.tempo.receive_tempo_ready_notice()

def _on_tempo_pebble_ready(self, event: WorkloadEvent):
if not self.tempo.container.can_connect():
logger.warning("container not ready, cannot configure; will retry soon")
return event.defer()

self.tempo.update_config(self._requested_receivers(), self._s3_config)
self.tempo.plan()

self.unit.set_workload_version(self.version)
self.unit.status = ActiveStatus()

def _on_update_status(self, _):
"""Update the status of the application."""
self.unit.set_workload_version(self.version)

def _on_ingress_ready(self, _event):
# whenever there's a change in ingress, we need to update all tracing relations
Expand All @@ -433,8 +343,6 @@ def _on_list_receivers_action(self, event: ops.ActionEvent):
def _on_collect_unit_status(self, e: CollectStatusEvent):
# todo add [nginx.workload] statuses

if not self.tempo.container.can_connect():
e.add_status(WaitingStatus("[workload.tempo] Tempo container not ready"))
if not self.tempo.is_ready():
e.add_status(WaitingStatus("[workload.tempo] Tempo API not ready just yet..."))

Expand Down Expand Up @@ -478,6 +386,9 @@ def _configure_ingress(self) -> None:
if self.ingress.external_host:
self._update_tracing_relations()

# notify the cluster
self._update_tempo_cluster()

def _update_tracing_relations(self):
tracing_relations = self.model.relations["tracing"]
if not tracing_relations:
Expand All @@ -493,22 +404,8 @@ def _update_tracing_relations(self):
[(p, self.tempo.get_receiver_url(p, self.ingress)) for p in requested_receivers]
)

self._restart_if_receivers_changed()

self._update_tempo_cluster()

def _restart_if_receivers_changed(self):
# if the receivers have changed, we need to reconfigure tempo
self.unit.status = MaintenanceStatus("reconfiguring Tempo...")
updated = self._update_tempo_config()
if not updated:
logger.debug("Config not updated; skipping tempo restart")
if updated:
restarted = self.tempo.is_tempo_service_defined and self.tempo.restart()
if not restarted:
# assume that this will be handled at the next pebble-ready
logger.debug("Cannot reconfigure/restart tempo at this time.")

def _requested_receivers(self) -> Tuple[ReceiverProtocol, ...]:
"""List what receivers we should activate, based on the active tracing relations."""
# we start with the sum of the requested endpoints from the requirers
Expand All @@ -519,51 +416,6 @@ def _requested_receivers(self) -> Tuple[ReceiverProtocol, ...]:
requested_receivers.update(self.tempo.enabled_receivers)
return tuple(requested_receivers)

@property
def version(self) -> str:
"""Reports the current Tempo version."""
container = self.unit.get_container("tempo")
if container.can_connect() and container.get_services("tempo"):
try:
return self._get_version() or ""
# Catching Exception is not ideal, but we don't care much for the error here, and just
# default to setting a blank version since there isn't much the admin can do!
except Exception as e:
logger.warning("unable to get version from API: %s", str(e))
logger.debug(e, exc_info=True)
return ""
return ""

def _get_version(self) -> Optional[str]:
"""Fetch the version from the running workload using the Tempo CLI.
Helper function.
"""
container = self.unit.get_container("tempo")
proc = container.exec(["/tempo", "-version"])
out, err = proc.wait_output()

# example output:
# / # /tempo --version
# tempo, version (branch: HEAD, revision: fd5743d5d)
# build user:
# build date:
# go version: go1.18.5
# platform: linux/amd64

if version_head := re.search(r"tempo, version (.*) \(branch: (.*), revision: (.*)\)", out):
v_head, b_head, r_head = version_head.groups()
version = f"{v_head}:{b_head}/{r_head}"
elif version_headless := re.search(r"tempo, version (\S+)", out):
version = version_headless.groups()[0]
else:
logger.warning(
f"unable to determine tempo workload version: output {out} "
f"does not match any known pattern"
)
return
return version

def server_cert(self):
"""For charm tracing."""
self._update_server_cert()
Expand All @@ -589,7 +441,8 @@ def tempo_otlp_http_endpoint(self) -> Optional[str]:

return None

def peers(self) -> Optional[Set[ops.model.Unit]]:
@property
def _peers(self) -> Optional[Set[ops.model.Unit]]:
relation = self.model.get_relation("tempo-peers")
if not relation:
return None
Expand Down
20 changes: 2 additions & 18 deletions src/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,28 +66,20 @@ def _is_recommended():
# python>=3.11 would support roles >= RECOMMENDED_DEPLOYMENT

def get_deployment_inconsistencies(
self, clustered: bool, scaled: bool, has_workers: bool, is_worker_node: bool, has_s3: bool
self, has_s3: bool
) -> List[str]:
"""Determine whether the deployment as a whole is consistent.
Return a list of failed consistency checks.
"""
return self._get_deployment_inconsistencies(
clustered=clustered,
scaled=scaled,
has_workers=has_workers,
is_worker_node=is_worker_node,
has_s3=has_s3,
coherent=self.is_coherent,
missing_roles=self.missing_roles,
)

@staticmethod
def _get_deployment_inconsistencies(
clustered: bool,
scaled: bool,
has_workers: bool,
is_worker_node: bool,
has_s3: bool,
coherent: bool,
missing_roles: Set[TempoRole] = None,
Expand All @@ -96,17 +88,9 @@ def _get_deployment_inconsistencies(
Return a list of failed consistency checks.
"""

failures = []
# is_monolith = not (scaled or clustered or has_workers) and is_worker_node

if not is_worker_node and not has_workers:
failures.append("Tempo must either be a worker node or have some workers.")
if not has_s3:
if scaled:
failures.append("Tempo is scaled but has no s3 integration.")
if clustered:
failures.append("Tempo is clustered but has no s3 integration.")
failures.append("Tempo has no s3 integration.")
elif not coherent:
failures.append(f"Incoherent coordinator: missing roles: {missing_roles}.")
return failures
Loading

0 comments on commit 79e4775

Please sign in to comment.