Skip to content

Commit

Permalink
Merge pull request #1028 from Aiven-Open/jjaakola-aiven-handle-urlenc…
Browse files Browse the repository at this point in the history
…oded-forward-slash-in-subject

feat: handle urlencoded forward slash in subject
  • Loading branch information
nosahama authored Jan 24, 2025
2 parents 3925a4b + 2876794 commit b21230c
Show file tree
Hide file tree
Showing 12 changed files with 747 additions and 744 deletions.
93 changes: 92 additions & 1 deletion src/karapace/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
See LICENSE for details
"""

from typing import Literal
from aiohttp import BasicAuth, ClientSession
from collections.abc import Awaitable, Callable, Mapping
from karapace.typing import JsonData
from urllib.parse import urljoin
from urllib.parse import urljoin, quote_plus

import logging
import ssl
Expand Down Expand Up @@ -115,6 +116,7 @@ async def delete(
path: Path,
headers: Headers | None = None,
auth: BasicAuth | None = None,
params: Mapping[str, str] | None = None,
) -> Result:
path = self.path_for(path)
if not headers:
Expand All @@ -125,6 +127,7 @@ async def delete(
headers=headers,
auth=auth,
ssl=self.ssl_mode,
params=params,
) as res:
json_result = {} if res.status == 204 else await res.json()
return Result(res.status, json_result, headers=res.headers)
Expand All @@ -135,6 +138,7 @@ async def post(
json: JsonData,
headers: Headers | None = None,
auth: BasicAuth | None = None,
params: Mapping[str, str] | None = None,
) -> Result:
path = self.path_for(path)
if not headers:
Expand All @@ -147,6 +151,7 @@ async def post(
auth=auth,
json=json,
ssl=self.ssl_mode,
params=params,
) as res:
json_result = {} if res.status == 204 else await res.json()
return Result(res.status, json_result, headers=res.headers)
Expand Down Expand Up @@ -191,3 +196,89 @@ async def put_with_data(
) as res:
json_result = await res.json()
return Result(res.status, json_result, headers=res.headers)

# Per resource functions
# COMPATIBILITY
async def post_compatibility_subject_version(
self, *, subject: str, version: int | Literal["latest"], json: JsonData
) -> Result:
return await self.post(
path=f"compatibility/subjects/{quote_plus(subject)}/versions/{version}",
json=json,
)

# CONFIG
async def get_config(self) -> Result:
return await self.get(path="/config")

async def put_config(self, *, json: JsonData) -> Result:
return await self.put(path="/config", json=json)

async def get_config_subject(self, *, subject: str, defaultToGlobal: bool | None = None) -> Result:
path = f"/config/{quote_plus(subject)}"
if defaultToGlobal is not None:
path = f"{path}?defaultToGlobal={str(defaultToGlobal).lower()}"
return await self.get(path=path)

async def put_config_subject(self, *, subject: str, json: JsonData) -> Result:
return await self.put(path=f"config/{quote_plus(subject)}", json=json)

async def delete_config_subject(self, *, subject: str) -> Result:
return await self.delete(path=f"config/{quote_plus(subject)}")

# MODE
async def get_mode(self) -> Result:
return await self.get(path="/mode")

async def get_mode_subject(self, *, subject: str) -> Result:
return await self.get(path=f"/mode/{quote_plus(subject)}")

# SCHEMAS
async def get_schemas(self) -> Result:
return await self.get("/schemas")

async def get_types(self) -> Result:
return await self.get(path="/schemas/types")

async def get_schema_by_id(self, *, schema_id: int, params: Mapping[str, str] | None = None) -> Result:
return await self.get(path=f"/schemas/ids/{schema_id}", params=params)

async def get_schema_by_id_versions(self, *, schema_id: int, params: Mapping[str, str] | None = None) -> Result:
return await self.get(path=f"/schemas/ids/{schema_id}/versions", params=params)

# SUBJECTS
async def get_subjects(self, *, params: Mapping[str, str] | None = None) -> Result:
return await self.get("/subjects", params=params)

async def get_subjects_versions(self, *, subject: str) -> Result:
return await self.get(f"subjects/{quote_plus(subject)}/versions")

async def post_subjects(self, *, subject: str, json: JsonData, params: Mapping[str, str] | None = None) -> Result:
return await self.post(f"/subjects/{quote_plus(subject)}", json=json, params=params)

async def post_subjects_versions(
self, *, subject: str, json: JsonData, params: Mapping[str, str] | None = None
) -> Result:
return await self.post(f"/subjects/{quote_plus(subject)}/versions", json=json, params=params)

async def get_subjects_subject_version(
self, *, subject: str, version: int | Literal["latest"], params: Mapping[str, str] | None = None
) -> Result:
return await self.get(f"/subjects/{quote_plus(subject)}/versions/{version}", params=params)

async def get_subjects_subject_version_schema(self, *, subject: str, version: int | Literal["latest"]) -> Result:
return await self.get(f"subjects/{quote_plus(subject)}/versions/{version}/schema")

async def get_subjects_subject_version_referenced_by(self, *, subject: str, version: int | Literal["latest"]) -> Result:
return await self.get(f"subjects/{quote_plus(subject)}/versions/{version}/referencedby")

async def delete_subjects(self, *, subject: str, params: Mapping[str, str] | None = None) -> Result:
return await self.delete(path=f"/subjects/{quote_plus(subject)}", params=params)

async def delete_subjects_version(
self, *, subject: str, version: int | Literal["latest"], permanent: bool | None = None
) -> Result:
path = f"subjects/{quote_plus(subject)}/versions/{version}"
if permanent is not None:
path = f"{path}?permanent={str(permanent).lower()}"
return await self.delete(path)
4 changes: 4 additions & 0 deletions src/schema_registry/routers/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
from schema_registry.container import SchemaRegistryContainer
from schema_registry.controller import KarapaceSchemaRegistryController
from schema_registry.routers.errors import unauthorized
from schema_registry.routers.raw_path_router import RawPathRoute
from schema_registry.routers.requests import CompatibilityCheckResponse, SchemaRequest
from schema_registry.user import get_current_user
from typing import Annotated
from urllib.parse import unquote_plus

compatibility_router = APIRouter(
prefix="/compatibility",
tags=["compatibility"],
responses={404: {"description": "Not found"}},
route_class=RawPathRoute,
)


Expand All @@ -31,6 +34,7 @@ async def compatibility_post(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityCheckResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand Down
7 changes: 7 additions & 0 deletions src/schema_registry/routers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@
from schema_registry.controller import KarapaceSchemaRegistryController
from schema_registry.registry import KarapaceSchemaRegistry
from schema_registry.routers.errors import no_primary_url_error, unauthorized
from schema_registry.routers.raw_path_router import RawPathRoute
from schema_registry.routers.requests import CompatibilityLevelResponse, CompatibilityRequest, CompatibilityResponse
from schema_registry.user import get_current_user
from typing import Annotated
from urllib.parse import unquote_plus


config_router = APIRouter(
prefix="/config",
tags=["config"],
responses={404: {"description": "Not found"}},
route_class=RawPathRoute,
)


Expand Down Expand Up @@ -69,6 +73,7 @@ async def config_get_subject(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityLevelResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -87,6 +92,7 @@ async def config_set_subject(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -111,6 +117,7 @@ async def config_delete_subject(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

Expand Down
4 changes: 4 additions & 0 deletions src/schema_registry/routers/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
from schema_registry.container import SchemaRegistryContainer
from schema_registry.controller import KarapaceSchemaRegistryController
from schema_registry.routers.errors import unauthorized
from schema_registry.routers.raw_path_router import RawPathRoute
from schema_registry.routers.requests import ModeResponse
from schema_registry.user import get_current_user
from typing import Annotated
from urllib.parse import unquote_plus

mode_router = APIRouter(
prefix="/mode",
tags=["mode"],
responses={404: {"description": "Not found"}},
route_class=RawPathRoute,
)


Expand All @@ -42,6 +45,7 @@ async def mode_get_subject(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> ModeResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand Down
44 changes: 44 additions & 0 deletions src/schema_registry/routers/raw_path_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""
Copyright (c) 2025 Aiven Ltd
See LICENSE for details
"""

