Skip to content

Commit

Permalink
k/h/metadata: use fragmented vec for topic metadata
Browse files Browse the repository at this point in the history
For workloads with a large amount of topics, this topics vector can grow
to be an oversized allocation.

Signed-off-by: Tyler Rockwood <[email protected]>
(cherry picked from commit c535652)
  • Loading branch information
rockwotj committed Dec 19, 2023
1 parent 8ee81c6 commit a09da9b
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 15 deletions.
3 changes: 2 additions & 1 deletion src/v/kafka/client/topic_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
#include "kafka/client/partitioners.h"
#include "kafka/protocol/metadata.h"
#include "random/generators.h"
#include "utils/fragmented_vector.h"

#include <seastar/core/future.hh>

namespace kafka::client {

ss::future<>
topic_cache::apply(std::vector<metadata_response::topic>&& topics) {
topic_cache::apply(small_fragment_vector<metadata_response::topic>&& topics) {
topics_t cache;
cache.reserve(topics.size());
for (const auto& t : topics) {
Expand Down
4 changes: 3 additions & 1 deletion src/v/kafka/client/topic_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "model/fundamental.h"
#include "model/metadata.h"
#include "seastarx.h"
#include "utils/fragmented_vector.h"

#include <seastar/core/future.hh>

Expand Down Expand Up @@ -46,7 +47,8 @@ class topic_cache {
~topic_cache() noexcept = default;

/// \brief Apply the given metadata response.
ss::future<> apply(std::vector<metadata_response::topic>&& topics);
ss::future<>
apply(small_fragment_vector<metadata_response::topic>&& topics);

/// \brief Obtain the leader for the given topic-partition
ss::future<model::node_id> leader(model::topic_partition tp) const;
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/protocol/schemata/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@
# the container type from std::vector
override_member_container = {
'metadata_response_partition': 'large_fragment_vector',
'metadata_response_topic': 'small_fragment_vector',
'fetchable_partition_response': 'small_fragment_vector'
}

Expand Down
17 changes: 6 additions & 11 deletions src/v/kafka/server/handlers/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,16 @@ static metadata_response::topic make_topic_response(
return res;
}

static ss::future<std::vector<metadata_response::topic>> get_topic_metadata(
static ss::future<small_fragment_vector<metadata_response::topic>>
get_topic_metadata(
request_context& ctx,
metadata_request& request,
const is_node_isolated_or_decommissioned is_node_isolated) {
std::vector<metadata_response::topic> res;
small_fragment_vector<metadata_response::topic> res;

// request can be served from whatever happens to be in the cache
if (request.list_all_topics) {
auto& topics_md = ctx.metadata_cache().all_topics_metadata();
// reserve vector capacity to full size as there are only few topics
// outside of kafka namespace
res.reserve(topics_md.size());
for (const auto& [tp_ns, md] : topics_md) {
// only serve topics from the kafka namespace
if (tp_ns.ns != model::kafka_namespace) {
Expand All @@ -278,8 +276,8 @@ static ss::future<std::vector<metadata_response::topic>> get_topic_metadata(
make_topic_response(ctx, request, md.metadata, is_node_isolated));
}

return ss::make_ready_future<std::vector<metadata_response::topic>>(
std::move(res));
return ss::make_ready_future<
small_fragment_vector<metadata_response::topic>>(std::move(res));
}

std::vector<ss::future<metadata_response::topic>> new_topics;
Expand Down Expand Up @@ -326,10 +324,7 @@ static ss::future<std::vector<metadata_response::topic>> get_topic_metadata(
return ss::when_all_succeed(new_topics.begin(), new_topics.end())
.then([res = std::move(res)](
std::vector<metadata_response::topic> topics) mutable {
res.insert(
res.end(),
std::make_move_iterator(topics.begin()),
std::make_move_iterator(topics.end()));
std::move(topics.begin(), topics.end(), std::back_inserter(res));
return std::move(res);
});
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/server/tests/delete_topics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ class delete_topics_request_fixture : public redpanda_thread_fixture {
void validate_topic_is_deleteted(const model::topic& tp) {
kafka::metadata_response resp = get_topic_metadata(tp);
auto it = std::find_if(
std::cbegin(resp.data.topics),
std::cend(resp.data.topics),
resp.data.topics.begin(),
resp.data.topics.end(),
[tp](const kafka::metadata_response::topic& md_tp) {
return md_tp.name == tp;
});
Expand Down

0 comments on commit a09da9b

Please sign in to comment.