Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sr/avro: fix missing default compat check for null #23839

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ def data_dependency():
http_archive(
name = "avro",
build_file = "//bazel/thirdparty:avro.BUILD",
sha256 = "f1a7d13b28ce5cc8812f26c705a6ea27b8bc63554d82d556c63b437da0338cf1",
strip_prefix = "avro-e54bf712fce903652f3eab7a6c16264ac5d17285",
url = "https://github.com/redpanda-data/avro/archive/e54bf712fce903652f3eab7a6c16264ac5d17285.tar.gz",
sha256 = "989171347c4fedb74e2758eaaf234f40aa065d25e738b7bb0f55f82befb9322c",
strip_prefix = "avro-2e73edda63cdaf4611e5fe19d4bcf46e8e3c7d27",
url = "https://github.com/redpanda-data/avro/archive/2e73edda63cdaf4611e5fe19d4bcf46e8e3c7d27.tar.gz",
patches = ["//bazel/thirdparty:avro-snappy-includes.patch"],
patch_args = ["-p1"],
)
Expand Down
6 changes: 4 additions & 2 deletions src/v/iceberg/schema_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ avro::Schema field_to_avro(const nested_field& field) {

avro::Schema
struct_type_to_avro(const struct_type& type, std::string_view name) {
static const std::optional<avro::GenericDatum> no_default = std::nullopt;
avro::RecordSchema avro_schema(std::string{name});
const auto& fields = type.fields;
for (const auto& field_ptr : fields) {
Expand All @@ -203,10 +204,11 @@ struct_type_to_avro(const struct_type& type, std::string_view name) {
field_to_avro(*field_ptr),
field_attrs(field_ptr->id()),
// Optional fields (unions types with null children) have
// defaults of GenericUnion(null), rather than no default.
// defaults of GenericUnion(null) with the same union type as the
// child_schema, rather than no default.
// NOTE: no other union types are allowed by Iceberg.
field_ptr->required
? avro::GenericDatum()
? no_default
: avro::GenericDatum(
child_schema.root(), avro::GenericUnion(child_schema.root())));
}
Expand Down
27 changes: 15 additions & 12 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,21 @@ avro_compatibility_result check_compatible(
*reader.leafAt(int(r_idx)),
*writer.leafAt(int(w_idx)),
fields_p / std::to_string(r_idx) / "type"));
} else if (
reader.defaultValueAt(int(r_idx)).type() == avro::AVRO_NULL) {
// if the reader's record schema has a field with no default
// value, and writer's schema does not have a field with the
// same name, an error is signalled.

// For union, the default must correspond to the first type.
// The default may be null.
} else {
// if the reader's record schema has a field with no
// default value, and writer's schema does not have a
// field with the same name, an error is signalled.

// For union, the default must correspond to the first
// type. The default may be null.
const auto& r_leaf = reader.leafAt(int(r_idx));
if (
r_leaf->type() != avro::Type::AVRO_UNION
|| r_leaf->leafAt(0)->type() != avro::Type::AVRO_NULL) {
const auto& r_def = reader.defaultValueAt(int(r_idx));

auto expected_type = (r_leaf->type()
== avro::Type::AVRO_UNION)
? r_leaf->leafAt(0)->type()
: r_leaf->type();
if (!r_def || r_def->type() != expected_type) {
compat_result.emplace<avro_incompatibility>(
fields_p / std::to_string(r_idx),
avro_incompatibility::Type::
Expand All @@ -140,7 +143,7 @@ avro_compatibility_result check_compatible(
// if the writer's symbol is not present in the reader's enum and
// the reader has a default value, then that value is used,
// otherwise an error is signalled.
if (reader.defaultValueAt(0).type() == avro::AVRO_NULL) {
if (!reader.defaultValueAt(0)) {
std::vector<std::string_view> missing;
for (size_t w_idx = 0; w_idx < writer.names(); ++w_idx) {
size_t r_idx{0};
Expand Down
140 changes: 118 additions & 22 deletions src/v/pandaproxy/schema_registry/test/compatibility_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,7 @@ SEASTAR_THREAD_TEST_CASE(test_avro_alias_resolution_stopgap) {

namespace {

const auto schema_old = pps::sanitize_avro_schema_definition(
{
R"({
const auto schema_old = R"({
"type": "record",
"name": "myrecord",
"fields": [
Expand Down Expand Up @@ -392,13 +390,9 @@ const auto schema_old = pps::sanitize_avro_schema_definition(
}
}
]
})",
pps::schema_type::avro})
.value();
})";

const auto schema_new = pps::sanitize_avro_schema_definition(
{
R"({
const auto schema_new = R"({
"type": "record",
"name": "myrecord",
"fields": [
Expand Down Expand Up @@ -464,9 +458,7 @@ const auto schema_new = pps::sanitize_avro_schema_definition(
}
}
]
})",
pps::schema_type::avro})
.value();
})";

