Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add S3 relation to mimir coordinator #21

Merged
merged 14 commits
Jan 5, 2024
768 changes: 768 additions & 0 deletions lib/charms/data_platform_libs/v0/s3.py

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ requires:
interface: s3
limit: 1
description: |
The coordinator obtains storage info on behalf of the workers, and
forwards all workers the storage details over mimir-worker.
The coordinator obtains and shares storage details with workers, enabling Mimir's access to an S3 bucket for data storage.

send-remote-write:
interface: prometheus_remote_write
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ops
pydantic
# crossplane is a package from nginxinc to interact with the Nginx config
PietroPasotti marked this conversation as resolved.
Show resolved Hide resolved
crossplane
crossplane
90 changes: 65 additions & 25 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
https://discourse.charmhub.io/t/4208
"""
import logging
from typing import List
from typing import List, Optional

from charms.data_platform_libs.v0.s3 import (
S3Requirer,
)
from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider
from charms.loki_k8s.v0.loki_push_api import LokiPushApiConsumer
from charms.mimir_coordinator_k8s.v0.mimir_cluster import MimirClusterProvider
from charms.prometheus_k8s.v0.prometheus_remote_write import (
PrometheusRemoteWriteConsumer,
)
from mimir_config import BUCKET_NAME, S3_RELATION_NAME, _S3ConfigData
IbraAoad marked this conversation as resolved.
Show resolved Hide resolved
from mimir_coordinator import MimirCoordinator
from nginx import Nginx
from ops.charm import CharmBase, CollectStatusEvent
Expand All @@ -38,11 +42,11 @@ def __init__(self, *args):

self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(self.on.collect_unit_status, self._on_collect_status)

# TODO: On any worker relation-joined/departed, need to updade grafana agent's scrape
# targets with the new memberlist.
# (Remote write would still be the same nginx-proxied endpoint.)

self.s3_requirer = S3Requirer(self, S3_RELATION_NAME, BUCKET_NAME)
self.cluster_provider = MimirClusterProvider(self)
self.coordinator = MimirCoordinator(cluster_provider=self.cluster_provider)

Expand All @@ -57,6 +61,13 @@ def __init__(self, *args):
self._on_mimir_cluster_changed,
)

self.framework.observe(
self.s3_requirer.on.credentials_changed, self._on_s3_requirer_credentials_changed
)
self.framework.observe(
self.s3_requirer.on.credentials_gone, self._on_s3_requirer_credentials_gone
)

self.remote_write_consumer = PrometheusRemoteWriteConsumer(self)
self.framework.observe(
self.remote_write_consumer.on.endpoints_changed, # pyright: ignore
Expand All @@ -77,45 +88,74 @@ def __init__(self, *args):
self._on_loki_relation_changed,
)

@property
def _s3_storage(self) -> dict:
# if not self.model.relations['s3']:
# return {}
return {
"url": "foo",
"endpoint": "bar",
"access_key": "bar",
"insecure": False,
"secret_key": "x12",
}

@property
def mimir_worker_relations(self) -> List[Relation]:
    """Return every established worker relation (empty list when none exist)."""
    worker_relations = self.model.relations.get("mimir_worker", [])
    return worker_relations

def _on_config_changed(self, event):
def has_multiple_workers(self) -> bool:
    """Return True if more than one remote worker unit has joined the Mimir cluster."""
    cluster_relations = self.model.relations.get("mimir-cluster", [])
    remote_units = 0
    for relation in cluster_relations:
        # Only count units belonging to the remote application(s), not our own.
        if relation.app != self.model.app:
            remote_units += len(relation.units)
    return remote_units > 1

def _on_config_changed(self, _):
    """Handle changed configuration by re-publishing config to all workers."""
    # NOTE: the stale pre-refactor call `self.publish_config()` (no argument) was
    # left interleaved here by the diff; publish_config now requires s3_config_data.
    s3_config_data = self._get_s3_storage_config()
    self.publish_config(s3_config_data)

def _on_mimir_cluster_changed(self, _):
    """Handle a change on the mimir-cluster relation by re-evaluating and re-publishing config."""
    self._process_cluster_and_s3_credentials_changes()

def publish_config(self):
def _on_s3_requirer_credentials_changed(self, _):
    """Handle new or updated S3 credentials by re-evaluating and re-publishing config."""
    self._process_cluster_and_s3_credentials_changes()

def _process_cluster_and_s3_credentials_changes(self):
    """Publish an updated Mimir config unless the deployment cannot accept one."""
    if not self.coordinator.is_coherent():
        # Missing required roles: publishing a config now would be premature.
        logger.warning("Incoherent deployment: Some required Mimir roles are missing.")
        return
    s3_config_data = self._get_s3_storage_config()
    if not s3_config_data and self.has_multiple_workers():
        # Replicated workers need shared object storage (S3), not local filesystem.
        logger.warning("Filesystem storage cannot be used with replicated mimir workers")
        return
    self.publish_config(s3_config_data)

def _on_s3_requirer_credentials_gone(self, _):
    """Fall back to filesystem storage when the S3 credentials are removed."""
    if not self.coordinator.is_coherent():
        logger.warning("Incoherent deployment: Some required Mimir roles are missing.")
        return
    if self.has_multiple_workers():
        # Without S3 there is no shared storage for replicated workers; keep the
        # last published config rather than pushing an invalid filesystem one.
        logger.warning("Filesystem storage cannot be used with replicated mimir workers")
        return
    self.publish_config(None)

def publish_config(self, s3_config_data: Optional[dict]):
    """Build the Mimir config (with optional S3 storage) and push it to all workers.

    NOTE: the stale pre-refactor line `build_config(dict(self.config))` was left
    interleaved here by the diff; build_config now takes the validated S3 data.
    """
    mimir_config = self.coordinator.build_config(s3_config_data)
    self.cluster_provider.publish_configs(mimir_config)

def _on_mimir_cluster_changed(self, _):
if self.coordinator.is_coherent():
logger.info("mimir deployment coherent: publishing configs")
self.publish_config()
else:
logger.warning("this mimir deployment is incoherent")
def _get_s3_storage_config(self):
    """Return validated S3 connection info as a dict, or None when absent/invalid."""
    if self.s3_requirer.relations:
        raw_info = self.s3_requirer.get_s3_connection_info()
        return _S3ConfigData.load_and_dump_to_dict(raw_info)
    return None

def _on_collect_status(self, event: "CollectStatusEvent"):
    """Add blocked statuses when the deployment is incoherent or lacks required S3 storage.

    NOTE: the diff had fused the removed old status text with the new one as
    adjacent string literals, which Python concatenates; only the new message
    belongs in the second BlockedStatus.
    """
    if not self.coordinator.is_coherent():
        event.add_status(
            BlockedStatus("Incoherent deployment: you are lacking some required Mimir roles")
        )
    s3_config_data = self._get_s3_storage_config()
    if not s3_config_data and self.has_multiple_workers():
        event.add_status(
            BlockedStatus(
                "When multiple units of Mimir are deployed, you must add a valid S3 relation. S3 relation missing/invalid."
            )
        )

Expand Down
30 changes: 23 additions & 7 deletions src/mimir_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@

"""Helper module for interacting with the Mimir configuration."""

import logging
from dataclasses import asdict
from typing import List, Literal, Optional, Union

from pydantic import BaseModel
from pydantic import BaseModel, Field, ValidationError
from pydantic.dataclasses import dataclass as pydantic_dataclass

S3_RELATION_NAME = "s3"
BUCKET_NAME = "mimir"

logger = logging.getLogger(__name__)


class InvalidConfigurationError(Exception):
"""Invalid configuration."""
Expand Down Expand Up @@ -83,19 +89,29 @@ class Alertmanager(BaseModel):
external_url: Optional[str]


class _S3ConfigData(BaseModel):
    """Validated S3 connection info; field aliases match the s3 interface's raw keys.

    NOTE: the diff had interleaved the removed `_S3StorageBackend` fields (plain
    `access_key_id`, `insecure`, `signature_version`) with the new aliased ones;
    only the new field set belongs here.
    """

    model_config = {"populate_by_name": True}
    access_key_id: str = Field(alias="access-key")
    endpoint: str
    secret_access_key: str = Field(alias="secret-key")
    bucket_name: str = Field(alias="bucket")
    region: str = ""  # optional; some S3-compatible backends do not require a region

    @classmethod
    def load_and_dump_to_dict(cls, raw_s3_config_data: dict) -> Optional[dict]:
        """Validate raw s3 relation data and return it as a plain dict, or None if invalid."""
        try:
            return cls(**raw_s3_config_data).model_dump()
        except ValidationError:
            # Invalid/missing S3 data is treated by the charm as "no S3 storage
            # configured", so log and return None instead of raising.
            msg = f"failed to validate s3 config data: {raw_s3_config_data}"
            logger.error(msg, exc_info=True)
            return None


class _FilesystemStorageBackend(BaseModel):
dir: str


_StorageBackend = Union[_S3StorageBackend, _FilesystemStorageBackend]
_StorageBackend = Union[_S3ConfigData, _FilesystemStorageBackend]
_StorageKey = Union[Literal["filesystem"], Literal["s3"]]


Expand Down
142 changes: 109 additions & 33 deletions src/mimir_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
from collections import Counter
from pathlib import Path
from typing import Any, Dict, Iterable
from typing import Any, Dict, Iterable, Optional

from charms.mimir_coordinator_k8s.v0.mimir_cluster import MimirClusterProvider, MimirRole

Expand Down Expand Up @@ -81,49 +81,125 @@ def is_recommended(self) -> bool:
return False
return True

def build_config(self, s3_config_data: Optional[dict]) -> Dict[str, Any]:
    """Generate the shared Mimir configuration for all workers.

    Starts from filesystem-backed storage sections and, when validated S3
    connection info is supplied, switches the common/blocks/ruler/alertmanager
    storage sections over to the shared S3 bucket.

    Reference: https://grafana.com/docs/mimir/latest/configure/
    """
    mimir_config: Dict[str, Any] = {
        "common": {},
        "alertmanager": self._build_alertmanager_config(),
        "alertmanager_storage": self._build_alertmanager_storage_config(),
        "compactor": self._build_compactor_config(),
        "ruler": self._build_ruler_config(),
        "ruler_storage": self._build_ruler_storage_config(),
        "blocks_storage": self._build_blocks_storage_config(),
        "memberlist": self._build_memberlist_config(),
    }

    if s3_config_data:
        # Shared object storage: point common storage at S3 and give each
        # section its own prefix within the single bucket.
        mimir_config["common"]["storage"] = self._build_s3_storage_config(s3_config_data)
        self._update_s3_storage_config(mimir_config["blocks_storage"], "blocks")
        self._update_s3_storage_config(mimir_config["ruler_storage"], "rules")
        self._update_s3_storage_config(mimir_config["alertmanager_storage"], "alerts")

    if self._tls_requirer:
        mimir_config.update(self._build_tls_config())

    return mimir_config

# data_dir:
# The Mimir Alertmanager stores the alerts state on local disk at the location configured using -alertmanager.storage.path.
# Should be persisted if not replicated
def _build_alertmanager_config(self) -> Dict[str, Any]:
    """Alertmanager section: local directory holding the alert state."""
    data_dir = self._root_data_dir / "data-alertmanager"
    return {"data_dir": str(data_dir)}

# filesystem: dir
# The Mimir Alertmanager also periodically stores the alert state in the storage backend configured with -alertmanager-storage.backend (For Recovery)
def _build_alertmanager_storage_config(self) -> Dict[str, Any]:
    """Alertmanager storage backend: filesystem directory used for state recovery."""
    recovery_dir = self._root_data_dir / "data-alertmanager-recovery"
    return {"filesystem": {"dir": str(recovery_dir)}}

# data_dir:
# Directory to temporarily store blocks during compaction.
# This directory is not required to be persisted between restarts.
def _build_compactor_config(self) -> Dict[str, Any]:
    """Compactor section: scratch directory for block compaction (need not persist)."""
    scratch_dir = self._root_data_dir / "data-compactor"
    return {"data_dir": str(scratch_dir)}

# rule_path:
# Directory to store temporary rule files loaded by the Prometheus rule managers.
# This directory is not required to be persisted between restarts.
def _build_ruler_config(self) -> Dict[str, Any]:
    """Ruler section: temporary rule-file path (need not persist across restarts)."""
    tmp_rule_path = self._root_data_dir / "data-ruler"
    return {"rule_path": str(tmp_rule_path)}

# filesystem: dir
# Storage backend reads Prometheus recording rules from the local filesystem.
# The ruler looks for tenant rules in the self._root_data_dir/rules/<TENANT ID> directory. The ruler requires rule files to be in the Prometheus format.
def _build_ruler_storage_config(self) -> Dict[str, Any]:
    """Ruler storage backend: filesystem dir holding per-tenant rule files.

    The ruler looks for tenant rules under <root>/rules/<TENANT ID> in
    Prometheus format. NOTE: the diff had interleaved removed compactor lines
    here; only the filesystem section belongs in this method.
    """
    return {
        "filesystem": {
            "dir": str(self._root_data_dir / "rules"),
        },
    }

# bucket_store: sync_dir
# Directory to store synchronized TSDB index headers. This directory is not
# required to be persisted between restarts, but it's highly recommended

# filesystem: dir
# Mimir upload blocks (of metrics) to the object storage at period interval.

# tsdb: dir
# Directory to store TSDBs (including WAL) in the ingesters.
# This directory is required to be persisted between restarts.

# The TSDB dir is used by ingesters, while the filesystem: dir is the "object storage"
# Ingesters are expected to upload TSDB blocks to filesystem: dir every 2h.
def _build_blocks_storage_config(self) -> Dict[str, Any]:
    """Blocks storage: bucket-store sync dir, filesystem object-store dir, ingester TSDB dir.

    NOTE: the diff had interleaved the removed pre-refactor `blocks_storage`
    dict lines here; this is the reconstructed new version.
    """
    return {
        "bucket_store": {
            # Synchronized TSDB index headers; safe to lose across restarts.
            "sync_dir": str(self._root_data_dir / "tsdb-sync"),
        },
        "filesystem": {
            # Local stand-in for object storage; replaced by S3 when configured.
            "dir": str(self._root_data_dir / "blocks"),
        },
        "tsdb": {
            # Ingester TSDB (incl. WAL); must persist across restarts.
            "dir": str(self._root_data_dir / "tsdb"),
        },
    }

if self._s3_requirer:
s3_config = self._s3_requirer.s3_config
mimir_config["common"]["storage"] = {
"backend": "s3",
"s3": {
"region": s3_config.region, # eg. 'us-west'
"bucket_name": s3_config.bucket_name, # eg: 'mimir'
},
}
mimir_config["blocks_storage"] = {
"s3": {"bucket_name": s3_config.blocks_bucket_name} # e.g. 'mimir-blocks'
}

# memberlist config for gossip and hash ring
mimir_config["memberlist"] = {
"join_members": list(self._cluster_provider.gather_addresses())
def _build_s3_storage_config(self, s3_config_data: dict) -> Dict[str, Any]:
    """Wrap validated S3 connection info in Mimir's common-storage schema."""
    return {
        "backend": "s3",
        "s3": s3_config_data,
    }

