diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 00bc95872f..3a40ae01fd 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -32,6 +32,7 @@ enum class type : uint8_t vote_cache, hinting, blockprocessor, + block_broadcaster, bootstrap_server, active, active_started, @@ -294,6 +295,10 @@ enum class detail : uint8_t deprioritize, deprioritize_failed, + // block_broadcaster + broadcast_normal, + broadcast_aggressive, + _last // Must be the last enum }; diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 0824d23733..197192c4f4 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -25,6 +25,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::block_processing: thread_role_name_string = "Blck processing"; break; + case nano::thread_role::name::block_broadcasting: + thread_role_name_string = "Blck broadcast"; + break; case nano::thread_role::name::request_loop: thread_role_name_string = "Request loop"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 311ae58d1b..aba62feb4a 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -15,6 +15,7 @@ enum class name packet_processing, vote_processing, block_processing, + block_broadcasting, request_loop, wallet_actions, bootstrap_initiator, diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index 17b74a4cb2..a70738eb1f 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp index 20823c646f..9962cc32a1 100644 --- a/nano/node/block_broadcast.cpp +++ b/nano/node/block_broadcast.cpp @@ -1,13 +1,17 @@ +#include +#include #include #include #include #include -nano::block_broadcast::block_broadcast (nano::block_processor & block_processor, nano::network & network, nano::block_arrival & block_arrival, bool enabled) : - block_processor{ block_processor }, - network{ network }, - block_arrival{ block_arrival }, - enabled{ enabled } +nano::block_broadcast::block_broadcast (nano::block_processor & block_processor_a, nano::network & network_a, nano::block_arrival & block_arrival_a, nano::stats & stats_a, bool enabled_a) : + block_processor{ block_processor_a }, + network{ network_a }, + block_arrival{ block_arrival_a }, + stats{ stats_a }, + enabled{ enabled_a }, + queue{ stats_a, nano::stat::type::block_broadcaster, nano::thread_role::name::block_broadcasting, /* single thread */ 1, max_size } { if (!enabled) { @@ -25,6 +29,28 @@ nano::block_broadcast::block_broadcast (nano::block_processor & block_processor, } local.erase (block->hash ()); }); + + queue.process_batch = [this] (auto & batch) { + process_batch (batch); + }; +} + +nano::block_broadcast::~block_broadcast () +{ +} + +void nano::block_broadcast::start () +{ + if (!enabled) + { + return; + } + queue.start (); +} + +void nano::block_broadcast::stop () +{ + queue.stop (); } void nano::block_broadcast::observe (std::shared_ptr const & block) @@ -34,14 +60,14 @@ void nano::block_broadcast::observe (std::shared_ptr const & block) { // Block created on this node // Perform more agressive initial flooding - network.flood_block_initial (block); + queue.add (entry{ block, broadcast_strategy::aggressive }); } else { if (block_arrival.recent (block->hash ())) { // Block arrived from realtime traffic, do normal gossip. - network.flood_block (block, nano::transport::buffer_drop_policy::limiter); + queue.add (entry{ block, broadcast_strategy::normal }); } else { @@ -60,6 +86,28 @@ void nano::block_broadcast::track_local (nano::block_hash const & hash) local.add (hash); } +void nano::block_broadcast::process_batch (queue_t::batch_t & batch) +{ + for (auto & [block, strategy] : batch) + { + switch (strategy) + { + case broadcast_strategy::normal: + { + stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::broadcast_normal, nano::stat::dir::out); + network.flood_block (block, nano::transport::buffer_drop_policy::limiter); + } + break; + case broadcast_strategy::aggressive: + { + stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::broadcast_aggressive, nano::stat::dir::out); + network.flood_block_initial (block); + } + break; + } + } +} + /* * hash_tracker */ diff --git a/nano/node/block_broadcast.hpp b/nano/node/block_broadcast.hpp index 2609e697ed..c59b5e68a8 100644 --- a/nano/node/block_broadcast.hpp +++ b/nano/node/block_broadcast.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -28,8 +29,21 @@ namespace nano */ class block_broadcast { + enum class broadcast_strategy + { + normal, + aggressive, + }; + + using entry = std::pair, broadcast_strategy>; + using queue_t = nano::processing_queue; + public: - block_broadcast (nano::block_processor &, nano::network &, nano::block_arrival &, bool enabled = false); + block_broadcast (nano::block_processor &, nano::network &, nano::block_arrival &, nano::stats &, bool enabled = false); + ~block_broadcast (); + + void start (); + void stop (); // Mark a block as originating locally void track_local (nano::block_hash const &); @@ -38,13 +52,13 @@ class block_broadcast nano::block_processor & block_processor; nano::network & network; nano::block_arrival & block_arrival; + nano::stats & stats; private: // Block_processor observer void observe (std::shared_ptr const & block); - - void run (); - + void process_batch (queue_t::batch_t & batch); + private: class hash_tracker { @@ -73,13 +87,11 @@ class block_broadcast static std::size_t constexpr max_size = 1024 * 128; }; - hash_tracker local; bool enabled{ false }; - bool stopped{ false }; - nano::condition_variable condition; - mutable nano::mutex mutex; - std::thread thread; + queue_t queue; + + static std::size_t constexpr max_size = 1024 * 32; }; } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index edbafcacf8..00986a02e3 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -196,7 +196,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons epoch_upgrader{ *this, ledger, store, network_params, logger }, startup_time (std::chrono::steady_clock::now ()), node_seq (seq), - block_broadcast{ block_processor, network, block_arrival, !flags.disable_block_processor_republishing }, + block_broadcast{ block_processor, network, block_arrival, stats, !flags.disable_block_processor_republishing }, gap_tracker{ gap_cache }, process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket } { @@ -689,6 +689,7 @@ void nano::node::start () } websocket.start (); telemetry.start (); + block_broadcast.start (); } void nano::node::stop () @@ -711,6 +712,7 @@ void nano::node::stop () } unchecked.stop (); block_processor.stop (); + block_broadcast.stop (); aggregator.stop (); vote_processor.stop (); scheduler.stop ();