Skip to content

Commit

Permalink
Fix typing in main module
Browse files Browse the repository at this point in the history
  • Loading branch information
athornton committed Dec 19, 2023
1 parent d707638 commit ca6f125
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 14 deletions.
3 changes: 2 additions & 1 deletion giftless/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def get_authz_header(self, identity: Identity, org: str, repo: str, actions: Opt

class Authentication:

def __init__(self, app=None, default_identity: Identity = None):
def __init__(self, app=None,
default_identity: Optional[Identity] = None) -> None:
self._default_identity = default_identity
self._authenticators: List[Authenticator] = []
self._unauthorized_handler: Optional[Callable] = None
Expand Down
3 changes: 2 additions & 1 deletion giftless/auth/identity.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from abc import ABC
from abc import ABC, abstractmethod
from collections import defaultdict
from enum import Enum
from typing import Dict, Optional, Set
Expand Down Expand Up @@ -29,6 +29,7 @@ class Identity(ABC):
id: Optional[str] = None
email: Optional[str] = None

@abstractmethod
def is_authorized(self, organization: str, repo: str, permission: Permission, oid: Optional[str] = None) -> bool:
"""Tell if user is authorized to perform an operation on an object / repo
"""
Expand Down
14 changes: 10 additions & 4 deletions giftless/auth/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import jwt
from dateutil.tz import UTC
from flask import Request
from werkzeug.http import parse_authorization_header
from werkzeug import Authorization

from giftless.auth import PreAuthorizedActionAuthenticator, Unauthorized
from giftless.auth.identity import DefaultIdentity, Identity, Permission
Expand Down Expand Up @@ -187,7 +187,13 @@ def _generate_token(self, **kwargs) -> str:
if self.key_id:
headers['kid'] = self.key_id

return jwt.encode(payload, self.private_key, algorithm=self.algorithm, headers=headers)
# This is weird. The jwt 2.x docs say this is `str`. That's a
# change from 1.x, where it was `bytes`. But the typing still
# seems to think it's bytes. So...
token = jwt.encode(payload, self.private_key, algorithm=self.algorithm, headers=headers)
if type(token) is str:
return token # type: ignore
return token.decode('ascii')

def _authenticate(self, request: Request) -> Any:
"""Authenticate a request
Expand Down Expand Up @@ -232,10 +238,10 @@ def _get_token_from_headers(self, request: Request) -> Optional[str]:
self._log.debug("Found token in Authorization: Bearer header")
return payload
elif authz_type.lower() == 'basic' and self.basic_auth_user:
parsed_header = parse_authorization_header(header)
parsed_header = Authorization.from_header(header)
if parsed_header and parsed_header.username == self.basic_auth_user:
self._log.debug("Found token in Authorization: Basic header")
return parsed_header.password
return str(parsed_header.password)

return None

Expand Down
17 changes: 16 additions & 1 deletion giftless/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import mimetypes
from abc import ABC
from abc import ABC, abstractmethod
from typing import Any, BinaryIO, Dict, Iterable, Optional

from . import exc
Expand All @@ -10,6 +10,7 @@ class VerifiableStorage(ABC):
All streaming backends should be 'verifiable'.
"""
@abstractmethod
def verify_object(self, prefix: str, oid: str, size: int) -> bool:
"""Check that object exists and has the right size
Expand All @@ -21,21 +22,27 @@ def verify_object(self, prefix: str, oid: str, size: int) -> bool:
class StreamingStorage(VerifiableStorage, ABC):
"""Interface for streaming storage adapters
"""
@abstractmethod
def get(self, prefix: str, oid: str) -> Iterable[bytes]:
pass

@abstractmethod
def put(self, prefix: str, oid: str, data_stream: BinaryIO) -> int:
pass

@abstractmethod
def exists(self, prefix: str, oid: str) -> bool:
pass

@abstractmethod
def get_size(self, prefix: str, oid: str) -> int:
pass

@abstractmethod
def get_mime_type(self, prefix: str, oid: str) -> Optional[str]:
return "application/octet-stream"

@abstractmethod
def verify_object(self, prefix: str, oid: str, size: int):
"""Verify that an object exists
"""
Expand All @@ -48,17 +55,21 @@ def verify_object(self, prefix: str, oid: str, size: int):
class ExternalStorage(VerifiableStorage, ABC):
"""Interface for streaming storage adapters
"""
@abstractmethod
def get_upload_action(self, prefix: str, oid: str, size: int, expires_in: int,
extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
pass

@abstractmethod
def get_download_action(self, prefix: str, oid: str, size: int, expires_in: int,
extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
pass

@abstractmethod
def exists(self, prefix: str, oid: str) -> bool:
pass

@abstractmethod
def get_size(self, prefix: str, oid: str) -> int:
pass

Expand All @@ -70,17 +81,21 @@ def verify_object(self, prefix: str, oid: str, size: int) -> bool:


class MultipartStorage(VerifiableStorage, ABC):
@abstractmethod
def get_multipart_actions(self, prefix: str, oid: str, size: int, part_size: int, expires_in: int,
extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
pass

@abstractmethod
def get_download_action(self, prefix: str, oid: str, size: int, expires_in: int,
extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
pass

@abstractmethod
def exists(self, prefix: str, oid: str) -> bool:
pass

@abstractmethod
def get_size(self, prefix: str, oid: str) -> int:
pass

Expand Down
9 changes: 5 additions & 4 deletions giftless/storage/google_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import json
import posixpath
from datetime import timedelta
from typing import Any, BinaryIO, Dict, Optional
from typing import Any, BinaryIO, Dict, Optional, Union

import google.auth
import google.auth # type: ignore
from google.auth import impersonated_credentials
from google.cloud import storage # type: ignore
from google.oauth2 import service_account # type: ignore
Expand Down Expand Up @@ -109,8 +109,9 @@ def _get_blob_path(self, prefix: str, oid: str) -> str:

def _get_signed_url(self, prefix: str, oid: str, expires_in: int, http_method: str = 'GET',
filename: Optional[str] = None, disposition: Optional[str] = None) -> str:
creds = self.credentials
creds: Optional[Union[service_account.Credentials, impersonated_credentials.Credentials]] = self.credentials
if creds is None:
# Try Workload Identity
creds = self._get_workload_identity_credentials(expires_in)
bucket = self.storage_client.bucket(self.bucket_name)
blob = bucket.blob(self._get_blob_path(prefix, oid))
Expand All @@ -137,7 +138,7 @@ def _load_credentials(account_key_file: Optional[str], account_key_base64: Optio
else:
return None # Will use Workload Identity if available

def _get_workload_identity_credentials(self, expires_in: int) -> None:
def _get_workload_identity_credentials(self, expires_in: int) -> impersonated_credentials.Credentials:
lifetime = expires_in
if lifetime > 3600:
lifetime = 3600 # Signing credentials are good for one hour max
Expand Down
6 changes: 3 additions & 3 deletions giftless/storage/local_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class LocalStorage(StreamingStorage, MultipartStorage, ViewProvider):
While it can be used in production, large scale deployment will most likely
want to use a more scalable solution such as one of the cloud storage backends.
"""
def __init__(self, path: str = None, **_):
def __init__(self, path: Optional[str] = None, **_) -> None:
if path is None:
path = 'lfs-storage'
self.path = path
Expand Down Expand Up @@ -49,11 +49,11 @@ def get_mime_type(self, prefix: str, oid: str) -> str:

def get_multipart_actions(self, prefix: str, oid: str, size: int, part_size: int, expires_in: int,
extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return super().get_multipart_actions(prefix, oid, size, part_size, expires_in, extra)
return {}

def get_download_action(self, prefix: str, oid: str, size: int, expires_in: int,
extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return super().get_download_action(prefix, oid, size, expires_in, extra)
return {}

def register_views(self, app):
super().register_views(app)
Expand Down

0 comments on commit ca6f125

Please sign in to comment.