Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 tls cert #87

Merged
merged 18 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "cosl"
version = "0.0.35"
version = "0.0.37"
authors = [
{ name="sed-i", email="[email protected]" },
]
Expand Down
122 changes: 82 additions & 40 deletions src/cosl/coordinated_workers/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
from urllib.parse import urlparse

import ops
import pydantic
import yaml

import cosl
from cosl.coordinated_workers import worker
from cosl.coordinated_workers.interface import ClusterProvider, RemoteWriteEndpoint
from cosl.coordinated_workers.nginx import (
Nginx,
Expand Down Expand Up @@ -78,6 +80,27 @@ class ClusterRolesConfigError(Exception):
"""Raised when the ClusterRolesConfig instance is not properly configured."""


class S3ConnectionInfo(pydantic.BaseModel):
"""Model for the s3 relation databag, as returned by the s3 charm lib."""

# they don't use it, we do

model_config = {"populate_by_name": True}

endpoint: str
bucket: str
access_key: str = pydantic.Field(alias="access-key")
secret_key: str = pydantic.Field(alias="secret-key")

region: Optional[str] = pydantic.Field(None)
tls_ca_chain: Optional[List[str]] = pydantic.Field(None, alias="tls-ca-chain")

@property
def ca_cert(self) -> Optional[str]:
"""Unify the ca chain provided by the lib into a single cert."""
return "\n\n".join(self.tls_ca_chain) if self.tls_ca_chain else None


@dataclass
class ClusterRolesConfig:
"""Worker roles and deployment requirements."""
Expand Down Expand Up @@ -425,6 +448,14 @@ def tls_available(self) -> bool:
and (self.cert_handler.ca_cert is not None)
)

@property
def s3_connection_info(self) -> S3ConnectionInfo:
"""Cast and validate the untyped s3 databag to something we can handle."""
try:
return S3ConnectionInfo(**self.s3_requirer.get_s3_connection_info())
except pydantic.ValidationError:
raise S3NotFoundError("s3 integration inactive or interface corrupt")

@property
def _s3_config(self) -> Dict[str, Any]:
"""The s3 configuration from relation data.
Expand All @@ -434,24 +465,24 @@ def _s3_config(self) -> Dict[str, Any]:
Raises:
S3NotFoundError: The s3 integration is inactive.
"""
s3_data = self.s3_requirer.get_s3_connection_info()
s3_config: Dict[str, Any] = {}
if not (
s3_data
and "bucket" in s3_data
and "endpoint" in s3_data
and "access-key" in s3_data
and "secret-key" in s3_data
):
raise S3NotFoundError("s3 integration inactive")
s3_config["insecure"] = not s3_data["endpoint"].startswith("https://")
s3_config["endpoint"] = re.sub(
rf"^{urlparse(s3_data['endpoint']).scheme}://", "", s3_data["endpoint"]
)
s3_config["region"] = s3_data.get("region", "")
s3_config["access_key_id"] = s3_data.pop("access-key")
s3_config["secret_access_key"] = s3_data.pop("secret-key")
s3_config["bucket_name"] = s3_data.pop("bucket")
s3_data = self.s3_connection_info
s3_config = {
"endpoint": re.sub(rf"^{urlparse(s3_data.endpoint).scheme}://", "", s3_data.endpoint),
"region": s3_data.region,
"access_key_id": s3_data.access_key,
"secret_access_key": s3_data.secret_key,
"bucket_name": s3_data.bucket,
"insecure": not s3_data.tls_ca_chain,
# FIXME: s3 gives us a cert chain. tempo's s3 config takes:
PietroPasotti marked this conversation as resolved.
Show resolved Hide resolved
# tls_cert_path: Path to the client certificate file.
# tls_key_path: Path to the private client key file.
# tls_ca_path: Path to the CA certificate file.
# tls_server_name: Path to the CA certificate file. # not a typo: it's the same
# we send to the worker the chain itself, but the tempo config actually wants a path to a file
# the worker will be responsible for putting it there
"tls_ca_path": worker.S3_TLS_CA_CHAIN_FILE if s3_data.tls_ca_chain else None,
}

