Skip to content

Commit

Permalink
Bandwidth limiting (#2035)
Browse files Browse the repository at this point in the history
* bandwidth_limit in bytes added to config
update v17 upgrade path
update v17 config tests

* bandwidth limiter class
test to validate limiter

* hook to nano::transport::channel
update nano::transport::channel::send to send if message shouldnt be dropped
otherwise increment drop stat and dont send

* initialize array with brace-enclosed initializer

* correctly 1.5Mb

* update tests to account for correct math and full confirm_ack confirm_req blocks
use 1.5 * 1024 * 1024 for bytes to Mb limit

* update config test

* formatting

* update default value in error text

* add logging message and convenience detail_raw_to_string for dropped data type and size
include publish as it is vote traffic

* should always log dropped messages, this could be improved with a logging config option

* stuff under logging;packet logging

* typo and log out limit on node start

* remove duplicate
convert to key before passing to now public detail_to_string

* Readability and simplification

* use static constexpr in function

* unsigned int < 0 always false

* add republish_vote to could_drop

* trend rate over minimum of 1 sec
simplify limiter
add bool not_dropable default false to nano::transport::channel::send allowing calls to channel::send to drop

* update test to validate ramp down to 0 rate again

* merge master

* remove unused variables in tests
const limit as not changed after construction
reverse send logic, default true
adjust to exclude messages such as keepalives and bootstrap traffic

* clean up tests and add multiple limiters
simplify bandwidth limiter

* formatting

* fix merge issues
flip logic on is_dropable

* allow dropping of keep alives

* whitespace

* use send instead of send_buffer a couple more places
  • Loading branch information
Russel Waters committed Jun 11, 2019
1 parent 3b75102 commit a742ab8
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 21 deletions.
54 changes: 54 additions & 0 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2207,3 +2207,57 @@ TEST (network, replace_port)
ASSERT_EQ (system.nodes[0]->network.udp_channels.size (), 0);
node1->stop ();
}

