Skip to content

Commit

Permalink
Merge pull request #24193 from redpanda-data/stephan/high-density-ove…
Browse files Browse the repository at this point in the history
…rsized

Avoid some oversized allocs at high partition density
  • Loading branch information
piyushredpanda authored Nov 22, 2024
2 parents 895f2d5 + d39bde7 commit 9a9a4ae
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "partition_manager.h"
#include "redpanda/application.h"
#include "redpanda/tests/fixture.h"
#include "security/scram_credential.h"
Expand Down Expand Up @@ -167,7 +168,9 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
cluster::tx_executor{}.run_random_workload(
spec, remote_p->raft()->term(), remote_p->rm_stm(), remote_p->log());

auto partitions = app.partition_manager.local().partitions();
cluster::partition_manager::ntp_table_container partitions(
app.partition_manager.local().partitions().begin(),
app.partition_manager.local().partitions().end());
for (const auto& [ntp, p] : partitions) {
if (ntp == model::controller_ntp) {
continue;
Expand Down Expand Up @@ -264,7 +267,9 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
auto topic_count
= app.controller->get_topics_state().local().all_topics_count();
ASSERT_LE(2, topic_count);
auto partitions = app.partition_manager.local().partitions();
cluster::partition_manager::ntp_table_container partitions(
app.partition_manager.local().partitions().begin(),
app.partition_manager.local().partitions().end());
for (const auto& [ntp, p] : partitions) {
if (!model::is_user_topic(ntp)) {
continue;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,14 @@ ss::future<> partition_manager::stop_partitions() {
// prevent partitions from being accessed
auto partitions = std::exchange(_ntp_table, {});

co_await ssx::async_clear(_raft_table)();
co_await ssx::async_clear(_raft_table);

// shutdown all partitions
co_await ss::max_concurrent_for_each(partitions, 1024, [this](auto& e) {
return do_shutdown(e.second).discard_result();
});

co_await ssx::async_clear(partitions)();
co_await ssx::async_clear(partitions);
}

ss::future<xshard_transfer_state>
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "cluster/state_machine_registry.h"
#include "cluster/types.h"
#include "config/property.h"
#include "container/chunked_hash_map.h"
#include "container/intrusive_list_helpers.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
Expand All @@ -37,7 +38,7 @@ class partition_manager
: public ss::peering_sharded_service<partition_manager> {
public:
using ntp_table_container
= model::ntp_flat_map_type<ss::lw_shared_ptr<partition>>;
= model::ntp_map_type<ss::lw_shared_ptr<partition>>;

partition_manager(
ss::sharded<storage::api>&,
Expand Down Expand Up @@ -277,8 +278,7 @@ class partition_manager
ntp_callbacks<unmanage_cb_t> _unmanage_watchers;
// XXX use intrusive containers here
ntp_table_container _ntp_table;
absl::flat_hash_map<raft::group_id, ss::lw_shared_ptr<partition>>
_raft_table;
chunked_hash_map<raft::group_id, ss::lw_shared_ptr<partition>> _raft_table;

ss::sharded<cloud_storage::partition_recovery_manager>&
_partition_recovery_mgr;
Expand Down
1 change: 1 addition & 0 deletions src/v/model/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ redpanda_cc_library(
"//src/v/bytes",
"//src/v/bytes:iobuf",
"//src/v/bytes:iobuf_parser",
"//src/v/container:chunked_hash_map",
"//src/v/container:fragmented_vector",
"//src/v/hashing:crc32c",
"//src/v/reflection:adl",
Expand Down
15 changes: 3 additions & 12 deletions src/v/model/ktp.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "container/chunked_hash_map.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
Expand Down Expand Up @@ -338,22 +339,12 @@ struct ktp_hash_eq {
};

/**
* @brief Helper alias to declare a flat map from ntp to V.
* @brief Helper alias to declare a map from ntp to V.
*
* Uses transparent comparator to allow any ntp object to be used for lookup.
*/
template<typename V>
using ntp_flat_map_type
= absl::flat_hash_map<model::ntp, V, ktp_hash_eq, ktp_hash_eq>;

/**
* @brief Helper alias to declare a flat map from ntp to V.
*
* Uses transparent comparator to allow any ntp object to be used for lookup.
*/
template<typename V>
using ntp_node_map_type
= absl::node_hash_map<model::ntp, V, ktp_hash_eq, ktp_hash_eq>;
using ntp_map_type = chunked_hash_map<model::ntp, V, ktp_hash_eq, ktp_hash_eq>;

} // namespace model

Expand Down
2 changes: 1 addition & 1 deletion src/v/ssx/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ redpanda_cc_library(
visibility = ["//visibility:public"],
deps = [
"//src/v/base",
"@abseil-cpp//absl/container:flat_hash_map",
"//src/v/container:chunked_hash_map",
"@seastar",
],
)
Expand Down
86 changes: 33 additions & 53 deletions src/v/ssx/async-clear.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,71 +12,51 @@
#pragma once

#include "base/seastarx.h"
#include "container/chunked_hash_map.h"

#include <seastar/core/future.hh>
#include <seastar/coroutine/maybe_yield.hh>

#include <absl/container/flat_hash_map.h>

namespace ssx {

/**
* Wrapping the async clear helper in a class is a workaround
* for LLVM bug https://github.com/llvm/llvm-project/issues/49689
* For sufficiently large containers where the element destructors
* do some work, it is problematic to spend a long time clearing
* the container without yielding to the scheduler.
*
* This function yields every so often while erasing all elements
* in a container.
*
* Once we are on a version with the fix for #46989 backported,
* this can be reduced to just a function.
* The type is specific to absl::flat_hash_map to avoid accidentially
* using this function on types where repeatedly erasing from the start is
* very expensive, like std::vector.
*/
template<typename K, typename V, typename Hash, typename Eq, typename Alloc>
class async_clear {
public:
using map_type = absl::flat_hash_map<K, V, Hash, Eq, Alloc>;
explicit async_clear(map_type& c)
: _container(c) {}

/**
* For sufficiently large containers where the element destructors
* do some work, it is problematic to spend a long time clearing
* the container without yielding to the scheduler.
*
* This function yields every so often while erasing all elements
* in a container.
*
* The type is specific to absl::flat_hash_map to avoid accidentially
* using this function on types where repeatedly erasing from the start is
* very expensive, like std::vector.
*/
ss::future<> operator()() {
// Below threshold_size, just call clear().
// Otherwise yield to the scheduler every `threshold_size` elements
constexpr size_t threshold_size = 100;

if (_container.size() < threshold_size) {
_container.clear();
co_return;
}
template<typename Key, typename Value, typename Hash, typename EqualTo>
ss::future<>
async_clear(chunked_hash_map<Key, Value, Hash, EqualTo>& container) {
// Below threshold_size, just call clear().
// Otherwise yield to the scheduler every `threshold_size` elements
constexpr size_t threshold_size = 100;

if (container.size() < threshold_size) {
container.clear();
co_return;
}

size_t i = 0;
auto it = _container.begin();
while (it != _container.end()) {
// Copy the iterator as erase invalidates it.
auto current = it++;
_container.erase(current);
size_t i = 0;
auto it = container.begin();
while (it != container.end()) {
it = container.erase(it);

if (++i % threshold_size == 0) {
co_await ss::coroutine::maybe_yield();
// incase the iterator got invaliated between scheduling
// points.
it = _container.begin();
}
if (++i % threshold_size == 0) {
co_await ss::coroutine::maybe_yield();
// incase the iterator got invaliated between scheduling
// points.
it = container.begin();
}
vassert(
_container.empty(),
"Container is non empty, size: {}",
_container.size());
}

map_type& _container;
};
vassert(
container.empty(), "Container is non empty, size: {}", container.size());
}

} // namespace ssx
10 changes: 5 additions & 5 deletions src/v/storage/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ ss::future<> log_manager::stop() {
return clean_close(entry.second->handle);
});
co_await _batch_cache.stop();
co_await ssx::async_clear(_logs)();
co_await ssx::async_clear(_logs);
if (_compaction_hash_key_map) {
// Clear memory used for the compaction hash map, if any.
co_await _compaction_hash_key_map->initialize(0);
Expand Down Expand Up @@ -594,10 +594,10 @@ ss::future<> log_manager::shutdown(model::ntp ntp) {
vlog(stlog.debug, "Asked to shutdown: {}", ntp);
auto gate = _gate.hold();
auto handle = _logs.extract(ntp);
if (handle.empty()) {
if (!handle) {
co_return;
}
co_await clean_close(handle.mapped()->handle);
co_await clean_close(handle.value().second->handle);
vlog(stlog.debug, "Shutdown: {}", ntp);
}

Expand All @@ -606,11 +606,11 @@ ss::future<> log_manager::remove(model::ntp ntp) {
auto g = _gate.hold();
auto handle = _logs.extract(ntp);
update_log_count();
if (handle.empty()) {
if (!handle) {
co_return;
}
// 'ss::shared_ptr<>' make a copy
auto lg = handle.mapped()->handle;
auto lg = handle.value().second->handle;
vlog(stlog.info, "Removing: {}", lg);
// NOTE: it is ok to *not* externally synchronize the log here
// because remove, takes a write lock on each individual segments
Expand Down
3 changes: 2 additions & 1 deletion src/v/storage/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "base/seastarx.h"
#include "base/units.h"
#include "config/property.h"
#include "container/chunked_hash_map.h"
#include "container/intrusive_list_helpers.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -255,7 +256,7 @@ class log_manager {

private:
using logs_type
= absl::flat_hash_map<model::ntp, std::unique_ptr<log_housekeeping_meta>>;
= chunked_hash_map<model::ntp, std::unique_ptr<log_housekeeping_meta>>;
using compaction_list_type
= intrusive_list<log_housekeeping_meta, &log_housekeeping_meta::link>;

Expand Down
2 changes: 1 addition & 1 deletion src/v/transform/rpc/tests/transform_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ class fake_partition_manager : public partition_manager {
fake_offset_tracker* _offset_tracker;
int _errors_to_inject = 0;
ss::chunked_fifo<produced_batch> _produced_batches;
model::ntp_flat_map_type<ss::shard_id> _shard_locations;
model::ntp_map_type<ss::shard_id> _shard_locations;
};

constexpr uint16_t test_server_port = 8080;
Expand Down

0 comments on commit 9a9a4ae

Please sign in to comment.