return s3_config

@property
Expand Down Expand Up @@ -504,24 +535,23 @@ def _local_ip(self) -> Optional[str]:
def _workers_scrape_jobs(self) -> List[Dict[str, Any]]:
"""The Prometheus scrape jobs for the workers connected to the coordinator."""
scrape_jobs: List[Dict[str, Any]] = []
worker_topologies = self.cluster.gather_topology()

for worker in worker_topologies:
for worker_topology in self.cluster.gather_topology():
job = {
"static_configs": [
{
"targets": [f"{worker['address']}:{self._worker_metrics_port}"],
"targets": [f"{worker_topology['address']}:{self._worker_metrics_port}"],
}
],
# setting these as "labels" in the static config gets some of them
# replaced by the coordinator topology
# https://github.com/canonical/prometheus-k8s-operator/issues/571
"relabel_configs": [
{"target_label": "juju_charm", "replacement": worker["charm_name"]},
{"target_label": "juju_unit", "replacement": worker["unit"]},
{"target_label": "juju_charm", "replacement": worker_topology["charm_name"]},
{"target_label": "juju_unit", "replacement": worker_topology["unit"]},
{
"target_label": "juju_application",
"replacement": worker["application"],
"replacement": worker_topology["application"],
},
{"target_label": "juju_model", "replacement": self.model.name},
{"target_label": "juju_model_uuid", "replacement": self.model.uuid},
Expand Down Expand Up @@ -608,21 +638,30 @@ def _on_config_changed(self, _: ops.ConfigChangedEvent):
# keep this event handler at the bottom
def _on_collect_unit_status(self, e: ops.CollectStatusEvent):
# todo add [nginx.workload] statuses
statuses = []

if self.resources_patch and self.resources_patch.get_status().name != "active":
e.add_status(self.resources_patch.get_status())
statuses.append(self.resources_patch.get_status())

if not self.cluster.has_workers:
e.add_status(ops.BlockedStatus("[consistency] Missing any worker relation."))
if not self.is_coherent:
e.add_status(ops.BlockedStatus("[consistency] Cluster inconsistent."))
if not self.s3_ready:
e.add_status(ops.BlockedStatus("[consistency] Missing S3 integration."))
statuses.append(ops.BlockedStatus("[consistency] Missing any worker relation."))
elif not self.is_coherent:
statuses.append(ops.BlockedStatus("[consistency] Cluster inconsistent."))
elif not self.is_recommended:
# if is_recommended is None: it means we don't have a recommended deployment criterion.
e.add_status(ops.ActiveStatus("[coordinator] Degraded."))
else:
e.add_status(ops.ActiveStatus())
statuses.append(ops.ActiveStatus("Degraded."))

if not self.s3_requirer.relations:
statuses.append(ops.BlockedStatus("[s3] Missing S3 integration."))
elif not self.s3_ready:
statuses.append(ops.BlockedStatus("[s3] S3 not ready (probably misconfigured)."))

if not statuses:
statuses.append(ops.ActiveStatus())

for status in statuses:
e.add_status(status)


###################
# UTILITY METHODS #
Expand Down Expand Up @@ -702,31 +741,34 @@ def update_cluster(self):
if self.remote_write_endpoints_getter
else None
),
s3_tls_ca_chain=self.s3_connection_info.ca_cert,
)

def _render_workers_alert_rules(self):
"""Regenerate the worker alert rules from relation data."""
self._remove_rendered_alert_rules()

apps: Set[str] = set()
for worker in self.cluster.gather_topology():
if worker["application"] in apps:
for worker_topology in self.cluster.gather_topology():
if worker_topology["application"] in apps:
continue

