Skip to content

Commit

Permalink
dt/sr_test: Test for post_compat messages
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiman committed May 9, 2024
1 parent f48f915 commit 2f889eb
Showing 1 changed file with 194 additions and 3 deletions.
197 changes: 194 additions & 3 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Optional
import uuid
import re
import urllib.parse
import requests
import time
import random
Expand Down Expand Up @@ -79,6 +80,7 @@ def get_subject_name(sns: str, topic: str, field: MessageField,
schema2_def = '{"type":"record","name":"myrecord","fields":[{"name":"f1","type":["null","string"]},{"name":"f2","type":"string","default":"foo"}]}'
# Schema 3 is not backwards compatible
schema3_def = '{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"string"}]}'

invalid_avro = '{"type":"notatype","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

simple_proto_def = """
Expand All @@ -97,6 +99,95 @@ def get_subject_name(sns: str, topic: str, field: MessageField,
Simple id = 1;
}"""

validation_schemas = dict(
proto3="""
syntax = "proto3";
message myrecord {
message Msg1 {
int32 f1 = 1;
}
Msg1 m1 = 1;
Msg1 m2 = 2;
}
""",
proto3_incompat="""
syntax = "proto3";
message myrecord {
// MESSAGE_REMOVED
message Msg1d {
int32 f1 = 1;
}
// FIELD_NAMED_TYPE_CHANGED
Msg1d m1 = 1;
}
""",
proto2="""
syntax = "proto2";
message myrecord {
message Msg1 {
required int32 f1 = 1;
}
required Msg1 m1 = 1;
required Msg1 m2 = 2;
}
""",
proto2_incompat="""
syntax = "proto2";
message myrecord {
// MESSAGE_REMOVED
message Msg1d {
required int32 f1 = 1;
}
// FIELD_NAMED_TYPE_CHANGED
required Msg1d m1 = 1;
}
""",
avro="""
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "f1",
"type": "string"
},
{
"name": "enumF",
"type": {
"name": "ABorC",
"type": "enum",
"symbols": ["a", "b", "c"]
}
}
]
}
""",
avro_incompat="""
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "f1",
"type": "int"
},
{
"name": "enumF",
"type": {
"name": "ABorC",
"type": "enum",
"symbols": ["a"]
}
}
]
}
""",
)

log_config = LoggingConfig('info',
logger_levels={
'security': 'trace',
Expand Down Expand Up @@ -438,10 +529,15 @@ def _post_compatibility_subject_version(self,
version,
data,
headers=HTTP_POST_HEADERS,
verbose: bool | None = None,
**kwargs):
params = ''
if verbose is not None:
params = f"?{urllib.parse.urlencode({'verbose': str(verbose).lower()})}"

return self._request(
"POST",
f"compatibility/subjects/{subject}/versions/{version}",
f"compatibility/subjects/{subject}/versions/{version}{params}",
headers=headers,
data=data,
**kwargs)
Expand Down Expand Up @@ -974,6 +1070,7 @@ def test_post_compatibility_subject_version(self):
subject=f"{topic}-key", version=1, data=schema_2_data)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == True
assert len(result_raw.json()["messages"]) == 0

self.logger.debug("Set subject config - BACKWARD")
result_raw = self._set_config_subject(
Expand All @@ -986,14 +1083,32 @@ def test_post_compatibility_subject_version(self):
subject=f"{topic}-key", version=1, data=schema_2_data)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == True
assert len(result_raw.json()["messages"]) == 0

self.logger.debug("Check compatibility backward, no default")
self.logger.debug("Check compatibility backward, no default, verbose")
result_raw = self._post_compatibility_subject_version(
subject=f"{topic}-key", version=1, data=schema_3_data)
subject=f"{topic}-key",
version=1,
data=schema_3_data,
verbose=True)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == False

self.logger.debug(
"Check compatibility backward, no default, not verbose")
result_raw = self._post_compatibility_subject_version(
subject=f"{topic}-key",
version=1,
data=schema_3_data,
verbose=False)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == False
assert not result_raw.json().get(
"messages",
None), f"Expected no messages, got {result_raw.json()['messages']}"

self.logger.debug("Posting incompatible schema 3 as a subject key")

result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_3_data)
assert result_raw.status_code == requests.codes.conflict
Expand Down Expand Up @@ -1023,6 +1138,79 @@ def test_post_compatibility_subject_version(self):
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["id"] == v1_id

p3_errs: list[tuple] = [
("MESSAGE_REMOVED", "#/myrecord/Msg1"),
("FIELD_NAMED_TYPE_CHANGED", "#/myrecord/1"),
]
p2_errs: list[tuple] = p3_errs + [
("REQUIRED_FIELD_REMOVED", "#/myrecord/2"),
]
av_errs: list[tuple] = [
("TYPE_MISMATCH", "/fields/0/type"),
("MISSING_ENUM_SYMBOLS", "/fields/1/type/symbols"),
]
EXPECTED_INCOMPATIBILITIES: dict[str, list[tuple]] = {
"proto3_incompat": p3_errs,
"proto2_incompat": p2_errs,
"avro_incompat": av_errs,
}

@cluster(num_nodes=3)
@parametrize(schemas=("avro", "avro_incompat", "AVRO"))
@parametrize(schemas=("proto3", "proto3_incompat", "PROTOBUF"))
@parametrize(schemas=("proto2", "proto2_incompat", "PROTOBUF"))
def test_compatibility_messages(self, schemas):
"""
Verify compatibility messages
"""

topic = create_topic_names(1)[0]

self.logger.debug(f"Register a schema against a subject")
schema_data = json.dumps({
"schema": validation_schemas[schemas[0]],
"schemaType": schemas[2],
})
incompatible_data = json.dumps({
"schema": validation_schemas[schemas[1]],
"schemaType": schemas[2],
})

super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS

self.logger.debug("Posting schema as a subject key")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_data)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok
v1_id = result_raw.json()["id"]

self.logger.debug("Set subject config - BACKWARD")
result_raw = self._set_config_subject(
subject=f"{topic}-key",
data=json.dumps({"compatibility": "BACKWARD"}))
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Check compatibility full")
result_raw = self._post_compatibility_subject_version(
subject=f"{topic}-key",
version=1,
data=incompatible_data,
verbose=True)

assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == False
msgs = result_raw.json()["messages"]
for message in ["oldSchemaVersion", "oldSchema", "compatibility"]:
assert any(
message in m for m in msgs
), f"Expected to find an instance of '{message}', got {msgs}"

expected_errs = self.EXPECTED_INCOMPATIBILITIES[schemas[1]]
for e in expected_errs:
assert any(e[0] in m and e[1] in m
for m in msgs), f"Expected {e} in messages, got {msgs}"

@cluster(num_nodes=3)
def test_delete_subject(self):
"""
Expand Down Expand Up @@ -2054,6 +2242,9 @@ def test_post_compatibility_subject_version(self):
auth=(super_username, super_password))
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == True
assert len(result_raw.json()["messages"]) == 0

result_raw = self._

@cluster(num_nodes=3)
def test_delete_subject(self):
Expand Down

0 comments on commit 2f889eb

Please sign in to comment.