# todo: TLS config for memberlist
if self._tls_requirer:
mimir_config["tls_enabled"] = True
mimir_config["tls_cert_path"] = self._tls_requirer.cacert
mimir_config["tls_key_path"] = self._tls_requirer.key
mimir_config["tls_ca_path"] = self._tls_requirer.capath
def _update_s3_storage_config(self, storage_config: Dict[str, Any], prefix_name: str) -> None:
    """Switch a storage section from filesystem to the shared S3 bucket in place.

    If the key 'filesystem' is present in `storage_config`, remove it and add a new key
    'storage_prefix' with the value of `prefix_name` for the S3 bucket.
    """
    if "filesystem" not in storage_config:
        return
    del storage_config["filesystem"]
    storage_config["storage_prefix"] = prefix_name

def _build_memberlist_config(self) -> Dict[str, Any]:
    """Memberlist (gossip / hash ring) config listing all known worker addresses."""
    members = self._cluster_provider.gather_addresses()
    return {"join_members": list(members)}

def _build_tls_config(self) -> Dict[str, Any]:
    """TLS settings sourced from the TLS requirer's certificate material."""
    tls = self._tls_requirer
    return {
        "tls_enabled": True,
        "tls_cert_path": tls.cacert,
        "tls_key_path": tls.key,
        "tls_ca_path": tls.capath,
    }
8 changes: 6 additions & 2 deletions tests/scenario/test_mimir_cluster_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ def test_address_collection(workers_addresses):
i: MimirClusterRequirerUnitData(address=address, juju_topology=topo).dump()
for i, address in enumerate(worker_addresses)
}
relations.append(Relation("mimir-cluster-provide", remote_units_data=units_data))

roles_data = MimirClusterRequirerAppData(roles=[MimirRole.read, MimirRole.write]).dump()
relations.append(
Relation(
"mimir-cluster-provide", remote_app_data=roles_data, remote_units_data=units_data
)
)
# all unit addresses should show up
expected = set(chain(*workers_addresses))

Expand Down
Loading
Loading