diff --git a/keycloak/oauth.py b/keycloak/oauth.py index cd90d7a..2e6840d 100644 --- a/keycloak/oauth.py +++ b/keycloak/oauth.py @@ -3,6 +3,7 @@ from authlib.common.security import generate_token from authlib.integrations.starlette_client import OAuth, StarletteOAuth2App from authlib.jose import JWTClaims, JsonWebToken, JsonWebKey +from authlib.oauth2.rfc7523 import PrivateKeyJWT from starlette import status from starlette.datastructures import URL @@ -23,7 +24,7 @@ class KeycloakOAuth2: def __init__( self, client_id: str, - client_secret: str, + client_secret: str | bytes, server_metadata_url: str, client_kwargs: dict[str, Any], base_url: str = "/", @@ -32,7 +33,26 @@ def __init__( self.code_verifier = generate_token(48) self._base_url = base_url self._logout_page = logout_target + oauth = OAuth() + + # TODO pass properly + # Generated via `openssl genrsa - out keypair.pem 2048` + with open("keypair.pem", "rb") as f: + client_secret = f.read() + + # Generated via `openssl rsa -in keypair.pem -pubout -out publickey.crt` + with open("publickey.crt", "r") as f: + self.pub = JsonWebKey.import_key( + f.read(), {"kty": "RSA", "use": "sig"} + ).as_dict() + + # TODO call self.keycloak.load_server_metadata() and get token_endpoint + token_endpoint = ( + "http://localhost:8180/realms/daewy/protocol/openid-connect/token" + ) + auth_method = PrivateKeyJWT(token_endpoint) + oauth.register( name="keycloak", # client_id and client_secret are created in keycloak @@ -41,7 +61,10 @@ def __init__( server_metadata_url=server_metadata_url, client_kwargs=client_kwargs, code_challenge_method="S256", + client_auth_methods=[auth_method], + token_endpoint_auth_method=auth_method.name, ) + assert isinstance(oauth.keycloak, StarletteOAuth2App) self.keycloak = oauth.keycloak @@ -53,6 +76,10 @@ def setup_fastapi_routes(self) -> None: self.router.add_api_route("/login", self.login_page) self.router.add_api_route("/callback", self.auth) self.router.add_api_route("/logout", self.logout) + self.router.add_api_route("/certs", self.public_keys) + + async def public_keys(self) -> dict[str, Any]: + return {"keys": [self.pub]} async def login_page( self, request: Request, redirect_target: str | None = None