diff --git a/.github/workflows/integration-test.yaml b/.github/workflows/integration-test.yaml index 65d40fd..0b117e7 100644 --- a/.github/workflows/integration-test.yaml +++ b/.github/workflows/integration-test.yaml @@ -9,7 +9,7 @@ jobs: matrix: arch: - arch: amd64 - runner: ubuntu-22.04 + runner: [self-hosted, linux, X64, jammy, xlarge] runs-on: ${{ matrix.arch.runner }} steps: diff --git a/charmcraft.yaml b/charmcraft.yaml index 83f41d9..9b03603 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -31,6 +31,9 @@ provides: interface: grafana_dashboard requires: + access-certificates: + limit: 1 + interface: tls-certificates logging: interface: loki_push_api diff --git a/src/charm.py b/src/charm.py index 9fc1621..58a1afb 100755 --- a/src/charm.py +++ b/src/charm.py @@ -6,6 +6,7 @@ import logging import random +import socket import string from contextlib import suppress from dataclasses import dataclass @@ -17,8 +18,12 @@ from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider from charms.tls_certificates_interface.v4.tls_certificates import ( Certificate, + CertificateRequest, + Mode, + PrivateKey, ProviderCertificate, TLSCertificatesProvidesV4, + TLSCertificatesRequiresV4, generate_ca, generate_certificate, generate_csr, @@ -34,6 +39,7 @@ LOGGING_RELATION_NAME = "logging" METRICS_RELATION_NAME = "metrics" GRAFANA_RELATION_NAME = "grafana-dashboard" +TLS_ACCESS_RELATION_NAME = "access-certificates" DB_MOUNT = "database" CONFIG_MOUNT = "config" @@ -68,8 +74,12 @@ class NotaryCharm(ops.CharmBase): def __init__(self, framework: ops.Framework): super().__init__(framework) - self.port = 2111 + self.access_csr = CertificateRequest( + common_name="Notary", + sans_dns=frozenset([socket.getfqdn()]), + ) + self.unit.set_ports(self.port) self.container = self.unit.get_container("notary") self.tls = TLSCertificatesProvidesV4( @@ -89,9 +99,15 @@ def __init__(self, framework: ops.Framework): } ], ) + self.tls_access = TLSCertificatesRequiresV4( + charm=self, + mode=Mode.APP, + relationship_name=TLS_ACCESS_RELATION_NAME, + certificate_requests=[self.access_csr], + ) self.client = Notary( - f"https://{self._application_bind_address}:{self.port}", + f"https://{socket.getfqdn()}:{self.port}", f"{CHARM_PATH}/{CONFIG_MOUNT}/0/ca.pem", ) [ @@ -100,6 +116,9 @@ def __init__(self, framework: ops.Framework): self.on["notary"].pebble_ready, self.on["notary"].pebble_custom_notice, self.on["certificates"].relation_changed, + self.on["certificates"].relation_departed, + self.on["access-certificates"].relation_changed, + self.on["access-certificates"].relation_departed, self.on.config_storage_attached, self.on.database_storage_attached, self.on.config_changed, @@ -131,8 +150,8 @@ def _on_collect_status(self, event: ops.CollectStatusEvent): if not self._storages_attached(): event.add_status(ops.WaitingStatus("storages not yet available")) return - if not self._self_signed_certificates_generated(): - event.add_status(ops.WaitingStatus("certificates not yet created")) + if not self._certificates_available(): + event.add_status(ops.WaitingStatus("certificates not yet pushed to workload")) return if not self.client.is_api_available(): event.add_status(ops.WaitingStatus("Notary server not yet available")) @@ -167,14 +186,18 @@ def _configure_notary_config_file(self): def _configure_access_certificates(self): """Update the config files for notary and replan if required.""" certificates_changed = False - if not self._self_signed_certificates_generated(): - certificates_changed = True - self._generate_self_signed_certificates() - logger.info("Certificates configured.") + if not self.tls_access._tls_relation_created(): + if not self._self_signed_certificates_generated(): + certificates_changed = True + self._generate_self_signed_certificates() + else: + certificates_changed = self._store_certificate_from_access_relation_if_available() if certificates_changed: - self.container.add_layer("notary", self._pebble_layer, combine=True) - with suppress(ops.pebble.ChangeError): - self.container.replan() + logger.info("Certificates changed. Restarting service.") + self.container.restart("notary") + self.container.add_layer("notary", self._pebble_layer, combine=True) + with suppress(ops.pebble.ChangeError): + self.container.replan() def _configure_charm_authorization(self): """Create an admin user to manage Notary if needed, and acquire a token by logging in if needed.""" @@ -183,6 +206,13 @@ def _configure_charm_authorization(self): return if not login_details.token or not self.client.token_is_valid(login_details.token): login_details.token = self.client.login(login_details.username, login_details.password) + if not login_details.token: + logger.warning( + "failed to login with the existing admin credentials." + " If you've manually modified the admin account credentials," + " please update the charm's credentials secret accordingly." + ) + return login_details_secret = self.model.get_secret(label=NOTARY_LOGIN_SECRET_LABEL) login_details_secret.set_content(login_details.to_dict()) @@ -270,17 +300,6 @@ def _pebble_layer(self) -> ops.pebble.LayerDict: }, } - @property - def _application_bind_address(self) -> str | None: - binding = self.model.get_binding("juju-info") - if not binding: - return None - if not binding.network: - return None - if not binding.network.bind_address: - return None - return str(binding.network.bind_address) - ## Status Checks ## def _storages_attached(self) -> bool: """Return if the storages are attached.""" @@ -289,11 +308,25 @@ def _storages_attached(self) -> bool: ) ## Helpers ## + def _store_certificate_from_access_relation_if_available(self) -> bool: + """Check if the requirer object has a certificate assigned. Save it to the workload if so. + + Returns: + bool: True if a new certificate was saved. + """ + cert, pk = self.tls_access.get_assigned_certificate(certificate_request=self.access_csr) + if not cert or not pk: + return False + saved_cert = self.container.pull( + f"{WORKLOAD_CONFIG_PATH}/{CONFIG_MOUNT}/certificate.pem", + ).read() + if str(cert.certificate) == saved_cert: + return False + self._push_files_to_workload(cert.ca, cert.certificate, pk) + return True + def _generate_self_signed_certificates(self) -> None: """Generate self signed certificates and saves them to secrets and the charm.""" - if not self._application_bind_address: - logger.warning("unit IP not found.") - return ca_private_key = generate_private_key() ca_certificate = generate_ca( private_key=ca_private_key, @@ -304,8 +337,7 @@ def _generate_self_signed_certificates(self) -> None: csr = generate_csr( private_key=private_key, common_name=CERTIFICATE_COMMON_NAME, - sans_dns=frozenset([CERTIFICATE_COMMON_NAME]), - sans_ip=frozenset([self._application_bind_address]), + sans_dns=frozenset([socket.getfqdn()]), ) certificate = generate_certificate( ca=ca_certificate, @@ -313,19 +345,7 @@ def _generate_self_signed_certificates(self) -> None: csr=csr, validity=365, ) - self.container.push( - f"{WORKLOAD_CONFIG_PATH}/{CONFIG_MOUNT}/ca.pem", str(ca_certificate), make_dirs=True - ) - self.container.push( - f"{WORKLOAD_CONFIG_PATH}/{CONFIG_MOUNT}/certificate.pem", - str(certificate), - make_dirs=True, - ) - self.container.push( - f"{WORKLOAD_CONFIG_PATH}/{CONFIG_MOUNT}/private_key.pem", - str(private_key), - make_dirs=True, - ) + self._push_files_to_workload(ca_certificate, certificate, private_key) logger.info("Created self signed certificates.") def _self_signed_certificates_generated(self) -> bool: @@ -339,6 +359,14 @@ def _self_signed_certificates_generated(self) -> bool: cert = Certificate.from_string(existing_cert.read()) return cert.common_name == CERTIFICATE_COMMON_NAME + def _certificates_available(self) -> bool: + """Check if the workload certificate is available.""" + try: + self.container.pull(f"{WORKLOAD_CONFIG_PATH}/{CONFIG_MOUNT}/certificate.pem") + except ops.pebble.PathError: + return False + return True + def _get_or_create_admin_account(self) -> LoginSecret | None: """Get the first admin user for the charm to use from secrets. Create one if it doesn't exist. @@ -367,6 +395,32 @@ def _get_or_create_admin_account(self) -> LoginSecret | None: return None return account + def _push_files_to_workload( + self, + ca_certificate: Certificate | None, + certificate: Certificate | None, + private_key: PrivateKey | None, + ) -> None: + """Push all given files to workload.""" + if ca_certificate: + self.container.push( + f"{WORKLOAD_CONFIG_PATH}/{CONFIG_MOUNT}/ca.pem", + str(ca_certificate), + make_dirs=True, + ) + if certificate: + self.container.push( + f"{WORKLOAD_CONFIG_PATH}/{CONFIG_MOUNT}/certificate.pem", + str(certificate), + make_dirs=True, + ) + if private_key: + self.container.push( + f"{WORKLOAD_CONFIG_PATH}/{CONFIG_MOUNT}/private_key.pem", + str(private_key), + make_dirs=True, + ) + def _generate_password() -> str: """Generate a password for the Notary Account.""" diff --git a/src/notary.py b/src/notary.py index c7f92f9..e45b1cb 100644 --- a/src/notary.py +++ b/src/notary.py @@ -56,6 +56,7 @@ def login(self, username: str, password: str) -> str | None: json={"username": username, "password": password}, ) except (requests.RequestException, OSError): + logger.warning("login failed: ", exc_info=True) return try: req.raise_for_status() diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index c5eca2c..0234e86 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -16,6 +16,7 @@ generate_certificate, generate_private_key, ) +from juju.application import Application from juju.client.client import SecretsFilter from pytest_operator.plugin import OpsTest @@ -29,6 +30,7 @@ LOKI_APPLICATION_NAME = "loki-k8s" PROMETHEUS_APPLICATION_NAME = "prometheus-k8s" +TLS_PROVIDER_APPLICATION_NAME = "self-signed-certificates" TLS_REQUIRER_APPLICATION_NAME = "tls-certificates-requirer" @@ -43,9 +45,75 @@ async def test_build_and_deploy(ops_test: OpsTest, request: pytest.FixtureReques assert ops_test.model await ops_test.model.deploy(charm, resources=resources, application_name=APP_NAME) + await ops_test.model.deploy( + "self-signed-certificates", + application_name=TLS_PROVIDER_APPLICATION_NAME, + channel="edge", + trust=True, + ) + await ops_test.model.deploy( + "tls-certificates-requirer", + application_name=TLS_REQUIRER_APPLICATION_NAME, + channel="edge", + trust=True, + ) + await ops_test.model.deploy( + "prometheus-k8s", + application_name=PROMETHEUS_APPLICATION_NAME, + trust=True, + ) + await ops_test.model.deploy( + "loki-k8s", application_name=LOKI_APPLICATION_NAME, trust=True, channel="stable" + ) await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) +@pytest.mark.abort_on_fail +async def test_given_tls_access_relation_when_related_and_unrelated_to_notary_then_certificates_replaced_appropriately( + ops_test: OpsTest, +): + assert ops_test.model + + await ops_test.model.wait_for_idle( + apps=[APP_NAME, TLS_PROVIDER_APPLICATION_NAME], + status="active", + timeout=1000, + raise_on_error=True, + ) + + first_ca = await get_file_from_notary(ops_test, "ca.pem") + assert first_ca.startswith("-----BEGIN CERTIFICATE-----") + + await ops_test.model.integrate( + relation1=f"{TLS_PROVIDER_APPLICATION_NAME}:certificates", + relation2=f"{APP_NAME}:access-certificates", + ) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + apps=[APP_NAME, TLS_PROVIDER_APPLICATION_NAME], + status="active", + timeout=1000, + raise_on_error=True, + ) + new_ca = await get_file_from_notary(ops_test, "ca.pem") + assert new_ca != first_ca + + notary_app = ops_test.model.applications[APP_NAME] + assert isinstance(notary_app, Application) + await notary_app.remove_relation( + "access-certificates", f"{TLS_PROVIDER_APPLICATION_NAME}:certificates" + ) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle( + apps=[APP_NAME, TLS_PROVIDER_APPLICATION_NAME], + status="active", + timeout=1000, + raise_on_error=True, + ) + final_ca = await get_file_from_notary(ops_test, "ca.pem") + assert final_ca != new_ca + + @pytest.mark.abort_on_fail async def test_given_notary_when_tls_requirer_related_then_csr_uploaded_to_notary_and_certificate_provided_to_requirer( ops_test: OpsTest, @@ -58,12 +126,6 @@ async def test_given_notary_when_tls_requirer_related_then_csr_uploaded_to_notar client = Notary(url=endpoint) assert client.token_is_valid(token) - await ops_test.model.deploy( - "tls-certificates-requirer", - application_name=TLS_REQUIRER_APPLICATION_NAME, - channel="edge", - trust=True, - ) await ops_test.model.integrate( relation1=f"{APP_NAME}:certificates", relation2=f"{TLS_REQUIRER_APPLICATION_NAME}", @@ -108,19 +170,7 @@ async def test_given_loki_and_prometheus_related_to_notary_all_charm_statuses_ac ): """Deploy loki and prometheus, and make sure all applications are active.""" assert ops_test.model - deploy_prometheus = ops_test.model.deploy( - "prometheus-k8s", - application_name=PROMETHEUS_APPLICATION_NAME, - trust=True, - ) - deploy_loki = ops_test.model.deploy( - "loki-k8s", application_name=LOKI_APPLICATION_NAME, trust=True, channel="stable" - ) - await asyncio.gather( - deploy_loki, - deploy_prometheus, - ) await asyncio.gather( ops_test.model.integrate( relation1=f"{APP_NAME}:logging", @@ -168,9 +218,13 @@ async def run_get_certificate_action(ops_test: OpsTest) -> str: """ assert ops_test.model tls_requirer_unit = ops_test.model.units[f"{TLS_REQUIRER_APPLICATION_NAME}/0"] - assert tls_requirer_unit - action = await tls_requirer_unit.run_action( - action_name="get-certificate", - ) + action = await tls_requirer_unit.run_action(action_name="get-certificate") # type: ignore action_output = await ops_test.model.get_action_output(action_uuid=action.entity_id, wait=30) return action_output.get("certificates", "") + + +async def get_file_from_notary(ops_test: OpsTest, file_name: str) -> str: + notary_unit = ops_test.model.units[f"{APP_NAME}/0"] # type: ignore + action = await notary_unit.run(f"cat /var/lib/juju/storage/config/0/{file_name}") # type: ignore + await action.wait() + return action.results["stdout"] diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index c004de2..b44b9b4 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -1,7 +1,7 @@ # Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -import tempfile +from pathlib import Path from unittest.mock import Mock, patch import ops @@ -9,7 +9,12 @@ from ops.pebble import Layer from scenario import Container, Context, Mount, Network, Relation, Secret, State, Storage -from charm import CERTIFICATE_PROVIDER_RELATION_NAME, NOTARY_LOGIN_SECRET_LABEL, NotaryCharm +from charm import ( + CERTIFICATE_PROVIDER_RELATION_NAME, + NOTARY_LOGIN_SECRET_LABEL, + TLS_ACCESS_RELATION_NAME, + NotaryCharm, +) from lib.charms.tls_certificates_interface.v4.tls_certificates import ( Certificate, PrivateKey, @@ -20,13 +25,16 @@ generate_csr, generate_private_key, ) -from notary import CertificateRequest, CertificateRequests +from notary import CertificateRequest as CertificateRequestRow +from notary import CertificateRequests TLS_LIB_PATH = "charms.tls_certificates_interface.v4.tls_certificates" CERTIFICATE_COMMON_NAME = "Notary Self Signed Certificate" SELF_SIGNED_CA_COMMON_NAME = "Notary Self Signed Root CA" +TLS_LIB_PATH = "charms.tls_certificates_interface.v4.tls_certificates" + class TestCharm: @pytest.fixture(scope="function") @@ -289,9 +297,7 @@ def test_given_storages_available_container_can_connect_network_not_available_no out = context.run(context.on.config_changed(), state) root = out.get_container("notary").get_filesystem(context) assert (root / "etc/notary/config/config.yaml").open("r") - assert not (root / "etc/notary/config/certificate.pem").exists() - assert not (root / "etc/notary/config/private_key.pem").exists() - assert len(out.secrets) == 1 + assert len(list(out.secrets)) == 1 assert out.get_secret(label="Notary Login Details") def test_given_only_config_storage_container_cant_connect_network_available_notary_not_running_when_configure_then_no_error_raised( @@ -777,9 +783,7 @@ def test_given_storages_available_container_can_connect_network_not_available_no out = context.run(context.on.config_changed(), state) root = out.get_container("notary").get_filesystem(context) assert (root / "etc/notary/config/config.yaml").open("r") - assert not (root / "etc/notary/config/certificate.pem").exists() - assert not ((root / "etc/notary/config/private_key.pem").exists()) - assert len(out.secrets) == 1 + assert len(list(out.secrets)) == 1 assert out.get_secret(label="Notary Login Details") def test_given_only_config_storage_container_cant_connect_network_available_notary_running_when_configure_then_no_error_raised( @@ -1252,9 +1256,7 @@ def test_given_storages_available_container_can_connect_network_not_available_no root = out.get_container("notary").get_filesystem(context) assert (root / "etc/notary/config/config.yaml").open("r") - assert not (root / "etc/notary/config/certificate.pem").exists() - assert not ((root / "etc/notary/config/private_key.pem").exists()) - assert len(out.secrets) == 1 + assert len(list(out.secrets)) == 1 assert out.get_secret(label="Notary Login Details") def test_given_only_config_storage_container_cant_connect_network_available_notary_initialized_when_configure_then_no_error_raised( @@ -1732,7 +1734,7 @@ def test_given_storages_available_container_can_connect_network_not_available_no ), ): out = context.run(context.on.collect_unit_status(), state) - assert out.unit_status == ops.WaitingStatus("certificates not yet created") + assert out.unit_status == ops.WaitingStatus("certificates not yet pushed to workload") def test_given_only_config_storage_container_cant_connect_network_available_notary_not_running_when_collect_status_then_status_is_waiting( self, context @@ -1972,7 +1974,7 @@ def test_given_storages_available_container_can_connect_network_available_notary ), ): out = context.run(context.on.collect_unit_status(), state) - assert out.unit_status == ops.WaitingStatus("certificates not yet created") + assert out.unit_status == ops.WaitingStatus("certificates not yet pushed to workload") def test_given_only_config_storage_container_cant_connect_network_not_available_notary_running_when_collect_status_then_status_is_waiting( self, context @@ -2212,7 +2214,7 @@ def test_given_storages_available_container_can_connect_network_not_available_no ), ): out = context.run(context.on.collect_unit_status(), state) - assert out.unit_status == ops.WaitingStatus("certificates not yet created") + assert out.unit_status == ops.WaitingStatus("certificates not yet pushed to workload") def test_given_only_config_storage_container_cant_connect_network_available_notary_running_when_collect_status_then_status_is_waiting( self, context @@ -2452,7 +2454,7 @@ def test_given_storages_available_container_can_connect_network_available_notary ), ): out = context.run(context.on.collect_unit_status(), state) - assert out.unit_status == ops.WaitingStatus("certificates not yet created") + assert out.unit_status == ops.WaitingStatus("certificates not yet pushed to workload") def test_given_only_config_storage_container_cant_connect_network_not_available_notary_initialized_when_collect_status_then_status_is_waiting( self, context @@ -2692,7 +2694,7 @@ def test_given_storages_available_container_can_connect_network_not_available_no ), ): out = context.run(context.on.collect_unit_status(), state) - assert out.unit_status == ops.WaitingStatus("certificates not yet created") + assert out.unit_status == ops.WaitingStatus("certificates not yet pushed to workload") def test_given_only_config_storage_container_cant_connect_network_available_notary_initialized_when_collect_status_then_status_is_waiting( self, context @@ -2932,64 +2934,81 @@ def test_given_storages_available_container_can_connect_network_available_notary ), ): out = context.run(context.on.collect_unit_status(), state) - assert out.unit_status == ops.WaitingStatus("certificates not yet created") + assert out.unit_status == ops.WaitingStatus("certificates not yet pushed to workload") def test_given_notary_available_and_initialized_when_collect_status_then_status_is_active( - self, context + self, context, tmpdir ): - with tempfile.TemporaryDirectory() as tempdir: - config_mount = Mount(location="/etc/notary/config", source=tempdir) - state = State( - storages={Storage(name="config"), Storage(name="database")}, - containers=[ - Container(name="notary", can_connect=True, mounts={"config": config_mount}) - ], - networks={Network("juju-info")}, - leader=True, - ) + config_mount = Mount(location="/etc/notary/config", source=tmpdir) + state = State( + storages={Storage(name="config"), Storage(name="database")}, + containers=[ + Container(name="notary", can_connect=True, mounts={"config": config_mount}) + ], + leader=True, + ) - certificate, _ = self.example_cert_and_key() - with open(tempdir + "/certificate.pem", "w") as f: - f.write(str(certificate)) + certificate, _ = self.example_cert_and_key() + with open(tmpdir + "/certificate.pem", "w") as f: + f.write(str(certificate)) - with patch( - "notary.Notary.__new__", - return_value=Mock( - **{"is_api_available.return_value": True, "is_initialized.return_value": True}, # type: ignore - ), - ): - out = context.run(context.on.collect_unit_status(), state) - assert out.unit_status == ops.ActiveStatus() + with patch( + "notary.Notary.__new__", + return_value=Mock( + **{"is_api_available.return_value": True, "is_initialized.return_value": True}, # type: ignore + ), + ): + out = context.run(context.on.collect_unit_status(), state) + assert out.unit_status == ops.ActiveStatus() def test_given_notary_available_and_not_initialized_when_configure_then_admin_user_created( - self, context + self, context, tmpdir ): - with tempfile.TemporaryDirectory() as tempdir: - config_mount = Mount(location="/etc/notary/config", source=tempdir) - state = State( - storages={Storage(name="config"), Storage(name="database")}, - containers=[ - Container(name="notary", can_connect=True, mounts={"config": config_mount}) - ], - networks={Network("juju-info")}, - leader=True, - ) + config_mount = Mount(location="/etc/notary/config", source=tmpdir) + state = State( + storages={Storage(name="config"), Storage(name="database")}, + containers=[ + Container( + name="notary", + can_connect=True, + mounts={"config": config_mount}, + layers={ + "notary": Layer( + { + "summary": "notary layer", + "description": "pebble config layer for notary", + "services": { + "notary": { + "override": "replace", + "summary": "notary", + "command": "notary -config /etc/notary/config/config.yaml", + "startup": "enabled", + } + }, + } + ) + }, + ) + ], + leader=True, + ) - with patch( - "notary.Notary.__new__", - return_value=Mock( - **{ - "is_api_available.return_value": True, - "is_initialized.return_value": False, - "login.return_value": "example-token", - "token_is_valid.return_value": False, - }, - ), - ): - out = context.run(context.on.update_status(), state) - assert len(out.secrets) == 1 - secret = out.get_secret(label="Notary Login Details") - assert secret.latest_content.get("token") == "example-token" + with patch( + "notary.Notary.__new__", + return_value=Mock( + **{ + "is_api_available.return_value": True, + "is_initialized.return_value": False, + "login.return_value": "example-token", + "token_is_valid.return_value": False, + }, + ), + ): + out = context.run(context.on.update_status(), state) + assert len(list(out.secrets)) == 1 + secret = out.get_secret(label="Notary Login Details") + assert secret.latest_content + assert secret.latest_content.get("token") == "example-token" def test_given_tls_requirer_available_when_notary_unreachable_then_no_error_raised( self, context @@ -3155,7 +3174,7 @@ def test_given_tls_requirers_available_when_csrs_already_posted_then_duplicate_c "is_initialized.return_value": True, "token_is_valid.return_value": True, "get_certificate_requests_table.return_value": CertificateRequests( - rows=[CertificateRequest(id=1, csr=str(csr), certificate_chain="")] + rows=[CertificateRequestRow(id=1, csr=str(csr), certificate_chain="")] ), "post_csr": post_call, }, @@ -3225,7 +3244,7 @@ def test_given_tls_requirers_available_when_certificate_available_then_certs_pro "token_is_valid.return_value": True, "get_certificate_requests_table.return_value": CertificateRequests( rows=[ - CertificateRequest( + CertificateRequestRow( id=1, csr=str(csr), certificate_chain=[str(cert), str(ca)] ) ] @@ -3311,7 +3330,7 @@ def test_given_tls_requirers_when_invalid_certificate_available_when_configure_t "token_is_valid.return_value": True, "get_certificate_requests_table.return_value": CertificateRequests( rows=[ - CertificateRequest( + CertificateRequestRow( id=1, csr=str(csr), certificate_chain=[str(new_cert), str(ca)] ) ] @@ -3395,10 +3414,177 @@ def test_given_certificate_rejected_in_notary_when_configure_then_certificate_re "is_initialized.return_value": True, "token_is_valid.return_value": True, "get_certificate_requests_table.return_value": CertificateRequests( - rows=[CertificateRequest(id=1, csr=str(csr), certificate_chain="rejected")] + rows=[ + CertificateRequestRow(id=1, csr=str(csr), certificate_chain="rejected") + ] ), }, ), ): context.run(context.on.update_status(), state) mock_set_relation_certificate.assert_called_once() + + @patch(f"{TLS_LIB_PATH}.TLSCertificatesRequiresV4.get_assigned_certificate") + def test_given_access_relation_created_when_configure_then_certificate_not_replaced( + self, mock_assigned_certificates, context, tmpdir + ): + config_mount = Mount(location="/etc/notary/config", source=tmpdir) + state = State( + storages={Storage(name="config"), Storage(name="database")}, + containers=[ + Container( + name="notary", + can_connect=True, + mounts={"config": config_mount}, + layers={ + "notary": Layer( + { + "summary": "notary layer", + "description": "pebble config layer for notary", + "services": { + "notary": { + "override": "replace", + "summary": "notary", + "command": "notary -config /etc/notary/config/config.yaml", + "startup": "enabled", + } + }, + } + ) + }, + ) + ], + relations=[Relation(id=1, endpoint=TLS_ACCESS_RELATION_NAME)], + leader=True, + ) + certificate, _ = self.example_cert_and_key() + with open(tmpdir + "/certificate.pem", "w") as f: + f.write(str(certificate)) + mock_assigned_certificates.return_value = (None, None) + with patch( + "notary.Notary.__new__", + return_value=Mock( + **{ + "is_api_available.return_value": True, + "is_initialized.return_value": True, + "login.return_value": "example-token", + "token_is_valid.return_value": True, + }, + ), + ): + context.run(context.on.update_status(), state) + Path(tmpdir + "etc/notary/config").mkdir(parents=True, exist_ok=True) + with open(tmpdir + "/certificate.pem") as f: + saved_cert = f.read() + assert saved_cert == str(certificate) + + @patch(f"{TLS_LIB_PATH}.TLSCertificatesRequiresV4.get_assigned_certificate") + def test_given_new_certificate_available_when_configure_then_certificate_replaced( + self, mock_assigned_certificates, context, tmpdir + ): + config_mount = Mount(location="/etc/notary/config", source=tmpdir) + state = State( + storages={Storage(name="config"), Storage(name="database")}, + containers=[ + Container( + name="notary", + can_connect=True, + mounts={"config": config_mount}, + layers={ + "notary": Layer( + { + "summary": "notary layer", + "description": "pebble config layer for notary", + "services": { + "notary": { + "override": "replace", + "summary": "notary", + "command": "notary -config /etc/notary/config/config.yaml", + "startup": "enabled", + } + }, + } + ) + }, + ) + ], + relations=[Relation(id=1, endpoint=TLS_ACCESS_RELATION_NAME)], + leader=True, + ) + existing_certificate, _ = self.example_cert_and_key() + certificate, pk = self.example_cert_and_key() + provider_certificate_mock = Mock() + provider_certificate_mock.certificate = certificate.raw + with open(tmpdir + "/certificate.pem", "w") as f: + f.write(str(existing_certificate)) + mock_assigned_certificates.return_value = (provider_certificate_mock, pk) + with patch( + "notary.Notary.__new__", + return_value=Mock( + **{ + "is_api_available.return_value": True, + "is_initialized.return_value": True, + "login.return_value": "example-token", + "token_is_valid.return_value": True, + }, + ), + ): + context.run(context.on.update_status(), state) + with open(tmpdir + "/certificate.pem") as f: + saved_cert = f.read() + assert saved_cert == str(certificate) + + @patch(f"{TLS_LIB_PATH}.TLSCertificatesRequiresV4.get_assigned_certificate") + def test_given_new_certificate_available_and_new_cert_already_saved_when_configure_then_certificate_not_replaced( + self, mock_assigned_certificates, context, tmpdir + ): + config_mount = Mount(location="/etc/notary/config", source=tmpdir) + state = State( + storages={Storage(name="config"), Storage(name="database")}, + containers=[ + Container( + name="notary", + can_connect=True, + mounts={"config": config_mount}, + layers={ + "notary": Layer( + { + "summary": "notary layer", + "description": "pebble config layer for notary", + "services": { + "notary": { + "override": "replace", + "summary": "notary", + "command": "notary -config /etc/notary/config/config.yaml", + "startup": "enabled", + } + }, + } + ) + }, + ) + ], + relations=[Relation(id=1, endpoint=TLS_ACCESS_RELATION_NAME)], + leader=True, + ) + certificate, pk = self.example_cert_and_key() + provider_certificate_mock = Mock() + provider_certificate_mock.certificate = certificate.raw + with open(tmpdir + "/certificate.pem", "w") as f: + f.write(str(certificate)) + mock_assigned_certificates.return_value = (provider_certificate_mock, pk) + with patch( + "notary.Notary.__new__", + return_value=Mock( + **{ + "is_api_available.return_value": True, + "is_initialized.return_value": True, + "login.return_value": "example-token", + "token_is_valid.return_value": True, + }, + ), + ): + context.run(context.on.update_status(), state) + with open(tmpdir + "/certificate.pem") as f: + saved_cert = f.read() + assert saved_cert == str(certificate)