Skip to content

Commit

Permalink
Adds tests for token refresh handling (#48)
Browse files Browse the repository at this point in the history
* Adds tests for token refresh handling

* Optimize imports
  • Loading branch information
djperrefort authored Aug 5, 2024
1 parent e5a0aca commit edcf475
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 5 deletions.
9 changes: 4 additions & 5 deletions keystone_client/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import annotations

from datetime import datetime
from warnings import warn

import jwt
import requests
Expand Down Expand Up @@ -74,20 +73,20 @@ def is_authenticated(self) -> bool:
access_token_refreshable = self.jwt.refresh_expiration > now
return access_token_valid or access_token_refreshable

def get_auth_headers(self, refresh: bool = True, timeout: int = None) -> dict[str, str]:
def get_auth_headers(self, auto_refresh: bool = True, timeout: int = None) -> dict[str, str]:
"""Return headers data for authenticating API requests
The returned dictionary is empty when not authenticated.
Args:
refresh: Automatically refresh the JWT credentials if necessary
auto_refresh: Automatically refresh the JWT credentials if necessary
timeout: Seconds before the token refresh request times out
Returns:
A dictionary with header ata for JWT authentication
"""

if refresh:
if auto_refresh:
self.refresh(timeout=timeout)

if not self.is_authenticated():
Expand Down Expand Up @@ -154,7 +153,7 @@ def refresh(self, force: bool = False, timeout: int = None) -> None:
return

# Alert the user when a refresh is not possible
if self.jwt.refresh_expiration > now:
if self.jwt.refresh_expiration < now:
raise RuntimeError("Refresh token has expired. Login again to continue.")

response = requests.post(
Expand Down
93 changes: 93 additions & 0 deletions tests/authentication/test_authentication_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,96 @@ def test_logout_with_no_jwt(self, mock_post: Mock) -> None:

AuthenticationManager(API_HOST).logout()
mock_post.assert_not_called()


class Refresh(TestCase):
"""Test credential refreshing via the `refresh` method"""

def test_refresh_while_not_authenticated(self) -> None:
"""Test the refresh call exits silently when not authenticated"""

manager = AuthenticationManager(API_HOST)
self.assertFalse(manager.is_authenticated())
self.assertIsNone(manager.jwt)

with patch('requests.post') as mock_post:
manager.refresh()
mock_post.assert_not_called()

def test_refresh_with_valid_access_token(self) -> None:
"""Test the refresh call exits silently when not credentials are not expired"""

manager = AuthenticationManager(API_HOST)
manager.login(API_USER, API_PASSWORD)

with patch('requests.post') as mock_post:
manager.refresh()
mock_post.assert_not_called()

def test_refresh_with_valid_access_token_force(self) -> None:
"""Test the refresh call refreshes valid credentials `force=True`"""

manager = AuthenticationManager(API_HOST)
manager.login(API_USER, API_PASSWORD)
refresh_token = manager.jwt.refresh

with patch('requests.post') as mock_post:
manager.refresh(force=True)
mock_post.assert_called_once_with(
manager.refresh_url,
data={'refresh': refresh_token},
timeout=None
)

@patch('requests.post')
def test_refresh_with_expired_access_token(self, mock_post: Mock) -> None:
"""Test refreshing when the access token is expired"""

# Mock a session with an expired token
manager = AuthenticationManager(API_HOST)
manager.jwt = create_token(
access_expires=datetime.now() - timedelta(days=1),
refresh_expires=datetime.now() + timedelta(days=1)
)
refresh_token = manager.jwt.refresh

# Mock response for successful refresh
mock_response = Mock()
mock_response.json.return_value = {"refresh": "new_refresh_token"}
mock_post.return_value = mock_response

manager.refresh()
mock_post.assert_called_once_with(
manager.refresh_url,
data={'refresh': refresh_token},
timeout=None
)

# Check if refresh token was updated
self.assertEqual(manager.jwt.refresh, "new_refresh_token")

@patch('requests.post')
def test_refresh_with_expired_refresh_token(self, mock_post: Mock) -> None:
"""Test refreshing when the refresh token is expired"""

# Mock a session with an expired token
manager = AuthenticationManager(API_HOST)
manager.jwt = create_token(
access_expires=datetime.now() - timedelta(days=1),
refresh_expires=datetime.now() - timedelta(days=1)
)

with self.assertRaisesRegex(RuntimeError, "Refresh token has expired. Login again to continue."):
manager.refresh()

mock_post.assert_not_called()

def test_refresh_error(self) -> None:
"""Test an HTTP error is raised when the credential refresh fails"""

manager = AuthenticationManager(API_HOST)
manager.login(API_USER, API_PASSWORD)

with patch('requests.post') as mock_post, self.assertRaises(requests.HTTPError):
mock_post.side_effect = requests.HTTPError()
manager.refresh(force=True)

0 comments on commit edcf475

Please sign in to comment.