From a0a2f08b6a29239c954aeadc348e8c32a8448f52 Mon Sep 17 00:00:00 2001
From: Ibrahim Awwad
Date: Fri, 5 Jan 2024 19:03:42 +0200
Subject: [PATCH] Add S3 relation to mimir coordinator (#21)

* init s3 configs
* fix status, scenario test and linter
* using pydantic for s3 storage config
* Switching to s3_integrator
* Added unit tests for build_config
* refactoring mimir_config members list
* linter fix
* logic refactor to address PR comments
* refactoring pydantic validation
* passing _S3ConfigData rather than its dictified version
* fix static-lib

---------

Co-authored-by: Luca Bello
---
 lib/charms/data_platform_libs/v0/s3.py | 768 +++++++++++++++++++++++++
 metadata.yaml                          |   3 +-
 requirements.txt                       |   2 +-
 src/charm.py                           |  98 +++-
 src/mimir_config.py                    |  21 +-
 src/mimir_coordinator.py               | 143 +++--
 tests/unit/test_config.py              | 126 ++++
 7 files changed, 1093 insertions(+), 68 deletions(-)
 create mode 100644 lib/charms/data_platform_libs/v0/s3.py
 create mode 100644 tests/unit/test_config.py

diff --git a/lib/charms/data_platform_libs/v0/s3.py b/lib/charms/data_platform_libs/v0/s3.py
new file mode 100644
index 0000000..7beb113
--- /dev/null
+++ b/lib/charms/data_platform_libs/v0/s3.py
@@ -0,0 +1,768 @@
+# Copyright 2023 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+r"""A library for communicating with the S3 credentials providers and consumers.
+
+This library provides the relevant interface code implementing the communication
+specification for fetching, retrieving, triggering, and responding to events related to
+the S3 provider charm and its consumers.
+
+### Provider charm
+
+The provider is implemented in the `s3-provider` charm, which is meant to be deployed
+alongside one or more consumer charms. The provider charm serves the S3 credentials and
+metadata needed to communicate and work with an S3-compatible backend.
+
+Example:
+```python
+from ops.charm import CharmBase
+from ops.main import main
+
+from charms.data_platform_libs.v0.s3 import CredentialRequestedEvent, S3Provider
+
+
+class ExampleProviderCharm(CharmBase):
+    def __init__(self, *args) -> None:
+        super().__init__(*args)
+        self.s3_provider = S3Provider(self, "s3-credentials")
+
+        self.framework.observe(self.s3_provider.on.credentials_requested,
+                               self._on_credential_requested)
+
+    def _on_credential_requested(self, event: CredentialRequestedEvent):
+        if not self.unit.is_leader():
+            return
+
+        # get relation id
+        relation_id = event.relation.id
+
+        # get bucket name
+        bucket = event.bucket
+
+        # S3 configuration parameters
+        desired_configuration = {"access-key": "your-access-key", "secret-key":
+                                 "your-secret-key", "bucket": "your-bucket"}
+
+        # update the configuration
+        self.s3_provider.update_connection_info(relation_id, desired_configuration)
+
+        # or it is possible to set each field independently
+        self.s3_provider.set_secret_key(relation_id, "your-secret-key")
+
+
+if __name__ == "__main__":
+    main(ExampleProviderCharm)
+```
+
+### Requirer charm
+
+The requirer charm is the charm requiring the S3 credentials.
+An example of requirer charm is the following: + +Example: +```python + +from charms.data_platform_libs.v0.s3 import ( + CredentialsChangedEvent, + CredentialsGoneEvent, + S3Requirer +) + +class ExampleRequirerCharm(CharmBase): + + def __init__(self, *args): + super().__init__(*args) + + bucket_name = "test-bucket" + # if bucket name is not provided the bucket name will be generated + # e.g., ('relation-{relation.id}') + + self.s3_client = S3Requirer(self, "s3-credentials", bucket_name) + + self.framework.observe(self.s3_client.on.credentials_changed, self._on_credential_changed) + self.framework.observe(self.s3_client.on.credentials_gone, self._on_credential_gone) + + def _on_credential_changed(self, event: CredentialsChangedEvent): + + # access single parameter credential + secret_key = event.secret_key + access_key = event.access_key + + # or as alternative all credentials can be collected as a dictionary + credentials = self.s3_client.get_s3_credentials() + + def _on_credential_gone(self, event: CredentialsGoneEvent): + # credentials are removed + pass + + if __name__ == "__main__": + main(ExampleRequirerCharm) +``` + +""" +import json +import logging +from collections import namedtuple +from typing import Dict, List, Optional, Union + +import ops.charm +import ops.framework +import ops.model +from ops.charm import ( + CharmBase, + CharmEvents, + RelationBrokenEvent, + RelationChangedEvent, + RelationEvent, + RelationJoinedEvent, +) +from ops.framework import EventSource, Object, ObjectEvents +from ops.model import Application, Relation, RelationDataContent, Unit + +# The unique Charmhub library identifier, never change it +LIBID = "fca396f6254246c9bfa565b1f85ab528" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 4 + +logger = logging.getLogger(__name__) + +Diff = namedtuple("Diff", "added changed deleted") +Diff.__doc__ = """ +A tuple for storing the diff between two data mappings. + +added - keys that were added +changed - keys that still exist but have new values +deleted - key that were deleted""" + + +def diff(event: RelationChangedEvent, bucket: Union[Unit, Application]) -> Diff: + """Retrieves the diff of the data in the relation changed databag. + + Args: + event: relation changed event. + bucket: bucket of the databag (app or unit) + + Returns: + a Diff instance containing the added, deleted and changed + keys from the event relation databag. + """ + # Retrieve the old data from the data key in the application relation databag. + old_data = json.loads(event.relation.data[bucket].get("data", "{}")) + # Retrieve the new data from the event relation databag. + new_data = ( + {key: value for key, value in event.relation.data[event.app].items() if key != "data"} + if event.app + else {} + ) + + # These are the keys that were added to the databag and triggered this event. + added = new_data.keys() - old_data.keys() + # These are the keys that were removed from the databag and triggered this event. + deleted = old_data.keys() - new_data.keys() + # These are the keys that already existed in the databag, + # but had their values changed. + changed = {key for key in old_data.keys() & new_data.keys() if old_data[key] != new_data[key]} + + # TODO: evaluate the possibility of losing the diff if some error + # happens in the charm before the diff is completely checked (DPE-412). 
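+    # For example (hypothetical data): if the previously stored databag was
+    # {"bucket": "b1"} and the incoming databag is {"bucket": "b1", "access-key": "k"},
+    # the function returns Diff(added={"access-key"}, changed=set(), deleted=set()).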
+ # Convert the new_data to a serializable format and save it for a next diff check. + event.relation.data[bucket].update({"data": json.dumps(new_data)}) + + # Return the diff with all possible changes. + return Diff(added, changed, deleted) + + +class BucketEvent(RelationEvent): + """Base class for bucket events.""" + + @property + def bucket(self) -> Optional[str]: + """Returns the bucket was requested.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("bucket", "") + + +class CredentialRequestedEvent(BucketEvent): + """Event emitted when a set of credential is requested for use on this relation.""" + + +class S3CredentialEvents(CharmEvents): + """Event descriptor for events raised by S3Provider.""" + + credentials_requested = EventSource(CredentialRequestedEvent) + + +class S3Provider(Object): + """A provider handler for communicating S3 credentials to consumers.""" + + on = S3CredentialEvents() # pyright: ignore [reportGeneralTypeIssues] + + def __init__( + self, + charm: CharmBase, + relation_name: str, + ): + super().__init__(charm, relation_name) + self.charm = charm + self.local_app = self.charm.model.app + self.local_unit = self.charm.unit + self.relation_name = relation_name + + # monitor relation changed event for changes in the credentials + self.framework.observe(charm.on[relation_name].relation_changed, self._on_relation_changed) + + def _on_relation_changed(self, event: RelationChangedEvent) -> None: + """React to the relation changed event by consuming data.""" + if not self.charm.unit.is_leader(): + return + diff = self._diff(event) + # emit on credential requested if bucket is provided by the requirer application + if "bucket" in diff.added: + getattr(self.on, "credentials_requested").emit( + event.relation, app=event.app, unit=event.unit + ) + + def _load_relation_data(self, raw_relation_data: dict) -> dict: + """Loads relation data from the relation data bag. + + Args: + raw_relation_data: Relation data from the databag + Returns: + dict: Relation data in dict format. + """ + connection_data = {} + for key in raw_relation_data: + try: + connection_data[key] = json.loads(raw_relation_data[key]) + except (json.decoder.JSONDecodeError, TypeError): + connection_data[key] = raw_relation_data[key] + return connection_data + + # def _diff(self, event: RelationChangedEvent) -> Diff: + # """Retrieves the diff of the data in the relation changed databag. + + # Args: + # event: relation changed event. + + # Returns: + # a Diff instance containing the added, deleted and changed + # keys from the event relation databag. + # """ + # # Retrieve the old data from the data key in the application relation databag. + # old_data = json.loads(event.relation.data[self.local_app].get("data", "{}")) + # # Retrieve the new data from the event relation databag. + # new_data = { + # key: value for key, value in event.relation.data[event.app].items() if key != "data" + # } + + # # These are the keys that were added to the databag and triggered this event. + # added = new_data.keys() - old_data.keys() + # # These are the keys that were removed from the databag and triggered this event. + # deleted = old_data.keys() - new_data.keys() + # # These are the keys that already existed in the databag, + # # but had their values changed. 
+ # changed = { + # key for key in old_data.keys() & new_data.keys() if old_data[key] != new_data[key] + # } + + # # TODO: evaluate the possibility of losing the diff if some error + # # happens in the charm before the diff is completely checked (DPE-412). + # # Convert the new_data to a serializable format and save it for a next diff check. + # event.relation.data[self.local_app].update({"data": json.dumps(new_data)}) + + # # Return the diff with all possible changes. + # return Diff(added, changed, deleted) + + def _diff(self, event: RelationChangedEvent) -> Diff: + """Retrieves the diff of the data in the relation changed databag. + + Args: + event: relation changed event. + + Returns: + a Diff instance containing the added, deleted and changed + keys from the event relation databag. + """ + return diff(event, self.local_app) + + def fetch_relation_data(self) -> dict: + """Retrieves data from relation. + + This function can be used to retrieve data from a relation + in the charm code when outside an event callback. + + Returns: + a dict of the values stored in the relation data bag + for all relation instances (indexed by the relation id). + """ + data = {} + for relation in self.relations: + data[relation.id] = ( + {key: value for key, value in relation.data[relation.app].items() if key != "data"} + if relation.app + else {} + ) + return data + + def update_connection_info(self, relation_id: int, connection_data: dict) -> None: + """Updates the credential data as set of key-value pairs in the relation. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + connection_data: dict containing the key-value pairs + that should be updated. + """ + # check and write changes only if you are the leader + if not self.local_unit.is_leader(): + return + + relation = self.charm.model.get_relation(self.relation_name, relation_id) + + if not relation: + return + + # configuration options that are list + s3_list_options = ["attributes", "tls-ca-chain"] + + # update the databag, if connection data did not change with respect to before + # the relation changed event is not triggered + updated_connection_data = {} + for configuration_option, configuration_value in connection_data.items(): + if configuration_option in s3_list_options: + updated_connection_data[configuration_option] = json.dumps(configuration_value) + else: + updated_connection_data[configuration_option] = configuration_value + + relation.data[self.local_app].update(updated_connection_data) + logger.debug(f"Updated S3 connection info: {updated_connection_data}") + + @property + def relations(self) -> List[Relation]: + """The list of Relation instances associated with this relation_name.""" + return list(self.charm.model.relations[self.relation_name]) + + def set_bucket(self, relation_id: int, bucket: str) -> None: + """Sets bucket name in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + bucket: the bucket name. + """ + self.update_connection_info(relation_id, {"bucket": bucket}) + + def set_access_key(self, relation_id: int, access_key: str) -> None: + """Sets access-key value in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. 
+ access_key: the access-key value. + """ + self.update_connection_info(relation_id, {"access-key": access_key}) + + def set_secret_key(self, relation_id: int, secret_key: str) -> None: + """Sets the secret key value in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + secret_key: the value of the secret key. + """ + self.update_connection_info(relation_id, {"secret-key": secret_key}) + + def set_path(self, relation_id: int, path: str) -> None: + """Sets the path value in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + path: the path value. + """ + self.update_connection_info(relation_id, {"path": path}) + + def set_endpoint(self, relation_id: int, endpoint: str) -> None: + """Sets the endpoint address in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + endpoint: the endpoint address. + """ + self.update_connection_info(relation_id, {"endpoint": endpoint}) + + def set_region(self, relation_id: int, region: str) -> None: + """Sets the region location in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + region: the region address. + """ + self.update_connection_info(relation_id, {"region": region}) + + def set_s3_uri_style(self, relation_id: int, s3_uri_style: str) -> None: + """Sets the S3 URI style in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + s3_uri_style: the s3 URI style. + """ + self.update_connection_info(relation_id, {"s3-uri-style": s3_uri_style}) + + def set_storage_class(self, relation_id: int, storage_class: str) -> None: + """Sets the storage class in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + storage_class: the storage class. + """ + self.update_connection_info(relation_id, {"storage-class": storage_class}) + + def set_tls_ca_chain(self, relation_id: int, tls_ca_chain: List[str]) -> None: + """Sets the tls_ca_chain value in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + tls_ca_chain: the TLS Chain value. + """ + self.update_connection_info(relation_id, {"tls-ca-chain": tls_ca_chain}) + + def set_s3_api_version(self, relation_id: int, s3_api_version: str) -> None: + """Sets the S3 API version in application databag. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + s3_api_version: the S3 version value. + """ + self.update_connection_info(relation_id, {"s3-api-version": s3_api_version}) + + def set_attributes(self, relation_id: int, attributes: List[str]) -> None: + """Sets the connection attributes in application databag. 
+ + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + attributes: the attributes value. + """ + self.update_connection_info(relation_id, {"attributes": attributes}) + + +class S3Event(RelationEvent): + """Base class for S3 storage events.""" + + @property + def bucket(self) -> Optional[str]: + """Returns the bucket name.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("bucket") + + @property + def access_key(self) -> Optional[str]: + """Returns the access key.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("access-key") + + @property + def secret_key(self) -> Optional[str]: + """Returns the secret key.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("secret-key") + + @property + def path(self) -> Optional[str]: + """Returns the path where data can be stored.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("path") + + @property + def endpoint(self) -> Optional[str]: + """Returns the endpoint address.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("endpoint") + + @property + def region(self) -> Optional[str]: + """Returns the region.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("region") + + @property + def s3_uri_style(self) -> Optional[str]: + """Returns the s3 uri style.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("s3-uri-style") + + @property + def storage_class(self) -> Optional[str]: + """Returns the storage class name.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("storage-class") + + @property + def tls_ca_chain(self) -> Optional[List[str]]: + """Returns the TLS CA chain.""" + if not self.relation.app: + return None + + tls_ca_chain = self.relation.data[self.relation.app].get("tls-ca-chain") + if tls_ca_chain is not None: + return json.loads(tls_ca_chain) + return None + + @property + def s3_api_version(self) -> Optional[str]: + """Returns the S3 API version.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("s3-api-version") + + @property + def attributes(self) -> Optional[List[str]]: + """Returns the attributes.""" + if not self.relation.app: + return None + + attributes = self.relation.data[self.relation.app].get("attributes") + if attributes is not None: + return json.loads(attributes) + return None + + +class CredentialsChangedEvent(S3Event): + """Event emitted when S3 credential are changed on this relation.""" + + +class CredentialsGoneEvent(RelationEvent): + """Event emitted when S3 credential are removed from this relation.""" + + +class S3CredentialRequiresEvents(ObjectEvents): + """Event descriptor for events raised by the S3Provider.""" + + credentials_changed = EventSource(CredentialsChangedEvent) + credentials_gone = EventSource(CredentialsGoneEvent) + + +S3_REQUIRED_OPTIONS = ["access-key", "secret-key"] + + +class S3Requirer(Object): + """Requires-side of the s3 relation.""" + + on = S3CredentialRequiresEvents() # pyright: ignore[reportGeneralTypeIssues] + + def __init__( + self, charm: ops.charm.CharmBase, relation_name: str, bucket_name: Optional[str] = None + ): + """Manager of the s3 client 
relations.""" + super().__init__(charm, relation_name) + + self.relation_name = relation_name + self.charm = charm + self.local_app = self.charm.model.app + self.local_unit = self.charm.unit + self.bucket = bucket_name + + self.framework.observe( + self.charm.on[self.relation_name].relation_changed, self._on_relation_changed + ) + + self.framework.observe( + self.charm.on[self.relation_name].relation_joined, self._on_relation_joined + ) + + self.framework.observe( + self.charm.on[self.relation_name].relation_broken, + self._on_relation_broken, + ) + + def _generate_bucket_name(self, event: RelationJoinedEvent): + """Returns the bucket name generated from relation id.""" + return f"relation-{event.relation.id}" + + def _on_relation_joined(self, event: RelationJoinedEvent) -> None: + """Event emitted when the application joins the s3 relation.""" + if self.bucket is None: + self.bucket = self._generate_bucket_name(event) + self.update_connection_info(event.relation.id, {"bucket": self.bucket}) + + def fetch_relation_data(self) -> dict: + """Retrieves data from relation. + + This function can be used to retrieve data from a relation + in the charm code when outside an event callback. + + Returns: + a dict of the values stored in the relation data bag + for all relation instances (indexed by the relation id). + """ + data = {} + + for relation in self.relations: + data[relation.id] = self._load_relation_data(relation.data[self.charm.app]) + return data + + def update_connection_info(self, relation_id: int, connection_data: dict) -> None: + """Updates the credential data as set of key-value pairs in the relation. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + connection_data: dict containing the key-value pairs + that should be updated. + """ + # check and write changes only if you are the leader + if not self.local_unit.is_leader(): + return + + relation = self.charm.model.get_relation(self.relation_name, relation_id) + + if not relation: + return + + # update the databag, if connection data did not change with respect to before + # the relation changed event is not triggered + # configuration options that are list + s3_list_options = ["attributes", "tls-ca-chain"] + updated_connection_data = {} + for configuration_option, configuration_value in connection_data.items(): + if configuration_option in s3_list_options: + updated_connection_data[configuration_option] = json.dumps(configuration_value) + else: + updated_connection_data[configuration_option] = configuration_value + + relation.data[self.local_app].update(updated_connection_data) + logger.debug(f"Updated S3 credentials: {updated_connection_data}") + + def _load_relation_data(self, raw_relation_data: RelationDataContent) -> Dict[str, str]: + """Loads relation data from the relation data bag. + + Args: + raw_relation_data: Relation data from the databag + Returns: + dict: Relation data in dict format. + """ + connection_data = {} + for key in raw_relation_data: + try: + connection_data[key] = json.loads(raw_relation_data[key]) + except (json.decoder.JSONDecodeError, TypeError): + connection_data[key] = raw_relation_data[key] + return connection_data + + def _diff(self, event: RelationChangedEvent) -> Diff: + """Retrieves the diff of the data in the relation changed databag. + + Args: + event: relation changed event. 
+ + Returns: + a Diff instance containing the added, deleted and changed + keys from the event relation databag. + """ + return diff(event, self.local_unit) + + def _on_relation_changed(self, event: RelationChangedEvent) -> None: + """Notify the charm about the presence of S3 credentials.""" + # check if the mandatory options are in the relation data + contains_required_options = True + # get current credentials data + credentials = self.get_s3_connection_info() + # records missing options + missing_options = [] + for configuration_option in S3_REQUIRED_OPTIONS: + if configuration_option not in credentials: + contains_required_options = False + missing_options.append(configuration_option) + # emit credential change event only if all mandatory fields are present + if contains_required_options: + getattr(self.on, "credentials_changed").emit( + event.relation, app=event.app, unit=event.unit + ) + else: + logger.warning( + f"Some mandatory fields: {missing_options} are not present, do not emit credential change event!" + ) + + def get_s3_connection_info(self) -> Dict[str, str]: + """Return the s3 credentials as a dictionary.""" + for relation in self.relations: + if relation and relation.app: + return self._load_relation_data(relation.data[relation.app]) + + return {} + + def _on_relation_broken(self, event: RelationBrokenEvent) -> None: + """Notify the charm about a broken S3 credential store relation.""" + getattr(self.on, "credentials_gone").emit(event.relation, app=event.app, unit=event.unit) + + @property + def relations(self) -> List[Relation]: + """The list of Relation instances associated with this relation_name.""" + return list(self.charm.model.relations[self.relation_name]) diff --git a/metadata.yaml b/metadata.yaml index 926d431..0eaba3d 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -32,8 +32,7 @@ requires: interface: s3 limit: 1 description: | - The coordinator obtains storage info on behalf of the workers, and - forwards all workers the storage details over mimir-worker. + The coordinator obtains and shares storage details with workers, enabling Mimir's access to an S3 bucket for data storage. 
   send-remote-write:
     interface: prometheus_remote_write

diff --git a/requirements.txt b/requirements.txt
index 5b56e23..f57ce95 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
 ops
 pydantic
 # crossplane is a package from nginxinc to interact with the Nginx config
-crossplane
+crossplane
\ No newline at end of file
diff --git a/src/charm.py b/src/charm.py
index 5acfc42..c20fcd9 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -10,17 +10,22 @@
 https://discourse.charmhub.io/t/4208
 """
 import logging
-from typing import List
+from typing import List, Optional
 
 import ops
+from charms.data_platform_libs.v0.s3 import (
+    S3Requirer,
+)
 from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider
 from charms.loki_k8s.v0.loki_push_api import LokiPushApiConsumer
 from charms.mimir_coordinator_k8s.v0.mimir_cluster import MimirClusterProvider
 from charms.prometheus_k8s.v0.prometheus_remote_write import (
     PrometheusRemoteWriteConsumer,
 )
+from mimir_config import BUCKET_NAME, S3_RELATION_NAME, _S3ConfigData
 from mimir_coordinator import MimirCoordinator
 from nginx import Nginx
+from pydantic import ValidationError
 
 # Log messages can be retrieved using juju debug-log
 logger = logging.getLogger(__name__)
@@ -36,11 +41,11 @@ def __init__(self, framework: ops.Framework):
         self.framework.observe(self.on.config_changed, self._on_config_changed)
         self.framework.observe(self.on.collect_unit_status, self._on_collect_status)
-
         # TODO: On any worker relation-joined/departed, need to update grafana agent's scrape
         # targets with the new memberlist.
         # (Remote write would still be the same nginx-proxied endpoint.)
+        self.s3_requirer = S3Requirer(self, S3_RELATION_NAME, BUCKET_NAME)
         self.cluster_provider = MimirClusterProvider(self)
         self.coordinator = MimirCoordinator(cluster_provider=self.cluster_provider)
@@ -55,6 +60,13 @@ def __init__(self, framework: ops.Framework):
             self._on_mimir_cluster_changed,
         )
 
+        self.framework.observe(
+            self.s3_requirer.on.credentials_changed, self._on_s3_requirer_credentials_changed
+        )
+        self.framework.observe(
+            self.s3_requirer.on.credentials_gone, self._on_s3_requirer_credentials_gone
+        )
+
         self.remote_write_consumer = PrometheusRemoteWriteConsumer(self)
         self.framework.observe(
             self.remote_write_consumer.on.endpoints_changed,  # pyright: ignore
@@ -75,45 +87,81 @@ def __init__(self, framework: ops.Framework):
             self._on_loki_relation_changed,
         )
 
-    @property
-    def _s3_storage(self) -> dict:  # type: ignore
-        # if not self.model.relations['s3']:
-        #     return {}
-        return {
-            "url": "foo",
-            "endpoint": "bar",
-            "access_key": "bar",
-            "insecure": False,
-            "secret_key": "x12",
-        }
-
     @property
     def mimir_worker_relations(self) -> List[ops.Relation]:
         """Returns the list of worker relations."""
         return self.model.relations.get("mimir_worker", [])
 
-    def _on_config_changed(self, _event: ops.ConfigChangedEvent):
+    def has_multiple_workers(self) -> bool:
+        """Return True if there are multiple workers forming the Mimir cluster."""
+        mimir_cluster_relations = self.model.relations.get("mimir-cluster", [])
+        remote_units_count = sum(
+            len(relation.units)
+            for relation in mimir_cluster_relations
+            if relation.app != self.model.app
+        )
+        return remote_units_count > 1
+
+    def _on_config_changed(self, __: ops.ConfigChangedEvent):
         """Handle changed configuration."""
-        self.publish_config()
+        s3_config_data = self._get_s3_storage_config()
+        self.publish_config(s3_config_data)
+
+    def _on_mimir_cluster_changed(self, _):
+        self._process_cluster_and_s3_credentials_changes()
+
+    def _on_s3_requirer_credentials_changed(self, _):
+        self._process_cluster_and_s3_credentials_changes()
 
-    def publish_config(self):
+    def _process_cluster_and_s3_credentials_changes(self):
+        if not self.coordinator.is_coherent():
+            logger.warning("Incoherent deployment: some required Mimir roles are missing.")
+            return
+        s3_config_data = self._get_s3_storage_config()
+        if not s3_config_data and self.has_multiple_workers():
+            logger.warning("Filesystem storage cannot be used with replicated Mimir workers.")
+            return
+        self.publish_config(s3_config_data)
+
+    def _on_s3_requirer_credentials_gone(self, _):
+        if not self.coordinator.is_coherent():
+            logger.warning("Incoherent deployment: some required Mimir roles are missing.")
+            return
+        if self.has_multiple_workers():
+            logger.warning("Filesystem storage cannot be used with replicated Mimir workers.")
+            return
+        self.publish_config(None)
+
+    def publish_config(self, s3_config_data: Optional[_S3ConfigData]):
         """Generate config file and publish to all workers."""
-        mimir_config = self.coordinator.build_config(dict(self.config))
+        mimir_config = self.coordinator.build_config(s3_config_data)
         self.cluster_provider.publish_configs(mimir_config)
 
-    def _on_mimir_cluster_changed(self, _event: ops.RelationChangedEvent):
-        if self.coordinator.is_coherent():
-            logger.info("mimir deployment coherent: publishing configs")
-            self.publish_config()
-        else:
-            logger.warning("this mimir deployment is incoherent")
+    def _get_s3_storage_config(self):
+        """Retrieve and validate the S3 storage configuration, or return None."""
+        if not self.s3_requirer.relations:
+            return None
+        raw = self.s3_requirer.get_s3_connection_info()
+        try:
+            return _S3ConfigData(**raw)
+        except ValidationError:
+            msg = f"failed to validate s3 config data: {raw}"
+            logger.error(msg, exc_info=True)
+            return None
 
     def _on_collect_status(self, event: ops.CollectStatusEvent):
         """Handle start event."""
         if not self.coordinator.is_coherent():
             event.add_status(
                 ops.BlockedStatus(
-                    "Incoherent deployment: you are " "lacking some required Mimir roles"
+                    "Incoherent deployment: you are lacking some required Mimir roles"
                 )
             )
+        s3_config_data = self._get_s3_storage_config()
+        if not s3_config_data and self.has_multiple_workers():
+            event.add_status(
+                ops.BlockedStatus(
+                    "S3 relation missing or invalid: multiple Mimir units require S3 storage."
+                )
+            )
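The validation flow above degrades gracefully: anything short of a complete credential set yields `None`, which `publish_config` treats as filesystem storage. A minimal sketch of that round trip (endpoint, key, and bucket values here are hypothetical):

```python
from pydantic import ValidationError

from mimir_config import _S3ConfigData

# Databag contents as an s3-integrator-style provider would publish them.
raw = {
    "endpoint": "s3.example.com:9000",
    "access-key": "my-access-key",
    "secret-key": "my-secret-key",
    "bucket": "mimir",
}

try:
    config = _S3ConfigData(**raw)  # aliases map "access-key" -> access_key_id, etc.
except ValidationError:
    config = None  # incomplete relation data: fall back to filesystem storage
```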
diff --git a/src/mimir_config.py b/src/mimir_config.py
index ae7b045..5bcf957 100644
--- a/src/mimir_config.py
+++ b/src/mimir_config.py
@@ -3,12 +3,18 @@
 
 """Helper module for interacting with the Mimir configuration."""
 
+import logging
 from dataclasses import asdict
 from typing import List, Literal, Optional, Union
 
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 from pydantic.dataclasses import dataclass as pydantic_dataclass
 
+S3_RELATION_NAME = "s3"
+BUCKET_NAME = "mimir"
+
+logger = logging.getLogger(__name__)
+
 
 class InvalidConfigurationError(Exception):
     """Invalid configuration."""
@@ -83,19 +89,20 @@ class Alertmanager(BaseModel):
     external_url: Optional[str]
 
 
-class _S3StorageBackend(BaseModel):
+class _S3ConfigData(BaseModel):
+    model_config = {"populate_by_name": True}
+    access_key_id: str = Field(alias="access-key")
     endpoint: str
-    access_key_id: str
-    secret_access_key: str
-    insecure: bool = False
-    signature_version: str = "v4"
+    secret_access_key: str = Field(alias="secret-key")
+    bucket_name: str = Field(alias="bucket")
+    region: str = ""
 
 
 class _FilesystemStorageBackend(BaseModel):
     dir: str
 
 
-_StorageBackend = Union[_S3StorageBackend, _FilesystemStorageBackend]
+_StorageBackend = Union[_S3ConfigData, _FilesystemStorageBackend]
 _StorageKey = Union[Literal["filesystem"], Literal["s3"]]
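A quick illustration of the aliasing `_S3ConfigData` relies on (hypothetical values): the model validates the hyphenated keys from the relation databag, while `model_dump()` emits the snake_case field names that Mimir's `common.storage.s3` block expects:

```python
from mimir_config import _S3ConfigData

cfg = _S3ConfigData(
    **{
        "access-key": "my-access-key",
        "secret-key": "my-secret-key",
        "bucket": "mimir",
        "endpoint": "s3.example.com:9000",
    }
)
assert cfg.model_dump() == {
    "access_key_id": "my-access-key",
    "endpoint": "s3.example.com:9000",
    "secret_access_key": "my-secret-key",
    "bucket_name": "mimir",
    "region": "",  # default when the provider does not set a region
}
```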
diff --git a/src/mimir_coordinator.py b/src/mimir_coordinator.py
index 65c1223..e2214b0 100644
--- a/src/mimir_coordinator.py
+++ b/src/mimir_coordinator.py
@@ -7,9 +7,10 @@
 import logging
 from collections import Counter
 from pathlib import Path
-from typing import Any, Dict, Iterable
+from typing import Any, Dict, Iterable, Optional
 
 from charms.mimir_coordinator_k8s.v0.mimir_cluster import MimirClusterProvider, MimirRole
+from mimir_config import _S3ConfigData
 
 logger = logging.getLogger(__name__)
@@ -81,49 +82,125 @@ def is_recommended(self) -> bool:
             return False
         return True
 
-    def build_config(self, _charm_config: Dict[str, Any]) -> Dict[str, Any]:
+    def build_config(self, s3_config_data: Optional[_S3ConfigData]) -> Dict[str, Any]:
         """Generate shared config file for mimir.
 
         Reference: https://grafana.com/docs/mimir/latest/configure/
         """
         mimir_config: Dict[str, Any] = {
             "common": {},
-            "alertmanager": {
-                "data_dir": str(self._root_data_dir / "data-alertmanager"),
-            },
-            "compactor": {
-                "data_dir": str(self._root_data_dir / "data-compactor"),
-            },
-            "blocks_storage": {
-                "bucket_store": {
-                    "sync_dir": str(self._root_data_dir / "tsdb-sync"),
-                },
-            },
+            "alertmanager": self._build_alertmanager_config(),
+            "alertmanager_storage": self._build_alertmanager_storage_config(),
+            "compactor": self._build_compactor_config(),
+            "ruler": self._build_ruler_config(),
+            "ruler_storage": self._build_ruler_storage_config(),
+            "blocks_storage": self._build_blocks_storage_config(),
+            "memberlist": self._build_memberlist_config(),
         }
 
-        if self._s3_requirer:
-            s3_config = self._s3_requirer.s3_config
-            mimir_config["common"]["storage"] = {
-                "backend": "s3",
-                "s3": {
-                    "region": s3_config.region,  # eg. 'us-west'
-                    "bucket_name": s3_config.bucket_name,  # eg: 'mimir'
-                },
-            }
-            mimir_config["blocks_storage"] = {
-                "s3": {"bucket_name": s3_config.blocks_bucket_name}  # e.g. 'mimir-blocks'
-            }
-
-        # memberlist config for gossip and hash ring
-        mimir_config["memberlist"] = {
-            "join_members": list(self._cluster_provider.gather_addresses())
-        }
-
-        # todo: TLS config for memberlist
-        if self._tls_requirer:
-            mimir_config["tls_enabled"] = True
-            mimir_config["tls_cert_path"] = self._tls_requirer.cacert
-            mimir_config["tls_key_path"] = self._tls_requirer.key
-            mimir_config["tls_ca_path"] = self._tls_requirer.capath
-
-        return mimir_config
+        if s3_config_data:
+            mimir_config["common"]["storage"] = self._build_s3_storage_config(s3_config_data)
+            self._update_s3_storage_config(mimir_config["blocks_storage"], "blocks")
+            self._update_s3_storage_config(mimir_config["ruler_storage"], "rules")
+            self._update_s3_storage_config(mimir_config["alertmanager_storage"], "alerts")
+
+        if self._tls_requirer:
+            mimir_config.update(self._build_tls_config())
+
+        return mimir_config
+
+    # data_dir:
+    # The Mimir Alertmanager stores the alert state on local disk at the location
+    # configured using -alertmanager.storage.path. It should be persisted if not replicated.
+    def _build_alertmanager_config(self) -> Dict[str, Any]:
+        return {
+            "data_dir": str(self._root_data_dir / "data-alertmanager"),
+        }
+
+    # filesystem: dir
+    # The Mimir Alertmanager also periodically stores the alert state in the storage
+    # backend configured with -alertmanager-storage.backend (for recovery).
+    def _build_alertmanager_storage_config(self) -> Dict[str, Any]:
+        return {
+            "filesystem": {
+                "dir": str(self._root_data_dir / "data-alertmanager-recovery"),
+            },
+        }
+
+    # data_dir:
+    # Directory to temporarily store blocks during compaction.
+    # This directory is not required to be persisted between restarts.
+    def _build_compactor_config(self) -> Dict[str, Any]:
+        return {
+            "data_dir": str(self._root_data_dir / "data-compactor"),
+        }
+
+    # rule_path:
+    # Directory to store temporary rule files loaded by the Prometheus rule managers.
+    # This directory is not required to be persisted between restarts.
+    def _build_ruler_config(self) -> Dict[str, Any]:
+        return {
+            "rule_path": str(self._root_data_dir / "data-ruler"),
+        }
+
+    # filesystem: dir
+    # The storage backend from which the ruler reads Prometheus recording rules.
+    # The ruler looks for tenant rules in the self._root_data_dir/rules/ directory
+    # and requires rule files to be in the Prometheus format.
+    def _build_ruler_storage_config(self) -> Dict[str, Any]:
+        return {
+            "filesystem": {
+                "dir": str(self._root_data_dir / "rules"),
+            },
+        }
+
+    # bucket_store: sync_dir
+    # Directory to store synchronized TSDB index headers. This directory is not
+    # required to be persisted between restarts, but keeping it is highly recommended.
+
+    # filesystem: dir
+    # Mimir uploads blocks (of metrics) to the object storage at a regular interval.
+
+    # tsdb: dir
+    # Directory to store TSDBs (including WAL) in the ingesters.
+    # This directory is required to be persisted between restarts.
+
+    # The TSDB dir is used by ingesters, while the filesystem: dir is the "object storage".
+    # Ingesters are expected to upload TSDB blocks to filesystem: dir every 2h.
+    def _build_blocks_storage_config(self) -> Dict[str, Any]:
+        return {
+            "bucket_store": {
+                "sync_dir": str(self._root_data_dir / "tsdb-sync"),
+            },
+            "filesystem": {
+                "dir": str(self._root_data_dir / "blocks"),
+            },
+            "tsdb": {
+                "dir": str(self._root_data_dir / "tsdb"),
+            },
+        }
+
+    def _build_s3_storage_config(self, s3_config_data: _S3ConfigData) -> Dict[str, Any]:
+        return {
+            "backend": "s3",
+            "s3": s3_config_data.model_dump(),
+        }
+
+    def _update_s3_storage_config(self, storage_config: Dict[str, Any], prefix_name: str) -> None:
+        """Update S3 storage configuration in `storage_config`.
+
+        If the key 'filesystem' is present in `storage_config`, remove it and add a new key
+        'storage_prefix' with the value of `prefix_name` for the S3 bucket.
+        """
+        if "filesystem" in storage_config:
+            storage_config.pop("filesystem")
+            storage_config["storage_prefix"] = prefix_name
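+    # Worked example (hypothetical paths): given
+    #     {"bucket_store": {...}, "filesystem": {"dir": "/etc/mimir/blocks"}, "tsdb": {...}}
+    # _update_s3_storage_config(config, "blocks") leaves
+    #     {"bucket_store": {...}, "tsdb": {...}, "storage_prefix": "blocks"},
+    # i.e. the local filesystem backend is dropped in favour of a prefix inside the
+    # shared S3 bucket configured under common.storage.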
+    def _build_memberlist_config(self) -> Dict[str, Any]:
+        return {"join_members": list(self._cluster_provider.gather_addresses())}
+
+    def _build_tls_config(self) -> Dict[str, Any]:
+        return {
+            "tls_enabled": True,
+            "tls_cert_path": self._tls_requirer.cacert,
+            "tls_key_path": self._tls_requirer.key,
+            "tls_ca_path": self._tls_requirer.capath,
+        }
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
new file mode 100644
index 0000000..a8df5a2
--- /dev/null
+++ b/tests/unit/test_config.py
@@ -0,0 +1,126 @@
+import unittest
+from unittest.mock import MagicMock
+
+from mimir_config import _S3ConfigData
+from mimir_coordinator import MimirCoordinator
+
+
+class TestMimirConfig(unittest.TestCase):
+    def setUp(self):
+        self.cluster_provider = MagicMock()
+        self.tls_requirer = MagicMock()
+        self.coordinator = MimirCoordinator(
+            cluster_provider=self.cluster_provider,
+            tls_requirer=self.tls_requirer,
+        )
+
+    def test_build_alertmanager_config(self):
+        alertmanager_config = self.coordinator._build_alertmanager_config()
+        expected_config = {"data_dir": "/etc/mimir/data-alertmanager"}
+        self.assertEqual(alertmanager_config, expected_config)
+
+    def test_build_alertmanager_storage_config(self):
+        alertmanager_storage_config = self.coordinator._build_alertmanager_storage_config()
+        expected_config = {"filesystem": {"dir": "/etc/mimir/data-alertmanager-recovery"}}
+        self.assertEqual(alertmanager_storage_config, expected_config)
+
+    def test_build_compactor_config(self):
+        compactor_config = self.coordinator._build_compactor_config()
+        expected_config = {"data_dir": "/etc/mimir/data-compactor"}
+        self.assertEqual(compactor_config, expected_config)
+
+    def test_build_ruler_config(self):
+        ruler_config = self.coordinator._build_ruler_config()
+        expected_config = {"rule_path": "/etc/mimir/data-ruler"}
+        self.assertEqual(ruler_config, expected_config)
+
+    def test_build_ruler_storage_config(self):
+        ruler_storage_config = self.coordinator._build_ruler_storage_config()
+        expected_config = {"filesystem": {"dir": "/etc/mimir/rules"}}
+        self.assertEqual(ruler_storage_config, expected_config)
+
+    def test_build_blocks_storage_config(self):
+        blocks_storage_config = self.coordinator._build_blocks_storage_config()
+        expected_config = {
+            "bucket_store": {"sync_dir": "/etc/mimir/tsdb-sync"},
+            "filesystem": {"dir": "/etc/mimir/blocks"},
+            "tsdb": {"dir": "/etc/mimir/tsdb"},
+        }
+        self.assertEqual(blocks_storage_config, expected_config)
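+
+    # Illustrative extra check (not part of the original patch): incomplete relation
+    # data must fail validation, which is what drives the charm's filesystem fallback.
+    def test_s3_config_data_rejects_incomplete_data(self):
+        from pydantic import ValidationError
+
+        with self.assertRaises(ValidationError):
+            _S3ConfigData(**{"endpoint": "s3.com:port"})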
+
+    def test_build_config_with_s3_data(self):
+        raw_s3_config_data = {
+            "endpoint": "s3.com:port",
+            "access-key": "your_access_key",
+            "secret-key": "your_secret_key",
+            "bucket": "your_bucket",
+            "region": "your_region",
+        }
+        s3_config_data = _S3ConfigData(**raw_s3_config_data)
+        mimir_config = self.coordinator.build_config(s3_config_data)
+        self.assertEqual(
+            mimir_config["common"]["storage"],
+            self.coordinator._build_s3_storage_config(s3_config_data),
+        )
+
+    def test_build_config_without_s3_data(self):
+        s3_config_data = None
+        mimir_config = self.coordinator.build_config(s3_config_data)
+        self.assertNotIn("storage", mimir_config["common"])
+
+    def test_build_s3_storage_config(self):
+        raw_s3_config_data = {
+            "endpoint": "s3.com:port",
+            "access-key": "your_access_key",
+            "secret-key": "your_secret_key",
+            "bucket": "your_bucket",
+            "region": "your_region",
+        }
+        s3_config_data = _S3ConfigData(**raw_s3_config_data)
+        s3_storage_config = self.coordinator._build_s3_storage_config(s3_config_data)
+        expected_config = {
+            "backend": "s3",
+            "s3": {
+                "endpoint": "s3.com:port",
+                "access_key_id": "your_access_key",
+                "secret_access_key": "your_secret_key",
+                "bucket_name": "your_bucket",
+                "region": "your_region",
+            },
+        }
+        self.assertEqual(s3_storage_config, expected_config)
+
+    def test_update_s3_storage_config(self):
+        storage_config = {"filesystem": {"dir": "/etc/mimir/blocks"}}
+        self.coordinator._update_s3_storage_config(storage_config, "blocks")
+        expected_config = {"storage_prefix": "blocks"}
+        self.assertEqual(storage_config, expected_config)
+
+    def test_update_s3_storage_config_noop(self):
+        storage_config = {"storage_prefix": "blocks"}
+        self.coordinator._update_s3_storage_config(storage_config, "blocks")
+        expected_config = {"storage_prefix": "blocks"}
+        self.assertEqual(storage_config, expected_config)
+
+    def test_build_memberlist_config(self):
+        self.cluster_provider.gather_addresses.return_value = ["address1", "address2"]
+        memberlist_config = self.coordinator._build_memberlist_config()
+        expected_config = {"join_members": ["address1", "address2"]}
+        self.assertEqual(memberlist_config, expected_config)
+
+    def test_build_tls_config(self):
+        self.tls_requirer.cacert = "/path/to/cert.pem"
+        self.tls_requirer.key = "/path/to/key.pem"
+        self.tls_requirer.capath = "/path/to/ca.pem"
+        tls_config = self.coordinator._build_tls_config()
+        expected_config = {
+            "tls_enabled": True,
+            "tls_cert_path": "/path/to/cert.pem",
+            "tls_key_path": "/path/to/key.pem",
+            "tls_ca_path": "/path/to/ca.pem",
+        }
+        self.assertEqual(tls_config, expected_config)
+
+
+if __name__ == "__main__":
+    unittest.main()
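Putting the pieces together, a minimal end-to-end sketch (hypothetical values; a `MagicMock` stands in for the cluster provider, as in the tests above, and the default root data dir is assumed) of how a validated S3 relation reshapes the generated Mimir config:

```python
from unittest.mock import MagicMock

from mimir_config import _S3ConfigData
from mimir_coordinator import MimirCoordinator

coordinator = MimirCoordinator(cluster_provider=MagicMock())
s3 = _S3ConfigData(
    **{
        "endpoint": "s3.example.com:9000",
        "access-key": "my-access-key",
        "secret-key": "my-secret-key",
        "bucket": "mimir",
    }
)
config = coordinator.build_config(s3)

assert config["common"]["storage"]["backend"] == "s3"
# Each component's local filesystem backend is replaced by a prefix in the shared bucket.
assert config["blocks_storage"]["storage_prefix"] == "blocks"
assert config["ruler_storage"] == {"storage_prefix": "rules"}
assert config["alertmanager_storage"] == {"storage_prefix": "alerts"}
```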