From 696d3ccee2c04e47263bbcc19d56e1546e22fb1f Mon Sep 17 00:00:00 2001 From: Alejandro Villar Date: Tue, 18 Jun 2024 16:49:36 +0200 Subject: [PATCH] Fix schema annotation with provided/forced contents --- ogc/na/annotate_schema.py | 45 +++++++++++++++++---------------------- requirements.txt | 1 + 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/ogc/na/annotate_schema.py b/ogc/na/annotate_schema.py index 00423d7..6c48bfd 100644 --- a/ogc/na/annotate_schema.py +++ b/ogc/na/annotate_schema.py @@ -127,6 +127,7 @@ from typing import Any, AnyStr, Callable, Sequence, Iterable from urllib.parse import urlparse, urljoin +import jsonpointer import jsonschema import requests_cache @@ -180,26 +181,21 @@ class SchemaResolver: def __init__(self, working_directory=Path()): self.working_directory = working_directory.resolve() - self._schema_cache: dict[str | Path, tuple[Any, bool]] = {} + self._schema_cache: dict[str | Path, Any] = {} @staticmethod def _get_branch(schema: dict, ref: str): - path = re.sub(r'^#?/?', '', ref).split('/') - pointer = schema - for item in path: - if item: - pointer = pointer[item] - return pointer + return jsonpointer.resolve_pointer(schema, re.sub('^#', '', ref)) def load_contents(self, s: str | Path) -> tuple[dict, bool]: """ Load the contents of a schema. Can be overriden by subclasses to alter the loading process. """ - contents, is_json = self._schema_cache.get(s, (None, False)) + contents = self._schema_cache.get(s) if contents is None: - contents, is_json = load_json_yaml(read_contents(s)[0]) - self._schema_cache[s] = contents, is_json - return contents, is_json + contents = read_contents(s)[0] + self._schema_cache[s] = contents + return load_json_yaml(contents) def resolve_ref(self, ref: str | Path, from_schema: ReferencedSchema | None = None) -> tuple[Path | str, str]: location = ref @@ -230,7 +226,8 @@ def resolve_ref(self, ref: str | Path, from_schema: ReferencedSchema | None = No return location, fragment - def resolve_schema(self, ref: str | Path, from_schema: ReferencedSchema | None = None) -> ReferencedSchema | None: + def resolve_schema(self, ref: str | Path, from_schema: ReferencedSchema | None = None, + force_contents: dict | None = None) -> ReferencedSchema | None: chain = from_schema.chain + [from_schema] if from_schema else [] try: schema_source, fragment = self.resolve_ref(ref, from_schema) @@ -251,6 +248,8 @@ def resolve_schema(self, ref: str | Path, from_schema: ReferencedSchema | None = is_json=from_schema.is_json) contents, is_json = self.load_contents(schema_source) + if force_contents: + contents = force_contents if fragment: return ReferencedSchema(location=schema_source, fragment=fragment, subschema=SchemaResolver._get_branch(contents, fragment), @@ -457,12 +456,8 @@ def __init__(self, schema_resolver: SchemaResolver | None = None, def process_schema(self, location: Path | str | None, default_context: str | Path | dict | None = None, contents: dict | None = None) -> AnnotatedSchema | None: - resolved_schema = self.schema_resolver.resolve_schema(location) - if contents: - # overriden - schema = contents - else: - schema = resolved_schema.subschema + resolved_schema = self.schema_resolver.resolve_schema(location, force_contents=contents) + schema = resolved_schema.subschema if all(x not in schema for x in ('schema', 'openapi')): validate_schema(schema) @@ -568,6 +563,13 @@ def process_subschema(subschema, context_stack, from_schema: ReferencedSchema, l used_terms = set() + # Annotate definitions and $defs - can later be overridden if referenced from a different path + for p in ('definitions', '$defs'): + defs = subschema.get(p) + if defs and isinstance(defs, dict): + for entry in defs.values(): + used_terms.update(process_subschema(entry, context_stack, from_schema, level + 1)) + if '$ref' in subschema and id(subschema) not in updated_refs: if self._ref_mapper: subschema['$ref'] = self._ref_mapper(subschema['$ref'], subschema) @@ -585,13 +587,6 @@ def process_subschema(subschema, context_stack, from_schema: ReferencedSchema, l for entry in collection: used_terms.update(process_subschema(entry, context_stack, from_schema, level + 1)) - # Annotate definitions and $defs - for p in ('definitions', '$defs'): - defs = subschema.get(p) - if defs and isinstance(defs, dict): - for entry in defs.values(): - used_terms.update(process_subschema(entry, context_stack, from_schema, level + 1)) - for p in ('then', 'else', 'additionalProperties'): branch = subschema.get(p) if branch and isinstance(branch, dict): diff --git a/requirements.txt b/requirements.txt index a9bbf23..8d26250 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ GitPython>=3.1.32 rfc3987 requests-cache xmltodict +jsonpointer~=2.4 # to be removed once https://github.com/RDFLib/pySHACL/issues/212 is fixed setuptools