diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 10cab361ba..8adcfedf1e 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -48,6 +48,7 @@ enum class type : uint8_t election_scheduler, optimistic_scheduler, handshake, + block_broadcaster, bootstrap_ascending, bootstrap_ascending_accounts, @@ -323,6 +324,12 @@ enum class detail : uint8_t deprioritize, deprioritize_failed, + // block broadcaster + broadcast_normal, + broadcast_aggressive, + erase_old, + erase_confirmed, + _last // Must be the last enum }; diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 0824d23733..99caa097fe 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -100,6 +100,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::scheduler_priority: thread_role_name_string = "Sched Priority"; break; + case nano::thread_role::name::local_block_broadcasting: + thread_role_name_string = "Local broadcast"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 311ae58d1b..56848761ce 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -42,6 +42,7 @@ enum class name scheduler_manual, scheduler_optimistic, scheduler_priority, + local_block_broadcasting, }; /* diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 4452df575c..aca5269b25 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -20,8 +20,8 @@ add_library( backlog_population.cpp bandwidth_limiter.hpp bandwidth_limiter.cpp - block_broadcast.cpp - block_broadcast.hpp + block_broadcaster.cpp + block_broadcaster.hpp blockprocessor.hpp blockprocessor.cpp bootstrap/block_deserializer.hpp diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp deleted file mode 100644 index 2fb61e6a17..0000000000 --- a/nano/node/block_broadcast.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include -#include -#include - -nano::block_broadcast::block_broadcast (nano::network & network, bool enabled) : - network{ network }, - enabled{ enabled } -{ -} - -void nano::block_broadcast::connect (nano::block_processor & block_processor) -{ - if (!enabled) - { - return; - } - block_processor.block_processed.add ([this] (auto const & result, auto const & block, auto const & context) { - switch (result.code) - { - case nano::process_result::progress: - observe (block, context); - break; - default: - break; - } - }); -} - -void nano::block_broadcast::observe (std::shared_ptr const & block, nano::block_processor::context const & context) -{ - if (context.source == nano::block_source::local) - { - // Block created on this node - // Perform more agressive initial flooding - network.flood_block_initial (block); - } - else - { - if (context.source != nano::block_source::bootstrap) - { - // Block arrived from realtime traffic, do normal gossip. - network.flood_block (block, nano::transport::buffer_drop_policy::limiter); - } - else - { - // Block arrived from bootstrap - // Don't broadcast blocks we're bootstrapping - } - } -} diff --git a/nano/node/block_broadcast.hpp b/nano/node/block_broadcast.hpp deleted file mode 100644 index 4129ddc415..0000000000 --- a/nano/node/block_broadcast.hpp +++ /dev/null @@ -1,28 +0,0 @@ -#pragma once - -#include -#include - -#include -#include - -namespace nano -{ -class network; - -// This class tracks blocks that originated from this node. -class block_broadcast -{ -public: - block_broadcast (nano::network & network, bool enabled = false); - // Add batch_processed observer to block_processor if enabled - void connect (nano::block_processor & block_processor); - -private: - // Block_processor observer - void observe (std::shared_ptr const & block, nano::block_processor::context const &); - - nano::network & network; - bool enabled; -}; -} diff --git a/nano/node/block_broadcaster.cpp b/nano/node/block_broadcaster.cpp new file mode 100644 index 0000000000..75bca7f764 --- /dev/null +++ b/nano/node/block_broadcaster.cpp @@ -0,0 +1,160 @@ +#include +#include +#include +#include +#include +#include + +nano::block_broadcaster::block_broadcaster (nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::stats & stats_a, bool enabled_a) : + node{ node_a }, + block_processor{ block_processor_a }, + network{ network_a }, + stats{ stats_a }, + enabled{ enabled_a } +{ + if (!enabled) + { + return; + } + + block_processor.batch_processed.add ([this] (auto const & batch) { + bool should_notify = false; + for (auto const & [result, block, context] : batch) + { + // Only rebroadcast local blocks that were successfully processed (no forks or gaps) + if (result.code == nano::process_result::progress && context.source == nano::block_source::local) + { + nano::lock_guard guard{ mutex }; + local_blocks.emplace_back (local_entry{ block, std::chrono::steady_clock::now () }); + stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::insert); + should_notify = true; + } + } + if (should_notify) + { + condition.notify_all (); + } + }); + + block_processor.rolled_back.add ([this] (auto const & block) { + nano::lock_guard guard{ mutex }; + auto erased = local_blocks.get ().erase (block->hash ()); + stats.add (nano::stat::type::block_broadcaster, nano::stat::detail::rollback, nano::stat::dir::in, erased); + }); +} + +nano::block_broadcaster::~block_broadcaster () +{ + // Thread must be stopped before destruction + debug_assert (!thread.joinable ()); +} + +void nano::block_broadcaster::start () +{ + if (!enabled) + { + return; + } + + debug_assert (!thread.joinable ()); + + thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::local_block_broadcasting); + run (); + } }; +} + +void nano::block_broadcaster::stop () +{ + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); + nano::join_or_pass (thread); +} + +void nano::block_broadcaster::run () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::loop); + + condition.wait_for (lock, check_interval); + debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds + + if (!stopped) + { + cleanup (); + run_broadcasts (lock); + debug_assert (lock.owns_lock ()); + } + } +} + +void nano::block_broadcaster::run_broadcasts (nano::unique_lock & lock) +{ + debug_assert (lock.owns_lock ()); + + std::vector> to_broadcast; + + auto const now = std::chrono::steady_clock::now (); + for (auto & entry : local_blocks) + { + if (elapsed (entry.last_broadcast, broadcast_interval, now)) + { + entry.last_broadcast = now; + to_broadcast.push_back (entry.block); + } + } + + lock.unlock (); + + for (auto const & block : to_broadcast) + { + stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out); + network.flood_block_initial (block); + } + + lock.lock (); +} + +void nano::block_broadcaster::cleanup () +{ + debug_assert (!mutex.try_lock ()); + + // Erase oldest blocks if the queue gets too big + while (local_blocks.size () > max_size) + { + stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::erase_oldest); + local_blocks.pop_front (); + } + + // TODO: Mutex is held during IO, but it should be fine since it's not performance critical + auto transaction = node.store.tx_begin_read (); + erase_if (local_blocks, [this, &transaction] (auto const & entry) { + transaction.refresh_if_needed (); + + if (entry.last_broadcast == std::chrono::steady_clock::time_point{}) + { + // This block has never been broadcasted, keep it so it's broadcasted at least once + return false; + } + if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ())) + { + stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::erase_confirmed); + return true; + } + return false; + }); +} + +std::unique_ptr nano::block_broadcaster::collect_container_info (const std::string & name) const +{ + nano::lock_guard guard{ mutex }; + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "local", local_blocks.size (), sizeof (decltype (local_blocks)::value_type) })); + return composite; +} \ No newline at end of file diff --git a/nano/node/block_broadcaster.hpp b/nano/node/block_broadcaster.hpp new file mode 100644 index 0000000000..08ccb510e9 --- /dev/null +++ b/nano/node/block_broadcaster.hpp @@ -0,0 +1,131 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace mi = boost::multi_index; + +namespace nano +{ +class node; +class network; +} + +namespace nano +{ +/** + * Broadcasts blocks to the network + * Tracks local blocks for more aggressive propagation + */ +class block_broadcaster +{ + enum class broadcast_strategy + { + normal, + aggressive, + }; + +public: + block_broadcaster (nano::node &, nano::block_processor &, nano::network &, nano::stats &, bool enabled = false); + ~block_broadcaster (); + + void start (); + void stop (); + + std::unique_ptr collect_container_info (std::string const & name) const; + +private: + void run (); + void run_broadcasts (nano::unique_lock &); + void cleanup (); + +private: // Dependencies + nano::node & node; + nano::block_processor & block_processor; + nano::network & network; + nano::stats & stats; + +private: + struct local_entry + { + std::shared_ptr const block; + std::chrono::steady_clock::time_point const arrival; + mutable std::chrono::steady_clock::time_point last_broadcast{}; // Not part of any index + + nano::block_hash hash () const + { + return block->hash (); + } + }; + + // clang-format off + class tag_sequenced {}; + class tag_hash {}; + + using ordered_locals = boost::multi_index_container>, + mi::hashed_unique, + mi::const_mem_fun> + >>; + // clang-format on + + ordered_locals local_blocks; + +private: + // class hash_tracker + // { + // public: + // void add (nano::block_hash const &); + // void erase (nano::block_hash const &); + // bool contains (nano::block_hash const &) const; + // + // private: + // mutable nano::mutex mutex; + // + // // clang-format off + // class tag_sequenced {}; + // class tag_hash {}; + // + // using ordered_hashes = boost::multi_index_container>, + // mi::hashed_unique, + // mi::identity> + // >>; + // // clang-format on + // + // // Blocks originated on this node + // ordered_hashes hashes; + // + // static std::size_t constexpr max_size = 1024 * 128; + // }; + // hash_tracker local; + +private: + bool enabled{ false }; + + std::atomic stopped{ false }; + nano::condition_variable condition; + mutable nano::mutex mutex; + std::thread thread; + + static std::size_t constexpr max_size{ 1024 * 8 }; + static std::chrono::seconds constexpr check_interval{ 30 }; + static std::chrono::seconds constexpr broadcast_interval{ 60 }; + // static std::chrono::seconds constexpr age_cutoff{ 60 * 60 }; +}; +} diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 884a0ae848..c03b6746fb 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -197,12 +197,11 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons epoch_upgrader{ *this, ledger, store, network_params, logger }, startup_time (std::chrono::steady_clock::now ()), node_seq (seq), - block_broadcast{ network, !flags.disable_block_processor_republishing }, + block_broadcaster{ *this, block_processor, network, stats, !flags.disable_block_processor_republishing }, process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket } { logger.debug (nano::log::type::node, "Constructing node..."); - block_broadcast.connect (block_processor); process_live_dispatcher.connect (block_processor); unchecked.satisfied.add ([this] (nano::unchecked_info const & info) { @@ -658,6 +657,7 @@ void nano::node::start () } websocket.start (); telemetry.start (); + block_broadcaster.start (); } void nano::node::stop () @@ -698,6 +698,7 @@ void nano::node::stop () stats.stop (); epoch_upgrader.stop (); workers.stop (); + block_broadcaster.stop (); // work pool is not stopped on purpose due to testing setup } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 19e5a7a2ab..d6360ba72f 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include #include @@ -187,7 +187,7 @@ class node final : public std::enable_shared_from_this nano::bootstrap_ascending::service ascendboot; nano::websocket_server websocket; nano::epoch_upgrader epoch_upgrader; - nano::block_broadcast block_broadcast; + nano::block_broadcaster block_broadcaster; nano::process_live_dispatcher process_live_dispatcher; std::chrono::steady_clock::time_point const startup_time;