Add S3 relation to mimir coordinator #21

Merged
merged 14 commits on Jan 5, 2024
768 changes: 768 additions & 0 deletions lib/charms/data_platform_libs/v0/s3.py

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions metadata.yaml
@@ -32,8 +32,9 @@ requires:
interface: s3
limit: 1
description: |
The coordinator obtains storage info on behalf of the workers, and
forwards all workers the storage details over mimir-worker.
The coordinator obtains storage information on behalf of the workers and forwards all workers
the storage details over mimir-worker. Limited to 1 to ensure only one storage backend will
be connected to Mimir.

send-remote-write:
interface: prometheus_remote_write
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
ops
pydantic
# crossplane is a package from nginxinc to interact with the Nginx config
crossplane
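
As an aside, a hedged sketch of what the crossplane package is used for in this charm's Nginx handling (the config path is illustrative, not from this PR):

# Minimal sketch: crossplane parses an Nginx config into a JSON-like payload.
import crossplane

payload = crossplane.parse("/etc/nginx/nginx.conf")  # illustrative path
print(payload["status"])  # "ok" when the config parses cleanly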
66 changes: 44 additions & 22 deletions src/charm.py
@@ -12,12 +12,16 @@
import logging
from typing import List

from charms.data_platform_libs.v0.s3 import (
S3Requirer,
)
from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider
from charms.loki_k8s.v0.loki_push_api import LokiPushApiConsumer
from charms.mimir_coordinator_k8s.v0.mimir_cluster import MimirClusterProvider
from charms.prometheus_k8s.v0.prometheus_remote_write import (
PrometheusRemoteWriteConsumer,
)
from mimir_config import BUCKET_NAME, S3_RELATION_NAME, _S3StorageBackend
from mimir_coordinator import MimirCoordinator
from nginx import Nginx
from ops.charm import CharmBase, CollectStatusEvent
@@ -38,11 +42,11 @@ def __init__(self, *args):

self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(self.on.collect_unit_status, self._on_collect_status)

# TODO: On any worker relation-joined/departed, need to update grafana agent's scrape
# targets with the new memberlist.
# (Remote write would still be the same nginx-proxied endpoint.)

self.s3_requirer = S3Requirer(self, S3_RELATION_NAME, BUCKET_NAME)
self.cluster_provider = MimirClusterProvider(self)
self.coordinator = MimirCoordinator(cluster_provider=self.cluster_provider)

Expand All @@ -52,11 +56,17 @@ def __init__(self, *args):
self._on_nginx_pebble_ready,
)

self._s3_storage_data = _S3StorageBackend()
self.framework.observe(
self.on.mimir_cluster_relation_changed, # pyright: ignore
self._on_mimir_cluster_changed,
)

self.framework.observe(
self.s3_requirer.on.credentials_changed, self._on_mimir_cluster_changed
)
self.framework.observe(self.s3_requirer.on.credentials_gone, self._on_s3_departed)

self.remote_write_consumer = PrometheusRemoteWriteConsumer(self)
self.framework.observe(
self.remote_write_consumer.on.endpoints_changed, # pyright: ignore
@@ -77,18 +87,6 @@ def __init__(self, *args):
self._on_loki_relation_changed,
)

@property
def _s3_storage(self) -> dict:
# if not self.model.relations['s3']:
# return {}
return {
"url": "foo",
"endpoint": "bar",
"access_key": "bar",
"insecure": False,
"secret_key": "x12",
}

@property
def mimir_worker_relations(self) -> List[Relation]:
"""Returns the list of worker relations."""
@@ -100,23 +98,47 @@ def _on_config_changed(self, event):

def publish_config(self):
"""Generate config file and publish to all workers."""
mimir_config = self.coordinator.build_config(dict(self.config))
mimir_config = self.coordinator.build_config(self._s3_storage_data)
self.cluster_provider.publish_configs(mimir_config)

def _s3_connection_info(self):
"""Parse a _S3StorageBackend object from relation data."""
if not self.s3_requirer.relations:
self._s3_storage_data = _S3StorageBackend()
return
raw = self.s3_requirer.get_s3_connection_info()
raw["access_key"], raw["secret_key"] = raw.pop("access-key", ""), raw.pop("secret-key", "")
self._s3_storage_data = _S3StorageBackend(**raw)
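
For illustration, a hedged sketch of the rename this helper performs; the payload values are invented, only the dashed-to-underscore key mapping is taken from the code above:

# Hypothetical relation payload: the s3 lib exposes dashed keys, while the
# pydantic model expects underscores, hence the pop/rename in the handler.
raw = {"access-key": "AKIA...", "secret-key": "s3cr3t", "endpoint": "s3.example.com", "bucket": "mimir"}
raw["access_key"], raw["secret_key"] = raw.pop("access-key", ""), raw.pop("secret-key", "")
data = _S3StorageBackend(**raw)  # any unset field falls back to its "" default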

def _on_mimir_cluster_changed(self, _):
if self.coordinator.is_coherent():
logger.info("mimir deployment coherent: publishing configs")
self.publish_config()
else:
logger.warning("this mimir deployment is incoherent")
if not self.coordinator.is_coherent():
logger.warning("Incoherent deployment: Some required Mimir roles are missing.")
return
self._s3_connection_info()
if self._s3_storage_data == _S3StorageBackend() and self.coordinator.is_scaled():
logger.warning("Filesystem storage cannot be used with replicated mimir workers")
return
self.publish_config()

def _on_s3_departed(self, _):
self._s3_storage_data = _S3StorageBackend()
if not self.coordinator.is_coherent():
logger.warning("Incoherent deployment: Some required Mimir roles are missing.")
return
if self.coordinator.is_scaled():
logger.warning("Filesystem storage cannot be used with replicated mimir workers")
return
self.publish_config()

def _on_collect_status(self, event: CollectStatusEvent):
"""Handle start event."""
if not self.coordinator.is_coherent():
event.add_status(
BlockedStatus(
"Incoherent deployment: you are " "lacking some required Mimir roles"
)
BlockedStatus("Incoherent deployment: you are lacking some required Mimir roles")
)
if self._s3_storage_data == _S3StorageBackend() and self.coordinator.is_scaled():
event.add_status(
BlockedStatus("Missing s3 relation, replicated units must use S3 storage.")
)

if self.coordinator.is_recommended():
13 changes: 8 additions & 5 deletions src/mimir_config.py
@@ -9,6 +9,9 @@
from pydantic import BaseModel
from pydantic.dataclasses import dataclass as pydantic_dataclass

S3_RELATION_NAME = "s3"
BUCKET_NAME = "mimir"


class InvalidConfigurationError(Exception):
"""Invalid configuration."""
@@ -84,11 +87,11 @@ class Alertmanager(BaseModel):


class _S3StorageBackend(BaseModel):
endpoint: str
access_key_id: str
secret_access_key: str
insecure: bool = False
signature_version: str = "v4"
access_key: str = ""
endpoint: str = ""
secret_key: str = ""
bucket: str = ""
region: str = ""
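
Because every field defaults to an empty string, a freshly constructed instance doubles as an "empty" sentinel; a small sketch of the comparison pattern charm.py relies on:

# Pydantic models compare by field values, so an all-defaults instance
# marks "no S3 data" throughout charm.py.
empty = _S3StorageBackend()
configured = _S3StorageBackend(endpoint="s3.example.com", access_key="k", secret_key="s", bucket="mimir")
assert empty == _S3StorageBackend()       # no relation data yet
assert configured != _S3StorageBackend()  # S3 backend configured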


class _FilesystemStorageBackend(BaseModel):
152 changes: 119 additions & 33 deletions src/mimir_coordinator.py
@@ -10,6 +10,7 @@
from typing import Any, Dict, Iterable

from charms.mimir_coordinator_k8s.v0.mimir_cluster import MimirClusterProvider, MimirRole
from mimir_config import _S3StorageBackend

logger = logging.getLogger(__name__)

@@ -69,6 +70,10 @@ def is_coherent(self) -> bool:
roles: Iterable[MimirRole] = self._cluster_provider.gather_roles().keys()
return set(roles).issuperset(MINIMAL_DEPLOYMENT)

def is_scaled(self) -> bool:
"""Return True if more than 1 worker are forming the mimir cluster."""
return len(list(self._cluster_provider.gather_addresses())) > 1

def is_recommended(self) -> bool:
"""Return True if is a superset of the minimal deployment.

@@ -81,49 +86,130 @@ def is_recommended(self) -> bool:
return False
return True

def build_config(self, _charm_config: Dict[str, Any]) -> Dict[str, Any]:
def build_config(self, s3_data: _S3StorageBackend) -> Dict[str, Any]:
"""Generate shared config file for mimir.

Reference: https://grafana.com/docs/mimir/latest/configure/
"""
mimir_config: Dict[str, Any] = {
"common": {},
"alertmanager": {
"data_dir": str(self._root_data_dir / "data-alertmanager"),
},
"compactor": {
"data_dir": str(self._root_data_dir / "data-compactor"),
"alertmanager": self._build_alertmanager_config(),
"alertmanager_storage": self._build_alertmanager_storage_config(),
"compactor": self._build_compactor_config(),
"ruler": self._build_ruler_config(),
"ruler_storage": self._build_ruler_storage_config(),
"blocks_storage": self._build_blocks_storage_config(),
"memberlist": self._build_memberlist_config(),
}

if s3_data != _S3StorageBackend():
mimir_config["common"]["storage"] = self._build_s3_storage_config(s3_data)
self._update_s3_storage_config(mimir_config["blocks_storage"], "filesystem", "blocks")
self._update_s3_storage_config(mimir_config["ruler_storage"], "filesystem", "rules")
self._update_s3_storage_config(
mimir_config["alertmanager_storage"], "filesystem", "alerts"
)

if self._tls_requirer:
mimir_config.update(self._build_tls_config())

return mimir_config

# data_dir:
# The Mimir Alertmanager stores the alerts state on local disk at the location configured using -alertmanager.storage.path.
# Should be persisted if not replicated
def _build_alertmanager_config(self) -> Dict[str, Any]:
return {
"data_dir": str(self._root_data_dir / "data-alertmanager"),
}

# filesystem: dir
# The Mimir Alertmanager also periodically stores the alert state in the storage backend configured with -alertmanager-storage.backend (For Recovery)
def _build_alertmanager_storage_config(self) -> Dict[str, Any]:
return {
"filesystem": {
"dir": str(self._root_data_dir / "data-alertmanager-recovery"),
},
"blocks_storage": {
"bucket_store": {
"sync_dir": str(self._root_data_dir / "tsdb-sync"),
},
}

# data_dir:
# Directory to temporarily store blocks during compaction.
# This directory is not required to be persisted between restarts.
def _build_compactor_config(self) -> Dict[str, Any]:
return {
"data_dir": str(self._root_data_dir / "data-compactor"),
}

# rule_path:
# Directory to store temporary rule files loaded by the Prometheus rule managers.
# This directory is not required to be persisted between restarts.
def _build_ruler_config(self) -> Dict[str, Any]:
return {
"rule_path": str(self._root_data_dir / "data-ruler"),
}

# filesystem: dir
# Storage backend reads Prometheus recording rules from the local filesystem.
# The ruler looks for tenant rules in the self._root_data_dir/rules/<TENANT ID> directory. The ruler requires rule files to be in the Prometheus format.
def _build_ruler_storage_config(self) -> Dict[str, Any]:
return {
"filesystem": {
"dir": str(self._root_data_dir / "rules"),
},
}

if self._s3_requirer:
s3_config = self._s3_requirer.s3_config
mimir_config["common"]["storage"] = {
"backend": "s3",
"s3": {
"region": s3_config.region, # eg. 'us-west'
"bucket_name": s3_config.bucket_name, # eg: 'mimir'
},
}
mimir_config["blocks_storage"] = {
"s3": {"bucket_name": s3_config.blocks_bucket_name} # e.g. 'mimir-blocks'
}

# memberlist config for gossip and hash ring
mimir_config["memberlist"] = {
"join_members": list(self._cluster_provider.gather_addresses())
# bucket_store: sync_dir
# Directory to store synchronized TSDB index headers. This directory is not
# required to be persisted between restarts, but persisting it is highly recommended to speed up startup.

# filesystem: dir
# Mimir uploads blocks (of metrics) to the object storage at a periodic interval.

# tsdb: dir
# Directory to store TSDBs (including WAL) in the ingesters.
# This directory is required to be persisted between restarts.

# The TSDB dir is used by ingesters, while the filesystem: dir is the "object storage"
# Ingesters are expected to upload TSDB blocks to filesystem: dir every 2h.
def _build_blocks_storage_config(self) -> Dict[str, Any]:
return {
"bucket_store": {
"sync_dir": str(self._root_data_dir / "tsdb-sync"),
},
"filesystem": {
"dir": str(self._root_data_dir / "blocks"),
},
"tsdb": {
"dir": str(self._root_data_dir / "tsdb"),
},
}

# todo: TLS config for memberlist
if self._tls_requirer:
mimir_config["tls_enabled"] = True
mimir_config["tls_cert_path"] = self._tls_requirer.cacert
mimir_config["tls_key_path"] = self._tls_requirer.key
mimir_config["tls_ca_path"] = self._tls_requirer.capath
def _build_s3_storage_config(self, s3_data: _S3StorageBackend) -> Dict[str, Any]:
return {
"backend": "s3",
"s3": {
"endpoint": s3_data.endpoint,
"access_key_id": s3_data.access_key,
"secret_access_key": s3_data.secret_key,
"bucket_name": s3_data.bucket,
"region": s3_data.region,
},
}
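
For reference, the dict this helper returns for hypothetical relation data (values invented, structure read straight from the code above):

# s3_data = _S3StorageBackend(endpoint="s3.example.com", access_key="k",
#                             secret_key="s", bucket="mimir", region="us-west")
# _build_s3_storage_config(s3_data) ->
# {
#     "backend": "s3",
#     "s3": {
#         "endpoint": "s3.example.com",
#         "access_key_id": "k",
#         "secret_access_key": "s",
#         "bucket_name": "mimir",
#         "region": "us-west",
#     },
# }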

return mimir_config
def _update_s3_storage_config(
self, storage_config: Dict[str, Any], old_key: str, prefix_name: str
) -> None:
if old_key in storage_config:
storage_config.pop(old_key)
storage_config["storage_prefix"] = prefix_name

def _build_memberlist_config(self) -> Dict[str, Any]:
return {"join_members": list(self._cluster_provider.gather_addresses())}

def _build_tls_config(self) -> Dict[str, Any]:
return {
"tls_enabled": True,
"tls_cert_path": self._tls_requirer.cacert,
"tls_key_path": self._tls_requirer.key,
"tls_ca_path": self._tls_requirer.capath,
}
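
Putting the pieces together, a hedged sketch of the end-to-end flow (coordinator construction elided; S3 values invented):

# With S3 data present, common.storage carries the S3 settings and the
# per-component filesystem backends are swapped for storage prefixes.
s3 = _S3StorageBackend(endpoint="s3.example.com", access_key="k", secret_key="s", bucket="mimir")
config = coordinator.build_config(s3)
assert config["common"]["storage"]["backend"] == "s3"
assert config["blocks_storage"]["storage_prefix"] == "blocks"
assert "filesystem" not in config["blocks_storage"]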
8 changes: 6 additions & 2 deletions tests/scenario/test_mimir_cluster_interface.py
@@ -91,8 +91,12 @@ def test_address_collection(workers_addresses):
i: MimirClusterRequirerUnitData(address=address, juju_topology=topo).dump()
for i, address in enumerate(worker_addresses)
}
relations.append(Relation("mimir-cluster-provide", remote_units_data=units_data))

roles_data = MimirClusterRequirerAppData(roles=[MimirRole.read, MimirRole.write]).dump()
relations.append(
Relation(
"mimir-cluster-provide", remote_app_data=roles_data, remote_units_data=units_data
)
)
# all unit addresses should show up
expected = set(chain(*workers_addresses))
