-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #74 from pehala/rhsso_rewrite
Rework RHSSO into a more general OIDCProvider interface
- Loading branch information
Showing
16 changed files
with
299 additions
and
317 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
"""Common classes for OIDC provider""" | ||
from abc import ABC, abstractmethod | ||
from dataclasses import dataclass | ||
from typing import Callable, Tuple | ||
|
||
|
||
@dataclass | ||
class Token: | ||
"""Token class""" | ||
access_token: str | ||
refresh_function: Callable[[str], "Token"] | ||
refresh_token: str | ||
|
||
def refresh(self) -> "Token": | ||
"""Refreshes token""" | ||
return self.refresh_function(self.refresh_token) | ||
|
||
def __str__(self) -> str: | ||
return self.access_token | ||
|
||
|
||
class OIDCProvider(ABC): | ||
"""Interface for all methods we need for OIDCProvider""" | ||
|
||
@property | ||
@abstractmethod | ||
def well_known(self): | ||
"""Dict (or a dict-like structure) access to all well_known URLS""" | ||
|
||
@abstractmethod | ||
def get_token(self, username=None, password=None) -> Token: | ||
"""Returns Token wrapper class with current access token and ability to refresh it""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
"""Module containing all classes necessary to work with Auth0""" | ||
from functools import cached_property | ||
|
||
import httpx | ||
|
||
from testsuite.oidc import OIDCProvider, Token | ||
|
||
|
||
class Auth0Provider(OIDCProvider): | ||
"""Auth0 OIDC provider""" | ||
|
||
def __init__(self, domain, client_id, client_secret) -> None: | ||
super().__init__() | ||
self.domain = domain | ||
self.client_id = client_id | ||
self.client_secret = client_secret | ||
|
||
@property | ||
def token_endpoint(self): | ||
"""Returns token_endpoint URL""" | ||
return self.well_known["token_endpoint"] | ||
|
||
@cached_property | ||
def well_known(self): | ||
response = httpx.get(self.domain + "/.well-known/openid-configuration") | ||
return response.json() | ||
|
||
# pylint: disable=unused-argument | ||
def refresh_token(self, refresh_token): | ||
"""Refresh tokens are not yet implemented for Auth0, will attempt to acquire new token instead""" | ||
return self.get_token() | ||
|
||
def get_token(self, username=None, password=None) -> Token: | ||
response = httpx.post(self.token_endpoint, json={ | ||
"client_id": self.client_id, | ||
"client_secret": self.client_secret, | ||
"grant_type": "client_credentials", | ||
"audience": self.domain + "/api/v2/" | ||
}) | ||
data = response.json() | ||
assert response.status_code == 200, f"Unable to acquire token from Auth0, reason: {data}" | ||
return Token(data["access_token"], self.refresh_token, "None") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
"""Objects for RHSSO""" | ||
from functools import cached_property | ||
from urllib.parse import urlparse | ||
|
||
from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakPostError | ||
|
||
from testsuite.oidc import OIDCProvider, Token | ||
from testsuite.objects import LifecycleObject | ||
from .objects import Realm, Client | ||
|
||
|
||
# pylint: disable=too-many-instance-attributes | ||
class RHSSO(OIDCProvider, LifecycleObject): | ||
""" | ||
OIDCProvider implementation for RHSSO. It creates Realm, client and user. | ||
""" | ||
|
||
def __init__(self, server_url, username, password, realm_name, client_name, | ||
test_username="testUser", test_password="testPassword") -> None: | ||
self.test_username = test_username | ||
self.test_password = test_password | ||
self.realm_name = realm_name | ||
self.client_name = client_name | ||
self.realm = None | ||
self.user = None | ||
self.client = None | ||
|
||
try: | ||
self.master = KeycloakAdmin(server_url=server_url, | ||
username=username, | ||
password=password, | ||
realm_name="master", | ||
verify=False, | ||
auto_refresh_token=['get', 'put', 'post', 'delete']) | ||
self.server_url = server_url | ||
except KeycloakPostError: | ||
# Older Keycloaks versions (and RHSSO) needs requires url to be pointed at auth/ endpoint | ||
# pylint: disable=protected-access | ||
self.server_url = urlparse(server_url)._replace(path="auth/").geturl() | ||
self.master = KeycloakAdmin(server_url=self.server_url, | ||
username=username, | ||
password=password, | ||
realm_name="master", | ||
verify=False, | ||
auto_refresh_token=['get', 'put', 'post', 'delete']) | ||
|
||
def create_realm(self, name: str, **kwargs) -> Realm: | ||
"""Creates new realm""" | ||
self.master.create_realm(payload={ | ||
"realm": name, | ||
"enabled": True, | ||
"sslRequired": "None", | ||
**kwargs | ||
}) | ||
return Realm(self.master, name) | ||
|
||
def commit(self): | ||
self.realm: Realm = self.create_realm(self.realm_name, accessTokenLifespan=24 * 60 * 60) | ||
|
||
self.client = self.realm.create_client( | ||
name=self.client_name, | ||
directAccessGrantsEnabled=True, | ||
publicClient=False, | ||
protocol="openid-connect", | ||
standardFlowEnabled=False) | ||
self.user = self.realm.create_user(self.test_username, self.test_password) | ||
|
||
def delete(self): | ||
self.realm.delete() | ||
|
||
@property | ||
def oidc_client(self) -> KeycloakOpenID: | ||
"""OIDCClient for the created client""" | ||
return self.client.oidc_client # type: ignore | ||
|
||
@cached_property | ||
def well_known(self): | ||
return self.oidc_client.well_known() | ||
|
||
def refresh_token(self, refresh_token): | ||
"""Refreshes token""" | ||
data = self.oidc_client.refresh_token(refresh_token) | ||
return Token(data["access_token"], self.refresh_token, data["refresh_token"]) | ||
|
||
def get_token(self, username=None, password=None) -> Token: | ||
data = self.oidc_client.token(username or self.test_username, password or self.test_password) | ||
return Token(data["access_token"], self.refresh_token, data["refresh_token"]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.