Skip to content

Commit

Permalink
Create Poetry package (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted authored Feb 19, 2024
1 parent 8e7c36e commit fbd340d
Show file tree
Hide file tree
Showing 6 changed files with 913 additions and 1 deletion.
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 bakdata

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
79 changes: 78 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,78 @@
# starlette-keycloak
# python-keycloak-oauth

Keycloak OAuth client for Python projects with optional integrations for [FastAPI](https://github.com/tiangolo/fastapi) & [Starlette-Admin](https://github.com/jowilf/starlette-admin).

## Getting started

### FastAPI

```sh
pip install keycloak-oauth[fastapi]
```

```python
from fastapi import FastAPI
from backend.settings import settings, BASE_URL # secrets
from keycloak.oauth import KeycloakOAuth2

keycloak = KeycloakOAuth2(
client_id=settings.keycloak.client_id,
client_secret=settings.keycloak.client_secret,
server_metadata_url=str(settings.keycloak.server_metadata_url),
client_kwargs=settings.keycloak.client_kwargs,
base_url=BASE_URL,
)
# create router and register API endpoints
keycloak.setup_fastapi_routes()

app = FastAPI()
app.include_router(keycloak.router, prefix="/auth")
```

We now expose the API endpoints for Keycloak:

- `/auth/login`: redirect to Keycloak login page
- `/auth/callback`: authorize user with Keycloak access token
- `/auth/logout`: deauthorize user and redirect to the logout page

### Starlette-Admin

```sh
pip install keycloak-oauth[starlette-admin]
```

```python
from starlette_admin.contrib.sqla import Admin
from backend.settings import settings, BASE_URL # secrets
from keycloak.oauth import KeycloakOAuth2
from keycloak.starlette_admin import KeycloakAuthProvider

keycloak = KeycloakOAuth2(
client_id=settings.keycloak.client_id,
client_secret=settings.keycloak.client_secret,
server_metadata_url=str(settings.keycloak.server_metadata_url),
client_kwargs=settings.keycloak.client_kwargs,
base_url=BASE_URL,
)

admin = Admin(
# engine,
title=...,
base_url=BASE_URL,
auth_provider=KeycloakAuthProvider(keycloak),
)
```

## Development

If you want to contribute to this project, you can simply clone the repository and run `poetry install`.

Please also run `pre-commit install` for linting and enforcing a consistent code style.

## Contributing

We are happy if you want to contribute to this project. If you find any bugs or have suggestions for improvements, please open an issue. We are also happy to accept your PRs. Just open an issue beforehand and let us know what you want to do and why.

## License

This project is licensed under the MIT license. Have a look at the [LICENSE](LICENSE.md) for more details.
108 changes: 108 additions & 0 deletions keycloak/oauth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from typing import Any
import pydantic
from authlib.integrations.starlette_client import OAuth, StarletteOAuth2App
from authlib.jose import JWTClaims, JsonWebToken, JsonWebKey

from starlette import status
from starlette.datastructures import URL
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import RedirectResponse


class User(pydantic.BaseModel):
name: str
email: pydantic.EmailStr | None
roles: list[str]
"""Complete access token. Required for token propagation."""
token: str


class KeycloakOAuth2:
def __init__(
self,
client_id: str,
client_secret: str,
server_metadata_url: str,
client_kwargs: dict[str, Any],
base_url: str = "/",
logout_target: str = "/",
) -> None:
self._base_url = base_url
self._logout_page = logout_target
oauth = OAuth()
oauth.register(
name="keycloak",
# client_id and client_secret are created in keycloak
client_id=client_id,
client_secret=client_secret,
server_metadata_url=server_metadata_url,
client_kwargs=client_kwargs,
)
assert isinstance(oauth.keycloak, StarletteOAuth2App)
self.keycloak = oauth.keycloak

def setup_fastapi_routes(self) -> None:
"""Create FastAPI router and register API endpoints."""
import fastapi

self.router = fastapi.APIRouter()
self.router.add_api_route("/login", self.login_page)
self.router.add_api_route("/callback", self.auth)
self.router.add_api_route("/logout", self.logout)

async def login_page(
self, request: Request, redirect_target: str | None = None
) -> RedirectResponse:
"""Redirect to Keycloak login page."""
redirect_uri = (
URL(redirect_target)
if redirect_target
else request.url_for("auth") # /auth/callback
)
if next := request.query_params.get("next"):
redirect_uri = redirect_uri.include_query_params(next=next)
return await self.keycloak.authorize_redirect(request, redirect_uri)

async def auth(self, request: Request) -> RedirectResponse:
"""Authorize user with Keycloak access token."""
token = await self.keycloak.authorize_access_token(request)
claims = await self.parse_claims(token)
user = User(
name=claims["preferred_username"],
email=claims.get("email"),
roles=claims["realm_access"]["roles"],
token=token["access_token"],
)
request.session["user"] = user.model_dump(mode="json")
redirect_uri = request.query_params.get("next") or self._base_url
return RedirectResponse(redirect_uri)

async def parse_claims(self, token: dict[str, Any]) -> JWTClaims:
metadata = await self.keycloak.load_server_metadata()
alg_values: list[str] = metadata.get(
"id_token_signing_alg_values_supported"
) or ["RS256"]
jwt = JsonWebToken(alg_values)
jwk_set = await self.keycloak.fetch_jwk_set()
claims = jwt.decode(
token["access_token"],
key=JsonWebKey.import_key_set(jwk_set),
)
return claims

async def logout(self, request: Request) -> RedirectResponse:
"""Deauthorize user and redirect to logout page."""
request.session.pop("user", None)
return RedirectResponse(self._logout_page)

@staticmethod
async def get_user(request: Request) -> User:
if (user := request.session.get("user")) is not None:
return User.model_validate(user)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
63 changes: 63 additions & 0 deletions keycloak/starlette_admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from typing import Sequence
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import RedirectResponse
from starlette.routing import Route
from starlette_admin.auth import AdminUser, AuthProvider, login_not_required
from starlette_admin.base import BaseAdmin
from keycloak.oauth import KeycloakOAuth2, User


class KeycloakAuthProvider(AuthProvider):
def __init__(
self,
keycloak: KeycloakOAuth2,
login_path: str = "/login",
logout_path: str = "/logout",
allow_paths: Sequence[str] | None = None,
allow_routes: Sequence[str] | None = None,
) -> None:
self.keycloak = keycloak
super().__init__(login_path, logout_path, allow_paths, allow_routes)

async def is_authenticated(self, request: Request) -> bool:
try:
user = await self.keycloak.get_user(request)
request.state.user = user
return True
except HTTPException:
return False

def get_admin_user(self, request: Request) -> AdminUser | None:
user: User = request.state.user
return AdminUser(
username=user.name,
# photo_url=user.avatar, # TODO
)

async def render_login(
self, request: Request, admin: BaseAdmin
) -> RedirectResponse:
redirect_uri = request.url_for(admin.route_name + ":authorize_keycloak")
return await self.keycloak.login_page(request, str(redirect_uri))

async def render_logout(
self, request: Request, admin: BaseAdmin
) -> RedirectResponse:
return await self.keycloak.logout(request)

@login_not_required
async def handle_auth_callback(self, request: Request) -> RedirectResponse:
return await self.keycloak.auth(request)

def setup_admin(self, admin: BaseAdmin) -> None:
super().setup_admin(admin)
"""add custom authentication callback route"""
admin.routes.append(
Route(
"/auth/callback",
self.handle_auth_callback,
methods=["GET"],
name="authorize_keycloak",
)
)
Loading

0 comments on commit fbd340d

Please sign in to comment.