diff --git a/terraso_backend/apps/auth/middleware.py b/terraso_backend/apps/auth/middleware.py index 5cc8fa00d..cd913ca6a 100644 --- a/terraso_backend/apps/auth/middleware.py +++ b/terraso_backend/apps/auth/middleware.py @@ -89,6 +89,9 @@ def _get_user_from_jwt(self, request): except InvalidTokenError as e: logger.exception("Failure to verify JWT token", extra={"token": token}) raise ValidationError(f"Invalid JWT token: {e}") + except ValueError as e: + logger.exception("Not valid JWT token type", extra={"token": token}) + raise ValidationError(f"Invalid JWT token: {e}") user = self._get_user(decoded_payload["sub"]) diff --git a/terraso_backend/apps/auth/services.py b/terraso_backend/apps/auth/services.py index 37ef30c90..640ac64e6 100644 --- a/terraso_backend/apps/auth/services.py +++ b/terraso_backend/apps/auth/services.py @@ -155,7 +155,11 @@ def create_access_token(self, user): def verify_access_token(self, token): decoded = self._verify_token(token) - if not decoded["access"] or not decoded["exp"]: + is_access_token = decoded.get("access", False) + is_test_token = decoded.get("test", False) + has_expiration = decoded.get("exp", False) + can_access = is_access_token and (has_expiration or is_test_token) + if not can_access: raise ValueError("Token is not an access token") return decoded diff --git a/terraso_backend/tests/auth/test_access_tokens.py b/terraso_backend/tests/auth/test_access_tokens.py new file mode 100644 index 000000000..f93b2f97a --- /dev/null +++ b/terraso_backend/tests/auth/test_access_tokens.py @@ -0,0 +1,86 @@ +# Copyright © 2021-2023 Technology Matters +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see https://www.gnu.org/licenses/. +import pytest +from graphene_django.utils.testing import graphql_query + +from apps.auth.services import JWTService + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def user(users): + return users[0] + + +@pytest.fixture +def access_token(user): + return JWTService().create_access_token(user) + + +@pytest.fixture +def test_access_token(user): + return JWTService().create_test_access_token(user) + + +@pytest.fixture +def invalid_access_token(user): + return JWTService().create_token(user) + + +@pytest.fixture +def token_client_query(client): + def _client_query(token, *args, **kwargs): + headers = { + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": f"Bearer {token}", + } + return graphql_query(*args, **kwargs, headers=headers, client=client) + + return _client_query + + +def execute_query(token_client_query, user, token): + query = ( + """ + {users(email: "%s") { + edges { + node { + email + } + } + }} + """ + % user.email + ) + response = token_client_query(token, query) + return response.json() + + +def test_access_token_valid(token_client_query, user, access_token): + response = execute_query(token_client_query, user, access_token) + user_result = response["data"]["users"]["edges"][0]["node"] + assert user_result["email"] == user.email + + +def test_test_access_token_valid(token_client_query, user, test_access_token): + response = execute_query(token_client_query, user, test_access_token) + user_result = response["data"]["users"]["edges"][0]["node"] + assert user_result["email"] == user.email + + +def test_access_token_invalid(token_client_query, user, invalid_access_token): + response = execute_query(token_client_query, user, invalid_access_token) + assert response["error"] == "Unauthorized request"