diff --git a/poetry.lock b/poetry.lock index 73bfa10..ec0918a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -443,17 +443,6 @@ docs = ["sphinx (<5)", "sphinx-rtd-theme"] keepalive = ["keepalive (>=0.5)"] pandas = ["pandas (>=1.3.5)"] -[[package]] -name = "toolz" -version = "0.12.1" -description = "List processing tools and functional utilities" -optional = false -python-versions = ">=3.7" -files = [ - {file = "toolz-0.12.1-py3-none-any.whl", hash = "sha256:d22731364c07d72eea0a0ad45bafb2c2937ab6fd38a3507bf55eae8744aa7d85"}, - {file = "toolz-0.12.1.tar.gz", hash = "sha256:ecca342664893f177a13dac0e6b41cbd8ac25a358e5f215316d43e2100224f4d"}, -] - [[package]] name = "typing-extensions" version = "4.12.2" @@ -468,4 +457,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "19d4d0682490336d67ea3aab0f28b54347ff8083cd3e9ec28856506adaac4bd6" +content-hash = "391db9e02daec3b0eec55fcf06de8d9fa9adeb30a5fa6b6313f767e9acad2513" diff --git a/pyproject.toml b/pyproject.toml index 3b3a86d..90c4956 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,6 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.11" sparqlwrapper = "^2.0.0" -toolz = "^0.12.1" pydantic = "^2.8.2" diff --git a/rdfproxy/adapter.py b/rdfproxy/adapter.py index da9ece9..3576c86 100644 --- a/rdfproxy/adapter.py +++ b/rdfproxy/adapter.py @@ -1,87 +1,200 @@ -"""SPARQLModelAdapter class for QueryResult to Pydantic model conversions.""" +"""SPARQLModelAdapter class for SPARQL query result set to Pydantic model conversions.""" -from collections.abc import Iterable -from typing import cast +from collections import defaultdict +from collections.abc import Iterator +import math +from typing import Any, Generic, overload from SPARQLWrapper import JSON, QueryResult, SPARQLWrapper -from pydantic import BaseModel -from rdfproxy.utils._types import _TModelConstructorCallable, _TModelInstance +from rdfproxy.utils._exceptions import ( + InterdependentParametersException, + 
UndefinedBindingException, +) +from rdfproxy.utils._types import _TModelInstance +from rdfproxy.utils.models import Page +from rdfproxy.utils.sparql.sparql_templates import ungrouped_pagination_base_query +from rdfproxy.utils.sparql.sparql_utils import ( + calculate_offset, + construct_count_query, + construct_grouped_count_query, + construct_grouped_pagination_query, + query_with_wrapper, + temporary_query_override, +) from rdfproxy.utils.utils import ( get_bindings_from_query_result, instantiate_model_from_kwargs, ) -class SPARQLModelAdapter: - """Adapter/Mapper for QueryResult to Pydantic model conversions. - - The rdfproxy.SPARQLModelAdapter class allows to run a query against an endpoint - and map a flat SPARQL query result set to a potentially nested Pydantic model. - - Example: - - from SPARQLWrapper import SPARQLWrapper - from pydantic import BaseModel - from rdfproxy import SPARQLModelAdapter, _TModelInstance - - class SimpleModel(BaseModel): - x: int - y: int - - class NestedModel(BaseModel): - a: str - b: SimpleModel - - class ComplexModel(BaseModel): - p: str - q: NestedModel - - - sparql_wrapper = SPARQLWrapper("https://query.wikidata.org/bigdata/namespace/wdq/sparql") +class SPARQLModelAdapter(Generic[_TModelInstance]): + """Adapter/Mapper for SPARQL query result set to Pydantic model conversions. - query = ''' - select ?x ?y ?a ?p - where { - values (?x ?y ?a ?p) { - (1 2 "a value" "p value") - } - } - ''' - - adapter = SPARQLModelAdapter(sparql_wrapper=sparql_wrapper) - models: list[_TModelInstance] = adapter(query=query, model_constructor=ComplexModel) + The rdfproxy.SPARQLModelAdapter class allows to run a query against an endpoint, + map a flat SPARQL query result set to a potentially nested Pydantic model and + optionally paginate and/or group the results by a SPARQL binding. 
""" - def __init__(self, sparql_wrapper: SPARQLWrapper) -> None: - self.sparql_wrapper = sparql_wrapper - - if self.sparql_wrapper.returnFormat != "json": - self.sparql_wrapper.setReturnFormat(JSON) + def __init__( + self, target: str | SPARQLWrapper, query: str, model: type[_TModelInstance] + ) -> None: + self._query = query + self._model = model - def __call__( - self, - query: str, - model_constructor: type[_TModelInstance] | _TModelConstructorCallable, - ) -> Iterable[_TModelInstance]: + self.sparql_wrapper: SPARQLWrapper = ( + SPARQLWrapper(target) if isinstance(target, str) else target + ) + self.sparql_wrapper.setReturnFormat(JSON) self.sparql_wrapper.setQuery(query) - query_result: QueryResult = self.sparql_wrapper.query() - if isinstance(model_constructor, type(BaseModel)): - model_constructor = cast(type[_TModelInstance], model_constructor) + @overload + def query(self) -> list[_TModelInstance]: ... - bindings = get_bindings_from_query_result(query_result) - models: list[_TModelInstance] = [ - instantiate_model_from_kwargs(model_constructor, **binding) - for binding in bindings - ] + @overload + def query( + self, + *, + group_by: str, + ) -> dict[str, list[_TModelInstance]]: ... - elif isinstance(model_constructor, _TModelConstructorCallable): - models: Iterable[_TModelInstance] = model_constructor(query_result) + @overload + def query( + self, + *, + page: int, + size: int, + ) -> Page[_TModelInstance]: ... - else: - raise TypeError( - "Argument 'model_constructor' must be a model class " - "or a model constructor callable." - ) + @overload + def query( + self, + *, + page: int, + size: int, + group_by: str, + ) -> Page[_TModelInstance]: ... - return models + def query( + self, + *, + page: int | None = None, + size: int | None = None, + group_by: str | None = None, + ) -> ( + list[_TModelInstance] | dict[str, list[_TModelInstance]] | Page[_TModelInstance] + ): + """Run query against endpoint and map the SPARQL query result set to a Pydantic model. 
+ + Optional pagination and/or grouping by a SPARQL binding is avaible by + supplying the group_by and/or page/size parameters. + """ + match page, size, group_by: + case None, None, None: + return self._query_collect_models() + case int(), int(), None: + return self._query_paginate_ungrouped(page=page, size=size) + case None, None, str(): + return self._query_group_by(group_by=group_by) + case int(), int(), str(): + return self._query_paginate_grouped( + page=page, size=size, group_by=group_by + ) + case (None, int(), Any()) | (int(), None, Any()): + raise InterdependentParametersException( + "Parameters 'page' and 'size' are mutually dependent." + ) + case _: + raise Exception("This should never happen.") + + def _query_generate_model_bindings_mapping( + self, query: str | None = None + ) -> Iterator[tuple[_TModelInstance, dict[str, Any]]]: + """Run query, construct model instances and generate a model-bindings mapping. + + The query parameter defaults to the initially defined query and + is run against the endpoint defined in the SPARQLModelAdapter instance. + + Note: The coupling of model instances with flat SPARQL results + allows for easier and more efficient grouping operations (see grouping functionality). 
+ """ + if query is None: + query_result: QueryResult = self.sparql_wrapper.query() + else: + with temporary_query_override(self.sparql_wrapper): + self.sparql_wrapper.setQuery(query) + query_result: QueryResult = self.sparql_wrapper.query() + + _bindings = get_bindings_from_query_result(query_result) + + for bindings in _bindings: + model = instantiate_model_from_kwargs(self._model, **bindings) + yield model, bindings + + def _query_collect_models(self, query: str | None = None) -> list[_TModelInstance]: + """Run query against endpoint and collect model instances.""" + return [ + model + for model, _ in self._query_generate_model_bindings_mapping(query=query) + ] + + def _query_group_by( + self, group_by: str, query: str | None = None + ) -> dict[str, list[_TModelInstance]]: + """Run query against endpoint and group results by a SPARQL binding.""" + group = defaultdict(list) + + for model, bindings in self._query_generate_model_bindings_mapping(query): + try: + key = bindings[group_by] + except KeyError: + raise UndefinedBindingException( + f"SPARQL binding '{group_by}' requested for grouping " + f"not in query projection '{bindings}'." + ) + + group[str(key)].append(model) + + return group + + def _get_count(self, query: str) -> int: + """Construct a count query from the initialized query, run it and return the count result.""" + result = query_with_wrapper(query=query, sparql_wrapper=self.sparql_wrapper) + return int(next(result)["cnt"]) + + def _query_paginate_ungrouped(self, page: int, size: int) -> Page[_TModelInstance]: + """Run query with pagination according to page and size. + + The internal query is dynamically modified according to page (offset)/size (limit) + and run with SPARQLModelAdapter._query_collect_models. 
+ """ + paginated_query = ungrouped_pagination_base_query.substitute( + query=self._query, offset=calculate_offset(page, size), limit=size + ) + count_query = construct_count_query(self._query) + + items = self._query_collect_models(query=paginated_query) + total = self._get_count(count_query) + pages = math.ceil(total / size) + + return Page(items=items, page=page, size=size, total=total, pages=pages) + + def _query_paginate_grouped( + self, page: int, size: int, group_by: str + ) -> Page[_TModelInstance]: + """Run query with pagination according to page/size and group result by a SPARQL binding. + + The internal query is dynamically modified according to page (offset)/size (limit) + and run with SPARQLModelAdapter._query_group_by. + """ + grouped_paginated_query = construct_grouped_pagination_query( + query=self._query, page=page, size=size, group_by=group_by + ) + grouped_count_query = construct_grouped_count_query( + query=self._query, group_by=group_by + ) + + items = self._query_group_by(group_by=group_by, query=grouped_paginated_query) + total = self._get_count(grouped_count_query) + pages = math.ceil(total / size) + + return Page(items=items, page=page, size=size, total=total, pages=pages) diff --git a/rdfproxy/utils/_exceptions.py b/rdfproxy/utils/_exceptions.py new file mode 100644 index 0000000..2a4d219 --- /dev/null +++ b/rdfproxy/utils/_exceptions.py @@ -0,0 +1,9 @@ +"""Custom exceptions for RDFProxy.""" + + +class UndefinedBindingException(KeyError): + """Exception for indicating that a requested key could not be retrieved from a SPARQL binding mapping.""" + + +class InterdependentParametersException(Exception): + """Exceptiono for indicating that two or more parameters are interdependent.""" diff --git a/rdfproxy/utils/_types.py b/rdfproxy/utils/_types.py index 65bc1bf..899327e 100644 --- a/rdfproxy/utils/_types.py +++ b/rdfproxy/utils/_types.py @@ -15,3 +15,24 @@ class _TModelConstructorCallable(Protocol[_TModelInstance]): """Callback protocol for 
class SPARQLBinding(str):
    """SPARQLBinding type for explicit SPARQL binding to model field allocation.

    This type's intended use is with typing.Annotated in the context of a Pydantic field definition.

    Example:

        class Work(BaseModel):
            name: Annotated[str, SPARQLBinding("title")]

        class Person(BaseModel):
            name: str
            work: Work

    This signals to the RDFProxy SPARQL-to-model mapping logic
    to use the "title" SPARQL binding (not the "name" binding) to populate the Work.name field.
    """
+ """ + + items: list[_TModelInstance] | dict[str, list[_TModelInstance]] + page: int + size: int + total: int + pages: int diff --git a/rdfproxy/utils/sparql/sparql_templates.py b/rdfproxy/utils/sparql/sparql_templates.py new file mode 100644 index 0000000..f9b141c --- /dev/null +++ b/rdfproxy/utils/sparql/sparql_templates.py @@ -0,0 +1,10 @@ +"""SPARQL Query templates for RDFProxy paginations.""" + +from string import Template + + +ungrouped_pagination_base_query = Template(""" +$query +limit $limit +offset $offset +""") diff --git a/rdfproxy/utils/sparql/sparql_utils.py b/rdfproxy/utils/sparql/sparql_utils.py new file mode 100644 index 0000000..146fabf --- /dev/null +++ b/rdfproxy/utils/sparql/sparql_utils.py @@ -0,0 +1,125 @@ +"""Functionality for dynamic SPARQL query modifcation.""" + +from collections.abc import Iterator +from contextlib import contextmanager +import re + +from SPARQLWrapper import QueryResult, SPARQLWrapper +from rdfproxy.utils.sparql.sparql_templates import ungrouped_pagination_base_query +from rdfproxy.utils.utils import get_bindings_from_query_result + + +def remove_query_prefixes(query: str) -> str: + """Remove prefix definitions from a SPARQL query. + + Prefix definitions need removing e.g. in injected subqueries. 
+ """ + return re.sub( + pattern=r"^prefix.*", repl="", string=query, flags=re.I | re.MULTILINE + ) + + +def inject_subquery(query: str, subquery: str) -> str: + """Inject a subquery into query.""" + + def _indent_query(query: str, indent: int = 2) -> str: + """Indent a query by n spaces according to indent parameter.""" + indented_query = "".join( + [f"{' ' * indent}{line}\n" for line in query.splitlines()] + ) + return indented_query + + point: int = query.rfind("}") + partial_query: str = query[:point] + + _subquery = remove_query_prefixes(subquery) + indented_subquery: str = _indent_query(_subquery) + + new_query: str = f"{partial_query} " f"{{{indented_subquery}}}\n}}" + return new_query + + +def replace_query_select_clause(query: str, repl: str) -> str: + """Replace the SELECT clause of a query with repl.""" + if re.search(r"select\s.+", query, re.I) is None: + raise Exception("Unable to obtain SELECT clause.") + + count_query = re.sub( + pattern=r"select\s.+", + repl=repl, + string=query, + count=1, + flags=re.I, + ) + + return count_query + + +def construct_count_query(query: str) -> str: + """Construct a generic count query from a SELECT query.""" + count_query = replace_query_select_clause(query, "select (count(*) as ?cnt)") + return count_query + + +def calculate_offset(page: int, size: int) -> int: + """Calculate offset value for paginated SPARQL templates.""" + match page: + case 1: + return 0 + case 2: + return size + case _: + return size * (page - 1) + + +def construct_grouped_pagination_query( + query: str, page: int, size: int, group_by: str +) -> str: + """Dynamically construct a query for grouped pagination. + + Based on the initial query, construct a query with limit/offset according to page/size + and with a SELECT clause that distinctly selects the group_by variable; + then inject that query into the initial query as a subquery. 
+ """ + _paginated_query = ungrouped_pagination_base_query.substitute( + query=query, offset=calculate_offset(page, size), limit=size + ) + subquery = replace_query_select_clause( + _paginated_query, f"select distinct ?{group_by}" + ) + + grouped_pagination_query = inject_subquery(query=query, subquery=subquery) + return grouped_pagination_query + + +def construct_grouped_count_query(query: str, group_by) -> str: + grouped_count_query = replace_query_select_clause( + query, f"select (count(distinct ?{group_by}) as ?cnt)" + ) + + return grouped_count_query + + +@contextmanager +def temporary_query_override(sparql_wrapper: SPARQLWrapper): + """Context manager that allows to contextually overwrite a query in a SPARQLWrapper object.""" + _query_cache = sparql_wrapper.queryString + + try: + yield sparql_wrapper + finally: + sparql_wrapper.setQuery(_query_cache) + + +def query_with_wrapper(query: str, sparql_wrapper: SPARQLWrapper) -> Iterator[dict]: + """Execute a SPARQL query using a predefined sparql_wrapper object. + + The query attribute of the wrapper object is temporarily overridden + and gets restored after query execution. 
+ """ + with temporary_query_override(sparql_wrapper=sparql_wrapper): + sparql_wrapper.setQuery(query) + result: QueryResult = sparql_wrapper.query() + + bindings: Iterator[dict] = get_bindings_from_query_result(result) + return bindings diff --git a/rdfproxy/utils/utils.py b/rdfproxy/utils/utils.py index 9c8506c..979efaa 100644 --- a/rdfproxy/utils/utils.py +++ b/rdfproxy/utils/utils.py @@ -1,12 +1,12 @@ """SPARQL/FastAPI utils.""" -from collections.abc import Iterator, Mapping -from typing import cast +from collections.abc import Iterator +from typing import Any from SPARQLWrapper import QueryResult from pydantic import BaseModel -from rdfproxy.utils._types import _TModelInstance -from toolz import valmap +from pydantic.fields import FieldInfo +from rdfproxy.utils._types import SPARQLBinding, _TModelInstance def get_bindings_from_query_result(query_result: QueryResult) -> Iterator[dict]: @@ -17,9 +17,9 @@ def get_bindings_from_query_result(query_result: QueryResult) -> Iterator[dict]: f"Received object with requestedFormat '{result_format}'." ) - query_json = cast(Mapping, query_result.convert()) + query_json: dict = query_result.convert() bindings = map( - lambda binding: valmap(lambda v: v["value"], binding), + lambda binding: {k: v["value"] for k, v in binding.items()}, query_json["results"]["bindings"], ) @@ -31,6 +31,9 @@ def instantiate_model_from_kwargs( ) -> _TModelInstance: """Instantiate a (potentially nested) model from (flat) kwargs. + More a more generic version of this function see upto.init_model_from_kwargs + https://github.com/lu-pl/upto?tab=readme-ov-file#init_model_from_kwargs. + Example: class SimpleModel(BaseModel): @@ -52,22 +55,36 @@ class ComplexModel(BaseModel): print(model) # p='p value' q=NestedModel(a='a value', b=SimpleModel(x=1, y=2)) """ - def _get_bindings(model: type[_TModelInstance], **kwargs) -> dict: + def _get_key_from_metadata(v: FieldInfo): + """Try to get a SPARQLBinding object from a field's metadata attribute. 
+ + Helper for _generate_binding_pairs. + """ + try: + value = next(filter(lambda x: isinstance(x, SPARQLBinding), v.metadata)) + return value + except StopIteration: + return None + + def _generate_binding_pairs( + model: type[_TModelInstance], **kwargs + ) -> Iterator[tuple[str, Any]]: """Get the bindings needed for model instantation. The function traverses model.model_fields - and constructs a bindings dict by either getting values from kwargs or field defaults. + and constructs binding pairs by either getting values from kwargs or field defaults. For model fields the recursive clause runs. - - Note: This needs exception handling and proper testing. """ - return { - k: ( - v.annotation(**_get_bindings(v.annotation, **kwargs)) - if isinstance(v.annotation, type(BaseModel)) - else kwargs.get(k, v.default) - ) - for k, v in model.model_fields.items() - } - - return model(**_get_bindings(model, **kwargs)) + for k, v in model.model_fields.items(): + if isinstance(v.annotation, type(BaseModel)): + value = v.annotation( + **dict(_generate_binding_pairs(v.annotation, **kwargs)) + ) + else: + binding_key = _get_key_from_metadata(v) or k + value = kwargs.get(binding_key, v.default) + + yield k, value + + bindings = dict(_generate_binding_pairs(model, **kwargs)) + return model(**bindings)