diff --git a/README.md b/README.md index 47deb81b..470d0806 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,8 @@ To run all tests you can then use ```make test``` For just running tests, the container image is the easiest option, you can log in to OpenShift and then run it like this +If you omit any options, Testsuite will run only subset of tests that don't require that variable e.g. not providing Auth0 will result in skipping Auth0 tests. + NOTE: For binding kubeconfig file, the "others" need to have permission to read, otherwise it will not work. The results and reports will be saved in `/test-run-results` in the container. @@ -61,6 +63,9 @@ podman run \ -v $HOME/.kube/config:/run/kubeconfig:z \ -e KUADRANT_OPENSHIFT__project=authorino \ -e KUADRANT_OPENSHIFT2__project=authorino2 \ + -e KUADRANT_AUTH0__url="AUTH0_URL" \ + -e KUADRANT_AUTH0__client_id="AUTH0_CLIENT_ID" \ + -e KUADRANT_AUTH0__client_secret="AUTH0_CLIENT_SECRET" \ quay.io/kuadrant/testsuite:latest ``` @@ -74,5 +79,8 @@ podman run \ -e KUADRANT_RHSSO__url="https://my-sso.net" \ -e KUADRANT_RHSSO__password="ADMIN_PASSWORD" \ -e KUADRANT_RHSSO__username="ADMIN_USERNAME" \ + -e KUADRANT_AUTH0__url="AUTH0_URL" \ + -e KUADRANT_AUTH0__client_id="AUTH0_CLIENT_ID" \ + -e KUADRANT_AUTH0__client_secret="AUTH0_CLIENT_SECRET" \ quay.io/kuadrant/testsuite:latest ``` diff --git a/config/settings.local.yaml.tpl b/config/settings.local.yaml.tpl index 7c2b9049..963d9cf6 100644 --- a/config/settings.local.yaml.tpl +++ b/config/settings.local.yaml.tpl @@ -16,9 +16,9 @@ # username: "testUser" # password: "testPassword" # auth0: -# client: "CLIENT_ID" -# client-secret: "CLIENT_SECRET" -# domain: "AUTH0_DOMAIN" +# client_id: "CLIENT_ID" +# client_secret: "CLIENT_SECRET" +# url: "AUTH0_URL" # cfssl: "cfssl" # Path to the CFSSL library for TLS tests # authorino: # image: "quay.io/kuadrant/authorino:latest" # If specified will override the authorino image diff --git a/testsuite/httpx/auth.py b/testsuite/httpx/auth.py index 83e3e2b9..dad1cb62 100644 --- a/testsuite/httpx/auth.py +++ b/testsuite/httpx/auth.py @@ -1,20 +1,26 @@ """Auth Classes for HttpX""" -import typing -from typing import Generator +from functools import cached_property +from typing import Generator, Callable, Union from httpx import Auth, Request, URL, Response from testsuite.openshift.objects.api_key import APIKey -from testsuite.rhsso import Client +from testsuite.oidc import Token class HttpxOidcClientAuth(Auth): """Auth class for Httpx client for product secured by oidc""" - def __init__(self, client: Client, location, username=None, password=None) -> None: + def __init__(self, token: Union[Token, Callable[[], Token]], location="authorization") -> None: self.location = location - self.oidc_client = client.oidc_client - self.token = self.oidc_client.token(username, password) + self._token = token + + @cached_property + def token(self): + """Lazily retrieves token from OIDC provider""" + if callable(self._token): + return self._token() + return self._token def _add_credentials(self, request: Request, token): if self.location == 'authorization': @@ -27,13 +33,13 @@ def _add_credentials(self, request: Request, token): raise ValueError(f"Unknown credentials location '{self.location}'") def auth_flow(self, request: Request) -> Generator[Request, Response, None]: - self._add_credentials(request, self.token["access_token"]) + self._add_credentials(request, self.token.access_token) response = yield request if response.status_code == 403: # Renew access token and try again - self.token = self.oidc_client.refresh_token(self.token["refresh_token"]) - self._add_credentials(request, self.token["access_token"]) + self.token.refresh() + self._add_credentials(request, self.token.access_token) yield request @@ -45,18 +51,6 @@ def __init__(self, api_key: APIKey, prefix: str = "APIKEY") -> None: self.api_key = str(api_key) self.prefix = prefix - def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]: + def auth_flow(self, request: Request) -> Generator[Request, Response, None]: request.headers["Authorization"] = f"{self.prefix} {self.api_key}" yield request - - -class Auth0Auth(Auth): - """Auth class for authentication with Auth0 token""" - - def __init__(self, token: str) -> None: - super().__init__() - self.token = token - - def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]: - request.headers["Authorization"] = f"Bearer {self.token}" - yield request diff --git a/testsuite/oidc/__init__.py b/testsuite/oidc/__init__.py new file mode 100644 index 00000000..86169710 --- /dev/null +++ b/testsuite/oidc/__init__.py @@ -0,0 +1,32 @@ +"""Common classes for OIDC provider""" +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Callable, Tuple + + +@dataclass +class Token: + """Token class""" + access_token: str + refresh_function: Callable[[str], "Token"] + refresh_token: str + + def refresh(self) -> "Token": + """Refreshes token""" + return self.refresh_function(self.refresh_token) + + def __str__(self) -> str: + return self.access_token + + +class OIDCProvider(ABC): + """Interface for all methods we need for OIDCProvider""" + + @property + @abstractmethod + def well_known(self): + """Dict (or a dict-like structure) access to all well_known URLS""" + + @abstractmethod + def get_token(self, username=None, password=None) -> Token: + """Returns Token wrapper class with current access token and ability to refresh it""" diff --git a/testsuite/oidc/auth0.py b/testsuite/oidc/auth0.py new file mode 100644 index 00000000..1bb51c74 --- /dev/null +++ b/testsuite/oidc/auth0.py @@ -0,0 +1,42 @@ +"""Module containing all classes necessary to work with Auth0""" +from functools import cached_property + +import httpx + +from testsuite.oidc import OIDCProvider, Token + + +class Auth0Provider(OIDCProvider): + """Auth0 OIDC provider""" + + def __init__(self, domain, client_id, client_secret) -> None: + super().__init__() + self.domain = domain + self.client_id = client_id + self.client_secret = client_secret + + @property + def token_endpoint(self): + """Returns token_endpoint URL""" + return self.well_known["token_endpoint"] + + @cached_property + def well_known(self): + response = httpx.get(self.domain + "/.well-known/openid-configuration") + return response.json() + + # pylint: disable=unused-argument + def refresh_token(self, refresh_token): + """Refresh tokens are not yet implemented for Auth0, will attempt to acquire new token instead""" + return self.get_token() + + def get_token(self, username=None, password=None) -> Token: + response = httpx.post(self.token_endpoint, json={ + "client_id": self.client_id, + "client_secret": self.client_secret, + "grant_type": "client_credentials", + "audience": self.domain + "/api/v2/" + }) + data = response.json() + assert response.status_code == 200, f"Unable to acquire token from Auth0, reason: {data}" + return Token(data["access_token"], self.refresh_token, "None") diff --git a/testsuite/oidc/rhsso/__init__.py b/testsuite/oidc/rhsso/__init__.py new file mode 100644 index 00000000..2bc4f4a0 --- /dev/null +++ b/testsuite/oidc/rhsso/__init__.py @@ -0,0 +1,87 @@ +"""Objects for RHSSO""" +from functools import cached_property +from urllib.parse import urlparse + +from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakPostError + +from testsuite.oidc import OIDCProvider, Token +from testsuite.objects import LifecycleObject +from .objects import Realm, Client + + +# pylint: disable=too-many-instance-attributes +class RHSSO(OIDCProvider, LifecycleObject): + """ + OIDCProvider implementation for RHSSO. It creates Realm, client and user. + """ + + def __init__(self, server_url, username, password, realm_name, client_name, + test_username="testUser", test_password="testPassword") -> None: + self.test_username = test_username + self.test_password = test_password + self.realm_name = realm_name + self.client_name = client_name + self.realm = None + self.user = None + self.client = None + + try: + self.master = KeycloakAdmin(server_url=server_url, + username=username, + password=password, + realm_name="master", + verify=False, + auto_refresh_token=['get', 'put', 'post', 'delete']) + self.server_url = server_url + except KeycloakPostError: + # Older Keycloaks versions (and RHSSO) needs requires url to be pointed at auth/ endpoint + # pylint: disable=protected-access + self.server_url = urlparse(server_url)._replace(path="auth/").geturl() + self.master = KeycloakAdmin(server_url=self.server_url, + username=username, + password=password, + realm_name="master", + verify=False, + auto_refresh_token=['get', 'put', 'post', 'delete']) + + def create_realm(self, name: str, **kwargs) -> Realm: + """Creates new realm""" + self.master.create_realm(payload={ + "realm": name, + "enabled": True, + "sslRequired": "None", + **kwargs + }) + return Realm(self.master, name) + + def commit(self): + self.realm: Realm = self.create_realm(self.realm_name, accessTokenLifespan=24 * 60 * 60) + + self.client = self.realm.create_client( + name=self.client_name, + directAccessGrantsEnabled=True, + publicClient=False, + protocol="openid-connect", + standardFlowEnabled=False) + self.user = self.realm.create_user(self.test_username, self.test_password) + + def delete(self): + self.realm.delete() + + @property + def oidc_client(self) -> KeycloakOpenID: + """OIDCClient for the created client""" + return self.client.oidc_client # type: ignore + + @cached_property + def well_known(self): + return self.oidc_client.well_known() + + def refresh_token(self, refresh_token): + """Refreshes token""" + data = self.oidc_client.refresh_token(refresh_token) + return Token(data["access_token"], self.refresh_token, data["refresh_token"]) + + def get_token(self, username=None, password=None) -> Token: + data = self.oidc_client.token(username or self.test_username, password or self.test_password) + return Token(data["access_token"], self.refresh_token, data["refresh_token"]) diff --git a/testsuite/rhsso/objects.py b/testsuite/oidc/rhsso/objects.py similarity index 64% rename from testsuite/rhsso/objects.py rename to testsuite/oidc/rhsso/objects.py index 41f77904..e89e2a33 100644 --- a/testsuite/rhsso/objects.py +++ b/testsuite/oidc/rhsso/objects.py @@ -1,7 +1,7 @@ """Object wrappers of RHSSO resources""" -from urllib.parse import urlparse +from functools import cached_property -from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakPostError +from keycloak import KeycloakOpenID, KeycloakAdmin class Realm: @@ -80,51 +80,10 @@ def assign_role(self, role_name): role = self.admin.get_client_role(realm_management, role_name) self.admin.assign_client_role(user["id"], realm_management, role) - @property + @cached_property def oidc_client(self): """OIDC client""" # Note This is different clientId (clientId) than self.client_id (Id), because RHSSO client_id = self.admin.get_client(self.client_id)["clientId"] secret = self.admin.get_client_secrets(self.client_id)["value"] return self.realm.oidc_client(client_id, secret) - - -class RHSSO: - """Helper class for RHSSO server""" - - def __init__(self, server_url, username, password) -> None: - try: - self.master = KeycloakAdmin(server_url=server_url, - username=username, - password=password, - realm_name="master", - verify=False, - auto_refresh_token=['get', 'put', 'post', 'delete']) - self.server_url = server_url - except KeycloakPostError: - # Older Keycloaks versions (and RHSSO) needs requires url to be pointed at auth/ endpoint - # pylint: disable=protected-access - self.server_url = urlparse(server_url)._replace(path="auth/").geturl() - self.master = KeycloakAdmin(server_url=self.server_url, - username=username, - password=password, - realm_name="master", - verify=False, - auto_refresh_token=['get', 'put', 'post', 'delete']) - - def create_realm(self, name: str, **kwargs) -> Realm: - """Creates new realm""" - self.master.create_realm(payload={ - "realm": name, - "enabled": True, - "sslRequired": "None", - **kwargs - }) - return Realm(self.master, name) - - def create_oidc_client(self, realm, client_id, secret) -> KeycloakOpenID: - """Creates OIDC client""" - return KeycloakOpenID(server_url=self.master.server_url, - client_id=client_id, - realm_name=realm, - client_secret_key=secret) diff --git a/testsuite/rhsso/__init__.py b/testsuite/rhsso/__init__.py deleted file mode 100644 index 70d26e28..00000000 --- a/testsuite/rhsso/__init__.py +++ /dev/null @@ -1,111 +0,0 @@ -"""Objects for RHSSO""" -import backoff -from keycloak import KeycloakGetError, KeycloakOpenID - -from testsuite.rhsso.objects import RHSSO, Realm, Client - - -class RHSSOServiceConfiguration: - """ - Wrapper for all information that tests need to know about RHSSO - """ - - def __init__(self, rhsso: RHSSO, realm: Realm, client: Client, user, username, password) -> None: - self.rhsso = rhsso - self.realm = realm - self.user = user - self.client = client - self.username = username - self.password = password - self._oidc_client = None - - @property - def oidc_client(self) -> KeycloakOpenID: - """OIDCClient for the created client""" - if not self._oidc_client: - self._oidc_client = self.client.oidc_client - return self._oidc_client - - def issuer_url(self) -> str: - """ - Returns issuer url for 3scale in format - http(s)://:/auth/realms/ - :return: url - """ - return self.oidc_client.well_known()["issuer"] - - def jwks_uri(self): - """ - Returns jwks uri for 3scale in format - http(s)://:o/auth/realms//protocol/openid-connect/certs - :return: url - """ - return self.oidc_client.well_known()["jwks_uri"] - - def authorization_url(self) -> str: - """ - Returns authorization url for 3scale in format - http(s)://:@:/auth/realms/ - :return: url - """ - url = self.issuer_url() - client_id = self.oidc_client.client_id - secret = self.oidc_client.client_secret_key - return url.replace("://", f"://{client_id}:{secret}@", 1) - - @backoff.on_exception(backoff.fibo, KeycloakGetError, max_tries=8, jitter=None) - def password_authorize(self, client_id=None, secret=None, username=None, password=None): - """Returns token retrieved by password authentication""" - username = username or self.username - password = password or self.password - client_id = client_id or self.oidc_client.client_id - secret = secret or self.oidc_client.client_secret_key - return self.realm.oidc_client(client_id, secret).token(username, password) - - def token_url(self) -> str: - """ - Returns token endpoint url - http(s)://:/auth/realms//protocol/openid-connect/token - :return: url - """ - return self.oidc_client.well_known()["token_endpoint"] - - def body_for_token_creation(self, app, use_service_accounts=False) -> str: - """ - Returns body for creation of token - :return: body - """ - app_key = app.keys.list()["keys"][0]["key"]["value"] - app_id = app["client_id"] - grant_type = "client_credentials" if use_service_accounts else "password" - user_credentials = "" if use_service_accounts else f"&username={self.username}&password={self.password}" - return f"grant_type={grant_type}&client_id={app_id}&client_secret={app_key}{user_credentials}" - - def __getstate__(self): - """ - Custom serializer for pickle module - more info here: https://docs.python.org/3/library/pickle.html#object.__getstate__ - """ - return {"client": self.client.client_id, - "realm": self.realm.name, - "rhsso": {"url": self.rhsso.server_url, - "username": self.rhsso.master.username, - "password": self.rhsso.master.password}, - "user": self.user, - "username": self.username, - "password": self.password} - - def __setstate__(self, state): - """ - Custom deserializer for pickle module - more info here: https://docs.python.org/3/library/pickle.html#object.__setstate__ - """ - self.rhsso = RHSSO(server_url=state["rhsso"]["url"], - username=state["rhsso"]["username"], - password=state["rhsso"]["password"]) - self.realm = Realm(self.rhsso.master, state["realm"]) - self.user = state["user"] - self.client = Client(self.realm, state["client"]) - self.username = state["username"] - self.password = state["password"] - self._oidc_client = self.client.oidc_client diff --git a/testsuite/tests/conftest.py b/testsuite/tests/conftest.py index 2c0c1ee5..360e9f08 100644 --- a/testsuite/tests/conftest.py +++ b/testsuite/tests/conftest.py @@ -4,10 +4,12 @@ import pytest from keycloak import KeycloakAuthenticationError +from testsuite.oidc import OIDCProvider from testsuite.config import settings +from testsuite.oidc.auth0 import Auth0Provider from testsuite.openshift.httpbin import Httpbin from testsuite.openshift.envoy import Envoy -from testsuite.rhsso import RHSSO, Realm, RHSSOServiceConfiguration +from testsuite.oidc.rhsso import RHSSO from testsuite.utils import randomize, _whoami @@ -38,38 +40,38 @@ def openshift2(testconfig): @pytest.fixture(scope="session") -def rhsso_service_info(request, testconfig, blame): - """ - Set up client for zync - :return: dict with all important details - """ +def rhsso(request, testconfig, blame): + """RHSSO OIDC Provider fixture""" cnf = testconfig["rhsso"] try: - rhsso = RHSSO(server_url=cnf["url"], - username=cnf["username"], - password=cnf["password"]) + info = RHSSO(cnf["url"], cnf["username"], cnf["password"], blame("realm"), blame("client"), + cnf["test_user"]["password"], cnf["test_user"]["username"]) + + if not testconfig["skip_cleanup"]: + request.addfinalizer(info.delete) + + info.commit() + return info except KeycloakAuthenticationError: return pytest.skip("Unable to login into SSO, please check the credentials provided") except KeyError as exc: return pytest.skip(f"SSO configuration item is missing: {exc}") - realm: Realm = rhsso.create_realm(blame("realm"), accessTokenLifespan=24*60*60) - - if not testconfig["skip_cleanup"]: - request.addfinalizer(realm.delete) - client = realm.create_client( - name=blame("client"), - directAccessGrantsEnabled=True, - publicClient=False, - protocol="openid-connect", - standardFlowEnabled=False) +@pytest.fixture(scope="session") +def auth0(testconfig): + """Auth0 OIDC provider fixture""" + try: + section = testconfig["auth0"] + return Auth0Provider(section["url"], section["client_id"], section["client_secret"]) + except KeyError as exc: + return pytest.skip(f"Auth0 configuration item is missing: {exc}") - username = cnf["test_user"]["username"] - password = cnf["test_user"]["password"] - user = realm.create_user(username, password) - return RHSSOServiceConfiguration(rhsso, realm, client, user, username, password) +@pytest.fixture(scope="session") +def oidc_provider(rhsso) -> OIDCProvider: + """Fixture which enables switching out OIDC providers for individual modules""" + return rhsso @pytest.fixture(scope="session") diff --git a/testsuite/tests/kuadrant/authorino/conftest.py b/testsuite/tests/kuadrant/authorino/conftest.py index ce98303a..95bd6b8b 100644 --- a/testsuite/tests/kuadrant/authorino/conftest.py +++ b/testsuite/tests/kuadrant/authorino/conftest.py @@ -42,20 +42,19 @@ def authorino(authorino, openshift, blame, request, testconfig, module_label, au # pylint: disable=unused-argument @pytest.fixture(scope="module") -def authorization(authorization, authorino, envoy, blame, openshift, module_label, rhsso_service_info) -> Authorization: +def authorization(authorization, authorino, envoy, blame, openshift, module_label, oidc_provider) -> Authorization: """In case of Authorino, AuthConfig used for authorization""" if authorization is None: authorization = AuthConfig.create_instance(openshift, blame("ac"), envoy.hostname, labels={"testRun": module_label}) - authorization.add_oidc_identity("rhsso", rhsso_service_info.issuer_url()) + authorization.add_oidc_identity("rhsso", oidc_provider.well_known["issuer"]) return authorization @pytest.fixture(scope="module") -def auth(rhsso_service_info): +def auth(oidc_provider): """Returns RHSSO authentication object for HTTPX""" - return HttpxOidcClientAuth(rhsso_service_info.client, "authorization", - rhsso_service_info.username, rhsso_service_info.password) + return HttpxOidcClientAuth(oidc_provider.get_token, "authorization") @pytest.fixture(scope="module") diff --git a/testsuite/tests/kuadrant/authorino/identity/auth/conftest.py b/testsuite/tests/kuadrant/authorino/identity/auth/conftest.py deleted file mode 100644 index c8c767cd..00000000 --- a/testsuite/tests/kuadrant/authorino/identity/auth/conftest.py +++ /dev/null @@ -1,67 +0,0 @@ -"""Conftest for auth tests""" -import http.client -import json - -import pytest - -from testsuite.config import settings -from testsuite.httpx.auth import Auth0Auth - - -@pytest.fixture(scope="module") -def auth0_token(): - """Token for Auth0 API""" - try: - auth = settings["auth0"] - conn = http.client.HTTPSConnection(auth['domain']) - payload = '{' + f"\"client_id\":\"{auth['client']}\"," \ - f"\"client_secret\":\"{auth['client-secret']}\"," \ - f"\"audience\":\"https://{auth['domain']}/api/v2/\",\"grant_type\":\"client_credentials\"" + '}' - headers = {'content-type': "application/json"} - conn.request("POST", "/oauth/token", payload, headers) - res = conn.getresponse() - data = res.read() - return json.loads(data.decode("utf-8"))["access_token"] - except KeyError as exc: - return pytest.skip(f"Auth0 configuration item is missing: {exc}") - - -@pytest.fixture(scope="module") -def auth0_authorization(authorization): - """Add Auth0 identity to AuthConfig""" - try: - authorization.add_oidc_identity("auth", f"https://{settings['auth0']['domain']}/") - return authorization - except KeyError as exc: - return pytest.skip(f"Auth0 domain configuration is missing: {exc}") - - -@pytest.fixture(scope="module") -def auth0_auth(auth0_token): - """Returns Auth0 authentication object for HTTPX""" - return Auth0Auth(auth0_token) - - -@pytest.fixture(scope="module") -def rhsso_authorization(authorization, rhsso_service_info): - """Add RHSSO identity to AuthConfig""" - authorization.add_oidc_identity("rhsso", rhsso_service_info.issuer_url()) - return authorization - - -# pylint: disable=unused-argument -@pytest.fixture(scope="module") -def auth0_client(auth0_authorization, envoy): - """Returns httpx client to be used for requests, it also commits AuthConfig""" - client = envoy.client() - yield client - client.close() - - -# pylint: disable=unused-argument -@pytest.fixture(scope="module") -def rhsso_client(rhsso_authorization, envoy): - """Returns httpx client to be used for requests, it also commits AuthConfig""" - client = envoy.client() - yield client - client.close() diff --git a/testsuite/tests/kuadrant/authorino/identity/auth/test_auth_identity.py b/testsuite/tests/kuadrant/authorino/identity/auth/test_auth_identity.py index ae542df8..9c920c0a 100644 --- a/testsuite/tests/kuadrant/authorino/identity/auth/test_auth_identity.py +++ b/testsuite/tests/kuadrant/authorino/identity/auth/test_auth_identity.py @@ -1,36 +1,54 @@ """Tests basic authentication with RHSSO/Auth0 as identity provider""" import pytest +from testsuite.httpx.auth import HttpxOidcClientAuth +from testsuite.oidc import OIDCProvider +from testsuite.oidc.rhsso import RHSSO -@pytest.mark.parametrize(("client_fixture", "auth_fixture"), [ - pytest.param("rhsso_client", "auth", id="RHSSO"), - pytest.param("auth0_client", "auth0_auth", id="Auth0"), -]) -def test_correct_auth(client_fixture, auth_fixture, request): + +@pytest.fixture(scope="module") +def authorization(authorization, oidc_provider): + """Add RHSSO identity to AuthConfig""" + authorization.add_oidc_identity("rhsso", oidc_provider.well_known["issuer"]) + return authorization + + +@pytest.fixture(scope="module", params=("rhsso", "auth0")) +def oidc_provider(request) -> OIDCProvider: + """Fixture which enables switching out OIDC providers for individual modules""" + return request.getfixturevalue(request.param) + + +@pytest.fixture(scope="module") +def wrong_auth(oidc_provider, auth0, rhsso): + """Different (but valid) auth than was configured""" + token = rhsso.get_token + if isinstance(oidc_provider, RHSSO): + token = auth0.get_token + return HttpxOidcClientAuth(token) + + +def test_correct_auth(client, auth): """Tests correct auth""" - client = request.getfixturevalue(client_fixture) - auth = request.getfixturevalue(auth_fixture) response = client.get("/get", auth=auth) assert response.status_code == 200 -@pytest.mark.parametrize("client_fixture", [ - pytest.param("rhsso_client", id="RHSSO"), - pytest.param("auth0_client", id="Auth0"), -]) -def test_no_auth(client_fixture, request): +def test_wrong_auth(wrong_auth, client): + """Tests request with wrong token""" + response = client.get("/get", auth=wrong_auth) + assert response.status_code == 401 + + +# pylint: disable=unused-argument +def test_no_auth(client): """Tests request without any auth""" - client = request.getfixturevalue(client_fixture) response = client.get("/get") assert response.status_code == 401 -@pytest.mark.parametrize("client_fixture", [ - pytest.param("rhsso_client", id="RHSSO"), - pytest.param("auth0_client", id="Auth0"), -]) -def test_wrong_auth(client_fixture, request): - """Tests request with wrong token""" - client = request.getfixturevalue(client_fixture) +# pylint: disable=unused-argument +def test_invalid_auth(client): + """Tests request with invalid token""" response = client.get("/get", headers={"Authorization": "Bearer xyz"}) assert response.status_code == 401 diff --git a/testsuite/tests/kuadrant/authorino/identity/auth/test_multiple_auth_identities.py b/testsuite/tests/kuadrant/authorino/identity/auth/test_multiple_auth_identities.py index 9c5e7d5d..76467742 100644 --- a/testsuite/tests/kuadrant/authorino/identity/auth/test_multiple_auth_identities.py +++ b/testsuite/tests/kuadrant/authorino/identity/auth/test_multiple_auth_identities.py @@ -4,32 +4,45 @@ import pytest +from testsuite.httpx.auth import HttpxOidcClientAuth + + +@pytest.fixture(scope="module") +def auth0_auth(auth0): + """Returns Auth0 authentication object for HTTPX""" + return HttpxOidcClientAuth(auth0.get_token) + + +@pytest.fixture(scope="module") +def rhsso_auth(rhsso): + """Returns RHSSO authentication object for HTTPX""" + return HttpxOidcClientAuth(rhsso.get_token) + -# pylint: disable=unused-argument @pytest.fixture(scope="module") -def client(auth0_authorization, rhsso_authorization, envoy): - """Returns httpx client to be used for requests, it also commits AuthConfig""" - client = envoy.client() - yield client - client.close() +def authorization(authorization, auth0, rhsso): + """Add both RHSSO and Auth0 identities""" + authorization.add_oidc_identity("rhsso", rhsso.well_known["issuer"]) + authorization.add_oidc_identity("auth0", auth0.well_known["issuer"]) + return authorization -def test_correct_auth(client, auth, auth0_auth): +def test_correct_auth(client, rhsso_auth, auth0_auth): """Tests correct auth""" - response = client.get("/get", auth=auth) + response = client.get("/get", auth=rhsso_auth) assert response.status_code == 200 response = client.get("/get", auth=auth0_auth) assert response.status_code == 200 -def test_no_auth(client, request): +def test_no_auth(client): """Tests request without any auth""" response = client.get("/get") assert response.status_code == 401 -def test_wrong_auth(client, request): +def test_wrong_auth(client): """Tests request with wrong token""" response = client.get("/get", headers={"Authorization": "Bearer xyz"}) assert response.status_code == 401 diff --git a/testsuite/tests/kuadrant/authorino/identity/rhsso/conftest.py b/testsuite/tests/kuadrant/authorino/identity/rhsso/conftest.py index f7cd22e4..3f3d43fe 100644 --- a/testsuite/tests/kuadrant/authorino/identity/rhsso/conftest.py +++ b/testsuite/tests/kuadrant/authorino/identity/rhsso/conftest.py @@ -2,8 +2,14 @@ import pytest +@pytest.fixture(scope="session") +def oidc_provider(rhsso): + """Fixture which enables switching out OIDC providers for individual modules""" + return rhsso + + @pytest.fixture(scope="module") -def authorization(authorization, rhsso_service_info): +def authorization(authorization, rhsso): """Add RHSSO identity to AuthConfig""" - authorization.add_oidc_identity("rhsso", rhsso_service_info.issuer_url()) + authorization.add_oidc_identity("rhsso", rhsso.well_known["issuer"]) return authorization diff --git a/testsuite/tests/kuadrant/authorino/identity/rhsso/test_rhsso_roles.py b/testsuite/tests/kuadrant/authorino/identity/rhsso/test_rhsso_roles.py index 31255dd7..5646a647 100644 --- a/testsuite/tests/kuadrant/authorino/identity/rhsso/test_rhsso_roles.py +++ b/testsuite/tests/kuadrant/authorino/identity/rhsso/test_rhsso_roles.py @@ -4,19 +4,19 @@ @pytest.fixture(scope="function") -def user_with_role(rhsso_service_info, realm_role, blame): +def user_with_role(rhsso, realm_role, blame): """Creates new user and adds him into realm_role""" username = blame("someuser") password = blame("password") - user_id = rhsso_service_info.realm.create_user(username, password) - rhsso_service_info.realm.assign_realm_role(realm_role, user_id) + user_id = rhsso.realm.create_user(username, password) + rhsso.realm.assign_realm_role(realm_role, user_id) return {"id": user_id, "username": username, "password": password} @pytest.fixture(scope="module") -def realm_role(rhsso_service_info, blame): +def realm_role(rhsso, blame): """Creates new realm role""" - return rhsso_service_info.realm.create_realm_role(blame("role")) + return rhsso.realm.create_realm_role(blame("role")) @pytest.fixture(scope="module") @@ -26,10 +26,10 @@ def authorization(authorization, realm_role, blame): return authorization -def test_user_with_role(client, user_with_role, rhsso_service_info): +def test_user_with_role(client, user_with_role, rhsso): """Test request when user does have required role using new user with assigned role""" - auth = HttpxOidcClientAuth(rhsso_service_info.client, "authorization", - user_with_role["username"], user_with_role["password"]) + auth = HttpxOidcClientAuth(rhsso.get_token(user_with_role["username"], user_with_role["password"]), + "authorization") response = client.get("/get", auth=auth) assert response.status_code == 200 diff --git a/testsuite/tests/kuadrant/authorino/operator/clusterwide/conftest.py b/testsuite/tests/kuadrant/authorino/operator/clusterwide/conftest.py index c4fbaaaa..d576fe81 100644 --- a/testsuite/tests/kuadrant/authorino/operator/clusterwide/conftest.py +++ b/testsuite/tests/kuadrant/authorino/operator/clusterwide/conftest.py @@ -17,10 +17,10 @@ def hostname2(envoy, blame): @pytest.fixture(scope="module") -def authorization2(hostname2, blame, openshift2, module_label, rhsso_service_info): +def authorization2(hostname2, blame, openshift2, module_label, oidc_provider): """Second valid hostname""" auth = AuthConfig.create_instance(openshift2, blame("ac"), hostname2, labels={"testRun": module_label}) - auth.add_oidc_identity("rhsso", rhsso_service_info.issuer_url()) + auth.add_oidc_identity("rhsso", oidc_provider.well_known["issuer"]) return auth