TEST (bandwidth_limiter, validate)
{
size_t const full_confirm_ack (488 + 8);
{
nano::bandwidth_limiter limiter_0 (0);
nano::bandwidth_limiter limiter_1 (1024);
nano::bandwidth_limiter limiter_256 (1024 * 256);
nano::bandwidth_limiter limiter_1024 (1024 * 1024);
nano::bandwidth_limiter limiter_1536 (1024 * 1536);

auto now (std::chrono::steady_clock::now ());

while (now + 1s >= std::chrono::steady_clock::now ())
{
ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop
ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size ()
limiter_256.should_drop (full_confirm_ack);
limiter_1024.should_drop (full_confirm_ack);
limiter_1536.should_drop (full_confirm_ack);
std::this_thread::sleep_for (10ms);
}
ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop
ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size ()
ASSERT_FALSE (limiter_256.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped
ASSERT_FALSE (limiter_1024.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped
ASSERT_FALSE (limiter_1536.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped
}

{
nano::bandwidth_limiter limiter_0 (0);
nano::bandwidth_limiter limiter_1 (1024);
nano::bandwidth_limiter limiter_256 (1024 * 256);
nano::bandwidth_limiter limiter_1024 (1024 * 1024);
nano::bandwidth_limiter limiter_1536 (1024 * 1536);

auto now (std::chrono::steady_clock::now ());
//trend rate for 5 sec
while (now + 5s >= std::chrono::steady_clock::now ())
{
ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop
ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size ()
limiter_256.should_drop (full_confirm_ack);
limiter_1024.should_drop (full_confirm_ack);
limiter_1536.should_drop (full_confirm_ack);
std::this_thread::sleep_for (50ms);
}
ASSERT_EQ (limiter_0.get_rate (), 0); //should be 0 as rate is not gathered if not needed
ASSERT_EQ (limiter_1.get_rate (), 0); //should be 0 since nothing is small enough to pass through is tracked
ASSERT_EQ (limiter_256.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked
ASSERT_EQ (limiter_1024.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked
ASSERT_EQ (limiter_1536.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked
}
}
6 changes: 6 additions & 0 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ TEST (node_config, v16_v17_upgrade)
ASSERT_FALSE (tree.get_optional_child ("use_memory_pools"));
ASSERT_FALSE (tree.get_optional_child ("confirmation_history_size"));
ASSERT_FALSE (tree.get_optional_child ("active_elections_size"));
ASSERT_FALSE (tree.get_optional_child ("bandwidth_limit"));

config.deserialize_json (upgraded, tree);
// The config options should be added after the upgrade
Expand All @@ -727,6 +728,7 @@ TEST (node_config, v16_v17_upgrade)
ASSERT_TRUE (!!tree.get_optional_child ("use_memory_pools"));
ASSERT_TRUE (!!tree.get_optional_child ("confirmation_history_size"));
ASSERT_TRUE (!!tree.get_optional_child ("active_elections_size"));
ASSERT_TRUE (!!tree.get_optional_child ("bandwidth_limit"));

ASSERT_TRUE (upgraded);
auto version (tree.get<std::string> ("version"));
Expand Down Expand Up @@ -764,6 +766,7 @@ TEST (node_config, v17_values)
tree.put ("use_memory_pools", true);
tree.put ("confirmation_history_size", 2048);
tree.put ("active_elections_size", 8000);
tree.put ("bandwidth_limit", 1572864);
}

config.deserialize_json (upgraded, tree);
Expand All @@ -780,6 +783,7 @@ TEST (node_config, v17_values)
ASSERT_TRUE (config.use_memory_pools);
ASSERT_EQ (config.confirmation_history_size, 2048);
ASSERT_EQ (config.active_elections_size, 8000);
ASSERT_EQ (config.bandwidth_limit, 1572864);

// Check config is correct with other values
tree.put ("tcp_io_timeout", std::numeric_limits<unsigned long>::max () - 100);
Expand All @@ -799,6 +803,7 @@ TEST (node_config, v17_values)
tree.put ("use_memory_pools", false);
tree.put ("confirmation_history_size", std::numeric_limits<unsigned long long>::max ());
tree.put ("active_elections_size", std::numeric_limits<unsigned long long>::max ());
tree.put ("bandwidth_limit", std::numeric_limits<size_t>::max ());

upgraded = false;
config.deserialize_json (upgraded, tree);
Expand All @@ -817,6 +822,7 @@ TEST (node_config, v17_values)
ASSERT_FALSE (config.use_memory_pools);
ASSERT_EQ (config.confirmation_history_size, std::numeric_limits<unsigned long long>::max ());
ASSERT_EQ (config.active_elections_size, std::numeric_limits<unsigned long long>::max ());
ASSERT_EQ (config.bandwidth_limit, std::numeric_limits<size_t>::max ());
}

// Regression test to ensure that deserializing includes changes node via get_required_child
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ std::string nano::stat::type_to_string (uint32_t key)
break;
case nano::stat::type::confirmation_height:
res = "confirmation_height";
break;
case nano::stat::type::drop:
res = "drop";
}
return res;
}
Expand Down
7 changes: 5 additions & 2 deletions nano/lib/stats.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ class stat final
ipc,
tcp,
udp,
confirmation_height
confirmation_height,
drop
};

/** Optional detail type */
Expand Down Expand Up @@ -457,12 +458,14 @@ class stat final
/** Returns a new JSON log sink */
std::unique_ptr<stat_log_sink> log_sink_json () const;

/** Returns string representation of detail */
static std::string detail_to_string (uint32_t key);

/** Stop stats being output */
void stop ();

private:
static std::string type_to_string (uint32_t key);
static std::string detail_to_string (uint32_t key);
static std::string dir_to_string (uint32_t key);

/** Constructs a key given type, detail and direction. This is used as input to update(...) and get_entry(...) */
Expand Down
12 changes: 8 additions & 4 deletions nano/node/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ void nano::frontier_req_client::run ()
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error while sending bootstrap request %1%") % ec.message ()));
}
}
});
},
false); // is bootstrap traffic is_dropable false
}

