Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Apr 16, 2024
1 parent 1bf4e9d commit 5035aac
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 59 deletions.
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ enum class type
rep_tiers,
syn_cookies,
thread_runner,
peer_cache,

// bootstrap
bulk_pull_client,
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum class type : uint8_t
local_block_broadcaster,
rep_tiers,
syn_cookies,
peer_cache,

bootstrap_ascending,
bootstrap_ascending_accounts,
Expand All @@ -78,6 +79,8 @@ enum class detail : uint8_t
ignored,
update,
updated,
inserted,
erased,
request,
broadcast,
cleanup,
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::signal_manager:
thread_role_name_string = "Signal manager";
break;
case nano::thread_role::name::peer_cache:
thread_role_name_string = "Peer cache";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ enum class name
network_keepalive,
network_reachout,
signal_manager,
peer_cache,
};

std::string_view to_string (name);
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ add_library(
openclconfig.cpp
openclwork.hpp
openclwork.cpp
peer_cache.hpp
peer_cache.cpp
peer_exclusion.hpp
peer_exclusion.cpp
portmapping.hpp
Expand Down
34 changes: 11 additions & 23 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <nano/node/local_vote_history.hpp>
#include <nano/node/make_store.hpp>
#include <nano/node/node.hpp>
#include <nano/node/peer_cache.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/hinted.hpp>
#include <nano/node/scheduler/manual.hpp>
Expand Down Expand Up @@ -198,6 +199,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
epoch_upgrader{ *this, ledger, store, network_params, logger },
local_block_broadcaster{ *this, block_processor, network, stats, !flags.disable_block_processor_republishing },
process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket },
peer_cache_impl{ std::make_unique<nano::peer_cache> (config.peer_cache, store, network, logger, stats) },
peer_cache{ *peer_cache_impl },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq)
{
Expand Down Expand Up @@ -613,8 +616,9 @@ void nano::node::process_local_async (std::shared_ptr<nano::block> const & block
void nano::node::start ()
{
long_inactivity_cleanup ();

network.start ();
add_initial_peers ();

if (!flags.disable_legacy_bootstrap && !flags.disable_ongoing_bootstrap)
{
ongoing_bootstrap ();
Expand All @@ -630,7 +634,7 @@ void nano::node::start ()
{
rep_crawler.start ();
}
ongoing_peer_store ();

ongoing_online_weight_calculation_queue ();

bool tcp_enabled = false;
Expand Down Expand Up @@ -692,6 +696,9 @@ void nano::node::start ()
websocket.start ();
telemetry.start ();
local_block_broadcaster.start ();
peer_cache.start ();

add_initial_peers ();
}

void nano::node::stop ()
Expand All @@ -704,6 +711,7 @@ void nano::node::stop ()

logger.info (nano::log::type::node, "Node stopping...");

peer_cache.stop ();
// Cancels ongoing work generation tasks, which may be blocking other threads
// No tasks may wait for work generation in I/O threads, or termination signal capturing will be unable to call node::stop()
distributed_work.stop ();
Expand Down Expand Up @@ -867,18 +875,6 @@ void nano::node::ongoing_bootstrap ()
});
}

void nano::node::ongoing_peer_store ()
{
const bool stored{ network.tcp_channels.store_all (true) };
std::weak_ptr<nano::node> node_w (shared_from_this ());
workers.add_timed_task (std::chrono::steady_clock::now () + network_params.network.peer_dump_interval, [node_w] () {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_peer_store ();
}
});
}

void nano::node::backup_wallet ()
{
auto transaction (wallets.tx_begin_read ());
Expand Down Expand Up @@ -1157,15 +1153,7 @@ void nano::node::add_initial_peers ()
return;
}

std::vector<nano::endpoint> initial_peers;
{
auto transaction = store.tx_begin_read ();
for (auto i (store.peer.begin (transaction)), n (store.peer.end ()); i != n; ++i)
{
nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ());
initial_peers.push_back (endpoint);
}
}
auto initial_peers = peer_cache.cached_peers ();

logger.info (nano::log::type::node, "Adding cached initial peers: {}", initial_peers.size ());

Expand Down
8 changes: 3 additions & 5 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class active_transactions;
class confirming_set;
class node;
class work_pool;
class peer_cache;

