Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Time-Based Log Retention in Loki #377

Merged
merged 9 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,11 @@ options:
Ref: https://grafana.com/docs/loki/latest/configure/#limits_config
type: int
default: 15
retention-period:
description: |
Sets a global retention period for log streams in Loki. A value of 0 disables retention (default).
Minimum retention period is 24h.
Specify the period in hours. For example, to set a 48-hour retention period, use `48`.
Specifying retention periods for individual streams is not currently supported.
type: int
default: 0
IbraAoad marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 6 additions & 0 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ def _configure(self): # noqa: C901
self._stored.status["config"] = to_tuple(MaintenanceStatus("Configuring Loki"))
return

IbraAoad marked this conversation as resolved.
Show resolved Hide resolved
if 0 < int(self.config["retention-period"]) < 24:
IbraAoad marked this conversation as resolved.
Show resolved Hide resolved
self._stored.status["config"] = to_tuple(
IbraAoad marked this conversation as resolved.
Show resolved Hide resolved
BlockedStatus("The minimum retention period is 24 hours.")
)

current_layer = self._container.get_plan()
new_layer = self._build_pebble_layer
restart = current_layer.services != new_layer.services
Expand All @@ -401,6 +406,7 @@ def _configure(self): # noqa: C901
external_url=self._external_url,
ingestion_rate_mb=int(self.config["ingestion-rate-mb"]),
ingestion_burst_size_mb=int(self.config["ingestion-burst-size-mb"]),
retention_period=int(self.config["retention-period"]),
http_tls=(self.server_cert.server_cert is not None),
).build()

Expand Down
30 changes: 28 additions & 2 deletions src/config_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

LOKI_DIR = "/loki"
CHUNKS_DIR = os.path.join(LOKI_DIR, "chunks")
COMPACTOR_DIR = os.path.join(LOKI_DIR, "compactor")
BOLTDB_DIR = os.path.join(LOKI_DIR, "boltdb-shipper-active")
BOLTDB_CACHE_DIR = os.path.join(LOKI_DIR, "boltdb-shipper-cache")
RULES_DIR = os.path.join(LOKI_DIR, "rules")


Expand All @@ -40,6 +42,7 @@ def __init__(
external_url: str,
ingestion_rate_mb: int,
ingestion_burst_size_mb: int,
retention_period: int,
http_tls: bool = False,
):
"""Init method."""
Expand All @@ -49,6 +52,7 @@ def __init__(
self.ingestion_rate_mb = ingestion_rate_mb
self.ingestion_burst_size_mb = ingestion_burst_size_mb
self.http_tls = http_tls
self.retention_period = retention_period

def build(self) -> dict:
"""Build Loki config dictionary."""
Expand All @@ -66,6 +70,7 @@ def build(self) -> dict:
"chunk_store_config": self._chunk_store_config,
"frontend": self._frontend,
"querier": self._querier,
"compactor": self._compactor,
}

@property
Expand Down Expand Up @@ -109,7 +114,7 @@ def _schema_config(self) -> dict:
"index": {"period": "24h", "prefix": "index_"},
"object_store": "filesystem",
"schema": "v11",
"store": "boltdb",
"store": "boltdb-shipper",
}
]
}
Expand All @@ -131,14 +136,23 @@ def _server(self) -> dict:

@property
def _storage_config(self) -> dict:
# Ref: https://grafana.com/docs/loki/latest/configure/#storage_config
return {
"boltdb": {"directory": BOLTDB_DIR},
"boltdb_shipper": {
"active_index_directory": BOLTDB_DIR,
"shared_store": "filesystem",
"cache_location": BOLTDB_CACHE_DIR,
},
"filesystem": {"directory": CHUNKS_DIR},
}

@property
def _limits_config(self) -> dict:
# Ref: https://grafana.com/docs/loki/latest/configure/#limits_config
if self.retention_period != 0:
IbraAoad marked this conversation as resolved.
Show resolved Hide resolved
retention_period = f"{self.retention_period}h"
else:
retention_period = self.retention_period
return {
# For convenience, we use an integer but Loki takes a float
"ingestion_rate_mb": float(self.ingestion_rate_mb),
Expand All @@ -150,6 +164,7 @@ def _limits_config(self) -> dict:
# This charmed operator is intended for running a single loki instances, so we don't need to split queries
# https://community.grafana.com/t/too-many-outstanding-requests-on-loki-2-7-1/78249/9
"split_queries_by_interval": "0",
"retention_period": retention_period,
}