std::shared_ptr<nano::bootstrap_client> nano::bootstrap_client::shared ()
Expand Down Expand Up @@ -346,7 +347,8 @@ void nano::bulk_pull_client::request ()
}
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_request_failure, nano::stat::dir::in);
}
});
},
false); // is bootstrap traffic is_dropable false
}

void nano::bulk_pull_client::receive_block ()
Expand Down Expand Up @@ -531,7 +533,8 @@ void nano::bulk_push_client::start ()
this_l->connection->node->logger.try_log (boost::str (boost::format ("Unable to send bulk_push request: %1%") % ec.message ()));
}
}
});
},
false); // is bootstrap traffic is_dropable false
}

void nano::bulk_push_client::push (nano::transaction const & transaction_a)
Expand Down Expand Up @@ -668,7 +671,8 @@ void nano::bulk_pull_account_client::request ()
}
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_error_starting_request, nano::stat::dir::in);
}
});
},
false); // is bootstrap traffic is_dropable false
}

void nano::bulk_pull_account_client::receive_pending ()
Expand Down
14 changes: 5 additions & 9 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,9 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a
result = true;
auto vote (node_a.store.vote_generate (transaction_a, pub_a, prv_a, std::vector<nano::block_hash> (1, hash)));
nano::confirm_ack confirm (vote);
auto vote_bytes = confirm.to_bytes ();
for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j)
{
j->get ()->send_buffer (vote_bytes, nano::stat::detail::confirm_ack);
j->get ()->send (confirm);
}
node_a.votes_cache.add (vote);
});
Expand All @@ -154,21 +153,19 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a
for (auto & vote : votes)
{
nano::confirm_ack confirm (vote);
auto vote_bytes = confirm.to_bytes ();
for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j)
{
j->get ()->send_buffer (vote_bytes, nano::stat::detail::confirm_ack);
j->get ()->send (confirm);
}
}
}
// Republish if required
if (also_publish)
{
nano::publish publish (block_a);
auto publish_bytes (publish.to_bytes ());
for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j)
{
j->get ()->send_buffer (publish_bytes, nano::stat::detail::publish);
j->get ()->send (publish);
}
}
}
Expand All @@ -194,7 +191,7 @@ void nano::network::confirm_hashes (nano::transaction const & transaction_a, std
nano::vectorstream stream (*bytes);
confirm.serialize (stream);
}
channel_a->send_buffer (bytes, nano::stat::detail::confirm_ack);
channel_a->send (confirm);
this->node.votes_cache.add (vote);
});
}
Expand All @@ -208,8 +205,7 @@ bool nano::network::send_votes_cache (std::shared_ptr<nano::transport::channel>
for (auto & vote : votes)
{
nano::confirm_ack confirm (vote);
auto vote_bytes = confirm.to_bytes ();
channel_a->send_buffer (vote_bytes, nano::stat::detail::confirm_ack);
channel_a->send (confirm);
}
// Returns true if votes were sent
bool result (!votes.empty ());
Expand Down
2 changes: 2 additions & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ startup_time (std::chrono::steady_clock::now ())
logger.always_log ("Constructing node");
}

logger.always_log (boost::str (boost::format ("Outbound Voting Bandwidth limited to %1% bytes per second") % config.bandwidth_limit));

