Skip to content

Commit

Permalink
fix: Tokens in config are no longer ignored when there are tokens in …
Browse files Browse the repository at this point in the history
…the environment, part of refactoring in preparation for app token refreshing (#284)

This PR continues the work started in
#281, completing the major
refactor. This is accomplished by creating PersonalTokenManager and
AppTokenManager classes, moving logic to them, and adding tests.

After this PR, I'll be able to easily implement refreshing for app
tokens and other app token related features.

In additional to the added unit tests, I have tested running an
extractor on top of the proposed code, in a couple scenarios:

with auth_token? | with additional_auth_tokens? | with personal tokens
in env? | with app token? | # tokens that should be used | # tokens used
in test run
--|--|--|--|--|--
no | no | no | yes | 1 | 1 ✅ 
yes | yes (1) | no | yes | 3 | 3 ✅ 
no | yes (2) | no | yes | 3 | 3 ✅ 

I am not easily able to test the case of personal tokens being detected
from the environment, just because changing the name of environment
variables is not simple in my set up.

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
TrishGillett and pre-commit-ci[bot] authored Aug 9, 2024
1 parent b6074c2 commit aade338
Show file tree
Hide file tree
Showing 2 changed files with 408 additions and 35 deletions.
129 changes: 95 additions & 34 deletions tap_github/authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import logging
import time
from copy import deepcopy
from datetime import datetime
from datetime import datetime, timedelta
from os import environ
from random import choice, shuffle
from typing import Any, Dict, List, Optional, Set
from typing import Any, Dict, List, Optional, Set, Tuple

import jwt
import requests
Expand All @@ -15,7 +15,9 @@


class TokenManager:
"""A class to store a token's attributes and state."""
"""A class to store a token's attributes and state.
This parent class should not be used directly, use a subclass instead.
"""

DEFAULT_RATE_LIMIT = 5000
# The DEFAULT_RATE_LIMIT_BUFFER buffer serves two purposes:
Expand All @@ -25,7 +27,7 @@ class TokenManager:

def __init__(
self,
token: str,
token: Optional[str],
rate_limit_buffer: Optional[int] = None,
logger: Optional[Any] = None,
):
Expand All @@ -50,6 +52,9 @@ def update_rate_limit(self, response_headers: Any) -> None:

def is_valid_token(self) -> bool:
"""Try making a request with the current token. If the request succeeds return True, else False."""
if not self.token:
return False

try:
response = requests.get(
url="https://api.github.com/rate_limit",
Expand All @@ -61,7 +66,7 @@ def is_valid_token(self) -> bool:
return True
except requests.exceptions.HTTPError:
msg = (
f"A token was dismissed. "
f"A token could not be validated. "
f"{response.status_code} Client Error: "
f"{str(response.content)} (Reason: {response.reason})"
)
Expand All @@ -70,7 +75,7 @@ def is_valid_token(self) -> bool:
return False

def has_calls_remaining(self) -> bool:
"""Check if token is valid.
"""Check if a token has capacity to make more calls.
Returns:
True if the token is valid and has enough api calls remaining.
Expand All @@ -85,6 +90,14 @@ def has_calls_remaining(self) -> bool:
return True


class PersonalTokenManager(TokenManager):
"""A class to store token rate limiting information."""

def __init__(self, token: str, rate_limit_buffer: Optional[int] = None, **kwargs):
"""Init PersonalTokenRateLimit info."""
super().__init__(token, rate_limit_buffer=rate_limit_buffer, **kwargs)


def generate_jwt_token(
github_app_id: str,
github_private_key: str,
Expand All @@ -110,7 +123,8 @@ def generate_app_access_token(
github_app_id: str,
github_private_key: str,
github_installation_id: Optional[str] = None,
) -> str:
) -> Tuple[str, datetime]:
produced_at = datetime.now()
jwt_token = generate_jwt_token(github_app_id, github_private_key)

headers = {"Authorization": f"Bearer {jwt_token}"}
Expand All @@ -135,14 +149,71 @@ def generate_app_access_token(
if resp.status_code != 201:
resp.raise_for_status()

return resp.json()["token"]
expires_at = produced_at + timedelta(hours=1)
return resp.json()["token"], expires_at


class AppTokenManager(TokenManager):
"""A class to store an app token's attributes and state, and handle token refreshing"""

DEFAULT_RATE_LIMIT = 15000
DEFAULT_EXPIRY_BUFFER_MINS = 10

def __init__(self, env_key: str, rate_limit_buffer: Optional[int] = None, **kwargs):
if rate_limit_buffer is None:
rate_limit_buffer = self.DEFAULT_RATE_LIMIT_BUFFER
super().__init__(None, rate_limit_buffer=rate_limit_buffer, **kwargs)

parts = env_key.split(";;")
self.github_app_id = parts[0]
self.github_private_key = (parts[1:2] or [""])[0].replace("\\n", "\n")
self.github_installation_id: Optional[str] = (parts[2:3] or [""])[0]

self.token_expires_at: Optional[datetime] = None
self.claim_token()

def claim_token(self):
"""Updates the TokenManager's token and token_expires_at attributes.
The outcome will be _either_ that self.token is updated to a newly claimed valid token and
self.token_expires_at is updated to the anticipated expiry time (erring on the side of an early estimate)
_or_ self.token and self.token_expires_at are both set to None.
"""
self.token = None
self.token_expires_at = None

# Make sure we have the details we need
if not self.github_app_id or not self.github_private_key:
raise ValueError(
"GITHUB_APP_PRIVATE_KEY could not be parsed. The expected format is "
'":app_id:;;-----BEGIN RSA PRIVATE KEY-----\\n_YOUR_P_KEY_\\n-----END RSA PRIVATE KEY-----"'
)

self.token, self.token_expires_at = generate_app_access_token(
self.github_app_id, self.github_private_key, self.github_installation_id
)

# Check if the token isn't valid. If not, overwrite it with None
if not self.is_valid_token():
if self.logger:
self.logger.warning(
"An app token was generated but could not be validated."
)
self.token = None
self.token_expires_at = None


class GitHubTokenAuthenticator(APIAuthenticatorBase):
"""Base class for offloading API auth."""

@staticmethod
def get_env():
return dict(environ)

def prepare_tokens(self) -> List[TokenManager]:
# Save GitHub tokens
"""Prep GitHub tokens"""

env_dict = self.get_env()
rate_limit_buffer = self._config.get("rate_limit_buffer", None)

personal_tokens: Set[str] = set()
Expand All @@ -156,52 +227,42 @@ def prepare_tokens(self) -> List[TokenManager]:
# Accept multiple tokens using environment variables GITHUB_TOKEN*
env_tokens = {
value
for key, value in environ.items()
for key, value in env_dict.items()
if key.startswith("GITHUB_TOKEN")
}
if len(env_tokens) > 0:
self.logger.info(
f"Found {len(env_tokens)} 'GITHUB_TOKEN' environment variables for authentication."
)
personal_tokens = env_tokens
personal_tokens = personal_tokens.union(env_tokens)

token_managers: List[TokenManager] = []
for token in personal_tokens:
token_manager = TokenManager(
token_manager = PersonalTokenManager(
token, rate_limit_buffer=rate_limit_buffer, logger=self.logger
)
if token_manager.is_valid_token():
token_managers.append(token_manager)
else:
logging.warn("A token was dismissed.")

# Parse App level private key and generate a token
if "GITHUB_APP_PRIVATE_KEY" in environ.keys():
if "GITHUB_APP_PRIVATE_KEY" in env_dict.keys():
# To simplify settings, we use a single env-key formatted as follows:
# "{app_id};;{-----BEGIN RSA PRIVATE KEY-----\n_YOUR_PRIVATE_KEY_\n-----END RSA PRIVATE KEY-----}"
parts = environ["GITHUB_APP_PRIVATE_KEY"].split(";;")
github_app_id = parts[0]
github_private_key = (parts[1:2] or [""])[0].replace("\\n", "\n")
github_installation_id = (parts[2:3] or [""])[0]

if not (github_private_key):
self.logger.warning(
"GITHUB_APP_PRIVATE_KEY could not be parsed. The expected format is "
'":app_id:;;-----BEGIN RSA PRIVATE KEY-----\n_YOUR_P_KEY_\n-----END RSA PRIVATE KEY-----"'
env_key = env_dict["GITHUB_APP_PRIVATE_KEY"]
try:
app_token_manager = AppTokenManager(
env_key, rate_limit_buffer=rate_limit_buffer, logger=self.logger
)

else:
app_token = generate_app_access_token(
github_app_id, github_private_key, github_installation_id or None
)
token_manager = TokenManager(
app_token, rate_limit_buffer=rate_limit_buffer, logger=self.logger
if app_token_manager.is_valid_token():
token_managers.append(app_token_manager)
except ValueError as e:
self.logger.warn(
f"An error was thrown while preparing an app token: {e}"
)
if token_manager.is_valid_token():
token_managers.append(token_manager)

self.logger.info(f"Tap will run with {len(token_managers)} auth tokens")

# Create a dict of TokenManager
# TODO - separate app_token and add logic to refresh the token using generate_app_access_token.
return token_managers

def __init__(self, stream: RESTStream) -> None:
Expand Down
Loading

0 comments on commit aade338

Please sign in to comment.