AVRO References functionality and tests
libretto committed Jul 29, 2024
1 parent fd811e3 commit f0f9450
Showing 4 changed files with 331 additions and 49 deletions.
1 change: 1 addition & 0 deletions karapace/compatibility/__init__.py
@@ -88,6 +88,7 @@ def check_compatibility(
if old_schema.schema_type is SchemaType.AVRO:
assert isinstance(old_schema.schema, AvroSchema)
assert isinstance(new_schema.schema, AvroSchema)

if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_avro_compatibility(
reader_schema=new_schema.schema,
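
For context, the BACKWARD modes here treat the new schema as the reader and the old schema as the writer: data written with the old schema must stay readable under the new one. The sketch below illustrates that rule with the avro package's bundled checker, used here as a hedged stand-in for Karapace's check_avro_compatibility; the User schemas are made up for the example.

# Minimal sketch, assuming avro.compatibility.ReaderWriterCompatibilityChecker is available;
# not necessarily the exact call Karapace makes internally.
import avro.schema
from avro.compatibility import ReaderWriterCompatibilityChecker, SchemaCompatibilityType

old_schema = avro.schema.parse(
    '{"type": "record", "name": "User", "fields": [{"name": "id", "type": "int"}]}'
)
# Adding a field with a default keeps the change backward compatible.
new_schema = avro.schema.parse(
    '{"type": "record", "name": "User", "fields": ['
    '{"name": "id", "type": "int"}, '
    '{"name": "email", "type": "string", "default": ""}]}'
)

result = ReaderWriterCompatibilityChecker().get_compatibility(new_schema, old_schema)  # (reader, writer)
assert result.compatibility == SchemaCompatibilityType.compatible
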
76 changes: 63 additions & 13 deletions karapace/schema_models.py
@@ -27,8 +27,10 @@
from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError
from typing import Any, cast, Dict, Final, final, Mapping, Sequence

import avro.schema
import hashlib
import logging
import re

LOG = logging.getLogger(__name__)

@@ -181,15 +183,42 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema:
return parsed_typed_schema.schema


def avro_schema_merge(schema_str: str, dependencies: Mapping[str, Dependency]) -> str:
"""To support references in AVRO we recursively merge all referenced schemas with current schema"""
if dependencies:
merged_schema = ""
for dependency in dependencies.values():
merged_schema += avro_schema_merge(dependency.schema.schema_str, dependency.schema.dependencies) + ",\n"
merged_schema += schema_str
return "[\n" + merged_schema + "\n]"
return schema_str
class AvroMerge:
def __init__(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None):
self.schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True)
self.dependencies = dependencies
self.unique_id = 0

def union_safe_schema_str(self, schema_str: str) -> str:
# If the referenced schema is already a union, embed it as-is; otherwise wrap it in a single-element union
regex = re.compile(r"^\s*\[")
base_schema = (
f'{{ "name": "___RESERVED_KARAPACE_WRAPPER_NAME_{self.unique_id}___",'
f'"type": "record", "fields": [{{"name": "name", "type":'
)
if regex.match(schema_str):
return f"{base_schema} {schema_str}}}]}}"
return f"{base_schema} [{schema_str}]}}]}}"

def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> str:
"""To support references in AVRO we iteratively merge all referenced schemas with current schema"""
stack: list[tuple[str, Mapping[str, Dependency] | None]] = [(schema_str, dependencies)]
merged_schemas = []

while stack:
current_schema_str, current_dependencies = stack.pop()
if current_dependencies:
stack.append((current_schema_str, None))
for dependency in reversed(current_dependencies.values()):
stack.append((dependency.schema.schema_str, dependency.schema.dependencies))
else:
self.unique_id += 1
merged_schemas.append(self.union_safe_schema_str(current_schema_str))

return ",\n".join(merged_schemas)

def wrap(self) -> str:
return "[\n" + self.builder(self.schema_str, self.dependencies) + "\n]"
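
To make the wrapping concrete, the sketch below hand-builds the same layout AvroMerge.wrap() emits for a hypothetical Outer schema that references a hypothetical Inner dependency: each schema becomes the single-element union field type of a reserved wrapper record, the wrappers are joined into one top-level union, and dependencies are emitted first so their names are already registered when the referencing schema is parsed. Inner and Outer are illustrative names, not part of this commit.

# Hand-written sketch of the wrapper layout produced by AvroMerge.wrap().
import avro.schema

inner = '{"type": "record", "name": "Inner", "fields": [{"name": "id", "type": "int"}]}'
outer = '{"type": "record", "name": "Outer", "fields": [{"name": "inner", "type": "Inner"}]}'

wrapped = f"""[
{{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_1___", "type": "record",
  "fields": [{{"name": "name", "type": [{inner}]}}]}},
{{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_2___", "type": "record",
  "fields": [{{"name": "name", "type": [{outer}]}}]}}
]"""

# The whole union parses in one pass because "Inner" is defined before "Outer" references it.
union_schema = avro.schema.parse(wrapped)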


def parse(
@@ -200,21 +229,41 @@ def parse(
references: Sequence[Reference] | None = None,
dependencies: Mapping[str, Dependency] | None = None,
normalize: bool = False,
dependencies_compat: bool = False,
) -> ParsedTypedSchema:
if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]:
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")

parsed_schema_result: Draft7Validator | AvroSchema | ProtobufSchema
parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema
if schema_type is SchemaType.AVRO:
try:
if dependencies or dependencies_compat:
wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap()
else:
wrapped_schema_str = schema_str
parsed_schema = parse_avro_schema_definition(
avro_schema_merge(schema_str, dependencies),
wrapped_schema_str,
validate_enum_symbols=validate_avro_enum_symbols,
validate_names=validate_avro_names,
)
if dependencies or dependencies_compat:
if isinstance(parsed_schema, avro.schema.UnionSchema):
parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1]

else:
raise InvalidSchema
else:
parsed_schema_result = parsed_schema
return ParsedTypedSchema(
schema_type=schema_type,
schema_str=schema_str,
schema=parsed_schema_result,
references=references,
dependencies=dependencies,
schema_wrapped=parsed_schema,
)
except (SchemaParseException, JSONDecodeError, TypeError) as e:
raise InvalidSchema from e

elif schema_type is SchemaType.JSONSCHEMA:
try:
parsed_schema = parse_jsonschema_definition(schema_str)
@@ -276,9 +325,10 @@ def __init__(
schema: Draft7Validator | AvroSchema | ProtobufSchema,
references: Sequence[Reference] | None = None,
dependencies: Mapping[str, Dependency] | None = None,
schema_wrapped: Draft7Validator | AvroSchema | ProtobufSchema | None = None,
) -> None:
self._schema_cached: Draft7Validator | AvroSchema | ProtobufSchema | None = schema

self.schema_wrapped = schema_wrapped
super().__init__(
schema_type=schema_type,
schema_str=schema_str,
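
Mirroring the unwrap step in parse() above: the parsed wrapper union is navigated back to the original definition with schemas[-1].fields[0].type.schemas[-1], while the complete wrapped result is retained on ParsedTypedSchema as schema_wrapped. The sketch below is self-contained and uses a made-up User schema with no dependencies.

# Hedged sketch of the unwrap navigation; the wrapper layout matches AvroMerge.wrap().
import avro.schema

wrapped = (
    '[{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_1___", "type": "record",'
    ' "fields": [{"name": "name", "type": ['
    '{"type": "record", "name": "User", "fields": [{"name": "id", "type": "int"}]}'
    ']}]}]'
)
union_schema = avro.schema.parse(wrapped)
assert isinstance(union_schema, avro.schema.UnionSchema)

# Last wrapper record -> its only field -> that field's union type -> last member.
original = union_schema.schemas[-1].fields[0].type.schemas[-1]
print(original.name)  # -> "User"
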
28 changes: 14 additions & 14 deletions karapace/schema_reader.py
@@ -512,9 +512,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

try:
schema_type_parsed = SchemaType(schema_type)
except ValueError:
except ValueError as e:
LOG.warning("Invalid schema type: %s", schema_type)
return
raise e

# This does two jobs:
# - Validates the schema's JSON
@@ -531,18 +531,18 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references]
resolved_references, resolved_dependencies = self.resolve_references(candidate_references)
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError:
except json.JSONDecodeError as e:
LOG.warning("Schema is not valid JSON")
return
except InvalidReferences:
raise e
except InvalidReferences as e:
LOG.exception("Invalid AVRO references")
return
raise e
elif schema_type_parsed == SchemaType.JSONSCHEMA:
try:
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError:
except json.JSONDecodeError as e:
LOG.warning("Schema is not valid JSON")
return
raise e
elif schema_type_parsed == SchemaType.PROTOBUF:
try:
if schema_references:
@@ -556,12 +556,12 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
normalize=False,
)
schema_str = str(parsed_schema)
except InvalidSchema:
except InvalidSchema as e:
LOG.exception("Schema is not valid ProtoBuf definition")
return
except InvalidReferences:
raise e
except InvalidReferences as e:
LOG.exception("Invalid Protobuf references")
return
raise e

try:
typed_schema = TypedSchema(
@@ -571,8 +571,8 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
dependencies=resolved_dependencies,
schema=parsed_schema,
)
except (InvalidSchema, JSONDecodeError):
return
except (InvalidSchema, JSONDecodeError) as e:
raise e

self.database.insert_schema_version(
subject=schema_subject,
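
For reference, the schema_references consumed above follow the registry reference format: each entry names the referenced type, the subject it is registered under, and the version to pin. The values in the sketch below are illustrative only; reference_from_mapping turns each mapping into a Reference and resolve_references loads the referenced schemas as dependencies.

# Illustrative shape of the references list attached to a schema (made-up values).
schema_references = [
    {"name": "com.example.Inner", "subject": "inner-value", "version": 1},
]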