From 2856989ff2175f5c9d119ee25abca8e5ee192525 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 12 Nov 2024 12:13:07 -0800 Subject: [PATCH] c/topics: Separate key and value schema ID validation sanctions In the previous scheme, enabling record_value schema ID validation would pass the sanctions check if record_key schema ID validation was already enabled. This commit splits an existing check into two, such that changes to either can be detected and subjected to license policy enforcement if required. Signed-off-by: Oren Leiman --- src/v/cluster/topics_frontend.cc | 21 +++++++++--- src/v/kafka/server/tests/alter_config_test.cc | 34 +++++++++++++++++++ 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc index fcc66bc019c44..53674e9603dc4 100644 --- a/src/v/cluster/topics_frontend.cc +++ b/src/v/cluster/topics_frontend.cc @@ -108,14 +108,24 @@ std::vector get_enterprise_features( features.emplace_back("tiered storage"); } - constexpr auto schema_id_validation_enabled = + static constexpr auto key_schema_id_validation_enabled = [](const cluster::topic_properties& pp) -> bool { return pp.record_key_schema_id_validation.value_or(false) - || pp.record_key_schema_id_validation_compat.value_or(false) - || pp.record_value_schema_id_validation.value_or(false) + || pp.record_key_schema_id_validation_compat.value_or(false); + }; + + static constexpr auto value_schema_id_validation_enabled = + [](const cluster::topic_properties& pp) -> bool { + return pp.record_value_schema_id_validation.value_or(false) || pp.record_value_schema_id_validation_compat.value_or(false); }; + static constexpr auto schema_id_validation_enabled = + [](const cluster::topic_properties& pp) -> bool { + return key_schema_id_validation_enabled(pp) + || value_schema_id_validation_enabled(pp); + }; + constexpr auto unset_or_unchanged = []( const reflection::is_std_optional auto& curr, @@ -144,8 +154,9 @@ std::vector get_enterprise_features( }; if ( - (schema_id_validation_enabled(properties) - < schema_id_validation_enabled(updated_properties)) + ((key_schema_id_validation_enabled(properties) + < key_schema_id_validation_enabled(updated_properties)) + || (value_schema_id_validation_enabled(properties) < value_schema_id_validation_enabled(updated_properties))) || (schema_id_validation_enabled(updated_properties) && sns_modified())) { features.emplace_back("schema id validation"); } diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 9a1b3e6782f83..29ffae44a5783 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -1395,6 +1395,40 @@ FIXTURE_TEST(test_unlicensed_alter_configs, alter_config_test_fixture) { }, failure); + const auto key_validation = props_t{ + with(kafka::topic_property_record_key_schema_id_validation, true), + }; + test_cases.emplace_back( + "set_value_after_key", + key_validation, + alter_props_t{{set( + kafka::topic_property_record_value_schema_id_validation_compat, + true)}}, + failure); + test_cases.emplace_back( + "unset_key", + key_validation, + alter_props_t{{set( + kafka::topic_property_record_key_schema_id_validation, false)}}, + success); + + const auto value_validation = props_t{ + with(kafka::topic_property_record_value_schema_id_validation, true), + }; + test_cases.emplace_back( + "set_key_after_value", + value_validation, + alter_props_t{{set( + kafka::topic_property_record_key_schema_id_validation_compat, + true)}}, + failure); + test_cases.emplace_back( + "unset_value", + value_validation, + alter_props_t{{set( + kafka::topic_property_record_value_schema_id_validation, false)}}, + success); + const auto validation_with_strat = props_t{ with(kafka::topic_property_record_key_schema_id_validation, true), with(kafka::topic_property_record_value_schema_id_validation, true),