diff --git a/lib/charms/tls_certificates_interface/v1/tls_certificates.py b/lib/charms/tls_certificates_interface/v2/tls_certificates.py similarity index 70% rename from lib/charms/tls_certificates_interface/v1/tls_certificates.py rename to lib/charms/tls_certificates_interface/v2/tls_certificates.py index 1eda19bf..11187eb1 100644 --- a/lib/charms/tls_certificates_interface/v1/tls_certificates.py +++ b/lib/charms/tls_certificates_interface/v2/tls_certificates.py @@ -1,6 +1,7 @@ # Copyright 2021 Canonical Ltd. # See LICENSE file for licensing details. + """Library for the tls-certificates relation. This library contains the Requires and Provides classes for handling the tls-certificates @@ -10,7 +11,7 @@ From a charm directory, fetch the library using `charmcraft`: ```shell -charmcraft fetch-lib charms.tls_certificates_interface.v1.tls_certificates +charmcraft fetch-lib charms.tls_certificates_interface.v2.tls_certificates ``` Add the following libraries to the charm's `requirements.txt` file: @@ -35,10 +36,10 @@ Example: ```python -from charms.tls_certificates_interface.v1.tls_certificates import ( +from charms.tls_certificates_interface.v2.tls_certificates import ( CertificateCreationRequestEvent, CertificateRevocationRequestEvent, - TLSCertificatesProvidesV1, + TLSCertificatesProvidesV2, generate_private_key, ) from ops.charm import CharmBase, InstallEvent @@ -58,12 +59,14 @@ class ExampleProviderCharm(CharmBase): def __init__(self, *args): super().__init__(*args) - self.certificates = TLSCertificatesProvidesV1(self, "certificates") + self.certificates = TLSCertificatesProvidesV2(self, "certificates") self.framework.observe( - self.certificates.on.certificate_request, self._on_certificate_request + self.certificates.on.certificate_request, + self._on_certificate_request ) self.framework.observe( - self.certificates.on.certificate_revoked, self._on_certificate_revocation_request + self.certificates.on.certificate_revocation_request, + self._on_certificate_revocation_request ) self.framework.observe(self.on.install, self._on_install) @@ -123,16 +126,18 @@ def _on_certificate_revocation_request(self, event: CertificateRevocationRequest Example: ```python -from charms.tls_certificates_interface.v1.tls_certificates import ( +from charms.tls_certificates_interface.v2.tls_certificates import ( CertificateAvailableEvent, CertificateExpiringEvent, - TLSCertificatesRequiresV1, + CertificateRevokedEvent, + TLSCertificatesRequiresV2, generate_csr, generate_private_key, ) from ops.charm import CharmBase, RelationJoinedEvent from ops.main import main from ops.model import ActiveStatus, WaitingStatus +from typing import Union class ExampleRequirerCharm(CharmBase): @@ -140,7 +145,7 @@ class ExampleRequirerCharm(CharmBase): def __init__(self, *args): super().__init__(*args) self.cert_subject = "whatever" - self.certificates = TLSCertificatesRequiresV1(self, "certificates") + self.certificates = TLSCertificatesRequiresV2(self, "certificates") self.framework.observe(self.on.install, self._on_install) self.framework.observe( self.on.certificates_relation_joined, self._on_certificates_relation_joined @@ -151,6 +156,13 @@ def __init__(self, *args): self.framework.observe( self.certificates.on.certificate_expiring, self._on_certificate_expiring ) + self.framework.observe( + self.certificates.on.certificate_invalidated, self._on_certificate_invalidated + ) + self.framework.observe( + self.certificates.on.all_certificates_invalidated, + self._on_all_certificates_invalidated + ) def _on_install(self, event) -> None: private_key_password = b"banana" @@ -191,7 +203,9 @@ def _on_certificate_available(self, event: CertificateAvailableEvent) -> None: replicas_relation.data[self.app].update({"chain": event.chain}) self.unit.status = ActiveStatus() - def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: + def _on_certificate_expiring( + self, event: Union[CertificateExpiringEvent, CertificateInvalidatedEvent] + ) -> None: replicas_relation = self.model.get_relation("replicas") if not replicas_relation: self.unit.status = WaitingStatus("Waiting for peer relation to be created") @@ -211,19 +225,62 @@ def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: ) replicas_relation.data[self.app].update({"csr": new_csr.decode()}) + def _certificate_revoked(self) -> None: + old_csr = replicas_relation.data[self.app].get("csr") + private_key_password = replicas_relation.data[self.app].get("private_key_password") + private_key = replicas_relation.data[self.app].get("private_key") + new_csr = generate_csr( + private_key=private_key.encode(), + private_key_password=private_key_password.encode(), + subject=self.cert_subject, + ) + self.certificates.request_certificate_renewal( + old_certificate_signing_request=old_csr, + new_certificate_signing_request=new_csr, + ) + replicas_relation.data[self.app].update({"csr": new_csr.decode()}) + replicas_relation.data[self.app].pop("certificate") + replicas_relation.data[self.app].pop("ca") + replicas_relation.data[self.app].pop("chain") + self.unit.status = WaitingStatus("Waiting for new certificate") + + def _on_certificate_invalidated(self, event: CertificateInvalidatedEvent) -> None: + replicas_relation = self.model.get_relation("replicas") + if not replicas_relation: + self.unit.status = WaitingStatus("Waiting for peer relation to be created") + event.defer() + return + if event.reason == "revoked": + self._certificate_revoked() + if event.reason == "expired": + self._on_certificate_expiring(event) + + def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEvent) -> None: + # Do what you want with this information, probably remove all certificates. + pass + if __name__ == "__main__": main(ExampleRequirerCharm) ``` + +You can relate both charms by running: + +```bash +juju relate +``` + """ # noqa: D405, D410, D411, D214, D416 import copy import json import logging import uuid +from collections import defaultdict +from contextlib import suppress from datetime import datetime, timedelta from ipaddress import IPv4Address -from typing import Dict, List, Optional +from typing import Any, Dict, List, Literal, Optional from cryptography import x509 from cryptography.hazmat._oid import ExtensionOID @@ -232,22 +289,33 @@ def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: from cryptography.hazmat.primitives.serialization import pkcs12 from cryptography.x509.extensions import Extension, ExtensionNotFound from jsonschema import exceptions, validate # type: ignore[import] -from ops.charm import CharmBase, CharmEvents, RelationChangedEvent, UpdateStatusEvent +from ops.charm import ( + CharmBase, + CharmEvents, + RelationBrokenEvent, + RelationChangedEvent, + SecretExpiredEvent, + UpdateStatusEvent, +) from ops.framework import EventBase, EventSource, Handle, Object +from ops.jujuversion import JujuVersion +from ops.model import SecretNotFoundError # The unique Charmhub library identifier, never change it LIBID = "afd8c2bccf834997afce12c2706d2ede" # Increment this major API version when introducing breaking changes -LIBAPI = 1 +LIBAPI = 2 # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 10 +LIBPATCH = 9 + +PYDEPS = ["cryptography", "jsonschema"] REQUIRER_JSON_SCHEMA = { "$schema": "http://json-schema.org/draft-04/schema#", - "$id": "https://canonical.github.io/charm-relation-interfaces/tls_certificates/v1/schemas/requirer.json", # noqa: E501 + "$id": "https://canonical.github.io/charm-relation-interfaces/tls_certificates/v2/schemas/requirer.json", # noqa: E501 "type": "object", "title": "`tls_certificates` requirer root schema", "description": "The `tls_certificates` root schema comprises the entire requirer databag for this interface.", # noqa: E501 @@ -279,11 +347,11 @@ def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: PROVIDER_JSON_SCHEMA = { "$schema": "http://json-schema.org/draft-04/schema#", - "$id": "https://canonical.github.io/charm-relation-interfaces/tls_certificates/v1/schemas/provider.json", # noqa: E501 + "$id": "https://canonical.github.io/charm-relation-interfaces/tls_certificates/v2/schemas/provider.json", # noqa: E501 "type": "object", "title": "`tls_certificates` provider root schema", "description": "The `tls_certificates` root schema comprises the entire provider databag for this interface.", # noqa: E501 - "example": [ + "examples": [ { "certificates": [ { @@ -295,7 +363,20 @@ def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: "certificate": "-----BEGIN CERTIFICATE-----\nMIICvDCCAaQCFFPAOD7utDTsgFrm0vS4We18OcnKMA0GCSqGSIb3DQEBCwUAMCAx\nCzAJBgNVBAYTAlVTMREwDwYDVQQDDAh3aGF0ZXZlcjAeFw0yMjA3MjkyMTE5Mzha\nFw0yMzA3MjkyMTE5MzhaMBUxEzARBgNVBAMMCmJhbmFuYS5jb20wggEiMA0GCSqG\nSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDVpcfcBOnFuyZG+A2WQzmaBI5NXgwTCfvE\neKciqRQXhzJdUkEg7eqwFrK3y9yjhoiB6q0WNAeR+nOdS/Cw7layRtGz5skOq7Aa\nN4FZHg0or30i7Rrx7afJcGJyLpxfK/OfLmJm5QEdLXV0DZp0L5vuhhEb1EUOrMaY\nGe4iwqTyg6D7fuBili9dBVn9IvNhYMVgtiqkWVLTW4ChE0LgES4oO3rQZgp4dtM5\nsp6KwHGO766UzwGnkKRizaqmLylfVusllWNPFfp6gEaxa45N70oqGUrvGSVHWeHf\nfvkhpWx+wOnu+2A5F/Yv3UNz2v4g7Vjt7V0tjL4KMV9YklpRjTh3AgMBAAEwDQYJ\nKoZIhvcNAQELBQADggEBAChjRzuba8zjQ7NYBVas89Oy7u++MlS8xWxh++yiUsV6\nWMk3ZemsPtXc1YmXorIQohtxLxzUPm2JhyzFzU/sOLmJQ1E/l+gtZHyRCwsb20fX\nmphuJsMVd7qv/GwEk9PBsk2uDqg4/Wix0Rx5lf95juJP7CPXQJl5FQauf3+LSz0y\nwF/j+4GqvrwsWr9hKOLmPdkyKkR6bHKtzzsxL9PM8GnElk2OpaPMMnzbL/vt2IAt\nxK01ZzPxCQCzVwHo5IJO5NR/fIyFbEPhxzG17QsRDOBR9fl9cOIvDeSO04vyZ+nz\n+kA2c3fNrZFAtpIlOOmFh8Q12rVL4sAjI5mVWnNEgvI=\n-----END CERTIFICATE-----\n", # noqa: E501 } ] - } + }, + { + "certificates": [ + { + "ca": "-----BEGIN CERTIFICATE-----\\nMIIDJTCCAg2gAwIBAgIUMsSK+4FGCjW6sL/EXMSxColmKw8wDQYJKoZIhvcNAQEL\\nBQAwIDELMAkGA1UEBhMCVVMxETAPBgNVBAMMCHdoYXRldmVyMB4XDTIyMDcyOTIx\\nMTgyN1oXDTIzMDcyOTIxMTgyN1owIDELMAkGA1UEBhMCVVMxETAPBgNVBAMMCHdo\\nYXRldmVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA55N9DkgFWbJ/\\naqcdQhso7n1kFvt6j/fL1tJBvRubkiFMQJnZFtekfalN6FfRtA3jq+nx8o49e+7t\\nLCKT0xQ+wufXfOnxv6/if6HMhHTiCNPOCeztUgQ2+dfNwRhYYgB1P93wkUVjwudK\\n13qHTTZ6NtEF6EzOqhOCe6zxq6wrr422+ZqCvcggeQ5tW9xSd/8O1vNID/0MTKpy\\nET3drDtBfHmiUEIBR3T3tcy6QsIe4Rz/2sDinAcM3j7sG8uY6drh8jY3PWar9til\\nv2l4qDYSU8Qm5856AB1FVZRLRJkLxZYZNgreShAIYgEd0mcyI2EO/UvKxsIcxsXc\\nd45GhGpKkwIDAQABo1cwVTAfBgNVHQ4EGAQWBBRXBrXKh3p/aFdQjUcT/UcvICBL\\nODAhBgNVHSMEGjAYgBYEFFcGtcqHen9oV1CNRxP9Ry8gIEs4MA8GA1UdEwEB/wQF\\nMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAGmCEvcoFUrT9e133SHkgF/ZAgzeIziO\\nBjfAdU4fvAVTVfzaPm0yBnGqzcHyacCzbZjKQpaKVgc5e6IaqAQtf6cZJSCiJGhS\\nJYeosWrj3dahLOUAMrXRr8G/Ybcacoqc+osKaRa2p71cC3V6u2VvcHRV7HDFGJU7\\noijbdB+WhqET6Txe67rxZCJG9Ez3EOejBJBl2PJPpy7m1Ml4RR+E8YHNzB0lcBzc\\nEoiJKlDfKSO14E2CPDonnUoWBJWjEvJys3tbvKzsRj2fnLilytPFU0gH3cEjCopi\\nzFoWRdaRuNHYCqlBmso1JFDl8h4fMmglxGNKnKRar0WeGyxb4xXBGpI=\\n-----END CERTIFICATE-----\\n", # noqa: E501 + "chain": [ + "-----BEGIN CERTIFICATE-----\\nMIIDJTCCAg2gAwIBAgIUMsSK+4FGCjW6sL/EXMSxColmKw8wDQYJKoZIhvcNAQEL\\nBQAwIDELMAkGA1UEBhMCVVMxETAPBgNVBAMMCHdoYXRldmVyMB4XDTIyMDcyOTIx\\nMTgyN1oXDTIzMDcyOTIxMTgyN1owIDELMAkGA1UEBhMCVVMxETAPBgNVBAMMCHdo\\nYXRldmVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA55N9DkgFWbJ/\\naqcdQhso7n1kFvt6j/fL1tJBvRubkiFMQJnZFtekfalN6FfRtA3jq+nx8o49e+7t\\nLCKT0xQ+wufXfOnxv6/if6HMhHTiCNPOCeztUgQ2+dfNwRhYYgB1P93wkUVjwudK\\n13qHTTZ6NtEF6EzOqhOCe6zxq6wrr422+ZqCvcggeQ5tW9xSd/8O1vNID/0MTKpy\\nET3drDtBfHmiUEIBR3T3tcy6QsIe4Rz/2sDinAcM3j7sG8uY6drh8jY3PWar9til\\nv2l4qDYSU8Qm5856AB1FVZRLRJkLxZYZNgreShAIYgEd0mcyI2EO/UvKxsIcxsXc\\nd45GhGpKkwIDAQABo1cwVTAfBgNVHQ4EGAQWBBRXBrXKh3p/aFdQjUcT/UcvICBL\\nODAhBgNVHSMEGjAYgBYEFFcGtcqHen9oV1CNRxP9Ry8gIEs4MA8GA1UdEwEB/wQF\\nMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAGmCEvcoFUrT9e133SHkgF/ZAgzeIziO\\nBjfAdU4fvAVTVfzaPm0yBnGqzcHyacCzbZjKQpaKVgc5e6IaqAQtf6cZJSCiJGhS\\nJYeosWrj3dahLOUAMrXRr8G/Ybcacoqc+osKaRa2p71cC3V6u2VvcHRV7HDFGJU7\\noijbdB+WhqET6Txe67rxZCJG9Ez3EOejBJBl2PJPpy7m1Ml4RR+E8YHNzB0lcBzc\\nEoiJKlDfKSO14E2CPDonnUoWBJWjEvJys3tbvKzsRj2fnLilytPFU0gH3cEjCopi\\nzFoWRdaRuNHYCqlBmso1JFDl8h4fMmglxGNKnKRar0WeGyxb4xXBGpI=\\n-----END CERTIFICATE-----\\n" # noqa: E501, W505 + ], + "certificate_signing_request": "-----BEGIN CERTIFICATE REQUEST-----\nMIICWjCCAUICAQAwFTETMBEGA1UEAwwKYmFuYW5hLmNvbTCCASIwDQYJKoZIhvcN\nAQEBBQADggEPADCCAQoCggEBANWlx9wE6cW7Jkb4DZZDOZoEjk1eDBMJ+8R4pyKp\nFBeHMl1SQSDt6rAWsrfL3KOGiIHqrRY0B5H6c51L8LDuVrJG0bPmyQ6rsBo3gVke\nDSivfSLtGvHtp8lwYnIunF8r858uYmblAR0tdXQNmnQvm+6GERvURQ6sxpgZ7iLC\npPKDoPt+4GKWL10FWf0i82FgxWC2KqRZUtNbgKETQuARLig7etBmCnh20zmynorA\ncY7vrpTPAaeQpGLNqqYvKV9W6yWVY08V+nqARrFrjk3vSioZSu8ZJUdZ4d9++SGl\nbH7A6e77YDkX9i/dQ3Pa/iDtWO3tXS2MvgoxX1iSWlGNOHcCAwEAAaAAMA0GCSqG\nSIb3DQEBCwUAA4IBAQCW1fKcHessy/ZhnIwAtSLznZeZNH8LTVOzkhVd4HA7EJW+\nKVLBx8DnN7L3V2/uPJfHiOg4Rx7fi7LkJPegl3SCqJZ0N5bQS/KvDTCyLG+9E8Y+\n7wqCmWiXaH1devimXZvazilu4IC2dSks2D8DPWHgsOdVks9bme8J3KjdNMQudegc\newWZZ1Dtbd+Rn7cpKU3jURMwm4fRwGxbJ7iT5fkLlPBlyM/yFEik4SmQxFYrZCQg\n0f3v4kBefTh5yclPy5tEH+8G0LMsbbo3dJ5mPKpAShi0QEKDLd7eR1R/712lYTK4\ndi4XaEfqERgy68O4rvb4PGlJeRGS7AmL7Ss8wfAq\n-----END CERTIFICATE REQUEST-----\n", # noqa: E501 + "certificate": "-----BEGIN CERTIFICATE-----\nMIICvDCCAaQCFFPAOD7utDTsgFrm0vS4We18OcnKMA0GCSqGSIb3DQEBCwUAMCAx\nCzAJBgNVBAYTAlVTMREwDwYDVQQDDAh3aGF0ZXZlcjAeFw0yMjA3MjkyMTE5Mzha\nFw0yMzA3MjkyMTE5MzhaMBUxEzARBgNVBAMMCmJhbmFuYS5jb20wggEiMA0GCSqG\nSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDVpcfcBOnFuyZG+A2WQzmaBI5NXgwTCfvE\neKciqRQXhzJdUkEg7eqwFrK3y9yjhoiB6q0WNAeR+nOdS/Cw7layRtGz5skOq7Aa\nN4FZHg0or30i7Rrx7afJcGJyLpxfK/OfLmJm5QEdLXV0DZp0L5vuhhEb1EUOrMaY\nGe4iwqTyg6D7fuBili9dBVn9IvNhYMVgtiqkWVLTW4ChE0LgES4oO3rQZgp4dtM5\nsp6KwHGO766UzwGnkKRizaqmLylfVusllWNPFfp6gEaxa45N70oqGUrvGSVHWeHf\nfvkhpWx+wOnu+2A5F/Yv3UNz2v4g7Vjt7V0tjL4KMV9YklpRjTh3AgMBAAEwDQYJ\nKoZIhvcNAQELBQADggEBAChjRzuba8zjQ7NYBVas89Oy7u++MlS8xWxh++yiUsV6\nWMk3ZemsPtXc1YmXorIQohtxLxzUPm2JhyzFzU/sOLmJQ1E/l+gtZHyRCwsb20fX\nmphuJsMVd7qv/GwEk9PBsk2uDqg4/Wix0Rx5lf95juJP7CPXQJl5FQauf3+LSz0y\nwF/j+4GqvrwsWr9hKOLmPdkyKkR6bHKtzzsxL9PM8GnElk2OpaPMMnzbL/vt2IAt\nxK01ZzPxCQCzVwHo5IJO5NR/fIyFbEPhxzG17QsRDOBR9fl9cOIvDeSO04vyZ+nz\n+kA2c3fNrZFAtpIlOOmFh8Q12rVL4sAjI5mVWnNEgvI=\n-----END CERTIFICATE-----\n", # noqa: E501 + "revoked": True, + } + ] + }, ], "properties": { "certificates": { @@ -323,6 +404,10 @@ def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: "$id": "#/properties/certificates/items/chain/items", }, }, + "revoked": { + "$id": "#/properties/certificates/items/revoked", + "type": "boolean", + }, }, "additionalProperties": True, }, @@ -379,7 +464,7 @@ def __init__(self, handle, certificate: str, expiry: str): Args: handle (Handle): Juju framework handle certificate (str): TLS Certificate - expiry (str): Datetime string reprensenting the time at which the certificate + expiry (str): Datetime string representing the time at which the certificate won't be valid anymore. """ super().__init__(handle) @@ -396,20 +481,57 @@ def restore(self, snapshot: dict): self.expiry = snapshot["expiry"] -class CertificateExpiredEvent(EventBase): - """Charm Event triggered when a TLS certificate is expired.""" +class CertificateInvalidatedEvent(EventBase): + """Charm Event triggered when a TLS certificate is invalidated.""" - def __init__(self, handle: Handle, certificate: str): + def __init__( + self, + handle: Handle, + reason: Literal["expired", "revoked"], + certificate: str, + certificate_signing_request: str, + ca: str, + chain: List[str], + ): super().__init__(handle) + self.reason = reason + self.certificate_signing_request = certificate_signing_request self.certificate = certificate + self.ca = ca + self.chain = chain def snapshot(self) -> dict: """Returns snapshot.""" - return {"certificate": self.certificate} + return { + "reason": self.reason, + "certificate_signing_request": self.certificate_signing_request, + "certificate": self.certificate, + "ca": self.ca, + "chain": self.chain, + } def restore(self, snapshot: dict): """Restores snapshot.""" + self.reason = snapshot["reason"] + self.certificate_signing_request = snapshot["certificate_signing_request"] self.certificate = snapshot["certificate"] + self.ca = snapshot["ca"] + self.chain = snapshot["chain"] + + +class AllCertificatesInvalidatedEvent(EventBase): + """Charm Event triggered when all TLS certificates are invalidated.""" + + def __init__(self, handle: Handle): + super().__init__(handle) + + def snapshot(self) -> dict: + """Returns snapshot.""" + return {} + + def restore(self, snapshot: dict): + """Restores snapshot.""" + pass class CertificateCreationRequestEvent(EventBase): @@ -551,7 +673,7 @@ def generate_certificate( ca_key: bytes, ca_key_password: Optional[bytes] = None, validity: int = 365, - alt_names: List[str] = None, + alt_names: Optional[List[str]] = None, ) -> bytes: """Generates a TLS certificate based on a CSR. @@ -679,9 +801,9 @@ def generate_csr( private_key: bytes, subject: str, add_unique_id_to_subject_name: bool = True, - organization: str = None, - email_address: str = None, - country_name: str = None, + organization: Optional[str] = None, + email_address: Optional[str] = None, + country_name: Optional[str] = None, private_key_password: Optional[bytes] = None, sans: Optional[List[str]] = None, sans_oid: Optional[List[str]] = None, @@ -759,10 +881,11 @@ class CertificatesRequirerCharmEvents(CharmEvents): certificate_available = EventSource(CertificateAvailableEvent) certificate_expiring = EventSource(CertificateExpiringEvent) - certificate_expired = EventSource(CertificateExpiredEvent) + certificate_invalidated = EventSource(CertificateInvalidatedEvent) + all_certificates_invalidated = EventSource(AllCertificatesInvalidatedEvent) -class TLSCertificatesProvidesV1(Object): +class TLSCertificatesProvidesV2(Object): """TLS certificates provider class to be instantiated by TLS certificates providers.""" on = CertificatesProviderCharmEvents() @@ -821,8 +944,8 @@ def _add_certificate( def _remove_certificate( self, relation_id: int, - certificate: str = None, - certificate_signing_request: str = None, + certificate: Optional[str] = None, + certificate_signing_request: Optional[str] = None, ) -> None: """Removes certificate from a given relation based on user provided certificate or csr. @@ -877,7 +1000,11 @@ def revoke_all_certificates(self) -> None: This method is meant to be used when the Root CA has changed. """ for relation in self.model.relations[self.relationship_name]: - relation.data[self.model.app]["certificates"] = json.dumps([]) + provider_relation_data = _load_relation_data(relation.data[self.charm.app]) + provider_certificates = copy.deepcopy(provider_relation_data.get("certificates", [])) + for certificate in provider_certificates: + certificate["revoked"] = True + relation.data[self.model.app]["certificates"] = json.dumps(provider_certificates) def set_relation_certificate( self, @@ -899,6 +1026,8 @@ def set_relation_certificate( Returns: None """ + if not self.model.unit.is_leader(): + return certificates_relation = self.model.get_relation( relation_name=self.relationship_name, relation_id=relation_id ) @@ -931,8 +1060,34 @@ def remove_certificate(self, certificate: str) -> None: for certificate_relation in certificates_relation: self._remove_certificate(certificate=certificate, relation_id=certificate_relation.id) + def get_issued_certificates( + self, relation_id: Optional[int] = None + ) -> Dict[str, Dict[str, str]]: + """Returns a dictionary of issued certificates. + + It returns certificates from all relations if relation_id is not specified. + Certificates are returned per application name and CSR. + + Returns: + dict: Certificates per application name. + """ + certificates: Dict[str, Dict[str, str]] = defaultdict(dict) + relations = ( + [self.model.relations[self.relationship_name][relation_id]] + if relation_id + else self.model.relations.get(self.relationship_name, []) + ) + for relation in relations: + provider_relation_data = _load_relation_data(relation.data[self.charm.app]) + provider_certificates = provider_relation_data.get("certificates", []) + for certificate in provider_certificates: + certificates[relation.app.name].update( # type: ignore[union-attr] + {certificate["certificate_signing_request"]: certificate["certificate"]} + ) + return certificates + def _on_relation_changed(self, event: RelationChangedEvent) -> None: - """Handler triggerred on relation changed event. + """Handler triggered on relation changed event. Looks at the relation data and either emits: - certificate request event: If the unit relation data contains a CSR for which @@ -950,9 +1105,7 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: requirer_relation_data = _load_relation_data(event.relation.data[event.unit]) provider_relation_data = _load_relation_data(event.relation.data[self.charm.app]) if not self._relation_data_is_valid(requirer_relation_data): - logger.warning( - f"Relation data did not pass JSON Schema validation: {requirer_relation_data}" - ) + logger.debug("Relation data did not pass JSON Schema validation") return provider_certificates = provider_relation_data.get("certificates", []) requirer_csrs = requirer_relation_data.get("certificate_signing_requests", []) @@ -975,7 +1128,7 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: def _revoke_certificates_for_which_no_csr_exists(self, relation_id: int) -> None: """Revokes certificates for which no unit has a CSR. - Goes through all generated certificates and compare agains the list of CSRS for all units + Goes through all generated certificates and compare against the list of CSRs for all units of a given relationship. Args: @@ -1007,7 +1160,7 @@ def _revoke_certificates_for_which_no_csr_exists(self, relation_id: int) -> None self.remove_certificate(certificate=certificate["certificate"]) -class TLSCertificatesRequiresV1(Object): +class TLSCertificatesRequiresV2(Object): """TLS certificates requirer class to be instantiated by TLS certificates requirers.""" on = CertificatesRequirerCharmEvents() @@ -1033,7 +1186,13 @@ def __init__( self.framework.observe( charm.on[relationship_name].relation_changed, self._on_relation_changed ) - self.framework.observe(charm.on.update_status, self._on_update_status) + self.framework.observe( + charm.on[relationship_name].relation_broken, self._on_relation_broken + ) + if JujuVersion.from_environ().has_secrets: + self.framework.observe(charm.on.secret_expired, self._on_secret_expired) + else: + self.framework.observe(charm.on.update_status, self._on_update_status) @property def _requirer_csrs(self) -> List[Dict[str, str]]: @@ -1046,13 +1205,18 @@ def _requirer_csrs(self) -> List[Dict[str, str]]: @property def _provider_certificates(self) -> List[Dict[str, str]]: - """Returns list of provider CSR's from relation data.""" + """Returns list of certificates from the provider's relation data.""" relation = self.model.get_relation(self.relationship_name) if not relation: - raise RuntimeError(f"Relation {self.relationship_name} does not exist") + logger.debug("No relation: %s", self.relationship_name) + return [] if not relation.app: - raise RuntimeError(f"Remote app for relation {self.relationship_name} does not exist") + logger.debug("No remote app in relation: %s", self.relationship_name) + return [] provider_relation_data = _load_relation_data(relation.data[relation.app]) + if not self._relation_data_is_valid(provider_relation_data): + logger.warning("Provider relation data did not pass JSON Schema validation") + return [] return provider_relation_data.get("certificates", []) def _add_requirer_csr(self, csr: str) -> None: @@ -1112,12 +1276,10 @@ def request_certificate_creation(self, certificate_signing_request: bytes) -> No """ relation = self.model.get_relation(self.relationship_name) if not relation: - message = ( + raise RuntimeError( f"Relation {self.relationship_name} does not exist - " f"The certificate request can't be completed" ) - logger.error(message) - raise RuntimeError(message) self._add_requirer_csr(certificate_signing_request.decode().strip()) logger.info("Certificate request sent to provider") @@ -1179,7 +1341,16 @@ def _relation_data_is_valid(certificates_data: dict) -> bool: return False def _on_relation_changed(self, event: RelationChangedEvent) -> None: - """Handler triggerred on relation changed events. + """Handler triggered on relation changed events. + + Goes through all providers certificates that match a requested CSR. + + If the provider certificate is revoked, emit a CertificateInvalidateEvent, + otherwise emit a CertificateAvailableEvent. + + When Juju secrets are available, remove the secret for revoked certificate, + or add a secret with the correct expiry time for new certificates. + Args: event: Juju event @@ -1187,32 +1358,146 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: Returns: None """ - relation = self.model.get_relation(self.relationship_name) - if not relation: - logger.warning(f"No relation: {self.relationship_name}") - return - if not relation.app: - logger.warning(f"No remote app in relation: {self.relationship_name}") - return - provider_relation_data = _load_relation_data(relation.data[relation.app]) - if not self._relation_data_is_valid(provider_relation_data): - logger.warning( - f"Provider relation data did not pass JSON Schema validation: " - f"{event.relation.data[relation.app]}" - ) - return requirer_csrs = [ certificate_creation_request["certificate_signing_request"] for certificate_creation_request in self._requirer_csrs ] for certificate in self._provider_certificates: if certificate["certificate_signing_request"] in requirer_csrs: - self.on.certificate_available.emit( - certificate_signing_request=certificate["certificate_signing_request"], - certificate=certificate["certificate"], - ca=certificate["ca"], - chain=certificate["chain"], - ) + if certificate.get("revoked", False): + if JujuVersion.from_environ().has_secrets: + with suppress(SecretNotFoundError): + secret = self.model.get_secret( + label=f"{LIBID}-{certificate['certificate_signing_request']}" + ) + secret.remove_all_revisions() + self.on.certificate_invalidated.emit( + reason="revoked", + certificate=certificate["certificate"], + certificate_signing_request=certificate["certificate_signing_request"], + ca=certificate["ca"], + chain=certificate["chain"], + ) + else: + if JujuVersion.from_environ().has_secrets: + try: + secret = self.model.get_secret( + label=f"{LIBID}-{certificate['certificate_signing_request']}" + ) + secret.set_content({"certificate": certificate["certificate"]}) + secret.set_info( + expire=self._get_next_secret_expiry_time( + certificate["certificate"] + ), + ) + except SecretNotFoundError: + secret = self.charm.unit.add_secret( + {"certificate": certificate["certificate"]}, + label=f"{LIBID}-{certificate['certificate_signing_request']}", + expire=self._get_next_secret_expiry_time( + certificate["certificate"] + ), + ) + self.on.certificate_available.emit( + certificate_signing_request=certificate["certificate_signing_request"], + certificate=certificate["certificate"], + ca=certificate["ca"], + chain=certificate["chain"], + ) + + def _get_next_secret_expiry_time(self, certificate: str) -> Optional[datetime]: + """Return the expiry time or expiry notification time. + + Extracts the expiry time from the provided certificate, calculates the + expiry notification time and return the closest of the two, that is in + the future. + + Args: + certificate: x509 certificate + + Returns: + Optional[datetime]: None if the certificate expiry time cannot be read, + next expiry time otherwise. + """ + expiry_time = _get_certificate_expiry_time(certificate) + if not expiry_time: + return None + expiry_notification_time = expiry_time - timedelta(hours=self.expiry_notification_time) + return _get_closest_future_time(expiry_notification_time, expiry_time) + + def _on_relation_broken(self, event: RelationBrokenEvent) -> None: + """Handler triggered on relation broken event. + + Emitting `all_certificates_invalidated` from `relation-broken` rather + than `relation-departed` since certs are stored in app data. + + Args: + event: Juju event + + Returns: + None + """ + self.on.all_certificates_invalidated.emit() + + def _on_secret_expired(self, event: SecretExpiredEvent) -> None: + """Triggered when a certificate is set to expire. + + Loads the certificate from the secret, and will emit 1 of 2 + events. + + If the certificate is not yet expired, emits CertificateExpiringEvent + and updates the expiry time of the secret to the exact expiry time on + the certificate. + + If the certificate is expired, emits CertificateInvalidedEvent and + deletes the secret. + + Args: + event (SecretExpiredEvent): Juju event + """ + if not event.secret.label or not event.secret.label.startswith(f"{LIBID}-"): + return + csr = event.secret.label[len(f"{LIBID}-") :] + certificate_dict = self._find_certificate_in_relation_data(csr) + if not certificate_dict: + # A secret expired but we did not find matching certificate. Cleaning up + event.secret.remove_all_revisions() + return + + expiry_time = _get_certificate_expiry_time(certificate_dict["certificate"]) + if not expiry_time: + # A secret expired but matching certificate is invalid. Cleaning up + event.secret.remove_all_revisions() + return + + if datetime.utcnow() < expiry_time: + logger.warning("Certificate almost expired") + self.on.certificate_expiring.emit( + certificate=certificate_dict["certificate"], + expiry=expiry_time.isoformat(), + ) + event.secret.set_info( + expire=_get_certificate_expiry_time(certificate_dict["certificate"]), + ) + else: + logger.warning("Certificate is expired") + self.on.certificate_invalidated.emit( + reason="expired", + certificate=certificate_dict["certificate"], + certificate_signing_request=certificate_dict["certificate_signing_request"], + ca=certificate_dict["ca"], + chain=certificate_dict["chain"], + ) + self.request_certificate_revocation(certificate_dict["certificate"].encode()) + event.secret.remove_all_revisions() + + def _find_certificate_in_relation_data(self, csr: str) -> Optional[Dict[str, Any]]: + """Returns the certificate that match the given CSR.""" + for certificate_dict in self._provider_certificates: + if certificate_dict["certificate_signing_request"] != csr: + continue + return certificate_dict + return None def _on_update_status(self, event: UpdateStatusEvent) -> None: """Triggered on update status event. @@ -1227,35 +1512,59 @@ def _on_update_status(self, event: UpdateStatusEvent) -> None: Returns: None """ - relation = self.model.get_relation(self.relationship_name) - if not relation: - logger.warning(f"No relation: {self.relationship_name}") - return - if not relation.app: - logger.warning(f"No remote app in relation: {self.relationship_name}") - return - provider_relation_data = _load_relation_data(relation.data[relation.app]) - if not self._relation_data_is_valid(provider_relation_data): - logger.warning( - f"Provider relation data did not pass JSON Schema validation: " - f"{relation.data[relation.app]}" - ) - return for certificate_dict in self._provider_certificates: - certificate = certificate_dict["certificate"] - try: - certificate_object = x509.load_pem_x509_certificate(data=certificate.encode()) - except ValueError: - logger.warning("Could not load certificate.") + expiry_time = _get_certificate_expiry_time(certificate_dict["certificate"]) + if not expiry_time: continue - time_difference = certificate_object.not_valid_after - datetime.utcnow() + time_difference = expiry_time - datetime.utcnow() if time_difference.total_seconds() < 0: logger.warning("Certificate is expired") - self.on.certificate_expired.emit(certificate=certificate) - self.request_certificate_revocation(certificate.encode()) + self.on.certificate_invalidated.emit( + reason="expired", + certificate=certificate_dict["certificate"], + certificate_signing_request=certificate_dict["certificate_signing_request"], + ca=certificate_dict["ca"], + chain=certificate_dict["chain"], + ) + self.request_certificate_revocation(certificate_dict["certificate"].encode()) continue if time_difference.total_seconds() < (self.expiry_notification_time * 60 * 60): logger.warning("Certificate almost expired") self.on.certificate_expiring.emit( - certificate=certificate, expiry=certificate_object.not_valid_after.isoformat() + certificate=certificate_dict["certificate"], + expiry=expiry_time.isoformat(), ) + + +def _get_closest_future_time( + expiry_notification_time: datetime, expiry_time: datetime +) -> datetime: + """Return expiry_notification_time if not in the past, otherwise return expiry_time. + + Args: + expiry_notification_time (datetime): Notification time of impending expiration + expiry_time (datetime): Expiration time + + Returns: + datetime: expiry_notification_time if not in the past, expiry_time otherwise + """ + return ( + expiry_notification_time if datetime.utcnow() < expiry_notification_time else expiry_time + ) + + +def _get_certificate_expiry_time(certificate: str) -> Optional[datetime]: + """Extract expiry time from a certificate string. + + Args: + certificate (str): x509 certificate as a string + + Returns: + Optional[datetime]: Expiry datetime or None + """ + try: + certificate_object = x509.load_pem_x509_certificate(data=certificate.encode()) + return certificate_object.not_valid_after + except ValueError: + logger.warning("Could not load certificate.") + return None diff --git a/metadata.yaml b/metadata.yaml index 2fa525e9..bf355da0 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2021 Canonical Ltd. +# Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. name: vault-k8s diff --git a/src/charm.py b/src/charm.py index 7c4cc16e..87bba314 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2021 Canonical Ltd. +# Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. """Charm for Vault running on Kubernetes. @@ -14,9 +14,9 @@ KubernetesServicePatch, ServicePort, ) -from charms.tls_certificates_interface.v1.tls_certificates import ( +from charms.tls_certificates_interface.v2.tls_certificates import ( CertificateCreationRequestEvent, - TLSCertificatesProvidesV1, + TLSCertificatesProvidesV2, ) from ops.charm import ActionEvent, CharmBase, ConfigChangedEvent from ops.framework import StoredState @@ -42,7 +42,7 @@ class VaultCharm(CharmBase): def __init__(self, *args): super().__init__(*args) self._stored.set_default(role_id="", secret_id="") - self.tls_certificates = TLSCertificatesProvidesV1(self, "certificates") + self.tls_certificates = TLSCertificatesProvidesV2(self, "certificates") self.vault = Vault( url=f"http://localhost:{self.VAULT_PORT}", role_id=self._stored.role_id, # type: ignore[arg-type] @@ -84,7 +84,7 @@ def _on_certificate_creation_request(self, event: CertificateCreationRequestEven ) def _on_config_changed(self, event: ConfigChangedEvent) -> None: - """Handler triggerred whenever there is a config-changed event. + """Handler triggered whenever there is a config-changed event. Args: event: Juju event @@ -193,7 +193,7 @@ def _on_authorise_charm_action(self, event: ActionEvent) -> None: if self.unit.is_leader(): self.vault.set_token(token=event.params["token"]) if not self.vault.is_ready: - self.vault.enable_secrets_engine() + self.vault.enable_pki_secrets_engine() self.vault.generate_root_certificate() self.vault.write_charm_pki_role() self.vault.enable_approle_auth() @@ -205,5 +205,5 @@ def _on_authorise_charm_action(self, event: ActionEvent) -> None: self.unit.status = ActiveStatus() -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover main(VaultCharm) diff --git a/src/vault.py b/src/vault.py index 0b270f01..63892413 100644 --- a/src/vault.py +++ b/src/vault.py @@ -1,18 +1,19 @@ #!/usr/bin/env python3 -# Copyright 2021 Canonical Ltd. +# Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. """Contains all the specificities to communicate with Vault through its API.""" -import ipaddress import logging -from typing import List, Optional, Tuple +from typing import Optional, Tuple import hvac # type: ignore[import] import requests from certificate_signing_request import CertificateSigningRequest +CHARM_POLICY_FILE = "charm_policy.hcl" +CHARM_POLICY_PATH = f"src/{CHARM_POLICY_FILE}" CHARM_POLICY_NAME = "local-charm-policy" CHARM_ACCESS_ROLE = "local-charm-access" @@ -47,20 +48,12 @@ def __init__(self, url: str, role_id: Optional[str] = None, secret_id: Optional[ @property def token(self) -> Optional[str]: - """Returns Vault's token. - - Returns: - str: Vault token. - """ + """Returns Vault's token.""" return self._client.token @property def is_ready(self) -> bool: - """Returns whether Vault is ready. - - Returns: - bool: Whether Vault is ready. - """ + """Returns whether Vault is ready for interaction.""" if not self._is_backend_mounted: logger.info("Vault is not ready - Backend not mounted") return False @@ -71,25 +64,11 @@ def is_ready(self) -> bool: return True def set_token(self, token: str) -> None: - """Sets the Vault token. - - Args: - token (str): Vault token - - Returns: - None - """ + """Sets the Vault token for authentication.""" self._client.token = token def approle_login(self, role_id: str, secret_id: str) -> None: - """Logs in to Vault via API. - - Args: - role_id: Role ID. - - Returns: - None - """ + """Authenticate with Vault using the AppRole authentication method.""" try: login_response = self._client.auth.approle.login( role_id=role_id, secret_id=secret_id, use_token=False @@ -99,32 +78,20 @@ def approle_login(self, role_id: str, secret_id: str) -> None: logger.error("Login Failed - Can't connect to Vault") def enable_approle_auth(self) -> None: - """Enables AppRole auth method. - - Returns: - None - """ + """Enable the AppRole authentication method in Vault, if not already enabled.""" if "approle/" not in self._client.sys.list_auth_methods(): self._client.sys.enable_auth_method("approle") logger.info("Enabled approle auth method") def create_local_charm_policy(self) -> None: - """Add a new policy for the charm. - - Returns: - None - """ - with open("src/charm_policy.hcl", "r") as f: + """Adds a new charm policy to Vault using the predefined charm policy definition.""" + with open(CHARM_POLICY_PATH, "r") as f: charm_policy = f.read() self._client.sys.create_or_update_policy(name=CHARM_POLICY_NAME, policy=charm_policy) logger.info(f"Created charm policy: {CHARM_POLICY_NAME}") def create_local_charm_access_approle(self) -> None: - """Creates approle for charm. - - Returns: - None - """ + """Create or update an AppRole in Vault with specific permissions for charm access.""" self._client.auth.approle.create_or_update_approle( role_name=CHARM_ACCESS_ROLE, token_ttl="60s", @@ -134,7 +101,7 @@ def create_local_charm_access_approle(self) -> None: logger.info(f"Created approle {CHARM_ACCESS_ROLE}") def get_approle_auth_data(self) -> Tuple[str, str]: - """Returns Approle authentication data (role_id and secret_id). + """Retrieve the role ID and secret ID for the AppRole authentication method. Returns: str: Role ID @@ -155,8 +122,8 @@ def write_charm_pki_role( allow_glob_domains=True, enforce_hostnames=False, max_ttl="87598h", - ): - """Writes role in Vault for the charm to be capable of issuing certificates. + ) -> None: + """Write a role in Vault for the charm to be capable of issuing certificates. Args: allow_any_name (bool): Specifies if clients can request certs for any CN. @@ -187,7 +154,7 @@ def write_charm_pki_role( ) def generate_root_certificate(self, ttl: str = "87599h") -> str: - """Generating root CA certificate and private key and returning certificate. + """Generate an internal root CA certificate and private key, and return the certificate. Args: ttl: Time to live @@ -196,9 +163,7 @@ def generate_root_certificate(self, ttl: str = "87599h") -> str: str: Public key of the root certificate. """ config = { - "common_name": ( - "Vault Root Certificate Authority " "({})".format(CHARM_PKI_MOUNT_POINT) - ), + "common_name": f"Vault Root Certificate Authority ({CHARM_PKI_MOUNT_POINT})", "ttl": ttl, } root_certificate = self._client.write( @@ -209,10 +174,10 @@ def generate_root_certificate(self, ttl: str = "87599h") -> str: logger.info("Generated root CA") return root_certificate["data"]["certificate"] - def enable_secrets_engine( + def enable_pki_secrets_engine( self, ttl: Optional[str] = None, max_ttl: Optional[str] = None ) -> None: - """Enables Vault's secret engine if the backend is mounted. + """Enable Vault's PKI secrets engine on the specified mount point. Args: ttl (str): Time to live. @@ -254,7 +219,7 @@ def _write_role(self, role: str, **kwargs) -> None: logger.info(f"Wrote role for PKI access: {role}") def issue_certificate(self, certificate_signing_request: str) -> dict: - """Issues a certificate based on a provided CSR. + """Issue a certificate based on a provided Certificate Signing Request (CSR). Args: certificate_signing_request: Certificate Signing Request @@ -275,6 +240,9 @@ def _issue_certificate(self, **config) -> dict: Args: role (str): Vault role + + Returns: + dict: certificate data """ try: response = self._client.write( @@ -286,23 +254,3 @@ def _issue_certificate(self, **config) -> dict: raise RuntimeError(response.get("warnings", "unknown error")) logger.info(f"Issued certificate with role {CHARM_PKI_ROLE} for config: {config}") return response["data"] - - @staticmethod - def _sort_sans(sans: list) -> Tuple[List, List]: - """Split SANs into IP SANs and name SANs. - - Args: - sans (list): List of SANs - - Returns: - A tuple containing, a list of IP SAN's and a list of name SAN's. - """ - ip_sans = set() - for san in sans: - try: - ipaddress.ip_address(san) - ip_sans.add(san) - except ValueError: - pass - alt_names = set(sans).difference(ip_sans) - return sorted(list(ip_sans)), sorted(list(alt_names)) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 7187e1dd..dbd20a7c 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -68,7 +68,7 @@ def test_given_unsecure_config_not_set_when_pebble_ready_then_unit_is_in_waiting ) @patch( - "charms.tls_certificates_interface.v1.tls_certificates.TLSCertificatesProvidesV1.set_relation_certificate" # noqa: E501,W505 + "charms.tls_certificates_interface.v2.tls_certificates.TLSCertificatesProvidesV2.set_relation_certificate" # noqa: E501,W505 ) @patch("vault.Vault.issue_certificate") def test_given_certificate_request_contains_correct_information_when_certificate_request_then_vault_is_called( # noqa: E501 @@ -85,7 +85,7 @@ def test_given_certificate_request_contains_correct_information_when_certificate patch_issue_certs.assert_has_calls(calls=calls) @patch( - "charms.tls_certificates_interface.v1.tls_certificates.TLSCertificatesProvidesV1.set_relation_certificate" # noqa: E501, W505 + "charms.tls_certificates_interface.v2.tls_certificates.TLSCertificatesProvidesV2.set_relation_certificate" # noqa: E501, W505 ) @patch("vault.Vault.issue_certificate") def test_given_vault_answers_with_certificate_when_certificate_request_then_certificates_are_added_to_relation_data( # noqa: E501 @@ -124,7 +124,7 @@ def test_given_vault_answers_with_certificate_when_certificate_request_then_cert @patch("vault.Vault.generate_root_certificate") @patch("vault.Vault.create_local_charm_access_approle") @patch("vault.Vault.write_charm_pki_role") - @patch("vault.Vault.enable_secrets_engine") + @patch("vault.Vault.enable_pki_secrets_engine") @patch("vault.Vault.approle_login") @patch("vault.Vault.create_local_charm_policy") @patch("vault.Vault.enable_approle_auth") diff --git a/tests/unit/test_vault.py b/tests/unit/test_vault.py index 1d07391b..5983c47e 100644 --- a/tests/unit/test_vault.py +++ b/tests/unit/test_vault.py @@ -71,7 +71,7 @@ def test_given_certificate_returned_by_vault_when_generate_root_certificate_then @patch("hvac.api.system_backend.mount.Mount.list_mounted_secrets_engines") @patch("hvac.api.system_backend.mount.Mount.enable_secrets_engine") - def test_given_backend_not_mounted_when_enable_secrets_engine_then_secrets_engine_is_enabled( + def test_given_backend_not_mounted_when_enable_pki_secrets_engine_then_secrets_engine_is_enabled( # noqa: E501 self, patch_enable_secrets_engine, patch_list_mounted_secrets_engine, @@ -79,7 +79,7 @@ def test_given_backend_not_mounted_when_enable_secrets_engine_then_secrets_engin patch_list_mounted_secrets_engine.return_value = dict() vault = Vault(url="http://whatever-url") - vault.enable_secrets_engine() + vault.enable_pki_secrets_engine() patch_enable_secrets_engine.assert_called_with( backend_type="pki",