namespace scheduler
{
Expand Down Expand Up @@ -97,7 +98,6 @@ class node final : public std::enable_shared_from_this<node>
nano::uint128_t weight (nano::account const &);
nano::uint128_t minimum_principal_weight ();
void ongoing_bootstrap ();
void ongoing_peer_store ();
void backup_wallet ();
void search_receivable_all ();
void bootstrap_wallet ();
Expand Down Expand Up @@ -183,11 +183,7 @@ class node final : public std::enable_shared_from_this<node>
nano::vote_generator & generator;
std::unique_ptr<nano::vote_generator> final_generator_impl;
nano::vote_generator & final_generator;

private: // Placed here to maintain initialization order
std::unique_ptr<nano::scheduler::component> scheduler_impl;

public:
nano::scheduler::component & scheduler;
nano::request_aggregator aggregator;
nano::wallets wallets;
Expand All @@ -197,6 +193,8 @@ class node final : public std::enable_shared_from_this<node>
nano::epoch_upgrader epoch_upgrader;
nano::local_block_broadcaster local_block_broadcaster;
nano::process_live_dispatcher process_live_dispatcher;
std::unique_ptr<nano::peer_cache> peer_cache_impl;
nano::peer_cache & peer_cache;

std::chrono::steady_clock::time_point const startup_time;
std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week
Expand Down
13 changes: 12 additions & 1 deletion nano/node/nodeconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ nano::node_config::node_config (const std::optional<uint16_t> & peering_port_a,
ipc_config{ network_params.network },
external_address{ boost::asio::ip::address_v6{}.to_string () },
rep_crawler{ network_params.network },
block_processor{ network_params.network }
block_processor{ network_params.network },
peer_cache{ network_params.network }
{
if (peering_port == 0)
{
Expand Down Expand Up @@ -217,6 +218,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
block_processor.serialize (block_processor_l);
toml.put_child ("block_processor", block_processor_l);

nano::tomlconfig peer_cache_l;
peer_cache.serialize (peer_cache_l);
toml.put_child ("peer_cache", peer_cache_l);

return toml.get_error ();
}

Expand Down Expand Up @@ -298,6 +303,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
block_processor.deserialize (config_l);
}

if (toml.has_key ("peer_cache"))
{
auto config_l = toml.get_required_child ("peer_cache");
peer_cache.deserialize (config_l);
}

if (toml.has_key ("work_peers"))
{
work_peers.clear ();
Expand Down
2 changes: 2 additions & 0 deletions nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap_config.hpp>
#include <nano/node/ipc/ipc_config.hpp>
#include <nano/node/peer_cache.hpp>
#include <nano/node/repcrawler.hpp>
#include <nano/node/scheduler/hinted.hpp>
#include <nano/node/scheduler/optimistic.hpp>
Expand Down Expand Up @@ -138,6 +139,7 @@ class node_config
nano::vote_cache_config vote_cache;
nano::rep_crawler_config rep_crawler;
nano::block_processor_config block_processor;
nano::peer_cache_config peer_cache;

public:
std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;
Expand Down
155 changes: 155 additions & 0 deletions nano/node/peer_cache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#include <nano/lib/thread_roles.hpp>
#include <nano/node/network.hpp>
#include <nano/node/peer_cache.hpp>
#include <nano/node/transport/channel.hpp>
#include <nano/store/component.hpp>
#include <nano/store/peer.hpp>

nano::peer_cache::peer_cache (nano::peer_cache_config const & config_a, nano::store::component & store_a, nano::network & network_a, nano::logger & logger_a, nano::stats & stats_a) :
config{ config_a },
store{ store_a },
network{ network_a },
logger{ logger_a },
stats{ stats_a }
{
}

nano::peer_cache::~peer_cache ()
{
debug_assert (!thread.joinable ());
}

void nano::peer_cache::start ()
{
debug_assert (!thread.joinable ());

thread = std::thread ([this] {
nano::thread_role::set (nano::thread_role::name::peer_cache);
run ();
});
}

void nano::peer_cache::stop ()
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}

bool nano::peer_cache::exists (nano::endpoint const & endpoint) const
{
auto transaction = store.tx_begin_read ();
return store.peer.exists (transaction, endpoint);
}

size_t nano::peer_cache::size () const
{
auto transaction = store.tx_begin_read ();
return store.peer.count (transaction);
}

void nano::peer_cache::trigger ()
{
condition.notify_all ();
}

void nano::peer_cache::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait_for (lock, config.check_interval, [this] { return stopped.load (); });
if (!stopped)
{
stats.inc (nano::stat::type::peer_cache, nano::stat::detail::loop);

lock.unlock ();

run_one ();

lock.lock ();
}
}
}

void nano::peer_cache::run_one ()
{
auto live_peers = network.list ();
auto transaction = store.tx_begin_write ({ tables::peers });

// Add or update live peers
for (auto const & peer : live_peers)
{
auto const endpoint = peer->get_endpoint ();
bool const exists = store.peer.exists (transaction, endpoint);
store.peer.put (transaction, endpoint, nano::milliseconds_since_epoch ());
if (!exists)
{
stats.inc (nano::stat::type::peer_cache, nano::stat::detail::inserted);
logger.debug (nano::log::type::peer_cache, "Cached new peer: {}", fmt::streamed (endpoint));
}
else
{
stats.inc (nano::stat::type::peer_cache, nano::stat::detail::updated);
}
}

// Erase old peers
auto const cutoff = std::chrono::system_clock::now () - config.erase_cutoff;

for (auto it = store.peer.begin (transaction); it != store.peer.end (); ++it)
{
auto const [endpoint, timestamp_millis] = *it;
auto timestamp = nano::from_milliseconds_since_epoch (timestamp_millis);
if (timestamp < cutoff)
{
store.peer.del (transaction, endpoint);

stats.inc (nano::stat::type::peer_cache, nano::stat::detail::erased);
logger.debug (nano::log::type::peer_cache, "Erased peer: {} (not seen for {}s)",
fmt::streamed (endpoint.endpoint ()),
nano::log::seconds_delta (timestamp));
}
}
}

std::vector<nano::endpoint> nano::peer_cache::cached_peers () const
{
auto transaction = store.tx_begin_read ();
std::vector<nano::endpoint> peers;
for (auto it = store.peer.begin (transaction); it != store.peer.end (); ++it)
{
auto const [endpoint, timestamp_millis] = *it;
peers.push_back (endpoint.endpoint ());
}
return peers;
}

/*
* peer_cache_config
*/

nano::peer_cache_config::peer_cache_config (nano::network_constants const & network)
{
if (network.is_dev_network ())
{
check_interval = 1s;
erase_cutoff = 3s;
}
}

nano::error nano::peer_cache_config::serialize (nano::tomlconfig & toml) const
{
return toml.get_error ();
}

nano::error nano::peer_cache_config::deserialize (nano::tomlconfig & toml)
{
return toml.get_error ();
}
Loading

0 comments on commit 5035aac

Please sign in to comment.