diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index 79f4df8dec..cd0db5f9db 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -75,6 +75,7 @@ enum class type rep_tiers, syn_cookies, thread_runner, + peer_cache, // bootstrap bulk_pull_client, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 709902d59e..2f2e8b9973 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -56,6 +56,7 @@ enum class type : uint8_t local_block_broadcaster, rep_tiers, syn_cookies, + peer_cache, bootstrap_ascending, bootstrap_ascending_accounts, @@ -78,6 +79,8 @@ enum class detail : uint8_t ignored, update, updated, + inserted, + erased, request, broadcast, cleanup, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 724fd13861..917ecebac6 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -128,6 +128,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::signal_manager: thread_role_name_string = "Signal manager"; break; + case nano::thread_role::name::peer_cache: + thread_role_name_string = "Peer cache"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index d3822f1981..7f1659a952 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -49,6 +49,7 @@ enum class name network_keepalive, network_reachout, signal_manager, + peer_cache, }; std::string_view to_string (name); diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index aa7b9a774a..780c8aa521 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -119,6 +119,8 @@ add_library( openclconfig.cpp openclwork.hpp openclwork.cpp + peer_cache.hpp + peer_cache.cpp peer_exclusion.hpp peer_exclusion.cpp portmapping.hpp diff --git a/nano/node/node.cpp b/nano/node/node.cpp index e19ba0688a..dec8a2057b 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -198,6 +199,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy epoch_upgrader{ *this, ledger, store, network_params, logger }, local_block_broadcaster{ *this, block_processor, network, stats, !flags.disable_block_processor_republishing }, process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket }, + peer_cache_impl{ std::make_unique (config.peer_cache, store, network, logger, stats) }, + peer_cache{ *peer_cache_impl }, startup_time (std::chrono::steady_clock::now ()), node_seq (seq) { @@ -613,8 +616,9 @@ void nano::node::process_local_async (std::shared_ptr const & block void nano::node::start () { long_inactivity_cleanup (); + network.start (); - add_initial_peers (); + if (!flags.disable_legacy_bootstrap && !flags.disable_ongoing_bootstrap) { ongoing_bootstrap (); @@ -630,7 +634,7 @@ void nano::node::start () { rep_crawler.start (); } - ongoing_peer_store (); + ongoing_online_weight_calculation_queue (); bool tcp_enabled = false; @@ -692,6 +696,9 @@ void nano::node::start () websocket.start (); telemetry.start (); local_block_broadcaster.start (); + peer_cache.start (); + + add_initial_peers (); } void nano::node::stop () @@ -704,6 +711,7 @@ void nano::node::stop () logger.info (nano::log::type::node, "Node stopping..."); + peer_cache.stop (); // Cancels ongoing work generation tasks, which may be blocking other threads // No tasks may wait for work generation in I/O threads, or termination signal capturing will be unable to call node::stop() distributed_work.stop (); @@ -867,18 +875,6 @@ void nano::node::ongoing_bootstrap () }); } -void nano::node::ongoing_peer_store () -{ - const bool stored{ network.tcp_channels.store_all (true) }; - std::weak_ptr node_w (shared_from_this ()); - workers.add_timed_task (std::chrono::steady_clock::now () + network_params.network.peer_dump_interval, [node_w] () { - if (auto node_l = node_w.lock ()) - { - node_l->ongoing_peer_store (); - } - }); -} - void nano::node::backup_wallet () { auto transaction (wallets.tx_begin_read ()); @@ -1157,15 +1153,7 @@ void nano::node::add_initial_peers () return; } - std::vector initial_peers; - { - auto transaction = store.tx_begin_read (); - for (auto i (store.peer.begin (transaction)), n (store.peer.end ()); i != n; ++i) - { - nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ()); - initial_peers.push_back (endpoint); - } - } + auto initial_peers = peer_cache.cached_peers (); logger.info (nano::log::type::node, "Adding cached initial peers: {}", initial_peers.size ()); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 101d3391cc..dcb4d99773 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -48,6 +48,7 @@ class active_transactions; class confirming_set; class node; class work_pool; +class peer_cache; namespace scheduler { @@ -97,7 +98,6 @@ class node final : public std::enable_shared_from_this nano::uint128_t weight (nano::account const &); nano::uint128_t minimum_principal_weight (); void ongoing_bootstrap (); - void ongoing_peer_store (); void backup_wallet (); void search_receivable_all (); void bootstrap_wallet (); @@ -183,11 +183,7 @@ class node final : public std::enable_shared_from_this nano::vote_generator & generator; std::unique_ptr final_generator_impl; nano::vote_generator & final_generator; - -private: // Placed here to maintain initialization order std::unique_ptr scheduler_impl; - -public: nano::scheduler::component & scheduler; nano::request_aggregator aggregator; nano::wallets wallets; @@ -197,6 +193,8 @@ class node final : public std::enable_shared_from_this nano::epoch_upgrader epoch_upgrader; nano::local_block_broadcaster local_block_broadcaster; nano::process_live_dispatcher process_live_dispatcher; + std::unique_ptr peer_cache_impl; + nano::peer_cache & peer_cache; std::chrono::steady_clock::time_point const startup_time; std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index cfa5fcbf66..2661e5d7d8 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -34,7 +34,8 @@ nano::node_config::node_config (const std::optional & peering_port_a, ipc_config{ network_params.network }, external_address{ boost::asio::ip::address_v6{}.to_string () }, rep_crawler{ network_params.network }, - block_processor{ network_params.network } + block_processor{ network_params.network }, + peer_cache{ network_params.network } { if (peering_port == 0) { @@ -217,6 +218,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const block_processor.serialize (block_processor_l); toml.put_child ("block_processor", block_processor_l); + nano::tomlconfig peer_cache_l; + peer_cache.serialize (peer_cache_l); + toml.put_child ("peer_cache", peer_cache_l); + return toml.get_error (); } @@ -298,6 +303,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) block_processor.deserialize (config_l); } + if (toml.has_key ("peer_cache")) + { + auto config_l = toml.get_required_child ("peer_cache"); + peer_cache.deserialize (config_l); + } + if (toml.has_key ("work_peers")) { work_peers.clear (); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index db09480e35..75156d6476 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -138,6 +139,7 @@ class node_config nano::vote_cache_config vote_cache; nano::rep_crawler_config rep_crawler; nano::block_processor_config block_processor; + nano::peer_cache_config peer_cache; public: std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const; diff --git a/nano/node/peer_cache.cpp b/nano/node/peer_cache.cpp new file mode 100644 index 0000000000..315bea1f67 --- /dev/null +++ b/nano/node/peer_cache.cpp @@ -0,0 +1,155 @@ +#include +#include +#include +#include +#include +#include + +nano::peer_cache::peer_cache (nano::peer_cache_config const & config_a, nano::store::component & store_a, nano::network & network_a, nano::logger & logger_a, nano::stats & stats_a) : + config{ config_a }, + store{ store_a }, + network{ network_a }, + logger{ logger_a }, + stats{ stats_a } +{ +} + +nano::peer_cache::~peer_cache () +{ + debug_assert (!thread.joinable ()); +} + +void nano::peer_cache::start () +{ + debug_assert (!thread.joinable ()); + + thread = std::thread ([this] { + nano::thread_role::set (nano::thread_role::name::peer_cache); + run (); + }); +} + +void nano::peer_cache::stop () +{ + { + nano::lock_guard guard{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } +} + +bool nano::peer_cache::exists (nano::endpoint const & endpoint) const +{ + auto transaction = store.tx_begin_read (); + return store.peer.exists (transaction, endpoint); +} + +size_t nano::peer_cache::size () const +{ + auto transaction = store.tx_begin_read (); + return store.peer.count (transaction); +} + +void nano::peer_cache::trigger () +{ + condition.notify_all (); +} + +void nano::peer_cache::run () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait_for (lock, config.check_interval, [this] { return stopped.load (); }); + if (!stopped) + { + stats.inc (nano::stat::type::peer_cache, nano::stat::detail::loop); + + lock.unlock (); + + run_one (); + + lock.lock (); + } + } +} + +void nano::peer_cache::run_one () +{ + auto live_peers = network.list (); + auto transaction = store.tx_begin_write ({ tables::peers }); + + // Add or update live peers + for (auto const & peer : live_peers) + { + auto const endpoint = peer->get_endpoint (); + bool const exists = store.peer.exists (transaction, endpoint); + store.peer.put (transaction, endpoint, nano::milliseconds_since_epoch ()); + if (!exists) + { + stats.inc (nano::stat::type::peer_cache, nano::stat::detail::inserted); + logger.debug (nano::log::type::peer_cache, "Cached new peer: {}", fmt::streamed (endpoint)); + } + else + { + stats.inc (nano::stat::type::peer_cache, nano::stat::detail::updated); + } + } + + // Erase old peers + auto const cutoff = std::chrono::system_clock::now () - config.erase_cutoff; + + for (auto it = store.peer.begin (transaction); it != store.peer.end (); ++it) + { + auto const [endpoint, timestamp_millis] = *it; + auto timestamp = nano::from_milliseconds_since_epoch (timestamp_millis); + if (timestamp < cutoff) + { + store.peer.del (transaction, endpoint); + + stats.inc (nano::stat::type::peer_cache, nano::stat::detail::erased); + logger.debug (nano::log::type::peer_cache, "Erased peer: {} (not seen for {}s)", + fmt::streamed (endpoint.endpoint ()), + nano::log::seconds_delta (timestamp)); + } + } +} + +std::vector nano::peer_cache::cached_peers () const +{ + auto transaction = store.tx_begin_read (); + std::vector peers; + for (auto it = store.peer.begin (transaction); it != store.peer.end (); ++it) + { + auto const [endpoint, timestamp_millis] = *it; + peers.push_back (endpoint.endpoint ()); + } + return peers; +} + +/* + * peer_cache_config + */ + +nano::peer_cache_config::peer_cache_config (nano::network_constants const & network) +{ + if (network.is_dev_network ()) + { + check_interval = 1s; + erase_cutoff = 3s; + } +} + +nano::error nano::peer_cache_config::serialize (nano::tomlconfig & toml) const +{ + return toml.get_error (); +} + +nano::error nano::peer_cache_config::deserialize (nano::tomlconfig & toml) +{ + return toml.get_error (); +} diff --git a/nano/node/peer_cache.hpp b/nano/node/peer_cache.hpp new file mode 100644 index 0000000000..c7e5a621b5 --- /dev/null +++ b/nano/node/peer_cache.hpp @@ -0,0 +1,57 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace nano +{ +class peer_cache_config final +{ +public: + explicit peer_cache_config (nano::network_constants const & network); + + nano::error deserialize (nano::tomlconfig & toml); + nano::error serialize (nano::tomlconfig & toml) const; + +public: + std::chrono::seconds erase_cutoff{ 60 * 60s }; + std::chrono::seconds check_interval{ 15s }; +}; + +class peer_cache final +{ +public: + peer_cache (peer_cache_config const &, nano::store::component &, nano::network &, nano::logger &, nano::stats &); + ~peer_cache (); + + void start (); + void stop (); + + std::vector cached_peers () const; + bool exists (nano::endpoint const & endpoint) const; + size_t size () const; + void trigger (); + +private: + void run (); + void run_one (); + +private: // Dependencies + peer_cache_config const & config; + nano::store::component & store; + nano::network & network; + nano::logger & logger; + nano::stats & stats; + +private: + std::atomic stopped{ false }; + mutable nano::mutex mutex; + nano::condition_variable condition; + std::thread thread; +}; +} \ No newline at end of file diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 5176b6c58d..f27772a29f 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -241,35 +241,6 @@ void nano::transport::tcp_channels::random_fill (std::array & } } -bool nano::transport::tcp_channels::store_all (bool clear_peers) -{ - // We can't hold the mutex while starting a write transaction, so - // we collect endpoints to be saved and then relase the lock. - std::vector endpoints; - { - nano::lock_guard lock{ mutex }; - endpoints.reserve (channels.size ()); - std::transform (channels.begin (), channels.end (), - std::back_inserter (endpoints), [] (auto const & channel) { return nano::transport::map_tcp_to_endpoint (channel.endpoint ()); }); - } - bool result (false); - if (!endpoints.empty ()) - { - // Clear all peers then refresh with the current list of peers - auto transaction (node.store.tx_begin_write ({ tables::peers })); - if (clear_peers) - { - node.store.peer.clear (transaction); - } - for (auto const & endpoint : endpoints) - { - node.store.peer.put (transaction, nano::endpoint_key{ endpoint.address ().to_v6 ().to_bytes (), endpoint.port () }, nano::milliseconds_since_epoch ()); - } - result = true; - } - return result; -} - std::shared_ptr nano::transport::tcp_channels::find_node_id (nano::account const & node_id_a) { std::shared_ptr result; diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index a1430171a5..81c038e66f 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -154,7 +154,6 @@ namespace transport std::shared_ptr find_channel (nano::tcp_endpoint const &) const; void random_fill (std::array &) const; std::unordered_set> random_set (std::size_t, uint8_t = 0, bool = false) const; - bool store_all (bool = true); std::shared_ptr find_node_id (nano::account const &); // Get the next peer for attempting a tcp connection nano::tcp_endpoint bootstrap_peer ();