From e231db2c0ce2db5e61690bf037cea1607a7daa79 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Thu, 29 Feb 2024 15:59:27 +0000 Subject: [PATCH] Test authentication flow --- tests/test_oauth.py | 51 ++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/tests/test_oauth.py b/tests/test_oauth.py index 235ba06..2bf9a9a 100644 --- a/tests/test_oauth.py +++ b/tests/test_oauth.py @@ -89,21 +89,20 @@ def bar( def test_keycloak_setup(self, keycloak: KeycloakAdmin): assert keycloak.connection.realm_name == "bakdata" - def test_protected_endpoint(self, client: TestClient): - response = client.get("/") + @pytest.mark.parametrize("endpoint", ["/", "/foo", "/bar"]) + def test_protected_endpoint(self, client: TestClient, endpoint: str): + response = client.get(endpoint) assert response.status_code == status.HTTP_401_UNAUTHORIZED - # TODO: test flow: access protected endpoint -> redirect to login -> auth callback -> access resource -> logout -> redirect to index @pytest.mark.parametrize( "query_params", [ None, - # FIXME - # httpx.QueryParams({"next": "/foo"}), - # httpx.QueryParams({"next": "/bar", "unrelated": "should be hidden"}), + httpx.QueryParams({"next": "/foo"}), + httpx.QueryParams({"next": "/bar", "unrelated": "should be hidden"}), ], ) - def test_login( + def test_login_redirect( self, client: TestClient, keycloak: KeycloakAdmin, @@ -124,11 +123,19 @@ def test_login( else: assert not redirect_uri.params - # open Keycloak login page - response = httpx.get(response.url) - assert response.status_code == status.HTTP_200_OK + def test_auth_flow(self, client: TestClient, keycloak: KeycloakAdmin): + # try accessing protected endpoint + response = client.get("/", follow_redirects=False) + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + # let's login, should redirect to Keycloak login page + response = client.get("/auth/login", follow_redirects=False) + assert response.is_redirect + redirect_target = response.headers["location"] + assert "/realms/bakdata/protocol/openid-connect/auth" in redirect_target - browser.go(str(response.url)) + # open Keycloak login form in browser and login + browser.go(redirect_target) assert browser.title == "Sign in to bakdata" browser.show_forms() assert keycloak.connection.username # HACK: fix wrong type annotation @@ -143,21 +150,23 @@ def test_login( # first check the redirect response = client.get(exc.value.request.url, follow_redirects=False) assert response.status_code == status.HTTP_307_TEMPORARY_REDIRECT - if query_params: - assert response.headers["location"] == query_params["next"] - else: - assert response.headers["location"] == "/" + assert response.headers["location"] == "/" # now follow the redirect response = client.get(response.headers["location"]) assert response.is_success - if query_params: - assert response.read() == f'"{query_params[next]}"' - else: - assert response.read() == b'"Hello test"' + assert response.read() == b'"Hello test"' + + # logout user + response = client.get("/auth/logout", follow_redirects=False) + assert response.status_code == status.HTTP_307_TEMPORARY_REDIRECT + assert response.headers["location"] == "/" + + # check that endpoint is inaccessible again + response = client.get("/", follow_redirects=False) + assert response.status_code == status.HTTP_401_UNAUTHORIZED - def test_logout(self, client: TestClient): + def test_logout_redirect(self, client: TestClient): response = client.get("/auth/logout", follow_redirects=False) assert response.status_code == status.HTTP_307_TEMPORARY_REDIRECT assert response.headers["location"] == "/" - assert not client.app_state.items() # TODO