apps.add(worker["application"])
apps.add(worker_topology["application"])
topology_dict = {
"model": self.model.name,
"model_uuid": self.model.uuid,
"application": worker["application"],
"unit": worker["unit"],
"charm_name": worker["charm_name"],
"application": worker_topology["application"],
"unit": worker_topology["unit"],
"charm_name": worker_topology["charm_name"],
}
topology = cosl.JujuTopology.from_dict(topology_dict)
alert_rules = cosl.AlertRules(query_type="promql", topology=topology)
alert_rules.add_path(WORKER_ORIGINAL_ALERT_RULES_PATH, recursive=True)
alert_rules_contents = yaml.dump(alert_rules.as_dict())

file_name = f"{CONSOLIDATED_ALERT_RULES_PATH}/rendered_{worker['application']}.rules"
file_name = (
f"{CONSOLIDATED_ALERT_RULES_PATH}/rendered_{worker_topology['application']}.rules"
)
with open(file_name, "w") as writer:
writer.write(alert_rules_contents)

Expand Down
33 changes: 24 additions & 9 deletions src/cosl/coordinated_workers/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
List,
Mapping,
MutableMapping,
NamedTuple,
Optional,
Set,
)
Expand Down Expand Up @@ -171,7 +172,7 @@ class ClusterRequirerUnitData(DatabagModel):


class ClusterProviderAppData(DatabagModel):
"""App data the the coordinator sends to the worker."""
"""App data that the coordinator sends to the worker."""

### worker node configuration
worker_config: str
Expand All @@ -189,7 +190,16 @@ class ClusterProviderAppData(DatabagModel):
ca_cert: Optional[str] = None
server_cert: Optional[str] = None
privkey_secret_id: Optional[str] = None
"""TLS Config"""
s3_tls_ca_chain: Optional[str] = None


class TLSData(NamedTuple):
"""Section of the cluster data that concerns TLS information."""

ca_cert: Optional[str]
server_cert: Optional[str]
privkey_secret_id: Optional[str]
s3_tls_ca_chain: Optional[str]


class ClusterChangedEvent(ops.EventBase):
Expand Down Expand Up @@ -268,6 +278,7 @@ def publish_data(
worker_config: str,
ca_cert: Optional[str] = None,
server_cert: Optional[str] = None,
s3_tls_ca_chain: Optional[str] = None,
privkey_secret_id: Optional[str] = None,
loki_endpoints: Optional[Dict[str, str]] = None,
tracing_receivers: Optional[Dict[str, str]] = None,
Expand All @@ -284,6 +295,7 @@ def publish_data(
privkey_secret_id=privkey_secret_id,
tracing_receivers=tracing_receivers,
remote_write_endpoints=remote_write_endpoints,
s3_tls_ca_chain=s3_tls_ca_chain,
)
local_app_databag.dump(relation.data[self.model.app])

Expand Down Expand Up @@ -517,20 +529,23 @@ def get_loki_endpoints(self) -> Dict[str, str]:
return data.loki_endpoints or {}
return {}

def get_tls_data(self) -> Optional[Dict[str, str]]:
def get_tls_data(self, allow_none: bool = False) -> Optional[TLSData]:
"""Fetch certificates and the private key secrets id for the worker config."""
data = self._get_data_from_coordinator()
if not data:
return None

if not data.ca_cert or not data.server_cert or not data.privkey_secret_id:
if (
not data.ca_cert or not data.server_cert or not data.privkey_secret_id
) and not allow_none:
return None

return {
"ca_cert": data.ca_cert,
"server_cert": data.server_cert,
"privkey_secret_id": data.privkey_secret_id,
}
return TLSData(
ca_cert=data.ca_cert,
server_cert=data.server_cert,
privkey_secret_id=data.privkey_secret_id,
s3_tls_ca_chain=data.s3_tls_ca_chain,
)

def get_tracing_receivers(self) -> Optional[Dict[str, str]]:
"""Fetch the tracing receivers from the coordinator databag."""
Expand Down
Loading