Skip to content

Commit

Permalink
Test authentication flow
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted committed Feb 29, 2024
1 parent 50ba975 commit e231db2
Showing 1 changed file with 30 additions and 21 deletions.
51 changes: 30 additions & 21 deletions tests/test_oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,20 @@ def bar(
def test_keycloak_setup(self, keycloak: KeycloakAdmin):
assert keycloak.connection.realm_name == "bakdata"

def test_protected_endpoint(self, client: TestClient):
response = client.get("/")
@pytest.mark.parametrize("endpoint", ["/", "/foo", "/bar"])
def test_protected_endpoint(self, client: TestClient, endpoint: str):
response = client.get(endpoint)
assert response.status_code == status.HTTP_401_UNAUTHORIZED

# TODO: test flow: access protected endpoint -> redirect to login -> auth callback -> access resource -> logout -> redirect to index
@pytest.mark.parametrize(
"query_params",
[
None,
# FIXME
# httpx.QueryParams({"next": "/foo"}),
# httpx.QueryParams({"next": "/bar", "unrelated": "should be hidden"}),
httpx.QueryParams({"next": "/foo"}),
httpx.QueryParams({"next": "/bar", "unrelated": "should be hidden"}),
],
)
def test_login(
def test_login_redirect(
self,
client: TestClient,
keycloak: KeycloakAdmin,
Expand All @@ -124,11 +123,19 @@ def test_login(
else:
assert not redirect_uri.params

# open Keycloak login page
response = httpx.get(response.url)
assert response.status_code == status.HTTP_200_OK
def test_auth_flow(self, client: TestClient, keycloak: KeycloakAdmin):
# try accessing protected endpoint
response = client.get("/", follow_redirects=False)
assert response.status_code == status.HTTP_401_UNAUTHORIZED

# let's login, should redirect to Keycloak login page
response = client.get("/auth/login", follow_redirects=False)
assert response.is_redirect
redirect_target = response.headers["location"]
assert "/realms/bakdata/protocol/openid-connect/auth" in redirect_target

browser.go(str(response.url))
# open Keycloak login form in browser and login
browser.go(redirect_target)
assert browser.title == "Sign in to bakdata"
browser.show_forms()
assert keycloak.connection.username # HACK: fix wrong type annotation
Expand All @@ -143,21 +150,23 @@ def test_login(
# first check the redirect
response = client.get(exc.value.request.url, follow_redirects=False)
assert response.status_code == status.HTTP_307_TEMPORARY_REDIRECT
if query_params:
assert response.headers["location"] == query_params["next"]
else:
assert response.headers["location"] == "/"
assert response.headers["location"] == "/"

# now follow the redirect
response = client.get(response.headers["location"])
assert response.is_success
if query_params:
assert response.read() == f'"{query_params[next]}"'
else:
assert response.read() == b'"Hello test"'
assert response.read() == b'"Hello test"'

# logout user
response = client.get("/auth/logout", follow_redirects=False)
assert response.status_code == status.HTTP_307_TEMPORARY_REDIRECT
assert response.headers["location"] == "/"

# check that endpoint is inaccessible again
response = client.get("/", follow_redirects=False)
assert response.status_code == status.HTTP_401_UNAUTHORIZED

def test_logout(self, client: TestClient):
def test_logout_redirect(self, client: TestClient):
response = client.get("/auth/logout", follow_redirects=False)
assert response.status_code == status.HTTP_307_TEMPORARY_REDIRECT
assert response.headers["location"] == "/"
assert not client.app_state.items() # TODO

0 comments on commit e231db2

Please sign in to comment.