Skip to content

Commit

Permalink
Merge pull request #124 from averevki/mtls-identity-tests
Browse files Browse the repository at this point in the history
Add a basic mTLS identity tests
  • Loading branch information
pehala authored Oct 24, 2022
2 parents 0a4ae62 + be36295 commit 8bb4c6e
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 13 deletions.
1 change: 1 addition & 0 deletions testsuite/certificates/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class CertInfo:
hosts: Optional[Union[Collection[str], str]] = None
ca: bool = False
children: Optional[Dict[str, Optional["CertInfo"]]] = None
names: Optional[List[Dict[str, str]]] = None


@dataclasses.dataclass
Expand Down
8 changes: 8 additions & 0 deletions testsuite/objects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def add_api_key_identity(self, name, all_namespaces, match_label, match_expressi
def add_anonymous_identity(self, name):
"""Adds anonymous identity"""

@abc.abstractmethod
def add_mtls_identity(self, name: str, selector_key: str, selector_value: str):
"""Adds mTLS identity"""

@abc.abstractmethod
def remove_all_identities(self):
"""Removes all identities from AuthConfig"""
Expand Down Expand Up @@ -98,6 +102,10 @@ def add_auth_rule(self, name: str, rule: Rule, when: Rule, metrics: bool, priori
def add_role_rule(self, name: str, role: str, path: str, metrics: bool, priority: int):
"""Adds a rule, which allows access to 'path' only to users with 'role'"""

@abc.abstractmethod
def remove_all_rules(self):
"""Removes all rules from AuthConfig"""

@abc.abstractmethod
def set_deny_with(self, code, value):
"""Set denyWith to authconfig"""
Expand Down
9 changes: 6 additions & 3 deletions testsuite/openshift/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import enum
import os
from functools import cached_property
from typing import Dict
from typing import Dict, Optional

import openshift as oc
from openshift import Context, Selector, OpenShiftPythonException
Expand Down Expand Up @@ -126,9 +126,9 @@ def is_ready(self, selector: Selector):
success, _, _ = selector.until_all(success_func=lambda obj: "readyReplicas" in obj.model.status)
return success

def create_tls_secret(self, name: str, certificate: Certificate):
def create_tls_secret(self, name: str, certificate: Certificate, labels: Optional[Dict[str, str]] = None):
"""Creates a TLS secret"""
model = {
model: Dict = {
'kind': 'Secret',
'apiVersion': 'v1',
'metadata': {
Expand All @@ -140,6 +140,9 @@ def create_tls_secret(self, name: str, certificate: Certificate):
},
"type": "kubernetes.io/tls"
}
if labels is not None:
model["metadata"]["labels"] = labels

with self.context:
return oc.create(model, ["--save-config=true"])

Expand Down
26 changes: 26 additions & 0 deletions testsuite/openshift/objects/auth_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,26 @@ def add_anonymous_identity(self, name):
identities = self.model.spec.setdefault("identity", [])
identities.append({"name": name, "anonymous": {}})

@modify
def add_mtls_identity(self, name: str, selector_key: str, selector_value: str):
"""Adds mTLS identity
Args:
:param name: name of the identity
:param selector_key: selector key to match
:param selector_value: selector value to match
"""
identities = self.model.spec.setdefault("identity", [])
identities.append({
"name": name,
"mtls": {
"selector": {
"matchLabels": {
selector_key: selector_value
}
}
}
})

@modify
def add_auth_rule(self, name, rule: Rule, when: Rule = None, metrics=False, priority=0):
"""Adds JSON pattern-matching authorization rule (authorization.json)"""
Expand Down Expand Up @@ -147,6 +167,12 @@ def add_role_rule(self, name: str, role: str, path: str, metrics=False, priority
when = Rule("context.request.http.path", "matches", path)
self.add_auth_rule(name, rule, when, metrics, priority)

@modify
def remove_all_rules(self):
"""Removes all rules from AuthConfig"""
authorization = self.model.spec.setdefault("authorization", [])
authorization.clear()

@modify
def remove_all_identities(self):
"""Removes all identities from AuthConfig"""
Expand Down
57 changes: 49 additions & 8 deletions testsuite/tests/kuadrant/authorino/operator/tls/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Conftest for all TLS-enabled tests"""
from typing import Optional, Dict

import pytest

Expand All @@ -8,12 +9,29 @@


@pytest.fixture(scope="session")
def certificates(cfssl, authorino_domain, wildcard_domain):
def cert_attributes() -> Dict[str, str]:
"""Certificate attributes"""
return dict(O="Organization Test",
OU="Unit Test",
L="Location Test",
ST="State Test",
C="Country Test")


@pytest.fixture(scope="session")
def cert_attributes_other(cert_attributes) -> Dict[str, str]:
"""Certificate attributes that are different from the default ones"""
return {k: f"{v}-other" for k, v in cert_attributes.items()}


@pytest.fixture(scope="session")
def certificates(cfssl, authorino_domain, wildcard_domain, cert_attributes, cert_attributes_other):
"""Certificate hierarchy used for the tests"""
chain = {
"envoy_ca": CertInfo(children={
"envoy_cert": None,
"valid_cert": None
"valid_cert": CertInfo(names=[cert_attributes]),
"custom_cert": CertInfo(names=[cert_attributes_other])
}),
"authorino_ca": CertInfo(children={
"authorino_cert": CertInfo(hosts=authorino_domain),
Expand All @@ -28,9 +46,9 @@ def certificates(cfssl, authorino_domain, wildcard_domain):
@pytest.fixture(scope="session")
def create_secret(blame, request, openshift):
"""Creates TLS secret from Certificate"""
def _create_secret(certificate: Certificate, name):
def _create_secret(certificate: Certificate, name: str, labels: Optional[Dict[str, str]] = None):
secret_name = blame(name)
secret = openshift.create_tls_secret(secret_name, certificate)
secret = openshift.create_tls_secret(secret_name, certificate, labels=labels)
request.addfinalizer(lambda: openshift.delete_selector(secret))
return secret_name
return _create_secret
Expand Down Expand Up @@ -91,17 +109,40 @@ def valid_cert(certificates):


@pytest.fixture(scope="session")
def invalid_cert(invalid_authority, cfssl, wildcard_domain):
def custom_cert(certificates):
"""Envoy certificate that have different attributes"""
return certificates["custom_cert"]


@pytest.fixture(scope="session")
def invalid_cert(certificates):
"""Certificate rejected by Envoy"""
return cfssl.create("invalid", hosts=[wildcard_domain], certificate_authority=invalid_authority)
return certificates["invalid_cert"]


@pytest.fixture(scope="module")
def selector_params(module_label):
"""Label key-value pair for the CA secret discovery"""
return "testLabel", module_label


@pytest.fixture(scope="module")
def authorino_labels(selector_params) -> Dict[str, str]:
"""Labels for the proper Authorino discovery"""
labels = {
"authorino.kuadrant.io/managed-by": "authorino",
selector_params[0]: selector_params[1]
}
return labels


# pylint: disable-msg=too-many-locals
@pytest.fixture(scope="module")
def envoy(request, authorino, openshift, create_secret, blame, label, backend,
authorino_authority, envoy_authority, envoy_cert, testconfig):
authorino_authority, envoy_authority, envoy_cert, testconfig, authorino_labels):
"""Envoy + Httpbin backend"""
authorino_secret = create_secret(authorino_authority, "authca")
envoy_ca_secret = create_secret(envoy_authority, "backendca")
envoy_ca_secret = create_secret(envoy_authority, "backendca", labels=authorino_labels)
envoy_secret = create_secret(envoy_cert, "envoycert")

envoy = TLSEnvoy(openshift, authorino, blame("backend"), label, backend.url, testconfig["envoy"]["image"],
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""mTLS authentication tests"""
import pytest
from httpx import ReadError, ConnectError

from testsuite.objects import Rule


@pytest.fixture(scope="module", autouse=True)
def authorization(authorization, blame, selector_params, cert_attributes):
"""Create AuthConfig with mtls identity and pattern matching rule"""
authorization.remove_all_identities()

authorization.add_mtls_identity(blame("mtls"), *selector_params)
rule = Rule("auth.identity.Organization", "incl", cert_attributes["O"])
authorization.add_auth_rule(blame("redhat"), rule)
return authorization


def test_mtls_success(envoy_authority, valid_cert, envoy):
"""Test successful mtls authentication"""
with envoy.client(verify=envoy_authority, cert=valid_cert) as client:
response = client.get("/get")
assert response.status_code == 200


@pytest.mark.parametrize("cert_authority, certificate, err, err_match", [
pytest.param("envoy_authority", "invalid_cert", ReadError, "unknown ca", id="Invalid certificate"),
pytest.param("invalid_authority", "valid_cert", ConnectError, "certificate verify failed", id="Unknown authority"),
])
def test_mtls_fail(request, cert_authority, certificate, err, err_match: str, envoy):
"""Test mtls verification with invalid certificate or unknown signed authority"""
ca = request.getfixturevalue(cert_authority)
cert = request.getfixturevalue(certificate)

with pytest.raises(err, match=err_match):
with envoy.client(verify=ca, cert=cert) as client:
client.get("/get")


def test_mtls_unmatched_attributes(envoy_authority, custom_cert, envoy):
"""Test certificate that signed by the trusted CA, though their attributes are unmatched"""
with envoy.client(verify=envoy_authority, cert=custom_cert) as client:
response = client.get("/get")
assert response.status_code == 403
6 changes: 4 additions & 2 deletions testsuite/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ def cert_builder(cfssl: CFSSLClient, chain: dict, hosts: Union[str, Collection[s
parsed_hosts = [parsed_hosts] # type: ignore

if info.ca or info.children:
cert = cfssl.create_authority(name, hosts=parsed_hosts, certificate_authority=parent)
cert = cfssl.create_authority(name, names=info.names,
hosts=parsed_hosts, certificate_authority=parent)
else:
cert = cfssl.create(name, hosts=parsed_hosts, certificate_authority=parent) # type: ignore
cert = cfssl.create(name, names=info.names,
hosts=parsed_hosts, certificate_authority=parent) # type: ignore
cert.chain = cert.certificate + parent.chain if parent else cert.certificate # type: ignore
if info.children is not None:
result.update(cert_builder(cfssl, info.children, parsed_hosts, cert))
Expand Down

0 comments on commit 8bb4c6e

Please sign in to comment.