diff --git a/src/v/kafka/CMakeLists.txt b/src/v/kafka/CMakeLists.txt index d9e9fc93c8dfd..f031e7968e0c4 100644 --- a/src/v/kafka/CMakeLists.txt +++ b/src/v/kafka/CMakeLists.txt @@ -20,6 +20,7 @@ set(handlers_srcs server/handlers/alter_partition_reassignments.cc server/handlers/list_partition_reassignments.cc server/handlers/handler_interface.cc + server/handlers/configs/config_response_utils.cc server/handlers/topics/types.cc server/handlers/topics/topic_utils.cc server/handlers/delete_records.cc diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc new file mode 100644 index 0000000000000..828153bda3f96 --- /dev/null +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -0,0 +1,902 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "kafka/server/handlers/configs/config_response_utils.h" + +#include "cluster/metadata_cache.h" +#include "cluster/types.h" +#include "config/node_config.h" +#include "kafka/server/handlers/topics/types.h" + +#include + +namespace kafka { + +static bool config_property_requested( + const config_key_t& configuration_keys, + const std::string_view property_name) { + return !configuration_keys.has_value() + || std::find( + configuration_keys->begin(), + configuration_keys->end(), + property_name) + != configuration_keys->end(); +} + +template +static void add_config( + describe_configs_result& result, + std::string_view name, + T value, + describe_configs_source source) { + result.configs.push_back(describe_configs_resource_result{ + .name = ss::sstring(name), + .value = ssx::sformat("{}", value), + .config_source = source, + }); +} + +template +static void add_config_if_requested( + const config_key_t& configuration_keys, + describe_configs_result& result, + std::string_view name, + T value, + describe_configs_source source) { + if (config_property_requested(configuration_keys, name)) { + add_config(result, name, value, source); + } +} + +template +ss::sstring describe_as_string(const T& t) { + return ssx::sformat("{}", t); +} + +// Instantiate explicitly for unit testing +template ss::sstring describe_as_string(const int&); + +// Kafka protocol defines integral types by sizes. See +// https://kafka.apache.org/protocol.html +// Therefore we should also use type sizes for integrals and use Java type sizes +// as a guideline. See +// https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html +template +constexpr auto num_bits = CHAR_BIT * sizeof(T); + +template +constexpr bool is_short = std::is_integral_v && !std::is_same_v + && num_bits <= 16; + +template +constexpr bool is_int = std::is_integral_v && num_bits > 16 + && num_bits <= 32; + +template +constexpr bool is_long = std::is_integral_v && num_bits > 32 + && num_bits <= 64; + +// property_config_type maps the datatype for a config property to +// describe_configs_type. Currently class_type and password are not used in +// Redpanda so we do not include checks for those types. You may find a similar +// mapping in Apache Kafka at +// https://github.com/apache/kafka/blob/be032735b39360df1a6de1a7feea8b4336e5bcc0/core/src/main/scala/kafka/server/ConfigHelper.scala +template +consteval describe_configs_type property_config_type() { + // clang-format off + constexpr auto is_string_type = std::is_same_v || + std::is_same_v || + std::is_same_v || + std::is_same_v || + std::is_same_v || + std::is_same_v || + std::is_same_v; + + constexpr auto is_long_type = is_long || + // Long type since seconds is atleast a 35-bit signed integral + // https://en.cppreference.com/w/cpp/chrono/duration + std::is_same_v || + // Long type since milliseconds is atleast a 45-bit signed integral + // https://en.cppreference.com/w/cpp/chrono/duration + std::is_same_v; + // clang-format on + + if constexpr ( + reflection::is_std_optional || reflection::is_tristate) { + return property_config_type(); + return property_config_type(); + } else if constexpr (std::is_same_v) { + return describe_configs_type::boolean; + } else if constexpr (is_string_type) { + return describe_configs_type::string; + } else if constexpr (is_short) { + return describe_configs_type::short_type; + } else if constexpr (is_int) { + return describe_configs_type::int_type; + } else if constexpr (is_long_type) { + return describe_configs_type::long_type; + } else if constexpr (std::is_floating_point_v) { + return describe_configs_type::double_type; + } else if constexpr (reflection::is_std_vector) { + return describe_configs_type::list; + } else { + static_assert( + config::detail::dependent_false::value, + "Type name is not supported in describe_configs_type"); + } +} + +template +static void add_broker_config( + config_response_container_t& result, + std::string_view name, + const config::property& property, + bool include_synonyms, + std::optional documentation, + Func&& describe_f) { + describe_configs_source src + = property.is_overriden() ? describe_configs_source::static_broker_config + : describe_configs_source::default_config; + + std::vector synonyms; + if (include_synonyms) { + synonyms.reserve(2); + /** + * If value was overriden, include override + */ + if (src == describe_configs_source::static_broker_config) { + synonyms.push_back(describe_configs_synonym{ + .name = ss::sstring(property.name()), + .value = describe_f(property.value()), + .source = static_cast( + describe_configs_source::static_broker_config), + }); + } + /** + * If property is required it has no default + */ + if (!property.is_required()) { + synonyms.push_back(describe_configs_synonym{ + .name = ss::sstring(property.name()), + .value = describe_f(property.default_value()), + .source = static_cast( + describe_configs_source::default_config), + }); + } + } + + result.push_back(config_response{ + .name = ss::sstring(name), + .value = describe_f(property.value()), + .config_source = src, + .synonyms = std::move(synonyms), + .config_type = property_config_type(), + .documentation = documentation, + }); +} + +template +static void add_broker_config_if_requested( + const config_key_t& config_keys, + config_response_container_t& result, + std::string_view name, + const config::property& property, + bool include_synonyms, + std::optional documentation, + Func&& describe_f) { + if (config_property_requested(config_keys, name)) { + add_broker_config( + result, + name, + property, + include_synonyms, + documentation, + std::forward(describe_f)); + } +} + +template +static void add_topic_config( + config_response_container_t& result, + std::string_view default_name, + const T& default_value, + std::string_view override_name, + const std::optional& overrides, + bool include_synonyms, + std::optional documentation, + Func&& describe_f) { + describe_configs_source src = overrides + ? describe_configs_source::topic + : describe_configs_source::default_config; + + std::vector synonyms; + if (include_synonyms) { + synonyms.reserve(2); + if (overrides) { + synonyms.push_back(describe_configs_synonym{ + .name = ss::sstring(override_name), + .value = describe_f(*overrides), + .source = static_cast(describe_configs_source::topic), + }); + } + synonyms.push_back(describe_configs_synonym{ + .name = ss::sstring(default_name), + .value = describe_f(default_value), + .source = static_cast( + describe_configs_source::default_config), + }); + } + + result.push_back(config_response{ + .name = ss::sstring(override_name), + .value = describe_f(overrides.value_or(default_value)), + .config_source = src, + .synonyms = std::move(synonyms), + .config_type = property_config_type(), + .documentation = documentation, + }); +} + +/** + * For faking DEFAULT_CONFIG status for properties that are actually + * topic overrides: cloud storage properties. We do not support cluster + * defaults for these, the values are always "sticky" to topics, but + * some Kafka clients insist that after an AlterConfig RPC, anything + * they didn't set should be DEFAULT_CONFIG. + * + * See https://github.com/redpanda-data/redpanda/issues/7451 + */ +template +std::optional +override_if_not_default(const std::optional& override, const T& def) { + if (override && override.value() != def) { + return override; + } else { + return std::nullopt; + } +} + +template +void add_topic_config_if_requested( + const config_key_t& config_keys, + config_response_container_t& result, + std::string_view default_name, + const T& default_value, + std::string_view override_name, + const std::optional& overrides, + bool include_synonyms, + std::optional documentation, + Func&& describe_f, + bool hide_default_override = false) { + if (config_property_requested(config_keys, override_name)) { + std::optional overrides_val; + if (hide_default_override) { + overrides_val = override_if_not_default(overrides, default_value); + } else { + overrides_val = overrides; + } + + add_topic_config( + result, + default_name, + default_value, + override_name, + overrides_val, + include_synonyms, + documentation, + std::forward(describe_f)); + } +} + +// Instantiate explicitly for unit testing +using describe_int_t = decltype(&describe_as_string); +template void add_topic_config_if_requested( + const config_key_t& config_keys, + config_response_container_t& result, + std::string_view default_name, + const int& default_value, + std::string_view override_name, + const std::optional& overrides, + bool include_synonyms, + std::optional documentation, + describe_int_t&& describe_f, + bool hide_default_override = false); + +template +static ss::sstring maybe_print_tristate(const tristate& tri) { + if (tri.is_disabled() || !tri.has_optional_value()) { + return "-1"; + } + return ssx::sformat("{}", tri.value()); +} + +template +static void add_topic_config( + config_response_container_t& result, + std::string_view default_name, + const std::optional& default_value, + std::string_view override_name, + const tristate& overrides, + bool include_synonyms, + std::optional documentation) { + // Wrap overrides in an optional because add_topic_config expects + // optional where S = tristate + std::optional> override_value; + if (overrides.is_disabled() || overrides.has_optional_value()) { + override_value = std::make_optional(overrides); + } + + add_topic_config( + result, + default_name, + tristate{default_value}, + override_name, + override_value, + include_synonyms, + documentation, + &maybe_print_tristate); +} + +template +void add_topic_config_if_requested( + const config_key_t& config_keys, + config_response_container_t& result, + std::string_view default_name, + const std::optional& default_value, + std::string_view override_name, + const tristate& overrides, + bool include_synonyms, + std::optional documentation) { + if (config_property_requested(config_keys, override_name)) { + add_topic_config( + result, + default_name, + default_value, + override_name, + overrides, + include_synonyms, + documentation); + } +} + +// Instantiate explicitly for unit testing +template void add_topic_config_if_requested( + const config_key_t& config_keys, + config_response_container_t& result, + std::string_view default_name, + const std::optional& default_value, + std::string_view override_name, + const tristate& overrides, + bool include_synonyms, + std::optional documentation); + +template +static void add_topic_config_if_requested( + const config_key_t& config_keys, + config_response_container_t& result, + std::string_view override_name, + const std::optional& overrides, + bool include_synonyms, + std::optional documentation, + Func&& describe_f) { + if (config_property_requested(config_keys, override_name)) { + add_topic_config( + result, + override_name, + overrides, + include_synonyms, + documentation, + std::forward(describe_f)); + } +} + +static ss::sstring +kafka_endpoint_format(const std::vector& endpoints) { + std::vector uris; + uris.reserve(endpoints.size()); + std::transform( + endpoints.cbegin(), + endpoints.cend(), + std::back_inserter(uris), + [](const model::broker_endpoint& ep) { + return ssx::sformat( + "{}://{}:{}", + (ep.name.empty() ? "plain" : ep.name), + ep.address.host(), + ep.address.port()); + }); + return ssx::sformat("{}", fmt::join(uris, ",")); +} + +static ss::sstring kafka_authn_endpoint_format( + const std::vector& endpoints) { + std::vector uris; + uris.reserve(endpoints.size()); + std::transform( + endpoints.cbegin(), + endpoints.cend(), + std::back_inserter(uris), + [](const config::broker_authn_endpoint& ep) { + return ssx::sformat( + "{}://{}:{}", + (ep.name.empty() ? "plain" : ep.name), + ep.address.host(), + ep.address.port()); + }); + return ssx::sformat("{}", fmt::join(uris, ",")); +} + +static inline std::optional maybe_make_documentation( + bool include_documentation, const std::string_view& docstring) { + return include_documentation ? std::make_optional(ss::sstring{docstring}) + : std::nullopt; +} + +config_response_container_t make_topic_configs( + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_properties, + const config_key_t& config_keys, + bool include_synonyms, + bool include_documentation) { + config_response_container_t result; + + add_topic_config_if_requested( + config_keys, + result, + config::shard_local_cfg().log_compression_type.name(), + metadata_cache.get_default_compression(), + topic_property_compression, + topic_properties.compression, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().log_compression_type.desc()), + &describe_as_string); + + add_topic_config_if_requested( + config_keys, + result, + config::shard_local_cfg().log_cleanup_policy.name(), + metadata_cache.get_default_cleanup_policy_bitflags(), + topic_property_cleanup_policy, + topic_properties.cleanup_policy_bitflags, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().log_cleanup_policy.desc()), + &describe_as_string); + + const std::string_view docstring{ + topic_properties.is_compacted() + ? config::shard_local_cfg().compacted_log_segment_size.desc() + : config::shard_local_cfg().log_segment_size.desc()}; + add_topic_config_if_requested( + config_keys, + result, + topic_properties.is_compacted() + ? config::shard_local_cfg().compacted_log_segment_size.name() + : config::shard_local_cfg().log_segment_size.name(), + topic_properties.is_compacted() + ? metadata_cache.get_default_compacted_topic_segment_size() + : metadata_cache.get_default_segment_size(), + topic_property_segment_size, + topic_properties.segment_size, + include_synonyms, + maybe_make_documentation(include_documentation, docstring), + &describe_as_string); + + add_topic_config_if_requested( + config_keys, + result, + config::shard_local_cfg().delete_retention_ms.name(), + metadata_cache.get_default_retention_duration(), + topic_property_retention_duration, + topic_properties.retention_duration, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().delete_retention_ms.desc())); + + add_topic_config_if_requested( + config_keys, + result, + config::shard_local_cfg().retention_bytes.name(), + metadata_cache.get_default_retention_bytes(), + topic_property_retention_bytes, + topic_properties.retention_bytes, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().retention_bytes.desc())); + + add_topic_config_if_requested( + config_keys, + result, + config::shard_local_cfg().log_message_timestamp_type.name(), + metadata_cache.get_default_timestamp_type(), + topic_property_timestamp_type, + topic_properties.timestamp_type, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().log_message_timestamp_type.desc()), + &describe_as_string); + + add_topic_config_if_requested( + config_keys, + result, + config::shard_local_cfg().kafka_batch_max_bytes.name(), + metadata_cache.get_default_batch_max_bytes(), + topic_property_max_message_bytes, + topic_properties.batch_max_bytes, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().kafka_batch_max_bytes.desc()), + &describe_as_string); + + // Shadow indexing properties + add_topic_config_if_requested( + config_keys, + result, + topic_property_remote_read, + model::is_fetch_enabled( + metadata_cache.get_default_shadow_indexing_mode()), + topic_property_remote_read, + topic_properties.shadow_indexing.has_value() ? std::make_optional( + model::is_fetch_enabled(*topic_properties.shadow_indexing)) + : std::nullopt, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().cloud_storage_enable_remote_read.desc()), + &describe_as_string, + true); + + add_topic_config_if_requested( + config_keys, + result, + topic_property_remote_write, + model::is_archival_enabled( + metadata_cache.get_default_shadow_indexing_mode()), + topic_property_remote_write, + topic_properties.shadow_indexing.has_value() ? std::make_optional( + model::is_archival_enabled(*topic_properties.shadow_indexing)) + : std::nullopt, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().cloud_storage_enable_remote_write.desc()), + &describe_as_string, + true); + + add_topic_config_if_requested( + config_keys, + result, + topic_property_retention_local_target_bytes, + metadata_cache.get_default_retention_local_target_bytes(), + topic_property_retention_local_target_bytes, + topic_properties.retention_local_target_bytes, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().retention_local_target_bytes_default.desc())); + + add_topic_config_if_requested( + config_keys, + result, + topic_property_retention_local_target_ms, + std::make_optional( + metadata_cache.get_default_retention_local_target_ms()), + topic_property_retention_local_target_ms, + topic_properties.retention_local_target_ms, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().retention_local_target_ms_default.desc())); + + if (config_property_requested(config_keys, topic_property_remote_delete)) { + add_topic_config( + result, + topic_property_remote_delete, + storage::ntp_config::default_remote_delete, + topic_property_remote_delete, + override_if_not_default( + std::make_optional(topic_properties.remote_delete), + storage::ntp_config::default_remote_delete), + true, + maybe_make_documentation( + include_documentation, + "Controls whether topic deletion should imply deletion in " + "S3"), + [](const bool& b) { return b ? "true" : "false"; }); + } + + add_topic_config_if_requested( + config_keys, + result, + topic_property_segment_ms, + metadata_cache.get_default_segment_ms(), + topic_property_segment_ms, + topic_properties.segment_ms, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().log_segment_ms.desc())); + + constexpr std::string_view key_validation + = "Enable validation of the schema id for keys on a record"; + constexpr std::string_view val_validation + = "Enable validation of the schema id for values on a record"; + constexpr bool validation_hide_default_override = true; + + switch (config::shard_local_cfg().enable_schema_id_validation()) { + case pandaproxy::schema_registry::schema_id_validation_mode::compat: { + add_topic_config_if_requested( + config_keys, + result, + topic_property_record_key_schema_id_validation_compat, + metadata_cache.get_default_record_key_schema_id_validation(), + topic_property_record_key_schema_id_validation_compat, + topic_properties.record_key_schema_id_validation_compat, + include_synonyms, + maybe_make_documentation(include_documentation, key_validation), + &describe_as_string, + validation_hide_default_override); + + add_topic_config_if_requested( + config_keys, + result, + topic_property_record_key_subject_name_strategy_compat, + metadata_cache.get_default_record_key_subject_name_strategy(), + topic_property_record_key_subject_name_strategy_compat, + topic_properties.record_key_subject_name_strategy_compat, + include_synonyms, + maybe_make_documentation( + include_documentation, + fmt::format( + "The subject name strategy for keys if {} is enabled", + topic_property_record_key_schema_id_validation_compat)), + [](auto sns) { return ss::sstring(to_string_view_compat(sns)); }, + validation_hide_default_override); + + add_topic_config_if_requested( + config_keys, + result, + topic_property_record_value_schema_id_validation_compat, + metadata_cache.get_default_record_value_schema_id_validation(), + topic_property_record_value_schema_id_validation_compat, + topic_properties.record_value_schema_id_validation_compat, + include_synonyms, + maybe_make_documentation(include_documentation, val_validation), + &describe_as_string, + validation_hide_default_override); + + add_topic_config_if_requested( + config_keys, + result, + topic_property_record_value_subject_name_strategy_compat, + metadata_cache.get_default_record_value_subject_name_strategy(), + topic_property_record_value_subject_name_strategy_compat, + topic_properties.record_value_subject_name_strategy_compat, + include_synonyms, + maybe_make_documentation( + include_documentation, + fmt::format( + "The subject name strategy for values if {} is enabled", + topic_property_record_value_schema_id_validation_compat)), + [](auto sns) { return ss::sstring(to_string_view_compat(sns)); }, + validation_hide_default_override); + [[fallthrough]]; + } + case pandaproxy::schema_registry::schema_id_validation_mode::redpanda: { + add_topic_config_if_requested( + config_keys, + result, + topic_property_record_key_schema_id_validation, + metadata_cache.get_default_record_key_schema_id_validation(), + topic_property_record_key_schema_id_validation, + topic_properties.record_key_schema_id_validation, + include_synonyms, + maybe_make_documentation(include_documentation, key_validation), + &describe_as_string, + validation_hide_default_override); + + add_topic_config_if_requested( + config_keys, + result, + topic_property_record_key_subject_name_strategy, + metadata_cache.get_default_record_key_subject_name_strategy(), + topic_property_record_key_subject_name_strategy, + topic_properties.record_key_subject_name_strategy, + include_synonyms, + maybe_make_documentation( + include_documentation, + fmt::format( + "The subject name strategy for keys if {} is enabled", + topic_property_record_key_schema_id_validation)), + &describe_as_string< + pandaproxy::schema_registry::subject_name_strategy>, + validation_hide_default_override); + + add_topic_config_if_requested( + config_keys, + result, + topic_property_record_value_schema_id_validation, + metadata_cache.get_default_record_value_schema_id_validation(), + topic_property_record_value_schema_id_validation, + topic_properties.record_value_schema_id_validation, + include_synonyms, + maybe_make_documentation(include_documentation, val_validation), + &describe_as_string, + validation_hide_default_override); + + add_topic_config_if_requested( + config_keys, + result, + topic_property_record_value_subject_name_strategy, + metadata_cache.get_default_record_value_subject_name_strategy(), + topic_property_record_value_subject_name_strategy, + topic_properties.record_value_subject_name_strategy, + include_synonyms, + maybe_make_documentation( + include_documentation, + fmt::format( + "The subject name strategy for values if {} is enabled", + topic_property_record_value_schema_id_validation)), + &describe_as_string< + pandaproxy::schema_registry::subject_name_strategy>, + validation_hide_default_override); + } + case pandaproxy::schema_registry::schema_id_validation_mode::none: { + break; + } + } + return result; +} + +config_response_container_t make_broker_configs( + const config_key_t& config_keys, + bool include_synonyms, + bool include_documentation) { + config_response_container_t result; + + add_broker_config_if_requested( + config_keys, + result, + "listeners", + config::node().kafka_api, + include_synonyms, + maybe_make_documentation( + include_documentation, config::node().kafka_api.desc()), + &kafka_authn_endpoint_format); + + add_broker_config_if_requested( + config_keys, + result, + "advertised.listeners", + config::node().advertised_kafka_api_property(), + include_synonyms, + maybe_make_documentation( + include_documentation, + config::node().advertised_kafka_api_property().desc()), + &kafka_endpoint_format); + + add_broker_config_if_requested( + config_keys, + result, + "log.segment.bytes", + config::shard_local_cfg().log_segment_size, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().log_segment_size.desc()), + &describe_as_string); + + add_broker_config_if_requested( + config_keys, + result, + "log.retention.bytes", + config::shard_local_cfg().retention_bytes, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().retention_bytes.desc()), + [](std::optional sz) { + return ssx::sformat("{}", sz ? sz.value() : -1); + }); + + add_broker_config_if_requested( + config_keys, + result, + "log.retention.ms", + config::shard_local_cfg().delete_retention_ms, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().delete_retention_ms.desc()), + [](const std::optional& ret) { + return ssx::sformat("{}", ret.value_or(-1ms).count()); + }); + + add_broker_config_if_requested( + config_keys, + result, + "num.partitions", + config::shard_local_cfg().default_topic_partitions, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().default_topic_partitions.desc()), + &describe_as_string); + + add_broker_config_if_requested( + config_keys, + result, + "default.replication.factor", + config::shard_local_cfg().default_topic_replication, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().default_topic_replication.desc()), + &describe_as_string); + + add_broker_config_if_requested( + config_keys, + result, + "log.dirs", + config::node().data_directory, + include_synonyms, + maybe_make_documentation( + include_documentation, config::node().data_directory.desc()), + [](const config::data_directory_path& path) { + return path.as_sstring(); + }); + + add_broker_config_if_requested( + config_keys, + result, + "auto.create.topics.enable", + config::shard_local_cfg().auto_create_topics_enabled, + include_synonyms, + maybe_make_documentation( + include_documentation, + config::shard_local_cfg().auto_create_topics_enabled.desc()), + &describe_as_string); + + return result; +} + +describe_configs_resource_result config_response::to_describe_config() { + return { + .name = name, + .value = value, + .read_only = read_only, + .is_default = is_default, + .config_source = config_source, + .is_sensitive = is_sensitive, + .synonyms = synonyms, + .config_type = config_type, + .documentation = documentation, + }; +}; + +creatable_topic_configs config_response::to_create_config() { + return { + .name = name, + .value = value, + .read_only = read_only, + .config_source = config_source, + .is_sensitive = is_sensitive, + }; +}; + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.h b/src/v/kafka/server/handlers/configs/config_response_utils.h new file mode 100644 index 0000000000000..8f86a8a756bd4 --- /dev/null +++ b/src/v/kafka/server/handlers/configs/config_response_utils.h @@ -0,0 +1,53 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "cluster/types.h" +#include "kafka/protocol/describe_configs.h" +#include "kafka/protocol/schemata/create_topics_response.h" + +#include +#include + +namespace kafka { + +struct config_response { + ss::sstring name{}; + std::optional value{}; + bool read_only{}; + bool is_default{}; + kafka::describe_configs_source config_source{-1}; + bool is_sensitive{}; + std::vector synonyms{}; + kafka::describe_configs_type config_type{0}; + std::optional documentation{}; + + describe_configs_resource_result to_describe_config(); + creatable_topic_configs to_create_config(); +}; + +using config_response_container_t = std::vector; +using config_key_t = std::optional>; + +config_response_container_t make_topic_configs( + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_properties, + const config_key_t& config_keys, + bool include_synonyms, + bool include_documentation); + +config_response_container_t make_broker_configs( + const config_key_t& config_keys, + bool include_synonyms, + bool include_documentation); + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index f4967ad9a51e0..96f73b80d5eed 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -12,9 +12,11 @@ #include "cluster/cluster_utils.h" #include "cluster/metadata_cache.h" #include "cluster/topics_frontend.h" +#include "cluster/types.h" #include "config/configuration.h" #include "kafka/protocol/errors.h" #include "kafka/protocol/timeout.h" +#include "kafka/server/handlers/configs/config_response_utils.h" #include "kafka/server/handlers/topics/topic_utils.h" #include "kafka/server/handlers/topics/types.h" #include "kafka/server/quota_manager.h" @@ -31,6 +33,7 @@ #include #include +#include #include namespace kafka { @@ -83,24 +86,6 @@ using validators = make_validator_types< batch_max_bytes_limits, subject_name_strategy_validator>; -static std::vector -properties_to_result_configs(config_map_t config_map) { - std::vector configs; - configs.reserve(config_map.size()); - std::transform( - config_map.begin(), - config_map.end(), - std::back_inserter(configs), - [](auto& cfg) { - return creatable_topic_configs{ - .name = cfg.first, - .value = {std::move(cfg.second)}, - .config_source = kafka::describe_configs_source::default_config, - }; - }); - return configs; -} - static void append_topic_configs(request_context& ctx, create_topics_response& response) { for (auto& ct_result : response.data.topics) { @@ -111,9 +96,8 @@ append_topic_configs(request_context& ctx, create_topics_response& response) { auto cfg = ctx.metadata_cache().get_topic_cfg( model::topic_namespace_view{model::kafka_namespace, ct_result.name}); if (cfg) { - auto config_map = from_cluster_type(cfg->properties); - ct_result.configs = { - properties_to_result_configs(std::move(config_map))}; + ct_result.configs = std::make_optional( + report_topic_configs(ctx.metadata_cache(), cfg->properties)); ct_result.topic_config_error_code = kafka::error_code::none; } else { // Topic was sucessfully created but metadata request did not @@ -231,8 +215,8 @@ ss::future create_topics_handler::handle( if (ctx.header().version >= api_version(5)) { auto default_properties = ctx.metadata_cache().get_default_properties(); - result.configs = {properties_to_result_configs( - from_cluster_type(default_properties))}; + result.configs = std::make_optional(report_topic_configs( + ctx.metadata_cache(), default_properties)); } return result; }); diff --git a/src/v/kafka/server/handlers/describe_configs.cc b/src/v/kafka/server/handlers/describe_configs.cc index 02001608b3594..e1a42d6285ccc 100644 --- a/src/v/kafka/server/handlers/describe_configs.cc +++ b/src/v/kafka/server/handlers/describe_configs.cc @@ -10,10 +10,12 @@ #include "kafka/server/handlers/describe_configs.h" #include "cluster/metadata_cache.h" +#include "cluster/types.h" #include "config/configuration.h" #include "config/data_directory_path.h" #include "config/node_config.h" #include "kafka/protocol/errors.h" +#include "kafka/server/handlers/configs/config_response_utils.h" #include "kafka/server/handlers/topics/topic_utils.h" #include "kafka/server/handlers/topics/types.h" #include "kafka/server/request_context.h" @@ -39,378 +41,26 @@ namespace kafka { -static bool config_property_requested( - const std::optional>& configuration_keys, - const std::string_view property_name) { - return !configuration_keys.has_value() - || std::find( - configuration_keys->begin(), - configuration_keys->end(), - property_name) - != configuration_keys->end(); -} - -template -static void add_config( - describe_configs_result& result, - std::string_view name, - T value, - describe_configs_source source) { - result.configs.push_back(describe_configs_resource_result{ - .name = ss::sstring(name), - .value = ssx::sformat("{}", value), - .config_source = source, - }); -} - -template -static void add_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, - std::string_view name, - T value, - describe_configs_source source) { - if (config_property_requested(resource.configuration_keys, name)) { - add_config(result, name, value, source); - } -} - -template -static ss::sstring describe_as_string(const T& t) { - return ssx::sformat("{}", t); -} - -// Kafka protocol defines integral types by sizes. See -// https://kafka.apache.org/protocol.html -// Therefore we should also use type sizes for integrals and use Java type sizes -// as a guideline. See -// https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html -template -constexpr auto num_bits = CHAR_BIT * sizeof(T); - -template -constexpr bool is_short = std::is_integral_v && !std::is_same_v - && num_bits <= 16; - -template -constexpr bool is_int = std::is_integral_v && num_bits > 16 - && num_bits <= 32; - -template -constexpr bool is_long = std::is_integral_v && num_bits > 32 - && num_bits <= 64; - -// property_config_type maps the datatype for a config property to -// describe_configs_type. Currently class_type and password are not used in -// Redpanda so we do not include checks for those types. You may find a similar -// mapping in Apache Kafka at -// https://github.com/apache/kafka/blob/be032735b39360df1a6de1a7feea8b4336e5bcc0/core/src/main/scala/kafka/server/ConfigHelper.scala -template -consteval describe_configs_type property_config_type() { - // clang-format off - constexpr auto is_string_type = std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v; - - constexpr auto is_long_type = is_long || - // Long type since seconds is atleast a 35-bit signed integral - // https://en.cppreference.com/w/cpp/chrono/duration - std::is_same_v || - // Long type since milliseconds is atleast a 45-bit signed integral - // https://en.cppreference.com/w/cpp/chrono/duration - std::is_same_v; - // clang-format on - - if constexpr ( - reflection::is_std_optional || reflection::is_tristate) { - return property_config_type(); - return property_config_type(); - } else if constexpr (std::is_same_v) { - return describe_configs_type::boolean; - } else if constexpr (is_string_type) { - return describe_configs_type::string; - } else if constexpr (is_short) { - return describe_configs_type::short_type; - } else if constexpr (is_int) { - return describe_configs_type::int_type; - } else if constexpr (is_long_type) { - return describe_configs_type::long_type; - } else if constexpr (std::is_floating_point_v) { - return describe_configs_type::double_type; - } else if constexpr (reflection::is_std_vector) { - return describe_configs_type::list; - } else { - static_assert( - config::detail::dependent_false::value, - "Type name is not supported in describe_configs_type"); - } -} - -template -static void add_broker_config( - describe_configs_result& result, - std::string_view name, - const config::property& property, - bool include_synonyms, - std::optional documentation, - Func&& describe_f) { - describe_configs_source src - = property.is_overriden() ? describe_configs_source::static_broker_config - : describe_configs_source::default_config; - - std::vector synonyms; - if (include_synonyms) { - synonyms.reserve(2); - /** - * If value was overriden, include override - */ - if (src == describe_configs_source::static_broker_config) { - synonyms.push_back(describe_configs_synonym{ - .name = ss::sstring(property.name()), - .value = describe_f(property.value()), - .source = static_cast( - describe_configs_source::static_broker_config), - }); - } - /** - * If property is required it has no default - */ - if (!property.is_required()) { - synonyms.push_back(describe_configs_synonym{ - .name = ss::sstring(property.name()), - .value = describe_f(property.default_value()), - .source = static_cast( - describe_configs_source::default_config), - }); - } - } - - result.configs.push_back(describe_configs_resource_result{ - .name = ss::sstring(name), - .value = describe_f(property.value()), - .config_source = src, - .synonyms = std::move(synonyms), - .config_type = property_config_type(), - .documentation = documentation, - }); -} - -template -static void add_broker_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, - std::string_view name, - const config::property& property, - bool include_synonyms, - std::optional documentation, - Func&& describe_f) { - if (config_property_requested(resource.configuration_keys, name)) { - add_broker_config( - result, - name, - property, - include_synonyms, - documentation, - std::forward(describe_f)); - } -} - -template -static void add_topic_config( - describe_configs_result& result, - std::string_view default_name, - const T& default_value, - std::string_view override_name, - const std::optional& overrides, - bool include_synonyms, - std::optional documentation, - Func&& describe_f) { - describe_configs_source src = overrides - ? describe_configs_source::topic - : describe_configs_source::default_config; - - std::vector synonyms; - if (include_synonyms) { - synonyms.reserve(2); - if (overrides) { - synonyms.push_back(describe_configs_synonym{ - .name = ss::sstring(override_name), - .value = describe_f(*overrides), - .source = static_cast(describe_configs_source::topic), - }); - } - synonyms.push_back(describe_configs_synonym{ - .name = ss::sstring(default_name), - .value = describe_f(default_value), - .source = static_cast( - describe_configs_source::default_config), - }); - } - - result.configs.push_back(describe_configs_resource_result{ - .name = ss::sstring(override_name), - .value = describe_f(overrides.value_or(default_value)), - .config_source = src, - .synonyms = std::move(synonyms), - .config_type = property_config_type(), - .documentation = documentation, - }); -} - -/** - * For faking DEFAULT_CONFIG status for properties that are actually - * topic overrides: cloud storage properties. We do not support cluster - * defaults for these, the values are always "sticky" to topics, but - * some Kafka clients insist that after an AlterConfig RPC, anything - * they didn't set should be DEFAULT_CONFIG. - * - * See https://github.com/redpanda-data/redpanda/issues/7451 - */ -template -std::optional -override_if_not_default(const std::optional& override, const T& def) { - if (override && override.value() != def) { - return override; - } else { - return std::nullopt; - } -} - -template -static void add_topic_config_if_requested( +static void report_topic_config( const describe_configs_resource& resource, describe_configs_result& result, - std::string_view default_name, - const T& default_value, - std::string_view override_name, - const std::optional& overrides, - bool include_synonyms, - std::optional documentation, - Func&& describe_f, - bool hide_default_override = false) { - if (config_property_requested(resource.configuration_keys, override_name)) { - std::optional overrides_val; - if (hide_default_override) { - overrides_val = override_if_not_default(overrides, default_value); - } else { - overrides_val = overrides; - } - - add_topic_config( - result, - default_name, - default_value, - override_name, - overrides_val, - include_synonyms, - documentation, - std::forward(describe_f)); - } -} - -template -static ss::sstring maybe_print_tristate(const tristate& tri) { - if (tri.is_disabled() || !tri.has_optional_value()) { - return "-1"; - } - return ssx::sformat("{}", tri.value()); -} - -template -static void add_topic_config( - describe_configs_result& result, - std::string_view default_name, - const std::optional& default_value, - std::string_view override_name, - const tristate& overrides, + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_properties, bool include_synonyms, - std::optional documentation) { - // Wrap overrides in an optional because add_topic_config expects - // optional where S = tristate - std::optional> override_value; - if (overrides.is_disabled() || overrides.has_optional_value()) { - override_value = std::make_optional(overrides); - } - - add_topic_config( - result, - default_name, - tristate{default_value}, - override_name, - override_value, + bool include_documentation) { + auto res = make_topic_configs( + metadata_cache, + topic_properties, + resource.configuration_keys, include_synonyms, - documentation, - &maybe_print_tristate); -} + include_documentation); -template -static void add_topic_config_if_requested( - const describe_configs_resource& resource, - describe_configs_result& result, - std::string_view default_name, - const std::optional& default_value, - std::string_view override_name, - const tristate& overrides, - bool include_synonyms, - std::optional documentation) { - if (config_property_requested(resource.configuration_keys, override_name)) { - add_topic_config( - result, - default_name, - default_value, - override_name, - overrides, - include_synonyms, - documentation); + result.configs.reserve(res.size()); + for (auto& conf : res) { + result.configs.push_back(conf.to_describe_config()); } } -static ss::sstring -kafka_endpoint_format(const std::vector& endpoints) { - std::vector uris; - uris.reserve(endpoints.size()); - std::transform( - endpoints.cbegin(), - endpoints.cend(), - std::back_inserter(uris), - [](const model::broker_endpoint& ep) { - return ssx::sformat( - "{}://{}:{}", - (ep.name.empty() ? "plain" : ep.name), - ep.address.host(), - ep.address.port()); - }); - return ssx::sformat("{}", fmt::join(uris, ",")); -} - -static ss::sstring kafka_authn_endpoint_format( - const std::vector& endpoints) { - std::vector uris; - uris.reserve(endpoints.size()); - std::transform( - endpoints.cbegin(), - endpoints.cend(), - std::back_inserter(uris), - [](const config::broker_authn_endpoint& ep) { - return ssx::sformat( - "{}://{}:{}", - (ep.name.empty() ? "plain" : ep.name), - ep.address.host(), - ep.address.port()); - }); - return ssx::sformat("{}", fmt::join(uris, ",")); -} - -static inline std::optional maybe_make_documentation( - bool include_documentation, const std::string_view& docstring) { - return include_documentation ? std::make_optional(ss::sstring{docstring}) - : std::nullopt; -} - static void report_broker_config( const describe_configs_resource& resource, describe_configs_result& result, @@ -440,132 +90,13 @@ static void report_broker_config( } } - add_broker_config_if_requested( - resource, - result, - "listeners", - config::node().kafka_api, - include_synonyms, - maybe_make_documentation( - include_documentation, config::node().kafka_api.desc()), - &kafka_authn_endpoint_format); - - add_broker_config_if_requested( - resource, - result, - "advertised.listeners", - config::node().advertised_kafka_api_property(), - include_synonyms, - maybe_make_documentation( - include_documentation, - config::node().advertised_kafka_api_property().desc()), - &kafka_endpoint_format); - - add_broker_config_if_requested( - resource, - result, - "log.segment.bytes", - config::shard_local_cfg().log_segment_size, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().log_segment_size.desc()), - &describe_as_string); - - add_broker_config_if_requested( - resource, - result, - "log.retention.bytes", - config::shard_local_cfg().retention_bytes, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().retention_bytes.desc()), - [](std::optional sz) { - return ssx::sformat("{}", sz ? sz.value() : -1); - }); - - add_broker_config_if_requested( - resource, - result, - "log.retention.ms", - config::shard_local_cfg().delete_retention_ms, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().delete_retention_ms.desc()), - [](const std::optional& ret) { - return ssx::sformat("{}", ret.value_or(-1ms).count()); - }); - - add_broker_config_if_requested( - resource, - result, - "num.partitions", - config::shard_local_cfg().default_topic_partitions, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().default_topic_partitions.desc()), - &describe_as_string); - - add_broker_config_if_requested( - resource, - result, - "default.replication.factor", - config::shard_local_cfg().default_topic_replication, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().default_topic_replication.desc()), - &describe_as_string); - - add_broker_config_if_requested( - resource, - result, - "log.dirs", - config::node().data_directory, - include_synonyms, - maybe_make_documentation( - include_documentation, config::node().data_directory.desc()), - [](const config::data_directory_path& path) { - return path.as_sstring(); - }); - - add_broker_config_if_requested( - resource, - result, - "auto.create.topics.enable", - config::shard_local_cfg().auto_create_topics_enabled, - include_synonyms, - maybe_make_documentation( - include_documentation, - config::shard_local_cfg().auto_create_topics_enabled.desc()), - &describe_as_string); -} - -int64_t describe_retention_duration( - tristate& overrides, - std::optional def) { - if (overrides.is_disabled()) { - return -1; - } - if (overrides.has_optional_value()) { - return overrides.value().count(); - } + auto res = make_broker_configs( + resource.configuration_keys, include_synonyms, include_documentation); - return def ? def->count() : -1; -} -int64_t describe_retention_bytes( - tristate& overrides, std::optional def) { - if (overrides.is_disabled()) { - return -1; - } - if (overrides.has_optional_value()) { - return overrides.value(); + result.configs.reserve(res.size()); + for (auto& conf : res) { + result.configs.push_back(conf.to_describe_config()); } - - return def.value_or(-1); } template<> @@ -613,354 +144,13 @@ ss::future describe_configs_handler::handle( continue; } - /** - * Kafka properties - */ - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().log_compression_type.name(), - ctx.metadata_cache().get_default_compression(), - topic_property_compression, - topic_config->properties.compression, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().log_compression_type.desc()), - &describe_as_string); - - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().log_cleanup_policy.name(), - ctx.metadata_cache().get_default_cleanup_policy_bitflags(), - topic_property_cleanup_policy, - topic_config->properties.cleanup_policy_bitflags, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().log_cleanup_policy.desc()), - &describe_as_string); - - const std::string_view docstring{ - topic_config->properties.is_compacted() - ? config::shard_local_cfg().compacted_log_segment_size.desc() - : config::shard_local_cfg().log_segment_size.desc()}; - add_topic_config_if_requested( - resource, - result, - topic_config->properties.is_compacted() - ? config::shard_local_cfg().compacted_log_segment_size.name() - : config::shard_local_cfg().log_segment_size.name(), - topic_config->properties.is_compacted() - ? ctx.metadata_cache() - .get_default_compacted_topic_segment_size() - : ctx.metadata_cache().get_default_segment_size(), - topic_property_segment_size, - topic_config->properties.segment_size, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, docstring), - &describe_as_string); - - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().delete_retention_ms.name(), - ctx.metadata_cache().get_default_retention_duration(), - topic_property_retention_duration, - topic_config->properties.retention_duration, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().delete_retention_ms.desc())); - - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().retention_bytes.name(), - ctx.metadata_cache().get_default_retention_bytes(), - topic_property_retention_bytes, - topic_config->properties.retention_bytes, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().retention_bytes.desc())); - - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().log_message_timestamp_type.name(), - ctx.metadata_cache().get_default_timestamp_type(), - topic_property_timestamp_type, - topic_config->properties.timestamp_type, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().log_message_timestamp_type.desc()), - &describe_as_string); - - add_topic_config_if_requested( - resource, - result, - config::shard_local_cfg().kafka_batch_max_bytes.name(), - ctx.metadata_cache().get_default_batch_max_bytes(), - topic_property_max_message_bytes, - topic_config->properties.batch_max_bytes, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().kafka_batch_max_bytes.desc()), - &describe_as_string); - - // Shadow indexing properties - add_topic_config_if_requested( - resource, - result, - topic_property_remote_read, - model::is_fetch_enabled( - ctx.metadata_cache().get_default_shadow_indexing_mode()), - topic_property_remote_read, - topic_config->properties.shadow_indexing.has_value() - ? std::make_optional(model::is_fetch_enabled( - *topic_config->properties.shadow_indexing)) - : std::nullopt, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg() - .cloud_storage_enable_remote_read.desc()), - &describe_as_string, - true); - - add_topic_config_if_requested( - resource, - result, - topic_property_remote_write, - model::is_archival_enabled( - ctx.metadata_cache().get_default_shadow_indexing_mode()), - topic_property_remote_write, - topic_config->properties.shadow_indexing.has_value() - ? std::make_optional(model::is_archival_enabled( - *topic_config->properties.shadow_indexing)) - : std::nullopt, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg() - .cloud_storage_enable_remote_write.desc()), - &describe_as_string, - true); - - add_topic_config_if_requested( - resource, - result, - topic_property_retention_local_target_bytes, - ctx.metadata_cache().get_default_retention_local_target_bytes(), - topic_property_retention_local_target_bytes, - topic_config->properties.retention_local_target_bytes, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg() - .retention_local_target_bytes_default.desc())); - - add_topic_config_if_requested( - resource, - result, - topic_property_retention_local_target_ms, - std::make_optional( - ctx.metadata_cache().get_default_retention_local_target_ms()), - topic_property_retention_local_target_ms, - topic_config->properties.retention_local_target_ms, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg() - .retention_local_target_ms_default.desc())); - - if (config_property_requested( - resource.configuration_keys, topic_property_remote_delete)) { - add_topic_config( - result, - topic_property_remote_delete, - storage::ntp_config::default_remote_delete, - topic_property_remote_delete, - override_if_not_default( - std::make_optional( - topic_config->properties.remote_delete), - storage::ntp_config::default_remote_delete), - true, - maybe_make_documentation( - request.data.include_documentation, - "Controls whether topic deletion should imply deletion in " - "S3"), - [](const bool& b) { return b ? "true" : "false"; }); - } - - add_topic_config_if_requested( + report_topic_config( resource, result, - topic_property_segment_ms, - ctx.metadata_cache().get_default_segment_ms(), - topic_property_segment_ms, - topic_config->properties.segment_ms, + ctx.metadata_cache(), + topic_config->properties, request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - config::shard_local_cfg().log_segment_ms.desc())); - - constexpr std::string_view key_validation - = "Enable validation of the schema id for keys on a record"; - constexpr std::string_view val_validation - = "Enable validation of the schema id for values on a record"; - constexpr bool validation_hide_default_override = true; - - switch (config::shard_local_cfg().enable_schema_id_validation()) { - case pandaproxy::schema_registry::schema_id_validation_mode:: - compat: { - add_topic_config_if_requested( - resource, - result, - topic_property_record_key_schema_id_validation_compat, - ctx.metadata_cache() - .get_default_record_key_schema_id_validation(), - topic_property_record_key_schema_id_validation_compat, - topic_config->properties - .record_key_schema_id_validation_compat, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, key_validation), - &describe_as_string, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_key_subject_name_strategy_compat, - ctx.metadata_cache() - .get_default_record_key_subject_name_strategy(), - topic_property_record_key_subject_name_strategy_compat, - topic_config->properties - .record_key_subject_name_strategy_compat, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - fmt::format( - "The subject name strategy for keys if {} is enabled", - topic_property_record_key_schema_id_validation_compat)), - [](auto sns) { - return ss::sstring(to_string_view_compat(sns)); - }, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_value_schema_id_validation_compat, - ctx.metadata_cache() - .get_default_record_value_schema_id_validation(), - topic_property_record_value_schema_id_validation_compat, - topic_config->properties - .record_value_schema_id_validation_compat, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, val_validation), - &describe_as_string, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_value_subject_name_strategy_compat, - ctx.metadata_cache() - .get_default_record_value_subject_name_strategy(), - topic_property_record_value_subject_name_strategy_compat, - topic_config->properties - .record_value_subject_name_strategy_compat, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - fmt::format( - "The subject name strategy for values if {} is enabled", - topic_property_record_value_schema_id_validation_compat)), - [](auto sns) { - return ss::sstring(to_string_view_compat(sns)); - }, - validation_hide_default_override); - [[fallthrough]]; - } - case pandaproxy::schema_registry::schema_id_validation_mode:: - redpanda: { - add_topic_config_if_requested( - resource, - result, - topic_property_record_key_schema_id_validation, - ctx.metadata_cache() - .get_default_record_key_schema_id_validation(), - topic_property_record_key_schema_id_validation, - topic_config->properties.record_key_schema_id_validation, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, key_validation), - &describe_as_string, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_key_subject_name_strategy, - ctx.metadata_cache() - .get_default_record_key_subject_name_strategy(), - topic_property_record_key_subject_name_strategy, - topic_config->properties.record_key_subject_name_strategy, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - fmt::format( - "The subject name strategy for keys if {} is enabled", - topic_property_record_key_schema_id_validation)), - &describe_as_string< - pandaproxy::schema_registry::subject_name_strategy>, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_value_schema_id_validation, - ctx.metadata_cache() - .get_default_record_value_schema_id_validation(), - topic_property_record_value_schema_id_validation, - topic_config->properties.record_value_schema_id_validation, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, val_validation), - &describe_as_string, - validation_hide_default_override); - - add_topic_config_if_requested( - resource, - result, - topic_property_record_value_subject_name_strategy, - ctx.metadata_cache() - .get_default_record_value_subject_name_strategy(), - topic_property_record_value_subject_name_strategy, - topic_config->properties.record_value_subject_name_strategy, - request.data.include_synonyms, - maybe_make_documentation( - request.data.include_documentation, - fmt::format( - "The subject name strategy for values if {} is enabled", - topic_property_record_value_schema_id_validation)), - &describe_as_string< - pandaproxy::schema_registry::subject_name_strategy>, - validation_hide_default_override); - } - case pandaproxy::schema_registry::schema_id_validation_mode::none: { - break; - } - } - + request.data.include_documentation); break; } diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index 38c919c22541d..c2dd0207de5a9 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -218,139 +218,25 @@ to_cluster_type(const creatable_topic& t) { return ret; } -template -static ss::sstring from_config_type(const T& v) { - if constexpr (std::is_enum_v) { - return ssx::sformat("{}", v); - } else if constexpr (std::is_same_v) { - return v ? "true" : "false"; - } else if constexpr (std::is_same_v) { - return ss::to_sstring( - std::chrono::duration_cast(v).count()); - } else { - return ss::to_sstring(v); - } -} - -config_map_t from_cluster_type(const cluster::topic_properties& properties) { - config_map_t config_entries; - if (properties.compression) { - config_entries[topic_property_compression] = from_config_type( - *properties.compression); - } - if (properties.cleanup_policy_bitflags) { - config_entries[topic_property_cleanup_policy] = from_config_type( - *properties.cleanup_policy_bitflags); - } - if (properties.compaction_strategy) { - config_entries[topic_property_compaction_strategy] = from_config_type( - *properties.compaction_strategy); - } - if (properties.timestamp_type) { - config_entries[topic_property_timestamp_type] = from_config_type( - *properties.timestamp_type); - } - if (properties.segment_size) { - config_entries[topic_property_segment_size] = from_config_type( - *properties.segment_size); - } - if (properties.retention_bytes.has_optional_value()) { - config_entries[topic_property_retention_bytes] = from_config_type( - properties.retention_bytes.value()); - } - if (properties.retention_duration.has_optional_value()) { - config_entries[topic_property_retention_duration] = from_config_type( - *properties.retention_duration); - } - if (properties.recovery) { - config_entries[topic_property_recovery] = from_config_type( - *properties.recovery); - } - if (properties.batch_max_bytes) { - config_entries[topic_property_max_message_bytes] = from_config_type( - *properties.batch_max_bytes); - } - if (properties.shadow_indexing) { - config_entries[topic_property_remote_write] = "false"; - config_entries[topic_property_remote_read] = "false"; - - switch (*properties.shadow_indexing) { - case model::shadow_indexing_mode::archival: - config_entries[topic_property_remote_write] = "true"; - break; - case model::shadow_indexing_mode::fetch: - config_entries[topic_property_remote_read] = "true"; - break; - case model::shadow_indexing_mode::full: - config_entries[topic_property_remote_write] = "true"; - config_entries[topic_property_remote_read] = "true"; - break; - default: - break; - } - } - if (properties.read_replica_bucket) { - config_entries[topic_property_read_replica] = from_config_type( - *properties.read_replica_bucket); - } - - if (properties.retention_local_target_bytes.has_optional_value()) { - config_entries[topic_property_retention_local_target_bytes] - = from_config_type(*properties.retention_local_target_bytes); - } +static std::vector +convert_topic_configs(std::vector&& topic_cfgs) { + auto configs = std::vector(); + configs.reserve(topic_cfgs.size()); - if (properties.retention_local_target_ms.has_optional_value()) { - config_entries[topic_property_retention_local_target_ms] - = from_config_type(*properties.retention_local_target_ms); + for (auto& conf : topic_cfgs) { + configs.push_back(conf.to_create_config()); } - config_entries[topic_property_remote_delete] = from_config_type( - properties.remote_delete); + return configs; +} - if (properties.segment_ms.has_optional_value()) { - config_entries[topic_property_segment_ms] = from_config_type( - properties.segment_ms.value()); - } +std::vector report_topic_configs( + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_properties) { + auto topic_cfgs = make_topic_configs( + metadata_cache, topic_properties, std::nullopt, false, false); - if (properties.record_key_schema_id_validation) { - config_entries[topic_property_record_key_schema_id_validation] - = from_config_type(properties.record_key_schema_id_validation); - } - if (properties.record_key_schema_id_validation_compat) { - config_entries[topic_property_record_key_schema_id_validation_compat] - = from_config_type(properties.record_key_schema_id_validation_compat); - } - if (properties.record_key_subject_name_strategy) { - config_entries[topic_property_record_key_subject_name_strategy] - = from_config_type(properties.record_key_subject_name_strategy); - } - if (properties.record_key_subject_name_strategy_compat) { - config_entries[topic_property_record_key_subject_name_strategy_compat] - = from_config_type( - properties.record_key_subject_name_strategy_compat); - } - if (properties.record_value_schema_id_validation) { - config_entries[topic_property_record_value_schema_id_validation] - = from_config_type(properties.record_value_schema_id_validation); - } - if (properties.record_value_schema_id_validation_compat) { - config_entries[topic_property_record_value_schema_id_validation_compat] - = from_config_type( - properties.record_value_schema_id_validation_compat); - } - if (properties.record_value_subject_name_strategy) { - config_entries[topic_property_record_value_subject_name_strategy] - = from_config_type(properties.record_value_subject_name_strategy); - } - if (properties.record_value_subject_name_strategy_compat) { - config_entries[topic_property_record_value_subject_name_strategy_compat] - = from_config_type( - properties.record_value_subject_name_strategy_compat); - } - - /// Final topic_property not encoded here is \ref remote_topic_properties, - /// is more of an implementation detail no need to ever show user - return config_entries; + return convert_topic_configs(std::move(topic_cfgs)); } } // namespace kafka diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index 75a0e6451b279..733881585c558 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -14,6 +14,7 @@ #include "kafka/protocol/schemata/create_topics_request.h" #include "kafka/protocol/schemata/create_topics_response.h" #include "kafka/server/errors.h" +#include "kafka/server/handlers/configs/config_response_utils.h" #include "model/fundamental.h" #include "model/namespace.h" #include "utils/absl_sstring_hash.h" @@ -139,5 +140,8 @@ config_map_t config_map(const std::vector& config); cluster::custom_assignable_topic_configuration to_cluster_type(const creatable_topic& t); -config_map_t from_cluster_type(const cluster::topic_properties&); +std::vector report_topic_configs( + const cluster::metadata_cache& metadata_cache, + const cluster::topic_properties& topic_properties); + } // namespace kafka diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index 44bdd9e5f11b9..82fc3666d61e0 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -11,6 +11,7 @@ rp_test( validator_tests.cc fetch_unit_test.cc config_utils_test.cc + config_response_utils_test.cc DEFINITIONS BOOST_TEST_DYN_LINK LIBRARIES Boost::unit_test_framework v::kafka LABELS kafka diff --git a/src/v/kafka/server/tests/config_response_utils_test.cc b/src/v/kafka/server/tests/config_response_utils_test.cc new file mode 100644 index 0000000000000..0dc79bc6febfb --- /dev/null +++ b/src/v/kafka/server/tests/config_response_utils_test.cc @@ -0,0 +1,139 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "kafka/server/tests/config_response_utils_test_help.h" +#include "utils/to_string.h" + +#include +#include +#include + +#include +#include +#include +#include + +std::optional get_config( + const kafka::config_response_container_t& result, std::string_view key) { + for (const auto& config : result) { + if (config.name == key) { + return config.value; + } + } + throw std::runtime_error(fmt::format("Key not found: {}", key)); +} + +kafka::describe_configs_source get_config_source( + const kafka::config_response_container_t& result, std::string_view key) { + for (const auto& config : result) { + if (config.name == key) { + return config.config_source; + } + } + throw std::runtime_error(fmt::format("Key not found: {}", key)); +} + +BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_tristate) { + using namespace kafka; + auto verify_tristate_config = []( + std::optional default_value, + tristate override_value, + std::optional expected_value) { + config_response_container_t result; + + add_topic_config_if_requested( + std::nullopt, + result, + "test-global-broker-config-name", + default_value, + "test-topic-override-name", + override_value, + false, + std::nullopt); + + BOOST_CHECK_EQUAL( + get_config(result, "test-topic-override-name"), expected_value); + }; + + // clang-format off + verify_tristate_config(std::make_optional(2), tristate(1), std::make_optional("1")); + verify_tristate_config(std::make_optional(2), tristate(std::nullopt), std::make_optional("2")); + verify_tristate_config(std::make_optional(2), tristate(), std::make_optional("-1")); + + verify_tristate_config(std::nullopt, tristate(1), std::make_optional("1")); + verify_tristate_config(std::nullopt, tristate(std::nullopt), std::make_optional("-1")); + verify_tristate_config(std::nullopt, tristate(), std::make_optional("-1")); + // clang-format on +} + +BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_optional) { + using namespace kafka; + auto verify_optional_config = []( + int default_value, + std::optional override_value, + std::optional expected_value, + bool hide_default_override) { + config_response_container_t result; + + add_topic_config_if_requested( + std::nullopt, + result, + "test-global-broker-config-name", + default_value, + "test-topic-override-name", + override_value, + false, + std::nullopt, + &describe_as_string, + hide_default_override); + + BOOST_CHECK_EQUAL( + get_config(result, "test-topic-override-name"), expected_value); + }; + + // clang-format off + verify_optional_config(2, std::make_optional(1), std::make_optional("1"), false); + verify_optional_config(2, std::nullopt, std::make_optional("2"), false); + // clang-format on +} + +BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_optional_hide_default) { + using namespace kafka; + + auto verify_optional_config_with_hide_override = + []( + bool hide_default_override, + kafka::describe_configs_source expected_source) { + config_response_container_t result; + + add_topic_config_if_requested( + std::nullopt, + result, + "test-global-broker-config-name", + 2, + "test-topic-override-name", + std::make_optional(2), + false, + std::nullopt, + &describe_as_string, + hide_default_override); + + BOOST_CHECK_EQUAL( + get_config(result, "test-topic-override-name"), + std::make_optional("2")); + BOOST_CHECK_EQUAL( + get_config_source(result, "test-topic-override-name"), + expected_source); + }; + + // clang-format off + verify_optional_config_with_hide_override(false, kafka::describe_configs_source::topic); + verify_optional_config_with_hide_override(true, kafka::describe_configs_source::default_config); + // clang-format on +} diff --git a/src/v/kafka/server/tests/config_response_utils_test_help.h b/src/v/kafka/server/tests/config_response_utils_test_help.h new file mode 100644 index 0000000000000..d85e553f2639e --- /dev/null +++ b/src/v/kafka/server/tests/config_response_utils_test_help.h @@ -0,0 +1,46 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "kafka/protocol/describe_configs.h" +#include "kafka/server/handlers/configs/config_response_utils.h" + +namespace kafka { + +template +ss::sstring describe_as_string(const T& t); + +template +void add_topic_config_if_requested( + const config_key_t& config_keys, + config_response_container_t& result, + std::string_view default_name, + const T& default_value, + std::string_view override_name, + const std::optional& overrides, + bool include_synonyms, + std::optional documentation, + Func&& describe_f, + bool hide_default_override = false); + +template +void add_topic_config_if_requested( + const config_key_t& config_keys, + config_response_container_t& result, + std::string_view default_name, + const std::optional& default_value, + std::string_view override_name, + const tristate& overrides, + bool include_synonyms, + std::optional documentation); + +} // namespace kafka diff --git a/src/v/kafka/server/tests/create_topics_test.cc b/src/v/kafka/server/tests/create_topics_test.cc index 40ea8d8751901..b44a6f17e8afb 100644 --- a/src/v/kafka/server/tests/create_topics_test.cc +++ b/src/v/kafka/server/tests/create_topics_test.cc @@ -9,6 +9,7 @@ #include "kafka/protocol/create_topics.h" #include "kafka/protocol/metadata.h" +#include "kafka/server/handlers/configs/config_response_utils.h" #include "kafka/server/handlers/topics/types.h" #include "redpanda/tests/fixture.h" #include "resource_mgmt/io_priority.h" @@ -131,8 +132,10 @@ class create_topic_fixture : public redpanda_thread_fixture { /// Server should return default configs BOOST_TEST(topic_res.configs, "empty config response"); auto cfg_map = config_map(*topic_res.configs); - const auto default_topic_properties = kafka::from_cluster_type( - app.metadata_cache.local().get_default_properties()); + const auto default_topic_properties = config_map( + kafka::report_topic_configs( + app.metadata_cache.local(), + app.metadata_cache.local().get_default_properties())); BOOST_TEST( cfg_map == default_topic_properties, "incorrect default properties"); @@ -149,8 +152,9 @@ class create_topic_fixture : public redpanda_thread_fixture { auto cfg = app.metadata_cache.local().get_topic_cfg( model::topic_namespace_view{model::kafka_namespace, topic_res.name}); BOOST_TEST(cfg, "missing topic config"); - auto config_map = kafka::from_cluster_type(cfg->properties); - BOOST_TEST(config_map == resp_cfgs, "configs didn't match"); + auto cfg_map = config_map(kafka::report_topic_configs( + app.metadata_cache.local(), cfg->properties)); + BOOST_TEST(cfg_map == resp_cfgs, "configs didn't match"); BOOST_CHECK_EQUAL( topic_res.topic_config_error_code, kafka::error_code::none); } diff --git a/tests/rptest/tests/topic_creation_test.py b/tests/rptest/tests/topic_creation_test.py index f6df6df85ce01..73f1363629993 100644 --- a/tests/rptest/tests/topic_creation_test.py +++ b/tests/rptest/tests/topic_creation_test.py @@ -309,6 +309,11 @@ class CreateTopicsResponseTest(RedpandaTest): DEFAULT_CLEANUP_POLICY = 'delete' DEFAULT_CONFIG_SOURCE = 5 + CONFIG_SOURCE_MAPPING = { + 1: 'DYNAMIC_TOPIC_CONFIG', + 5: 'DEFAULT_CONFIG', + } + def __init__(self, test_context): super(CreateTopicsResponseTest, self).__init__(test_context=test_context) @@ -331,6 +336,16 @@ def create_topics(self, p_cnt, r_fac, n=1, validate_only=False): topics=topics, validate_only=validate_only) + def create_topic(self, name): + topics = [{ + 'name': f"{name}", + 'partition_count': 1, + 'replication_factor': 1 + }] + return self.kcl_client.create_topics(6, + topics=topics, + validate_only=False) + def get_np(self, tp): return tp['NumPartitions'] @@ -390,14 +405,22 @@ def test_create_topic_response_configs(self): b. serialized correctly """ - topics = self.create_topics(1, 1) - for topic in topics: - cleanup_policy = self.get_config_by_name(topic, 'cleanup.policy') - assert cleanup_policy is not None, "cleanup.policy missing from topic config" - assert cleanup_policy[ - 'Value'] == self.DEFAULT_CLEANUP_POLICY, f"cleanup.policy = {cleanup_policy['Value']}, expected {self.DEFAULT_CLEANUP_POLICY}" - assert cleanup_policy[ - 'Source'] == self.DEFAULT_CONFIG_SOURCE, f"cleanup.policy = {cleanup_policy['Source']}, expected {self.DEFAULT_CONFIG_SOURCE}" + topic_name = 'test-create-topic-response' + create_topics_response = self.create_topic(topic_name) + topic_response = create_topics_response[0] + + res = self.kcl_client.describe_topic(topic_name) + describe_configs = [line.split() for line in res.strip().split('\n')] + + for (key, value, source) in describe_configs: + topic_config = self.get_config_by_name(topic_response, key) + + assert topic_config, f"Config '{key}' returned by DescribeConfigs is missing from configs response in CreateTopic" + assert topic_config[ + 'Value'] == value, f"config value mismatch for {key} across CreateTopic and DescribeConfigs: {topic_config['Value']} != {value}" + + assert self.CONFIG_SOURCE_MAPPING[topic_config[ + 'Source']] == source, f"config source mismatch for {key} across CreateTopic and DescribeConfigs: {self.CONFIG_SOURCE_MAPPING[topic_config['Source']]} != {source}" @cluster(num_nodes=3) def test_create_topic_validate_only(self):