using incompatibility = pps::avro_incompatibility;

Expand Down Expand Up @@ -534,16 +526,107 @@ const absl::flat_hash_set<incompatibility> backward_expected{
"expected: someEnum1 (alias resolution is not yet fully supported)"},
};

const auto compat_data = std::to_array<compat_test_data<incompatibility>>({
struct compat_test_case {
std::string reader;
std::string writer;
absl::flat_hash_set<incompatibility> expected;
};

const auto compat_data = std::to_array<compat_test_case>({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
I find the color test cases a bit verbose. To help with readability, i would suggest:

  • Parameterize the reader generation through a function that accepts the fields string.
  • define the writer once.
  • define the expected once and use where applicable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. The benefit of the verbosity here is that I can copy the test case and run it against a real schema registry for further testing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid point. I am good either way.

{
.reader=schema_old,
.writer=schema_new,
.expected=forward_expected,
},
{
schema_old.share(),
schema_new.share(),
forward_expected,
.reader=schema_new,
.writer=schema_old,
.expected=backward_expected,
},
{
schema_new.share(),
schema_old.share(),
backward_expected,
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": ["null", "string"]
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": ["string", "null"]
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": "null",
"default": null
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected = {},
},
{
.reader=R"({
"type": "record",
"name": "car",
"fields": [
{
"name": "color",
"type": "null"
}
]
})",
.writer=R"({
"type": "record",
"name": "car",
"fields": []
})",
.expected={
{"/fields/0",
incompatibility::Type::reader_field_missing_default_value,
"color"},
},
},
});

Expand All @@ -555,13 +638,26 @@ std::string format_set(const absl::flat_hash_set<ss::sstring>& d) {

SEASTAR_THREAD_TEST_CASE(test_avro_compat_messages) {
for (const auto& cd : compat_data) {
auto compat = check_compatible_verbose(cd.reader, cd.writer);
auto compat = check_compatible_verbose(
pps::sanitize_avro_schema_definition(
{cd.reader, pps::schema_type::avro})
.value(),
pps::sanitize_avro_schema_definition(
{cd.writer, pps::schema_type::avro})
.value());

pps::raw_compatibility_result raw;
absl::c_for_each(cd.expected, [&raw](auto e) {
raw.emplace<incompatibility>(std::move(e));
});
auto exp_compat = std::move(raw)(pps::verbose::yes);

absl::flat_hash_set<ss::sstring> errs{
compat.messages.begin(), compat.messages.end()};
absl::flat_hash_set<ss::sstring> expected{
cd.expected.messages.begin(), cd.expected.messages.end()};
exp_compat.messages.begin(), exp_compat.messages.end()};

BOOST_CHECK(!compat.is_compat);
BOOST_CHECK_EQUAL(compat.is_compat, exp_compat.is_compat);
BOOST_CHECK_EQUAL(errs.size(), expected.size());
BOOST_REQUIRE_MESSAGE(
errs == expected,
Expand Down