Skip to content

Commit

Permalink
Add mtls identity tests
Browse files Browse the repository at this point in the history
Closes #31
  • Loading branch information
averevki committed Oct 20, 2022
1 parent d95c6d4 commit be36295
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 20 deletions.
15 changes: 13 additions & 2 deletions testsuite/certificates/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ class UnsignedKey:

class CFSSLClient:
"""Client for working with CFSSL library"""
DEFAULT_NAMES = [
{
"O": "Red Hat Inc.",
"OU": "IT",
"L": "San Francisco",
"ST": "California",
"C": "US",
}
]

def __init__(self, binary) -> None:
super().__init__()
self.binary = binary
Expand Down Expand Up @@ -115,16 +125,16 @@ def create_authority(self,
:param names: dict of all names
:param certificate_authority: Optional Authority to sign this new authority, making it intermediate
"""
names = names or self.DEFAULT_NAMES
data = {
"CN": common_name,
"names": names,
"hosts": hosts,
"key": {
"algo": "rsa",
"size": 4096
},
}
if names:
data["names"] = names # type: ignore

result = self._execute_command("genkey", "-initca", "-", stdin=json.dumps(data))
key = UnsignedKey(key=result["key"], csr=result["csr"])
Expand All @@ -145,6 +155,7 @@ def create(self,
:param names: Names field in the csr
:param certificate_authority: Certificate Authority to be used for signing
"""
names = names or self.DEFAULT_NAMES
key = self.generate_key(common_name, names, hosts)
certificate = self.sign(key, certificate_authority=certificate_authority)
return certificate
9 changes: 6 additions & 3 deletions testsuite/openshift/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import enum
import os
from functools import cached_property
from typing import Dict
from typing import Dict, Optional

import openshift as oc
from openshift import Context, Selector, OpenShiftPythonException
Expand Down Expand Up @@ -126,9 +126,9 @@ def is_ready(self, selector: Selector):
success, _, _ = selector.until_all(success_func=lambda obj: "readyReplicas" in obj.model.status)
return success

def create_tls_secret(self, name: str, certificate: Certificate):
def create_tls_secret(self, name: str, certificate: Certificate, labels: Optional[Dict[str, str]] = None):
"""Creates a TLS secret"""
model = {
model: Dict = {
'kind': 'Secret',
'apiVersion': 'v1',
'metadata': {
Expand All @@ -140,6 +140,9 @@ def create_tls_secret(self, name: str, certificate: Certificate):
},
"type": "kubernetes.io/tls"
}
if labels is not None:
model["metadata"]["labels"] = labels

with self.context:
return oc.create(model, ["--save-config=true"])

Expand Down
60 changes: 45 additions & 15 deletions testsuite/tests/kuadrant/authorino/operator/tls/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Conftest for all TLS-enabled tests"""
from typing import Dict
from typing import Optional, Dict

import pytest

Expand All @@ -11,20 +11,27 @@
@pytest.fixture(scope="session")
def cert_attributes() -> Dict[str, str]:
"""Certificate attributes"""
return dict(O="Red Hat Inc.",
OU="IT",
L="San Francisco",
ST="California",
C="US",)
return dict(O="Organization Test",
OU="Unit Test",
L="Location Test",
ST="State Test",
C="Country Test")


@pytest.fixture(scope="session")
def certificates(cfssl, authorino_domain, wildcard_domain, cert_attributes):
def cert_attributes_other(cert_attributes) -> Dict[str, str]:
"""Certificate attributes that are different from the default ones"""
return {k: f"{v}-other" for k, v in cert_attributes.items()}


@pytest.fixture(scope="session")
def certificates(cfssl, authorino_domain, wildcard_domain, cert_attributes, cert_attributes_other):
"""Certificate hierarchy used for the tests"""
chain = {
"envoy_ca": CertInfo(names=[cert_attributes], children={
"envoy_ca": CertInfo(children={
"envoy_cert": None,
"valid_cert": CertInfo(names=[cert_attributes])
"valid_cert": CertInfo(names=[cert_attributes]),
"custom_cert": CertInfo(names=[cert_attributes_other])
}),
"authorino_ca": CertInfo(children={
"authorino_cert": CertInfo(hosts=authorino_domain),
Expand All @@ -39,9 +46,9 @@ def certificates(cfssl, authorino_domain, wildcard_domain, cert_attributes):
@pytest.fixture(scope="session")
def create_secret(blame, request, openshift):
"""Creates TLS secret from Certificate"""
def _create_secret(certificate: Certificate, name):
def _create_secret(certificate: Certificate, name: str, labels: Optional[Dict[str, str]] = None):
secret_name = blame(name)
secret = openshift.create_tls_secret(secret_name, certificate)
secret = openshift.create_tls_secret(secret_name, certificate, labels=labels)
request.addfinalizer(lambda: openshift.delete_selector(secret))
return secret_name
return _create_secret
Expand Down Expand Up @@ -102,17 +109,40 @@ def valid_cert(certificates):


@pytest.fixture(scope="session")
def invalid_cert(invalid_authority, cfssl, wildcard_domain):
def custom_cert(certificates):
"""Envoy certificate that have different attributes"""
return certificates["custom_cert"]


@pytest.fixture(scope="session")
def invalid_cert(certificates):
"""Certificate rejected by Envoy"""
return cfssl.create("invalid", hosts=[wildcard_domain], certificate_authority=invalid_authority)
return certificates["invalid_cert"]


@pytest.fixture(scope="module")
def selector_params(module_label):
"""Label key-value pair for the CA secret discovery"""
return "testLabel", module_label


@pytest.fixture(scope="module")
def authorino_labels(selector_params) -> Dict[str, str]:
"""Labels for the proper Authorino discovery"""
labels = {
"authorino.kuadrant.io/managed-by": "authorino",
selector_params[0]: selector_params[1]
}
return labels


# pylint: disable-msg=too-many-locals
@pytest.fixture(scope="module")
def envoy(request, authorino, openshift, create_secret, blame, label, backend,
authorino_authority, envoy_authority, envoy_cert, testconfig):
authorino_authority, envoy_authority, envoy_cert, testconfig, authorino_labels):
"""Envoy + Httpbin backend"""
authorino_secret = create_secret(authorino_authority, "authca")
envoy_ca_secret = create_secret(envoy_authority, "backendca")
envoy_ca_secret = create_secret(envoy_authority, "backendca", labels=authorino_labels)
envoy_secret = create_secret(envoy_cert, "envoycert")

envoy = TLSEnvoy(openshift, authorino, blame("backend"), label, backend.url, testconfig["envoy"]["image"],
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""mTLS authentication tests"""
import pytest
from httpx import ReadError, ConnectError

from testsuite.objects import Rule


@pytest.fixture(scope="module", autouse=True)
def authorization(authorization, blame, selector_params, cert_attributes):
"""Create AuthConfig with mtls identity and pattern matching rule"""
authorization.remove_all_identities()

authorization.add_mtls_identity(blame("mtls"), *selector_params)
rule = Rule("auth.identity.Organization", "incl", cert_attributes["O"])
authorization.add_auth_rule(blame("redhat"), rule)
return authorization


def test_mtls_success(envoy_authority, valid_cert, envoy):
"""Test successful mtls authentication"""
with envoy.client(verify=envoy_authority, cert=valid_cert) as client:
response = client.get("/get")
assert response.status_code == 200


@pytest.mark.parametrize("cert_authority, certificate, err, err_match", [
pytest.param("envoy_authority", "invalid_cert", ReadError, "unknown ca", id="Invalid certificate"),
pytest.param("invalid_authority", "valid_cert", ConnectError, "certificate verify failed", id="Unknown authority"),
])
def test_mtls_fail(request, cert_authority, certificate, err, err_match: str, envoy):
"""Test mtls verification with invalid certificate or unknown signed authority"""
ca = request.getfixturevalue(cert_authority)
cert = request.getfixturevalue(certificate)

with pytest.raises(err, match=err_match):
with envoy.client(verify=ca, cert=cert) as client:
client.get("/get")


def test_mtls_unmatched_attributes(envoy_authority, custom_cert, envoy):
"""Test certificate that signed by the trusted CA, though their attributes are unmatched"""
with envoy.client(verify=envoy_authority, cert=custom_cert) as client:
response = client.get("/get")
assert response.status_code == 403

0 comments on commit be36295

Please sign in to comment.