From b62aba93a31721f178e9be28662cee1b9024269d Mon Sep 17 00:00:00 2001 From: Judit Novak Date: Wed, 25 Oct 2023 10:00:44 +0200 Subject: [PATCH] [DPE-2790] Extended TLS testing (6/edge) (#281) ## Issue As already done for K8s: https://github.com/canonical/mongodb-k8s-operator/pull/206 ## Solution --- tests/__init__.py | 2 + tests/integration/helpers.py | 89 +++++++++++++++++++ .../relation_tests/new_relations/helpers.py | 59 +----------- tests/integration/tls_tests/helpers.py | 22 +++++ tests/integration/tls_tests/test_tls.py | 63 +++++++++++++ 5 files changed, 178 insertions(+), 57 deletions(-) create mode 100644 tests/__init__.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..db3bfe1a6 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 755f6e464..3ce39568f 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -1,7 +1,9 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +import json from pathlib import Path +from typing import Dict, Optional import ops import yaml @@ -100,3 +102,90 @@ async def set_password( ) action = await action.wait() return action.results + + +async def get_application_relation_data( + ops_test: OpsTest, + application_name: str, + relation_name: str, + key: str, + relation_id: str = None, + relation_alias: str = None, +) -> Optional[str]: + """Get relation data for an application. + + Args: + ops_test: The ops test framework instance + application_name: The name of the application + relation_name: name of the relation to get connection data from + key: key of data to be retrieved + relation_id: id of the relation to get connection data from + relation_alias: alias of the relation (like a connection name) + to get connection data from + Returns: + the that that was requested or None + if no data in the relation + Raises: + ValueError if it's not possible to get application unit data + or if there is no data for the particular relation endpoint + and/or alias. + """ + unit_name = f"{application_name}/0" + raw_data = (await ops_test.juju("show-unit", unit_name))[1] + + if not raw_data: + raise ValueError(f"no unit info could be grabbed for {unit_name}") + data = yaml.safe_load(raw_data) + + # Filter the data based on the relation name. + relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == relation_name] + + if relation_id: + # Filter the data based on the relation id. + relation_data = [v for v in relation_data if v["relation-id"] == relation_id] + + if relation_alias: + # Filter the data based on the cluster/relation alias. + relation_data = [ + v + for v in relation_data + if json.loads(v["application-data"]["data"])["alias"] == relation_alias + ] + + if len(relation_data) == 0: + raise ValueError( + f"no relation data could be grabbed on relation with endpoint {relation_name} and alias {relation_alias}" + ) + + return relation_data[0]["application-data"].get(key) + + +async def get_secret_id(ops_test, app_or_unit: Optional[str] = None) -> str: + """Retrieve secert ID for an app or unit.""" + complete_command = "list-secrets" + + prefix = "" + if app_or_unit: + if app_or_unit[-1].isdigit(): + # it's a unit + app_or_unit = "-".join(app_or_unit.split("/")) + prefix = "unit-" + else: + prefix = "application-" + complete_command += f" --owner {prefix}{app_or_unit}" + + _, stdout, _ = await ops_test.juju(*complete_command.split()) + output_lines_split = [line.split() for line in stdout.split("\n")] + if app_or_unit: + return [line[0] for line in output_lines_split if app_or_unit in line][0] + else: + return output_lines_split[1][0] + + +async def get_secret_content(ops_test, secret_id) -> Dict[str, str]: + """Retrieve contents of a Juju Secret.""" + secret_id = secret_id.split("/")[-1] + complete_command = f"show-secret {secret_id} --reveal --format=json" + _, stdout, _ = await ops_test.juju(*complete_command.split()) + data = json.loads(stdout) + return data[secret_id]["content"]["Data"] diff --git a/tests/integration/relation_tests/new_relations/helpers.py b/tests/integration/relation_tests/new_relations/helpers.py index 162c9d8f1..dcaf62bc8 100644 --- a/tests/integration/relation_tests/new_relations/helpers.py +++ b/tests/integration/relation_tests/new_relations/helpers.py @@ -1,68 +1,13 @@ #!/usr/bin/env python3 # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. + import json -from typing import Optional -import yaml from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed - -async def get_application_relation_data( - ops_test: OpsTest, - application_name: str, - relation_name: str, - key: str, - relation_id: str = None, - relation_alias: str = None, -) -> Optional[str]: - """Get relation data for an application. - - Args: - ops_test: The ops test framework instance - application_name: The name of the application - relation_name: name of the relation to get connection data from - key: key of data to be retrieved - relation_id: id of the relation to get connection data from - relation_alias: alias of the relation (like a connection name) - to get connection data from - Returns: - the that that was requested or None - if no data in the relation - Raises: - ValueError if it's not possible to get application unit data - or if there is no data for the particular relation endpoint - and/or alias. - """ - unit_name = f"{application_name}/0" - raw_data = (await ops_test.juju("show-unit", unit_name))[1] - - if not raw_data: - raise ValueError(f"no unit info could be grabbed for {unit_name}") - data = yaml.safe_load(raw_data) - - # Filter the data based on the relation name. - relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == relation_name] - - if relation_id: - # Filter the data based on the relation id. - relation_data = [v for v in relation_data if v["relation-id"] == relation_id] - - if relation_alias: - # Filter the data based on the cluster/relation alias. - relation_data = [ - v - for v in relation_data - if json.loads(v["application-data"]["data"])["alias"] == relation_alias - ] - - if len(relation_data) == 0: - raise ValueError( - f"no relation data could be grabbed on relation with endpoint {relation_name} and alias {relation_alias}" - ) - - return relation_data[0]["application-data"].get(key) +from tests.integration.helpers import get_application_relation_data async def verify_application_data( diff --git a/tests/integration/tls_tests/helpers.py b/tests/integration/tls_tests/helpers.py index a87f167bf..a3fd4065d 100644 --- a/tests/integration/tls_tests/helpers.py +++ b/tests/integration/tls_tests/helpers.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +import logging from datetime import datetime import ops @@ -20,6 +21,8 @@ INTERNAL_CERT_PATH = f"{MONGOD_CONF_DIR}/internal-ca.crt" EXTERNAL_PEM_PATH = f"{MONGOD_CONF_DIR}/external-cert.pem" +logger = logging.getLogger(__name__) + class ProcessError(Exception): """Raised when a process fails.""" @@ -110,3 +113,22 @@ def process_systemctl_time(systemctl_output): time_as_str = "T".join(systemctl_output.split("=")[1].split(" ")[1:3]) d = datetime.strptime(time_as_str, "%Y-%m-%dT%H:%M:%S") return d + + +async def scp_file_preserve_ctime(ops_test: OpsTest, unit_name: str, path: str) -> int: + """Returns the unix timestamp of when a file was created on a specified unit.""" + # Retrieving the file + filename = path.split("/")[-1] + complete_command = f"scp --container mongod {unit_name}:{path} {filename}" + return_code, scp_output, stderr = await ops_test.juju(*complete_command.split()) + + if return_code != 0: + logger.error(stderr) + raise ProcessError( + "Expected command %s to succeed instead it failed: %s; %s", + complete_command, + return_code, + stderr, + ) + + return f"{filename}" diff --git a/tests/integration/tls_tests/test_tls.py b/tests/integration/tls_tests/test_tls.py index db67d09e2..9b932c606 100644 --- a/tests/integration/tls_tests/test_tls.py +++ b/tests/integration/tls_tests/test_tls.py @@ -2,24 +2,82 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +import json + import pytest +from ops import Unit from pytest_operator.plugin import OpsTest +from ..helpers import get_application_relation_data, get_secret_content, get_secret_id from .helpers import ( EXTERNAL_CERT_PATH, INTERNAL_CERT_PATH, check_tls, + scp_file_preserve_ctime, time_file_created, time_process_started, ) MONGO_COMMON_DIR = "/var/snap/charmed-mongodb/common" TLS_CERTIFICATES_APP_NAME = "tls-certificates-operator" +TLS_RELATION_NAME = "certificates" DATABASE_APP_NAME = "mongodb" TLS_TEST_DATA = "tests/integration/tls_tests/data" DB_SERVICE = "snap.charmed-mongodb.mongod.service" +async def check_certs_correctly_distributed(ops_test: OpsTest, unit: Unit) -> None: + """Comparing expected vs distributed certificates. + + Verifying certificates downloaded on the charm against the ones distributed by the TLS operator + """ + app_secret_id = await get_secret_id(ops_test, DATABASE_APP_NAME) + unit_secret_id = await get_secret_id(ops_test, unit.name) + app_secret_content = await get_secret_content(ops_test, app_secret_id) + unit_secret_content = await get_secret_content(ops_test, unit_secret_id) + app_current_crt = app_secret_content["csr-secret"] + unit_current_crt = unit_secret_content["csr-secret"] + + # Get the values for certs from the relation, as provided by TLS Charm + certificates_raw_data = await get_application_relation_data( + ops_test, DATABASE_APP_NAME, TLS_RELATION_NAME, "certificates" + ) + certificates_data = json.loads(certificates_raw_data) + + external_item = [ + data + for data in certificates_data + if data["certificate_signing_request"].rstrip() == unit_current_crt.rstrip() + ][0] + internal_item = [ + data + for data in certificates_data + if data["certificate_signing_request"].rstrip() == app_current_crt.rstrip() + ][0] + + # Get a local copy of the external cert + external_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, EXTERNAL_CERT_PATH) + + # Get the external cert value from the relation + relation_external_cert = "\n".join(external_item["chain"]) + + # CHECK: Compare if they are the same + with open(external_copy_path) as f: + external_contents_file = f.read() + assert relation_external_cert == external_contents_file + + # Get a local copy of the internal cert + internal_copy_path = await scp_file_preserve_ctime(ops_test, unit.name, INTERNAL_CERT_PATH) + + # Get the external cert value from the relation + relation_internal_cert = "\n".join(internal_item["chain"]) + + # CHECK: Compare if they are the same + with open(internal_copy_path) as f: + internal_contents_file = f.read() + assert relation_internal_cert == internal_contents_file + + @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy one unit of MongoDB and one unit of TLS.""" @@ -67,6 +125,7 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None: original_tls_times[unit.name]["mongod_service"] = await time_process_started( ops_test, unit.name, DB_SERVICE ) + check_certs_correctly_distributed(ops_test, unit) # set external and internal key using auto-generated key for each unit for unit in ops_test.model.applications[DATABASE_APP_NAME].units: @@ -87,6 +146,8 @@ async def test_rotate_tls_key(ops_test: OpsTest) -> None: new_internal_cert_time = await time_file_created(ops_test, unit.name, INTERNAL_CERT_PATH) new_mongod_service_time = await time_process_started(ops_test, unit.name, DB_SERVICE) + check_certs_correctly_distributed(ops_test, unit) + assert ( new_external_cert_time > original_tls_times[unit.name]["external_cert"] ), f"external cert for {unit.name} was not updated." @@ -165,6 +226,8 @@ async def test_set_tls_key(ops_test: OpsTest) -> None: new_internal_cert_time = await time_file_created(ops_test, unit.name, INTERNAL_CERT_PATH) new_mongod_service_time = await time_process_started(ops_test, unit.name, DB_SERVICE) + check_certs_correctly_distributed(ops_test, unit) + assert ( new_external_cert_time > original_tls_times[unit.name]["external_cert"] ), f"external cert for {unit.name} was not updated."