diff --git a/config.yaml b/config.yaml index f2a0f2b8d..eae5cdfec 100644 --- a/config.yaml +++ b/config.yaml @@ -37,3 +37,11 @@ options: Ref: https://grafana.com/docs/loki/latest/configure/#limits_config type: int default: 15 + retention-period: + description: | + Sets a global retention period for log streams in Loki. A value of 0 disables retention (default). + Minimum retention period is 1d. + Specify the period in days. For example, to set a 48-day retention period, use `48`. + Specifying retention periods for individual streams is not currently supported. + type: int + default: 0 diff --git a/src/charm.py b/src/charm.py index 76006fe76..b703f1501 100755 --- a/src/charm.py +++ b/src/charm.py @@ -80,6 +80,7 @@ class CompositeStatus(TypedDict): k8s_patch: Tuple[str, str] config: Tuple[str, str] rules: Tuple[str, str] + retention: Tuple[str, str] def to_tuple(status: StatusBase) -> Tuple[str, str]: @@ -123,6 +124,7 @@ def __init__(self, *args): k8s_patch=to_tuple(ActiveStatus()), config=to_tuple(ActiveStatus()), rules=to_tuple(ActiveStatus()), + retention=to_tuple(ActiveStatus()), ) ) @@ -387,10 +389,20 @@ def _configure(self): # noqa: C901 # "can_connect" is a racy check, so we do it once here (instead of in collect-status) if self._container.can_connect(): self._stored.status["config"] = to_tuple(ActiveStatus()) + + # The config validity check does not return on error because if a lifecycle event + # comes in after a config change, we still want Loki to continue to function even + # with the invalid config. else: self._stored.status["config"] = to_tuple(MaintenanceStatus("Configuring Loki")) return + if 0 > int(self.config["retention-period"]): + self._stored.status["retention"] = to_tuple( + BlockedStatus("Please provide a non-negative retention duration") + ) + return + current_layer = self._container.get_plan() new_layer = self._build_pebble_layer restart = current_layer.services != new_layer.services @@ -401,6 +413,7 @@ def _configure(self): # noqa: C901 external_url=self._external_url, ingestion_rate_mb=int(self.config["ingestion-rate-mb"]), ingestion_burst_size_mb=int(self.config["ingestion-burst-size-mb"]), + retention_period=int(self.config["retention-period"]), http_tls=(self.server_cert.server_cert is not None), ).build() diff --git a/src/config_builder.py b/src/config_builder.py index cf2364f2f..3ee201d78 100644 --- a/src/config_builder.py +++ b/src/config_builder.py @@ -16,7 +16,9 @@ LOKI_DIR = "/loki" CHUNKS_DIR = os.path.join(LOKI_DIR, "chunks") +COMPACTOR_DIR = os.path.join(LOKI_DIR, "compactor") BOLTDB_DIR = os.path.join(LOKI_DIR, "boltdb-shipper-active") +BOLTDB_CACHE_DIR = os.path.join(LOKI_DIR, "boltdb-shipper-cache") RULES_DIR = os.path.join(LOKI_DIR, "rules") @@ -40,6 +42,7 @@ def __init__( external_url: str, ingestion_rate_mb: int, ingestion_burst_size_mb: int, + retention_period: int, http_tls: bool = False, ): """Init method.""" @@ -49,6 +52,7 @@ def __init__( self.ingestion_rate_mb = ingestion_rate_mb self.ingestion_burst_size_mb = ingestion_burst_size_mb self.http_tls = http_tls + self.retention_period = retention_period def build(self) -> dict: """Build Loki config dictionary.""" @@ -66,6 +70,7 @@ def build(self) -> dict: "chunk_store_config": self._chunk_store_config, "frontend": self._frontend, "querier": self._querier, + "compactor": self._compactor, } @property @@ -109,7 +114,7 @@ def _schema_config(self) -> dict: "index": {"period": "24h", "prefix": "index_"}, "object_store": "filesystem", "schema": "v11", - "store": "boltdb", + "store": "boltdb-shipper", } ] } @@ -131,8 +136,13 @@ def _server(self) -> dict: @property def _storage_config(self) -> dict: + # Ref: https://grafana.com/docs/loki/latest/configure/#storage_config return { - "boltdb": {"directory": BOLTDB_DIR}, + "boltdb_shipper": { + "active_index_directory": BOLTDB_DIR, + "shared_store": "filesystem", + "cache_location": BOLTDB_CACHE_DIR, + }, "filesystem": {"directory": CHUNKS_DIR}, } @@ -150,6 +160,7 @@ def _limits_config(self) -> dict: # This charmed operator is intended for running a single loki instances, so we don't need to split queries # https://community.grafana.com/t/too-many-outstanding-requests-on-loki-2-7-1/78249/9 "split_queries_by_interval": "0", + "retention_period": f"{self.retention_period}d", } @property @@ -197,3 +208,14 @@ def _querier(self) -> dict: # The maximum number of concurrent queries allowed. Default is 10. "max_concurrent": 20, } + + @property + def _compactor(self) -> dict: + # Ref: https://grafana.com/docs/loki/latest/configure/#compactor + retention_enabled = self.retention_period != 0 + return { + # Activate custom retention. Default is False. + "retention_enabled": retention_enabled, + "working_directory": COMPACTOR_DIR, + "shared_store": "filesystem", + } diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 8c2ce52c9..f119d6836 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -62,6 +62,80 @@ async def loki_rules(ops_test, app_name) -> dict: return {} +async def loki_services(ops_test, app_name: str) -> dict: + """Fetches the status of Loki services from loki HTTP api. + + Returns: + dict: A dictionary containing the status of Loki services, where keys are service names and values are their statuses. + + Example: + { + 'server': 'Running', + 'ring': 'Running', + 'analytics': 'Running', + 'querier': 'Running', + 'query-frontend': 'Running', + 'query-scheduler-ring': 'Running', + 'query-frontend-tripperware': 'Running', + 'ingester': 'Running', + 'distributor': 'Running', + 'query-scheduler': 'Running', + 'ingester-querier': 'Running', + 'store': 'Running', + 'cache-generation-loader': 'Running', + 'memberlist-kv': 'Running', + 'compactor': 'Running', + 'ruler': 'Running' + } + """ + address = await get_unit_address(ops_test, app_name, 0) + url = f"http://{address}:3100/services" + try: + response = requests.get(url) + if response.status_code == 200: + services = {} + # Parse the response and populate the services dictionary + # Each line represents a service name and its status separated by " => " + # We split each line by " => " and store the key-value pairs in the services dictionary + for line in response.text.split("\n"): + if line.strip(): + key, value = line.strip().split(" => ") + services[key.strip()] = value.strip() + return services + return {} + except requests.exceptions.RequestException: + return {} + + +async def loki_config(ops_test, app_name: str) -> dict: + """Fetches the Loki configuration from loki HTTP api. + + Returns: + dict: A dictionary containing the Loki configuration. + + Example: + { + 'limits_config': { + 'retention_period': '0s' + }, + 'compactor': { + 'retention_enabled': False + }, + # Other configuration parameters... + } + """ + address = await get_unit_address(ops_test, app_name, 0) + url = f"http://{address}:3100/config" + try: + response = requests.get(url) + if response.status_code == 200: + yaml_dict = yaml.safe_load(response.text) + return yaml_dict + return {} + except requests.exceptions.RequestException: + return {} + + async def loki_endpoint_request(ops_test, app_name: str, endpoint: str, unit_num: int = 0): address = await get_unit_address(ops_test, app_name, unit_num) url = urljoin(f"http://{address}:3100/", endpoint) diff --git a/tests/integration/test_loki_configs.py b/tests/integration/test_loki_configs.py new file mode 100644 index 000000000..f51fdd84d --- /dev/null +++ b/tests/integration/test_loki_configs.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging +from pathlib import Path + +import pytest +import yaml +from helpers import is_loki_up, loki_config, loki_services +from pytest_operator.plugin import OpsTest + +logger = logging.getLogger(__name__) + +METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) +app_name = METADATA["name"] +resources = {"loki-image": METADATA["resources"]["loki-image"]["upstream-source"]} + + +@pytest.mark.abort_on_fail +async def test_services_running(ops_test: OpsTest, loki_charm): + """Deploy the charm-under-test.""" + logger.debug("deploy local charm") + + await ops_test.model.deploy( + loki_charm, application_name=app_name, resources=resources, trust=True + ) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) + assert await is_loki_up(ops_test, app_name) + + services = await loki_services(ops_test, app_name) + assert all(status == "Running" for status in services.values()), "Not all services are running" + + +@pytest.mark.abort_on_fail +async def test_retention_configs(ops_test: OpsTest): + default_configs = await loki_config(ops_test, app_name) + assert all( + [ + default_configs["limits_config"]["retention_period"] == "0s", + not default_configs["compactor"]["retention_enabled"], + ] + ) + + await ops_test.model.applications[app_name].set_config({"retention-period": "3"}) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) + + configs_with_retention = await loki_config(ops_test, app_name) + assert all( + [ + configs_with_retention["limits_config"]["retention_period"] == "3d", + configs_with_retention["compactor"]["retention_enabled"], + ] + )