From a742ab8f99273fab44c12a0b9428c468b00f4dbc Mon Sep 17 00:00:00 2001 From: Russel Waters Date: Tue, 11 Jun 2019 14:32:27 -0400 Subject: [PATCH] Bandwidth limiting (#2035) * bandwidth_limit in bytes added to config update v17 upgrade path update v17 config tests * bandwidth limiter class test to validate limiter * hook to nano::transport::channel update nano::transport::channel::send to send if message shouldnt be dropped otherwise increment drop stat and dont send * initialize array with brace-enclosed initializer * correctly 1.5Mb * update tests to account for correct math and full confirm_ack confirm_req blocks use 1.5 * 1024 * 1024 for bytes to Mb limit * update config test * formatting * update default value in error text * add logging message and convenience detail_raw_to_string for dropped data type and size include publish as it is vote traffic * should always log dropped messages, this could be improved with a logging config option * stuff under logging;packet logging * typo and log out limit on node start * remove duplicate convert to key before passing to now public detail_to_string * Readability and simplification * use static constexpr in function * unsigned int < 0 always false * add republish_vote to could_drop * trend rate over minimum of 1 sec simplify limiter add bool not_dropable default false to nano::transport::channel::send allowing calls to channel::send to drop * update test to validate ramp down to 0 rate again * merge master * remove unused variables in tests const limit as not changed after construction reverse send logic, default true adjust to exclude messages such as keepalives and bootstrap traffic * clean up tests and add multiple limiters simplify bandwidth limiter * formatting * fix merge issues flip logic on is_dropable * allow dropping of keep alives * whitespace * use send instead of send_buffer a couple more places --- nano/core_test/network.cpp | 54 ++++++++++++++++++++++++++ nano/core_test/node.cpp | 6 +++ nano/lib/stats.cpp | 3 ++ nano/lib/stats.hpp | 7 +++- nano/node/bootstrap.cpp | 12 ++++-- nano/node/network.cpp | 14 +++---- nano/node/node.cpp | 2 + nano/node/nodeconfig.cpp | 8 +++- nano/node/nodeconfig.hpp | 1 + nano/node/transport/transport.cpp | 64 +++++++++++++++++++++++++++++-- nano/node/transport/transport.hpp | 24 +++++++++++- nano/node/vote_processor.cpp | 2 +- 12 files changed, 176 insertions(+), 21 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index e44c37bbee..e4be25bc04 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -2207,3 +2207,57 @@ TEST (network, replace_port) ASSERT_EQ (system.nodes[0]->network.udp_channels.size (), 0); node1->stop (); } + +TEST (bandwidth_limiter, validate) +{ + size_t const full_confirm_ack (488 + 8); + { + nano::bandwidth_limiter limiter_0 (0); + nano::bandwidth_limiter limiter_1 (1024); + nano::bandwidth_limiter limiter_256 (1024 * 256); + nano::bandwidth_limiter limiter_1024 (1024 * 1024); + nano::bandwidth_limiter limiter_1536 (1024 * 1536); + + auto now (std::chrono::steady_clock::now ()); + + while (now + 1s >= std::chrono::steady_clock::now ()) + { + ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop + ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size () + limiter_256.should_drop (full_confirm_ack); + limiter_1024.should_drop (full_confirm_ack); + limiter_1536.should_drop (full_confirm_ack); + std::this_thread::sleep_for (10ms); + } + ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop + ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size () + ASSERT_FALSE (limiter_256.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped + ASSERT_FALSE (limiter_1024.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped + ASSERT_FALSE (limiter_1536.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped + } + + { + nano::bandwidth_limiter limiter_0 (0); + nano::bandwidth_limiter limiter_1 (1024); + nano::bandwidth_limiter limiter_256 (1024 * 256); + nano::bandwidth_limiter limiter_1024 (1024 * 1024); + nano::bandwidth_limiter limiter_1536 (1024 * 1536); + + auto now (std::chrono::steady_clock::now ()); + //trend rate for 5 sec + while (now + 5s >= std::chrono::steady_clock::now ()) + { + ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop + ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size () + limiter_256.should_drop (full_confirm_ack); + limiter_1024.should_drop (full_confirm_ack); + limiter_1536.should_drop (full_confirm_ack); + std::this_thread::sleep_for (50ms); + } + ASSERT_EQ (limiter_0.get_rate (), 0); //should be 0 as rate is not gathered if not needed + ASSERT_EQ (limiter_1.get_rate (), 0); //should be 0 since nothing is small enough to pass through is tracked + ASSERT_EQ (limiter_256.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked + ASSERT_EQ (limiter_1024.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked + ASSERT_EQ (limiter_1536.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked + } +} diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index cb7a7c829c..359d15c5eb 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -714,6 +714,7 @@ TEST (node_config, v16_v17_upgrade) ASSERT_FALSE (tree.get_optional_child ("use_memory_pools")); ASSERT_FALSE (tree.get_optional_child ("confirmation_history_size")); ASSERT_FALSE (tree.get_optional_child ("active_elections_size")); + ASSERT_FALSE (tree.get_optional_child ("bandwidth_limit")); config.deserialize_json (upgraded, tree); // The config options should be added after the upgrade @@ -727,6 +728,7 @@ TEST (node_config, v16_v17_upgrade) ASSERT_TRUE (!!tree.get_optional_child ("use_memory_pools")); ASSERT_TRUE (!!tree.get_optional_child ("confirmation_history_size")); ASSERT_TRUE (!!tree.get_optional_child ("active_elections_size")); + ASSERT_TRUE (!!tree.get_optional_child ("bandwidth_limit")); ASSERT_TRUE (upgraded); auto version (tree.get ("version")); @@ -764,6 +766,7 @@ TEST (node_config, v17_values) tree.put ("use_memory_pools", true); tree.put ("confirmation_history_size", 2048); tree.put ("active_elections_size", 8000); + tree.put ("bandwidth_limit", 1572864); } config.deserialize_json (upgraded, tree); @@ -780,6 +783,7 @@ TEST (node_config, v17_values) ASSERT_TRUE (config.use_memory_pools); ASSERT_EQ (config.confirmation_history_size, 2048); ASSERT_EQ (config.active_elections_size, 8000); + ASSERT_EQ (config.bandwidth_limit, 1572864); // Check config is correct with other values tree.put ("tcp_io_timeout", std::numeric_limits::max () - 100); @@ -799,6 +803,7 @@ TEST (node_config, v17_values) tree.put ("use_memory_pools", false); tree.put ("confirmation_history_size", std::numeric_limits::max ()); tree.put ("active_elections_size", std::numeric_limits::max ()); + tree.put ("bandwidth_limit", std::numeric_limits::max ()); upgraded = false; config.deserialize_json (upgraded, tree); @@ -817,6 +822,7 @@ TEST (node_config, v17_values) ASSERT_FALSE (config.use_memory_pools); ASSERT_EQ (config.confirmation_history_size, std::numeric_limits::max ()); ASSERT_EQ (config.active_elections_size, std::numeric_limits::max ()); + ASSERT_EQ (config.bandwidth_limit, std::numeric_limits::max ()); } // Regression test to ensure that deserializing includes changes node via get_required_child diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index 45a07036fe..68b0c4d707 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -379,6 +379,9 @@ std::string nano::stat::type_to_string (uint32_t key) break; case nano::stat::type::confirmation_height: res = "confirmation_height"; + break; + case nano::stat::type::drop: + res = "drop"; } return res; } diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index c4c12df7cf..0860a7a27a 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -232,7 +232,8 @@ class stat final ipc, tcp, udp, - confirmation_height + confirmation_height, + drop }; /** Optional detail type */ @@ -457,12 +458,14 @@ class stat final /** Returns a new JSON log sink */ std::unique_ptr log_sink_json () const; + /** Returns string representation of detail */ + static std::string detail_to_string (uint32_t key); + /** Stop stats being output */ void stop (); private: static std::string type_to_string (uint32_t key); - static std::string detail_to_string (uint32_t key); static std::string dir_to_string (uint32_t key); /** Constructs a key given type, detail and direction. This is used as input to update(...) and get_entry(...) */ diff --git a/nano/node/bootstrap.cpp b/nano/node/bootstrap.cpp index 0f14ee0fb3..28231a3100 100644 --- a/nano/node/bootstrap.cpp +++ b/nano/node/bootstrap.cpp @@ -79,7 +79,8 @@ void nano::frontier_req_client::run () this_l->connection->node->logger.try_log (boost::str (boost::format ("Error while sending bootstrap request %1%") % ec.message ())); } } - }); + }, + false); // is bootstrap traffic is_dropable false } std::shared_ptr nano::bootstrap_client::shared () @@ -346,7 +347,8 @@ void nano::bulk_pull_client::request () } this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_request_failure, nano::stat::dir::in); } - }); + }, + false); // is bootstrap traffic is_dropable false } void nano::bulk_pull_client::receive_block () @@ -531,7 +533,8 @@ void nano::bulk_push_client::start () this_l->connection->node->logger.try_log (boost::str (boost::format ("Unable to send bulk_push request: %1%") % ec.message ())); } } - }); + }, + false); // is bootstrap traffic is_dropable false } void nano::bulk_push_client::push (nano::transaction const & transaction_a) @@ -668,7 +671,8 @@ void nano::bulk_pull_account_client::request () } this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_error_starting_request, nano::stat::dir::in); } - }); + }, + false); // is bootstrap traffic is_dropable false } void nano::bulk_pull_account_client::receive_pending () diff --git a/nano/node/network.cpp b/nano/node/network.cpp index fda573cf96..df3ae872f4 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -140,10 +140,9 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a result = true; auto vote (node_a.store.vote_generate (transaction_a, pub_a, prv_a, std::vector (1, hash))); nano::confirm_ack confirm (vote); - auto vote_bytes = confirm.to_bytes (); for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j) { - j->get ()->send_buffer (vote_bytes, nano::stat::detail::confirm_ack); + j->get ()->send (confirm); } node_a.votes_cache.add (vote); }); @@ -154,10 +153,9 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a for (auto & vote : votes) { nano::confirm_ack confirm (vote); - auto vote_bytes = confirm.to_bytes (); for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j) { - j->get ()->send_buffer (vote_bytes, nano::stat::detail::confirm_ack); + j->get ()->send (confirm); } } } @@ -165,10 +163,9 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a if (also_publish) { nano::publish publish (block_a); - auto publish_bytes (publish.to_bytes ()); for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j) { - j->get ()->send_buffer (publish_bytes, nano::stat::detail::publish); + j->get ()->send (publish); } } } @@ -194,7 +191,7 @@ void nano::network::confirm_hashes (nano::transaction const & transaction_a, std nano::vectorstream stream (*bytes); confirm.serialize (stream); } - channel_a->send_buffer (bytes, nano::stat::detail::confirm_ack); + channel_a->send (confirm); this->node.votes_cache.add (vote); }); } @@ -208,8 +205,7 @@ bool nano::network::send_votes_cache (std::shared_ptr for (auto & vote : votes) { nano::confirm_ack confirm (vote); - auto vote_bytes = confirm.to_bytes (); - channel_a->send_buffer (vote_bytes, nano::stat::detail::confirm_ack); + channel_a->send (confirm); } // Returns true if votes were sent bool result (!votes.empty ()); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index adac2bad18..fe51432643 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -436,6 +436,8 @@ startup_time (std::chrono::steady_clock::now ()) logger.always_log ("Constructing node"); } + logger.always_log (boost::str (boost::format ("Outbound Voting Bandwidth limited to %1% bytes per second") % config.bandwidth_limit)); + // First do a pass with a read to see if any writing needs doing, this saves needing to open a write lock (and potentially blocking) auto is_initialized (false); { diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index e1b817d646..4de9fbae68 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -132,6 +132,7 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const json.put_child ("diagnostics", diagnostics_l); json.put ("confirmation_history_size", confirmation_history_size); json.put ("active_elections_size", active_elections_size); + json.put ("bandwidth_limit", bandwidth_limit); return json.get_error (); } @@ -251,6 +252,7 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso json.put ("use_memory_pools", use_memory_pools); json.put ("confirmation_history_size", confirmation_history_size); json.put ("active_elections_size", active_elections_size); + json.put ("bandwidth_limit", bandwidth_limit); } case 17: break; @@ -399,8 +401,8 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco pow_sleep_interval = std::chrono::nanoseconds (pow_sleep_interval_l); json.get ("use_memory_pools", use_memory_pools); json.get ("confirmation_history_size", confirmation_history_size); - json.get ("active_elections_size", active_elections_size); + json.get ("bandwidth_limit", bandwidth_limit); nano::network_params network; // Validate ranges if (online_weight_quorum > 100) @@ -419,6 +421,10 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco { json.get_error ().set ("active_elections_size must be grater than 250"); } + if (bandwidth_limit > std::numeric_limits::max ()) + { + json.get_error ().set ("bandwidth_limit unbounded = 0, default = 1572864, max = 18446744073709551615"); + } } catch (std::runtime_error const & ex) { diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 0d6b70c2ce..d00d7e8c79 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -74,6 +74,7 @@ class node_config static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60); static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5; static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5); + size_t bandwidth_limit{ 1536 * 1024 }; static int json_version () { return 17; diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 38c6703e40..66d7c6bf87 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -2,6 +2,8 @@ #include #include +#include + namespace { class callback_visitor : public nano::message_visitor @@ -68,18 +70,31 @@ nano::tcp_endpoint nano::transport::map_endpoint_to_tcp (nano::endpoint const & } nano::transport::channel::channel (nano::node & node_a) : +limiter (node_a.config.bandwidth_limit), node (node_a) { } -void nano::transport::channel::send (nano::message const & message_a, std::function const & callback_a) +void nano::transport::channel::send (nano::message const & message_a, std::function const & callback_a, bool const & is_dropable) { callback_visitor visitor; message_a.visit (visitor); auto buffer (message_a.to_bytes ()); auto detail (visitor.result); - send_buffer (buffer, detail, callback_a); - node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out); + if (!is_dropable || !limiter.should_drop (buffer->size ())) + { + send_buffer (buffer, detail, callback_a); + node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out); + } + else + { + node.stats.inc (nano::stat::type::drop, detail, nano::stat::dir::out); + if (node.config.logging.network_packet_logging ()) + { + auto key = static_cast (detail) << 8; + node.logger.always_log (boost::str (boost::format ("%1% of size %2% dropped") % node.stats.detail_to_string (key) % buffer->size ())); + } + } } namespace @@ -188,3 +203,46 @@ bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool } return result; } + +using namespace std::chrono_literals; + +nano::bandwidth_limiter::bandwidth_limiter (const size_t limit_a) : +next_trend (std::chrono::steady_clock::now () + 50ms), +limit (limit_a), +rate (0), +trended_rate (0) +{ +} + +bool nano::bandwidth_limiter::should_drop (const size_t & message_size) +{ + bool result (false); + if (limit == 0) //never drop if limit is 0 + { + return result; + } + std::lock_guard lock (mutex); + + if (message_size > limit / rate_buffer.size () || rate + message_size > limit) + { + result = true; + } + else + { + rate = rate + message_size; + } + if (next_trend < std::chrono::steady_clock::now ()) + { + next_trend = std::chrono::steady_clock::now () + 50ms; + rate_buffer.push_back (rate); + trended_rate = std::accumulate (rate_buffer.begin (), rate_buffer.end (), 0) / rate_buffer.size (); + rate = 0; + } + return result; +} + +size_t nano::bandwidth_limiter::get_rate () +{ + std::lock_guard lock (mutex); + return trended_rate; +} diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 32fb395b15..539bce14ed 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -8,6 +8,27 @@ namespace nano { +class bandwidth_limiter final +{ +public: + // initialize with rate 0 = unbounded + bandwidth_limiter (const size_t); + bool should_drop (const size_t &); + size_t get_rate (); + +private: + //last time rate was adjusted + std::chrono::steady_clock::time_point next_trend; + //trend rate over 20 poll periods + boost::circular_buffer rate_buffer{ 20, 0 }; + //limit bandwidth to + const size_t limit; + //rate, increment if message_size + rate < rate + size_t rate; + //trended rate to even out spikes in traffic + size_t trended_rate; + std::mutex mutex; +}; namespace transport { class message; @@ -32,7 +53,7 @@ namespace transport virtual ~channel () = default; virtual size_t hash_code () const = 0; virtual bool operator== (nano::transport::channel const &) const = 0; - void send (nano::message const &, std::function const & = nullptr); + void send (nano::message const &, std::function const & = nullptr, bool const & = true); virtual void send_buffer (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) = 0; virtual std::function callback (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const = 0; virtual std::string to_string () const = 0; @@ -99,6 +120,7 @@ namespace transport } mutable std::mutex channel_mutex; + nano::bandwidth_limiter limiter; private: std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () }; diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index fe6c2aadd4..f9c063efda 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -214,7 +214,7 @@ nano::vote_code nano::vote_processor::vote_blocking (nano::transaction const & t if (max_vote->sequence > vote_a->sequence + 10000) { nano::confirm_ack confirm (max_vote); - channel_a->send_buffer (confirm.to_bytes (), nano::stat::detail::confirm_ack); + channel_a->send (confirm); // this is non essential traffic as it will be resolicited if not received } break; case nano::vote_code::invalid: