diff --git a/lib/charms/grafana_k8s/v0/grafana_dashboard.py b/lib/charms/grafana_k8s/v0/grafana_dashboard.py index 1d550c94..de8715bf 100644 --- a/lib/charms/grafana_k8s/v0/grafana_dashboard.py +++ b/lib/charms/grafana_k8s/v0/grafana_dashboard.py @@ -219,7 +219,7 @@ def __init__(self, *args): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 33 +LIBPATCH = 34 logger = logging.getLogger(__name__) @@ -790,7 +790,7 @@ def _inject_labels(content: str, topology: dict, transformer: "CosTool") -> str: # We need to use an index so we can insert the changed element back later for panel_idx, panel in enumerate(panels): - if type(panel) is not dict: + if not isinstance(panel, dict): continue # Use the index to insert it back in the same location @@ -835,7 +835,7 @@ def _modify_panel(panel: dict, topology: dict, transformer: "CosTool") -> dict: if panel["datasource"] not in known_datasources: continue querytype = known_datasources[panel["datasource"]] - elif type(panel["datasource"]) == dict: + elif isinstance(panel["datasource"], dict): if panel["datasource"]["uid"] not in known_datasources: continue querytype = known_datasources[panel["datasource"]["uid"]] diff --git a/lib/charms/tls_certificates_interface/v2/tls_certificates.py b/lib/charms/tls_certificates_interface/v2/tls_certificates.py index 11187eb1..fc2f450b 100644 --- a/lib/charms/tls_certificates_interface/v2/tls_certificates.py +++ b/lib/charms/tls_certificates_interface/v2/tls_certificates.py @@ -280,7 +280,7 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven from contextlib import suppress from datetime import datetime, timedelta from ipaddress import IPv4Address -from typing import Any, Dict, List, Literal, Optional +from typing import Any, Dict, List, Literal, Optional, Union from cryptography import x509 from cryptography.hazmat._oid import ExtensionOID @@ -309,7 +309,7 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 9 +LIBPATCH = 10 PYDEPS = ["cryptography", "jsonschema"] @@ -1073,17 +1073,22 @@ def get_issued_certificates( """ certificates: Dict[str, Dict[str, str]] = defaultdict(dict) relations = ( - [self.model.relations[self.relationship_name][relation_id]] - if relation_id + [ + relation + for relation in self.model.relations[self.relationship_name] + if relation.id == relation_id + ] + if relation_id is not None else self.model.relations.get(self.relationship_name, []) ) for relation in relations: provider_relation_data = _load_relation_data(relation.data[self.charm.app]) provider_certificates = provider_relation_data.get("certificates", []) for certificate in provider_certificates: - certificates[relation.app.name].update( # type: ignore[union-attr] - {certificate["certificate_signing_request"]: certificate["certificate"]} - ) + if not certificate.get("revoked", False): + certificates[relation.app.name].update( # type: ignore[union-attr] + {certificate["certificate_signing_request"]: certificate["certificate"]} + ) return certificates def _on_relation_changed(self, event: RelationChangedEvent) -> None: @@ -1159,6 +1164,84 @@ def _revoke_certificates_for_which_no_csr_exists(self, relation_id: int) -> None ) self.remove_certificate(certificate=certificate["certificate"]) + def get_requirer_csrs_with_no_certs( + self, + ) -> List[Dict[str, Union[int, str, List[Dict[str, str]]]]]: + """Filters the requirer's units csrs. + + Keeps the ones for which no certificate was provided. + + Returns: + list: List of dictionaries that contain the unit's csrs + that don't have a certificate issued. + """ + all_unit_csr_mappings = copy.deepcopy(self.get_requirer_csrs()) + for unit_csr_mapping in all_unit_csr_mappings: + for csr in unit_csr_mapping["unit_csrs"]: # type: ignore[union-attr] + if self.certificate_issued_for_csr( + app_name=unit_csr_mapping["application_name"], # type: ignore[arg-type] + csr=csr["certificate_signing_request"], # type: ignore[index] + ): + unit_csr_mapping["unit_csrs"].remove(csr) # type: ignore[union-attr, arg-type] + if len(unit_csr_mapping["unit_csrs"]) == 0: # type: ignore[arg-type] + all_unit_csr_mappings.remove(unit_csr_mapping) + return all_unit_csr_mappings + + def get_requirer_csrs( + self, relation_id: Optional[int] = None + ) -> List[Dict[str, Union[int, str, List[Dict[str, str]]]]]: + """Returns a list of requirers' CSRs grouped by unit. + + It returns CSRs from all relations if relation_id is not specified. + CSRs are returned per relation id, application name and unit name. + + Returns: + list: List of dictionaries that contain the unit's csrs + with the following information + relation_id, application_name and unit_name. + """ + unit_csr_mappings: List[Dict[str, Union[int, str, List[Dict[str, str]]]]] = [] + + relations = ( + [ + relation + for relation in self.model.relations[self.relationship_name] + if relation.id == relation_id + ] + if relation_id is not None + else self.model.relations.get(self.relationship_name, []) + ) + + for relation in relations: + for unit in relation.units: + requirer_relation_data = _load_relation_data(relation.data[unit]) + unit_csrs_list = requirer_relation_data.get("certificate_signing_requests", []) + unit_csr_mappings.append( + { + "relation_id": relation.id, + "application_name": relation.app.name, # type: ignore[union-attr] + "unit_name": unit.name, + "unit_csrs": unit_csrs_list, + } + ) + return unit_csr_mappings + + def certificate_issued_for_csr(self, app_name: str, csr: str) -> bool: + """Checks whether a certificate has been issued for a given CSR. + + Args: + app_name (str): Application name that the CSR belongs to. + csr (str): Certificate Signing Request. + + Returns: + bool: True/False depending on whether a certificate has been issued for the given CSR. + """ + issued_certificates_per_csr = self.get_issued_certificates()[app_name] + for request, cert in issued_certificates_per_csr.items(): + if request == csr: + return csr_matches_certificate(csr, cert) + return False + class TLSCertificatesRequiresV2(Object): """TLS certificates requirer class to be instantiated by TLS certificates requirers.""" @@ -1196,7 +1279,7 @@ def __init__( @property def _requirer_csrs(self) -> List[Dict[str, str]]: - """Returns list of requirer CSR's from relation data.""" + """Returns list of requirer's CSRs from relation data.""" relation = self.model.get_relation(self.relationship_name) if not relation: raise RuntimeError(f"Relation {self.relationship_name} does not exist") @@ -1536,6 +1619,37 @@ def _on_update_status(self, event: UpdateStatusEvent) -> None: ) +def csr_matches_certificate(csr: str, cert: str) -> bool: + """Check if a CSR matches a certificate. + + expects to get the original string representations. + + Args: + csr (str): Certificate Signing Request + cert (str): Certificate + Returns: + bool: True/False depending on whether the CSR matches the certificate. + """ + try: + csr_object = x509.load_pem_x509_csr(csr.encode("utf-8")) + cert_object = x509.load_pem_x509_certificate(cert.encode("utf-8")) + + if csr_object.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) != cert_object.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ): + return False + if csr_object.subject != cert_object.subject: + return False + except ValueError: + logger.warning("Could not load certificate or CSR.") + return False + return True + + def _get_closest_future_time( expiry_notification_time: datetime, expiry_time: datetime ) -> datetime: diff --git a/src/charm.py b/src/charm.py index 5b23a2a0..31a0c077 100755 --- a/src/charm.py +++ b/src/charm.py @@ -119,7 +119,7 @@ def __init__(self, *args): self, relation_name="ingress", port=self._port, - strip_prefix=True, + strip_prefix=False, redirect_https=True, scheme=lambda: "https" if self._is_tls_enabled() else "http", ) @@ -147,7 +147,9 @@ def __init__(self, *args): self.cert_handler.on.cert_changed, ], ) - self._prometheus_client = Prometheus(f"{external_url.scheme}://localhost:9090") + self._prometheus_client = Prometheus( + f"{external_url.scheme}://localhost:9090{external_url.path if external_url.path else ''}" + ) self.remote_write_provider = PrometheusRemoteWriteProvider( charm=self, @@ -617,7 +619,7 @@ def _generate_command(self) -> str: if self._web_config(): args.append(f"--web.config.file={WEB_CONFIG_PATH}") - args.append(f"--web.external-url={self.internal_url}") + args.append(f"--web.external-url={self.external_url}") if self.model.relations[DEFAULT_REMOTE_WRITE_RELATION_NAME]: args.append("--web.enable-remote-write-receiver")