From be362952423c5b14de778cde2026eb325b91615c Mon Sep 17 00:00:00 2001 From: averevki Date: Fri, 14 Oct 2022 14:53:31 +0200 Subject: [PATCH] Add mtls identity tests Closes #31 --- testsuite/certificates/__init__.py | 15 ++++- testsuite/openshift/client.py | 9 ++- .../authorino/operator/tls/conftest.py | 60 ++++++++++++++----- .../authorino/operator/tls/mtls/__init__.py | 0 .../operator/tls/mtls/test_mtls_identity.py | 44 ++++++++++++++ 5 files changed, 108 insertions(+), 20 deletions(-) create mode 100644 testsuite/tests/kuadrant/authorino/operator/tls/mtls/__init__.py create mode 100644 testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py diff --git a/testsuite/certificates/__init__.py b/testsuite/certificates/__init__.py index bed9c901..1de2b502 100644 --- a/testsuite/certificates/__init__.py +++ b/testsuite/certificates/__init__.py @@ -38,6 +38,16 @@ class UnsignedKey: class CFSSLClient: """Client for working with CFSSL library""" + DEFAULT_NAMES = [ + { + "O": "Red Hat Inc.", + "OU": "IT", + "L": "San Francisco", + "ST": "California", + "C": "US", + } + ] + def __init__(self, binary) -> None: super().__init__() self.binary = binary @@ -115,16 +125,16 @@ def create_authority(self, :param names: dict of all names :param certificate_authority: Optional Authority to sign this new authority, making it intermediate """ + names = names or self.DEFAULT_NAMES data = { "CN": common_name, + "names": names, "hosts": hosts, "key": { "algo": "rsa", "size": 4096 }, } - if names: - data["names"] = names # type: ignore result = self._execute_command("genkey", "-initca", "-", stdin=json.dumps(data)) key = UnsignedKey(key=result["key"], csr=result["csr"]) @@ -145,6 +155,7 @@ def create(self, :param names: Names field in the csr :param certificate_authority: Certificate Authority to be used for signing """ + names = names or self.DEFAULT_NAMES key = self.generate_key(common_name, names, hosts) certificate = self.sign(key, certificate_authority=certificate_authority) return certificate diff --git a/testsuite/openshift/client.py b/testsuite/openshift/client.py index 42612d44..9a4f8462 100644 --- a/testsuite/openshift/client.py +++ b/testsuite/openshift/client.py @@ -3,7 +3,7 @@ import enum import os from functools import cached_property -from typing import Dict +from typing import Dict, Optional import openshift as oc from openshift import Context, Selector, OpenShiftPythonException @@ -126,9 +126,9 @@ def is_ready(self, selector: Selector): success, _, _ = selector.until_all(success_func=lambda obj: "readyReplicas" in obj.model.status) return success - def create_tls_secret(self, name: str, certificate: Certificate): + def create_tls_secret(self, name: str, certificate: Certificate, labels: Optional[Dict[str, str]] = None): """Creates a TLS secret""" - model = { + model: Dict = { 'kind': 'Secret', 'apiVersion': 'v1', 'metadata': { @@ -140,6 +140,9 @@ def create_tls_secret(self, name: str, certificate: Certificate): }, "type": "kubernetes.io/tls" } + if labels is not None: + model["metadata"]["labels"] = labels + with self.context: return oc.create(model, ["--save-config=true"]) diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/conftest.py b/testsuite/tests/kuadrant/authorino/operator/tls/conftest.py index 819d7914..737d70bc 100644 --- a/testsuite/tests/kuadrant/authorino/operator/tls/conftest.py +++ b/testsuite/tests/kuadrant/authorino/operator/tls/conftest.py @@ -1,5 +1,5 @@ """Conftest for all TLS-enabled tests""" -from typing import Dict +from typing import Optional, Dict import pytest @@ -11,20 +11,27 @@ @pytest.fixture(scope="session") def cert_attributes() -> Dict[str, str]: """Certificate attributes""" - return dict(O="Red Hat Inc.", - OU="IT", - L="San Francisco", - ST="California", - C="US",) + return dict(O="Organization Test", + OU="Unit Test", + L="Location Test", + ST="State Test", + C="Country Test") @pytest.fixture(scope="session") -def certificates(cfssl, authorino_domain, wildcard_domain, cert_attributes): +def cert_attributes_other(cert_attributes) -> Dict[str, str]: + """Certificate attributes that are different from the default ones""" + return {k: f"{v}-other" for k, v in cert_attributes.items()} + + +@pytest.fixture(scope="session") +def certificates(cfssl, authorino_domain, wildcard_domain, cert_attributes, cert_attributes_other): """Certificate hierarchy used for the tests""" chain = { - "envoy_ca": CertInfo(names=[cert_attributes], children={ + "envoy_ca": CertInfo(children={ "envoy_cert": None, - "valid_cert": CertInfo(names=[cert_attributes]) + "valid_cert": CertInfo(names=[cert_attributes]), + "custom_cert": CertInfo(names=[cert_attributes_other]) }), "authorino_ca": CertInfo(children={ "authorino_cert": CertInfo(hosts=authorino_domain), @@ -39,9 +46,9 @@ def certificates(cfssl, authorino_domain, wildcard_domain, cert_attributes): @pytest.fixture(scope="session") def create_secret(blame, request, openshift): """Creates TLS secret from Certificate""" - def _create_secret(certificate: Certificate, name): + def _create_secret(certificate: Certificate, name: str, labels: Optional[Dict[str, str]] = None): secret_name = blame(name) - secret = openshift.create_tls_secret(secret_name, certificate) + secret = openshift.create_tls_secret(secret_name, certificate, labels=labels) request.addfinalizer(lambda: openshift.delete_selector(secret)) return secret_name return _create_secret @@ -102,17 +109,40 @@ def valid_cert(certificates): @pytest.fixture(scope="session") -def invalid_cert(invalid_authority, cfssl, wildcard_domain): +def custom_cert(certificates): + """Envoy certificate that have different attributes""" + return certificates["custom_cert"] + + +@pytest.fixture(scope="session") +def invalid_cert(certificates): """Certificate rejected by Envoy""" - return cfssl.create("invalid", hosts=[wildcard_domain], certificate_authority=invalid_authority) + return certificates["invalid_cert"] + + +@pytest.fixture(scope="module") +def selector_params(module_label): + """Label key-value pair for the CA secret discovery""" + return "testLabel", module_label + + +@pytest.fixture(scope="module") +def authorino_labels(selector_params) -> Dict[str, str]: + """Labels for the proper Authorino discovery""" + labels = { + "authorino.kuadrant.io/managed-by": "authorino", + selector_params[0]: selector_params[1] + } + return labels +# pylint: disable-msg=too-many-locals @pytest.fixture(scope="module") def envoy(request, authorino, openshift, create_secret, blame, label, backend, - authorino_authority, envoy_authority, envoy_cert, testconfig): + authorino_authority, envoy_authority, envoy_cert, testconfig, authorino_labels): """Envoy + Httpbin backend""" authorino_secret = create_secret(authorino_authority, "authca") - envoy_ca_secret = create_secret(envoy_authority, "backendca") + envoy_ca_secret = create_secret(envoy_authority, "backendca", labels=authorino_labels) envoy_secret = create_secret(envoy_cert, "envoycert") envoy = TLSEnvoy(openshift, authorino, blame("backend"), label, backend.url, testconfig["envoy"]["image"], diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/mtls/__init__.py b/testsuite/tests/kuadrant/authorino/operator/tls/mtls/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py b/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py new file mode 100644 index 00000000..b2cb5cff --- /dev/null +++ b/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py @@ -0,0 +1,44 @@ +"""mTLS authentication tests""" +import pytest +from httpx import ReadError, ConnectError + +from testsuite.objects import Rule + + +@pytest.fixture(scope="module", autouse=True) +def authorization(authorization, blame, selector_params, cert_attributes): + """Create AuthConfig with mtls identity and pattern matching rule""" + authorization.remove_all_identities() + + authorization.add_mtls_identity(blame("mtls"), *selector_params) + rule = Rule("auth.identity.Organization", "incl", cert_attributes["O"]) + authorization.add_auth_rule(blame("redhat"), rule) + return authorization + + +def test_mtls_success(envoy_authority, valid_cert, envoy): + """Test successful mtls authentication""" + with envoy.client(verify=envoy_authority, cert=valid_cert) as client: + response = client.get("/get") + assert response.status_code == 200 + + +@pytest.mark.parametrize("cert_authority, certificate, err, err_match", [ + pytest.param("envoy_authority", "invalid_cert", ReadError, "unknown ca", id="Invalid certificate"), + pytest.param("invalid_authority", "valid_cert", ConnectError, "certificate verify failed", id="Unknown authority"), +]) +def test_mtls_fail(request, cert_authority, certificate, err, err_match: str, envoy): + """Test mtls verification with invalid certificate or unknown signed authority""" + ca = request.getfixturevalue(cert_authority) + cert = request.getfixturevalue(certificate) + + with pytest.raises(err, match=err_match): + with envoy.client(verify=ca, cert=cert) as client: + client.get("/get") + + +def test_mtls_unmatched_attributes(envoy_authority, custom_cert, envoy): + """Test certificate that signed by the trusted CA, though their attributes are unmatched""" + with envoy.client(verify=envoy_authority, cert=custom_cert) as client: + response = client.get("/get") + assert response.status_code == 403