Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework nano::thread_pool #4762

Merged
merged 9 commits into from
Oct 27, 2024
Prev Previous commit
Next Next commit
Replace post_timed with post_delayed
  • Loading branch information
pwojcikdev committed Oct 22, 2024
commit f53f1f8d98b044d7fe670f2c08e5b53f84a903cb
8 changes: 4 additions & 4 deletions nano/core_test/thread_pool.cpp
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ TEST (thread_pool, one)
nano::condition_variable condition;
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
nano::test::start_stop_guard stop_guard{ workers };
workers.post_timed (std::chrono::steady_clock::now (), [&] () {
workers.post ([&] () {
{
nano::lock_guard<nano::mutex> lock{ mutex };
done = true;
@@ -59,7 +59,7 @@ TEST (thread_pool, many)
nano::test::start_stop_guard stop_guard{ workers };
for (auto i (0); i < 50; ++i)
{
workers.post_timed (std::chrono::steady_clock::now (), [&] () {
workers.post ([&] () {
{
nano::lock_guard<nano::mutex> lock{ mutex };
count += 1;
@@ -79,12 +79,12 @@ TEST (thread_pool, top_execution)
std::promise<bool> promise;
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
nano::test::start_stop_guard stop_guard{ workers };
workers.post_timed (std::chrono::steady_clock::now (), [&] () {
workers.post ([&] () {
nano::lock_guard<nano::mutex> lock{ mutex };
value1 = 1;
value2 = 1;
});
workers.post_timed (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [&] () {
workers.post_delayed (std::chrono::milliseconds (1), [&] () {
nano::lock_guard<nano::mutex> lock{ mutex };
value2 = 2;
promise.set_value (false);
15 changes: 12 additions & 3 deletions nano/lib/thread_pool.hpp
Original file line number Diff line number Diff line change
@@ -84,17 +84,19 @@ class thread_pool final
}

template <typename F>
void post_timed (std::chrono::steady_clock::time_point const & expiry_time, F && task)
void post_delayed (std::chrono::steady_clock::duration const & delay, F && task)
{
nano::lock_guard<nano::mutex> guard{ mutex };
if (!stopped)
{
++num_delayed;
release_assert (thread_pool_impl);
auto timer = std::make_shared<boost::asio::steady_timer> (thread_pool_impl->get_executor ());
timer->expires_at (expiry_time);
timer->expires_after (delay);
timer->async_wait ([this, t = std::forward<F> (task), /* preserve lifetime */ timer] (boost::system::error_code const & ec) mutable {
if (!ec)
{
--num_delayed;
post (std::move (t));
}
});
@@ -112,10 +114,16 @@ class thread_pool final
return num_tasks;
}

uint64_t delayed_tasks () const
{
return num_delayed;
}

nano::container_info container_info () const
{
nano::container_info info;
info.put ("tasks", queued_tasks ());
info.put ("tasks", num_tasks);
info.put ("delayed", num_delayed);
return info;
}

@@ -141,5 +149,6 @@ class thread_pool final
std::atomic<bool> stopped{ false };
std::unique_ptr<boost::asio::thread_pool> thread_pool_impl;
std::atomic<uint64_t> num_tasks{ 0 };
std::atomic<uint64_t> num_delayed{ 0 };
};
}
2 changes: 1 addition & 1 deletion nano/node/bootstrap/bootstrap_bulk_pull.cpp
Original file line number Diff line number Diff line change
@@ -127,7 +127,7 @@ void nano::bulk_pull_client::throttled_receive_block ()
else
{
auto this_l (shared_from_this ());
node->workers.post_timed (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l] () {
node->workers.post_delayed (std::chrono::seconds (1), [this_l] () {
if (!this_l->connection->pending_stop && !this_l->attempt->stopped)
{
this_l->throttled_receive_block ();
2 changes: 1 addition & 1 deletion nano/node/bootstrap/bootstrap_bulk_push.cpp
Original file line number Diff line number Diff line change
@@ -144,7 +144,7 @@ void nano::bulk_push_server::throttled_receive ()
else
{
auto this_l (shared_from_this ());
node->workers.post_timed (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l] () {
node->workers.post_delayed (std::chrono::seconds (1), [this_l] () {
if (!this_l->connection->stopped)
{
this_l->throttled_receive ();
2 changes: 1 addition & 1 deletion nano/node/bootstrap/bootstrap_connections.cpp
Original file line number Diff line number Diff line change
@@ -306,7 +306,7 @@ void nano::bootstrap_connections::populate_connections (bool repeat)
if (!stopped && repeat)
{
std::weak_ptr<nano::bootstrap_connections> this_w (shared_from_this ());
node.workers.post_timed (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_w] () {
node.workers.post_delayed (std::chrono::seconds (1), [this_w] () {
if (auto this_l = this_w.lock ())
{
this_l->populate_connections ();
3 changes: 1 addition & 2 deletions nano/node/distributed_work.cpp
Original file line number Diff line number Diff line change
@@ -400,10 +400,9 @@ void nano::distributed_work::handle_failure ()

status = work_generation_status::failure_peers;

auto now (std::chrono::steady_clock::now ());
std::weak_ptr<nano::node> node_weak (node.shared ());
auto next_backoff (std::min (backoff * 2, std::chrono::seconds (5 * 60)));
node.workers.post_timed (now + std::chrono::seconds (backoff), [node_weak, request_l = request, next_backoff] {
node.workers.post_delayed (std::chrono::seconds (backoff), [node_weak, request_l = request, next_backoff] {
bool error_l{ true };
if (auto node_l = node_weak.lock ())
{
2 changes: 1 addition & 1 deletion nano/node/network.cpp
Original file line number Diff line number Diff line change
@@ -300,7 +300,7 @@ void nano::network::flood_block_many (std::deque<std::shared_ptr<nano::block>> b
if (!blocks_a.empty ())
{
std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.post_timed (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a] () {
node.workers.post_delayed (std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a] () {
if (auto node_l = node_w.lock ())
{
node_l->network.flood_block_many (std::move (blocks), callback_a, delay_a);
14 changes: 7 additions & 7 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
@@ -609,7 +609,7 @@ void nano::node::start ()
{
// Delay to start wallet lazy bootstrap
auto this_l (shared ());
workers.post_timed (std::chrono::steady_clock::now () + std::chrono::minutes (1), [this_l] () {
workers.post_delayed (std::chrono::minutes (1), [this_l] () {
this_l->bootstrap_wallet ();
});
}
@@ -829,7 +829,7 @@ void nano::node::ongoing_bootstrap ()
// Bootstrap and schedule for next attempt
bootstrap_initiator.bootstrap (false, boost::str (boost::format ("auto_bootstrap_%1%") % previous_bootstrap_count), frontiers_age);
std::weak_ptr<nano::node> node_w (shared_from_this ());
workers.post_timed (std::chrono::steady_clock::now () + next_wakeup, [node_w] () {
workers.post_delayed (next_wakeup, [node_w] () {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_bootstrap ();
@@ -850,7 +850,7 @@ void nano::node::backup_wallet ()
i->second->store.write_backup (transaction, backup_path / (i->first.to_string () + ".json"));
}
auto this_l (shared ());
workers.post_timed (std::chrono::steady_clock::now () + network_params.node.backup_interval, [this_l] () {
workers.post_delayed (network_params.node.backup_interval, [this_l] () {
this_l->backup_wallet ();
});
}
@@ -862,7 +862,7 @@ void nano::node::search_receivable_all ()
// Search pending
wallets.search_receivable_all ();
auto this_l (shared ());
workers.post_timed (std::chrono::steady_clock::now () + network_params.node.search_pending_interval, [this_l] () {
workers.post_delayed (network_params.node.search_pending_interval, [this_l] () {
this_l->search_receivable_all ();
});
}
@@ -987,7 +987,7 @@ void nano::node::ongoing_ledger_pruning ()
ledger_pruning (flags.block_processor_batch_size != 0 ? flags.block_processor_batch_size : 2 * 1024, bootstrap_weight_reached);
auto const ledger_pruning_interval (bootstrap_weight_reached ? config.max_pruning_age : std::min (config.max_pruning_age, std::chrono::seconds (15 * 60)));
auto this_l (shared ());
workers.post_timed (std::chrono::steady_clock::now () + ledger_pruning_interval, [this_l] () {
workers.post_delayed (ledger_pruning_interval, [this_l] () {
this_l->workers.post ([this_l] () {
this_l->ongoing_ledger_pruning ();
});
@@ -1132,7 +1132,7 @@ bool nano::node::block_confirmed_or_being_confirmed (nano::block_hash const & ha
void nano::node::ongoing_online_weight_calculation_queue ()
{
std::weak_ptr<nano::node> node_w (shared_from_this ());
workers.post_timed (std::chrono::steady_clock::now () + (std::chrono::seconds (network_params.node.weight_period)), [node_w] () {
workers.post_delayed ((std::chrono::seconds (network_params.node.weight_period)), [node_w] () {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_online_weight_calculation ();
@@ -1171,7 +1171,7 @@ void nano::node::process_confirmed (nano::block_hash hash, std::shared_ptr<nano:
stats.inc (nano::stat::type::process_confirmed, nano::stat::detail::retry);

// Try again later
election_workers.post_timed (std::chrono::steady_clock::now () + network_params.node.process_confirmed_interval, [this, hash, election, iteration] () {
election_workers.post_delayed (network_params.node.process_confirmed_interval, [this, hash, election, iteration] () {
process_confirmed (hash, election, iteration + 1);
});
}
2 changes: 1 addition & 1 deletion nano/node/transport/tcp_socket.cpp
Original file line number Diff line number Diff line change
@@ -274,7 +274,7 @@ void nano::transport::tcp_socket::ongoing_checkup ()
return;
}

node_l->workers.post_timed (std::chrono::steady_clock::now () + std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 5), [this_w = weak_from_this ()] () {
node_l->workers.post_delayed (std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 5), [this_w = weak_from_this ()] () {
auto this_l = this_w.lock ();
if (!this_l)
{
4 changes: 2 additions & 2 deletions nano/node/wallet.cpp
Original file line number Diff line number Diff line change
@@ -1158,7 +1158,7 @@ void nano::wallet::work_ensure (nano::account const & account_a, nano::root cons

wallets.delayed_work->operator[] (account_a) = root_a;

wallets.node.workers.post_timed (std::chrono::steady_clock::now () + precache_delay, [this_l = shared_from_this (), account_a, root_a] {
wallets.node.workers.post_delayed (precache_delay, [this_l = shared_from_this (), account_a, root_a] {
auto delayed_work = this_l->wallets.delayed_work.lock ();
auto existing (delayed_work->find (account_a));
if (existing != delayed_work->end () && existing->second == root_a)
@@ -1705,7 +1705,7 @@ void nano::wallets::ongoing_compute_reps ()
auto & node_l (node);
// Representation drifts quickly on the test network but very slowly on the live network
auto compute_delay = network_params.network.is_dev_network () ? std::chrono::milliseconds (10) : (network_params.network.is_test_network () ? std::chrono::milliseconds (nano::test_scan_wallet_reps_delay ()) : std::chrono::minutes (15));
node.workers.post_timed (std::chrono::steady_clock::now () + compute_delay, [&node_l] () {
node.workers.post_delayed (compute_delay, [&node_l] () {
node_l.wallets.ongoing_compute_reps ();
});
}
Loading
Loading