diff --git a/testsuite/certificates/__init__.py b/testsuite/certificates/__init__.py index 2af40b15..1de2b502 100644 --- a/testsuite/certificates/__init__.py +++ b/testsuite/certificates/__init__.py @@ -18,6 +18,7 @@ class CertInfo: hosts: Optional[Union[Collection[str], str]] = None ca: bool = False children: Optional[Dict[str, Optional["CertInfo"]]] = None + names: Optional[List[Dict[str, str]]] = None @dataclasses.dataclass diff --git a/testsuite/objects/__init__.py b/testsuite/objects/__init__.py index 6d32c16a..90b7403c 100644 --- a/testsuite/objects/__init__.py +++ b/testsuite/objects/__init__.py @@ -62,6 +62,10 @@ def add_api_key_identity(self, name, all_namespaces, match_label, match_expressi def add_anonymous_identity(self, name): """Adds anonymous identity""" + @abc.abstractmethod + def add_mtls_identity(self, name: str, selector_key: str, selector_value: str): + """Adds mTLS identity""" + @abc.abstractmethod def remove_all_identities(self): """Removes all identities from AuthConfig""" @@ -98,6 +102,10 @@ def add_auth_rule(self, name: str, rule: Rule, when: Rule, metrics: bool, priori def add_role_rule(self, name: str, role: str, path: str, metrics: bool, priority: int): """Adds a rule, which allows access to 'path' only to users with 'role'""" + @abc.abstractmethod + def remove_all_rules(self): + """Removes all rules from AuthConfig""" + @abc.abstractmethod def set_deny_with(self, code, value): """Set denyWith to authconfig""" diff --git a/testsuite/openshift/client.py b/testsuite/openshift/client.py index 42612d44..9a4f8462 100644 --- a/testsuite/openshift/client.py +++ b/testsuite/openshift/client.py @@ -3,7 +3,7 @@ import enum import os from functools import cached_property -from typing import Dict +from typing import Dict, Optional import openshift as oc from openshift import Context, Selector, OpenShiftPythonException @@ -126,9 +126,9 @@ def is_ready(self, selector: Selector): success, _, _ = selector.until_all(success_func=lambda obj: "readyReplicas" in obj.model.status) return success - def create_tls_secret(self, name: str, certificate: Certificate): + def create_tls_secret(self, name: str, certificate: Certificate, labels: Optional[Dict[str, str]] = None): """Creates a TLS secret""" - model = { + model: Dict = { 'kind': 'Secret', 'apiVersion': 'v1', 'metadata': { @@ -140,6 +140,9 @@ def create_tls_secret(self, name: str, certificate: Certificate): }, "type": "kubernetes.io/tls" } + if labels is not None: + model["metadata"]["labels"] = labels + with self.context: return oc.create(model, ["--save-config=true"]) diff --git a/testsuite/openshift/objects/auth_config.py b/testsuite/openshift/objects/auth_config.py index 2f48b5ef..24463359 100644 --- a/testsuite/openshift/objects/auth_config.py +++ b/testsuite/openshift/objects/auth_config.py @@ -118,6 +118,26 @@ def add_anonymous_identity(self, name): identities = self.model.spec.setdefault("identity", []) identities.append({"name": name, "anonymous": {}}) + @modify + def add_mtls_identity(self, name: str, selector_key: str, selector_value: str): + """Adds mTLS identity + Args: + :param name: name of the identity + :param selector_key: selector key to match + :param selector_value: selector value to match + """ + identities = self.model.spec.setdefault("identity", []) + identities.append({ + "name": name, + "mtls": { + "selector": { + "matchLabels": { + selector_key: selector_value + } + } + } + }) + @modify def add_auth_rule(self, name, rule: Rule, when: Rule = None, metrics=False, priority=0): """Adds JSON pattern-matching authorization rule (authorization.json)""" @@ -147,6 +167,12 @@ def add_role_rule(self, name: str, role: str, path: str, metrics=False, priority when = Rule("context.request.http.path", "matches", path) self.add_auth_rule(name, rule, when, metrics, priority) + @modify + def remove_all_rules(self): + """Removes all rules from AuthConfig""" + authorization = self.model.spec.setdefault("authorization", []) + authorization.clear() + @modify def remove_all_identities(self): """Removes all identities from AuthConfig""" diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/conftest.py b/testsuite/tests/kuadrant/authorino/operator/tls/conftest.py index e8876136..737d70bc 100644 --- a/testsuite/tests/kuadrant/authorino/operator/tls/conftest.py +++ b/testsuite/tests/kuadrant/authorino/operator/tls/conftest.py @@ -1,4 +1,5 @@ """Conftest for all TLS-enabled tests""" +from typing import Optional, Dict import pytest @@ -8,12 +9,29 @@ @pytest.fixture(scope="session") -def certificates(cfssl, authorino_domain, wildcard_domain): +def cert_attributes() -> Dict[str, str]: + """Certificate attributes""" + return dict(O="Organization Test", + OU="Unit Test", + L="Location Test", + ST="State Test", + C="Country Test") + + +@pytest.fixture(scope="session") +def cert_attributes_other(cert_attributes) -> Dict[str, str]: + """Certificate attributes that are different from the default ones""" + return {k: f"{v}-other" for k, v in cert_attributes.items()} + + +@pytest.fixture(scope="session") +def certificates(cfssl, authorino_domain, wildcard_domain, cert_attributes, cert_attributes_other): """Certificate hierarchy used for the tests""" chain = { "envoy_ca": CertInfo(children={ "envoy_cert": None, - "valid_cert": None + "valid_cert": CertInfo(names=[cert_attributes]), + "custom_cert": CertInfo(names=[cert_attributes_other]) }), "authorino_ca": CertInfo(children={ "authorino_cert": CertInfo(hosts=authorino_domain), @@ -28,9 +46,9 @@ def certificates(cfssl, authorino_domain, wildcard_domain): @pytest.fixture(scope="session") def create_secret(blame, request, openshift): """Creates TLS secret from Certificate""" - def _create_secret(certificate: Certificate, name): + def _create_secret(certificate: Certificate, name: str, labels: Optional[Dict[str, str]] = None): secret_name = blame(name) - secret = openshift.create_tls_secret(secret_name, certificate) + secret = openshift.create_tls_secret(secret_name, certificate, labels=labels) request.addfinalizer(lambda: openshift.delete_selector(secret)) return secret_name return _create_secret @@ -91,17 +109,40 @@ def valid_cert(certificates): @pytest.fixture(scope="session") -def invalid_cert(invalid_authority, cfssl, wildcard_domain): +def custom_cert(certificates): + """Envoy certificate that have different attributes""" + return certificates["custom_cert"] + + +@pytest.fixture(scope="session") +def invalid_cert(certificates): """Certificate rejected by Envoy""" - return cfssl.create("invalid", hosts=[wildcard_domain], certificate_authority=invalid_authority) + return certificates["invalid_cert"] + + +@pytest.fixture(scope="module") +def selector_params(module_label): + """Label key-value pair for the CA secret discovery""" + return "testLabel", module_label + + +@pytest.fixture(scope="module") +def authorino_labels(selector_params) -> Dict[str, str]: + """Labels for the proper Authorino discovery""" + labels = { + "authorino.kuadrant.io/managed-by": "authorino", + selector_params[0]: selector_params[1] + } + return labels +# pylint: disable-msg=too-many-locals @pytest.fixture(scope="module") def envoy(request, authorino, openshift, create_secret, blame, label, backend, - authorino_authority, envoy_authority, envoy_cert, testconfig): + authorino_authority, envoy_authority, envoy_cert, testconfig, authorino_labels): """Envoy + Httpbin backend""" authorino_secret = create_secret(authorino_authority, "authca") - envoy_ca_secret = create_secret(envoy_authority, "backendca") + envoy_ca_secret = create_secret(envoy_authority, "backendca", labels=authorino_labels) envoy_secret = create_secret(envoy_cert, "envoycert") envoy = TLSEnvoy(openshift, authorino, blame("backend"), label, backend.url, testconfig["envoy"]["image"], diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/mtls/__init__.py b/testsuite/tests/kuadrant/authorino/operator/tls/mtls/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py b/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py new file mode 100644 index 00000000..b2cb5cff --- /dev/null +++ b/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py @@ -0,0 +1,44 @@ +"""mTLS authentication tests""" +import pytest +from httpx import ReadError, ConnectError + +from testsuite.objects import Rule + + +@pytest.fixture(scope="module", autouse=True) +def authorization(authorization, blame, selector_params, cert_attributes): + """Create AuthConfig with mtls identity and pattern matching rule""" + authorization.remove_all_identities() + + authorization.add_mtls_identity(blame("mtls"), *selector_params) + rule = Rule("auth.identity.Organization", "incl", cert_attributes["O"]) + authorization.add_auth_rule(blame("redhat"), rule) + return authorization + + +def test_mtls_success(envoy_authority, valid_cert, envoy): + """Test successful mtls authentication""" + with envoy.client(verify=envoy_authority, cert=valid_cert) as client: + response = client.get("/get") + assert response.status_code == 200 + + +@pytest.mark.parametrize("cert_authority, certificate, err, err_match", [ + pytest.param("envoy_authority", "invalid_cert", ReadError, "unknown ca", id="Invalid certificate"), + pytest.param("invalid_authority", "valid_cert", ConnectError, "certificate verify failed", id="Unknown authority"), +]) +def test_mtls_fail(request, cert_authority, certificate, err, err_match: str, envoy): + """Test mtls verification with invalid certificate or unknown signed authority""" + ca = request.getfixturevalue(cert_authority) + cert = request.getfixturevalue(certificate) + + with pytest.raises(err, match=err_match): + with envoy.client(verify=ca, cert=cert) as client: + client.get("/get") + + +def test_mtls_unmatched_attributes(envoy_authority, custom_cert, envoy): + """Test certificate that signed by the trusted CA, though their attributes are unmatched""" + with envoy.client(verify=envoy_authority, cert=custom_cert) as client: + response = client.get("/get") + assert response.status_code == 403 diff --git a/testsuite/utils.py b/testsuite/utils.py index 56e9e9d7..3b6f3dbe 100644 --- a/testsuite/utils.py +++ b/testsuite/utils.py @@ -65,9 +65,11 @@ def cert_builder(cfssl: CFSSLClient, chain: dict, hosts: Union[str, Collection[s parsed_hosts = [parsed_hosts] # type: ignore if info.ca or info.children: - cert = cfssl.create_authority(name, hosts=parsed_hosts, certificate_authority=parent) + cert = cfssl.create_authority(name, names=info.names, + hosts=parsed_hosts, certificate_authority=parent) else: - cert = cfssl.create(name, hosts=parsed_hosts, certificate_authority=parent) # type: ignore + cert = cfssl.create(name, names=info.names, + hosts=parsed_hosts, certificate_authority=parent) # type: ignore cert.chain = cert.certificate + parent.chain if parent else cert.certificate # type: ignore if info.children is not None: result.update(cert_builder(cfssl, info.children, parsed_hosts, cert))