From 0b53c073641a3ce44b4d48608148c97b8d620fda Mon Sep 17 00:00:00 2001 From: Leon <82407168+sed-i@users.noreply.github.com> Date: Tue, 20 Aug 2024 08:08:26 -0400 Subject: [PATCH] Update logging: bump lib and introduce pebble log forwarding (#486) * Bump loki lib to v1 * Use LogForwarder * Change to property --------- Co-authored-by: Dragomir Penev --- .../loki_k8s/{v0 => v1}/loki_push_api.py | 776 ++++++++++++------ src/charm.py | 26 +- 2 files changed, 529 insertions(+), 273 deletions(-) rename lib/charms/loki_k8s/{v0 => v1}/loki_push_api.py (80%) diff --git a/lib/charms/loki_k8s/v0/loki_push_api.py b/lib/charms/loki_k8s/v1/loki_push_api.py similarity index 80% rename from lib/charms/loki_k8s/v0/loki_push_api.py rename to lib/charms/loki_k8s/v1/loki_push_api.py index 16e1294649..7f8372c473 100644 --- a/lib/charms/loki_k8s/v0/loki_push_api.py +++ b/lib/charms/loki_k8s/v1/loki_push_api.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2021 Canonical Ltd. +# Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. # # Learn more at: https://juju.is/docs/sdk @@ -12,21 +12,28 @@ implement the provider side of the `loki_push_api` relation interface. For instance, a Loki charm. The provider side of the relation represents the server side, to which logs are being pushed. -- `LokiPushApiConsumer`: Used to obtain the loki api endpoint. This is useful for configuring - applications such as pebble, or charmed operators of workloads such as grafana-agent or promtail, - that can communicate with loki directly. +- `LokiPushApiConsumer`: This object is meant to be used by any Charmed Operator that needs to +send log to Loki by implementing the consumer side of the `loki_push_api` relation interface. +For instance, a Promtail or Grafana agent charm which needs to send logs to Loki. - `LogProxyConsumer`: DEPRECATED. This object can be used by any Charmed Operator which needs to send telemetry, such as logs, to Loki through a Log Proxy by implementing the consumer side of the `loki_push_api` relation interface. +In order to be able to control the labels on the logs pushed this object adds a Pebble layer +that runs Promtail in the workload container, injecting Juju topology labels into the +logs on the fly. +This object is deprecated. Consider migrating to LogForwarder with the release of Juju 3.6 LTS. + +- `LogForwarder`: This object can be used by any Charmed Operator which needs to send the workload +standard output (stdout) through Pebble's log forwarding mechanism, to Loki endpoints through the +`loki_push_api` relation interface. +In order to be able to control the labels on the logs pushed this object updates the pebble layer's +"log-targets" section with Juju topology. Filtering logs in Loki is largely performed on the basis of labels. In the Juju ecosystem, Juju topology labels are used to uniquely identify the workload which generates telemetry like logs. -In order to be able to control the labels on the logs pushed this object adds a Pebble layer -that runs Promtail in the workload container, injecting Juju topology labels into the -logs on the fly. ## LokiPushApiProvider Library Usage @@ -39,14 +46,14 @@ - `charm`: A reference to the parent (Loki) charm. - `relation_name`: The name of the relation that the charm uses to interact - with its clients, which implement `LokiPushApiConsumer` or `LogProxyConsumer` + with its clients, which implement `LokiPushApiConsumer` `LogForwarder`, or `LogProxyConsumer` (note that LogProxyConsumer is deprecated). 
If provided, this relation name must match a provided relation in metadata.yaml with the `loki_push_api` interface. - The default relation name is "logging" for `LokiPushApiConsumer` and "log-proxy" for - `LogProxyConsumer` (note that LogProxyConsumer is deprecated). + The default relation name is "logging" for `LokiPushApiConsumer` and `LogForwarder`, and + "log-proxy" for `LogProxyConsumer` (note that LogProxyConsumer is deprecated). For example, a provider's `metadata.yaml` file may look as follows: @@ -59,7 +66,7 @@ Subsequently, a Loki charm may instantiate the `LokiPushApiProvider` in its constructor as follows: - from charms.loki_k8s.v0.loki_push_api import LokiPushApiProvider + from charms.loki_k8s.v1.loki_push_api import LokiPushApiProvider from loki_server import LokiServer ... @@ -165,7 +172,7 @@ def __init__(self, *args): sends logs). ```python -from charms.loki_k8s.v0.loki_push_api import LokiPushApiConsumer +from charms.loki_k8s.v1.loki_push_api import LokiPushApiConsumer class LokiClientCharm(CharmBase): @@ -221,8 +228,8 @@ def __init__(self, *args): ## LogProxyConsumer Library Usage -> Note: This object is deprecated. Consider migrating to LogForwarder (see v1/loki_push_api) with -> the release of Juju 3.6 LTS. +> Note: This object is deprecated. Consider migrating to LogForwarder with the release of Juju 3.6 +> LTS. Let's say that we have a workload charm that produces logs, and we need to send those logs to a workload implementing the `loki_push_api` interface, such as `Loki` or `Grafana Agent`. @@ -236,16 +243,23 @@ def __init__(self, *args): For example: ```python - from charms.loki_k8s.v0.loki_push_api import LogProxyConsumer + from charms.loki_k8s.v1.loki_push_api import LogProxyConsumer ... def __init__(self, *args): ... self._log_proxy = LogProxyConsumer( - charm=self, log_files=LOG_FILES, container_name=PEER, enable_syslog=True + self, + logs_scheme={ + "workload-a": { + "log-files": ["/tmp/worload-a-1.log", "/tmp/worload-a-2.log"], + "syslog-port": 1514, + }, + "workload-b": {"log-files": ["/tmp/worload-b.log"], "syslog-port": 1515}, + }, + relation_name="log-proxy", ) - self.framework.observe( self._log_proxy.on.promtail_digest_error, self._promtail_error, @@ -282,22 +296,9 @@ def _promtail_error(self, event): Note that: - - `LOG_FILES` is a `list` containing the log files we want to send to `Loki` or - `Grafana Agent`, for instance: - - ```python - LOG_FILES = [ - "/var/log/apache2/access.log", - "/var/log/alternatives.log", - ] - ``` - - - `container_name` is the name of the container in which the application is running. - If in the Pod there is only one container, this argument can be omitted. - - You can configure your syslog software using `localhost` as the address and the method - `LogProxyConsumer.syslog_port` to get the port, or, alternatively, if you are using rsyslog - you may use the method `LogProxyConsumer.rsyslog_config()`. + `LogProxyConsumer.syslog_port("container_name")` to get the port, or, alternatively, if you are using rsyslog + you may use the method `LogProxyConsumer.rsyslog_config("container_name")`. 2. Modify the `metadata.yaml` file to add: @@ -360,6 +361,45 @@ def _promtail_error(self, event): ) ``` +## LogForwarder class Usage + +Let's say that we have a charm's workload that writes logs to the standard output (stdout), +and we need to send those logs to a workload implementing the `loki_push_api` interface, +such as `Loki` or `Grafana Agent`. 
To know how to reach a Loki instance, a charm would +typically use the `loki_push_api` interface. + +Use the `LogForwarder` class by instantiating it in the `__init__` method of the charm: + +```python +from charms.loki_k8s.v1.loki_push_api import LogForwarder + +... + + def __init__(self, *args): + ... + self._log_forwarder = LogForwarder( + self, + relation_name="logging" # optional, defaults to `logging` + ) +``` + +The `LogForwarder` by default will observe relation events on the `logging` endpoint and +enable/disable log forwarding automatically. +Next, modify the `metadata.yaml` file to add: + +The `log-forwarding` relation in the `requires` section: +```yaml +requires: + logging: + interface: loki_push_api + optional: true +``` + +Once the LogForwader class is implemented in your charm and the relation (implementing the +`loki_push_api` interface) is active and healthy, the library will inject a Pebble layer in +each workload container the charm has access to, to configure Pebble's log forwarding +feature and start sending logs to Loki. + ## Alerting Rules This charm library also supports gathering alerting rules from all related Loki client @@ -456,15 +496,16 @@ def _alert_rules_error(self, event): from hashlib import sha256 from io import BytesIO from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Union, cast +from typing import Any, Dict, List, Optional, Tuple, Union from urllib import request -from urllib.error import HTTPError +from urllib.error import URLError import yaml from cosl import JujuTopology from ops.charm import ( CharmBase, HookEvent, + PebbleReadyEvent, RelationBrokenEvent, RelationCreatedEvent, RelationDepartedEvent, @@ -474,18 +515,19 @@ def _alert_rules_error(self, event): WorkloadEvent, ) from ops.framework import EventBase, EventSource, Object, ObjectEvents +from ops.jujuversion import JujuVersion from ops.model import Container, ModelError, Relation -from ops.pebble import APIError, ChangeError, PathError, ProtocolError +from ops.pebble import APIError, ChangeError, Layer, PathError, ProtocolError # The unique Charmhub library identifier, never change it LIBID = "bf76f23cdd03464b877c52bd1d2f563e" # Increment this major API version when introducing breaking changes -LIBAPI = 0 +LIBAPI = 1 # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 30 +LIBPATCH = 12 PYDEPS = ["cosl"] @@ -500,13 +542,21 @@ def _alert_rules_error(self, event): # To update Promtail version you only need to change the PROMTAIL_VERSION and # update all sha256 sums in PROMTAIL_BINARIES. To support a new architecture # you only need to add a new key value pair for the architecture in PROMTAIL_BINARIES. 
-PROMTAIL_VERSION = "v2.5.0" +PROMTAIL_VERSION = "v2.9.7" +PROMTAIL_ARM_BINARY = { + "filename": "promtail-static-arm64", + "zipsha": "c083fdb45e5c794103f974eeb426489b4142438d9e10d0ae272b2aff886e249b", + "binsha": "4cd055c477a301c0bdfdbcea514e6e93f6df5d57425ce10ffc77f3e16fec1ddf", +} + PROMTAIL_BINARIES = { "amd64": { "filename": "promtail-static-amd64", - "zipsha": "543e333b0184e14015a42c3c9e9e66d2464aaa66eca48b29e185a6a18f67ab6d", - "binsha": "17e2e271e65f793a9fbe81eab887b941e9d680abe82d5a0602888c50f5e0cac9", + "zipsha": "6873cbdabf23062aeefed6de5f00ff382710332af3ab90a48c253ea17e08f465", + "binsha": "28da9b99f81296fe297831f3bc9d92aea43b4a92826b8ff04ba433b8cb92fb50", }, + "arm64": PROMTAIL_ARM_BINARY, + "aarch64": PROMTAIL_ARM_BINARY, } # Paths in `charm` container @@ -520,8 +570,11 @@ def _alert_rules_error(self, event): WORKLOAD_POSITIONS_PATH = "{}/positions.yaml".format(WORKLOAD_BINARY_DIR) WORKLOAD_SERVICE_NAME = "promtail" -HTTP_LISTEN_PORT = 9080 -GRPC_LISTEN_PORT = 9095 +# These are the initial port values. As we can have more than one container, +# we use odd and even numbers to avoid collisions. +# Each new container adds 2 to the previous value. +HTTP_LISTEN_PORT_START = 9080 # even start port +GRPC_LISTEN_PORT_START = 9095 # odd start port class RelationNotFoundError(ValueError): @@ -765,7 +818,11 @@ def _from_file(self, root_path: Path, file_path: Path) -> List[dict]: alert_rule["labels"] = {} if self.topology: - alert_rule["labels"].update(self.topology.label_matcher_dict) + # only insert labels that do not already exist + for label, val in self.topology.label_matcher_dict.items(): + if label not in alert_rule["labels"]: + alert_rule["labels"][label] = val + # insert juju topology filters into a prometheus alert rule # logql doesn't like empty matchers, so add a job matcher which hits # any string as a "wildcard" which the topology labels will @@ -1544,8 +1601,8 @@ def __init__( the Loki API endpoint to push logs. It is intended for workloads that can speak loki_push_api (https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki), such as grafana-agent. - (If you need to forward workload stdout logs, then use v1/loki_push_api.LogForwarder; if - you need to forward log files, then use LogProxyConsumer.) + (If you need to forward workload stdout logs, then use LogForwarder; if you need to forward + log files, then use LogProxyConsumer.) `LokiPushApiConsumer` can be instantiated as follows: @@ -1685,20 +1742,6 @@ def __init__(self): super().__init__(self.message) -class MultipleContainersFoundError(Exception): - """Raised if no container name is passed but multiple containers are present.""" - - def __init__(self): - msg = ( - "No 'container_name' parameter has been specified; since this Charmed Operator" - " is has multiple containers, container_name must be specified for the container" - " to get logs from." - ) - self.message = msg - - super().__init__(self.message) - - class PromtailDigestError(EventBase): """Event emitted when there is an error with Promtail initialization.""" @@ -1734,35 +1777,44 @@ class LogProxyEvents(ObjectEvents): class LogProxyConsumer(ConsumerBase): """LogProxyConsumer class. - > Note: This object is deprecated. Consider migrating to v1/loki_push_api.LogForwarder with the - > release of Juju 3.6 LTS. + > Note: This object is deprecated. Consider migrating to LogForwarder with the release of Juju + > 3.6 LTS. 
The `LogProxyConsumer` object provides a method for attaching `promtail` to a workload in order to generate structured logging data from applications which traditionally log to syslog or do not have native Loki integration. The `LogProxyConsumer` can be instantiated as follows: - self._log_proxy_consumer = LogProxyConsumer(self, log_files=["/var/log/messages"]) + self._log_proxy = LogProxyConsumer( + self, + logs_scheme={ + "workload-a": { + "log-files": ["/tmp/worload-a-1.log", "/tmp/worload-a-2.log"], + "syslog-port": 1514, + }, + "workload-b": {"log-files": ["/tmp/worload-b.log"], "syslog-port": 1515}, + }, + relation_name="log-proxy", + ) Args: charm: a `CharmBase` object that manages this `LokiPushApiConsumer` object. Typically, this is `self` in the instantiating class. - log_files: a list of log files to monitor with Promtail. + logs_scheme: a dict which maps containers and a list of log files and syslog port. relation_name: the string name of the relation interface to look up. If `charm` has exactly one relation with this interface, the relation's name is returned. If none or multiple relations with the provided interface are found, this method will raise either a NoRelationWithInterfaceFoundError or MultipleRelationsWithInterfaceFoundError exception, respectively. - enable_syslog: Whether to enable syslog integration. - syslog_port: The port syslog is attached to. + containers_syslog_port: a dict which maps (and enable) containers and syslog port. alert_rules_path: an optional path for the location of alert rules files. Defaults to "./src/loki_alert_rules", resolved from the directory hosting the charm entry file. The alert rules are automatically updated on charm upgrade. recursive: Whether to scan for rule files recursively. - container_name: An optional container name to inject the payload into. promtail_resource_name: An optional promtail resource name from metadata if it has been modified and attached + insecure_skip_verify: skip SSL verification. 
Raises: RelationNotFoundError: If there is no relation in the charm's metadata.yaml @@ -1780,62 +1832,56 @@ class LogProxyConsumer(ConsumerBase): def __init__( self, charm, - log_files: Optional[Union[List[str], str]] = None, + *, + logs_scheme=None, relation_name: str = DEFAULT_LOG_PROXY_RELATION_NAME, - enable_syslog: bool = False, - syslog_port: int = 1514, alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH, recursive: bool = False, - container_name: str = "", promtail_resource_name: Optional[str] = None, - *, # TODO: In v1, move the star up so everything after 'charm' is a kwarg insecure_skip_verify: bool = False, ): super().__init__(charm, relation_name, alert_rules_path, recursive) self._charm = charm + self._logs_scheme = logs_scheme or {} self._relation_name = relation_name - self._container = self._get_container(container_name) - self._container_name = self._get_container_name(container_name) - - if not log_files: - log_files = [] - elif isinstance(log_files, str): - log_files = [log_files] - elif not isinstance(log_files, list) or not all((isinstance(x, str) for x in log_files)): - raise TypeError("The 'log_files' argument must be a list of strings.") - self._log_files = log_files - - self._syslog_port = syslog_port - self._is_syslog = enable_syslog self.topology = JujuTopology.from_charm(charm) self._promtail_resource_name = promtail_resource_name or "promtail-bin" self.insecure_skip_verify = insecure_skip_verify + self._promtails_ports = self._generate_promtails_ports(logs_scheme) # architecture used for promtail binary arch = platform.processor() - self._arch = "amd64" if arch == "x86_64" else arch + if arch in ["x86_64", "amd64"]: + self._arch = "amd64" + elif arch in ["aarch64", "arm64", "armv8b", "armv8l"]: + self._arch = "arm64" + else: + self._arch = arch events = self._charm.on[relation_name] self.framework.observe(events.relation_created, self._on_relation_created) self.framework.observe(events.relation_changed, self._on_relation_changed) self.framework.observe(events.relation_departed, self._on_relation_departed) - # turn the container name to a valid Python identifier - snake_case_container_name = self._container_name.replace("-", "_") - self.framework.observe( - getattr(self._charm.on, "{}_pebble_ready".format(snake_case_container_name)), - self._on_pebble_ready, - ) + self._observe_pebble_ready() + + def _observe_pebble_ready(self): + for container in self._containers.keys(): + snake_case_container_name = container.replace("-", "_") + self.framework.observe( + getattr(self._charm.on, f"{snake_case_container_name}_pebble_ready"), + self._on_pebble_ready, + ) - def _on_pebble_ready(self, _: WorkloadEvent): + def _on_pebble_ready(self, event: WorkloadEvent): """Event handler for `pebble_ready`.""" if self.model.relations[self._relation_name]: - self._setup_promtail() + self._setup_promtail(event.workload) def _on_relation_created(self, _: RelationCreatedEvent) -> None: """Event handler for `relation_created`.""" - if not self._container.can_connect(): - return - self._setup_promtail() + for container in self._containers.values(): + if container.can_connect(): + self._setup_promtail(container) def _on_relation_changed(self, event: RelationEvent) -> None: """Event handler for `relation_changed`. 
@@ -1857,26 +1903,27 @@ def _on_relation_changed(self, event: RelationEvent) -> None: else: self.on.alert_rule_status_changed.emit(valid=valid, errors=errors) - if not self._container.can_connect(): - return - if self.model.relations[self._relation_name]: - if "promtail" not in self._container.get_plan().services: - self._setup_promtail() - return + for container in self._containers.values(): + if not container.can_connect(): + continue + if self.model.relations[self._relation_name]: + if "promtail" not in container.get_plan().services: + self._setup_promtail(container) + continue - new_config = self._promtail_config - if new_config != self._current_config: - self._container.push( - WORKLOAD_CONFIG_PATH, yaml.safe_dump(new_config), make_dirs=True - ) + new_config = self._promtail_config(container.name) + if new_config != self._current_config(container): + container.push( + WORKLOAD_CONFIG_PATH, yaml.safe_dump(new_config), make_dirs=True + ) - # Loki may send endpoints late. Don't necessarily start, there may be - # no clients - if new_config["clients"]: - self._container.restart(WORKLOAD_SERVICE_NAME) - self.on.log_proxy_endpoint_joined.emit() - else: - self.on.promtail_digest_error.emit("No promtail client endpoints available!") + # Loki may send endpoints late. Don't necessarily start, there may be + # no clients + if new_config["clients"]: + container.restart(WORKLOAD_SERVICE_NAME) + self.on.log_proxy_endpoint_joined.emit() + else: + self.on.promtail_digest_error.emit("No promtail client endpoints available!") def _on_relation_departed(self, _: RelationEvent) -> None: """Event handler for `relation_departed`. @@ -1884,106 +1931,52 @@ def _on_relation_departed(self, _: RelationEvent) -> None: Args: event: The event object `RelationDepartedEvent`. """ - if not self._container.can_connect(): - return - if not self._charm.model.relations[self._relation_name]: - self._container.stop(WORKLOAD_SERVICE_NAME) - return - - new_config = self._promtail_config - if new_config != self._current_config: - self._container.push(WORKLOAD_CONFIG_PATH, yaml.safe_dump(new_config), make_dirs=True) - - if new_config["clients"]: - self._container.restart(WORKLOAD_SERVICE_NAME) - else: - self._container.stop(WORKLOAD_SERVICE_NAME) - self.on.log_proxy_endpoint_departed.emit() - - def _get_container(self, container_name: str = "") -> Container: # pyright: ignore - """Gets a single container by name or using the only container running in the Pod. - - If there is more than one container in the Pod a `PromtailDigestError` is emitted. - - Args: - container_name: The container name. - - Returns: - A `ops.model.Container` object representing the container. - - Emits: - PromtailDigestError, if there was a problem obtaining a container. - """ - try: - container_name = self._get_container_name(container_name) - return self._charm.unit.get_container(container_name) - except (MultipleContainersFoundError, ContainerNotFoundError, ModelError) as e: - msg = str(e) - logger.warning(msg) - self.on.promtail_digest_error.emit(msg) - - def _get_container_name(self, container_name: str = "") -> str: - """Helper function for getting/validating a container name. - - Args: - container_name: The container name to be validated (optional). - - Returns: - container_name: The same container_name that was passed (if it exists) or the only - container name that is present (if no container_name was passed). - - Raises: - ContainerNotFoundError, if container_name does not exist. 
- MultipleContainersFoundError, if container_name was not provided but multiple - containers are present. - """ - containers = dict(self._charm.model.unit.containers) - if len(containers) == 0: - raise ContainerNotFoundError - - if not container_name: - # container_name was not provided - will get it ourselves, if it is the only one - if len(containers) > 1: - raise MultipleContainersFoundError - - # Get the first key in the containers' dict. - # Need to "cast", otherwise: - # error: Incompatible return value type (got "Optional[str]", expected "str") - container_name = cast(str, next(iter(containers.keys()))) + for container in self._containers.values(): + if not container.can_connect(): + continue + if not self._charm.model.relations[self._relation_name]: + container.stop(WORKLOAD_SERVICE_NAME) + continue - elif container_name not in containers: - raise ContainerNotFoundError + new_config = self._promtail_config(container.name) + if new_config != self._current_config(container): + container.push(WORKLOAD_CONFIG_PATH, yaml.safe_dump(new_config), make_dirs=True) - return container_name + if new_config["clients"]: + container.restart(WORKLOAD_SERVICE_NAME) + else: + container.stop(WORKLOAD_SERVICE_NAME) + self.on.log_proxy_endpoint_departed.emit() - def _add_pebble_layer(self, workload_binary_path: str) -> None: + def _add_pebble_layer(self, workload_binary_path: str, container: Container) -> None: """Adds Pebble layer that manages Promtail service in Workload container. Args: workload_binary_path: string providing path to promtail binary in workload container. + container: container into which the layer is to be added. """ - pebble_layer = { - "summary": "promtail layer", - "description": "pebble config layer for promtail", - "services": { - WORKLOAD_SERVICE_NAME: { - "override": "replace", - "summary": WORKLOAD_SERVICE_NAME, - "command": "{} {}".format(workload_binary_path, self._cli_args), - "startup": "disabled", - } - }, - } - self._container.add_layer( - self._container_name, pebble_layer, combine=True # pyright: ignore + pebble_layer = Layer( + { + "summary": "promtail layer", + "description": "pebble config layer for promtail", + "services": { + WORKLOAD_SERVICE_NAME: { + "override": "replace", + "summary": WORKLOAD_SERVICE_NAME, + "command": f"{workload_binary_path} {self._cli_args}", + "startup": "disabled", + } + }, + } ) + container.add_layer(container.name, pebble_layer, combine=True) - def _create_directories(self) -> None: + def _create_directories(self, container: Container) -> None: """Creates the directories for Promtail binary and config file.""" - self._container.make_dir(path=WORKLOAD_BINARY_DIR, make_parents=True) - self._container.make_dir(path=WORKLOAD_CONFIG_DIR, make_parents=True) + container.make_dir(path=WORKLOAD_BINARY_DIR, make_parents=True) + container.make_dir(path=WORKLOAD_CONFIG_DIR, make_parents=True) - def _obtain_promtail(self, promtail_info: dict) -> None: + def _obtain_promtail(self, promtail_info: dict, container: Container) -> None: """Obtain promtail binary from an attached resource or download it. Args: @@ -1992,33 +1985,31 @@ def _obtain_promtail(self, promtail_info: dict) -> None: - "filename": filename of promtail binary - "zipsha": sha256 sum of zip file of promtail binary - "binsha": sha256 sum of unpacked promtail binary + container: container into which promtail is to be obtained. 
""" workload_binary_path = os.path.join(WORKLOAD_BINARY_DIR, promtail_info["filename"]) if self._promtail_attached_as_resource: - self._push_promtail_if_attached(workload_binary_path) + self._push_promtail_if_attached(container, workload_binary_path) return if self._promtail_must_be_downloaded(promtail_info): - self._download_and_push_promtail_to_workload(promtail_info) + self._download_and_push_promtail_to_workload(container, promtail_info) else: binary_path = os.path.join(BINARY_DIR, promtail_info["filename"]) - self._push_binary_to_workload(binary_path, workload_binary_path) + self._push_binary_to_workload(container, binary_path, workload_binary_path) - def _push_binary_to_workload(self, binary_path: str, workload_binary_path: str) -> None: + def _push_binary_to_workload( + self, container: Container, binary_path: str, workload_binary_path: str + ) -> None: """Push promtail binary into workload container. Args: binary_path: path in charm container from which promtail binary is read. workload_binary_path: path in workload container to which promtail binary is pushed. + container: container into which promtail is to be uploaded. """ with open(binary_path, "rb") as f: - self._container.push( - workload_binary_path, - f, - permissions=0o755, - encoding=None, # pyright: ignore - make_dirs=True, - ) + container.push(workload_binary_path, f, permissions=0o755, make_dirs=True) logger.debug("The promtail binary file has been pushed to the workload container.") @property @@ -2038,19 +2029,20 @@ def _promtail_attached_as_resource(self) -> bool: return False raise - def _push_promtail_if_attached(self, workload_binary_path: str) -> bool: + def _push_promtail_if_attached(self, container: Container, workload_binary_path: str) -> bool: """Checks whether Promtail binary is attached to the charm or not. Args: workload_binary_path: string specifying expected path of promtail in workload container + container: container into which promtail is to be pushed. Returns: a boolean representing whether Promtail binary is attached or not. """ logger.info("Promtail binary file has been obtained from an attached resource.") resource_path = self._charm.model.resources.fetch(self._promtail_resource_name) - self._push_binary_to_workload(resource_path, workload_binary_path) + self._push_binary_to_workload(container, resource_path, workload_binary_path) return True def _promtail_must_be_downloaded(self, promtail_info: dict) -> bool: @@ -2116,7 +2108,9 @@ def _is_promtail_binary_in_charm(self, binary_path: str) -> bool: """ return True if Path(binary_path).is_file() else False - def _download_and_push_promtail_to_workload(self, promtail_info: dict) -> None: + def _download_and_push_promtail_to_workload( + self, container: Container, promtail_info: dict + ) -> None: """Downloads a Promtail zip file and pushes the binary to the workload. Args: @@ -2125,6 +2119,7 @@ def _download_and_push_promtail_to_workload(self, promtail_info: dict) -> None: - "filename": filename of promtail binary - "zipsha": sha256 sum of zip file of promtail binary - "binsha": sha256 sum of unpacked promtail binary + container: container into which promtail is to be uploaded. 
""" # Check for Juju proxy variables and fall back to standard ones if not set # If no Juju proxy variable was set, we set proxies to None to let the ProxyHandler get @@ -2160,7 +2155,7 @@ def _download_and_push_promtail_to_workload(self, promtail_info: dict) -> None: logger.debug("Promtail binary file has been downloaded.") workload_binary_path = os.path.join(WORKLOAD_BINARY_DIR, promtail_info["filename"]) - self._push_binary_to_workload(binary_path, workload_binary_path) + self._push_binary_to_workload(container, binary_path, workload_binary_path) @property def _cli_args(self) -> str: @@ -2171,18 +2166,17 @@ def _cli_args(self) -> str: """ return "-config.file={}".format(WORKLOAD_CONFIG_PATH) - @property - def _current_config(self) -> dict: + def _current_config(self, container) -> dict: """Property that returns the current Promtail configuration. Returns: A dict containing Promtail configuration. """ - if not self._container.can_connect(): + if not container.can_connect(): logger.debug("Could not connect to promtail container!") return {} try: - raw_current = self._container.pull(WORKLOAD_CONFIG_PATH).read() + raw_current = container.pull(WORKLOAD_CONFIG_PATH).read() return yaml.safe_load(raw_current) except (ProtocolError, PathError) as e: logger.warning( @@ -2192,8 +2186,7 @@ def _current_config(self) -> dict: ) return {} - @property - def _promtail_config(self) -> dict: + def _promtail_config(self, container_name: str) -> dict: """Generates the config file for Promtail. Reference: https://grafana.com/docs/loki/latest/send-data/promtail/configuration @@ -2203,9 +2196,9 @@ def _promtail_config(self) -> dict: for client in config["clients"]: client["tls_config"] = {"insecure_skip_verify": True} - config.update(self._server_config()) - config.update(self._positions()) - config.update(self._scrape_configs()) + config.update(self._server_config(container_name)) + config.update(self._positions) + config.update(self._scrape_configs(container_name)) return config def _clients_list(self) -> list: @@ -2216,7 +2209,7 @@ def _clients_list(self) -> list: """ return self.loki_endpoints - def _server_config(self) -> dict: + def _server_config(self, container_name: str) -> dict: """Generates the server section of the Promtail config file. Returns: @@ -2224,11 +2217,12 @@ def _server_config(self) -> dict: """ return { "server": { - "http_listen_port": HTTP_LISTEN_PORT, - "grpc_listen_port": GRPC_LISTEN_PORT, + "http_listen_port": self._promtails_ports[container_name]["http_listen_port"], + "grpc_listen_port": self._promtails_ports[container_name]["grpc_listen_port"], } } + @property def _positions(self) -> dict: """Generates the positions section of the Promtail config file. @@ -2237,19 +2231,20 @@ def _positions(self) -> dict: """ return {"positions": {"filename": WORKLOAD_POSITIONS_PATH}} - def _scrape_configs(self) -> dict: + def _scrape_configs(self, container_name: str) -> dict: """Generates the scrape_configs section of the Promtail config file. Returns: A dict representing the `scrape_configs` section. 
""" - job_name = "juju_{}".format(self.topology.identifier) + job_name = f"juju_{self.topology.identifier}" # The new JujuTopology doesn't include unit, but LogProxyConsumer should have it common_labels = { - "juju_{}".format(k): v + f"juju_{k}": v for k, v in self.topology.as_dict(remapped_keys={"charm_name": "charm"}).items() } + common_labels["container"] = container_name scrape_configs = [] # Files config @@ -2263,12 +2258,13 @@ def _scrape_configs(self) -> dict: config = {"targets": ["localhost"], "labels": labels} scrape_config = { "job_name": "system", - "static_configs": self._generate_static_configs(config), + "static_configs": self._generate_static_configs(config, container_name), } scrape_configs.append(scrape_config) # Syslog config - if self._is_syslog: + syslog_port = self._logs_scheme.get(container_name, {}).get("syslog-port") + if syslog_port: relabel_mappings = [ "severity", "facility", @@ -2278,16 +2274,16 @@ def _scrape_configs(self) -> dict: "msg_id", ] syslog_labels = common_labels.copy() - syslog_labels.update({"job": "{}_syslog".format(job_name)}) + syslog_labels.update({"job": f"{job_name}_syslog"}) syslog_config = { "job_name": "syslog", "syslog": { - "listen_address": "127.0.0.1:{}".format(self._syslog_port), + "listen_address": f"127.0.0.1:{syslog_port}", "label_structured_data": True, "labels": syslog_labels, }, "relabel_configs": [ - {"source_labels": ["__syslog_message_{}".format(val)], "target_label": val} + {"source_labels": [f"__syslog_message_{val}"], "target_label": val} for val in relabel_mappings ] + [{"action": "labelmap", "regex": "__syslog_message_sd_(.+)"}], @@ -2296,7 +2292,7 @@ def _scrape_configs(self) -> dict: return {"scrape_configs": scrape_configs} - def _generate_static_configs(self, config: dict) -> list: + def _generate_static_configs(self, config: dict, container_name: str) -> list: """Generates static_configs section. 
Returns: @@ -2304,14 +2300,14 @@ def _generate_static_configs(self, config: dict) -> list: """ static_configs = [] - for _file in self._log_files: + for _file in self._logs_scheme.get(container_name, {}).get("log-files", []): conf = deepcopy(config) conf["labels"]["__path__"] = _file static_configs.append(conf) return static_configs - def _setup_promtail(self) -> None: + def _setup_promtail(self, container: Container) -> None: # Use the first relations = self._charm.model.relations[self._relation_name] if len(relations) > 1: @@ -2327,29 +2323,23 @@ def _setup_promtail(self) -> None: if not promtail_binaries: return - if not self._is_promtail_installed(promtail_binaries[self._arch]): - try: - self._obtain_promtail(promtail_binaries[self._arch]) - except HTTPError as e: - msg = "Promtail binary couldn't be downloaded - {}".format(str(e)) - logger.warning(msg) - self.on.promtail_digest_error.emit(msg) - return + self._create_directories(container) + self._ensure_promtail_binary(promtail_binaries, container) - workload_binary_path = os.path.join( - WORKLOAD_BINARY_DIR, promtail_binaries[self._arch]["filename"] + container.push( + WORKLOAD_CONFIG_PATH, + yaml.safe_dump(self._promtail_config(container.name)), + make_dirs=True, ) - self._create_directories() - self._container.push( - WORKLOAD_CONFIG_PATH, yaml.safe_dump(self._promtail_config), make_dirs=True + workload_binary_path = os.path.join( + WORKLOAD_BINARY_DIR, promtail_binaries[self._arch]["filename"] ) + self._add_pebble_layer(workload_binary_path, container) - self._add_pebble_layer(workload_binary_path) - - if self._current_config.get("clients"): + if self._current_config(container).get("clients"): try: - self._container.restart(WORKLOAD_SERVICE_NAME) + container.restart(WORKLOAD_SERVICE_NAME) except ChangeError as e: self.on.promtail_digest_error.emit(str(e)) else: @@ -2357,40 +2347,292 @@ def _setup_promtail(self) -> None: else: self.on.promtail_digest_error.emit("No promtail client endpoints available!") - def _is_promtail_installed(self, promtail_info: dict) -> bool: + def _ensure_promtail_binary(self, promtail_binaries: dict, container: Container): + if self._is_promtail_installed(promtail_binaries[self._arch], container): + return + + try: + self._obtain_promtail(promtail_binaries[self._arch], container) + except URLError as e: + msg = f"Promtail binary couldn't be downloaded - {str(e)}" + logger.warning(msg) + self.on.promtail_digest_error.emit(msg) + + def _is_promtail_installed(self, promtail_info: dict, container: Container) -> bool: """Determine if promtail has already been installed to the container. Args: promtail_info: dictionary containing information about promtail binary that must be used. The dictionary must at least contain a key "filename" giving the name of promtail binary + container: container in which to check whether promtail is installed. """ - workload_binary_path = "{}/{}".format(WORKLOAD_BINARY_DIR, promtail_info["filename"]) + workload_binary_path = f"{WORKLOAD_BINARY_DIR}/{promtail_info['filename']}" try: - self._container.list_files(workload_binary_path) + container.list_files(workload_binary_path) except (APIError, FileNotFoundError): return False return True - @property - def syslog_port(self) -> str: - """Gets the port on which promtail is listening for syslog. 
+ def _generate_promtails_ports(self, logs_scheme) -> dict: + return { + container: { + "http_listen_port": HTTP_LISTEN_PORT_START + 2 * i, + "grpc_listen_port": GRPC_LISTEN_PORT_START + 2 * i, + } + for i, container in enumerate(logs_scheme.keys()) + } + + def syslog_port(self, container_name: str) -> str: + """Gets the port on which promtail is listening for syslog in this container. Returns: A str representing the port """ - return str(self._syslog_port) + return str(self._logs_scheme.get(container_name, {}).get("syslog-port")) - @property - def rsyslog_config(self) -> str: + def rsyslog_config(self, container_name: str) -> str: """Generates a config line for use with rsyslog. Returns: The rsyslog config line as a string """ return 'action(type="omfwd" protocol="tcp" target="127.0.0.1" port="{}" Template="RSYSLOG_SyslogProtocol23Format" TCP_Framing="octet-counted")'.format( - self._syslog_port + self._logs_scheme.get(container_name, {}).get("syslog-port") + ) + + @property + def _containers(self) -> Dict[str, Container]: + return {cont: self._charm.unit.get_container(cont) for cont in self._logs_scheme.keys()} + + +class _PebbleLogClient: + @staticmethod + def check_juju_version() -> bool: + """Make sure the Juju version supports Log Forwarding.""" + juju_version = JujuVersion.from_environ() + if not juju_version > JujuVersion(version=str("3.3")): + msg = f"Juju version {juju_version} does not support Pebble log forwarding. Juju >= 3.4 is needed." + logger.warning(msg) + return False + return True + + @staticmethod + def _build_log_target( + unit_name: str, loki_endpoint: str, topology: JujuTopology, enable: bool + ) -> Dict: + """Build a log target for the log forwarding Pebble layer. + + Log target's syntax for enabling/disabling forwarding is explained here: + https://github.com/canonical/pebble?tab=readme-ov-file#log-forwarding + """ + services_value = ["all"] if enable else ["-all"] + + log_target = { + "override": "replace", + "services": services_value, + "type": "loki", + "location": loki_endpoint, + } + if enable: + log_target.update( + { + "labels": { + "product": "Juju", + "charm": topology._charm_name, + "juju_model": topology._model, + "juju_model_uuid": topology._model_uuid, + "juju_application": topology._application, + "juju_unit": topology._unit, + }, + } + ) + + return {unit_name: log_target} + + @staticmethod + def _build_log_targets( + loki_endpoints: Optional[Dict[str, str]], topology: JujuTopology, enable: bool + ): + """Build all the targets for the log forwarding Pebble layer.""" + targets = {} + if not loki_endpoints: + return targets + + for unit_name, endpoint in loki_endpoints.items(): + targets.update( + _PebbleLogClient._build_log_target( + unit_name=unit_name, + loki_endpoint=endpoint, + topology=topology, + enable=enable, + ) + ) + return targets + + @staticmethod + def disable_inactive_endpoints( + container: Container, active_endpoints: Dict[str, str], topology: JujuTopology + ): + """Disable forwarding for inactive endpoints by checking against the Pebble plan.""" + pebble_layer = container.get_plan().to_dict().get("log-targets", None) + if not pebble_layer: + return + + for unit_name, target in pebble_layer.items(): + # If the layer is a disabled log forwarding endpoint, skip it + if "-all" in target["services"]: # pyright: ignore + continue + + if unit_name not in active_endpoints: + layer = Layer( + { # pyright: ignore + "log-targets": _PebbleLogClient._build_log_targets( + loki_endpoints={unit_name: "(removed)"}, + topology=topology, + 
enable=False, + ) + } + ) + container.add_layer(f"{container.name}-log-forwarding", layer=layer, combine=True) + + @staticmethod + def enable_endpoints( + container: Container, active_endpoints: Dict[str, str], topology: JujuTopology + ): + """Enable forwarding for the specified Loki endpoints.""" + layer = Layer( + { # pyright: ignore + "log-targets": _PebbleLogClient._build_log_targets( + loki_endpoints=active_endpoints, + topology=topology, + enable=True, + ) + } ) + container.add_layer(f"{container.name}-log-forwarding", layer, combine=True) + + +class LogForwarder(ConsumerBase): + """Forward the standard outputs of all workloads operated by a charm to one or multiple Loki endpoints. + + This class implements Pebble log forwarding. Juju >= 3.4 is needed. + """ + + def __init__( + self, + charm: CharmBase, + *, + relation_name: str = DEFAULT_RELATION_NAME, + alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH, + recursive: bool = True, + skip_alert_topology_labeling: bool = False, + ): + _PebbleLogClient.check_juju_version() + super().__init__( + charm, relation_name, alert_rules_path, recursive, skip_alert_topology_labeling + ) + self._charm = charm + self._relation_name = relation_name + + on = self._charm.on[self._relation_name] + self.framework.observe(on.relation_joined, self._update_logging) + self.framework.observe(on.relation_changed, self._update_logging) + self.framework.observe(on.relation_departed, self._update_logging) + self.framework.observe(on.relation_broken, self._update_logging) + + for container_name in self._charm.meta.containers.keys(): + snake_case_container_name = container_name.replace("-", "_") + self.framework.observe( + getattr(self._charm.on, f"{snake_case_container_name}_pebble_ready"), + self._on_pebble_ready, + ) + + def _on_pebble_ready(self, event: PebbleReadyEvent): + if not (loki_endpoints := self._retrieve_endpoints_from_relation()): + logger.warning("No Loki endpoints available") + return + + self._update_endpoints(event.workload, loki_endpoints) + + def _update_logging(self, _): + """Update the log forwarding to match the active Loki endpoints.""" + if not (loki_endpoints := self._retrieve_endpoints_from_relation()): + logger.warning("No Loki endpoints available") + return + + for container in self._charm.unit.containers.values(): + if container.can_connect(): + self._update_endpoints(container, loki_endpoints) + # else: `_update_endpoints` will be called on pebble-ready anyway. 
+ + def _retrieve_endpoints_from_relation(self) -> dict: + loki_endpoints = {} + + # Get the endpoints from relation data + for relation in self._charm.model.relations[self._relation_name]: + loki_endpoints.update(self._fetch_endpoints(relation)) + + return loki_endpoints + + def _update_endpoints(self, container: Container, loki_endpoints: dict): + _PebbleLogClient.disable_inactive_endpoints( + container=container, + active_endpoints=loki_endpoints, + topology=self.topology, + ) + _PebbleLogClient.enable_endpoints( + container=container, active_endpoints=loki_endpoints, topology=self.topology + ) + + def is_ready(self, relation: Optional[Relation] = None): + """Check if the relation is active and healthy.""" + if not relation: + relations = self._charm.model.relations[self._relation_name] + if not relations: + return False + return all(self.is_ready(relation) for relation in relations) + + try: + if self._extract_urls(relation): + return True + return False + except (KeyError, json.JSONDecodeError): + return False + + def _extract_urls(self, relation: Relation) -> Dict[str, str]: + """Default getter function to extract Loki endpoints from a relation. + + Returns: + A dictionary of remote units and the respective Loki endpoint. + { + "loki/0": "http://loki:3100/loki/api/v1/push", + "another-loki/0": "http://another-loki:3100/loki/api/v1/push", + } + """ + endpoints: Dict = {} + + for unit in relation.units: + endpoint = relation.data[unit]["endpoint"] + deserialized_endpoint = json.loads(endpoint) + url = deserialized_endpoint["url"] + endpoints[unit.name] = url + + return endpoints + + def _fetch_endpoints(self, relation: Relation) -> Dict[str, str]: + """Fetch Loki Push API endpoints from relation data using the endpoints getter.""" + endpoints: Dict = {} + + if not self.is_ready(relation): + logger.warning(f"The relation '{relation.name}' is not ready yet.") + return endpoints + + # if the code gets here, the function won't raise anymore because it's + # also called in is_ready() + endpoints = self._extract_urls(relation) + + return endpoints class CosTool: diff --git a/src/charm.py b/src/charm.py index 73fb037bda..9695ff3946 100755 --- a/src/charm.py +++ b/src/charm.py @@ -31,7 +31,7 @@ from charms.data_platform_libs.v0.data_interfaces import DataPeerData, DataPeerUnitData from charms.data_platform_libs.v0.data_models import TypedCharmBase from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider -from charms.loki_k8s.v0.loki_push_api import LogProxyConsumer +from charms.loki_k8s.v1.loki_push_api import LogForwarder, LogProxyConsumer from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch from charms.postgresql_k8s.v0.postgresql import ( REQUIRED_PLUGINS, @@ -226,11 +226,17 @@ def __init__(self, *args): refresh_event=[self.on.start], jobs=self._generate_metrics_jobs(self.is_tls_enabled), ) - self.loki_push = LogProxyConsumer( - self, - log_files=POSTGRES_LOG_FILES, - relation_name="logging", - container_name="postgresql", + self.loki_push = ( + LogForwarder( + self, + relation_name="logging", + ) + if self._pebble_log_forwarding_supported + else LogProxyConsumer( + self, + logs_scheme={"postgresql": {"log-files": POSTGRES_LOG_FILES}}, + relation_name="logging", + ) ) postgresql_db_port = ServicePort(5432, name="database") @@ -246,6 +252,14 @@ def tracing_endpoint(self) -> Optional[str]: if self.tracing.is_ready(): return self.tracing.get_endpoint(TRACING_PROTOCOL) + @property + def _pebble_log_forwarding_supported(self) -> bool: + # 
https://github.com/canonical/operator/issues/1230 + from ops.jujuversion import JujuVersion + + juju_version = JujuVersion.from_environ() + return juju_version > JujuVersion(version=str("3.3")) + def _generate_metrics_jobs(self, enable_tls: bool) -> Dict: """Generate spec for Prometheus scraping.""" return [
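
---

Reviewer note on the resulting Pebble plan (illustrative only, not part of the patch): once `LogForwarder` is related over `logging`, the library's `_build_log_target()` adds one `log-targets` entry per Loki unit to a layer in each workload container. The sketch below shows the approximate shape of that entry under assumed values; the endpoint URL, model name, and UUID are hypothetical placeholders, while the keys and topology labels follow the `_build_log_target` code added in this patch.

```python
# Sketch of the log-targets entry LogForwarder writes into a container's
# Pebble layer for a single Loki unit (all values are made-up examples).
expected_layer = {
    "log-targets": {
        "loki/0": {
            "override": "replace",
            "services": ["all"],  # becomes ["-all"] when forwarding is disabled
            "type": "loki",
            "location": "http://loki-0.loki-endpoints:3100/loki/api/v1/push",
            "labels": {
                "product": "Juju",
                "charm": "postgresql-k8s",
                "juju_model": "my-model",
                "juju_model_uuid": "0000-placeholder-uuid",
                "juju_application": "postgresql-k8s",
                "juju_unit": "postgresql-k8s/0",
            },
        }
    }
}
```

Pebble merges this layer into each container's plan, so every service's stdout/stderr is shipped to the listed Loki endpoints without running a Promtail process in the workload container; this is why the charm only falls back to `LogProxyConsumer` when the Juju version check above reports that Pebble log forwarding (Juju >= 3.4) is unavailable.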