diff --git a/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc b/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc index 291ef19ca1b1c..cdb7ac135e656 100644 --- a/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc +++ b/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc @@ -29,6 +29,7 @@ #include "model/fundamental.h" #include "model/metadata.h" #include "model/namespace.h" +#include "partition_manager.h" #include "redpanda/application.h" #include "redpanda/tests/fixture.h" #include "security/scram_credential.h" @@ -167,7 +168,9 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { cluster::tx_executor{}.run_random_workload( spec, remote_p->raft()->term(), remote_p->rm_stm(), remote_p->log()); - auto partitions = app.partition_manager.local().partitions(); + cluster::partition_manager::ntp_table_container partitions( + app.partition_manager.local().partitions().begin(), + app.partition_manager.local().partitions().end()); for (const auto& [ntp, p] : partitions) { if (ntp == model::controller_ntp) { continue; @@ -264,7 +267,9 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { auto topic_count = app.controller->get_topics_state().local().all_topics_count(); ASSERT_LE(2, topic_count); - auto partitions = app.partition_manager.local().partitions(); + cluster::partition_manager::ntp_table_container partitions( + app.partition_manager.local().partitions().begin(), + app.partition_manager.local().partitions().end()); for (const auto& [ntp, p] : partitions) { if (!model::is_user_topic(ntp)) { continue; diff --git a/src/v/cluster/partition_manager.cc b/src/v/cluster/partition_manager.cc index 4179ffa27cb6e..5183f8670bc20 100644 --- a/src/v/cluster/partition_manager.cc +++ b/src/v/cluster/partition_manager.cc @@ -348,14 +348,14 @@ ss::future<> partition_manager::stop_partitions() { // prevent partitions from being accessed auto partitions = std::exchange(_ntp_table, {}); - co_await ssx::async_clear(_raft_table)(); + co_await ssx::async_clear(_raft_table); // shutdown all partitions co_await ss::max_concurrent_for_each(partitions, 1024, [this](auto& e) { return do_shutdown(e.second).discard_result(); }); - co_await ssx::async_clear(partitions)(); + co_await ssx::async_clear(partitions); } ss::future diff --git a/src/v/cluster/partition_manager.h b/src/v/cluster/partition_manager.h index 0aca1e070f448..005e8d2569c34 100644 --- a/src/v/cluster/partition_manager.h +++ b/src/v/cluster/partition_manager.h @@ -20,6 +20,7 @@ #include "cluster/state_machine_registry.h" #include "cluster/types.h" #include "config/property.h" +#include "container/chunked_hash_map.h" #include "container/intrusive_list_helpers.h" #include "features/feature_table.h" #include "model/fundamental.h" @@ -37,7 +38,7 @@ class partition_manager : public ss::peering_sharded_service { public: using ntp_table_container - = model::ntp_flat_map_type>; + = model::ntp_map_type>; partition_manager( ss::sharded&, @@ -277,8 +278,7 @@ class partition_manager ntp_callbacks _unmanage_watchers; // XXX use intrusive containers here ntp_table_container _ntp_table; - absl::flat_hash_map> - _raft_table; + chunked_hash_map> _raft_table; ss::sharded& _partition_recovery_mgr; diff --git a/src/v/model/BUILD b/src/v/model/BUILD index 5a018a578b1a8..888525447d326 100644 --- a/src/v/model/BUILD +++ b/src/v/model/BUILD @@ -52,6 +52,7 @@ redpanda_cc_library( "//src/v/bytes", "//src/v/bytes:iobuf", "//src/v/bytes:iobuf_parser", + "//src/v/container:chunked_hash_map", "//src/v/container:fragmented_vector", "//src/v/hashing:crc32c", "//src/v/reflection:adl", diff --git a/src/v/model/ktp.h b/src/v/model/ktp.h index 0472e07287a52..eff551c70d196 100644 --- a/src/v/model/ktp.h +++ b/src/v/model/ktp.h @@ -11,6 +11,7 @@ #pragma once +#include "container/chunked_hash_map.h" #include "model/fundamental.h" #include "model/metadata.h" #include "model/namespace.h" @@ -338,22 +339,12 @@ struct ktp_hash_eq { }; /** - * @brief Helper alias to declare a flat map from ntp to V. + * @brief Helper alias to declare a map from ntp to V. * * Uses transparent comparator to allow any ntp object to be used for lookup. */ template -using ntp_flat_map_type - = absl::flat_hash_map; - -/** - * @brief Helper alias to declare a flat map from ntp to V. - * - * Uses transparent comparator to allow any ntp object to be used for lookup. - */ -template -using ntp_node_map_type - = absl::node_hash_map; +using ntp_map_type = chunked_hash_map; } // namespace model diff --git a/src/v/ssx/BUILD b/src/v/ssx/BUILD index 9fc9a6dce63c1..1e15dd633d656 100644 --- a/src/v/ssx/BUILD +++ b/src/v/ssx/BUILD @@ -87,7 +87,7 @@ redpanda_cc_library( visibility = ["//visibility:public"], deps = [ "//src/v/base", - "@abseil-cpp//absl/container:flat_hash_map", + "//src/v/container:chunked_hash_map", "@seastar", ], ) diff --git a/src/v/ssx/async-clear.h b/src/v/ssx/async-clear.h index 9e37cfc345199..8339ebf861fcf 100644 --- a/src/v/ssx/async-clear.h +++ b/src/v/ssx/async-clear.h @@ -12,71 +12,51 @@ #pragma once #include "base/seastarx.h" +#include "container/chunked_hash_map.h" #include #include -#include - namespace ssx { /** - * Wrapping the async clear helper in a class is a workaround - * for LLVM bug https://github.com/llvm/llvm-project/issues/49689 + * For sufficiently large containers where the element destructors + * do some work, it is problematic to spend a long time clearing + * the container without yielding to the scheduler. + * + * This function yields every so often while erasing all elements + * in a container. * - * Once we are on a version with the fix for #46989 backported, - * this can be reduced to just a function. + * The type is specific to absl::flat_hash_map to avoid accidentially + * using this function on types where repeatedly erasing from the start is + * very expensive, like std::vector. */ -template -class async_clear { -public: - using map_type = absl::flat_hash_map; - explicit async_clear(map_type& c) - : _container(c) {} - - /** - * For sufficiently large containers where the element destructors - * do some work, it is problematic to spend a long time clearing - * the container without yielding to the scheduler. - * - * This function yields every so often while erasing all elements - * in a container. - * - * The type is specific to absl::flat_hash_map to avoid accidentially - * using this function on types where repeatedly erasing from the start is - * very expensive, like std::vector. - */ - ss::future<> operator()() { - // Below threshold_size, just call clear(). - // Otherwise yield to the scheduler every `threshold_size` elements - constexpr size_t threshold_size = 100; - - if (_container.size() < threshold_size) { - _container.clear(); - co_return; - } +template +ss::future<> +async_clear(chunked_hash_map& container) { + // Below threshold_size, just call clear(). + // Otherwise yield to the scheduler every `threshold_size` elements + constexpr size_t threshold_size = 100; + + if (container.size() < threshold_size) { + container.clear(); + co_return; + } - size_t i = 0; - auto it = _container.begin(); - while (it != _container.end()) { - // Copy the iterator as erase invalidates it. - auto current = it++; - _container.erase(current); + size_t i = 0; + auto it = container.begin(); + while (it != container.end()) { + it = container.erase(it); - if (++i % threshold_size == 0) { - co_await ss::coroutine::maybe_yield(); - // incase the iterator got invaliated between scheduling - // points. - it = _container.begin(); - } + if (++i % threshold_size == 0) { + co_await ss::coroutine::maybe_yield(); + // incase the iterator got invaliated between scheduling + // points. + it = container.begin(); } - vassert( - _container.empty(), - "Container is non empty, size: {}", - _container.size()); } - - map_type& _container; -}; + vassert( + container.empty(), "Container is non empty, size: {}", container.size()); +} } // namespace ssx diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index 72a7d82e02490..df7fa88883007 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -194,7 +194,7 @@ ss::future<> log_manager::stop() { return clean_close(entry.second->handle); }); co_await _batch_cache.stop(); - co_await ssx::async_clear(_logs)(); + co_await ssx::async_clear(_logs); if (_compaction_hash_key_map) { // Clear memory used for the compaction hash map, if any. co_await _compaction_hash_key_map->initialize(0); @@ -594,10 +594,10 @@ ss::future<> log_manager::shutdown(model::ntp ntp) { vlog(stlog.debug, "Asked to shutdown: {}", ntp); auto gate = _gate.hold(); auto handle = _logs.extract(ntp); - if (handle.empty()) { + if (!handle) { co_return; } - co_await clean_close(handle.mapped()->handle); + co_await clean_close(handle.value().second->handle); vlog(stlog.debug, "Shutdown: {}", ntp); } @@ -606,11 +606,11 @@ ss::future<> log_manager::remove(model::ntp ntp) { auto g = _gate.hold(); auto handle = _logs.extract(ntp); update_log_count(); - if (handle.empty()) { + if (!handle) { co_return; } // 'ss::shared_ptr<>' make a copy - auto lg = handle.mapped()->handle; + auto lg = handle.value().second->handle; vlog(stlog.info, "Removing: {}", lg); // NOTE: it is ok to *not* externally synchronize the log here // because remove, takes a write lock on each individual segments diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index 2aef5c7d661dd..dd095e8a6c78e 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -14,6 +14,7 @@ #include "base/seastarx.h" #include "base/units.h" #include "config/property.h" +#include "container/chunked_hash_map.h" #include "container/intrusive_list_helpers.h" #include "features/feature_table.h" #include "model/fundamental.h" @@ -255,7 +256,7 @@ class log_manager { private: using logs_type - = absl::flat_hash_map>; + = chunked_hash_map>; using compaction_list_type = intrusive_list; diff --git a/src/v/transform/rpc/tests/transform_rpc_test.cc b/src/v/transform/rpc/tests/transform_rpc_test.cc index 7386ed679cf66..0debdc16ec42c 100644 --- a/src/v/transform/rpc/tests/transform_rpc_test.cc +++ b/src/v/transform/rpc/tests/transform_rpc_test.cc @@ -667,7 +667,7 @@ class fake_partition_manager : public partition_manager { fake_offset_tracker* _offset_tracker; int _errors_to_inject = 0; ss::chunked_fifo _produced_batches; - model::ntp_flat_map_type _shard_locations; + model::ntp_map_type _shard_locations; }; constexpr uint16_t test_server_port = 8080;