@property
Expand Down Expand Up @@ -197,3 +212,14 @@ def _querier(self) -> dict:
# The maximum number of concurrent queries allowed. Default is 10.
"max_concurrent": 20,
}

@property
def _compactor(self) -> dict:
# Ref: https://grafana.com/docs/loki/latest/configure/#compactor
retention_enabled = self.retention_period != 0
return {
# Activate custom retention. Default is False.
"retention_enabled": retention_enabled,
"working_directory": COMPACTOR_DIR,
"shared_store": "filesystem",
}
74 changes: 74 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,80 @@ async def loki_rules(ops_test, app_name) -> dict:
return {}


async def loki_services(ops_test, app_name: str) -> dict:
"""Fetches the status of Loki services from loki HTTP api.

Returns:
dict: A dictionary containing the status of Loki services, where keys are service names and values are their statuses.

Example:
{
'server': 'Running',
'ring': 'Running',
'analytics': 'Running',
'querier': 'Running',
'query-frontend': 'Running',
'query-scheduler-ring': 'Running',
'query-frontend-tripperware': 'Running',
'ingester': 'Running',
'distributor': 'Running',
'query-scheduler': 'Running',
'ingester-querier': 'Running',
'store': 'Running',
'cache-generation-loader': 'Running',
'memberlist-kv': 'Running',
'compactor': 'Running',
'ruler': 'Running'
}
"""
address = await get_unit_address(ops_test, app_name, 0)
url = f"http://{address}:3100/services"
try:
response = requests.get(url)
if response.status_code == 200:
services = {}
# Parse the response and populate the services dictionary
# Each line represents a service name and its status separated by " => "
# We split each line by " => " and store the key-value pairs in the services dictionary
for line in response.text.split("\n"):
if line.strip():
key, value = line.strip().split(" => ")
IbraAoad marked this conversation as resolved.
Show resolved Hide resolved
services[key.strip()] = value.strip()
return services
return {}
except requests.exceptions.RequestException:
return {}


async def loki_config(ops_test, app_name: str) -> dict:
"""Fetches the Loki configuration from loki HTTP api.

Returns:
dict: A dictionary containing the Loki configuration.

Example:
{
'limits_config': {
'retention_period': '0s'
},
'compactor': {
'retention_enabled': False
},
# Other configuration parameters...
}
"""
address = await get_unit_address(ops_test, app_name, 0)
url = f"http://{address}:3100/config"
try:
response = requests.get(url)
if response.status_code == 200:
yaml_dict = yaml.safe_load(response.text)
return yaml_dict
return {}
except requests.exceptions.RequestException:
return {}


async def loki_endpoint_request(ops_test, app_name: str, endpoint: str, unit_num: int = 0):
address = await get_unit_address(ops_test, app_name, unit_num)
url = urljoin(f"http://{address}:3100/", endpoint)
Expand Down
57 changes: 57 additions & 0 deletions tests/integration/test_loki_configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

import logging
from pathlib import Path

import pytest
import yaml
from helpers import is_loki_up, loki_config, loki_services
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)

METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
app_name = METADATA["name"]
resources = {"loki-image": METADATA["resources"]["loki-image"]["upstream-source"]}


@pytest.mark.abort_on_fail
async def test_services_running(ops_test: OpsTest, loki_charm):
"""Deploy the charm-under-test."""
logger.debug("deploy local charm")

await ops_test.model.deploy(
loki_charm, application_name=app_name, resources=resources, trust=True
)
await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)
assert await is_loki_up(ops_test, app_name)

services = await loki_services(ops_test, app_name)
assert all(status == "Running" for status in services.values()), "Not all services are running"
IbraAoad marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.abort_on_fail
async def test_retention_configs(ops_test: OpsTest):
default_configs = await loki_config(ops_test, app_name)
assert all(
[
default_configs["limits_config"]["retention_period"] == "0s",
not default_configs["compactor"]["retention_enabled"],
]
)

await ops_test.model.applications[app_name].set_config({"retention-period": "24"})
await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)

configs_with_retention = await loki_config(ops_test, app_name)
assert all(
[
configs_with_retention["limits_config"]["retention_period"] == "1d",
configs_with_retention["compactor"]["retention_enabled"],
]
)

await ops_test.model.applications[app_name].set_config({"retention-period": "8"})
await ops_test.model.wait_for_idle(apps=[app_name], status="blocked", timeout=60)
Loading