From 7354df72147a9bd66e38728b5e10eb68cc069a91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Tue, 12 Mar 2024 11:08:59 +0000 Subject: [PATCH 1/6] kafka: extract topic configs code This extracts the topic config specific code in describe_configs and moves it into a separate file so that it can later be reused in the CreateTopic handler. (cherry picked from commit 47cf65f252cdc42f8c66fad732c8f55f10785375) --- src/v/kafka/CMakeLists.txt | 1 + .../handlers/configs/config_response_utils.cc | 853 +++++++++++++++++ .../handlers/configs/config_response_utils.h | 33 + .../kafka/server/handlers/describe_configs.cc | 879 +----------------- 4 files changed, 892 insertions(+), 874 deletions(-) create mode 100644 src/v/kafka/server/handlers/configs/config_response_utils.cc create mode 100644 src/v/kafka/server/handlers/configs/config_response_utils.h diff --git a/src/v/kafka/CMakeLists.txt b/src/v/kafka/CMakeLists.txt index d9e9fc93c8dfd..f031e7968e0c4 100644 --- a/src/v/kafka/CMakeLists.txt +++ b/src/v/kafka/CMakeLists.txt @@ -20,6 +20,7 @@ set(handlers_srcs server/handlers/alter_partition_reassignments.cc server/handlers/list_partition_reassignments.cc server/handlers/handler_interface.cc + server/handlers/configs/config_response_utils.cc server/handlers/topics/types.cc server/handlers/topics/topic_utils.cc server/handlers/delete_records.cc diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc new file mode 100644 index 0000000000000..aa476e577ae78 --- /dev/null +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -0,0 +1,853 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "kafka/server/handlers/configs/config_response_utils.h" + +#include "cluster/metadata_cache.h" +#include "cluster/types.h" +#include "config/node_config.h" +#include "kafka/server/handlers/topics/types.h" + +#include + +namespace kafka { + +static bool config_property_requested( + const std::optional>& configuration_keys, + const std::string_view property_name) { + return !configuration_keys.has_value() + || std::find( + configuration_keys->begin(), + configuration_keys->end(), + property_name) + != configuration_keys->end(); +} + +template +static void add_config( + describe_configs_result& result, + std::string_view name, + T value, + describe_configs_source source) { + result.configs.push_back(describe_configs_resource_result{ + .name = ss::sstring(name), + .value = ssx::sformat("{}", value), + .config_source = source, + }); +} + +template +static void add_config_if_requested( + const describe_configs_resource& resource, + describe_configs_result& result, + std::string_view name, + T value, + describe_configs_source source) { + if (config_property_requested(resource.configuration_keys, name)) { + add_config(result, name, value, source); + } +} + +template +static ss::sstring describe_as_string(const T& t) { + return ssx::sformat("{}", t); +} + +// Kafka protocol defines integral types by sizes. See +// https://kafka.apache.org/protocol.html +// Therefore we should also use type sizes for integrals and use Java type sizes +// as a guideline. See +// https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html +template +constexpr auto num_bits = CHAR_BIT * sizeof(T); + +template +constexpr bool is_short = std::is_integral_v && !std::is_same_v + && num_bits <= 16; + +template +constexpr bool is_int = std::is_integral_v && num_bits > 16 + && num_bits <= 32; + +template +constexpr bool is_long = std::is_integral_v && num_bits > 32 + && num_bits <= 64; + +// property_config_type maps the datatype for a config property to +// describe_configs_type. Currently class_type and password are not used in +// Redpanda so we do not include checks for those types. You may find a similar +// mapping in Apache Kafka at +// https://github.com/apache/kafka/blob/be032735b39360df1a6de1a7feea8b4336e5bcc0/core/src/main/scala/kafka/server/ConfigHelper.scala +template +consteval describe_configs_type property_config_type() { + // clang-format off + constexpr auto is_string_type = std::is_same_v || + std::is_same_v || + std::is_same_v || + std::is_same_v || + std::is_same_v || + std::is_same_v || + std::is_same_v; + + constexpr auto is_long_type = is_long || + // Long type since seconds is atleast a 35-bit signed integral + // https://en.cppreference.com/w/cpp/chrono/duration + std::is_same_v || + // Long type since milliseconds is atleast a 45-bit signed integral + // https://en.cppreference.com/w/cpp/chrono/duration + std::is_same_v; + // clang-format on + + if constexpr ( + reflection::is_std_optional || reflection::is_tristate) { + return property_config_type(); + return property_config_type(); + } else if constexpr (std::is_same_v) { + return describe_configs_type::boolean; + } else if constexpr (is_string_type) { + return describe_configs_type::string; + } else if constexpr (is_short) { + return describe_configs_type::short_type; + } else if constexpr (is_int) { + return describe_configs_type::int_type; + } else if constexpr (is_long_type) { + return describe_configs_type::long_type; + } else if constexpr (std::is_floating_point_v) { + return describe_configs_type::double_type; + } else if constexpr (reflection::is_std_vector) { + return describe_configs_type::list; + } else { + static_assert( + config::detail::dependent_false::value, + "Type name is not supported in describe_configs_type"); + } +} + +template +static void add_broker_config( + describe_configs_result& result, + std::string_view name, + const config::property& property, + bool include_synonyms, + std::optional documentation, + Func&& describe_f) { + describe_configs_source src + = property.is_overriden() ? describe_configs_source::static_broker_config + : describe_configs_source::default_config; + + std::vector synonyms; + if (include_synonyms) { + synonyms.reserve(2); + /** + * If value was overriden, include override + */ + if (src == describe_configs_source::static_broker_config) { + synonyms.push_back(describe_configs_synonym{ + .name = ss::sstring(property.name()), + .value = describe_f(property.value()), + .source = static_cast( + describe_configs_source::static_broker_config), + }); + } + /** + * If property is required it has no default + */ + if (!property.is_required()) { + synonyms.push_back(describe_configs_synonym{ + .name = ss::sstring(property.name()), + .value = describe_f(property.default_value()), + .source = static_cast( + describe_configs_source::default_config), + }); + } + } + + result.configs.push_back(describe_configs_resource_result{ + .name = ss::sstring(name), + .value = describe_f(property.value()), + .config_source = src, + .synonyms = std::move(synonyms), + .config_type = property_config_type(), + .documentation = documentation, + }); +} + +template +static void add_broker_config_if_requested( + const describe_configs_resource& resource, + describe_configs_result& result, + std::string_view name, + const config::property& property, + bool include_synonyms, + std::optional documentation, + Func&& describe_f) { + if (config_property_requested(resource.configuration_keys, name)) { + add_broker_config( + result, + name, + property, + include_synonyms, + documentation, + std::forward(describe_f)); + } +} + +template +static void add_topic_config( + describe_configs_result& result, + std::string_view default_name, + const T& default_value, + std::string_view override_name, + const std::optional& overrides, + bool include_synonyms, + std::optional documentation, + Func&& describe_f) { + describe_configs_source src = overrides + ? describe_configs_source::topic + : describe_configs_source::default_config; + + std::vector synonyms; + if (include_synonyms) { + synonyms.reserve(2); + if (overrides) { + synonyms.push_back(describe_configs_synonym{ + .name = ss::sstring(override_name), + .value = describe_f(*overrides), + .source = static_cast(describe_configs_source::topic), + }); + } + synonyms.push_back(describe_configs_synonym{ + .name = ss::sstring(default_name), + .value = describe_f(default_value), + .source = static_cast( + describe_configs_source::default_config), + }); + } + + result.configs.push_back(describe_configs_resource_result{ + .name = ss::sstring(override_name), + .value = describe_f(overrides.value_or(default_value)), + .config_source = src, + .synonyms = std::move(synonyms), + .config_type = property_config_type(), + .documentation = documentation, + }); +} + +/** + * For faking DEFAULT_CONFIG status for properties that are actually + * topic overrides: cloud storage properties. We do not support cluster + * defaults for these, the values are always "sticky" to topics, but + * some Kafka clients insist that after an AlterConfig RPC, anything + * they didn't set should be DEFAULT_CONFIG. + * + * See https://github.com/redpanda-data/redpanda/issues/7451 + */ +template +std::optional +override_if_not_default(const std::optional& override, const T& def) { + if (override && override.value() != def) { + return override; + } else { + return std::nullopt; + } +} + +template +static void add_topic_config_if_requested( + const describe_configs_resource& resource, + describe_configs_result& result, + std::string_view default_name, + const T& default_value, + std::string_view override_name, + const std::optional& overrides, + bool include_synonyms, + std::optional documentation, + Func&& describe_f, + bool hide_default_override = false) { + if (config_property_requested(resource.configuration_keys, override_name)) { + std::optional overrides_val; + if (hide_default_override) { + overrides_val = override_if_not_default(overrides, default_value); + } else { + overrides_val = overrides; + } + + add_topic_config( + result, + default_name, + default_value, + override_name, + overrides_val, + include_synonyms, + documentation, + std::forward(describe_f)); + } +} + +template +static ss::sstring maybe_print_tristate(const tristate& tri) { + if (tri.is_disabled() || !tri.has_optional_value()) { + return "-1"; + } + return ssx::sformat("{}", tri.value()); +} + +template +static void add_topic_config( + describe_configs_result& result, + std::string_view default_name, + const std::optional& default_value, + std::string_view override_name, + const tristate& overrides, + bool include_synonyms, + std::optional documentation) { + // Wrap overrides in an optional because add_topic_config expects + // optional where S = tristate + std::optional> override_value; + if (overrides.is_disabled() || overrides.has_optional_value()) { + override_value = std::make_optional(overrides); + } + + add_topic_config( + result, + default_name, + tristate{default_value}, + override_name, + override_value, + include_synonyms, + documentation, + &maybe_print_tristate); +} + +template +static void add_topic_config_if_requested( + const describe_configs_resource& resource, + describe_configs_result& result, + std::string_view default_name, + const std::optional& default_value, + std::string_view override_name, + const tristate& overrides, + bool include_synonyms, + std::optional documentation) { + if (config_property_requested(resource.configuration_keys, override_name)) { + add_topic_config( + result, + default_name, + default_value, + override_name, + overrides, + include_synonyms, + documentation); + } +} + +static ss::sstring +kafka_endpoint_format(const std::vector& endpoints) { + std::vector uris; + uris.reserve(endpoints.size()); + std::transform( + endpoints.cbegin(), + endpoints.cend(), + std::back_inserter(uris), + [](const model::broker_endpoint& ep) { + return ssx::sformat( + "{}://{}:{}", + (ep.name.empty() ? "plain" : ep.name), + ep.address.host(), + ep.address.port()); + }); + return ssx::sformat("{}", fmt::join(uris, ",")); +} + +static ss::sstring kafka_authn_endpoint_format( + const std::vector& endpoints) { + std::vector uris; + uris.reserve(endpoints.size()); + std::transform( + endpoints.cbegin(), + endpoints.cend(), + std::back_inserter(uris), + [](const config::broker_authn_endpoint& ep) { + return ssx::sformat( + "{}://{}:{}", + (ep.name.empty() ? "plain" : ep.name), + ep.address.host(), + ep.address.port()); + }); + return ssx::sformat("{}", fmt::join(uris, ",")); +} + +static inline std::optional maybe_make_documentation( + bool include_documentation, const std::string_view& docstring) { + return include_documentation ? std::make_optional(ss::sstring{docstring}) + : std::nullopt; +} + +void report_topic_config( + const describe_configs_resource& resource, + describe_configs_result& result, + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_properties, + bool include_synonyms, + bool include_documentation) { + /** + * Kafka properties + */ + add_topic_config_if_requested( + resource, + result, + config::shard_local_cfg().log_compression_type.name(), + metadata_cache.get_default_compression(), + topic_property_compression, + topic_properties.compression, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().log_compression_type.desc()), + &describe_as_string); + + add_topic_config_if_requested( + resource, + result, + config::shard_local_cfg().log_cleanup_policy.name(), + metadata_cache.get_default_cleanup_policy_bitflags(), + topic_property_cleanup_policy, + topic_properties.cleanup_policy_bitflags, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().log_cleanup_policy.desc()), + &describe_as_string); + + const std::string_view docstring{ + topic_properties.is_compacted() + ? config::shard_local_cfg().compacted_log_segment_size.desc() + : config::shard_local_cfg().log_segment_size.desc()}; + add_topic_config_if_requested( + resource, + result, + topic_properties.is_compacted() + ? config::shard_local_cfg().compacted_log_segment_size.name() + : config::shard_local_cfg().log_segment_size.name(), + topic_properties.is_compacted() + ? metadata_cache.get_default_compacted_topic_segment_size() + : metadata_cache.get_default_segment_size(), + topic_property_segment_size, + topic_properties.segment_size, + include_synonyms, + maybe_make_documentation(include_documentation, docstring), + &describe_as_string); + + add_topic_config_if_requested( + resource, + result, + config::shard_local_cfg().delete_retention_ms.name(), + metadata_cache.get_default_retention_duration(), + topic_property_retention_duration, + topic_properties.retention_duration, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().delete_retention_ms.desc())); + + add_topic_config_if_requested( + resource, + result, + config::shard_local_cfg().retention_bytes.name(), + metadata_cache.get_default_retention_bytes(), + topic_property_retention_bytes, + topic_properties.retention_bytes, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().retention_bytes.desc())); + + add_topic_config_if_requested( + resource, + result, + config::shard_local_cfg().log_message_timestamp_type.name(), + metadata_cache.get_default_timestamp_type(), + topic_property_timestamp_type, + topic_properties.timestamp_type, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().log_message_timestamp_type.desc()), + &describe_as_string); + + add_topic_config_if_requested( + resource, + result, + config::shard_local_cfg().kafka_batch_max_bytes.name(), + metadata_cache.get_default_batch_max_bytes(), + topic_property_max_message_bytes, + topic_properties.batch_max_bytes, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().kafka_batch_max_bytes.desc()), + &describe_as_string); + + // Shadow indexing properties + add_topic_config_if_requested( + resource, + result, + topic_property_remote_read, + model::is_fetch_enabled( + metadata_cache.get_default_shadow_indexing_mode()), + topic_property_remote_read, + topic_properties.shadow_indexing.has_value() ? std::make_optional( + model::is_fetch_enabled(*topic_properties.shadow_indexing)) + : std::nullopt, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().cloud_storage_enable_remote_read.desc()), + &describe_as_string, + true); + + add_topic_config_if_requested( + resource, + result, + topic_property_remote_write, + model::is_archival_enabled( + metadata_cache.get_default_shadow_indexing_mode()), + topic_property_remote_write, + topic_properties.shadow_indexing.has_value() ? std::make_optional( + model::is_archival_enabled(*topic_properties.shadow_indexing)) + : std::nullopt, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().cloud_storage_enable_remote_write.desc()), + &describe_as_string, + true); + + add_topic_config_if_requested( + resource, + result, + topic_property_retention_local_target_bytes, + metadata_cache.get_default_retention_local_target_bytes(), + topic_property_retention_local_target_bytes, + topic_properties.retention_local_target_bytes, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().retention_local_target_bytes_default.desc())); + + add_topic_config_if_requested( + resource, + result, + topic_property_retention_local_target_ms, + std::make_optional( + metadata_cache.get_default_retention_local_target_ms()), + topic_property_retention_local_target_ms, + topic_properties.retention_local_target_ms, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().retention_local_target_ms_default.desc())); + + if (config_property_requested( + resource.configuration_keys, topic_property_remote_delete)) { + add_topic_config( + result, + topic_property_remote_delete, + storage::ntp_config::default_remote_delete, + topic_property_remote_delete, + override_if_not_default( + std::make_optional(topic_properties.remote_delete), + storage::ntp_config::default_remote_delete), + true, + maybe_make_documentation( + include_documentation, + "Controls whether topic deletion should imply deletion in " + "S3"), + [](const bool& b) { return b ? "true" : "false"; }); + } + + add_topic_config_if_requested( + resource, + result, + topic_property_segment_ms, + metadata_cache.get_default_segment_ms(), + topic_property_segment_ms, + topic_properties.segment_ms, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().log_segment_ms.desc())); + + constexpr std::string_view key_validation + = "Enable validation of the schema id for keys on a record"; + constexpr std::string_view val_validation + = "Enable validation of the schema id for values on a record"; + constexpr bool validation_hide_default_override = true; + + switch (config::shard_local_cfg().enable_schema_id_validation()) { + case pandaproxy::schema_registry::schema_id_validation_mode::compat: { + add_topic_config_if_requested( + resource, + result, + topic_property_record_key_schema_id_validation_compat, + metadata_cache.get_default_record_key_schema_id_validation(), + topic_property_record_key_schema_id_validation_compat, + topic_properties.record_key_schema_id_validation_compat, + include_synonyms, + maybe_make_documentation(include_documentation, key_validation), + &describe_as_string, + validation_hide_default_override); + + add_topic_config_if_requested( + resource, + result, + topic_property_record_key_subject_name_strategy_compat, + metadata_cache.get_default_record_key_subject_name_strategy(), + topic_property_record_key_subject_name_strategy_compat, + topic_properties.record_key_subject_name_strategy_compat, + include_synonyms, + maybe_make_documentation( + include_documentation, + fmt::format( + "The subject name strategy for keys if {} is enabled", + topic_property_record_key_schema_id_validation_compat)), + [](auto sns) { return ss::sstring(to_string_view_compat(sns)); }, + validation_hide_default_override); + + add_topic_config_if_requested( + resource, + result, + topic_property_record_value_schema_id_validation_compat, + metadata_cache.get_default_record_value_schema_id_validation(), + topic_property_record_value_schema_id_validation_compat, + topic_properties.record_value_schema_id_validation_compat, + include_synonyms, + maybe_make_documentation(include_documentation, val_validation), + &describe_as_string, + validation_hide_default_override); + + add_topic_config_if_requested( + resource, + result, + topic_property_record_value_subject_name_strategy_compat, + metadata_cache.get_default_record_value_subject_name_strategy(), + topic_property_record_value_subject_name_strategy_compat, + topic_properties.record_value_subject_name_strategy_compat, + include_synonyms, + maybe_make_documentation( + include_documentation, + fmt::format( + "The subject name strategy for values if {} is enabled", + topic_property_record_value_schema_id_validation_compat)), + [](auto sns) { return ss::sstring(to_string_view_compat(sns)); }, + validation_hide_default_override); + [[fallthrough]]; + } + case pandaproxy::schema_registry::schema_id_validation_mode::redpanda: { + add_topic_config_if_requested( + resource, + result, + topic_property_record_key_schema_id_validation, + metadata_cache.get_default_record_key_schema_id_validation(), + topic_property_record_key_schema_id_validation, + topic_properties.record_key_schema_id_validation, + include_synonyms, + maybe_make_documentation(include_documentation, key_validation), + &describe_as_string, + validation_hide_default_override); + + add_topic_config_if_requested( + resource, + result, + topic_property_record_key_subject_name_strategy, + metadata_cache.get_default_record_key_subject_name_strategy(), + topic_property_record_key_subject_name_strategy, + topic_properties.record_key_subject_name_strategy, + include_synonyms, + maybe_make_documentation( + include_documentation, + fmt::format( + "The subject name strategy for keys if {} is enabled", + topic_property_record_key_schema_id_validation)), + &describe_as_string< + pandaproxy::schema_registry::subject_name_strategy>, + validation_hide_default_override); + + add_topic_config_if_requested( + resource, + result, + topic_property_record_value_schema_id_validation, + metadata_cache.get_default_record_value_schema_id_validation(), + topic_property_record_value_schema_id_validation, + topic_properties.record_value_schema_id_validation, + include_synonyms, + maybe_make_documentation(include_documentation, val_validation), + &describe_as_string, + validation_hide_default_override); + + add_topic_config_if_requested( + resource, + result, + topic_property_record_value_subject_name_strategy, + metadata_cache.get_default_record_value_subject_name_strategy(), + topic_property_record_value_subject_name_strategy, + topic_properties.record_value_subject_name_strategy, + include_synonyms, + maybe_make_documentation( + include_documentation, + fmt::format( + "The subject name strategy for values if {} is enabled", + topic_property_record_value_schema_id_validation)), + &describe_as_string< + pandaproxy::schema_registry::subject_name_strategy>, + validation_hide_default_override); + } + case pandaproxy::schema_registry::schema_id_validation_mode::none: { + break; + } + } +} + +void report_broker_config( + const describe_configs_resource& resource, + describe_configs_result& result, + bool include_synonyms, + bool include_documentation) { + if (!result.resource_name.empty()) { + int32_t broker_id = -1; + auto res = std::from_chars( + result.resource_name.data(), + result.resource_name.data() + result.resource_name.size(), // NOLINT + broker_id); + if (res.ec == std::errc()) { + if (broker_id != *config::node().node_id()) { + result.error_code = error_code::invalid_request; + result.error_message = ssx::sformat( + "Unexpected broker id {} expected {}", + broker_id, + *config::node().node_id()); + return; + } + } else { + result.error_code = error_code::invalid_request; + result.error_message = ssx::sformat( + "Broker id must be an integer but received {}", + result.resource_name); + return; + } + } + + add_broker_config_if_requested( + resource, + result, + "listeners", + config::node().kafka_api, + include_synonyms, + maybe_make_documentation( + include_documentation, config::node().kafka_api.desc()), + &kafka_authn_endpoint_format); + + add_broker_config_if_requested( + resource, + result, + "advertised.listeners", + config::node().advertised_kafka_api_property(), + include_synonyms, + maybe_make_documentation( + include_documentation, + config::node().advertised_kafka_api_property().desc()), + &kafka_endpoint_format); + + add_broker_config_if_requested( + resource, + result, + "log.segment.bytes", + config::shard_local_cfg().log_segment_size, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().log_segment_size.desc()), + &describe_as_string); + + add_broker_config_if_requested( + resource, + result, + "log.retention.bytes", + config::shard_local_cfg().retention_bytes, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().retention_bytes.desc()), + [](std::optional sz) { + return ssx::sformat("{}", sz ? sz.value() : -1); + }); + + add_broker_config_if_requested( + resource, + result, + "log.retention.ms", + config::shard_local_cfg().delete_retention_ms, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().delete_retention_ms.desc()), + [](const std::optional& ret) { + return ssx::sformat("{}", ret.value_or(-1ms).count()); + }); + + add_broker_config_if_requested( + resource, + result, + "num.partitions", + config::shard_local_cfg().default_topic_partitions, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().default_topic_partitions.desc()), + &describe_as_string); + + add_broker_config_if_requested( + resource, + result, + "default.replication.factor", + config::shard_local_cfg().default_topic_replication, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().default_topic_replication.desc()), + &describe_as_string); + + add_broker_config_if_requested( + resource, + result, + "log.dirs", + config::node().data_directory, + include_synonyms, + maybe_make_documentation( + include_documentation, config::node().data_directory.desc()), + [](const config::data_directory_path& path) { + return path.as_sstring(); + }); + + add_broker_config_if_requested( + resource, + result, + "auto.create.topics.enable", + config::shard_local_cfg().auto_create_topics_enabled, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().auto_create_topics_enabled.desc()), + &describe_as_string); +} + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.h b/src/v/kafka/server/handlers/configs/config_response_utils.h new file mode 100644 index 0000000000000..cbff3ac7d60c6 --- /dev/null +++ b/src/v/kafka/server/handlers/configs/config_response_utils.h @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "cluster/types.h" +#include "kafka/protocol/describe_configs.h" + +namespace kafka { + +void report_topic_config( + const describe_configs_resource& resource, + describe_configs_result& result, + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_properties, + bool include_synonyms, + bool include_documentation); + +void report_broker_config( + const describe_configs_resource& resource, + describe_configs_result& result, + bool include_synonyms, + bool include_documentation); + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc index 02001608b3594..219f81f8bae28 100644 --- a/src/v/kafka/server/handlers/describe_configs.cc +++ b/src/v/kafka/server/handlers/describe_configs.cc @@ -14,6 +14,7 @@ #include "config/data_directory_path.h" #include "config/node_config.h" #include "kafka/protocol/errors.h" +#include "kafka/server/handlers/configs/config_response_utils.h" #include "kafka/server/handlers/topics/topic_utils.h" #include "kafka/server/handlers/topics/types.h" #include "kafka/server/request_context.h" @@ -39,535 +40,6 @@ namespace kafka { -static bool config_property_requested( - const std::optional>& configuration_keys, - const std::string_view property_name) { - return !configuration_keys.has_value() - || std::find( - configuration_keys->begin(), - configuration_keys->end(), - property_name) - != configuration_keys->end(); -} - -template -static void add_config( - describe_configs_result& result, - std::string_view name, - T value, - describe_configs_source source) { - result.configs.push_back(describe_configs_resource_result{ - .name = ss::sstring(name), - .value = ssx::sformat("{}", value), - .config_source = source, - }); -} - -template -static void add_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, - std::string_view name, - T value, - describe_configs_source source) { - if (config_property_requested(resource.configuration_keys, name)) { - add_config(result, name, value, source); - } -} - -template -static ss::sstring describe_as_string(const T& t) { - return ssx::sformat("{}", t); -} - -// Kafka protocol defines integral types by sizes. See -// https://kafka.apache.org/protocol.html -// Therefore we should also use type sizes for integrals and use Java type sizes -// as a guideline. See -// https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html -template -constexpr auto num_bits = CHAR_BIT * sizeof(T); - -template -constexpr bool is_short = std::is_integral_v && !std::is_same_v - && num_bits <= 16; - -template -constexpr bool is_int = std::is_integral_v && num_bits > 16 - && num_bits <= 32; - -template -constexpr bool is_long = std::is_integral_v && num_bits > 32 - && num_bits <= 64; - -// property_config_type maps the datatype for a config property to -// describe_configs_type. Currently class_type and password are not used in -// Redpanda so we do not include checks for those types. You may find a similar -// mapping in Apache Kafka at -// https://github.com/apache/kafka/blob/be032735b39360df1a6de1a7feea8b4336e5bcc0/core/src/main/scala/kafka/server/ConfigHelper.scala -template -consteval describe_configs_type property_config_type() { - // clang-format off - constexpr auto is_string_type = std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v; - - constexpr auto is_long_type = is_long || - // Long type since seconds is atleast a 35-bit signed integral - // https://en.cppreference.com/w/cpp/chrono/duration - std::is_same_v || - // Long type since milliseconds is atleast a 45-bit signed integral - // https://en.cppreference.com/w/cpp/chrono/duration - std::is_same_v; - // clang-format on - - if constexpr ( - reflection::is_std_optional || reflection::is_tristate) { - return property_config_type(); - return property_config_type(); - } else if constexpr (std::is_same_v) { - return describe_configs_type::boolean; - } else if constexpr (is_string_type) { - return describe_configs_type::string; - } else if constexpr (is_short) { - return describe_configs_type::short_type; - } else if constexpr (is_int) { - return describe_configs_type::int_type; - } else if constexpr (is_long_type) { - return describe_configs_type::long_type; - } else if constexpr (std::is_floating_point_v) { - return describe_configs_type::double_type; - } else if constexpr (reflection::is_std_vector) { - return describe_configs_type::list; - } else { - static_assert( - config::detail::dependent_false::value, - "Type name is not supported in describe_configs_type"); - } -} - -template -static void add_broker_config( - describe_configs_result& result, - std::string_view name, - const config::property& property, - bool include_synonyms, - std::optional documentation, - Func&& describe_f) { - describe_configs_source src - = property.is_overriden() ? describe_configs_source::static_broker_config - : describe_configs_source::default_config; - - std::vector synonyms; - if (include_synonyms) { - synonyms.reserve(2); - /** - * If value was overriden, include override - */ - if (src == describe_configs_source::static_broker_config) { - synonyms.push_back(describe_configs_synonym{ - .name = ss::sstring(property.name()), - .value = describe_f(property.value()), - .source = static_cast( - describe_configs_source::static_broker_config), - }); - } - /** - * If property is required it has no default - */ - if (!property.is_required()) { - synonyms.push_back(describe_configs_synonym{ - .name = ss::sstring(property.name()), - .value = describe_f(property.default_value()), - .source = static_cast( - describe_configs_source::default_config), - }); - } - } - - result.configs.push_back(describe_configs_resource_result{ - .name = ss::sstring(name), - .value = describe_f(property.value()), - .config_source = src, - .synonyms = std::move(synonyms), - .config_type = property_config_type(), - .documentation = documentation, - }); -} - -template -static void add_broker_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, - std::string_view name, - const config::property& property, - bool include_synonyms, - std::optional documentation, - Func&& describe_f) { - if (config_property_requested(resource.configuration_keys, name)) { - add_broker_config( - result, - name, - property, - include_synonyms, - documentation, - std::forward(describe_f)); - } -} - -template -static void add_topic_config( - describe_configs_result& result, - std::string_view default_name, - const T& default_value, - std::string_view override_name, - const std::optional& overrides, - bool include_synonyms, - std::optional documentation, - Func&& describe_f) { - describe_configs_source src = overrides - ? describe_configs_source::topic - : describe_configs_source::default_config; - - std::vector synonyms; - if (include_synonyms) { - synonyms.reserve(2); - if (overrides) { - synonyms.push_back(describe_configs_synonym{ - .name = ss::sstring(override_name), - .value = describe_f(*overrides), - .source = static_cast(describe_configs_source::topic), - }); - } - synonyms.push_back(describe_configs_synonym{ - .name = ss::sstring(default_name), - .value = describe_f(default_value), - .source = static_cast( - describe_configs_source::default_config), - }); - } - - result.configs.push_back(describe_configs_resource_result{ - .name = ss::sstring(override_name), - .value = describe_f(overrides.value_or(default_value)), - .config_source = src, - .synonyms = std::move(synonyms), - .config_type = property_config_type(), - .documentation = documentation, - }); -} - -/** - * For faking DEFAULT_CONFIG status for properties that are actually - * topic overrides: cloud storage properties. We do not support cluster - * defaults for these, the values are always "sticky" to topics, but - * some Kafka clients insist that after an AlterConfig RPC, anything - * they didn't set should be DEFAULT_CONFIG. - * - * See https://github.com/redpanda-data/redpanda/issues/7451 - */ -template -std::optional -override_if_not_default(const std::optional& override, const T& def) { - if (override && override.value() != def) { - return override; - } else { - return std::nullopt; - } -} - -template -static void add_topic_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, - std::string_view default_name, - const T& default_value, - std::string_view override_name, - const std::optional& overrides, - bool include_synonyms, - std::optional documentation, - Func&& describe_f, - bool hide_default_override = false) { - if (config_property_requested(resource.configuration_keys, override_name)) { - std::optional overrides_val; - if (hide_default_override) { - overrides_val = override_if_not_default(overrides, default_value); - } else { - overrides_val = overrides; - } - - add_topic_config( - result, - default_name, - default_value, - override_name, - overrides_val, - include_synonyms, - documentation, - std::forward(describe_f)); - } -} - -template -static ss::sstring maybe_print_tristate(const tristate& tri) { - if (tri.is_disabled() || !tri.has_optional_value()) { - return "-1"; - } - return ssx::sformat("{}", tri.value()); -} - -template -static void add_topic_config( - describe_configs_result& result, - std::string_view default_name, - const std::optional& default_value, - std::string_view override_name, - const tristate& overrides, - bool include_synonyms, - std::optional documentation) { - // Wrap overrides in an optional because add_topic_config expects - // optional where S = tristate - std::optional> override_value; - if (overrides.is_disabled() || overrides.has_optional_value()) { - override_value = std::make_optional(overrides); - } - - add_topic_config( - result, - default_name, - tristate{default_value}, - override_name, - override_value, - include_synonyms, - documentation, - &maybe_print_tristate); -} - -template -static void add_topic_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, - std::string_view default_name, - const std::optional& default_value, - std::string_view override_name, - const tristate& overrides, - bool include_synonyms, - std::optional documentation) { - if (config_property_requested(resource.configuration_keys, override_name)) { - add_topic_config( - result, - default_name, - default_value, - override_name, - overrides, - include_synonyms, - documentation); - } -} - -static ss::sstring -kafka_endpoint_format(const std::vector& endpoints) { - std::vector uris; - uris.reserve(endpoints.size()); - std::transform( - endpoints.cbegin(), - endpoints.cend(), - std::back_inserter(uris), - [](const model::broker_endpoint& ep) { - return ssx::sformat( - "{}://{}:{}", - (ep.name.empty() ? "plain" : ep.name), - ep.address.host(), - ep.address.port()); - }); - return ssx::sformat("{}", fmt::join(uris, ",")); -} - -static ss::sstring kafka_authn_endpoint_format( - const std::vector& endpoints) { - std::vector uris; - uris.reserve(endpoints.size()); - std::transform( - endpoints.cbegin(), - endpoints.cend(), - std::back_inserter(uris), - [](const config::broker_authn_endpoint& ep) { - return ssx::sformat( - "{}://{}:{}", - (ep.name.empty() ? "plain" : ep.name), - ep.address.host(), - ep.address.port()); - }); - return ssx::sformat("{}", fmt::join(uris, ",")); -} - -static inline std::optional maybe_make_documentation( - bool include_documentation, const std::string_view& docstring) { - return include_documentation ? std::make_optional(ss::sstring{docstring}) - : std::nullopt; -} - -static void report_broker_config( - const describe_configs_resource& resource, - describe_configs_result& result, - bool include_synonyms, - bool include_documentation) { - if (!result.resource_name.empty()) { - int32_t broker_id = -1; - auto res = std::from_chars( - result.resource_name.data(), - result.resource_name.data() + result.resource_name.size(), // NOLINT - broker_id); - if (res.ec == std::errc()) { - if (broker_id != *config::node().node_id()) { - result.error_code = error_code::invalid_request; - result.error_message = ssx::sformat( - "Unexpected broker id {} expected {}", - broker_id, - *config::node().node_id()); - return; - } - } else { - result.error_code = error_code::invalid_request; - result.error_message = ssx::sformat( - "Broker id must be an integer but received {}", - result.resource_name); - return; - } - } - - add_broker_config_if_requested( - resource, - result, - "listeners", - config::node().kafka_api, - include_synonyms, - maybe_make_documentation( - include_documentation, config::node().kafka_api.desc()), - &kafka_authn_endpoint_format); - - add_broker_config_if_requested( - resource, - result, - "advertised.listeners", - config::node().advertised_kafka_api_property(), - include_synonyms, - maybe_make_documentation( - include_documentation, - config::node().advertised_kafka_api_property().desc()), - &kafka_endpoint_format); - - add_broker_config_if_requested( - resource, - result, - "log.segment.bytes", - config::shard_local_cfg().log_segment_size, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().log_segment_size.desc()), - &describe_as_string); - - add_broker_config_if_requested( - resource, - result, - "log.retention.bytes", - config::shard_local_cfg().retention_bytes, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().retention_bytes.desc()), - [](std::optional sz) { - return ssx::sformat("{}", sz ? sz.value() : -1); - }); - - add_broker_config_if_requested( - resource, - result, - "log.retention.ms", - config::shard_local_cfg().delete_retention_ms, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().delete_retention_ms.desc()), - [](const std::optional& ret) { - return ssx::sformat("{}", ret.value_or(-1ms).count()); - }); - - add_broker_config_if_requested( - resource, - result, - "num.partitions", - config::shard_local_cfg().default_topic_partitions, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().default_topic_partitions.desc()), - &describe_as_string); - - add_broker_config_if_requested( - resource, - result, - "default.replication.factor", - config::shard_local_cfg().default_topic_replication, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().default_topic_replication.desc()), - &describe_as_string); - - add_broker_config_if_requested( - resource, - result, - "log.dirs", - config::node().data_directory, - include_synonyms, - maybe_make_documentation( - include_documentation, config::node().data_directory.desc()), - [](const config::data_directory_path& path) { - return path.as_sstring(); - }); - - add_broker_config_if_requested( - resource, - result, - "auto.create.topics.enable", - config::shard_local_cfg().auto_create_topics_enabled, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().auto_create_topics_enabled.desc()), - &describe_as_string); -} - -int64_t describe_retention_duration( - tristate& overrides, - std::optional def) { - if (overrides.is_disabled()) { - return -1; - } - if (overrides.has_optional_value()) { - return overrides.value().count(); - } - - return def ? def->count() : -1; -} -int64_t describe_retention_bytes( - tristate& overrides, std::optional def) { - if (overrides.is_disabled()) { - return -1; - } - if (overrides.has_optional_value()) { - return overrides.value(); - } - - return def.value_or(-1); -} - template<> ss::future describe_configs_handler::handle( request_context ctx, [[maybe_unused]] ss::smp_service_group ssg) { @@ -613,354 +85,13 @@ ss::future describe_configs_handler::handle( continue; } - /** - * Kafka properties - */ - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().log_compression_type.name(), - ctx.metadata_cache().get_default_compression(), - topic_property_compression, - topic_config->properties.compression, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().log_compression_type.desc()), - &describe_as_string); - - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().log_cleanup_policy.name(), - ctx.metadata_cache().get_default_cleanup_policy_bitflags(), - topic_property_cleanup_policy, - topic_config->properties.cleanup_policy_bitflags, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().log_cleanup_policy.desc()), - &describe_as_string); - - const std::string_view docstring{ - topic_config->properties.is_compacted() - ? config::shard_local_cfg().compacted_log_segment_size.desc() - : config::shard_local_cfg().log_segment_size.desc()}; - add_topic_config_if_requested( - resource, - result, - topic_config->properties.is_compacted() - ? config::shard_local_cfg().compacted_log_segment_size.name() - : config::shard_local_cfg().log_segment_size.name(), - topic_config->properties.is_compacted() - ? ctx.metadata_cache() - .get_default_compacted_topic_segment_size() - : ctx.metadata_cache().get_default_segment_size(), - topic_property_segment_size, - topic_config->properties.segment_size, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, docstring), - &describe_as_string); - - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().delete_retention_ms.name(), - ctx.metadata_cache().get_default_retention_duration(), - topic_property_retention_duration, - topic_config->properties.retention_duration, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().delete_retention_ms.desc())); - - add_topic_config_if_requested( + report_topic_config( resource, result, - config::shard_local_cfg().retention_bytes.name(), - ctx.metadata_cache().get_default_retention_bytes(), - topic_property_retention_bytes, - topic_config->properties.retention_bytes, + ctx.metadata_cache(), + topic_config->properties, request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().retention_bytes.desc())); - - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().log_message_timestamp_type.name(), - ctx.metadata_cache().get_default_timestamp_type(), - topic_property_timestamp_type, - topic_config->properties.timestamp_type, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().log_message_timestamp_type.desc()), - &describe_as_string); - - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().kafka_batch_max_bytes.name(), - ctx.metadata_cache().get_default_batch_max_bytes(), - topic_property_max_message_bytes, - topic_config->properties.batch_max_bytes, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().kafka_batch_max_bytes.desc()), - &describe_as_string); - - // Shadow indexing properties - add_topic_config_if_requested( - resource, - result, - topic_property_remote_read, - model::is_fetch_enabled( - ctx.metadata_cache().get_default_shadow_indexing_mode()), - topic_property_remote_read, - topic_config->properties.shadow_indexing.has_value() - ? std::make_optional(model::is_fetch_enabled( - *topic_config->properties.shadow_indexing)) - : std::nullopt, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg() - .cloud_storage_enable_remote_read.desc()), - &describe_as_string, - true); - - add_topic_config_if_requested( - resource, - result, - topic_property_remote_write, - model::is_archival_enabled( - ctx.metadata_cache().get_default_shadow_indexing_mode()), - topic_property_remote_write, - topic_config->properties.shadow_indexing.has_value() - ? std::make_optional(model::is_archival_enabled( - *topic_config->properties.shadow_indexing)) - : std::nullopt, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg() - .cloud_storage_enable_remote_write.desc()), - &describe_as_string, - true); - - add_topic_config_if_requested( - resource, - result, - topic_property_retention_local_target_bytes, - ctx.metadata_cache().get_default_retention_local_target_bytes(), - topic_property_retention_local_target_bytes, - topic_config->properties.retention_local_target_bytes, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg() - .retention_local_target_bytes_default.desc())); - - add_topic_config_if_requested( - resource, - result, - topic_property_retention_local_target_ms, - std::make_optional( - ctx.metadata_cache().get_default_retention_local_target_ms()), - topic_property_retention_local_target_ms, - topic_config->properties.retention_local_target_ms, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg() - .retention_local_target_ms_default.desc())); - - if (config_property_requested( - resource.configuration_keys, topic_property_remote_delete)) { - add_topic_config( - result, - topic_property_remote_delete, - storage::ntp_config::default_remote_delete, - topic_property_remote_delete, - override_if_not_default( - std::make_optional( - topic_config->properties.remote_delete), - storage::ntp_config::default_remote_delete), - true, - maybe_make_documentation( - request.data.include_documentation, - "Controls whether topic deletion should imply deletion in " - "S3"), - [](const bool& b) { return b ? "true" : "false"; }); - } - - add_topic_config_if_requested( - resource, - result, - topic_property_segment_ms, - ctx.metadata_cache().get_default_segment_ms(), - topic_property_segment_ms, - topic_config->properties.segment_ms, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().log_segment_ms.desc())); - - constexpr std::string_view key_validation - = "Enable validation of the schema id for keys on a record"; - constexpr std::string_view val_validation - = "Enable validation of the schema id for values on a record"; - constexpr bool validation_hide_default_override = true; - - switch (config::shard_local_cfg().enable_schema_id_validation()) { - case pandaproxy::schema_registry::schema_id_validation_mode:: - compat: { - add_topic_config_if_requested( - resource, - result, - topic_property_record_key_schema_id_validation_compat, - ctx.metadata_cache() - .get_default_record_key_schema_id_validation(), - topic_property_record_key_schema_id_validation_compat, - topic_config->properties - .record_key_schema_id_validation_compat, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, key_validation), - &describe_as_string, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_key_subject_name_strategy_compat, - ctx.metadata_cache() - .get_default_record_key_subject_name_strategy(), - topic_property_record_key_subject_name_strategy_compat, - topic_config->properties - .record_key_subject_name_strategy_compat, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - fmt::format( - "The subject name strategy for keys if {} is enabled", - topic_property_record_key_schema_id_validation_compat)), - [](auto sns) { - return ss::sstring(to_string_view_compat(sns)); - }, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_value_schema_id_validation_compat, - ctx.metadata_cache() - .get_default_record_value_schema_id_validation(), - topic_property_record_value_schema_id_validation_compat, - topic_config->properties - .record_value_schema_id_validation_compat, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, val_validation), - &describe_as_string, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_value_subject_name_strategy_compat, - ctx.metadata_cache() - .get_default_record_value_subject_name_strategy(), - topic_property_record_value_subject_name_strategy_compat, - topic_config->properties - .record_value_subject_name_strategy_compat, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - fmt::format( - "The subject name strategy for values if {} is enabled", - topic_property_record_value_schema_id_validation_compat)), - [](auto sns) { - return ss::sstring(to_string_view_compat(sns)); - }, - validation_hide_default_override); - [[fallthrough]]; - } - case pandaproxy::schema_registry::schema_id_validation_mode:: - redpanda: { - add_topic_config_if_requested( - resource, - result, - topic_property_record_key_schema_id_validation, - ctx.metadata_cache() - .get_default_record_key_schema_id_validation(), - topic_property_record_key_schema_id_validation, - topic_config->properties.record_key_schema_id_validation, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, key_validation), - &describe_as_string, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_key_subject_name_strategy, - ctx.metadata_cache() - .get_default_record_key_subject_name_strategy(), - topic_property_record_key_subject_name_strategy, - topic_config->properties.record_key_subject_name_strategy, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - fmt::format( - "The subject name strategy for keys if {} is enabled", - topic_property_record_key_schema_id_validation)), - &describe_as_string< - pandaproxy::schema_registry::subject_name_strategy>, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_value_schema_id_validation, - ctx.metadata_cache() - .get_default_record_value_schema_id_validation(), - topic_property_record_value_schema_id_validation, - topic_config->properties.record_value_schema_id_validation, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, val_validation), - &describe_as_string, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_value_subject_name_strategy, - ctx.metadata_cache() - .get_default_record_value_subject_name_strategy(), - topic_property_record_value_subject_name_strategy, - topic_config->properties.record_value_subject_name_strategy, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - fmt::format( - "The subject name strategy for values if {} is enabled", - topic_property_record_value_schema_id_validation)), - &describe_as_string< - pandaproxy::schema_registry::subject_name_strategy>, - validation_hide_default_override); - } - case pandaproxy::schema_registry::schema_id_validation_mode::none: { - break; - } - } - + request.data.include_documentation); break; } From 24ed9d9c0e62ff533b4f2877871d9729cdf18435 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Tue, 12 Mar 2024 07:32:13 +0000 Subject: [PATCH 2/6] kafka/test: config_response_utils_test This adds unit tests for the config response creating methods. These methods grew quite complex with various overloads of the method exhibiting different behaviour, so I've added some tests partly to demonstrate their behaviour and partly to enable a future cleanup of the methods. (cherry picked from commit 773f83656c5f6243240777946716212da6d013b1) --- .../handlers/configs/config_response_utils.cc | 23 ++- src/v/kafka/server/tests/CMakeLists.txt | 1 + .../tests/config_response_utils_test.cc | 142 ++++++++++++++++++ .../tests/config_response_utils_test_help.h | 45 ++++++ 4 files changed, 208 insertions(+), 3 deletions(-) create mode 100644 src/v/kafka/server/tests/config_response_utils_test.cc create mode 100644 src/v/kafka/server/tests/config_response_utils_test_help.h diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index aa476e577ae78..30dee1d133f8c 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -57,10 +57,13 @@ static void add_config_if_requested( } template -static ss::sstring describe_as_string(const T& t) { +ss::sstring describe_as_string(const T& t) { return ssx::sformat("{}", t); } +// Instantiate explicitly for unit testing +template ss::sstring describe_as_string(const int&); + // Kafka protocol defines integral types by sizes. See // https://kafka.apache.org/protocol.html // Therefore we should also use type sizes for integrals and use Java type sizes @@ -262,7 +265,7 @@ override_if_not_default(const std::optional& override, const T& def) { } template -static void add_topic_config_if_requested( +void add_topic_config_if_requested( const describe_configs_resource& resource, describe_configs_result& result, std::string_view default_name, @@ -293,6 +296,20 @@ static void add_topic_config_if_requested( } } +// Instantiate explicitly for unit testing +using describe_int_t = decltype(&describe_as_string); +template void add_topic_config_if_requested( + const describe_configs_resource& resource, + describe_configs_result& result, + std::string_view default_name, + const int& default_value, + std::string_view override_name, + const std::optional& overrides, + bool include_synonyms, + std::optional documentation, + describe_int_t&& describe_f, + bool hide_default_override = false); + template static ss::sstring maybe_print_tristate(const tristate& tri) { if (tri.is_disabled() || !tri.has_optional_value()) { @@ -329,7 +346,7 @@ static void add_topic_config( } template -static void add_topic_config_if_requested( +void add_topic_config_if_requested( const describe_configs_resource& resource, describe_configs_result& result, std::string_view default_name, diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index 44bdd9e5f11b9..82fc3666d61e0 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -11,6 +11,7 @@ rp_test( validator_tests.cc fetch_unit_test.cc config_utils_test.cc + config_response_utils_test.cc DEFINITIONS BOOST_TEST_DYN_LINK LIBRARIES Boost::unit_test_framework v::kafka LABELS kafka diff --git a/src/v/kafka/server/tests/config_response_utils_test.cc b/src/v/kafka/server/tests/config_response_utils_test.cc new file mode 100644 index 0000000000000..74c70fb89b673 --- /dev/null +++ b/src/v/kafka/server/tests/config_response_utils_test.cc @@ -0,0 +1,142 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "kafka/server/tests/config_response_utils_test_help.h" +#include "utils/to_string.h" + +#include +#include +#include + +#include +#include +#include +#include + +std::optional +get_config(const kafka::describe_configs_result& result, std::string_view key) { + for (const auto& config : result.configs) { + if (config.name == key) { + return config.value; + } + } + throw std::runtime_error(fmt::format("Key not found: {}", key)); +} + +kafka::describe_configs_source get_config_source( + const kafka::describe_configs_result& result, std::string_view key) { + for (const auto& config : result.configs) { + if (config.name == key) { + return config.config_source; + } + } + throw std::runtime_error(fmt::format("Key not found: {}", key)); +} + +BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_tristate) { + using namespace kafka; + auto verify_tristate_config = []( + std::optional default_value, + tristate override_value, + std::optional expected_value) { + describe_configs_resource resource{}; + describe_configs_result result{}; + + add_topic_config_if_requested( + resource, + result, + "test-global-broker-config-name", + default_value, + "test-topic-override-name", + override_value, + false, + std::nullopt); + + BOOST_CHECK_EQUAL( + get_config(result, "test-topic-override-name"), expected_value); + }; + + // clang-format off + verify_tristate_config(std::make_optional(2), tristate(1), std::make_optional("1")); + verify_tristate_config(std::make_optional(2), tristate(std::nullopt), std::make_optional("2")); + verify_tristate_config(std::make_optional(2), tristate(), std::make_optional("-1")); + + verify_tristate_config(std::nullopt, tristate(1), std::make_optional("1")); + verify_tristate_config(std::nullopt, tristate(std::nullopt), std::make_optional("-1")); + verify_tristate_config(std::nullopt, tristate(), std::make_optional("-1")); + // clang-format on +} + +BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_optional) { + using namespace kafka; + auto verify_optional_config = []( + int default_value, + std::optional override_value, + std::optional expected_value, + bool hide_default_override) { + describe_configs_resource resource{}; + describe_configs_result result{}; + + add_topic_config_if_requested( + resource, + result, + "test-global-broker-config-name", + default_value, + "test-topic-override-name", + override_value, + false, + std::nullopt, + &describe_as_string, + hide_default_override); + + BOOST_CHECK_EQUAL( + get_config(result, "test-topic-override-name"), expected_value); + }; + + // clang-format off + verify_optional_config(2, std::make_optional(1), std::make_optional("1"), false); + verify_optional_config(2, std::nullopt, std::make_optional("2"), false); + // clang-format on +} + +BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_optional_hide_default) { + using namespace kafka; + + auto verify_optional_config_with_hide_override = + []( + bool hide_default_override, + kafka::describe_configs_source expected_source) { + describe_configs_resource resource{}; + describe_configs_result result{}; + + add_topic_config_if_requested( + resource, + result, + "test-global-broker-config-name", + 2, + "test-topic-override-name", + std::make_optional(2), + false, + std::nullopt, + &describe_as_string, + hide_default_override); + + BOOST_CHECK_EQUAL( + get_config(result, "test-topic-override-name"), + std::make_optional("2")); + BOOST_CHECK_EQUAL( + get_config_source(result, "test-topic-override-name"), + expected_source); + }; + + // clang-format off + verify_optional_config_with_hide_override(false, kafka::describe_configs_source::topic); + verify_optional_config_with_hide_override(true, kafka::describe_configs_source::default_config); + // clang-format on +} diff --git a/src/v/kafka/server/tests/config_response_utils_test_help.h b/src/v/kafka/server/tests/config_response_utils_test_help.h new file mode 100644 index 0000000000000..e4b9d6ed5de93 --- /dev/null +++ b/src/v/kafka/server/tests/config_response_utils_test_help.h @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "kafka/protocol/describe_configs.h" + +namespace kafka { + +template +ss::sstring describe_as_string(const T& t); + +template +void add_topic_config_if_requested( + const describe_configs_resource& resource, + describe_configs_result& result, + std::string_view default_name, + const T& default_value, + std::string_view override_name, + const std::optional& overrides, + bool include_synonyms, + std::optional documentation, + Func&& describe_f, + bool hide_default_override = false); + +template +void add_topic_config_if_requested( + const describe_configs_resource& resource, + describe_configs_result& result, + std::string_view default_name, + const std::optional& default_value, + std::string_view override_name, + const tristate& overrides, + bool include_synonyms, + std::optional documentation); + +} // namespace kafka From 618d3e5e80ee75191efb54847b4391ebc6190b48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Tue, 12 Mar 2024 09:12:08 +0000 Subject: [PATCH 3/6] kafka: switch to report full topic configs This is the main change of the PR. It changes the CreateTopics handler to use the shared code of DescribeConfigs to generate the configs returned to the client. We can share this code because the DescribeConfigs object is a more general version of the configs returned in the CreateTopics endpoint, so we can just derive the latter from the former. The reason for changing the CreateTopics handler this way is that currently the CreateTopics handler has a bug where it only ever returns the topic-specific override configs of the topic in the response, whereas it should return all the configs of the topic (incl both the topic-specific override configs and the global broker-level-set topic config defaults). This is the behavour that Apache Kafka exhibits and the behaviour that KIP 525 prescribes. Ref: * https://cwiki.apache.org/confluence/display/KAFKA/KIP-525+-+Return+topic+metadata+and+configs+in+CreateTopics+response (cherry picked from commit 41f9d664ba37841befae30480ff712dbc4df2eea) --- .../handlers/configs/config_response_utils.cc | 23 ++++++++++++++ .../handlers/configs/config_response_utils.h | 5 ++++ src/v/kafka/server/handlers/create_topics.cc | 30 +++++-------------- .../kafka/server/tests/create_topics_test.cc | 12 +++++--- 4 files changed, 43 insertions(+), 27 deletions(-) diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index 30dee1d133f8c..6b65a381c7dd2 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -867,4 +867,27 @@ void report_broker_config( &describe_as_string); } +std::vector make_configs( + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_config) { + describe_configs_resource resource{}; + describe_configs_result describe_result{}; + + report_topic_config( + resource, describe_result, metadata_cache, topic_config, false, false); + + std::vector result; + result.reserve(describe_result.configs.size()); + + for (auto& describe_conf : describe_result.configs) { + result.push_back(creatable_topic_configs{ + .name = std::move(describe_conf.name), + .value = std::move(describe_conf.value), + .config_source = describe_conf.config_source, + }); + } + + return result; +} + } // namespace kafka diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.h b/src/v/kafka/server/handlers/configs/config_response_utils.h index cbff3ac7d60c6..a22c5c56f64bf 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.h +++ b/src/v/kafka/server/handlers/configs/config_response_utils.h @@ -13,6 +13,7 @@ #include "cluster/types.h" #include "kafka/protocol/describe_configs.h" +#include "kafka/protocol/create_topics.h" namespace kafka { @@ -30,4 +31,8 @@ void report_broker_config( bool include_synonyms, bool include_documentation); +std::vector make_configs( + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_config); + } // namespace kafka diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index f4967ad9a51e0..851aba4120ef9 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -12,9 +12,11 @@ #include "cluster/cluster_utils.h" #include "cluster/metadata_cache.h" #include "cluster/topics_frontend.h" +#include "cluster/types.h" #include "config/configuration.h" #include "kafka/protocol/errors.h" #include "kafka/protocol/timeout.h" +#include "kafka/server/handlers/configs/config_response_utils.h" #include "kafka/server/handlers/topics/topic_utils.h" #include "kafka/server/handlers/topics/types.h" #include "kafka/server/quota_manager.h" @@ -31,6 +33,7 @@ #include #include +#include #include namespace kafka { @@ -83,24 +86,6 @@ using validators = make_validator_types< batch_max_bytes_limits, subject_name_strategy_validator>; -static std::vector -properties_to_result_configs(config_map_t config_map) { - std::vector configs; - configs.reserve(config_map.size()); - std::transform( - config_map.begin(), - config_map.end(), - std::back_inserter(configs), - [](auto& cfg) { - return creatable_topic_configs{ - .name = cfg.first, - .value = {std::move(cfg.second)}, - .config_source = kafka::describe_configs_source::default_config, - }; - }); - return configs; -} - static void append_topic_configs(request_context& ctx, create_topics_response& response) { for (auto& ct_result : response.data.topics) { @@ -111,9 +96,8 @@ append_topic_configs(request_context& ctx, create_topics_response& response) { auto cfg = ctx.metadata_cache().get_topic_cfg( model::topic_namespace_view{model::kafka_namespace, ct_result.name}); if (cfg) { - auto config_map = from_cluster_type(cfg->properties); - ct_result.configs = { - properties_to_result_configs(std::move(config_map))}; + ct_result.configs = std::make_optional( + make_configs(ctx.metadata_cache(), cfg->properties)); ct_result.topic_config_error_code = kafka::error_code::none; } else { // Topic was sucessfully created but metadata request did not @@ -231,8 +215,8 @@ ss::future create_topics_handler::handle( if (ctx.header().version >= api_version(5)) { auto default_properties = ctx.metadata_cache().get_default_properties(); - result.configs = {properties_to_result_configs( - from_cluster_type(default_properties))}; + result.configs = std::make_optional( + make_configs(ctx.metadata_cache(), default_properties)); } return result; }); diff --git a/src/v/kafka/server/tests/create_topics_test.cc b/src/v/kafka/server/tests/create_topics_test.cc index 40ea8d8751901..48292ba0d184a 100644 --- a/src/v/kafka/server/tests/create_topics_test.cc +++ b/src/v/kafka/server/tests/create_topics_test.cc @@ -9,6 +9,7 @@ #include "kafka/protocol/create_topics.h" #include "kafka/protocol/metadata.h" +#include "kafka/server/handlers/configs/config_response_utils.h" #include "kafka/server/handlers/topics/types.h" #include "redpanda/tests/fixture.h" #include "resource_mgmt/io_priority.h" @@ -131,8 +132,10 @@ class create_topic_fixture : public redpanda_thread_fixture { /// Server should return default configs BOOST_TEST(topic_res.configs, "empty config response"); auto cfg_map = config_map(*topic_res.configs); - const auto default_topic_properties = kafka::from_cluster_type( - app.metadata_cache.local().get_default_properties()); + const auto default_topic_properties = config_map( + kafka::make_configs( + app.metadata_cache.local(), + app.metadata_cache.local().get_default_properties())); BOOST_TEST( cfg_map == default_topic_properties, "incorrect default properties"); @@ -149,8 +152,9 @@ class create_topic_fixture : public redpanda_thread_fixture { auto cfg = app.metadata_cache.local().get_topic_cfg( model::topic_namespace_view{model::kafka_namespace, topic_res.name}); BOOST_TEST(cfg, "missing topic config"); - auto config_map = kafka::from_cluster_type(cfg->properties); - BOOST_TEST(config_map == resp_cfgs, "configs didn't match"); + auto cfg_map = config_map( + kafka::make_configs(app.metadata_cache.local(), cfg->properties)); + BOOST_TEST(cfg_map == resp_cfgs, "configs didn't match"); BOOST_CHECK_EQUAL( topic_res.topic_config_error_code, kafka::error_code::none); } From 30824408e1789ba5850962d26c6d3bb8cbb08114 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Tue, 12 Mar 2024 09:12:55 +0000 Subject: [PATCH 4/6] kafka: delete unused config code This removes somee methods in the kafka config handling code that have either now become unused or have already been unused even before the changes in this PR. (cherry picked from commit 1002b63893b73f7bea371350a32294aaa2f92f85) --- src/v/kafka/server/handlers/topics/types.cc | 135 -------------------- src/v/kafka/server/handlers/topics/types.h | 1 - 2 files changed, 136 deletions(-) diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index 38c919c22541d..e0c280ccde1ba 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -218,139 +218,4 @@ to_cluster_type(const creatable_topic& t) { return ret; } -template -static ss::sstring from_config_type(const T& v) { - if constexpr (std::is_enum_v) { - return ssx::sformat("{}", v); - } else if constexpr (std::is_same_v) { - return v ? "true" : "false"; - } else if constexpr (std::is_same_v) { - return ss::to_sstring( - std::chrono::duration_cast(v).count()); - } else { - return ss::to_sstring(v); - } -} - -config_map_t from_cluster_type(const cluster::topic_properties& properties) { - config_map_t config_entries; - if (properties.compression) { - config_entries[topic_property_compression] = from_config_type( - *properties.compression); - } - if (properties.cleanup_policy_bitflags) { - config_entries[topic_property_cleanup_policy] = from_config_type( - *properties.cleanup_policy_bitflags); - } - if (properties.compaction_strategy) { - config_entries[topic_property_compaction_strategy] = from_config_type( - *properties.compaction_strategy); - } - if (properties.timestamp_type) { - config_entries[topic_property_timestamp_type] = from_config_type( - *properties.timestamp_type); - } - if (properties.segment_size) { - config_entries[topic_property_segment_size] = from_config_type( - *properties.segment_size); - } - if (properties.retention_bytes.has_optional_value()) { - config_entries[topic_property_retention_bytes] = from_config_type( - properties.retention_bytes.value()); - } - if (properties.retention_duration.has_optional_value()) { - config_entries[topic_property_retention_duration] = from_config_type( - *properties.retention_duration); - } - if (properties.recovery) { - config_entries[topic_property_recovery] = from_config_type( - *properties.recovery); - } - if (properties.batch_max_bytes) { - config_entries[topic_property_max_message_bytes] = from_config_type( - *properties.batch_max_bytes); - } - if (properties.shadow_indexing) { - config_entries[topic_property_remote_write] = "false"; - config_entries[topic_property_remote_read] = "false"; - - switch (*properties.shadow_indexing) { - case model::shadow_indexing_mode::archival: - config_entries[topic_property_remote_write] = "true"; - break; - case model::shadow_indexing_mode::fetch: - config_entries[topic_property_remote_read] = "true"; - break; - case model::shadow_indexing_mode::full: - config_entries[topic_property_remote_write] = "true"; - config_entries[topic_property_remote_read] = "true"; - break; - default: - break; - } - } - if (properties.read_replica_bucket) { - config_entries[topic_property_read_replica] = from_config_type( - *properties.read_replica_bucket); - } - - if (properties.retention_local_target_bytes.has_optional_value()) { - config_entries[topic_property_retention_local_target_bytes] - = from_config_type(*properties.retention_local_target_bytes); - } - - if (properties.retention_local_target_ms.has_optional_value()) { - config_entries[topic_property_retention_local_target_ms] - = from_config_type(*properties.retention_local_target_ms); - } - - config_entries[topic_property_remote_delete] = from_config_type( - properties.remote_delete); - - if (properties.segment_ms.has_optional_value()) { - config_entries[topic_property_segment_ms] = from_config_type( - properties.segment_ms.value()); - } - - if (properties.record_key_schema_id_validation) { - config_entries[topic_property_record_key_schema_id_validation] - = from_config_type(properties.record_key_schema_id_validation); - } - if (properties.record_key_schema_id_validation_compat) { - config_entries[topic_property_record_key_schema_id_validation_compat] - = from_config_type(properties.record_key_schema_id_validation_compat); - } - if (properties.record_key_subject_name_strategy) { - config_entries[topic_property_record_key_subject_name_strategy] - = from_config_type(properties.record_key_subject_name_strategy); - } - if (properties.record_key_subject_name_strategy_compat) { - config_entries[topic_property_record_key_subject_name_strategy_compat] - = from_config_type( - properties.record_key_subject_name_strategy_compat); - } - if (properties.record_value_schema_id_validation) { - config_entries[topic_property_record_value_schema_id_validation] - = from_config_type(properties.record_value_schema_id_validation); - } - if (properties.record_value_schema_id_validation_compat) { - config_entries[topic_property_record_value_schema_id_validation_compat] - = from_config_type( - properties.record_value_schema_id_validation_compat); - } - if (properties.record_value_subject_name_strategy) { - config_entries[topic_property_record_value_subject_name_strategy] - = from_config_type(properties.record_value_subject_name_strategy); - } - if (properties.record_value_subject_name_strategy_compat) { - config_entries[topic_property_record_value_subject_name_strategy_compat] - = from_config_type( - properties.record_value_subject_name_strategy_compat); - } - - /// Final topic_property not encoded here is \ref remote_topic_properties, - /// is more of an implementation detail no need to ever show user - return config_entries; -} - } // namespace kafka diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index 75a0e6451b279..88aa34b0c3872 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -139,5 +139,4 @@ config_map_t config_map(const std::vector& config); cluster::custom_assignable_topic_configuration to_cluster_type(const creatable_topic& t); -config_map_t from_cluster_type(const cluster::topic_properties&); } // namespace kafka From e10f89d79189ecc6208e01ddc55342411d93542a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Wed, 6 Mar 2024 19:31:41 +0000 Subject: [PATCH 5/6] kafka/test: create topics response This adds a regression test to ensure that the configs returned by describe configs and the configs returned by create topics match. This updates/removes an earlier test which only checked that a single specific config was present. NOTE: the old test I am updating would fail with the current code. That is because the new code returns a different source for the cleanup.policy config, namely DYNAMIC_TOPIC_CONFIG (source=1) instead of DEFAULT_CONFIG (source=5). Apache Kafka (and our DescribeConfigs endpoint) both return source=5 in this case, so I believe this is a bugfix rather than a regression. (cherry picked from commit 5edfb74f3f8d25bf499e1f06a500ffa316564cb9) --- tests/rptest/tests/topic_creation_test.py | 39 ++++++++++++++++++----- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/tests/rptest/tests/topic_creation_test.py b/tests/rptest/tests/topic_creation_test.py index f6df6df85ce01..73f1363629993 100644 --- a/tests/rptest/tests/topic_creation_test.py +++ b/tests/rptest/tests/topic_creation_test.py @@ -309,6 +309,11 @@ class CreateTopicsResponseTest(RedpandaTest): DEFAULT_CLEANUP_POLICY = 'delete' DEFAULT_CONFIG_SOURCE = 5 + CONFIG_SOURCE_MAPPING = { + 1: 'DYNAMIC_TOPIC_CONFIG', + 5: 'DEFAULT_CONFIG', + } + def __init__(self, test_context): super(CreateTopicsResponseTest, self).__init__(test_context=test_context) @@ -331,6 +336,16 @@ def create_topics(self, p_cnt, r_fac, n=1, validate_only=False): topics=topics, validate_only=validate_only) + def create_topic(self, name): + topics = [{ + 'name': f"{name}", + 'partition_count': 1, + 'replication_factor': 1 + }] + return self.kcl_client.create_topics(6, + topics=topics, + validate_only=False) + def get_np(self, tp): return tp['NumPartitions'] @@ -390,14 +405,22 @@ def test_create_topic_response_configs(self): b. serialized correctly """ - topics = self.create_topics(1, 1) - for topic in topics: - cleanup_policy = self.get_config_by_name(topic, 'cleanup.policy') - assert cleanup_policy is not None, "cleanup.policy missing from topic config" - assert cleanup_policy[ - 'Value'] == self.DEFAULT_CLEANUP_POLICY, f"cleanup.policy = {cleanup_policy['Value']}, expected {self.DEFAULT_CLEANUP_POLICY}" - assert cleanup_policy[ - 'Source'] == self.DEFAULT_CONFIG_SOURCE, f"cleanup.policy = {cleanup_policy['Source']}, expected {self.DEFAULT_CONFIG_SOURCE}" + topic_name = 'test-create-topic-response' + create_topics_response = self.create_topic(topic_name) + topic_response = create_topics_response[0] + + res = self.kcl_client.describe_topic(topic_name) + describe_configs = [line.split() for line in res.strip().split('\n')] + + for (key, value, source) in describe_configs: + topic_config = self.get_config_by_name(topic_response, key) + + assert topic_config, f"Config '{key}' returned by DescribeConfigs is missing from configs response in CreateTopic" + assert topic_config[ + 'Value'] == value, f"config value mismatch for {key} across CreateTopic and DescribeConfigs: {topic_config['Value']} != {value}" + + assert self.CONFIG_SOURCE_MAPPING[topic_config[ + 'Source']] == source, f"config source mismatch for {key} across CreateTopic and DescribeConfigs: {self.CONFIG_SOURCE_MAPPING[topic_config['Source']]} != {source}" @cluster(num_nodes=3) def test_create_topic_validate_only(self): From 2501f66bc9e12153ee47f3c258db278dbd9d52cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Thu, 14 Mar 2024 08:30:28 +0000 Subject: [PATCH 6/6] kafka: refactor with general config types This refactors the config response utility code to use types that are not specific to neither the describe configs nor the create topics handlers. (cherry picked from commit 4f5d6b671436dd91052a3674daaaa7ebd60e275e) --- .../handlers/configs/config_response_utils.cc | 215 +++++++++--------- .../handlers/configs/config_response_utils.h | 37 ++- src/v/kafka/server/handlers/create_topics.cc | 6 +- .../kafka/server/handlers/describe_configs.cc | 59 +++++ src/v/kafka/server/handlers/topics/types.cc | 21 ++ src/v/kafka/server/handlers/topics/types.h | 5 + .../tests/config_response_utils_test.cc | 25 +- .../tests/config_response_utils_test_help.h | 9 +- .../kafka/server/tests/create_topics_test.cc | 6 +- 9 files changed, 245 insertions(+), 138 deletions(-) diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index 6b65a381c7dd2..828153bda3f96 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -21,7 +21,7 @@ namespace kafka { static bool config_property_requested( - const std::optional>& configuration_keys, + const config_key_t& configuration_keys, const std::string_view property_name) { return !configuration_keys.has_value() || std::find( @@ -46,12 +46,12 @@ static void add_config( template static void add_config_if_requested( - const describe_configs_resource& resource, + const config_key_t& configuration_keys, describe_configs_result& result, std::string_view name, T value, describe_configs_source source) { - if (config_property_requested(resource.configuration_keys, name)) { + if (config_property_requested(configuration_keys, name)) { add_config(result, name, value, source); } } @@ -136,7 +136,7 @@ consteval describe_configs_type property_config_type() { template static void add_broker_config( - describe_configs_result& result, + config_response_container_t& result, std::string_view name, const config::property& property, bool include_synonyms, @@ -173,7 +173,7 @@ static void add_broker_config( } } - result.configs.push_back(describe_configs_resource_result{ + result.push_back(config_response{ .name = ss::sstring(name), .value = describe_f(property.value()), .config_source = src, @@ -185,14 +185,14 @@ static void add_broker_config( template static void add_broker_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, + const config_key_t& config_keys, + config_response_container_t& result, std::string_view name, const config::property& property, bool include_synonyms, std::optional documentation, Func&& describe_f) { - if (config_property_requested(resource.configuration_keys, name)) { + if (config_property_requested(config_keys, name)) { add_broker_config( result, name, @@ -205,7 +205,7 @@ static void add_broker_config_if_requested( template static void add_topic_config( - describe_configs_result& result, + config_response_container_t& result, std::string_view default_name, const T& default_value, std::string_view override_name, @@ -235,7 +235,7 @@ static void add_topic_config( }); } - result.configs.push_back(describe_configs_resource_result{ + result.push_back(config_response{ .name = ss::sstring(override_name), .value = describe_f(overrides.value_or(default_value)), .config_source = src, @@ -266,8 +266,8 @@ override_if_not_default(const std::optional& override, const T& def) { template void add_topic_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, + const config_key_t& config_keys, + config_response_container_t& result, std::string_view default_name, const T& default_value, std::string_view override_name, @@ -276,7 +276,7 @@ void add_topic_config_if_requested( std::optional documentation, Func&& describe_f, bool hide_default_override = false) { - if (config_property_requested(resource.configuration_keys, override_name)) { + if (config_property_requested(config_keys, override_name)) { std::optional overrides_val; if (hide_default_override) { overrides_val = override_if_not_default(overrides, default_value); @@ -299,8 +299,8 @@ void add_topic_config_if_requested( // Instantiate explicitly for unit testing using describe_int_t = decltype(&describe_as_string); template void add_topic_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, + const config_key_t& config_keys, + config_response_container_t& result, std::string_view default_name, const int& default_value, std::string_view override_name, @@ -320,7 +320,7 @@ static ss::sstring maybe_print_tristate(const tristate& tri) { template static void add_topic_config( - describe_configs_result& result, + config_response_container_t& result, std::string_view default_name, const std::optional& default_value, std::string_view override_name, @@ -347,15 +347,15 @@ static void add_topic_config( template void add_topic_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, + const config_key_t& config_keys, + config_response_container_t& result, std::string_view default_name, const std::optional& default_value, std::string_view override_name, const tristate& overrides, bool include_synonyms, std::optional documentation) { - if (config_property_requested(resource.configuration_keys, override_name)) { + if (config_property_requested(config_keys, override_name)) { add_topic_config( result, default_name, @@ -367,6 +367,37 @@ void add_topic_config_if_requested( } } +// Instantiate explicitly for unit testing +template void add_topic_config_if_requested( + const config_key_t& config_keys, + config_response_container_t& result, + std::string_view default_name, + const std::optional& default_value, + std::string_view override_name, + const tristate& overrides, + bool include_synonyms, + std::optional documentation); + +template +static void add_topic_config_if_requested( + const config_key_t& config_keys, + config_response_container_t& result, + std::string_view override_name, + const std::optional& overrides, + bool include_synonyms, + std::optional documentation, + Func&& describe_f) { + if (config_property_requested(config_keys, override_name)) { + add_topic_config( + result, + override_name, + overrides, + include_synonyms, + documentation, + std::forward(describe_f)); + } +} + static ss::sstring kafka_endpoint_format(const std::vector& endpoints) { std::vector uris; @@ -409,18 +440,16 @@ static inline std::optional maybe_make_documentation( : std::nullopt; } -void report_topic_config( - const describe_configs_resource& resource, - describe_configs_result& result, +config_response_container_t make_topic_configs( const cluster::metadata_cache& metadata_cache, const cluster::topic_properties& topic_properties, + const config_key_t& config_keys, bool include_synonyms, bool include_documentation) { - /** - * Kafka properties - */ + config_response_container_t result; + add_topic_config_if_requested( - resource, + config_keys, result, config::shard_local_cfg().log_compression_type.name(), metadata_cache.get_default_compression(), @@ -433,7 +462,7 @@ void report_topic_config( &describe_as_string); add_topic_config_if_requested( - resource, + config_keys, result, config::shard_local_cfg().log_cleanup_policy.name(), metadata_cache.get_default_cleanup_policy_bitflags(), @@ -450,7 +479,7 @@ void report_topic_config( ? config::shard_local_cfg().compacted_log_segment_size.desc() : config::shard_local_cfg().log_segment_size.desc()}; add_topic_config_if_requested( - resource, + config_keys, result, topic_properties.is_compacted() ? config::shard_local_cfg().compacted_log_segment_size.name() @@ -465,7 +494,7 @@ void report_topic_config( &describe_as_string); add_topic_config_if_requested( - resource, + config_keys, result, config::shard_local_cfg().delete_retention_ms.name(), metadata_cache.get_default_retention_duration(), @@ -477,7 +506,7 @@ void report_topic_config( config::shard_local_cfg().delete_retention_ms.desc())); add_topic_config_if_requested( - resource, + config_keys, result, config::shard_local_cfg().retention_bytes.name(), metadata_cache.get_default_retention_bytes(), @@ -489,7 +518,7 @@ void report_topic_config( config::shard_local_cfg().retention_bytes.desc())); add_topic_config_if_requested( - resource, + config_keys, result, config::shard_local_cfg().log_message_timestamp_type.name(), metadata_cache.get_default_timestamp_type(), @@ -502,7 +531,7 @@ void report_topic_config( &describe_as_string); add_topic_config_if_requested( - resource, + config_keys, result, config::shard_local_cfg().kafka_batch_max_bytes.name(), metadata_cache.get_default_batch_max_bytes(), @@ -516,7 +545,7 @@ void report_topic_config( // Shadow indexing properties add_topic_config_if_requested( - resource, + config_keys, result, topic_property_remote_read, model::is_fetch_enabled( @@ -533,7 +562,7 @@ void report_topic_config( true); add_topic_config_if_requested( - resource, + config_keys, result, topic_property_remote_write, model::is_archival_enabled( @@ -550,7 +579,7 @@ void report_topic_config( true); add_topic_config_if_requested( - resource, + config_keys, result, topic_property_retention_local_target_bytes, metadata_cache.get_default_retention_local_target_bytes(), @@ -562,7 +591,7 @@ void report_topic_config( config::shard_local_cfg().retention_local_target_bytes_default.desc())); add_topic_config_if_requested( - resource, + config_keys, result, topic_property_retention_local_target_ms, std::make_optional( @@ -574,8 +603,7 @@ void report_topic_config( include_documentation, config::shard_local_cfg().retention_local_target_ms_default.desc())); - if (config_property_requested( - resource.configuration_keys, topic_property_remote_delete)) { + if (config_property_requested(config_keys, topic_property_remote_delete)) { add_topic_config( result, topic_property_remote_delete, @@ -593,7 +621,7 @@ void report_topic_config( } add_topic_config_if_requested( - resource, + config_keys, result, topic_property_segment_ms, metadata_cache.get_default_segment_ms(), @@ -613,7 +641,7 @@ void report_topic_config( switch (config::shard_local_cfg().enable_schema_id_validation()) { case pandaproxy::schema_registry::schema_id_validation_mode::compat: { add_topic_config_if_requested( - resource, + config_keys, result, topic_property_record_key_schema_id_validation_compat, metadata_cache.get_default_record_key_schema_id_validation(), @@ -625,7 +653,7 @@ void report_topic_config( validation_hide_default_override); add_topic_config_if_requested( - resource, + config_keys, result, topic_property_record_key_subject_name_strategy_compat, metadata_cache.get_default_record_key_subject_name_strategy(), @@ -641,7 +669,7 @@ void report_topic_config( validation_hide_default_override); add_topic_config_if_requested( - resource, + config_keys, result, topic_property_record_value_schema_id_validation_compat, metadata_cache.get_default_record_value_schema_id_validation(), @@ -653,7 +681,7 @@ void report_topic_config( validation_hide_default_override); add_topic_config_if_requested( - resource, + config_keys, result, topic_property_record_value_subject_name_strategy_compat, metadata_cache.get_default_record_value_subject_name_strategy(), @@ -671,7 +699,7 @@ void report_topic_config( } case pandaproxy::schema_registry::schema_id_validation_mode::redpanda: { add_topic_config_if_requested( - resource, + config_keys, result, topic_property_record_key_schema_id_validation, metadata_cache.get_default_record_key_schema_id_validation(), @@ -683,7 +711,7 @@ void report_topic_config( validation_hide_default_override); add_topic_config_if_requested( - resource, + config_keys, result, topic_property_record_key_subject_name_strategy, metadata_cache.get_default_record_key_subject_name_strategy(), @@ -700,7 +728,7 @@ void report_topic_config( validation_hide_default_override); add_topic_config_if_requested( - resource, + config_keys, result, topic_property_record_value_schema_id_validation, metadata_cache.get_default_record_value_schema_id_validation(), @@ -712,7 +740,7 @@ void report_topic_config( validation_hide_default_override); add_topic_config_if_requested( - resource, + config_keys, result, topic_property_record_value_subject_name_strategy, metadata_cache.get_default_record_value_subject_name_strategy(), @@ -732,39 +760,17 @@ void report_topic_config( break; } } + return result; } -void report_broker_config( - const describe_configs_resource& resource, - describe_configs_result& result, +config_response_container_t make_broker_configs( + const config_key_t& config_keys, bool include_synonyms, bool include_documentation) { - if (!result.resource_name.empty()) { - int32_t broker_id = -1; - auto res = std::from_chars( - result.resource_name.data(), - result.resource_name.data() + result.resource_name.size(), // NOLINT - broker_id); - if (res.ec == std::errc()) { - if (broker_id != *config::node().node_id()) { - result.error_code = error_code::invalid_request; - result.error_message = ssx::sformat( - "Unexpected broker id {} expected {}", - broker_id, - *config::node().node_id()); - return; - } - } else { - result.error_code = error_code::invalid_request; - result.error_message = ssx::sformat( - "Broker id must be an integer but received {}", - result.resource_name); - return; - } - } + config_response_container_t result; add_broker_config_if_requested( - resource, + config_keys, result, "listeners", config::node().kafka_api, @@ -774,7 +780,7 @@ void report_broker_config( &kafka_authn_endpoint_format); add_broker_config_if_requested( - resource, + config_keys, result, "advertised.listeners", config::node().advertised_kafka_api_property(), @@ -785,7 +791,7 @@ void report_broker_config( &kafka_endpoint_format); add_broker_config_if_requested( - resource, + config_keys, result, "log.segment.bytes", config::shard_local_cfg().log_segment_size, @@ -796,7 +802,7 @@ void report_broker_config( &describe_as_string); add_broker_config_if_requested( - resource, + config_keys, result, "log.retention.bytes", config::shard_local_cfg().retention_bytes, @@ -809,7 +815,7 @@ void report_broker_config( }); add_broker_config_if_requested( - resource, + config_keys, result, "log.retention.ms", config::shard_local_cfg().delete_retention_ms, @@ -822,7 +828,7 @@ void report_broker_config( }); add_broker_config_if_requested( - resource, + config_keys, result, "num.partitions", config::shard_local_cfg().default_topic_partitions, @@ -833,7 +839,7 @@ void report_broker_config( &describe_as_string); add_broker_config_if_requested( - resource, + config_keys, result, "default.replication.factor", config::shard_local_cfg().default_topic_replication, @@ -844,7 +850,7 @@ void report_broker_config( &describe_as_string); add_broker_config_if_requested( - resource, + config_keys, result, "log.dirs", config::node().data_directory, @@ -856,7 +862,7 @@ void report_broker_config( }); add_broker_config_if_requested( - resource, + config_keys, result, "auto.create.topics.enable", config::shard_local_cfg().auto_create_topics_enabled, @@ -865,29 +871,32 @@ void report_broker_config( include_documentation, config::shard_local_cfg().auto_create_topics_enabled.desc()), &describe_as_string); -} - -std::vector make_configs( - const cluster::metadata_cache& metadata_cache, - const cluster::topic_properties& topic_config) { - describe_configs_resource resource{}; - describe_configs_result describe_result{}; - - report_topic_config( - resource, describe_result, metadata_cache, topic_config, false, false); - - std::vector result; - result.reserve(describe_result.configs.size()); - - for (auto& describe_conf : describe_result.configs) { - result.push_back(creatable_topic_configs{ - .name = std::move(describe_conf.name), - .value = std::move(describe_conf.value), - .config_source = describe_conf.config_source, - }); - } return result; } +describe_configs_resource_result config_response::to_describe_config() { + return { + .name = name, + .value = value, + .read_only = read_only, + .is_default = is_default, + .config_source = config_source, + .is_sensitive = is_sensitive, + .synonyms = synonyms, + .config_type = config_type, + .documentation = documentation, + }; +}; + +creatable_topic_configs config_response::to_create_config() { + return { + .name = name, + .value = value, + .read_only = read_only, + .config_source = config_source, + .is_sensitive = is_sensitive, + }; +}; + } // namespace kafka diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.h b/src/v/kafka/server/handlers/configs/config_response_utils.h index a22c5c56f64bf..8f86a8a756bd4 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.h +++ b/src/v/kafka/server/handlers/configs/config_response_utils.h @@ -13,26 +13,41 @@ #include "cluster/types.h" #include "kafka/protocol/describe_configs.h" -#include "kafka/protocol/create_topics.h" +#include "kafka/protocol/schemata/create_topics_response.h" + +#include +#include namespace kafka { -void report_topic_config( - const describe_configs_resource& resource, - describe_configs_result& result, +struct config_response { + ss::sstring name{}; + std::optional value{}; + bool read_only{}; + bool is_default{}; + kafka::describe_configs_source config_source{-1}; + bool is_sensitive{}; + std::vector synonyms{}; + kafka::describe_configs_type config_type{0}; + std::optional documentation{}; + + describe_configs_resource_result to_describe_config(); + creatable_topic_configs to_create_config(); +}; + +using config_response_container_t = std::vector; +using config_key_t = std::optional>; + +config_response_container_t make_topic_configs( const cluster::metadata_cache& metadata_cache, const cluster::topic_properties& topic_properties, + const config_key_t& config_keys, bool include_synonyms, bool include_documentation); -void report_broker_config( - const describe_configs_resource& resource, - describe_configs_result& result, +config_response_container_t make_broker_configs( + const config_key_t& config_keys, bool include_synonyms, bool include_documentation); -std::vector make_configs( - const cluster::metadata_cache& metadata_cache, - const cluster::topic_properties& topic_config); - } // namespace kafka diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 851aba4120ef9..96f73b80d5eed 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -97,7 +97,7 @@ append_topic_configs(request_context& ctx, create_topics_response& response) { model::topic_namespace_view{model::kafka_namespace, ct_result.name}); if (cfg) { ct_result.configs = std::make_optional( - make_configs(ctx.metadata_cache(), cfg->properties)); + report_topic_configs(ctx.metadata_cache(), cfg->properties)); ct_result.topic_config_error_code = kafka::error_code::none; } else { // Topic was sucessfully created but metadata request did not @@ -215,8 +215,8 @@ ss::future create_topics_handler::handle( if (ctx.header().version >= api_version(5)) { auto default_properties = ctx.metadata_cache().get_default_properties(); - result.configs = std::make_optional( - make_configs(ctx.metadata_cache(), default_properties)); + result.configs = std::make_optional(report_topic_configs( + ctx.metadata_cache(), default_properties)); } return result; }); diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc index 219f81f8bae28..e1a42d6285ccc 100644 --- a/src/v/kafka/server/handlers/describe_configs.cc +++ b/src/v/kafka/server/handlers/describe_configs.cc @@ -10,6 +10,7 @@ #include "kafka/server/handlers/describe_configs.h" #include "cluster/metadata_cache.h" +#include "cluster/types.h" #include "config/configuration.h" #include "config/data_directory_path.h" #include "config/node_config.h" @@ -40,6 +41,64 @@ namespace kafka { +static void report_topic_config( + const describe_configs_resource& resource, + describe_configs_result& result, + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_properties, + bool include_synonyms, + bool include_documentation) { + auto res = make_topic_configs( + metadata_cache, + topic_properties, + resource.configuration_keys, + include_synonyms, + include_documentation); + + result.configs.reserve(res.size()); + for (auto& conf : res) { + result.configs.push_back(conf.to_describe_config()); + } +} + +static void report_broker_config( + const describe_configs_resource& resource, + describe_configs_result& result, + bool include_synonyms, + bool include_documentation) { + if (!result.resource_name.empty()) { + int32_t broker_id = -1; + auto res = std::from_chars( + result.resource_name.data(), + result.resource_name.data() + result.resource_name.size(), // NOLINT + broker_id); + if (res.ec == std::errc()) { + if (broker_id != *config::node().node_id()) { + result.error_code = error_code::invalid_request; + result.error_message = ssx::sformat( + "Unexpected broker id {} expected {}", + broker_id, + *config::node().node_id()); + return; + } + } else { + result.error_code = error_code::invalid_request; + result.error_message = ssx::sformat( + "Broker id must be an integer but received {}", + result.resource_name); + return; + } + } + + auto res = make_broker_configs( + resource.configuration_keys, include_synonyms, include_documentation); + + result.configs.reserve(res.size()); + for (auto& conf : res) { + result.configs.push_back(conf.to_describe_config()); + } +} + template<> ss::future describe_configs_handler::handle( request_context ctx, [[maybe_unused]] ss::smp_service_group ssg) { diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index e0c280ccde1ba..c2dd0207de5a9 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -218,4 +218,25 @@ to_cluster_type(const creatable_topic& t) { return ret; } +static std::vector +convert_topic_configs(std::vector&& topic_cfgs) { + auto configs = std::vector(); + configs.reserve(topic_cfgs.size()); + + for (auto& conf : topic_cfgs) { + configs.push_back(conf.to_create_config()); + } + + return configs; +} + +std::vector report_topic_configs( + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_properties) { + auto topic_cfgs = make_topic_configs( + metadata_cache, topic_properties, std::nullopt, false, false); + + return convert_topic_configs(std::move(topic_cfgs)); +} + } // namespace kafka diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index 88aa34b0c3872..733881585c558 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -14,6 +14,7 @@ #include "kafka/protocol/schemata/create_topics_request.h" #include "kafka/protocol/schemata/create_topics_response.h" #include "kafka/server/errors.h" +#include "kafka/server/handlers/configs/config_response_utils.h" #include "model/fundamental.h" #include "model/namespace.h" #include "utils/absl_sstring_hash.h" @@ -139,4 +140,8 @@ config_map_t config_map(const std::vector& config); cluster::custom_assignable_topic_configuration to_cluster_type(const creatable_topic& t); +std::vector report_topic_configs( + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_properties); + } // namespace kafka diff --git a/src/v/kafka/server/tests/config_response_utils_test.cc b/src/v/kafka/server/tests/config_response_utils_test.cc index 74c70fb89b673..0dc79bc6febfb 100644 --- a/src/v/kafka/server/tests/config_response_utils_test.cc +++ b/src/v/kafka/server/tests/config_response_utils_test.cc @@ -19,9 +19,9 @@ #include #include -std::optional -get_config(const kafka::describe_configs_result& result, std::string_view key) { - for (const auto& config : result.configs) { +std::optional get_config( + const kafka::config_response_container_t& result, std::string_view key) { + for (const auto& config : result) { if (config.name == key) { return config.value; } @@ -30,8 +30,8 @@ get_config(const kafka::describe_configs_result& result, std::string_view key) { } kafka::describe_configs_source get_config_source( - const kafka::describe_configs_result& result, std::string_view key) { - for (const auto& config : result.configs) { + const kafka::config_response_container_t& result, std::string_view key) { + for (const auto& config : result) { if (config.name == key) { return config.config_source; } @@ -45,11 +45,10 @@ BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_tristate) { std::optional default_value, tristate override_value, std::optional expected_value) { - describe_configs_resource resource{}; - describe_configs_result result{}; + config_response_container_t result; add_topic_config_if_requested( - resource, + std::nullopt, result, "test-global-broker-config-name", default_value, @@ -80,11 +79,10 @@ BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_optional) { std::optional override_value, std::optional expected_value, bool hide_default_override) { - describe_configs_resource resource{}; - describe_configs_result result{}; + config_response_container_t result; add_topic_config_if_requested( - resource, + std::nullopt, result, "test-global-broker-config-name", default_value, @@ -112,11 +110,10 @@ BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_optional_hide_default) { []( bool hide_default_override, kafka::describe_configs_source expected_source) { - describe_configs_resource resource{}; - describe_configs_result result{}; + config_response_container_t result; add_topic_config_if_requested( - resource, + std::nullopt, result, "test-global-broker-config-name", 2, diff --git a/src/v/kafka/server/tests/config_response_utils_test_help.h b/src/v/kafka/server/tests/config_response_utils_test_help.h index e4b9d6ed5de93..d85e553f2639e 100644 --- a/src/v/kafka/server/tests/config_response_utils_test_help.h +++ b/src/v/kafka/server/tests/config_response_utils_test_help.h @@ -12,6 +12,7 @@ #pragma once #include "kafka/protocol/describe_configs.h" +#include "kafka/server/handlers/configs/config_response_utils.h" namespace kafka { @@ -20,8 +21,8 @@ ss::sstring describe_as_string(const T& t); template void add_topic_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, + const config_key_t& config_keys, + config_response_container_t& result, std::string_view default_name, const T& default_value, std::string_view override_name, @@ -33,8 +34,8 @@ void add_topic_config_if_requested( template void add_topic_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, + const config_key_t& config_keys, + config_response_container_t& result, std::string_view default_name, const std::optional& default_value, std::string_view override_name, diff --git a/src/v/kafka/server/tests/create_topics_test.cc b/src/v/kafka/server/tests/create_topics_test.cc index 48292ba0d184a..b44a6f17e8afb 100644 --- a/src/v/kafka/server/tests/create_topics_test.cc +++ b/src/v/kafka/server/tests/create_topics_test.cc @@ -133,7 +133,7 @@ class create_topic_fixture : public redpanda_thread_fixture { BOOST_TEST(topic_res.configs, "empty config response"); auto cfg_map = config_map(*topic_res.configs); const auto default_topic_properties = config_map( - kafka::make_configs( + kafka::report_topic_configs( app.metadata_cache.local(), app.metadata_cache.local().get_default_properties())); BOOST_TEST( @@ -152,8 +152,8 @@ class create_topic_fixture : public redpanda_thread_fixture { auto cfg = app.metadata_cache.local().get_topic_cfg( model::topic_namespace_view{model::kafka_namespace, topic_res.name}); BOOST_TEST(cfg, "missing topic config"); - auto cfg_map = config_map( - kafka::make_configs(app.metadata_cache.local(), cfg->properties)); + auto cfg_map = config_map(kafka::report_topic_configs( + app.metadata_cache.local(), cfg->properties)); BOOST_TEST(cfg_map == resp_cfgs, "configs didn't match"); BOOST_CHECK_EQUAL( topic_res.topic_config_error_code, kafka::error_code::none);