Skip to content

Commit

Permalink
kafka: refactor with general config types
Browse files Browse the repository at this point in the history
This refactors the config response utility code to use types that are
not specific to neither the describe configs nor the create topics
handlers.

(cherry picked from commit 4f5d6b6)
  • Loading branch information
pgellert committed Mar 21, 2024
1 parent e10f89d commit 2501f66
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 138 deletions.
215 changes: 112 additions & 103 deletions src/v/kafka/server/handlers/configs/config_response_utils.cc

Large diffs are not rendered by default.

37 changes: 26 additions & 11 deletions src/v/kafka/server/handlers/configs/config_response_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,41 @@

#include "cluster/types.h"
#include "kafka/protocol/describe_configs.h"
#include "kafka/protocol/create_topics.h"
#include "kafka/protocol/schemata/create_topics_response.h"

#include <iterator>
#include <optional>

namespace kafka {

void report_topic_config(
const describe_configs_resource& resource,
describe_configs_result& result,
struct config_response {
ss::sstring name{};
std::optional<ss::sstring> value{};
bool read_only{};
bool is_default{};
kafka::describe_configs_source config_source{-1};
bool is_sensitive{};
std::vector<describe_configs_synonym> synonyms{};
kafka::describe_configs_type config_type{0};
std::optional<ss::sstring> documentation{};

describe_configs_resource_result to_describe_config();
creatable_topic_configs to_create_config();
};

using config_response_container_t = std::vector<config_response>;
using config_key_t = std::optional<std::vector<ss::sstring>>;

config_response_container_t make_topic_configs(
const cluster::metadata_cache& metadata_cache,
const cluster::topic_properties& topic_properties,
const config_key_t& config_keys,
bool include_synonyms,
bool include_documentation);

void report_broker_config(
const describe_configs_resource& resource,
describe_configs_result& result,
config_response_container_t make_broker_configs(
const config_key_t& config_keys,
bool include_synonyms,
bool include_documentation);

std::vector<creatable_topic_configs> make_configs(
const cluster::metadata_cache& metadata_cache,
const cluster::topic_properties& topic_config);

} // namespace kafka
6 changes: 3 additions & 3 deletions src/v/kafka/server/handlers/create_topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ append_topic_configs(request_context& ctx, create_topics_response& response) {
model::topic_namespace_view{model::kafka_namespace, ct_result.name});
if (cfg) {
ct_result.configs = std::make_optional(
make_configs(ctx.metadata_cache(), cfg->properties));
report_topic_configs(ctx.metadata_cache(), cfg->properties));
ct_result.topic_config_error_code = kafka::error_code::none;
} else {
// Topic was sucessfully created but metadata request did not
Expand Down Expand Up @@ -215,8 +215,8 @@ ss::future<response_ptr> create_topics_handler::handle(
if (ctx.header().version >= api_version(5)) {
auto default_properties
= ctx.metadata_cache().get_default_properties();
result.configs = std::make_optional(
make_configs(ctx.metadata_cache(), default_properties));
result.configs = std::make_optional(report_topic_configs(
ctx.metadata_cache(), default_properties));
}
return result;
});
Expand Down
59 changes: 59 additions & 0 deletions src/v/kafka/server/handlers/describe_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "kafka/server/handlers/describe_configs.h"

#include "cluster/metadata_cache.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/data_directory_path.h"
#include "config/node_config.h"
Expand Down Expand Up @@ -40,6 +41,64 @@

namespace kafka {

static void report_topic_config(
const describe_configs_resource& resource,
describe_configs_result& result,
const cluster::metadata_cache& metadata_cache,
const cluster::topic_properties& topic_properties,
bool include_synonyms,
bool include_documentation) {
auto res = make_topic_configs(
metadata_cache,
topic_properties,
resource.configuration_keys,
include_synonyms,
include_documentation);

result.configs.reserve(res.size());
for (auto& conf : res) {
result.configs.push_back(conf.to_describe_config());
}
}

static void report_broker_config(
const describe_configs_resource& resource,
describe_configs_result& result,
bool include_synonyms,
bool include_documentation) {
if (!result.resource_name.empty()) {
int32_t broker_id = -1;
auto res = std::from_chars(
result.resource_name.data(),
result.resource_name.data() + result.resource_name.size(), // NOLINT
broker_id);
if (res.ec == std::errc()) {
if (broker_id != *config::node().node_id()) {
result.error_code = error_code::invalid_request;
result.error_message = ssx::sformat(
"Unexpected broker id {} expected {}",
broker_id,
*config::node().node_id());
return;
}
} else {
result.error_code = error_code::invalid_request;
result.error_message = ssx::sformat(
"Broker id must be an integer but received {}",
result.resource_name);
return;
}
}

auto res = make_broker_configs(
resource.configuration_keys, include_synonyms, include_documentation);

result.configs.reserve(res.size());
for (auto& conf : res) {
result.configs.push_back(conf.to_describe_config());
}
}

template<>
ss::future<response_ptr> describe_configs_handler::handle(
request_context ctx, [[maybe_unused]] ss::smp_service_group ssg) {
Expand Down
21 changes: 21 additions & 0 deletions src/v/kafka/server/handlers/topics/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,25 @@ to_cluster_type(const creatable_topic& t) {
return ret;
}

static std::vector<kafka::creatable_topic_configs>
convert_topic_configs(std::vector<kafka::config_response>&& topic_cfgs) {
auto configs = std::vector<kafka::creatable_topic_configs>();
configs.reserve(topic_cfgs.size());

for (auto& conf : topic_cfgs) {
configs.push_back(conf.to_create_config());
}

return configs;
}

std::vector<kafka::creatable_topic_configs> report_topic_configs(
const cluster::metadata_cache& metadata_cache,
const cluster::topic_properties& topic_properties) {
auto topic_cfgs = make_topic_configs(
metadata_cache, topic_properties, std::nullopt, false, false);

return convert_topic_configs(std::move(topic_cfgs));
}

} // namespace kafka
5 changes: 5 additions & 0 deletions src/v/kafka/server/handlers/topics/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "kafka/protocol/schemata/create_topics_request.h"
#include "kafka/protocol/schemata/create_topics_response.h"
#include "kafka/server/errors.h"
#include "kafka/server/handlers/configs/config_response_utils.h"
#include "model/fundamental.h"
#include "model/namespace.h"
#include "utils/absl_sstring_hash.h"
Expand Down Expand Up @@ -139,4 +140,8 @@ config_map_t config_map(const std::vector<creatable_topic_configs>& config);
cluster::custom_assignable_topic_configuration
to_cluster_type(const creatable_topic& t);

std::vector<kafka::creatable_topic_configs> report_topic_configs(
const cluster::metadata_cache& metadata_cache,
const cluster::topic_properties& topic_properties);

} // namespace kafka
25 changes: 11 additions & 14 deletions src/v/kafka/server/tests/config_response_utils_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <string_view>
#include <tuple>

std::optional<ss::sstring>
get_config(const kafka::describe_configs_result& result, std::string_view key) {
for (const auto& config : result.configs) {
std::optional<ss::sstring> get_config(
const kafka::config_response_container_t& result, std::string_view key) {
for (const auto& config : result) {
if (config.name == key) {
return config.value;
}
Expand All @@ -30,8 +30,8 @@ get_config(const kafka::describe_configs_result& result, std::string_view key) {
}

kafka::describe_configs_source get_config_source(
const kafka::describe_configs_result& result, std::string_view key) {
for (const auto& config : result.configs) {
const kafka::config_response_container_t& result, std::string_view key) {
for (const auto& config : result) {
if (config.name == key) {
return config.config_source;
}
Expand All @@ -45,11 +45,10 @@ BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_tristate) {
std::optional<int> default_value,
tristate<int> override_value,
std::optional<ss::sstring> expected_value) {
describe_configs_resource resource{};
describe_configs_result result{};
config_response_container_t result;

add_topic_config_if_requested(
resource,
std::nullopt,
result,
"test-global-broker-config-name",
default_value,
Expand Down Expand Up @@ -80,11 +79,10 @@ BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_optional) {
std::optional<int> override_value,
std::optional<ss::sstring> expected_value,
bool hide_default_override) {
describe_configs_resource resource{};
describe_configs_result result{};
config_response_container_t result;

add_topic_config_if_requested(
resource,
std::nullopt,
result,
"test-global-broker-config-name",
default_value,
Expand Down Expand Up @@ -112,11 +110,10 @@ BOOST_AUTO_TEST_CASE(add_topic_config_if_requested_optional_hide_default) {
[](
bool hide_default_override,
kafka::describe_configs_source expected_source) {
describe_configs_resource resource{};
describe_configs_result result{};
config_response_container_t result;

add_topic_config_if_requested(
resource,
std::nullopt,
result,
"test-global-broker-config-name",
2,
Expand Down
9 changes: 5 additions & 4 deletions src/v/kafka/server/tests/config_response_utils_test_help.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "kafka/protocol/describe_configs.h"
#include "kafka/server/handlers/configs/config_response_utils.h"

namespace kafka {

Expand All @@ -20,8 +21,8 @@ ss::sstring describe_as_string(const T& t);

template<typename T, typename Func>
void add_topic_config_if_requested(
const describe_configs_resource& resource,
describe_configs_result& result,
const config_key_t& config_keys,
config_response_container_t& result,
std::string_view default_name,
const T& default_value,
std::string_view override_name,
Expand All @@ -33,8 +34,8 @@ void add_topic_config_if_requested(

template<typename T>
void add_topic_config_if_requested(
const describe_configs_resource& resource,
describe_configs_result& result,
const config_key_t& config_keys,
config_response_container_t& result,
std::string_view default_name,
const std::optional<T>& default_value,
std::string_view override_name,
Expand Down
6 changes: 3 additions & 3 deletions src/v/kafka/server/tests/create_topics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class create_topic_fixture : public redpanda_thread_fixture {
BOOST_TEST(topic_res.configs, "empty config response");
auto cfg_map = config_map(*topic_res.configs);
const auto default_topic_properties = config_map(
kafka::make_configs(
kafka::report_topic_configs(
app.metadata_cache.local(),
app.metadata_cache.local().get_default_properties()));
BOOST_TEST(
Expand All @@ -152,8 +152,8 @@ class create_topic_fixture : public redpanda_thread_fixture {
auto cfg = app.metadata_cache.local().get_topic_cfg(
model::topic_namespace_view{model::kafka_namespace, topic_res.name});
BOOST_TEST(cfg, "missing topic config");
auto cfg_map = config_map(
kafka::make_configs(app.metadata_cache.local(), cfg->properties));
auto cfg_map = config_map(kafka::report_topic_configs(
app.metadata_cache.local(), cfg->properties));
BOOST_TEST(cfg_map == resp_cfgs, "configs didn't match");
BOOST_CHECK_EQUAL(
topic_res.topic_config_error_code, kafka::error_code::none);
Expand Down

0 comments on commit 2501f66

Please sign in to comment.