diff --git a/supertokens_python/recipe/oauth2provider/asyncio/__init__.py b/supertokens_python/recipe/oauth2provider/asyncio/__init__.py index a59c06f5..2e54a714 100644 --- a/supertokens_python/recipe/oauth2provider/asyncio/__init__.py +++ b/supertokens_python/recipe/oauth2provider/asyncio/__init__.py @@ -11,3 +11,231 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from __future__ import annotations + +import base64 +from typing import Any, Dict, Union, Optional, List + +from ..interfaces import ( + ActiveTokenResponse, + CreateOAuth2ClientInput, + CreateOAuth2ClientOkResult, + DeleteOAuth2ClientOkResult, + ErrorOAuth2Response, + GetOAuth2ClientOkResult, + GetOAuth2ClientsOkResult, + InactiveTokenResponse, + OAuth2TokenValidationRequirements, + RevokeTokenUsingAuthorizationHeader, + RevokeTokenUsingClientIDAndClientSecret, + TokenInfo, + UpdateOAuth2ClientInput, + UpdateOAuth2ClientOkResult, +) + + +async def get_oauth2_client( + client_id: str, user_context: Optional[Dict[str, Any]] = None +) -> Union[GetOAuth2ClientOkResult, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + return await OAuth2ProviderRecipe.get_instance().recipe_implementation.get_oauth2_client( + client_id=client_id, user_context=user_context + ) + + +async def get_oauth2_clients( + page_size: Optional[int] = None, + pagination_token: Optional[str] = None, + client_name: Optional[str] = None, + user_context: Optional[Dict[str, Any]] = None, +) -> Union[GetOAuth2ClientsOkResult, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + return await OAuth2ProviderRecipe.get_instance().recipe_implementation.get_oauth2_clients( + page_size=page_size, + pagination_token=pagination_token, + client_name=client_name, + user_context=user_context, + ) + + +async def create_oauth2_client( + params: CreateOAuth2ClientInput, + user_context: Optional[Dict[str, Any]] = None, +) -> Union[CreateOAuth2ClientOkResult, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + return await OAuth2ProviderRecipe.get_instance().recipe_implementation.create_oauth2_client( + params=params, + user_context=user_context, + ) + + +async def update_oauth2_client( + params: UpdateOAuth2ClientInput, + user_context: Optional[Dict[str, Any]] = None, +) -> Union[UpdateOAuth2ClientOkResult, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + return await OAuth2ProviderRecipe.get_instance().recipe_implementation.update_oauth2_client( + params=params, + user_context=user_context, + ) + + +async def delete_oauth2_client( + client_id: str, user_context: Optional[Dict[str, Any]] = None +) -> Union[DeleteOAuth2ClientOkResult, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + return await OAuth2ProviderRecipe.get_instance().recipe_implementation.delete_oauth2_client( + client_id=client_id, user_context=user_context + ) + + +async def validate_oauth2_access_token( + token: str, + requirements: Optional[OAuth2TokenValidationRequirements] = None, + check_database: Optional[bool] = None, + user_context: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + return await OAuth2ProviderRecipe.get_instance().recipe_implementation.validate_oauth2_access_token( + token=token, + requirements=requirements, + check_database=check_database, + user_context=user_context, + ) + + +async def create_token_for_client_credentials( + client_id: str, + client_secret: str, + scope: Optional[List[str]] = None, + audience: Optional[str] = None, + user_context: Optional[Dict[str, Any]] = None, +) -> Union[TokenInfo, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + return ( + await OAuth2ProviderRecipe.get_instance().recipe_implementation.token_exchange( + authorization_header=None, + body={ + "grant_type": "client_credentials", + "client_id": client_id, + "client_secret": client_secret, + "scope": " ".join(scope) if scope else None, + "audience": audience, + }, + user_context=user_context, + ) + ) + + +async def revoke_token( + token: str, + client_id: str, + client_secret: Optional[str] = None, + user_context: Optional[Dict[str, Any]] = None, +) -> Optional[ErrorOAuth2Response]: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + recipe = OAuth2ProviderRecipe.get_instance() + + client_info = await recipe.recipe_implementation.get_oauth2_client( + client_id=client_id, user_context=user_context + ) + + if isinstance(client_info, ErrorOAuth2Response): + raise Exception( + f"Failed to get OAuth2 client with id {client_id}: {client_info.error}" + ) + + token_endpoint_auth_method = client_info.client.token_endpoint_auth_method + + if token_endpoint_auth_method == "none": + auth_header = "Basic " + base64.b64encode(f"{client_id}:".encode()).decode() + return await recipe.recipe_implementation.revoke_token( + RevokeTokenUsingAuthorizationHeader( + token=token, + authorization_header=auth_header, + ), + user_context=user_context, + ) + elif token_endpoint_auth_method == "client_secret_basic" and client_secret: + auth_header = ( + "Basic " + + base64.b64encode(f"{client_id}:{client_secret}".encode()).decode() + ) + return await recipe.recipe_implementation.revoke_token( + RevokeTokenUsingAuthorizationHeader( + token=token, + authorization_header=auth_header, + ), + user_context=user_context, + ) + + return await recipe.recipe_implementation.revoke_token( + RevokeTokenUsingClientIDAndClientSecret( + token=token, + client_id=client_id, + client_secret=client_secret, + ), + user_context=user_context, + ) + + +async def revoke_tokens_by_client_id( + client_id: str, user_context: Optional[Dict[str, Any]] = None +) -> None: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + return await OAuth2ProviderRecipe.get_instance().recipe_implementation.revoke_tokens_by_client_id( + client_id=client_id, user_context=user_context + ) + + +async def revoke_tokens_by_session_handle( + session_handle: str, user_context: Optional[Dict[str, Any]] = None +) -> None: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + return await OAuth2ProviderRecipe.get_instance().recipe_implementation.revoke_tokens_by_session_handle( + session_handle=session_handle, user_context=user_context + ) + + +async def validate_oauth2_refresh_token( + token: str, + scopes: Optional[List[str]] = None, + user_context: Optional[Dict[str, Any]] = None, +) -> Union[ActiveTokenResponse, InactiveTokenResponse]: + if user_context is None: + user_context = {} + from ..recipe import OAuth2ProviderRecipe + + return await OAuth2ProviderRecipe.get_instance().recipe_implementation.introspect_token( + token=token, scopes=scopes, user_context=user_context + ) diff --git a/supertokens_python/recipe/oauth2provider/interfaces.py b/supertokens_python/recipe/oauth2provider/interfaces.py index 0b6fe9c8..8b9f2649 100644 --- a/supertokens_python/recipe/oauth2provider/interfaces.py +++ b/supertokens_python/recipe/oauth2provider/interfaces.py @@ -343,7 +343,7 @@ def __init__(self, token: str, authorization_header: str): class RevokeTokenUsingClientIDAndClientSecret: - def __init__(self, token: str, client_id: str, client_secret: str): + def __init__(self, token: str, client_id: str, client_secret: Optional[str]): self.token = token self.client_id = client_id self.client_secret = client_secret @@ -362,6 +362,595 @@ def to_json(self): return {"active": True, **self.payload} +class OAuth2ClientOptions: + def __init__( + self, + client_id: str, + client_secret: Optional[str], + created_at: str, + updated_at: str, + client_name: str, + scope: str, + redirect_uris: Optional[List[str]], + post_logout_redirect_uris: Optional[List[str]], + authorization_code_grant_access_token_lifespan: Optional[str], + authorization_code_grant_id_token_lifespan: Optional[str], + authorization_code_grant_refresh_token_lifespan: Optional[str], + client_credentials_grant_access_token_lifespan: Optional[str], + implicit_grant_access_token_lifespan: Optional[str], + implicit_grant_id_token_lifespan: Optional[str], + refresh_token_grant_access_token_lifespan: Optional[str], + refresh_token_grant_id_token_lifespan: Optional[str], + refresh_token_grant_refresh_token_lifespan: Optional[str], + token_endpoint_auth_method: str, + audience: Optional[List[str]], + grant_types: Optional[List[str]], + response_types: Optional[List[str]], + client_uri: Optional[str], + logo_uri: Optional[str], + policy_uri: Optional[str], + tos_uri: Optional[str], + metadata: Optional[Dict[str, Any]], + enable_refresh_token_rotation: Optional[bool], + ): + self.client_id = client_id + self.client_secret = client_secret + self.created_at = created_at + self.updated_at = updated_at + self.client_name = client_name + self.scope = scope + self.redirect_uris = redirect_uris + self.post_logout_redirect_uris = post_logout_redirect_uris + self.authorization_code_grant_access_token_lifespan = ( + authorization_code_grant_access_token_lifespan + ) + self.authorization_code_grant_id_token_lifespan = ( + authorization_code_grant_id_token_lifespan + ) + self.authorization_code_grant_refresh_token_lifespan = ( + authorization_code_grant_refresh_token_lifespan + ) + self.client_credentials_grant_access_token_lifespan = ( + client_credentials_grant_access_token_lifespan + ) + self.implicit_grant_access_token_lifespan = implicit_grant_access_token_lifespan + self.implicit_grant_id_token_lifespan = implicit_grant_id_token_lifespan + self.refresh_token_grant_access_token_lifespan = ( + refresh_token_grant_access_token_lifespan + ) + self.refresh_token_grant_id_token_lifespan = ( + refresh_token_grant_id_token_lifespan + ) + self.refresh_token_grant_refresh_token_lifespan = ( + refresh_token_grant_refresh_token_lifespan + ) + self.token_endpoint_auth_method = token_endpoint_auth_method + self.audience = audience + self.grant_types = grant_types + self.response_types = response_types + self.client_uri = client_uri + self.logo_uri = logo_uri + self.policy_uri = policy_uri + self.tos_uri = tos_uri + self.metadata = metadata + self.enable_refresh_token_rotation = enable_refresh_token_rotation + + def to_json(self) -> Dict[str, Any]: + result: Dict[str, Any] = { + "clientId": self.client_id, + "createdAt": self.created_at, + "updatedAt": self.updated_at, + "clientName": self.client_name, + "scope": self.scope, + "tokenEndpointAuthMethod": self.token_endpoint_auth_method, + } + if self.client_secret is not None: + result["clientSecret"] = self.client_secret + if self.redirect_uris is not None: + result["redirectUris"] = self.redirect_uris + if self.post_logout_redirect_uris is not None: + result["postLogoutRedirectUris"] = self.post_logout_redirect_uris + if self.authorization_code_grant_access_token_lifespan is not None: + result["authorizationCodeGrantAccessTokenLifespan"] = ( + self.authorization_code_grant_access_token_lifespan + ) + if self.authorization_code_grant_id_token_lifespan is not None: + result["authorizationCodeGrantIdTokenLifespan"] = ( + self.authorization_code_grant_id_token_lifespan + ) + if self.authorization_code_grant_refresh_token_lifespan is not None: + result["authorizationCodeGrantRefreshTokenLifespan"] = ( + self.authorization_code_grant_refresh_token_lifespan + ) + if self.client_credentials_grant_access_token_lifespan is not None: + result["clientCredentialsGrantAccessTokenLifespan"] = ( + self.client_credentials_grant_access_token_lifespan + ) + if self.implicit_grant_access_token_lifespan is not None: + result["implicitGrantAccessTokenLifespan"] = ( + self.implicit_grant_access_token_lifespan + ) + if self.implicit_grant_id_token_lifespan is not None: + result["implicitGrantIdTokenLifespan"] = ( + self.implicit_grant_id_token_lifespan + ) + if self.refresh_token_grant_access_token_lifespan is not None: + result["refreshTokenGrantAccessTokenLifespan"] = ( + self.refresh_token_grant_access_token_lifespan + ) + if self.refresh_token_grant_id_token_lifespan is not None: + result["refreshTokenGrantIdTokenLifespan"] = ( + self.refresh_token_grant_id_token_lifespan + ) + if self.refresh_token_grant_refresh_token_lifespan is not None: + result["refreshTokenGrantRefreshTokenLifespan"] = ( + self.refresh_token_grant_refresh_token_lifespan + ) + if self.audience is not None: + result["audience"] = self.audience + if self.grant_types is not None: + result["grantTypes"] = self.grant_types + if self.response_types is not None: + result["responseTypes"] = self.response_types + if self.client_uri is not None: + result["clientUri"] = self.client_uri + if self.logo_uri is not None: + result["logoUri"] = self.logo_uri + if self.policy_uri is not None: + result["policyUri"] = self.policy_uri + if self.tos_uri is not None: + result["tosUri"] = self.tos_uri + if self.metadata is not None: + result["metadata"] = self.metadata + if self.enable_refresh_token_rotation is not None: + result["enableRefreshTokenRotation"] = self.enable_refresh_token_rotation + return result + + @staticmethod + def from_json(json: Dict[str, Any]) -> "OAuth2ClientOptions": + return OAuth2ClientOptions( + client_id=json["clientId"], + client_secret=json["clientSecret"], + created_at=json["createdAt"], + updated_at=json["updatedAt"], + client_name=json["clientName"], + scope=json["scope"], + redirect_uris=json["redirectUris"], + post_logout_redirect_uris=json["postLogoutRedirectUris"], + authorization_code_grant_access_token_lifespan=json.get( + "authorizationCodeGrantAccessTokenLifespan" + ), + authorization_code_grant_id_token_lifespan=json.get( + "authorizationCodeGrantIdTokenLifespan" + ), + authorization_code_grant_refresh_token_lifespan=json.get( + "authorizationCodeGrantRefreshTokenLifespan" + ), + client_credentials_grant_access_token_lifespan=json.get( + "clientCredentialsGrantAccessTokenLifespan" + ), + implicit_grant_access_token_lifespan=json.get( + "implicitGrantAccessTokenLifespan" + ), + implicit_grant_id_token_lifespan=json.get("implicitGrantIdTokenLifespan"), + refresh_token_grant_access_token_lifespan=json.get( + "refreshTokenGrantAccessTokenLifespan" + ), + refresh_token_grant_id_token_lifespan=json.get( + "refreshTokenGrantIdTokenLifespan" + ), + refresh_token_grant_refresh_token_lifespan=json.get( + "refreshTokenGrantRefreshTokenLifespan" + ), + token_endpoint_auth_method=json["tokenEndpointAuthMethod"], + audience=json.get("audience"), + grant_types=json.get("grantTypes"), + response_types=json.get("responseTypes"), + client_uri=json.get("clientUri"), + logo_uri=json.get("logoUri"), + policy_uri=json.get("policyUri"), + tos_uri=json.get("tosUri"), + metadata=json.get("metadata"), + enable_refresh_token_rotation=json.get("enableRefreshTokenRotation"), + ) + + +class CreateOAuth2ClientInput: + def __init__( + self, + client_id: Optional[str], + client_secret: Optional[str], + client_name: Optional[str], + scope: Optional[str], + redirect_uris: Optional[List[str]], + post_logout_redirect_uris: Optional[List[str]], + authorization_code_grant_access_token_lifespan: Optional[str], + authorization_code_grant_id_token_lifespan: Optional[str], + authorization_code_grant_refresh_token_lifespan: Optional[str], + client_credentials_grant_access_token_lifespan: Optional[str], + implicit_grant_access_token_lifespan: Optional[str], + implicit_grant_id_token_lifespan: Optional[str], + refresh_token_grant_access_token_lifespan: Optional[str], + refresh_token_grant_id_token_lifespan: Optional[str], + refresh_token_grant_refresh_token_lifespan: Optional[str], + token_endpoint_auth_method: Optional[str], + audience: Optional[List[str]], + grant_types: Optional[List[str]], + response_types: Optional[List[str]], + client_uri: Optional[str], + logo_uri: Optional[str], + policy_uri: Optional[str], + tos_uri: Optional[str], + metadata: Optional[Dict[str, Any]], + enable_refresh_token_rotation: Optional[bool], + ): + self.client_id = client_id + self.client_secret = client_secret + self.client_name = client_name + self.scope = scope + self.redirect_uris = redirect_uris + self.post_logout_redirect_uris = post_logout_redirect_uris + self.authorization_code_grant_access_token_lifespan = ( + authorization_code_grant_access_token_lifespan + ) + self.authorization_code_grant_id_token_lifespan = ( + authorization_code_grant_id_token_lifespan + ) + self.authorization_code_grant_refresh_token_lifespan = ( + authorization_code_grant_refresh_token_lifespan + ) + self.client_credentials_grant_access_token_lifespan = ( + client_credentials_grant_access_token_lifespan + ) + self.implicit_grant_access_token_lifespan = implicit_grant_access_token_lifespan + self.implicit_grant_id_token_lifespan = implicit_grant_id_token_lifespan + self.refresh_token_grant_access_token_lifespan = ( + refresh_token_grant_access_token_lifespan + ) + self.refresh_token_grant_id_token_lifespan = ( + refresh_token_grant_id_token_lifespan + ) + self.refresh_token_grant_refresh_token_lifespan = ( + refresh_token_grant_refresh_token_lifespan + ) + self.token_endpoint_auth_method = token_endpoint_auth_method + self.audience = audience + self.grant_types = grant_types + self.response_types = response_types + self.client_uri = client_uri + self.logo_uri = logo_uri + self.policy_uri = policy_uri + self.tos_uri = tos_uri + self.metadata = metadata + self.enable_refresh_token_rotation = enable_refresh_token_rotation + + def to_json(self) -> Dict[str, Any]: + result: Dict[str, Any] = {} + if self.client_id is not None: + result["clientId"] = self.client_id + if self.client_name is not None: + result["clientName"] = self.client_name + if self.scope is not None: + result["scope"] = self.scope + if self.token_endpoint_auth_method is not None: + result["tokenEndpointAuthMethod"] = self.token_endpoint_auth_method + if self.client_secret is not None: + result["clientSecret"] = self.client_secret + if self.redirect_uris is not None: + result["redirectUris"] = self.redirect_uris + if self.post_logout_redirect_uris is not None: + result["postLogoutRedirectUris"] = self.post_logout_redirect_uris + if self.authorization_code_grant_access_token_lifespan is not None: + result["authorizationCodeGrantAccessTokenLifespan"] = ( + self.authorization_code_grant_access_token_lifespan + ) + if self.authorization_code_grant_id_token_lifespan is not None: + result["authorizationCodeGrantIdTokenLifespan"] = ( + self.authorization_code_grant_id_token_lifespan + ) + if self.authorization_code_grant_refresh_token_lifespan is not None: + result["authorizationCodeGrantRefreshTokenLifespan"] = ( + self.authorization_code_grant_refresh_token_lifespan + ) + if self.client_credentials_grant_access_token_lifespan is not None: + result["clientCredentialsGrantAccessTokenLifespan"] = ( + self.client_credentials_grant_access_token_lifespan + ) + if self.implicit_grant_access_token_lifespan is not None: + result["implicitGrantAccessTokenLifespan"] = ( + self.implicit_grant_access_token_lifespan + ) + if self.implicit_grant_id_token_lifespan is not None: + result["implicitGrantIdTokenLifespan"] = ( + self.implicit_grant_id_token_lifespan + ) + if self.refresh_token_grant_access_token_lifespan is not None: + result["refreshTokenGrantAccessTokenLifespan"] = ( + self.refresh_token_grant_access_token_lifespan + ) + if self.refresh_token_grant_id_token_lifespan is not None: + result["refreshTokenGrantIdTokenLifespan"] = ( + self.refresh_token_grant_id_token_lifespan + ) + if self.refresh_token_grant_refresh_token_lifespan is not None: + result["refreshTokenGrantRefreshTokenLifespan"] = ( + self.refresh_token_grant_refresh_token_lifespan + ) + if self.audience is not None: + result["audience"] = self.audience + if self.grant_types is not None: + result["grantTypes"] = self.grant_types + if self.response_types is not None: + result["responseTypes"] = self.response_types + if self.client_uri is not None: + result["clientUri"] = self.client_uri + if self.logo_uri is not None: + result["logoUri"] = self.logo_uri + if self.policy_uri is not None: + result["policyUri"] = self.policy_uri + if self.tos_uri is not None: + result["tosUri"] = self.tos_uri + if self.metadata is not None: + result["metadata"] = self.metadata + if self.enable_refresh_token_rotation is not None: + result["enableRefreshTokenRotation"] = self.enable_refresh_token_rotation + return result + + @staticmethod + def from_json(json: Dict[str, Any]) -> "CreateOAuth2ClientInput": + return CreateOAuth2ClientInput( + client_id=json.get("clientId"), + client_secret=json.get("clientSecret"), + client_name=json.get("clientName"), + scope=json.get("scope"), + redirect_uris=json.get("redirectUris"), + post_logout_redirect_uris=json.get("postLogoutRedirectUris"), + authorization_code_grant_access_token_lifespan=json.get( + "authorizationCodeGrantAccessTokenLifespan" + ), + authorization_code_grant_id_token_lifespan=json.get( + "authorizationCodeGrantIdTokenLifespan" + ), + authorization_code_grant_refresh_token_lifespan=json.get( + "authorizationCodeGrantRefreshTokenLifespan" + ), + client_credentials_grant_access_token_lifespan=json.get( + "clientCredentialsGrantAccessTokenLifespan" + ), + implicit_grant_access_token_lifespan=json.get( + "implicitGrantAccessTokenLifespan" + ), + implicit_grant_id_token_lifespan=json.get("implicitGrantIdTokenLifespan"), + refresh_token_grant_access_token_lifespan=json.get( + "refreshTokenGrantAccessTokenLifespan" + ), + refresh_token_grant_id_token_lifespan=json.get( + "refreshTokenGrantIdTokenLifespan" + ), + refresh_token_grant_refresh_token_lifespan=json.get( + "refreshTokenGrantRefreshTokenLifespan" + ), + token_endpoint_auth_method=json.get("tokenEndpointAuthMethod"), + audience=json.get("audience"), + grant_types=json.get("grantTypes"), + response_types=json.get("responseTypes"), + client_uri=json.get("clientUri"), + logo_uri=json.get("logoUri"), + policy_uri=json.get("policyUri"), + tos_uri=json.get("tosUri"), + metadata=json.get("metadata"), + enable_refresh_token_rotation=json.get("enableRefreshTokenRotation"), + ) + + +class NotSet: + pass + + +class UpdateOAuth2ClientInput: + def __init__( + self, + client_id: str, + client_secret: Union[Optional[str], NotSet] = NotSet(), + client_name: Union[Optional[str], NotSet] = NotSet(), + scope: Union[Optional[str], NotSet] = NotSet(), + redirect_uris: Union[Optional[List[str]], NotSet] = NotSet(), + post_logout_redirect_uris: Union[Optional[List[str]], NotSet] = NotSet(), + authorization_code_grant_access_token_lifespan: Union[ + Optional[str], NotSet + ] = NotSet(), + authorization_code_grant_id_token_lifespan: Union[ + Optional[str], NotSet + ] = NotSet(), + authorization_code_grant_refresh_token_lifespan: Union[ + Optional[str], NotSet + ] = NotSet(), + client_credentials_grant_access_token_lifespan: Union[ + Optional[str], NotSet + ] = NotSet(), + implicit_grant_access_token_lifespan: Union[Optional[str], NotSet] = NotSet(), + implicit_grant_id_token_lifespan: Union[Optional[str], NotSet] = NotSet(), + refresh_token_grant_access_token_lifespan: Union[ + Optional[str], NotSet + ] = NotSet(), + refresh_token_grant_id_token_lifespan: Union[Optional[str], NotSet] = NotSet(), + refresh_token_grant_refresh_token_lifespan: Union[ + Optional[str], NotSet + ] = NotSet(), + token_endpoint_auth_method: Union[Optional[str], NotSet] = NotSet(), + audience: Union[Optional[List[str]], NotSet] = NotSet(), + grant_types: Union[Optional[List[str]], NotSet] = NotSet(), + response_types: Union[Optional[List[str]], NotSet] = NotSet(), + client_uri: Union[Optional[str], NotSet] = NotSet(), + logo_uri: Union[Optional[str], NotSet] = NotSet(), + policy_uri: Union[Optional[str], NotSet] = NotSet(), + tos_uri: Union[Optional[str], NotSet] = NotSet(), + metadata: Union[Optional[Dict[str, Any]], NotSet] = NotSet(), + enable_refresh_token_rotation: Union[Optional[bool], NotSet] = NotSet(), + ): + self.client_id = client_id + self.client_secret = client_secret + self.client_name = client_name + self.scope = scope + self.redirect_uris = redirect_uris + self.post_logout_redirect_uris = post_logout_redirect_uris + self.authorization_code_grant_access_token_lifespan = ( + authorization_code_grant_access_token_lifespan + ) + self.authorization_code_grant_id_token_lifespan = ( + authorization_code_grant_id_token_lifespan + ) + self.authorization_code_grant_refresh_token_lifespan = ( + authorization_code_grant_refresh_token_lifespan + ) + self.client_credentials_grant_access_token_lifespan = ( + client_credentials_grant_access_token_lifespan + ) + self.implicit_grant_access_token_lifespan = implicit_grant_access_token_lifespan + self.implicit_grant_id_token_lifespan = implicit_grant_id_token_lifespan + self.refresh_token_grant_access_token_lifespan = ( + refresh_token_grant_access_token_lifespan + ) + self.refresh_token_grant_id_token_lifespan = ( + refresh_token_grant_id_token_lifespan + ) + self.refresh_token_grant_refresh_token_lifespan = ( + refresh_token_grant_refresh_token_lifespan + ) + self.token_endpoint_auth_method = token_endpoint_auth_method + self.audience = audience + self.grant_types = grant_types + self.response_types = response_types + self.client_uri = client_uri + self.logo_uri = logo_uri + self.policy_uri = policy_uri + self.tos_uri = tos_uri + self.metadata = metadata + self.enable_refresh_token_rotation = enable_refresh_token_rotation + + def to_json(self) -> Dict[str, Any]: + result: Dict[str, Any] = {} + result["clientId"] = self.client_id + + if not isinstance(self.client_name, NotSet): + result["clientName"] = self.client_name + if not isinstance(self.scope, NotSet): + result["scope"] = self.scope + if not isinstance(self.token_endpoint_auth_method, NotSet): + result["tokenEndpointAuthMethod"] = self.token_endpoint_auth_method + if not isinstance(self.client_secret, NotSet): + result["clientSecret"] = self.client_secret + if not isinstance(self.redirect_uris, NotSet): + result["redirectUris"] = self.redirect_uris + if not isinstance(self.post_logout_redirect_uris, NotSet): + result["postLogoutRedirectUris"] = self.post_logout_redirect_uris + if not isinstance(self.authorization_code_grant_access_token_lifespan, NotSet): + result["authorizationCodeGrantAccessTokenLifespan"] = ( + self.authorization_code_grant_access_token_lifespan + ) + if not isinstance(self.authorization_code_grant_id_token_lifespan, NotSet): + result["authorizationCodeGrantIdTokenLifespan"] = ( + self.authorization_code_grant_id_token_lifespan + ) + if not isinstance(self.authorization_code_grant_refresh_token_lifespan, NotSet): + result["authorizationCodeGrantRefreshTokenLifespan"] = ( + self.authorization_code_grant_refresh_token_lifespan + ) + if not isinstance(self.client_credentials_grant_access_token_lifespan, NotSet): + result["clientCredentialsGrantAccessTokenLifespan"] = ( + self.client_credentials_grant_access_token_lifespan + ) + if not isinstance(self.implicit_grant_access_token_lifespan, NotSet): + result["implicitGrantAccessTokenLifespan"] = ( + self.implicit_grant_access_token_lifespan + ) + if not isinstance(self.implicit_grant_id_token_lifespan, NotSet): + result["implicitGrantIdTokenLifespan"] = ( + self.implicit_grant_id_token_lifespan + ) + if not isinstance(self.refresh_token_grant_access_token_lifespan, NotSet): + result["refreshTokenGrantAccessTokenLifespan"] = ( + self.refresh_token_grant_access_token_lifespan + ) + if not isinstance(self.refresh_token_grant_id_token_lifespan, NotSet): + result["refreshTokenGrantIdTokenLifespan"] = ( + self.refresh_token_grant_id_token_lifespan + ) + if not isinstance(self.refresh_token_grant_refresh_token_lifespan, NotSet): + result["refreshTokenGrantRefreshTokenLifespan"] = ( + self.refresh_token_grant_refresh_token_lifespan + ) + if not isinstance(self.audience, NotSet): + result["audience"] = self.audience + if not isinstance(self.grant_types, NotSet): + result["grantTypes"] = self.grant_types + if self.response_types is not None: + result["responseTypes"] = self.response_types + if not isinstance(self.client_uri, NotSet): + result["clientUri"] = self.client_uri + if not isinstance(self.logo_uri, NotSet): + result["logoUri"] = self.logo_uri + if not isinstance(self.policy_uri, NotSet): + result["policyUri"] = self.policy_uri + if not isinstance(self.tos_uri, NotSet): + result["tosUri"] = self.tos_uri + if not isinstance(self.metadata, NotSet): + result["metadata"] = self.metadata + if not isinstance(self.enable_refresh_token_rotation, NotSet): + result["enableRefreshTokenRotation"] = self.enable_refresh_token_rotation + return result + + @staticmethod + def from_json(json: Dict[str, Any]) -> "UpdateOAuth2ClientInput": + return UpdateOAuth2ClientInput( + client_id=json["clientId"], + client_secret=json.get("clientSecret", NotSet()), + client_name=json.get("clientName", NotSet()), + scope=json.get("scope", NotSet()), + redirect_uris=json.get("redirectUris", NotSet()), + post_logout_redirect_uris=json.get("postLogoutRedirectUris", NotSet()), + authorization_code_grant_access_token_lifespan=json.get( + "authorizationCodeGrantAccessTokenLifespan", NotSet() + ), + authorization_code_grant_id_token_lifespan=json.get( + "authorizationCodeGrantIdTokenLifespan", NotSet() + ), + authorization_code_grant_refresh_token_lifespan=json.get( + "authorizationCodeGrantRefreshTokenLifespan", NotSet() + ), + client_credentials_grant_access_token_lifespan=json.get( + "clientCredentialsGrantAccessTokenLifespan", NotSet() + ), + implicit_grant_access_token_lifespan=json.get( + "implicitGrantAccessTokenLifespan", NotSet() + ), + implicit_grant_id_token_lifespan=json.get( + "implicitGrantIdTokenLifespan", NotSet() + ), + refresh_token_grant_access_token_lifespan=json.get( + "refreshTokenGrantAccessTokenLifespan", NotSet() + ), + refresh_token_grant_id_token_lifespan=json.get( + "refreshTokenGrantIdTokenLifespan", NotSet() + ), + refresh_token_grant_refresh_token_lifespan=json.get( + "refreshTokenGrantRefreshTokenLifespan", NotSet() + ), + token_endpoint_auth_method=json.get("tokenEndpointAuthMethod", NotSet()), + audience=json.get("audience", NotSet()), + grant_types=json.get("grantTypes", NotSet()), + response_types=json.get("responseTypes", NotSet()), + client_uri=json.get("clientUri", NotSet()), + logo_uri=json.get("logoUri", NotSet()), + policy_uri=json.get("policyUri", NotSet()), + tos_uri=json.get("tosUri", NotSet()), + metadata=json.get("metadata", NotSet()), + enable_refresh_token_rotation=json.get( + "enableRefreshTokenRotation", NotSet() + ), + ) + + class RecipeInterface(ABC): @abstractmethod async def authorization( @@ -461,6 +1050,7 @@ async def get_oauth2_client( @abstractmethod async def create_oauth2_client( self, + params: CreateOAuth2ClientInput, user_context: Dict[str, Any], ) -> Union[CreateOAuth2ClientOkResult, ErrorOAuth2Response]: pass @@ -468,6 +1058,7 @@ async def create_oauth2_client( @abstractmethod async def update_oauth2_client( self, + params: UpdateOAuth2ClientInput, user_context: Dict[str, Any], ) -> Union[UpdateOAuth2ClientOkResult, ErrorOAuth2Response]: pass diff --git a/supertokens_python/recipe/oauth2provider/recipe_implementation.py b/supertokens_python/recipe/oauth2provider/recipe_implementation.py index c08ec852..32e134e7 100644 --- a/supertokens_python/recipe/oauth2provider/recipe_implementation.py +++ b/supertokens_python/recipe/oauth2provider/recipe_implementation.py @@ -30,6 +30,7 @@ from supertokens_python.types import RecipeUserId, User from .interfaces import ( + CreateOAuth2ClientInput, FrontendRedirectionURLTypeLogin, FrontendRedirectionURLTypeLogoutConfirmation, FrontendRedirectionURLTypePostLogoutFallback, @@ -44,6 +45,7 @@ CreateOAuth2ClientOkResult, RevokeTokenUsingAuthorizationHeader, RevokeTokenUsingClientIDAndClientSecret, + UpdateOAuth2ClientInput, UpdateOAuth2ClientOkResult, DeleteOAuth2ClientOkResult, ConsentRequest, @@ -570,11 +572,12 @@ async def get_oauth2_client( async def create_oauth2_client( self, + params: CreateOAuth2ClientInput, user_context: Dict[str, Any], ) -> Union[CreateOAuth2ClientOkResult, ErrorOAuth2Response]: response = await self.querier.send_post_request( NormalisedURLPath("/recipe/oauth/clients"), - {}, # Empty dict since no input params in function signature + params.to_json(), user_context=user_context, ) @@ -586,11 +589,12 @@ async def create_oauth2_client( async def update_oauth2_client( self, + params: UpdateOAuth2ClientInput, user_context: Dict[str, Any], ) -> Union[UpdateOAuth2ClientOkResult, ErrorOAuth2Response]: response = await self.querier.send_put_request( NormalisedURLPath("/recipe/oauth/clients"), - {}, # TODO update params + params.to_json(), None, user_context=user_context, ) @@ -672,7 +676,7 @@ async def validate_oauth2_access_token( if response.get("active") is not True: raise Exception("The token is expired, invalid or has been revoked") - return {"status": "OK", "payload": payload} + return payload async def get_requested_scopes( self, @@ -787,7 +791,8 @@ async def revoke_token( request_body["authorizationHeader"] = params.authorization_header else: request_body["client_id"] = params.client_id - request_body["client_secret"] = params.client_secret + if params.client_secret is not None: + request_body["client_secret"] = params.client_secret res = await self.querier.send_post_request( NormalisedURLPath("/recipe/oauth/token/revoke"), diff --git a/supertokens_python/recipe/oauth2provider/syncio/__init__.py b/supertokens_python/recipe/oauth2provider/syncio/__init__.py index a59c06f5..671e614f 100644 --- a/supertokens_python/recipe/oauth2provider/syncio/__init__.py +++ b/supertokens_python/recipe/oauth2provider/syncio/__init__.py @@ -11,3 +11,166 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from __future__ import annotations + +from typing import Any, Dict, Union, Optional, List + +from supertokens_python.async_to_sync_wrapper import sync + +from ..interfaces import ( + ActiveTokenResponse, + CreateOAuth2ClientInput, + CreateOAuth2ClientOkResult, + DeleteOAuth2ClientOkResult, + ErrorOAuth2Response, + GetOAuth2ClientOkResult, + GetOAuth2ClientsOkResult, + InactiveTokenResponse, + OAuth2TokenValidationRequirements, + TokenInfo, + UpdateOAuth2ClientInput, + UpdateOAuth2ClientOkResult, +) + + +def get_oauth2_client( + client_id: str, user_context: Optional[Dict[str, Any]] = None +) -> Union[GetOAuth2ClientOkResult, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + + from ..asyncio import get_oauth2_client + + return sync(get_oauth2_client(client_id, user_context)) + + +def get_oauth2_clients( + page_size: Optional[int] = None, + pagination_token: Optional[str] = None, + client_name: Optional[str] = None, + user_context: Optional[Dict[str, Any]] = None, +) -> Union[GetOAuth2ClientsOkResult, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + + from ..asyncio import get_oauth2_clients + + return sync( + get_oauth2_clients(page_size, pagination_token, client_name, user_context) + ) + + +def create_oauth2_client( + params: CreateOAuth2ClientInput, + user_context: Optional[Dict[str, Any]] = None, +) -> Union[CreateOAuth2ClientOkResult, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + from ..asyncio import create_oauth2_client + + return sync(create_oauth2_client(params, user_context)) + + +def update_oauth2_client( + params: UpdateOAuth2ClientInput, + user_context: Optional[Dict[str, Any]] = None, +) -> Union[UpdateOAuth2ClientOkResult, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + + from ..asyncio import update_oauth2_client + + return sync(update_oauth2_client(params, user_context)) + + +def delete_oauth2_client( + client_id: str, user_context: Optional[Dict[str, Any]] = None +) -> Union[DeleteOAuth2ClientOkResult, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + from ..asyncio import delete_oauth2_client + + return sync(delete_oauth2_client(client_id, user_context)) + + +def validate_oauth2_access_token( + token: str, + requirements: Optional[OAuth2TokenValidationRequirements] = None, + check_database: Optional[bool] = None, + user_context: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + if user_context is None: + user_context = {} + + from ..asyncio import validate_oauth2_access_token + + return sync( + validate_oauth2_access_token(token, requirements, check_database, user_context) + ) + + +def create_token_for_client_credentials( + client_id: str, + client_secret: str, + scope: Optional[List[str]] = None, + audience: Optional[str] = None, + user_context: Optional[Dict[str, Any]] = None, +) -> Union[TokenInfo, ErrorOAuth2Response]: + if user_context is None: + user_context = {} + + from ..asyncio import create_token_for_client_credentials + + return sync( + create_token_for_client_credentials( + client_id, client_secret, scope, audience, user_context + ) + ) + + +def revoke_token( + token: str, + client_id: str, + client_secret: Optional[str] = None, + user_context: Optional[Dict[str, Any]] = None, +) -> Optional[ErrorOAuth2Response]: + if user_context is None: + user_context = {} + from ..asyncio import revoke_token + + return sync(revoke_token(token, client_id, client_secret, user_context)) + + +def revoke_tokens_by_client_id( + client_id: str, user_context: Optional[Dict[str, Any]] = None +) -> None: + if user_context is None: + user_context = {} + + from ..asyncio import revoke_tokens_by_client_id + + return sync(revoke_tokens_by_client_id(client_id, user_context)) + + +def revoke_tokens_by_session_handle( + session_handle: str, user_context: Optional[Dict[str, Any]] = None +) -> None: + if user_context is None: + user_context = {} + + from ..asyncio import revoke_tokens_by_session_handle + + return sync(revoke_tokens_by_session_handle(session_handle, user_context)) + + +def validate_oauth2_refresh_token( + token: str, + scopes: Optional[List[str]] = None, + user_context: Optional[Dict[str, Any]] = None, +) -> Union[ActiveTokenResponse, InactiveTokenResponse]: + if user_context is None: + user_context = {} + + from ..asyncio import validate_oauth2_refresh_token + + return sync(validate_oauth2_refresh_token(token, scopes, user_context))