From 19590739d68b047ca0480d37669225ed52123144 Mon Sep 17 00:00:00 2001 From: Lukas Plank <lupl@tuta.io> Date: Tue, 10 Dec 2024 11:02:49 +0100 Subject: [PATCH] feat: implement strategies for SPARQL query functionality Currently, rdfproxy relies on SPARQLWrapper for querying triplestores; this is not always ideal, since SPARQLWrapper occasionally gets blacklisted e.g. by wikidata and caused severe performance issues in the past. The change introduces SPARQLQuery strategies for better control over what query functionality should run in RDFProxy. The default strategy, SPARQLWrapperStrategy, implements exactly the previous SPARQLWrapper behavior. An HttpxStrategy implements the query functionality required by RDFProxy using raw httpx instead of SPARQLWrapper. --- rdfproxy/adapter.py | 20 +++++------ rdfproxy/sparql_strategies.py | 64 +++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 11 deletions(-) create mode 100644 rdfproxy/sparql_strategies.py diff --git a/rdfproxy/adapter.py b/rdfproxy/adapter.py index ef93ad3..05e705b 100644 --- a/rdfproxy/adapter.py +++ b/rdfproxy/adapter.py @@ -6,6 +6,7 @@ from SPARQLWrapper import JSON, SPARQLWrapper from rdfproxy.mapper import ModelBindingsMapper +from rdfproxy.sparql_strategies import SPARQLStrategy, SPARQLWrapperStrategy from rdfproxy.utils._types import _TModelInstance from rdfproxy.utils.models import Page, QueryParameters from rdfproxy.utils.sparql_utils import ( @@ -32,15 +33,16 @@ class SPARQLModelAdapter(Generic[_TModelInstance]): """ def __init__( - self, target: str | SPARQLWrapper, query: str, model: type[_TModelInstance] + self, + target: str, + query: str, + model: type[_TModelInstance], + sparql_strategy: SPARQLStrategy = SPARQLWrapperStrategy, ) -> None: self._query = query self._model = model - self.sparql_wrapper: SPARQLWrapper = ( - SPARQLWrapper(target) if isinstance(target, str) else target - ) - self.sparql_wrapper.setReturnFormat(JSON) + self.sparql_strategy = sparql_strategy(target) def query(self, query_parameters: QueryParameters) -> Page[_TModelInstance]: """Run a query against an endpoint and return a Page model object.""" @@ -52,9 +54,7 @@ def query(self, query_parameters: QueryParameters) -> Page[_TModelInstance]: offset=calculate_offset(query_parameters.page, query_parameters.size), ) - items_query_bindings: Iterator[dict] = query_with_wrapper( - query=items_query, sparql_wrapper=self.sparql_wrapper - ) + items_query_bindings: Iterator[dict] = self.sparql_strategy.query(items_query) mapper = ModelBindingsMapper(self._model, *items_query_bindings) @@ -75,7 +75,5 @@ def _get_count(self, query: str) -> int: Helper for SPARQLModelAdapter.query. """ - result: Iterator[dict] = query_with_wrapper( - query=query, sparql_wrapper=self.sparql_wrapper - ) + result: Iterator[dict] = self.sparql_strategy.query(query) return int(next(result)["cnt"]) diff --git a/rdfproxy/sparql_strategies.py b/rdfproxy/sparql_strategies.py new file mode 100644 index 0000000..4aa0700 --- /dev/null +++ b/rdfproxy/sparql_strategies.py @@ -0,0 +1,64 @@ +"""Strategy classes for SPARQL query functionality.""" + +import abc +from collections.abc import Iterator + +from SPARQLWrapper import JSON, QueryResult, SPARQLWrapper +import httpx + + +class SPARQLStrategy(abc.ABC): + def __init__(self, endpoint: str): + self.endpoint = endpoint + + @abc.abstractmethod + def query(self, sparql_query: str) -> Iterator[dict[str, str]]: + raise NotImplementedError + + @staticmethod + def _get_bindings_from_bindings_dict(bindings_dict: dict) -> Iterator[dict]: + bindings = map( + lambda binding: {k: v["value"] for k, v in binding.items()}, + bindings_dict["results"]["bindings"], + ) + return bindings + + +class SPARQLWrapperStrategy(SPARQLStrategy): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._sparql_wrapper = SPARQLWrapper(self.endpoint) + self._sparql_wrapper.setReturnFormat(JSON) + + def query(self, sparql_query: str) -> Iterator[dict[str, str]]: + self._sparql_wrapper.setQuery(sparql_query) + + result: QueryResult = self._sparql_wrapper.query() + return self._get_bindings_from_bindings_dict(result.convert()) + + +class HttpxStrategy(SPARQLStrategy): + def query(self, sparql_query: str) -> Iterator[dict[str, str]]: + result: httpx.Response = self._httpx_run_sparql_query(sparql_query) + return self._get_bindings_from_bindings_dict(result.json()) + + def _httpx_run_sparql_query( + self, query: str, headers: dict | None = None + ) -> httpx.Response: + data = {"output": "json", "query": query} + headers = ( + { + "Accept": "application/sparql-results+json", + } + if headers is None + else headers + ) + + response = httpx.post( + self.endpoint, + headers=headers, + data=data, + ) + + return response