more tests
PietroPasotti committed Sep 27, 2024
1 parent 759c1f8 commit 8db021a
Showing 6 changed files with 247 additions and 102 deletions.
96 changes: 54 additions & 42 deletions src/cosl/coordinated_workers/coordinator.py
@@ -12,7 +12,6 @@
import socket
from dataclasses import dataclass
from functools import partial
from pathlib import Path
from typing import (
Any,
Callable,
@@ -27,10 +26,11 @@
from urllib.parse import urlparse

import ops
import pydantic
import yaml
from ops import Container

import cosl
from cosl.coordinated_workers import worker
from cosl.coordinated_workers.interface import ClusterProvider, RemoteWriteEndpoint
from cosl.coordinated_workers.nginx import (
Nginx,
@@ -65,8 +65,6 @@

logger = logging.getLogger(__name__)

S3_TLS_CA_CHAIN_PATH = "/etc/worker/s3_ca.crt"

# The paths of the base rules to be rendered in CONSOLIDATED_ALERT_RULES_PATH
NGINX_ORIGINAL_ALERT_RULES_PATH = "./src/prometheus_alert_rules/nginx"
WORKER_ORIGINAL_ALERT_RULES_PATH = "./src/prometheus_alert_rules/workers"
@@ -82,6 +80,22 @@ class ClusterRolesConfigError(Exception):
"""Raised when the ClusterRolesConfig instance is not properly configured."""


class S3ConnectionInfo(pydantic.BaseModel):
"""Model for the s3 relation databag, as returned by the s3 charm lib."""

# they don't use it, we do

model_config = {"populate_by_name": True}

endpoint: str
bucket: str
access_key: str = pydantic.Field(alias="access-key")
secret_key: str = pydantic.Field(alias="secret-key")

region: Optional[str] = pydantic.Field(None)
tls_ca_chain: Optional[str] = pydantic.Field(None, alias="tls-ca-chain")


@dataclass
class ClusterRolesConfig:
"""Worker roles and deployment requirements."""
@@ -236,7 +250,6 @@ def __init__(
partial(resources_requests, self) if resources_requests is not None else None
)
self._container_name = container_name
self._container: Container = charm.unit.get_container(container_name)
self._resources_limit_options = resources_limit_options or {}
self.remote_write_endpoints_getter = remote_write_endpoints

@@ -430,6 +443,14 @@ def tls_available(self) -> bool:
and (self.cert_handler.ca_cert is not None)
)

@property
def s3_connection_info(self) -> S3ConnectionInfo:
"""Cast and validate the untyped s3 databag to something we can handle."""
try:
return S3ConnectionInfo(**self.s3_requirer.get_s3_connection_info())
except pydantic.ValidationError:
raise S3NotFoundError("s3 integration inactive or interface corrupt")

@property
def _s3_config(self) -> Dict[str, Any]:
"""The s3 configuration from relation data.
@@ -439,35 +460,24 @@ def _s3_config(self) -> Dict[str, Any]:
Raises:
S3NotFoundError: The s3 integration is inactive.
"""
s3_data = self.s3_requirer.get_s3_connection_info()
s3_config: Dict[str, Any] = {}
if not (
s3_data
and "bucket" in s3_data
and "endpoint" in s3_data
and "access-key" in s3_data
and "secret-key" in s3_data
):
raise S3NotFoundError("s3 integration inactive")
s3_config["insecure"] = not s3_data["endpoint"].startswith("https://")
s3_config["endpoint"] = re.sub(
rf"^{urlparse(s3_data['endpoint']).scheme}://", "", s3_data["endpoint"]
)
s3_config["region"] = s3_data.get("region", "")
s3_config["access_key_id"] = s3_data.pop("access-key")
s3_config["secret_access_key"] = s3_data.pop("secret-key")
s3_config["bucket_name"] = s3_data.pop("bucket")
ca_chain = s3_data.get("tls-ca-chain")
if ca_chain:
# put the cacert to disk
container = self._container
s3_data = self.s3_connection_info
s3_config = {
"endpoint": re.sub(rf"^{urlparse(s3_data.endpoint).scheme}://", "", s3_data.endpoint),
"region": s3_data.region,
"access_key_id": s3_data.access_key,
"secret_access_key": s3_data.secret_key,
"bucket_name": s3_data.bucket,
"insecure": not s3_data.tls_ca_chain,
# FIXME: s3 gives us a cert chain. tempo's s3 config takes:
# tls_cert_path: Path to the client certificate file.
# tls_key_path: Path to the private client key file.
# tls_ca_path: Path to the CA certificate file.
# tls_server_name: Path to the CA certificate file. # not a typo: it's the same
container.push(S3_TLS_CA_CHAIN_PATH, "\n\n".join(ca_chain))
s3_config["tls_cert_path"] = S3_TLS_CA_CHAIN_PATH
# we send to the worker the chain itself, but the tempo config actually wants a path to a file
# the worker will be responsible for putting it there
"tls_cert_path": worker.S3_TLS_CA_CHAIN_FILE if s3_data.tls_ca_chain else None,
}

return s3_config

@property
@@ -520,24 +530,23 @@ def _local_ip(self) -> Optional[str]:
def _workers_scrape_jobs(self) -> List[Dict[str, Any]]:
"""The Prometheus scrape jobs for the workers connected to the coordinator."""
scrape_jobs: List[Dict[str, Any]] = []
worker_topologies = self.cluster.gather_topology()

for worker in worker_topologies:
for worker_topology in self.cluster.gather_topology():
job = {
"static_configs": [
{
"targets": [f"{worker['address']}:{self._worker_metrics_port}"],
"targets": [f"{worker_topology['address']}:{self._worker_metrics_port}"],
}
],
# setting these as "labels" in the static config gets some of them
# replaced by the coordinator topology
# https://github.com/canonical/prometheus-k8s-operator/issues/571
"relabel_configs": [
{"target_label": "juju_charm", "replacement": worker["charm_name"]},
{"target_label": "juju_unit", "replacement": worker["unit"]},
{"target_label": "juju_charm", "replacement": worker_topology["charm_name"]},
{"target_label": "juju_unit", "replacement": worker_topology["unit"]},
{
"target_label": "juju_application",
"replacement": worker["application"],
"replacement": worker_topology["application"],
},
{"target_label": "juju_model", "replacement": self.model.name},
{"target_label": "juju_model_uuid", "replacement": self.model.uuid},
@@ -718,31 +727,34 @@ def update_cluster(self):
if self.remote_write_endpoints_getter
else None
),
s3_tls_ca_cert=self._s3_config,
)

def _render_workers_alert_rules(self):
"""Regenerate the worker alert rules from relation data."""
self._remove_rendered_alert_rules()

apps: Set[str] = set()
for worker in self.cluster.gather_topology():
if worker["application"] in apps:
for worker_topology in self.cluster.gather_topology():
if worker_topology["application"] in apps:
continue

apps.add(worker["application"])
apps.add(worker_topology["application"])
topology_dict = {
"model": self.model.name,
"model_uuid": self.model.uuid,
"application": worker["application"],
"unit": worker["unit"],
"charm_name": worker["charm_name"],
"application": worker_topology["application"],
"unit": worker_topology["unit"],
"charm_name": worker_topology["charm_name"],
}
topology = cosl.JujuTopology.from_dict(topology_dict)
alert_rules = cosl.AlertRules(query_type="promql", topology=topology)
alert_rules.add_path(WORKER_ORIGINAL_ALERT_RULES_PATH, recursive=True)
alert_rules_contents = yaml.dump(alert_rules.as_dict())

file_name = f"{CONSOLIDATED_ALERT_RULES_PATH}/rendered_{worker['application']}.rules"
file_name = (
f"{CONSOLIDATED_ALERT_RULES_PATH}/rendered_{worker_topology['application']}.rules"
)
with open(file_name, "w") as writer:
writer.write(alert_rules_contents)

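The coordinator-side change above replaces hand-rolled databag validation (and pushing the CA chain to the coordinator's own container) with a pydantic model, delegating the on-disk CA file to the worker via `worker.S3_TLS_CA_CHAIN_FILE`. A minimal sketch of how the new `S3ConnectionInfo` model behaves — databag values here are hypothetical:

```python
from typing import Optional

import pydantic


class S3ConnectionInfo(pydantic.BaseModel):
    """Model for the s3 relation databag, as returned by the s3 charm lib."""

    # the s3 charm lib emits dashed keys; populate_by_name additionally
    # allows construction with the snake_case field names
    model_config = {"populate_by_name": True}

    endpoint: str
    bucket: str
    access_key: str = pydantic.Field(alias="access-key")
    secret_key: str = pydantic.Field(alias="secret-key")
    region: Optional[str] = pydantic.Field(None)
    tls_ca_chain: Optional[str] = pydantic.Field(None, alias="tls-ca-chain")


# a complete databag validates; the dashed aliases map to snake_case fields
conn = S3ConnectionInfo(
    **{
        "endpoint": "https://s3.example.com:9000",
        "bucket": "tempo",
        "access-key": "AKIAEXAMPLE",
        "secret-key": "s3cr3t",
    }
)
assert conn.access_key == "AKIAEXAMPLE"
assert conn.tls_ca_chain is None  # optional fields default to None

# an incomplete databag (e.g. relation not ready) raises ValidationError,
# which the coordinator's s3_connection_info property converts into
# S3NotFoundError("s3 integration inactive or interface corrupt")
try:
    S3ConnectionInfo(**{"endpoint": "http://s3.local"})
except pydantic.ValidationError:
    pass
```

Compared to the deleted key-by-key checks in `_s3_config`, the model centralizes the "is the integration ready" decision in one `ValidationError` path and gives the rest of the coordinator typed attribute access.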
31 changes: 23 additions & 8 deletions src/cosl/coordinated_workers/interface.py
@@ -12,6 +12,7 @@
import collections
import json
import logging
from collections import namedtuple
from typing import (
Any,
Counter,
@@ -171,7 +172,7 @@ class ClusterRequirerUnitData(DatabagModel):


class ClusterProviderAppData(DatabagModel):
"""App data the the coordinator sends to the worker."""
"""App data that the coordinator sends to the worker."""

### worker node configuration
worker_config: str
@@ -189,7 +190,18 @@ class ClusterProviderAppData(DatabagModel):
ca_cert: Optional[str] = None
server_cert: Optional[str] = None
privkey_secret_id: Optional[str] = None
"""TLS Config"""
s3_tls_ca_chain: Optional[str] = None


TLSData = namedtuple(
"TLSData",
[
"ca_cert",
"server_cert",
"privkey_secret_id",
"s3_tls_ca_chain",
],
)


class ClusterChangedEvent(ops.EventBase):
@@ -268,6 +280,7 @@ def publish_data(
worker_config: str,
ca_cert: Optional[str] = None,
server_cert: Optional[str] = None,
s3_tls_ca_cert: Optional[str] = None,
privkey_secret_id: Optional[str] = None,
loki_endpoints: Optional[Dict[str, str]] = None,
tracing_receivers: Optional[Dict[str, str]] = None,
@@ -284,6 +297,7 @@ def publish_data(
privkey_secret_id=privkey_secret_id,
tracing_receivers=tracing_receivers,
remote_write_endpoints=remote_write_endpoints,
s3_tls_ca_cert=s3_tls_ca_cert,
)
local_app_databag.dump(relation.data[self.model.app])

@@ -517,7 +531,7 @@ def get_loki_endpoints(self) -> Dict[str, str]:
return data.loki_endpoints or {}
return {}

def get_tls_data(self) -> Optional[Dict[str, str]]:
def get_tls_data(self) -> Optional[TLSData]:
"""Fetch certificates and the private key secrets id for the worker config."""
data = self._get_data_from_coordinator()
if not data:
@@ -526,11 +540,12 @@ def get_tls_data(self) -> Optional[Dict[str, str]]:
if not data.ca_cert or not data.server_cert or not data.privkey_secret_id:
return None

return {
"ca_cert": data.ca_cert,
"server_cert": data.server_cert,
"privkey_secret_id": data.privkey_secret_id,
}
return TLSData(
ca_cert=data.ca_cert,
server_cert=data.server_cert,
privkey_secret_id=data.privkey_secret_id,
s3_tls_ca_chain=data.s3_tls_ca_chain,
)

def get_tracing_receivers(self) -> Optional[Dict[str, str]]:
"""Fetch the tracing receivers from the coordinator databag."""
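On the interface side, `get_tls_data` now returns the new `TLSData` namedtuple instead of a plain dict, carrying the `s3_tls_ca_chain` field added to `ClusterProviderAppData`. A small sketch of what this changes for callers — all values here are hypothetical:

```python
from collections import namedtuple

TLSData = namedtuple(
    "TLSData",
    ["ca_cert", "server_cert", "privkey_secret_id", "s3_tls_ca_chain"],
)

tls = TLSData(
    ca_cert="-----BEGIN CERTIFICATE-----\n...",
    server_cert="-----BEGIN CERTIFICATE-----\n...",
    privkey_secret_id="secret:cluster-privkey",  # hypothetical Juju secret id
    s3_tls_ca_chain=None,  # None when the s3 endpoint has no CA chain
)

# attribute access replaces the old dict lookups; dict-style indexing
# from the previous API now fails:
#   tls["ca_cert"]  -> TypeError: tuple indices must be integers or slices
assert tls.ca_cert.startswith("-----BEGIN CERTIFICATE-----")
```

The namedtuple keeps the return value lightweight while making the field set (including the new s3 CA chain) explicit in one place, rather than an ad-hoc dict shape agreed on by convention.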