Skip to content

Commit

Permalink
dt/sr_test: Test for post_compat messages
Browse files Browse the repository at this point in the history
Signed-off-by: Oren Leiman <[email protected]>
  • Loading branch information
oleiman authored and pgellert committed Aug 15, 2024
1 parent b4ad37a commit 364ad1d
Showing 1 changed file with 168 additions and 2 deletions.
170 changes: 168 additions & 2 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Literal, NamedTuple, Optional
import uuid
import re
import urllib.parse
import requests
import time
import random
Expand Down Expand Up @@ -111,6 +112,95 @@ def get_subject_name(sns: str, topic: str, field: MessageField,

json_number_schema_def = '{"type":"number"}'

validation_schemas = dict(
proto3="""
syntax = "proto3";
message myrecord {
message Msg1 {
int32 f1 = 1;
}
Msg1 m1 = 1;
Msg1 m2 = 2;
}
""",
proto3_incompat="""
syntax = "proto3";
message myrecord {
// MESSAGE_REMOVED
message Msg1d {
int32 f1 = 1;
}
// FIELD_NAMED_TYPE_CHANGED
Msg1d m1 = 1;
}
""",
proto2="""
syntax = "proto2";
message myrecord {
message Msg1 {
required int32 f1 = 1;
}
required Msg1 m1 = 1;
required Msg1 m2 = 2;
}
""",
proto2_incompat="""
syntax = "proto2";
message myrecord {
// MESSAGE_REMOVED
message Msg1d {
required int32 f1 = 1;
}
// FIELD_NAMED_TYPE_CHANGED
required Msg1d m1 = 1;
}
""",
avro="""
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "f1",
"type": "string"
},
{
"name": "enumF",
"type": {
"name": "ABorC",
"type": "enum",
"symbols": ["a", "b", "c"]
}
}
]
}
""",
avro_incompat="""
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "f1",
"type": "int"
},
{
"name": "enumF",
"type": {
"name": "ABorC",
"type": "enum",
"symbols": ["a"]
}
}
]
}
""",
)

log_config = LoggingConfig('info',
logger_levels={
'security': 'trace',
Expand Down Expand Up @@ -681,10 +771,16 @@ def _post_compatibility_subject_version(self,
version,
data,
headers=HTTP_POST_HEADERS,
verbose: bool | None = None,
**kwargs):
params = {}
if verbose is not None:
params['verbose'] = verbose

return self._request(
"POST",
f"compatibility/subjects/{subject}/versions/{version}",
params=params,
headers=headers,
data=data,
**kwargs)
Expand Down Expand Up @@ -1314,6 +1410,7 @@ def test_post_compatibility_subject_version(self,
subject=f"{topic}-key", version=1, data=schema_2_data)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == True
assert len(result_raw.json()["messages"]) == 0

self.logger.debug("Set subject config - BACKWARD")
result_raw = self._set_config_subject(
Expand All @@ -1326,14 +1423,32 @@ def test_post_compatibility_subject_version(self,
subject=f"{topic}-key", version=1, data=schema_2_data)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == True
assert len(result_raw.json()["messages"]) == 0

self.logger.debug("Check compatibility backward, no default, verbose")
result_raw = self._post_compatibility_subject_version(
subject=f"{topic}-key",
version=1,
data=schema_3_data,
verbose=True)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == False

self.logger.debug("Check compatibility backward, no default")
self.logger.debug(
"Check compatibility backward, no default, not verbose")
result_raw = self._post_compatibility_subject_version(
subject=f"{topic}-key", version=1, data=schema_3_data)
subject=f"{topic}-key",
version=1,
data=schema_3_data,
verbose=False)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == False
assert not result_raw.json().get(
"messages",
None), f"Expected no messages, got {result_raw.json()['messages']}"

self.logger.debug("Posting incompatible schema 3 as a subject key")

result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_3_data)
assert result_raw.status_code == requests.codes.conflict
Expand Down Expand Up @@ -1374,6 +1489,57 @@ def test_post_compatibility_subject_version(self,
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["id"] == v1_id

@cluster(num_nodes=3)
@parametrize(schemas=("avro", "avro_incompat", "AVRO"))
@parametrize(schemas=("proto3", "proto3_incompat", "PROTOBUF"))
@parametrize(schemas=("proto2", "proto2_incompat", "PROTOBUF"))
def test_compatibility_messages(self, schemas):
"""
Verify compatibility messages
"""

topic = create_topic_names(1)[0]

self.logger.debug(f"Register a schema against a subject")
schema_data = json.dumps({
"schema": validation_schemas[schemas[0]],
"schemaType": schemas[2],
})
incompatible_data = json.dumps({
"schema": validation_schemas[schemas[1]],
"schemaType": schemas[2],
})

super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS

self.logger.debug("Posting schema as a subject key")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_data)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok
v1_id = result_raw.json()["id"]

self.logger.debug("Set subject config - BACKWARD")
result_raw = self._set_config_subject(
subject=f"{topic}-key",
data=json.dumps({"compatibility": "BACKWARD"}))
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Check compatibility full")
result_raw = self._post_compatibility_subject_version(
subject=f"{topic}-key",
version=1,
data=incompatible_data,
verbose=True)

assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == False
msgs = result_raw.json()["messages"]
for message in ["oldSchemaVersion", "oldSchema", "compatibility"]:
assert any(
message in m for m in msgs
), f"Expected to find an instance of '{message}', got {msgs}"

@cluster(num_nodes=3)
def test_delete_subject(self):
"""
Expand Down

0 comments on commit 364ad1d

Please sign in to comment.