// First do a pass with a read to see if any writing needs doing, this saves needing to open a write lock (and potentially blocking)
auto is_initialized (false);
{
Expand Down
8 changes: 7 additions & 1 deletion nano/node/nodeconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const
json.put_child ("diagnostics", diagnostics_l);
json.put ("confirmation_history_size", confirmation_history_size);
json.put ("active_elections_size", active_elections_size);
json.put ("bandwidth_limit", bandwidth_limit);

return json.get_error ();
}
Expand Down Expand Up @@ -251,6 +252,7 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso
json.put ("use_memory_pools", use_memory_pools);
json.put ("confirmation_history_size", confirmation_history_size);
json.put ("active_elections_size", active_elections_size);
json.put ("bandwidth_limit", bandwidth_limit);
}
case 17:
break;
Expand Down Expand Up @@ -399,8 +401,8 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco
pow_sleep_interval = std::chrono::nanoseconds (pow_sleep_interval_l);
json.get<bool> ("use_memory_pools", use_memory_pools);
json.get<size_t> ("confirmation_history_size", confirmation_history_size);

json.get<size_t> ("active_elections_size", active_elections_size);
json.get<size_t> ("bandwidth_limit", bandwidth_limit);
nano::network_params network;
// Validate ranges
if (online_weight_quorum > 100)
Expand All @@ -419,6 +421,10 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco
{
json.get_error ().set ("active_elections_size must be grater than 250");
}
if (bandwidth_limit > std::numeric_limits<size_t>::max ())
{
json.get_error ().set ("bandwidth_limit unbounded = 0, default = 1572864, max = 18446744073709551615");
}
}
catch (std::runtime_error const & ex)
{
Expand Down
1 change: 1 addition & 0 deletions nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class node_config
static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60);
static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5;
static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5);
size_t bandwidth_limit{ 1536 * 1024 };
static int json_version ()
{
return 17;
Expand Down
64 changes: 61 additions & 3 deletions nano/node/transport/transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/transport.hpp>

#include <numeric>

namespace
{
class callback_visitor : public nano::message_visitor
Expand Down Expand Up @@ -68,18 +70,31 @@ nano::tcp_endpoint nano::transport::map_endpoint_to_tcp (nano::endpoint const &
}

nano::transport::channel::channel (nano::node & node_a) :
limiter (node_a.config.bandwidth_limit),
node (node_a)
{
}

void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, bool const & is_dropable)
{
callback_visitor visitor;
message_a.visit (visitor);
auto buffer (message_a.to_bytes ());
auto detail (visitor.result);
send_buffer (buffer, detail, callback_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
if (!is_dropable || !limiter.should_drop (buffer->size ()))
{
send_buffer (buffer, detail, callback_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
}
else
{
node.stats.inc (nano::stat::type::drop, detail, nano::stat::dir::out);
if (node.config.logging.network_packet_logging ())
{
auto key = static_cast<uint8_t> (detail) << 8;
node.logger.always_log (boost::str (boost::format ("%1% of size %2% dropped") % node.stats.detail_to_string (key) % buffer->size ()));
}
}
}

namespace
Expand Down Expand Up @@ -188,3 +203,46 @@ bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool
}
return result;
}

using namespace std::chrono_literals;

nano::bandwidth_limiter::bandwidth_limiter (const size_t limit_a) :
next_trend (std::chrono::steady_clock::now () + 50ms),
limit (limit_a),
rate (0),
trended_rate (0)
{
}

bool nano::bandwidth_limiter::should_drop (const size_t & message_size)
{
bool result (false);
if (limit == 0) //never drop if limit is 0
{
return result;
}
std::lock_guard<std::mutex> lock (mutex);

if (message_size > limit / rate_buffer.size () || rate + message_size > limit)
{
result = true;
}
else
{
rate = rate + message_size;
}
if (next_trend < std::chrono::steady_clock::now ())
{
next_trend = std::chrono::steady_clock::now () + 50ms;
rate_buffer.push_back (rate);
trended_rate = std::accumulate (rate_buffer.begin (), rate_buffer.end (), 0) / rate_buffer.size ();
rate = 0;
}
return result;
}

size_t nano::bandwidth_limiter::get_rate ()
{
std::lock_guard<std::mutex> lock (mutex);
return trended_rate;
}
Loading

0 comments on commit a742ab8

Please sign in to comment.