Skip to content

Commit

Permalink
Fix AVRO reference issues / add more test code
Browse files Browse the repository at this point in the history
  • Loading branch information
libretto committed May 21, 2024
1 parent d77ef9a commit d934d9c
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 4 deletions.
11 changes: 8 additions & 3 deletions karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,22 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema:
return parsed_typed_schema.schema


def avro_schema_merge_builder(schema_str: str, dependencies: Mapping[str, Dependency]) -> str:
    """Flatten a schema and everything it references into one comma-separated string.

    To support references in AVRO, every referenced schema is merged
    recursively with the current schema. Referenced schemas are emitted before
    the schema that refers to them, so each definition precedes its first use.

    Args:
        schema_str: The schema text of the current schema.
        dependencies: Referenced schemas keyed by reference name; each value
            exposes ``schema.schema_str`` and ``schema.dependencies``. May be
            empty or ``None`` when the schema has no references.

    Returns:
        The schema texts joined with ``",\\n"``, *without* surrounding
        brackets. Only the top-level caller adds brackets, so nested
        references never produce nested ``[...]`` lists.
    """
    # A dict is used as an insertion-ordered set: a schema reachable through
    # several references (e.g. two dependencies sharing a third one) must be
    # emitted only once, because Avro does not allow redefining a name.
    ordered_schemas: dict[str, None] = {}

    def _collect(current_schema_str, current_dependencies) -> None:
        """Record the dependencies' schema texts depth-first, then the schema itself."""
        if current_dependencies:
            for dependency in current_dependencies.values():
                _collect(dependency.schema.schema_str, dependency.schema.dependencies)
        ordered_schemas.setdefault(current_schema_str, None)

    _collect(schema_str, dependencies)
    return ",\n".join(ordered_schemas)


def avro_schema_merge(schema_str: str, dependencies: Mapping[str, Dependency]) -> str:
    """Return the schema and its referenced schemas as one bracketed list.

    The flattened, comma-separated schema text comes from
    ``avro_schema_merge_builder``; this function only adds the enclosing
    ``[`` and ``]`` once, at the outermost level.

    NOTE(review): the brackets are added even when ``dependencies`` is empty;
    confirm that callers only use this for schemas that have references.
    """
    flattened = avro_schema_merge_builder(schema_str, dependencies)
    return f"[\n{flattened}\n]"


def parse(
schema_type: SchemaType,
schema_str: str,
Expand Down
130 changes: 129 additions & 1 deletion tests/integration/test_schema_avro_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,30 @@ async def test_avro_references(registry_async_client: Client) -> None:
],
}

schema_job = {
"type": "record",
"name": "Job",
"namespace": "com.netapp",
"fields": [{"name": "title", "type": "string"}, {"name": "salary", "type": "double"}],
}

schema_person = {
"type": "record",
"name": "Person",
"namespace": "com.netapp",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "address", "type": "Address"},
{"name": "job", "type": "Job"},
],
}

res = await registry_async_client.post("subjects/country/versions", json={"schema": json.dumps(schema_country)})
assert res.status_code == 200
assert "id" in res.json()
country_references = [{"name": "country.proto", "subject": "country", "version": 1}]

country_references = [{"name": "country.avsc", "subject": "country", "version": 1}]
res = await registry_async_client.post(
"subjects/address/versions",
json={"schemaType": "AVRO", "schema": json.dumps(schema_address), "references": country_references},
Expand All @@ -56,3 +75,112 @@ async def test_avro_references(registry_async_client: Client) -> None:
assert address_id == res.json()["id"]
assert "version" in res.json()
assert "schema" in res.json()

res = await registry_async_client.post("subjects/job/versions", json={"schema": json.dumps(schema_job)})
assert res.status_code == 200
assert "id" in res.json()

two_references = [
{"name": "address.avsc", "subject": "address", "version": 1},
{"name": "job.avsc", "subject": "job", "version": 1},
]

res = await registry_async_client.post(
"subjects/person/versions",
json={"schemaType": "AVRO", "schema": json.dumps(schema_person), "references": two_references},
)
assert res.status_code == 200
assert "id" in res.json()

result = {"id": 4,
"references": [
{"name": "address.avsc",
"subject": "address",
"version": 1},
{"name": "job.avsc",
"subject": "job",
"version": 1}],
"schema":
json.dumps({
"fields": [
{"name": "name",
"type": "string"},
{"name": "age",
"type": "int"},
{"name": "address",
"type": "Address"},
{"name": "job",
"type": "Job"}],
"name": "Person",
"namespace": "com.netapp",
"type": "record"
}, separators=(',', ':')),
"subject": "person",
"version": 1}

res = await registry_async_client.get("subjects/person/versions/latest")
assert res.status_code == 200
a = res.json()
assert res.json() == result

schema_person["fields"] = [
{"name": "name", "type": "string"},
{"name": "age", "type": "long"},
{"name": "address", "type": "Address"},
{"name": "job", "type": "Job"},
]

res = await registry_async_client.post(
"compatibility/subjects/person/versions/latest",
json={"schemaType": "AVRO", "schema": json.dumps(schema_person), "references": two_references},
)
assert res.status_code == 200
assert res.json() == {"is_compatible": True}

schema_person["fields"] = [
{"name": "name", "type": "string"},
{"name": "age", "type": "string"},
{"name": "address", "type": "Address"},
{"name": "job", "type": "Job"},
]
res = await registry_async_client.post(
"compatibility/subjects/person/versions/latest",
json={"schemaType": "AVRO", "schema": json.dumps(schema_person), "references": two_references},
)

assert res.status_code == 200
assert res.json() == {"is_compatible": False}

schema_union = {
"type": "record",
"namespace": "com.netapp",
"name": "Person2",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "address", "type": "Address"},
{"name": "job", "type": "Job"},
{
"name": "children",
"type": [
"null",
{
"type": "record",
"name": "child",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
]

}
]
}

res = await registry_async_client.post(
"subjects/person2/versions",
json={"schemaType": "AVRO", "schema": json.dumps(schema_union), "references": two_references},
)
assert res.status_code == 200
assert "id" in res.json()

0 comments on commit d934d9c

Please sign in to comment.