From d675df77fe47bb94f6ed6cff13a4592060aab5c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 11 Nov 2024 21:39:23 +0100 Subject: [PATCH 1/4] Rate limiter size accessor --- nano/lib/rate_limiting.cpp | 21 ++++++++++++++++----- nano/lib/rate_limiting.hpp | 9 ++++++--- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/nano/lib/rate_limiting.cpp b/nano/lib/rate_limiting.cpp index 2a4715c9ab..ced8828cd1 100644 --- a/nano/lib/rate_limiting.cpp +++ b/nano/lib/rate_limiting.cpp @@ -45,11 +45,6 @@ void nano::rate::token_bucket::refill () } } -std::size_t nano::rate::token_bucket::largest_burst () const -{ - return max_token_count - smallest_size; -} - void nano::rate::token_bucket::reset (std::size_t max_token_count_a, std::size_t refill_rate_a) { // A token count of 0 indicates unlimited capacity. We use 1e9 as @@ -63,6 +58,16 @@ void nano::rate::token_bucket::reset (std::size_t max_token_count_a, std::size_t last_refill = std::chrono::steady_clock::now (); } +std::size_t nano::rate::token_bucket::largest_burst () const +{ + return max_token_count - smallest_size; +} + +std::size_t nano::rate::token_bucket::size () const +{ + return current_size; +} + /* * rate_limiter */ @@ -82,4 +87,10 @@ void nano::rate_limiter::reset (std::size_t limit_a, double burst_ratio_a) { nano::lock_guard guard{ mutex }; bucket.reset (static_cast (limit_a * burst_ratio_a), limit_a); +} + +std::size_t nano::rate_limiter::size () const +{ + nano::lock_guard guard{ mutex }; + return bucket.size (); } \ No newline at end of file diff --git a/nano/lib/rate_limiting.hpp b/nano/lib/rate_limiting.hpp index cf377cb68f..3ba0f3b771 100644 --- a/nano/lib/rate_limiting.hpp +++ b/nano/lib/rate_limiting.hpp @@ -38,12 +38,13 @@ class token_bucket */ bool try_consume (unsigned tokens_required = 1); - /** Returns the largest burst observed */ - std::size_t largest_burst () const; - /** Update the max_token_count and/or refill_rate_a parameters */ void reset (std::size_t max_token_count, std::size_t refill_rate); + /** Returns the largest burst observed */ + std::size_t largest_burst () const; + std::size_t size () const; + private: void refill (); @@ -71,6 +72,8 @@ class rate_limiter final bool should_pass (std::size_t buffer_size); void reset (std::size_t limit, double burst_ratio = 1.0); + std::size_t size () const; + private: nano::rate::token_bucket bucket; mutable nano::mutex mutex; From bd36f3fbb2e51f0acaeffbefd0b8757ad051c082 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Wed, 23 Oct 2024 20:17:32 +0200 Subject: [PATCH 2/4] Move rate tests to a separate file # Conflicts: # nano/core_test/CMakeLists.txt --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/rate_limiting.cpp | 113 +++++++++++++++++++++++++++++++ nano/core_test/utility.cpp | 104 ---------------------------- 3 files changed, 114 insertions(+), 104 deletions(-) create mode 100644 nano/core_test/rate_limiting.cpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index ade467ca6c..96f75f2bb0 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -42,6 +42,7 @@ add_executable( processor_service.cpp random.cpp random_pool.cpp + rate_limiting.cpp rep_crawler.cpp receivable.cpp peer_history.cpp diff --git a/nano/core_test/rate_limiting.cpp b/nano/core_test/rate_limiting.cpp new file mode 100644 index 0000000000..0d547e41c0 --- /dev/null +++ b/nano/core_test/rate_limiting.cpp @@ -0,0 +1,113 @@ +#include +#include + +#include + +#include +#include + +using namespace std::chrono_literals; + +TEST (rate, basic) +{ + nano::rate::token_bucket bucket (10, 10); + + // Initial burst + ASSERT_TRUE (bucket.try_consume (10)); + ASSERT_FALSE (bucket.try_consume (10)); + + // With a fill rate of 10 tokens/sec, await 1/3 sec and get 3 tokens + std::this_thread::sleep_for (300ms); + ASSERT_TRUE (bucket.try_consume (3)); + ASSERT_FALSE (bucket.try_consume (10)); + + // Allow time for the bucket to completely refill and do a full burst + std::this_thread::sleep_for (1s); + ASSERT_TRUE (bucket.try_consume (10)); + ASSERT_EQ (bucket.largest_burst (), 10); +} + +TEST (rate, network) +{ + // For the purpose of the test, one token represents 1MB instead of one byte. + // Allow for 10 mb/s bursts (max bucket size), 5 mb/s long term rate + nano::rate::token_bucket bucket (10, 5); + + // Initial burst of 10 mb/s over two calls + ASSERT_TRUE (bucket.try_consume (5)); + ASSERT_EQ (bucket.largest_burst (), 5); + ASSERT_TRUE (bucket.try_consume (5)); + ASSERT_EQ (bucket.largest_burst (), 10); + ASSERT_FALSE (bucket.try_consume (5)); + + // After 200 ms, the 5 mb/s fillrate means we have 1 mb available + std::this_thread::sleep_for (200ms); + ASSERT_TRUE (bucket.try_consume (1)); + ASSERT_FALSE (bucket.try_consume (1)); +} + +TEST (rate, reset) +{ + nano::rate::token_bucket bucket (0, 0); + + // consume lots of tokens, buckets should be unlimited + ASSERT_TRUE (bucket.try_consume (1000000)); + ASSERT_TRUE (bucket.try_consume (1000000)); + + // set bucket to be limited + bucket.reset (1000, 1000); + ASSERT_FALSE (bucket.try_consume (1001)); + ASSERT_TRUE (bucket.try_consume (1000)); + ASSERT_FALSE (bucket.try_consume (1000)); + std::this_thread::sleep_for (2ms); + ASSERT_TRUE (bucket.try_consume (2)); + + // reduce the limit + bucket.reset (100, 100 * 1000); + ASSERT_FALSE (bucket.try_consume (101)); + ASSERT_TRUE (bucket.try_consume (100)); + std::this_thread::sleep_for (1ms); + ASSERT_TRUE (bucket.try_consume (100)); + + // increase the limit + bucket.reset (2000, 1); + ASSERT_FALSE (bucket.try_consume (2001)); + ASSERT_TRUE (bucket.try_consume (2000)); + + // back to unlimited + bucket.reset (0, 0); + ASSERT_TRUE (bucket.try_consume (1000000)); + ASSERT_TRUE (bucket.try_consume (1000000)); +} + +TEST (rate, unlimited) +{ + nano::rate::token_bucket bucket (0, 0); + ASSERT_TRUE (bucket.try_consume (5)); + ASSERT_EQ (bucket.largest_burst (), 5); + ASSERT_TRUE (bucket.try_consume (static_cast (1e9))); + ASSERT_EQ (bucket.largest_burst (), static_cast (1e9)); + + // With unlimited tokens, consuming always succeed + ASSERT_TRUE (bucket.try_consume (static_cast (1e9))); + ASSERT_EQ (bucket.largest_burst (), static_cast (1e9)); +} + +TEST (rate, busy_spin) +{ + // Bucket should refill at a rate of 1 token per second + nano::rate::token_bucket bucket (1, 1); + + // Run a very tight loop for 5 seconds + a bit of wiggle room + int counter = 0; + for (auto start = std::chrono::steady_clock::now (), now = start; now < start + 5500ms; now = std::chrono::steady_clock::now ()) + { + if (bucket.try_consume ()) + { + ++counter; + } + } + + // Bucket starts fully refilled, therefore we see 1 additional request + ASSERT_EQ (counter, 6); +} \ No newline at end of file diff --git a/nano/core_test/utility.cpp b/nano/core_test/utility.cpp index 30ee29544e..e7049a7d9d 100644 --- a/nano/core_test/utility.cpp +++ b/nano/core_test/utility.cpp @@ -15,110 +15,6 @@ using namespace std::chrono_literals; -TEST (rate, basic) -{ - nano::rate::token_bucket bucket (10, 10); - - // Initial burst - ASSERT_TRUE (bucket.try_consume (10)); - ASSERT_FALSE (bucket.try_consume (10)); - - // With a fill rate of 10 tokens/sec, await 1/3 sec and get 3 tokens - std::this_thread::sleep_for (300ms); - ASSERT_TRUE (bucket.try_consume (3)); - ASSERT_FALSE (bucket.try_consume (10)); - - // Allow time for the bucket to completely refill and do a full burst - std::this_thread::sleep_for (1s); - ASSERT_TRUE (bucket.try_consume (10)); - ASSERT_EQ (bucket.largest_burst (), 10); -} - -TEST (rate, network) -{ - // For the purpose of the test, one token represents 1MB instead of one byte. - // Allow for 10 mb/s bursts (max bucket size), 5 mb/s long term rate - nano::rate::token_bucket bucket (10, 5); - - // Initial burst of 10 mb/s over two calls - ASSERT_TRUE (bucket.try_consume (5)); - ASSERT_EQ (bucket.largest_burst (), 5); - ASSERT_TRUE (bucket.try_consume (5)); - ASSERT_EQ (bucket.largest_burst (), 10); - ASSERT_FALSE (bucket.try_consume (5)); - - // After 200 ms, the 5 mb/s fillrate means we have 1 mb available - std::this_thread::sleep_for (200ms); - ASSERT_TRUE (bucket.try_consume (1)); - ASSERT_FALSE (bucket.try_consume (1)); -} - -TEST (rate, reset) -{ - nano::rate::token_bucket bucket (0, 0); - - // consume lots of tokens, buckets should be unlimited - ASSERT_TRUE (bucket.try_consume (1000000)); - ASSERT_TRUE (bucket.try_consume (1000000)); - - // set bucket to be limited - bucket.reset (1000, 1000); - ASSERT_FALSE (bucket.try_consume (1001)); - ASSERT_TRUE (bucket.try_consume (1000)); - ASSERT_FALSE (bucket.try_consume (1000)); - std::this_thread::sleep_for (2ms); - ASSERT_TRUE (bucket.try_consume (2)); - - // reduce the limit - bucket.reset (100, 100 * 1000); - ASSERT_FALSE (bucket.try_consume (101)); - ASSERT_TRUE (bucket.try_consume (100)); - std::this_thread::sleep_for (1ms); - ASSERT_TRUE (bucket.try_consume (100)); - - // increase the limit - bucket.reset (2000, 1); - ASSERT_FALSE (bucket.try_consume (2001)); - ASSERT_TRUE (bucket.try_consume (2000)); - - // back to unlimited - bucket.reset (0, 0); - ASSERT_TRUE (bucket.try_consume (1000000)); - ASSERT_TRUE (bucket.try_consume (1000000)); -} - -TEST (rate, unlimited) -{ - nano::rate::token_bucket bucket (0, 0); - ASSERT_TRUE (bucket.try_consume (5)); - ASSERT_EQ (bucket.largest_burst (), 5); - ASSERT_TRUE (bucket.try_consume (static_cast (1e9))); - ASSERT_EQ (bucket.largest_burst (), static_cast (1e9)); - - // With unlimited tokens, consuming always succeed - ASSERT_TRUE (bucket.try_consume (static_cast (1e9))); - ASSERT_EQ (bucket.largest_burst (), static_cast (1e9)); -} - -TEST (rate, busy_spin) -{ - // Bucket should refill at a rate of 1 token per second - nano::rate::token_bucket bucket (1, 1); - - // Run a very tight loop for 5 seconds + a bit of wiggle room - int counter = 0; - for (auto start = std::chrono::steady_clock::now (), now = start; now < start + std::chrono::milliseconds{ 5500 }; now = std::chrono::steady_clock::now ()) - { - if (bucket.try_consume ()) - { - ++counter; - } - } - - // Bucket starts fully refilled, therefore we see 1 additional request - ASSERT_EQ (counter, 6); -} - TEST (optional_ptr, basic) { struct valtype From e902b698848eb5317ad413f27279b2c5a0542610 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 11 Nov 2024 19:40:17 +0100 Subject: [PATCH 3/4] Bandwidth limiter container info --- nano/node/bandwidth_limiter.cpp | 8 ++++++++ nano/node/bandwidth_limiter.hpp | 2 ++ nano/node/node.cpp | 1 + 3 files changed, 11 insertions(+) diff --git a/nano/node/bandwidth_limiter.cpp b/nano/node/bandwidth_limiter.cpp index ade752051e..87e804ddc7 100644 --- a/nano/node/bandwidth_limiter.cpp +++ b/nano/node/bandwidth_limiter.cpp @@ -41,6 +41,14 @@ void nano::bandwidth_limiter::reset (std::size_t limit, double burst_ratio, nano limiter.reset (limit, burst_ratio); } +nano::container_info nano::bandwidth_limiter::container_info () const +{ + nano::container_info info; + info.put ("generic", limiter_generic.size ()); + info.put ("bootstrap", limiter_bootstrap.size ()); + return info; +} + /* * bandwidth_limiter_config */ diff --git a/nano/node/bandwidth_limiter.hpp b/nano/node/bandwidth_limiter.hpp index 7afafee75c..0cd774e1aa 100644 --- a/nano/node/bandwidth_limiter.hpp +++ b/nano/node/bandwidth_limiter.hpp @@ -37,6 +37,8 @@ class bandwidth_limiter final */ void reset (std::size_t limit, double burst_ratio, nano::transport::traffic_type type = nano::transport::traffic_type::generic); + nano::container_info container_info () const; + private: /** * Returns reference to limiter corresponding to the limit type diff --git a/nano/node/node.cpp b/nano/node/node.cpp index af513eae49..d27d2188b4 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1217,6 +1217,7 @@ nano::container_info nano::node::container_info () const info.add ("local_block_broadcaster", local_block_broadcaster.container_info ()); info.add ("rep_tiers", rep_tiers.container_info ()); info.add ("message_processor", message_processor.container_info ()); + info.add ("bandwidth", outbound_limiter.container_info ()); return info; } From 08116617a4905ea998726b2ba77a3161951dfce3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 11 Nov 2024 22:02:07 +0100 Subject: [PATCH 4/4] Track tcp traffic type --- nano/lib/stats_enums.hpp | 8 ++++++-- nano/node/CMakeLists.txt | 2 ++ nano/node/transport/tcp_socket.cpp | 20 +++++++++++--------- nano/node/transport/tcp_socket.hpp | 4 +++- nano/node/transport/traffic_type.cpp | 7 +++++++ nano/node/transport/traffic_type.hpp | 4 ++++ 6 files changed, 33 insertions(+), 12 deletions(-) create mode 100644 nano/node/transport/traffic_type.cpp diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index a3190747aa..57055653c6 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -13,14 +13,12 @@ enum class type _invalid = 0, // Default value, should not be used test, - traffic_tcp, error, message, block, ledger, rollback, network, - tcp_server, vote, vote_processor, vote_processor_tier, @@ -31,11 +29,14 @@ enum class type http_callback, ipc, tcp, + tcp_server, tcp_channels, tcp_channels_rejected, tcp_channels_purge, tcp_listener, tcp_listener_rejected, + traffic_tcp, + traffic_tcp_type, channel, socket, confirmation_height, @@ -294,6 +295,9 @@ enum class detail reachout_live, reachout_cached, + // traffic + generic, + // tcp tcp_write_drop, tcp_write_no_socket_drop, diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index c4001a14a4..f50649b047 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -166,6 +166,8 @@ add_library( transport/tcp_server.cpp transport/tcp_socket.hpp transport/tcp_socket.cpp + transport/traffic_type.hpp + transport/traffic_type.cpp transport/transport.hpp transport/transport.cpp unchecked_map.cpp diff --git a/nano/node/transport/tcp_socket.cpp b/nano/node/transport/tcp_socket.cpp index 3d5ff9c512..dbaf59c46f 100644 --- a/nano/node/transport/tcp_socket.cpp +++ b/nano/node/transport/tcp_socket.cpp @@ -186,17 +186,18 @@ void nano::transport::tcp_socket::write_queued_messages () return; } - auto next = send_queue.pop (); - if (!next) + auto maybe_next = send_queue.pop (); + if (!maybe_next) { return; } + auto const & [next, type] = *maybe_next; set_default_timeout (); write_in_progress = true; - nano::async_write (raw_socket, next->buffer, - boost::asio::bind_executor (strand, [this_l = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) { + nano::async_write (raw_socket, next.buffer, + boost::asio::bind_executor (strand, [this_l = shared_from_this (), next /* `next` object keeps buffer in scope */, type] (boost::system::error_code ec, std::size_t size) { debug_assert (this_l->strand.running_in_this_thread ()); auto node_l = this_l->node_w.lock (); @@ -214,12 +215,13 @@ void nano::transport::tcp_socket::write_queued_messages () else { node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::out, size, /* aggregate all */ true); + node_l->stats.add (nano::stat::type::traffic_tcp_type, to_stat_detail (type), nano::stat::dir::out, size); this_l->set_last_completion (); } - if (next->callback) + if (next.callback) { - next->callback (ec, size); + next.callback (ec, size); } if (!ec) @@ -436,17 +438,17 @@ bool nano::transport::socket_queue::insert (const buffer_t & buffer, callback_t return false; // Not queued } -std::optional nano::transport::socket_queue::pop () +auto nano::transport::socket_queue::pop () -> std::optional { nano::lock_guard guard{ mutex }; - auto try_pop = [this] (nano::transport::traffic_type type) -> std::optional { + auto try_pop = [this] (nano::transport::traffic_type type) -> std::optional { auto & que = queues[type]; if (!que.empty ()) { auto item = que.front (); que.pop (); - return item; + return std::make_pair (item, type); } return std::nullopt; }; diff --git a/nano/node/transport/tcp_socket.hpp b/nano/node/transport/tcp_socket.hpp index 8180fb6941..c09448533b 100644 --- a/nano/node/transport/tcp_socket.hpp +++ b/nano/node/transport/tcp_socket.hpp @@ -42,10 +42,12 @@ class socket_queue final }; public: + using result_t = std::pair; + explicit socket_queue (std::size_t max_size); bool insert (buffer_t const &, callback_t, nano::transport::traffic_type); - std::optional pop (); + std::optional pop (); void clear (); std::size_t size (nano::transport::traffic_type) const; bool empty () const; diff --git a/nano/node/transport/traffic_type.cpp b/nano/node/transport/traffic_type.cpp new file mode 100644 index 0000000000..bfb12a6573 --- /dev/null +++ b/nano/node/transport/traffic_type.cpp @@ -0,0 +1,7 @@ +#include +#include + +nano::stat::detail nano::transport::to_stat_detail (nano::transport::traffic_type type) +{ + return nano::enum_util::cast (type); +} \ No newline at end of file diff --git a/nano/node/transport/traffic_type.hpp b/nano/node/transport/traffic_type.hpp index 308a9768eb..1f0914cb06 100644 --- a/nano/node/transport/traffic_type.hpp +++ b/nano/node/transport/traffic_type.hpp @@ -1,5 +1,7 @@ #pragma once +#include + namespace nano::transport { /** @@ -10,4 +12,6 @@ enum class traffic_type generic, bootstrap, // Ascending bootstrap (asc_pull_ack, asc_pull_req) traffic }; + +nano::stat::detail to_stat_detail (traffic_type); } \ No newline at end of file