Skip to content

Commit

Permalink
S3 tls cert (#87)
Browse files Browse the repository at this point in the history
* added tls cert from s3

* vbump

* cert goes in workload container

* more tests

* vbump

* static fixes

* ca path and not cert path

* Use List[str] for tls_ca_chain and fix tests

* unified ca chain when passing it to the worker

* split tls update/wipe logic

* allow nones in tls data

* simpler sync logic

* better status collection

* pr comments and merge

* statuc

* unit

---------

Co-authored-by: Mateusz Kulewicz <[email protected]>
  • Loading branch information
PietroPasotti and mmkay authored Oct 2, 2024
1 parent 5e9316d commit e395157
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 129 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "cosl"
version = "0.0.36"
version = "0.0.38"
authors = [
{ name="sed-i", email="[email protected]" },
]
Expand Down
118 changes: 78 additions & 40 deletions src/cosl/coordinated_workers/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
from urllib.parse import urlparse

import ops
import pydantic
import yaml
from ops import StatusBase

import cosl
from cosl.coordinated_workers import worker
from cosl.coordinated_workers.interface import ClusterProvider, RemoteWriteEndpoint
from cosl.coordinated_workers.nginx import (
Nginx,
Expand Down Expand Up @@ -78,6 +81,27 @@ class ClusterRolesConfigError(Exception):
"""Raised when the ClusterRolesConfig instance is not properly configured."""


class S3ConnectionInfo(pydantic.BaseModel):
    """Model for the s3 relation databag, as returned by the s3 charm lib.

    Validates the raw, untyped databag contents and exposes them as typed
    attributes. The relation databag uses dashed keys (e.g. "access-key"),
    hence the field aliases below.
    """

    # populate_by_name lets us construct the model using the pythonic attribute
    # names as well as the dashed databag aliases.
    # (the s3 charm lib doesn't use this config; we do)

    model_config = {"populate_by_name": True}

    # required connection parameters; validation fails (-> S3NotFoundError at
    # the call site) if any of these is missing from the databag
    endpoint: str
    bucket: str
    access_key: str = pydantic.Field(alias="access-key")
    secret_key: str = pydantic.Field(alias="secret-key")

    # optional parameters
    region: Optional[str] = pydantic.Field(None)
    tls_ca_chain: Optional[List[str]] = pydantic.Field(None, alias="tls-ca-chain")

    @property
    def ca_cert(self) -> Optional[str]:
        """Unify the ca chain provided by the lib into a single cert.

        Returns None when no chain was provided over the relation.
        """
        return "\n\n".join(self.tls_ca_chain) if self.tls_ca_chain else None


@dataclass
class ClusterRolesConfig:
"""Worker roles and deployment requirements."""
Expand Down Expand Up @@ -425,6 +449,15 @@ def tls_available(self) -> bool:
and (self.cert_handler.ca_cert is not None)
)

@property
def s3_connection_info(self) -> S3ConnectionInfo:
"""Cast and validate the untyped s3 databag to something we can handle."""
try:
# we have to type-ignore here because the s3 lib's type annotation is wrong
return S3ConnectionInfo(**self.s3_requirer.get_s3_connection_info()) # type: ignore
except pydantic.ValidationError:
raise S3NotFoundError("s3 integration inactive or interface corrupt")

@property
def _s3_config(self) -> Dict[str, Any]:
"""The s3 configuration from relation data.
Expand All @@ -434,24 +467,19 @@ def _s3_config(self) -> Dict[str, Any]:
Raises:
S3NotFoundError: The s3 integration is inactive.
"""
s3_data = self.s3_requirer.get_s3_connection_info()
s3_config: Dict[str, Any] = {}
if not (
s3_data
and "bucket" in s3_data
and "endpoint" in s3_data
and "access-key" in s3_data
and "secret-key" in s3_data
):
raise S3NotFoundError("s3 integration inactive")
s3_config["insecure"] = not s3_data["endpoint"].startswith("https://")
s3_config["endpoint"] = re.sub(
rf"^{urlparse(s3_data['endpoint']).scheme}://", "", s3_data["endpoint"]
)
s3_config["region"] = s3_data.get("region", "")
s3_config["access_key_id"] = s3_data.pop("access-key")
s3_config["secret_access_key"] = s3_data.pop("secret-key")
s3_config["bucket_name"] = s3_data.pop("bucket")
s3_data = self.s3_connection_info
s3_config = {
"endpoint": re.sub(rf"^{urlparse(s3_data.endpoint).scheme}://", "", s3_data.endpoint),
"region": s3_data.region,
"access_key_id": s3_data.access_key,
"secret_access_key": s3_data.secret_key,
"bucket_name": s3_data.bucket,
"insecure": not s3_data.tls_ca_chain,
# the tempo config wants a path to a file here. We pass the cert chain separately
# over the cluster relation; the worker will be responsible for writing the file to disk
"tls_ca_path": worker.S3_TLS_CA_CHAIN_FILE if s3_data.tls_ca_chain else None,
}

return s3_config

@property
Expand Down Expand Up @@ -504,24 +532,23 @@ def _local_ip(self) -> Optional[str]:
def _workers_scrape_jobs(self) -> List[Dict[str, Any]]:
"""The Prometheus scrape jobs for the workers connected to the coordinator."""
scrape_jobs: List[Dict[str, Any]] = []
worker_topologies = self.cluster.gather_topology()

for worker in worker_topologies:
for worker_topology in self.cluster.gather_topology():
job = {
"static_configs": [
{
"targets": [f"{worker['address']}:{self._worker_metrics_port}"],
"targets": [f"{worker_topology['address']}:{self._worker_metrics_port}"],
}
],
# setting these as "labels" in the static config gets some of them
# replaced by the coordinator topology
# https://github.com/canonical/prometheus-k8s-operator/issues/571
"relabel_configs": [
{"target_label": "juju_charm", "replacement": worker["charm_name"]},
{"target_label": "juju_unit", "replacement": worker["unit"]},
{"target_label": "juju_charm", "replacement": worker_topology["charm_name"]},
{"target_label": "juju_unit", "replacement": worker_topology["unit"]},
{
"target_label": "juju_application",
"replacement": worker["application"],
"replacement": worker_topology["application"],
},
{"target_label": "juju_model", "replacement": self.model.name},
{"target_label": "juju_model_uuid", "replacement": self.model.uuid},
Expand Down Expand Up @@ -608,21 +635,29 @@ def _on_config_changed(self, _: ops.ConfigChangedEvent):
# keep this event handler at the bottom
def _on_collect_unit_status(self, e: ops.CollectStatusEvent):
# todo add [nginx.workload] statuses
statuses: List[StatusBase] = []

if self.resources_patch and self.resources_patch.get_status().name != "active":
e.add_status(self.resources_patch.get_status())
statuses.append(self.resources_patch.get_status())

if not self.cluster.has_workers:
e.add_status(ops.BlockedStatus("[consistency] Missing any worker relation."))
if not self.is_coherent:
e.add_status(ops.BlockedStatus("[consistency] Cluster inconsistent."))
if not self.s3_ready:
e.add_status(ops.BlockedStatus("[consistency] Missing S3 integration."))
statuses.append(ops.BlockedStatus("[consistency] Missing any worker relation."))
elif not self.is_coherent:
statuses.append(ops.BlockedStatus("[consistency] Cluster inconsistent."))
elif not self.is_recommended:
# if is_recommended is None: it means we don't have a recommended deployment criterion.
e.add_status(ops.ActiveStatus("[coordinator] Degraded."))
else:
e.add_status(ops.ActiveStatus())
statuses.append(ops.ActiveStatus("Degraded."))

if not self.s3_requirer.relations:
statuses.append(ops.BlockedStatus("[s3] Missing S3 integration."))
elif not self.s3_ready:
statuses.append(ops.BlockedStatus("[s3] S3 not ready (probably misconfigured)."))

if not statuses:
statuses.append(ops.ActiveStatus())

for status in statuses:
e.add_status(status)

###################
# UTILITY METHODS #
Expand Down Expand Up @@ -702,31 +737,34 @@ def update_cluster(self):
if self.remote_write_endpoints_getter
else None
),
s3_tls_ca_chain=self.s3_connection_info.ca_cert,
)

def _render_workers_alert_rules(self):
"""Regenerate the worker alert rules from relation data."""
self._remove_rendered_alert_rules()

apps: Set[str] = set()
for worker in self.cluster.gather_topology():
if worker["application"] in apps:
for worker_topology in self.cluster.gather_topology():
if worker_topology["application"] in apps:
continue

apps.add(worker["application"])
apps.add(worker_topology["application"])
topology_dict = {
"model": self.model.name,
"model_uuid": self.model.uuid,
"application": worker["application"],
"unit": worker["unit"],
"charm_name": worker["charm_name"],
"application": worker_topology["application"],
"unit": worker_topology["unit"],
"charm_name": worker_topology["charm_name"],
}
topology = cosl.JujuTopology.from_dict(topology_dict)
alert_rules = cosl.AlertRules(query_type="promql", topology=topology)
alert_rules.add_path(WORKER_ORIGINAL_ALERT_RULES_PATH, recursive=True)
alert_rules_contents = yaml.dump(alert_rules.as_dict())

file_name = f"{CONSOLIDATED_ALERT_RULES_PATH}/rendered_{worker['application']}.rules"
file_name = (
f"{CONSOLIDATED_ALERT_RULES_PATH}/rendered_{worker_topology['application']}.rules"
)
with open(file_name, "w") as writer:
writer.write(alert_rules_contents)

Expand Down
33 changes: 24 additions & 9 deletions src/cosl/coordinated_workers/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
List,
Mapping,
MutableMapping,
NamedTuple,
Optional,
Set,
Tuple,
Expand Down Expand Up @@ -172,7 +173,7 @@ class ClusterRequirerUnitData(DatabagModel):


class ClusterProviderAppData(DatabagModel):
"""App data the the coordinator sends to the worker."""
"""App data that the coordinator sends to the worker."""

### worker node configuration
worker_config: str
Expand All @@ -190,7 +191,16 @@ class ClusterProviderAppData(DatabagModel):
ca_cert: Optional[str] = None
server_cert: Optional[str] = None
privkey_secret_id: Optional[str] = None
"""TLS Config"""
s3_tls_ca_chain: Optional[str] = None


class TLSData(NamedTuple):
    """Section of the cluster data that concerns TLS information.

    All fields are optional: any of them may be None when the corresponding
    piece of TLS data is absent from the coordinator's databag.
    """

    # CA certificate for the cluster's own TLS setup
    ca_cert: Optional[str]
    # server certificate presented by the workload
    server_cert: Optional[str]
    # juju secret id holding the private key (fetched separately, not inlined)
    privkey_secret_id: Optional[str]
    # unified CA chain for the s3 endpoint; the worker writes it to disk
    s3_tls_ca_chain: Optional[str]


class ClusterChangedEvent(ops.EventBase):
Expand Down Expand Up @@ -269,6 +279,7 @@ def publish_data(
worker_config: str,
ca_cert: Optional[str] = None,
server_cert: Optional[str] = None,
s3_tls_ca_chain: Optional[str] = None,
privkey_secret_id: Optional[str] = None,
loki_endpoints: Optional[Dict[str, str]] = None,
tracing_receivers: Optional[Dict[str, str]] = None,
Expand All @@ -285,6 +296,7 @@ def publish_data(
privkey_secret_id=privkey_secret_id,
tracing_receivers=tracing_receivers,
remote_write_endpoints=remote_write_endpoints,
s3_tls_ca_chain=s3_tls_ca_chain,
)
local_app_databag.dump(relation.data[self.model.app])

Expand Down Expand Up @@ -518,20 +530,23 @@ def get_loki_endpoints(self) -> Dict[str, str]:
return data.loki_endpoints or {}
return {}

def get_tls_data(self) -> Optional[Dict[str, str]]:
def get_tls_data(self, allow_none: bool = False) -> Optional[TLSData]:
"""Fetch certificates and the private key secrets id for the worker config."""
data = self._get_data_from_coordinator()
if not data:
return None

if not data.ca_cert or not data.server_cert or not data.privkey_secret_id:
if (
not data.ca_cert or not data.server_cert or not data.privkey_secret_id
) and not allow_none:
return None

return {
"ca_cert": data.ca_cert,
"server_cert": data.server_cert,
"privkey_secret_id": data.privkey_secret_id,
}
return TLSData(
ca_cert=data.ca_cert,
server_cert=data.server_cert,
privkey_secret_id=data.privkey_secret_id,
s3_tls_ca_chain=data.s3_tls_ca_chain,
)

def get_tracing_receivers(self) -> Optional[Dict[str, str]]:
"""Fetch the tracing receivers from the coordinator databag."""
Expand Down
Loading

0 comments on commit e395157

Please sign in to comment.