diff --git a/README.rst b/README.rst index 1207f691f..1a8eae1f5 100644 --- a/README.rst +++ b/README.rst @@ -597,6 +597,22 @@ The REST proxy process manages a set of producer and consumer clients, which are Before a client refreshes its OAuth2 JWT token, it is expected to remove currently running consumers (eg. after committing their offsets) and producers using the current token. +Schema Normalization +-------------------- + +If specified as a rest parameter for the POST ``/subjects/{subject}/versions?normalize=true`` endpoint and the POST ``subjects/{subject}?normalize=true`` endpoint, +Karapace uses a schema normalization algorithm to ensure that the schema is stored in a canonical form. + +This normalization process is done so that schemas semantically equivalent are stored in the same way and should be considered equal. + +Normalization is currently only supported for Protobuf schemas. Karapace does not support all normalization features implemented by Confluent Schema Registry. +Currently the normalization process is done only for the ordering of the optional fields in the schema. +Use the feature with the assumption that it will be extended in the future and so two schemas that are semantically equivalent could be considered +different by the normalization process in different future versions of Karapace. +The safe choice, when using a normalization process, is always to consider as different two schemas that are semantically equivalent while the problem is when two semantically different schemas are considered equivalent. +In that view the future extension of the normalization process isn't considered a breaking change but rather an extension of the normalization process. + + Uninstall ========= diff --git a/karapace/kafka_rest_apis/consumer_manager.py b/karapace/kafka_rest_apis/consumer_manager.py index 8aaf4e396..6b27c274c 100644 --- a/karapace/kafka_rest_apis/consumer_manager.py +++ b/karapace/kafka_rest_apis/consumer_manager.py @@ -467,7 +467,6 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats ) # we get to be more in line with the confluent proxy by doing a bunch of fetches each time and # respecting the max fetch request size - # pylint: disable=protected-access max_bytes = ( int(query_params["max_bytes"]) if "max_bytes" in query_params diff --git a/karapace/protobuf/compare_type_lists.py b/karapace/protobuf/compare_type_lists.py index 0dfe26547..d4d181a95 100644 --- a/karapace/protobuf/compare_type_lists.py +++ b/karapace/protobuf/compare_type_lists.py @@ -11,12 +11,12 @@ from karapace.protobuf.exception import IllegalStateException from karapace.protobuf.message_element import MessageElement from karapace.protobuf.type_element import TypeElement -from typing import List +from typing import Sequence def compare_type_lists( - self_types_list: List[TypeElement], - other_types_list: List[TypeElement], + self_types_list: Sequence[TypeElement], + other_types_list: Sequence[TypeElement], result: CompareResult, compare_types: CompareTypes, ) -> CompareResult: diff --git a/karapace/protobuf/enum_element.py b/karapace/protobuf/enum_element.py index 37a49f566..dcee9522c 100644 --- a/karapace/protobuf/enum_element.py +++ b/karapace/protobuf/enum_element.py @@ -14,6 +14,7 @@ from karapace.protobuf.option_element import OptionElement from karapace.protobuf.type_element import TypeElement from karapace.protobuf.utils import append_documentation, append_indented +from typing import Sequence class EnumElement(TypeElement): @@ -22,8 +23,8 @@ def __init__( location: Location, name: str, documentation: str = "", - options: list[OptionElement] | None = None, - constants: list[EnumConstantElement] | None = None, + options: Sequence[OptionElement] | None = None, + constants: Sequence[EnumConstantElement] | None = None, ) -> None: # Enums do not allow nested type declarations. super().__init__(location, name, documentation, options or [], []) diff --git a/karapace/protobuf/extend_element.py b/karapace/protobuf/extend_element.py index 906b4f42e..da8229650 100644 --- a/karapace/protobuf/extend_element.py +++ b/karapace/protobuf/extend_element.py @@ -10,6 +10,7 @@ from karapace.protobuf.field_element import FieldElement from karapace.protobuf.location import Location from karapace.protobuf.utils import append_documentation, append_indented +from typing import Sequence @dataclass @@ -17,7 +18,7 @@ class ExtendElement: location: Location name: str documentation: str = "" - fields: list[FieldElement] | None = None + fields: Sequence[FieldElement] | None = None def to_schema(self) -> str: result: list[str] = [] diff --git a/karapace/protobuf/group_element.py b/karapace/protobuf/group_element.py index 8f0307817..1eeecf31c 100644 --- a/karapace/protobuf/group_element.py +++ b/karapace/protobuf/group_element.py @@ -11,6 +11,7 @@ from karapace.protobuf.field_element import FieldElement from karapace.protobuf.location import Location from karapace.protobuf.utils import append_documentation, append_indented +from typing import Sequence @dataclass @@ -20,7 +21,7 @@ class GroupElement: name: str tag: int documentation: str = "" - fields: list[FieldElement] | None = None + fields: Sequence[FieldElement] | None = None def to_schema(self) -> str: result: list[str] = [] diff --git a/karapace/protobuf/message_element.py b/karapace/protobuf/message_element.py index 0c9dd8d44..57eb03fa6 100644 --- a/karapace/protobuf/message_element.py +++ b/karapace/protobuf/message_element.py @@ -19,21 +19,27 @@ from karapace.protobuf.reserved_element import ReservedElement from karapace.protobuf.type_element import TypeElement from karapace.protobuf.utils import append_documentation, append_indented +from typing import Sequence class MessageElement(TypeElement): + nested_types: Sequence[TypeElement] + fields: Sequence[FieldElement] + one_ofs: Sequence[OneOfElement] + groups: Sequence[GroupElement] + def __init__( self, location: Location, name: str, documentation: str = "", - nested_types: list[TypeElement] | None = None, - options: list[OptionElement] | None = None, - reserveds: list[ReservedElement] | None = None, - fields: list[FieldElement] | None = None, - one_ofs: list[OneOfElement] | None = None, - extensions: list[ExtensionsElement] | None = None, - groups: list[GroupElement] | None = None, + nested_types: Sequence[TypeElement] | None = None, + options: Sequence[OptionElement] | None = None, + reserveds: Sequence[ReservedElement] | None = None, + fields: Sequence[FieldElement] | None = None, + one_ofs: Sequence[OneOfElement] | None = None, + extensions: Sequence[ExtensionsElement] | None = None, + groups: Sequence[GroupElement] | None = None, ) -> None: super().__init__(location, name, documentation, options or [], nested_types or []) self.reserveds = reserveds or [] diff --git a/karapace/protobuf/one_of_element.py b/karapace/protobuf/one_of_element.py index 3d7d1993f..278886e23 100644 --- a/karapace/protobuf/one_of_element.py +++ b/karapace/protobuf/one_of_element.py @@ -14,6 +14,7 @@ from karapace.protobuf.group_element import GroupElement from karapace.protobuf.option_element import OptionElement from karapace.protobuf.utils import append_documentation, append_indented +from typing import Sequence class OneOfElement: @@ -21,9 +22,9 @@ def __init__( self, name: str, documentation: str = "", - fields: list[FieldElement] | None = None, - groups: list[GroupElement] | None = None, - options: list[OptionElement] | None = None, + fields: Sequence[FieldElement] | None = None, + groups: Sequence[GroupElement] | None = None, + options: Sequence[OptionElement] | None = None, ) -> None: self.name = name self.documentation = documentation diff --git a/karapace/protobuf/option_element.py b/karapace/protobuf/option_element.py index 3488326aa..9ff50fe8e 100644 --- a/karapace/protobuf/option_element.py +++ b/karapace/protobuf/option_element.py @@ -11,6 +11,8 @@ class OptionElement: + name: str + class Kind(Enum): STRING = 1 BOOLEAN = 2 diff --git a/karapace/protobuf/proto_file_element.py b/karapace/protobuf/proto_file_element.py index b60ff33ab..c9f4be031 100644 --- a/karapace/protobuf/proto_file_element.py +++ b/karapace/protobuf/proto_file_element.py @@ -13,7 +13,7 @@ from karapace.protobuf.service_element import ServiceElement from karapace.protobuf.syntax import Syntax from karapace.protobuf.type_element import TypeElement -from typing import Dict, List, NewType, Optional +from typing import Dict, List, NewType, Optional, Sequence def _collect_dependencies_types(compare_types: CompareTypes, dependencies: Optional[Dict[str, Dependency]], is_self: bool): @@ -37,17 +37,21 @@ def _collect_dependencies_types(compare_types: CompareTypes, dependencies: Optio class ProtoFileElement: + types: Sequence[TypeElement] + services: Sequence[ServiceElement] + extend_declarations: Sequence[ExtendElement] + def __init__( self, location: Location, package_name: Optional[PackageName] = None, syntax: Optional[Syntax] = None, - imports: Optional[List[TypeName]] = None, - public_imports: Optional[List[TypeName]] = None, - types: Optional[List[TypeElement]] = None, - services: Optional[List[ServiceElement]] = None, - extend_declarations: Optional[List[ExtendElement]] = None, - options: Optional[List[OptionElement]] = None, + imports: Optional[Sequence[TypeName]] = None, + public_imports: Optional[Sequence[TypeName]] = None, + types: Optional[Sequence[TypeElement]] = None, + services: Optional[Sequence[ServiceElement]] = None, + extend_declarations: Optional[Sequence[ExtendElement]] = None, + options: Optional[Sequence[OptionElement]] = None, ) -> None: if types is None: types = list() diff --git a/karapace/protobuf/proto_normalizations.py b/karapace/protobuf/proto_normalizations.py new file mode 100644 index 000000000..9f65a2623 --- /dev/null +++ b/karapace/protobuf/proto_normalizations.py @@ -0,0 +1,274 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from karapace.dependency import Dependency +from karapace.protobuf.enum_constant_element import EnumConstantElement +from karapace.protobuf.enum_element import EnumElement +from karapace.protobuf.extend_element import ExtendElement +from karapace.protobuf.field_element import FieldElement +from karapace.protobuf.group_element import GroupElement +from karapace.protobuf.message_element import MessageElement +from karapace.protobuf.one_of_element import OneOfElement +from karapace.protobuf.option_element import OptionElement +from karapace.protobuf.proto_file_element import ProtoFileElement +from karapace.protobuf.rpc_element import RpcElement +from karapace.protobuf.schema import ProtobufSchema +from karapace.protobuf.service_element import ServiceElement +from karapace.protobuf.type_element import TypeElement +from karapace.schema_references import Reference +from typing import Mapping, Sequence + +import abc + + +def sort_by_name(element: OptionElement) -> str: + return element.name + + +class NormalizedRpcElement(RpcElement): + pass + + +class NormalizedServiceElement(ServiceElement): + rpcs: Sequence[NormalizedRpcElement] | None = None + + +class NormalizedFieldElement(FieldElement): + pass + + +class NormalizedExtendElement(ExtendElement): + fields: Sequence[NormalizedFieldElement] | None = None + + +class NormalizedTypeElement(TypeElement, abc.ABC): + nested_types: Sequence[NormalizedTypeElement] + + +class NormalizedProtoFileElement(ProtoFileElement): + types: Sequence[NormalizedTypeElement] + services: Sequence[NormalizedServiceElement] + extend_declarations: Sequence[NormalizedExtendElement] + + +class NormalizedMessageElement(MessageElement, NormalizedTypeElement): + nested_types: Sequence[NormalizedTypeElement] + fields: Sequence[NormalizedFieldElement] + one_ofs: Sequence[OneOfElement] + groups: Sequence[GroupElement] + + +class NormalizedEnumConstantElement(EnumConstantElement): + pass + + +class NormalizedEnumElement(EnumElement, NormalizedTypeElement): + constants: Sequence[NormalizedEnumConstantElement] + + +class NormalizedGroupElement(GroupElement): + fields: Sequence[NormalizedFieldElement] | None = None + + +class NormalizedProtobufSchema(ProtobufSchema): + proto_file_element: NormalizedProtoFileElement + + def __init__( + self, + schema: str, + references: Sequence[Reference] | None = None, + dependencies: Mapping[str, Dependency] | None = None, + proto_file_element: ProtoFileElement | None = None, + ) -> None: + super().__init__(schema, references, dependencies, proto_file_element) + self.proto_file_element = normalize(self.proto_file_element) + + +class NormalizedOneOfElement(OneOfElement): + fields: Sequence[NormalizedFieldElement] + groups: Sequence[NormalizedGroupElement] + + +def type_field_element_with_sorted_options(type_field: FieldElement) -> NormalizedFieldElement: + sorted_options = None if type_field.options is None else list(sorted(type_field.options, key=sort_by_name)) + return NormalizedFieldElement( + location=type_field.location, + label=type_field.label, + element_type=type_field.element_type, + name=type_field.name, + default_value=type_field.default_value, + json_name=type_field.json_name, + tag=type_field.tag, + documentation=type_field.documentation, + options=sorted_options, + ) + + +def enum_constant_element_with_sorted_options(enum_constant: EnumConstantElement) -> NormalizedEnumConstantElement: + sorted_options = None if enum_constant.options is None else list(sorted(enum_constant.options, key=sort_by_name)) + return NormalizedEnumConstantElement( + location=enum_constant.location, + name=enum_constant.name, + tag=enum_constant.tag, + documentation=enum_constant.documentation, + options=sorted_options, + ) + + +def enum_element_with_sorted_options(enum_element: EnumElement) -> NormalizedEnumElement: + sorted_options = None if enum_element.options is None else list(sorted(enum_element.options, key=sort_by_name)) + constants_with_sorted_options = ( + None + if enum_element.constants is None + else [enum_constant_element_with_sorted_options(constant) for constant in enum_element.constants] + ) + return NormalizedEnumElement( + location=enum_element.location, + name=enum_element.name, + documentation=enum_element.documentation, + options=sorted_options, + constants=constants_with_sorted_options, + ) + + +def groups_with_sorted_options(group: GroupElement) -> NormalizedGroupElement: + sorted_fields = ( + None if group.fields is None else [type_field_element_with_sorted_options(field) for field in group.fields] + ) + return NormalizedGroupElement( + label=group.label, + location=group.location, + name=group.name, + tag=group.tag, + documentation=group.documentation, + fields=sorted_fields, + ) + + +def one_ofs_with_sorted_options(one_ofs: OneOfElement) -> NormalizedOneOfElement: + sorted_options = None if one_ofs.options is None else list(sorted(one_ofs.options, key=sort_by_name)) + sorted_fields = [type_field_element_with_sorted_options(field) for field in one_ofs.fields] + sorted_groups = [groups_with_sorted_options(group) for group in one_ofs.groups] + + return NormalizedOneOfElement( + name=one_ofs.name, + documentation=one_ofs.documentation, + fields=sorted_fields, + groups=sorted_groups, + options=sorted_options, + ) + + +def message_element_with_sorted_options(message_element: MessageElement) -> NormalizedMessageElement: + sorted_options = None if message_element.options is None else list(sorted(message_element.options, key=sort_by_name)) + sorted_nested_types = [type_element_with_sorted_options(nested_type) for nested_type in message_element.nested_types] + sorted_fields = [type_field_element_with_sorted_options(field) for field in message_element.fields] + sorted_one_ofs = [one_ofs_with_sorted_options(one_of) for one_of in message_element.one_ofs] + + return NormalizedMessageElement( + location=message_element.location, + name=message_element.name, + documentation=message_element.documentation, + nested_types=sorted_nested_types, + options=sorted_options, + reserveds=message_element.reserveds, + fields=sorted_fields, + one_ofs=sorted_one_ofs, + extensions=message_element.extensions, + groups=message_element.groups, + ) + + +def type_element_with_sorted_options(type_element: TypeElement) -> NormalizedTypeElement: + sorted_nested_types: list[TypeElement] = [] + + for nested_type in type_element.nested_types: + if isinstance(nested_type, EnumElement): + sorted_nested_types.append(enum_element_with_sorted_options(nested_type)) + elif isinstance(nested_type, MessageElement): + sorted_nested_types.append(message_element_with_sorted_options(nested_type)) + else: + raise ValueError("Unknown type element") # tried with assert_never but it did not work + + # doing it here since the subtypes do not declare the nested_types property + type_element.nested_types = sorted_nested_types + + if isinstance(type_element, EnumElement): + return enum_element_with_sorted_options(type_element) + + if isinstance(type_element, MessageElement): + return message_element_with_sorted_options(type_element) + + raise ValueError("Unknown type element") # tried with assert_never but it did not work + + +def extends_element_with_sorted_options(extend_element: ExtendElement) -> NormalizedExtendElement: + sorted_fields = ( + None + if extend_element.fields is None + else [type_field_element_with_sorted_options(field) for field in extend_element.fields] + ) + return NormalizedExtendElement( + location=extend_element.location, + name=extend_element.name, + documentation=extend_element.documentation, + fields=sorted_fields, + ) + + +def rpc_element_with_sorted_options(rpc: RpcElement) -> NormalizedRpcElement: + sorted_options = None if rpc.options is None else list(sorted(rpc.options, key=sort_by_name)) + return NormalizedRpcElement( + location=rpc.location, + name=rpc.name, + documentation=rpc.documentation, + request_type=rpc.request_type, + response_type=rpc.response_type, + request_streaming=rpc.request_streaming, + response_streaming=rpc.response_streaming, + options=sorted_options, + ) + + +def service_element_with_sorted_options(service_element: ServiceElement) -> NormalizedServiceElement: + sorted_options = None if service_element.options is None else list(sorted(service_element.options, key=sort_by_name)) + sorted_rpc = ( + None if service_element.rpcs is None else [rpc_element_with_sorted_options(rpc) for rpc in service_element.rpcs] + ) + + return NormalizedServiceElement( + location=service_element.location, + name=service_element.name, + documentation=service_element.documentation, + rpcs=sorted_rpc, + options=sorted_options, + ) + + +def normalize(proto_file_element: ProtoFileElement) -> NormalizedProtoFileElement: + sorted_types: Sequence[NormalizedTypeElement] = [ + type_element_with_sorted_options(type_element) for type_element in proto_file_element.types + ] + sorted_options = list(sorted(proto_file_element.options, key=sort_by_name)) + sorted_services: Sequence[NormalizedServiceElement] = [ + service_element_with_sorted_options(service) for service in proto_file_element.services + ] + sorted_extend_declarations: Sequence[NormalizedExtendElement] = [ + extends_element_with_sorted_options(extend) for extend in proto_file_element.extend_declarations + ] + + return NormalizedProtoFileElement( + location=proto_file_element.location, + package_name=proto_file_element.package_name, + syntax=proto_file_element.syntax, + imports=proto_file_element.imports, + public_imports=proto_file_element.public_imports, + types=sorted_types, + services=sorted_services, + extend_declarations=sorted_extend_declarations, + options=sorted_options, + ) diff --git a/karapace/protobuf/serialization.py b/karapace/protobuf/serialization.py index 59943d21c..abc01247d 100644 --- a/karapace/protobuf/serialization.py +++ b/karapace/protobuf/serialization.py @@ -19,7 +19,7 @@ from karapace.protobuf.syntax import Syntax from karapace.protobuf.type_element import TypeElement from types import MappingProxyType -from typing import Any +from typing import Any, Sequence import base64 import google.protobuf.descriptor @@ -269,7 +269,7 @@ def _serialize_msgtype(t: MessageElement) -> google.protobuf.descriptor_pb2.Desc return d -def _serialize_options(options: list[OptionElement], result: google.protobuf.descriptor_pb2.FileOptions) -> None: +def _serialize_options(options: Sequence[OptionElement], result: google.protobuf.descriptor_pb2.FileOptions) -> None: for opt in options: if opt.name == ("java_package"): result.java_package = opt.value diff --git a/karapace/protobuf/service_element.py b/karapace/protobuf/service_element.py index 5ccc3bb6d..ed714c58c 100644 --- a/karapace/protobuf/service_element.py +++ b/karapace/protobuf/service_element.py @@ -11,6 +11,7 @@ from karapace.protobuf.option_element import OptionElement from karapace.protobuf.rpc_element import RpcElement from karapace.protobuf.utils import append_documentation, append_indented +from typing import Sequence @dataclass @@ -18,8 +19,8 @@ class ServiceElement: location: Location name: str documentation: str = "" - rpcs: list[RpcElement] | None = None - options: list[OptionElement] | None = None + rpcs: Sequence[RpcElement] | None = None + options: Sequence[OptionElement] | None = None def to_schema(self) -> str: result: list[str] = [] diff --git a/karapace/protobuf/type_element.py b/karapace/protobuf/type_element.py index c33753ed6..ec840a801 100644 --- a/karapace/protobuf/type_element.py +++ b/karapace/protobuf/type_element.py @@ -8,7 +8,7 @@ from dataclasses import dataclass from karapace.protobuf.location import Location -from typing import TYPE_CHECKING +from typing import Sequence, TYPE_CHECKING if TYPE_CHECKING: from karapace.protobuf.compare_result import CompareResult @@ -21,8 +21,8 @@ class TypeElement: location: Location name: str documentation: str - options: list[OptionElement] - nested_types: list[TypeElement] + options: Sequence[OptionElement] + nested_types: Sequence[TypeElement] def to_schema(self) -> str: """Convert the object to valid protobuf syntax. diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 155dc7853..46e3832d5 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -19,6 +19,7 @@ ProtobufUnresolvedDependencyException, SchemaParseException as ProtobufSchemaParseException, ) +from karapace.protobuf.proto_normalizations import NormalizedProtobufSchema from karapace.protobuf.schema import ProtobufSchema from karapace.schema_references import Reference from karapace.schema_type import SchemaType @@ -62,6 +63,7 @@ def parse_protobuf_schema_definition( references: Sequence[Reference] | None = None, dependencies: Mapping[str, Dependency] | None = None, validate_references: bool = True, + normalize: bool = False, ) -> ProtobufSchema: """Parses and validates `schema_definition`. @@ -69,11 +71,16 @@ def parse_protobuf_schema_definition( ProtobufUnresolvedDependencyException if Protobuf dependency cannot be resolved. """ - protobuf_schema = ProtobufSchema(schema_definition, references, dependencies) + protobuf_schema = ( + ProtobufSchema(schema_definition, references, dependencies) + if not normalize + else NormalizedProtobufSchema(schema_definition, references, dependencies) + ) if validate_references: result = protobuf_schema.verify_schema_dependencies() if not result.result: raise ProtobufUnresolvedDependencyException(f"{result.message}") + return protobuf_schema @@ -168,6 +175,7 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema: validate_avro_enum_symbols=True, references=self.references, dependencies=self.dependencies, + normalize=False, ) return parsed_typed_schema.schema @@ -179,6 +187,7 @@ def parse( validate_avro_names: bool, references: Sequence[Reference] | None = None, dependencies: Mapping[str, Dependency] | None = None, + normalize: bool = False, ) -> ParsedTypedSchema: if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]: raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") @@ -203,7 +212,9 @@ def parse( elif schema_type is SchemaType.PROTOBUF: try: - parsed_schema = parse_protobuf_schema_definition(schema_str, references, dependencies) + parsed_schema = parse_protobuf_schema_definition( + schema_str, references, dependencies, validate_references=True, normalize=normalize + ) except ( TypeError, SchemaError, @@ -270,6 +281,7 @@ def parse( schema_str: str, references: Sequence[Reference] | None = None, dependencies: Mapping[str, Dependency] | None = None, + normalize: bool = False, ) -> ParsedTypedSchema: return parse( schema_type=schema_type, @@ -278,6 +290,7 @@ def parse( validate_avro_names=False, references=references, dependencies=dependencies, + normalize=normalize, ) def __str__(self) -> str: @@ -352,6 +365,7 @@ def parse( schema_str: str, references: Sequence[Reference] | None = None, dependencies: Mapping[str, Dependency] | None = None, + normalize: bool = False, ) -> ValidatedTypedSchema: parsed_schema = parse( schema_type=schema_type, @@ -360,6 +374,7 @@ def parse( validate_avro_names=True, references=references, dependencies=dependencies, + normalize=normalize, ) return cast(ValidatedTypedSchema, parsed_schema) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 0fdcd2f16..70aaa3a77 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -517,6 +517,7 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: resolved_references, resolved_dependencies, validate_references=False, + normalize=False, ) schema_str = str(parsed_schema) except InvalidSchema: diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 9c04cbf0b..cc8aa341f 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -1097,11 +1097,16 @@ async def subjects_schema_post( schema_type = self._validate_schema_type(content_type=content_type, data=body) references = self._validate_references(content_type, schema_type, body) references, new_schema_dependencies = self.schema_registry.resolve_references(references) + normalize = request.query.get("normalize", "false").lower() == "true" try: # When checking if schema is already registered, allow unvalidated schema in as # there might be stored schemas that are non-compliant from the past. new_schema = ParsedTypedSchema.parse( - schema_type=schema_type, schema_str=schema_str, references=references, dependencies=new_schema_dependencies + schema_type=schema_type, + schema_str=schema_str, + references=references, + dependencies=new_schema_dependencies, + normalize=normalize, ) except InvalidSchema: self.log.warning("Invalid schema: %r", schema_str) @@ -1133,6 +1138,7 @@ async def subjects_schema_post( schema_version.schema.schema_str, references=other_references, dependencies=other_dependencies, + normalize=normalize, ) except InvalidSchema as e: failed_schema_id = schema_version.schema_id @@ -1191,6 +1197,7 @@ async def subject_post( self._validate_schema_request_body(content_type, body) schema_type = self._validate_schema_type(content_type, body) self._validate_schema_key(content_type, body) + normalize = request.query.get("normalize", "false").lower() == "true" references = self._validate_references(content_type, schema_type, body) try: @@ -1200,6 +1207,7 @@ async def subject_post( schema_str=body["schema"], references=references, dependencies=resolved_dependencies, + normalize=normalize, ) except (InvalidReferences, InvalidSchema, InvalidSchemaType) as e: self.log.warning("Invalid schema: %r", body["schema"], exc_info=True) diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index 243d9b1fb..ede01737a 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -1270,3 +1270,81 @@ async def test_protobuf_update_ordering(registry_async_client: Client) -> None: assert res.status_code == 200 assert "id" in res.json() assert schema_id != res.json()["id"] + + +SCHEMA_WITH_OPTION_UNORDERDERED = """\ +syntax = "proto3"; +package tc4; + +option java_package = "com.example"; +option java_generate_equals_and_hash = true; +option java_string_check_utf8 = true; +option java_multiple_files = true; +option java_outer_classname = "FredProto"; +option java_generic_services = true; + +message Foo { + string code = 1; +} +""" + + +SCHEMA_WITH_OPTION_ORDERED = """\ +syntax = "proto3"; +package tc4; + +option java_generate_equals_and_hash = true; +option java_generic_services = true; +option java_multiple_files = true; +option java_outer_classname = "FredProto"; +option java_package = "com.example"; +option java_string_check_utf8 = true; + +message Foo { + string code = 1; +} +""" + + +async def test_registering_normalized_schema(registry_async_client: Client) -> None: + subject = create_subject_name_factory("test_protobuf_normalization")() + + body = {"schemaType": "PROTOBUF", "schema": SCHEMA_WITH_OPTION_ORDERED} + res = await registry_async_client.post(f"subjects/{subject}/versions?normalize=true", json=body) + + assert res.status_code == 200 + assert "id" in res.json() + original_schema_id = res.json()["id"] + + body = {"schemaType": "PROTOBUF", "schema": SCHEMA_WITH_OPTION_UNORDERDERED} + res = await registry_async_client.post(f"subjects/{subject}", json=body) + assert res.status_code == 404 + + res = await registry_async_client.post(f"subjects/{subject}?normalize=true", json=body) + + assert res.status_code == 200 + assert "id" in res.json() + assert original_schema_id == res.json()["id"] + + +async def test_normalized_schema_idempotence_produce_and_fetch(registry_async_client: Client) -> None: + subject = create_subject_name_factory("test_protobuf_normalization")() + + body = {"schemaType": "PROTOBUF", "schema": SCHEMA_WITH_OPTION_UNORDERDERED} + res = await registry_async_client.post(f"subjects/{subject}/versions?normalize=true", json=body) + + assert res.status_code == 200 + assert "id" in res.json() + original_schema_id = res.json()["id"] + + body = {"schemaType": "PROTOBUF", "schema": SCHEMA_WITH_OPTION_ORDERED} + res = await registry_async_client.post(f"subjects/{subject}/versions?normalize=true", json=body) + + assert res.status_code == 200 + assert "id" in res.json() + assert original_schema_id == res.json()["id"] + + res = await registry_async_client.get(f"/schemas/ids/{original_schema_id}") + assert res.status_code == 200 + assert "schema" in res.json() + assert res.json()["schema"] == SCHEMA_WITH_OPTION_ORDERED diff --git a/tests/unit/protobuf/test_protobuf_normalization.py b/tests/unit/protobuf/test_protobuf_normalization.py new file mode 100644 index 000000000..b772b293c --- /dev/null +++ b/tests/unit/protobuf/test_protobuf_normalization.py @@ -0,0 +1,520 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from karapace.protobuf.compare_result import CompareResult +from karapace.protobuf.location import Location +from karapace.protobuf.proto_normalizations import normalize +from karapace.protobuf.proto_parser import ProtoParser + +import pytest + +location: Location = Location("some/folder", "file.proto") + + +# this would be a good case for using a property based test with a well-formed message generator +# (hypothesis could work if the objects representing the language +# do always correspond to a valid proto schema, that for a correct parser should always be true) +# we should create a set of random valid DSLs and check that the normalization is giving back a sorted options list +# +# nb: we could use the protoc compiler as an oracle to check that the files are always valid protobuf schemas + +PROTO_WITH_OPTIONS_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +option cc_generic_services = true; +option java_generate_equals_and_hash = true; +option java_generic_services = true; +option java_multiple_files = true; +option java_outer_classname = "FooProto"; +option java_package = "com.example.foo"; +option java_string_check_utf8 = true; +option optimize_for = SPEED; + +message Foo { + string fieldA = 1; +} +""" + +PROTO_WITH_OPTIONS_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +option java_package = "com.example.foo"; +option cc_generic_services = true; +option java_generate_equals_and_hash = true; +option java_generic_services = true; +option java_outer_classname = "FooProto"; +option optimize_for = SPEED; +option java_string_check_utf8 = true; +option java_multiple_files = true; + +message Foo { + string fieldA = 1; +} +""" + + +PROTO_WITH_OPTIONS_IN_ENUM_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +enum MyEnum { + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + option (my_option3) = "my_value3"; +} +""" + +PROTO_WITH_OPTIONS_IN_ENUM_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +enum MyEnum { + option (my_option3) = "my_value3"; + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; +} +""" + +PROTO_WITH_OPTIONS_IN_SERVICE_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +service MyService { + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + option (my_option3) = "my_value3"; +} +""" + +PROTO_WITH_OPTIONS_IN_SERVICE_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +service MyService { + option (my_option3) = "my_value3"; + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; +} +""" + +PROTO_WITH_OPTIONS_IN_RPC_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +service MyService { + rpc MyRpc (Foo) returns (Foo) { + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + option (my_option3) = "my_value3"; + } +} +""" + +PROTO_WITH_OPTIONS_IN_RPC_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +service MyService { + rpc MyRpc (Foo) returns (Foo) { + option (my_option3) = "my_value3"; + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + } +} +""" + +PROTO_WITH_OPTIONS_IN_EXTEND_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +message Foo { + string fieldA = 1; +} + +extend Foo { + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + option (my_option3) = "my_value3"; +} +""" + +PROTO_WITH_OPTIONS_IN_EXTEND_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +message Foo { + string fieldA = 1; +} + +extend Foo { + option (my_option3) = "my_value3"; + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; +} +""" + +PROTO_WITH_OPTIONS_IN_ONEOF_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +message Foo { + oneof my_oneof { + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + option (my_option3) = "my_value3"; + } +} +""" + +PROTO_WITH_OPTIONS_IN_ONEOF_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +message Foo { + oneof my_oneof { + option (my_option3) = "my_value3"; + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + } +} +""" + +PROTO_WITH_OPTIONS_IN_ENUM_CONSTANTS_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +enum MyEnum { + MY_ENUM_CONSTANT = 0 [(my_option) = "my_value", (my_option2) = "my_value2", (my_option3) = "my_value3"]; +} +""" + +PROTO_WITH_OPTIONS_IN_ENUM_CONSTANTS_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +enum MyEnum { + MY_ENUM_CONSTANT = 0 [(my_option3) = "my_value3", (my_option) = "my_value", (my_option2) = "my_value2"]; +} +""" + +PROTO_WITH_OPTIONS_IN_FIELD_OF_MESSAGE_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +message Foo { + string fieldA = 1 [(my_option) = "my_value", (my_option2) = "my_value2", (my_option3) = "my_value3"]; +} +""" + +PROTO_WITH_OPTIONS_IN_FIELD_OF_MESSAGE_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +message Foo { + string fieldA = 1 [(my_option3) = "my_value3", (my_option) = "my_value", (my_option2) = "my_value2"]; +} +""" + +PROTO_WITH_NEASTED_ENUM_IN_MESSAGE_WITH_OPTIONS_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +message Foo { + enum MyEnum { + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + option (my_option3) = "my_value3"; + } +} +""" + +PROTO_WITH_NEASTED_ENUM_IN_MESSAGE_WITH_OPTIONS_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +message Foo { + enum MyEnum { + option (my_option3) = "my_value3"; + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + } +} +""" + +PROTO_WITH_OPTIONS_IN_FIELD_OF_MESSAGE_WITH_OPTIONS_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +message Foo { + message Bar { + string fieldA = 1 [(my_option) = "my_value", (my_option2) = "my_value2", (my_option3) = "my_value3"]; + } +} +""" + +PROTO_WITH_OPTIONS_IN_FIELD_OF_MESSAGE_WITH_OPTIONS_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +message Foo { + message Bar { + string fieldA = 1 [(my_option3) = "my_value3", (my_option) = "my_value", (my_option2) = "my_value2"]; + } +} +""" + + +PROTO_WITH_OPTIONS_IN_FIELD_OF_ENUM_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +enum MyEnum { + MY_ENUM_CONSTANT = 0; +} + +message Foo { + MyEnum fieldA = 1 [(my_option) = "my_value", (my_option2) = "my_value2", (my_option3) = "my_value3"]; +} +""" + +PROTO_WITH_OPTIONS_IN_FIELD_OF_ENUM_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +enum MyEnum { + MY_ENUM_CONSTANT = 0; +} + +message Foo { + MyEnum fieldA = 1 [(my_option3) = "my_value3", (my_option) = "my_value", (my_option2) = "my_value2"]; +} +""" + +PROTO_WITH_OPTIONS_IN_FIELD_OF_ENUM_WITH_OPTIONS_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +enum MyEnum { + MY_ENUM_CONSTANT = 0 [(my_option) = "my_value", (my_option2) = "my_value2", (my_option3) = "my_value3"]; +} + +message Foo { + MyEnum fieldA = 1; +} +""" + +PROTO_WITH_OPTIONS_IN_FIELD_OF_ENUM_WITH_OPTIONS_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +enum MyEnum { + MY_ENUM_CONSTANT = 0 [(my_option3) = "my_value3", (my_option) = "my_value", (my_option2) = "my_value2"]; +} + +message Foo { + MyEnum fieldA = 1; +} +""" + +PROTO_WITH_COMPLEX_SCHEMA_ORDERED = """\ +syntax = "proto3"; + +package pkg; + +option cc_generic_services = true; +option java_generate_equals_and_hash = true; +option java_generic_services = true; +option java_multiple_files = true; +option java_outer_classname = "FooProto"; +option java_package = "com.example.foo"; +option java_string_check_utf8 = true; +option optimize_for = SPEED; + +message Foo { + string fieldA = 1; + + string fieldB = 2; + + string fieldC = 3; + + string fieldX = 4; + + message NestedFoo { + string fieldA = 1; + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + } + + + option (my_option3) = "my_value3"; + option (my_option2) = "my_value2"; + option (my_option) = "my_value"; + + + oneof my_oneof { + option (my_option3) = "my_value3"; + option (my_option2) = "my_value2"; + option (my_option) = "my_value"; + } + + + enum MyEnum { + option (my_option3) = "my_value3"; + option (my_option2) = "my_value2"; + option (my_option) = "my_value"; + } +} + + +extend Foo { + option (my_option3) = "my_value3"; + option (my_option2) = "my_value2"; + option (my_option) = "my_value"; +} + +service MyService { + option (my_option3) = "my_value3"; + option (my_option2) = "my_value2"; + option (my_option) = "my_value"; + + + rpc MyRpc (Foo) returns (Foo) { + option (my_option) = "my_value"; + option (my_option2) = "my_value2"; + option (my_option3) = "my_value3"; + } +} + + +""" + +PROTO_WITH_COMPLEX_SCHEMA_UNORDERED = """\ +syntax = "proto3"; + +package pkg; + +option cc_generic_services = true; +option java_outer_classname = "FooProto"; +option optimize_for = SPEED; +option java_string_check_utf8 = true; +option java_generate_equals_and_hash = true; +option java_generic_services = true; +option java_multiple_files = true; +option java_package = "com.example.foo"; + +message Foo { + string fieldA = 1; + + string fieldB = 2; + + string fieldC = 3; + + string fieldX = 4; + + message NestedFoo { + string fieldA = 1; + option (my_option2) = "my_value2"; + option (my_option) = "my_value"; + } + + option (my_option2) = "my_value2"; + option (my_option3) = "my_value3"; + option (my_option) = "my_value"; + + oneof my_oneof { + option (my_option) = "my_value"; + option (my_option3) = "my_value3"; + option (my_option2) = "my_value2"; + } + + + enum MyEnum { + option (my_option) = "my_value"; + option (my_option3) = "my_value3"; + option (my_option2) = "my_value2"; + } +} + + +extend Foo { + option (my_option2) = "my_value2"; + option (my_option3) = "my_value3"; + option (my_option) = "my_value"; +} + + +service MyService { + option (my_option2) = "my_value2"; + option (my_option) = "my_value"; + option (my_option3) = "my_value3"; + + rpc MyRpc (Foo) returns (Foo) { + option (my_option2) = "my_value2"; + option (my_option3) = "my_value3"; + option (my_option) = "my_value"; + } +} + + +""" + + +@pytest.mark.parametrize( + ("ordered_schema", "unordered_schema"), + ( + (PROTO_WITH_OPTIONS_ORDERED, PROTO_WITH_OPTIONS_UNORDERED), + (PROTO_WITH_OPTIONS_IN_ENUM_ORDERED, PROTO_WITH_OPTIONS_IN_ENUM_UNORDERED), + (PROTO_WITH_OPTIONS_IN_SERVICE_ORDERED, PROTO_WITH_OPTIONS_IN_SERVICE_UNORDERED), + (PROTO_WITH_OPTIONS_IN_RPC_ORDERED, PROTO_WITH_OPTIONS_IN_RPC_UNORDERED), + (PROTO_WITH_OPTIONS_IN_EXTEND_ORDERED, PROTO_WITH_OPTIONS_IN_EXTEND_UNORDERED), + (PROTO_WITH_OPTIONS_IN_ONEOF_ORDERED, PROTO_WITH_OPTIONS_IN_ONEOF_UNORDERED), + (PROTO_WITH_OPTIONS_IN_ENUM_CONSTANTS_ORDERED, PROTO_WITH_OPTIONS_IN_ENUM_CONSTANTS_UNORDERED), + (PROTO_WITH_OPTIONS_IN_FIELD_OF_MESSAGE_ORDERED, PROTO_WITH_OPTIONS_IN_FIELD_OF_MESSAGE_UNORDERED), + (PROTO_WITH_NEASTED_ENUM_IN_MESSAGE_WITH_OPTIONS_ORDERED, PROTO_WITH_NEASTED_ENUM_IN_MESSAGE_WITH_OPTIONS_UNORDERED), + ( + PROTO_WITH_OPTIONS_IN_FIELD_OF_MESSAGE_WITH_OPTIONS_ORDERED, + PROTO_WITH_OPTIONS_IN_FIELD_OF_MESSAGE_WITH_OPTIONS_UNORDERED, + ), + (PROTO_WITH_OPTIONS_IN_FIELD_OF_ENUM_ORDERED, PROTO_WITH_OPTIONS_IN_FIELD_OF_ENUM_UNORDERED), + ( + PROTO_WITH_OPTIONS_IN_FIELD_OF_ENUM_WITH_OPTIONS_ORDERED, + PROTO_WITH_OPTIONS_IN_FIELD_OF_ENUM_WITH_OPTIONS_UNORDERED, + ), + (PROTO_WITH_COMPLEX_SCHEMA_ORDERED, PROTO_WITH_COMPLEX_SCHEMA_UNORDERED), + ), +) +def test_differently_ordered_options_normalizes_equally(ordered_schema: str, unordered_schema: str) -> None: + ordered_proto = ProtoParser.parse(location, ordered_schema) + unordered_proto = ProtoParser.parse(location, unordered_schema) + + result = CompareResult() + normalize(ordered_proto).compare(normalize(unordered_proto), result) + assert result.is_compatible() + assert normalize(ordered_proto).to_schema() == normalize(unordered_proto).to_schema()