"""
The MIT License (MIT)
Copyright (c) 2018 Sebastián Ramírez
"""

from fastapi import HTTPException # noqa: E402
from fastapi.routing import APIRoute # noqa: E402
from starlette.routing import Match # noqa: E402
from starlette.types import Scope # noqa: E402

import re # noqa: E402


class RawPathRoute(APIRoute):
"""The subject in the paths can contain url encoded forward slash
See explanation and origin of the solution:
- https://github.com/fastapi/fastapi/discussions/7328#discussioncomment-8443865
Starlette defect discussion:
- https://github.com/encode/starlette/issues/826
"""

def matches(self, scope: Scope) -> tuple[Match, Scope]:
raw_path: str | None = None

if "raw_path" in scope and scope["raw_path"] is not None:
raw_path = scope["raw_path"].decode("utf-8")

if raw_path is None:
raise HTTPException(status_code=500, detail="Internal routing error")

# Drop the last forward slash if present. e.g. '/path/' -> '/path', but from path '/'
if len(raw_path) > 1:
raw_path = raw_path if raw_path[-1] != "/" else raw_path[:-1]

new_path = re.sub(r"\?.*", "", raw_path)
scope["path"] = new_path
return super().matches(scope)
14 changes: 11 additions & 3 deletions src/schema_registry/routers/subjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
from dependency_injector.wiring import inject, Provide
from fastapi import APIRouter, Depends, Request
from karapace.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.config import Config
from karapace.container import KarapaceContainer
from karapace.forward_client import ForwardClient
from karapace.typing import Subject
from schema_registry.container import SchemaRegistryContainer
from schema_registry.controller import KarapaceSchemaRegistryController
from schema_registry.registry import KarapaceSchemaRegistry
from schema_registry.routers.errors import no_primary_url_error, unauthorized
from schema_registry.routers.raw_path_router import RawPathRoute
from schema_registry.routers.requests import SchemaIdResponse, SchemaRequest, SchemaResponse, SubjectSchemaVersionResponse
from schema_registry.user import get_current_user
from typing import Annotated
from urllib.parse import unquote_plus

import logging

Expand All @@ -27,6 +27,7 @@
prefix="/subjects",
tags=["subjects"],
responses={404: {"description": "Not found"}},
route_class=RawPathRoute,
)


Expand Down Expand Up @@ -56,6 +57,7 @@ async def subjects_subject_post(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> SchemaResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -78,8 +80,8 @@ async def subjects_subject_delete(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> list[int]:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -105,6 +107,7 @@ async def subjects_subject_versions_post(
normalize: bool = False,
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> SchemaIdResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -128,6 +131,7 @@ async def subjects_subject_versions_list(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> list[int]:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -144,6 +148,7 @@ async def subjects_subject_version_get(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> SubjectSchemaVersionResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -163,6 +168,7 @@ async def subjects_subject_version_delete(
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> int:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -185,6 +191,7 @@ async def subjects_subject_version_schema_get(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> dict:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -200,6 +207,7 @@ async def subjects_subject_version_referenced_by(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> list[int]:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand Down
Loading

0 comments on commit b21230c

Please sign in to comment.