Skip to content

Commit

Permalink
Merge pull request #4772 from pwojcikdev/prevent-db-on-io-threads
Browse files Browse the repository at this point in the history
Disallow database operations on network IO threads
  • Loading branch information
pwojcikdev authored Nov 27, 2024
2 parents 7855c82 + b07d3db commit 308f394
Show file tree
Hide file tree
Showing 14 changed files with 62 additions and 36 deletions.
14 changes: 10 additions & 4 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::io_daemon:
thread_role_name_string = "I/O (daemon)";
break;
case nano::thread_role::name::io_ipc:
thread_role_name_string = "I/O (IPC)";
break;
case nano::thread_role::name::work:
thread_role_name_string = "Work pool";
break;
Expand Down Expand Up @@ -206,9 +209,12 @@ std::string nano::thread_role::get_string ()

void nano::thread_role::set (nano::thread_role::name role)
{
auto thread_role_name_string (get_string (role));

nano::thread_role::set_os_name (thread_role_name_string);

auto thread_role_name_string = get_string (role);
nano::thread_role::set_os_name (thread_role_name_string); // Implementation is platform specific
current_thread_role = role;
}

bool nano::thread_role::is_network_io ()
{
return nano::thread_role::get () == nano::thread_role::name::io;
}
6 changes: 6 additions & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum class name
unknown,
io,
io_daemon,
io_ipc,
work,
message_processing,
vote_processing,
Expand Down Expand Up @@ -87,4 +88,9 @@ std::string get_string ();
* Internal only, should not be called directly
*/
void set_os_name (std::string const &);

/*
* Check if the current thread is a network IO thread
*/
bool is_network_io ();
}
2 changes: 1 addition & 1 deletion nano/node/ipc/ipc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ class socket_transport : public nano::ipc::transport
acceptor->set_option (option_keepalive);
accept ();

runner = std::make_unique<nano::thread_runner> (io_ctx, server.logger, static_cast<unsigned> (std::max (1, concurrency_a)));
runner = std::make_unique<nano::thread_runner> (io_ctx, server.logger, static_cast<unsigned> (std::max (1, concurrency_a)), nano::thread_role::name::io_ipc);
}

boost::asio::io_context & context () const
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height))
{
auto node_l (shared_from_this ());
background ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () {
io_ctx.post ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () {
boost::property_tree::ptree event;
event.add ("account", account_a.to_account ());
event.add ("hash", block_a->hash ().to_string ());
Expand Down
6 changes: 0 additions & 6 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ class node final : public std::enable_shared_from_this<node>

std::shared_ptr<nano::node> shared ();

template <typename T>
void background (T action_a)
{
io_ctx.post (action_a);
}

bool copy_with_compaction (std::filesystem::path const &);
void keepalive (std::string const &, uint16_t);
int store_version ();
Expand Down
36 changes: 26 additions & 10 deletions nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ nano::rep_crawler::rep_crawler (nano::rep_crawler_config const & config_a, nano:
network_constants{ node_a.network_params.network },
active{ node_a.active }
{
if (!node.flags.disable_rep_crawler)
{
node.observers.endpoint.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
query (channel);
});
}
node.observers.endpoint.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
if (!node.flags.disable_rep_crawler)
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
prioritized.push_back (channel);
}
condition.notify_all ();
}
});
}

nano::rep_crawler::~rep_crawler ()
Expand Down Expand Up @@ -161,7 +165,7 @@ void nano::rep_crawler::run ()
lock.lock ();

condition.wait_for (lock, query_interval (sufficient_weight), [this, sufficient_weight] {
return stopped || query_predicate (sufficient_weight) || !responses.empty ();
return stopped || query_predicate (sufficient_weight) || !responses.empty () || !prioritized.empty ();
});

if (stopped)
Expand All @@ -180,6 +184,16 @@ void nano::rep_crawler::run ()

cleanup ();

if (!prioritized.empty ())
{
decltype (prioritized) prioritized_l;
prioritized_l.swap (prioritized);

lock.unlock ();
query (prioritized_l);
lock.lock ();
}

if (query_predicate (sufficient_weight))
{
last_query = std::chrono::steady_clock::now ();
Expand Down Expand Up @@ -230,7 +244,7 @@ void nano::rep_crawler::cleanup ()
});
}

std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
std::deque<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
{
debug_assert (!mutex.try_lock ());

Expand Down Expand Up @@ -310,7 +324,7 @@ bool nano::rep_crawler::track_rep_request (hash_root_t hash_root, std::shared_pt
return true;
}

void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::channel>> const & target_channels)
void nano::rep_crawler::query (std::deque<std::shared_ptr<nano::transport::channel>> const & target_channels)
{
auto maybe_hash_root = prepare_query_target ();
if (!maybe_hash_root)
Expand Down Expand Up @@ -356,7 +370,7 @@ void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::chan

void nano::rep_crawler::query (std::shared_ptr<nano::transport::channel> const & target_channel)
{
query (std::vector{ target_channel });
query (std::deque{ target_channel });
}

bool nano::rep_crawler::is_pr (std::shared_ptr<nano::transport::channel> const & channel) const
Expand Down Expand Up @@ -432,6 +446,7 @@ std::vector<nano::representative> nano::rep_crawler::representatives (std::size_
}

std::vector<nano::representative> result;
result.reserve (ordered.size ());
for (auto i = ordered.begin (), n = ordered.end (); i != n && result.size () < count; ++i)
{
auto const & [weight, rep] = *i;
Expand Down Expand Up @@ -483,6 +498,7 @@ nano::container_info nano::rep_crawler::container_info () const
info.put ("reps", reps);
info.put ("queries", queries);
info.put ("responses", responses);
info.put ("prioritized", prioritized);
return info;
}

Expand Down
15 changes: 9 additions & 6 deletions nano/node/repcrawler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class rep_crawler final
bool process (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &);

/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::vector<std::shared_ptr<nano::transport::channel>> const & target_channels);
void query (std::deque<std::shared_ptr<nano::transport::channel>> const & target_channels);

/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::shared_ptr<nano::transport::channel> const & target_channel);
Expand All @@ -75,12 +75,10 @@ class rep_crawler final
/** Get total available weight from representatives */
nano::uint128_t total_weight () const;

/** Request a list of the top \p count known representatives in descending order of weight, with at least \p weight_a voting weight, and optionally with a minimum version \p minimum_protocol_version
*/
/** Request a list of the top \p count known representatives in descending order of weight, with at least \p weight_a voting weight, and optionally with a minimum version \p minimum_protocol_version */
std::vector<representative> representatives (std::size_t count = std::numeric_limits<std::size_t>::max (), nano::uint128_t minimum_weight = 0, std::optional<decltype (nano::network_constants::protocol_version)> const & minimum_protocol_version = {}) const;

/** Request a list of the top \p count known principal representatives in descending order of weight, optionally with a minimum version \p minimum_protocol_version
*/
/** Request a list of the top \p count known principal representatives in descending order of weight, optionally with a minimum version \p minimum_protocol_version */
std::vector<representative> principal_representatives (std::size_t count = std::numeric_limits<std::size_t>::max (), std::optional<decltype (nano::network_constants::protocol_version)> const & minimum_protocol_version = {}) const;

/** Total number of representatives */
Expand All @@ -106,7 +104,8 @@ class rep_crawler final
using hash_root_t = std::pair<nano::block_hash, nano::root>;

/** Returns a list of endpoints to crawl. The total weight is passed in to avoid computing it twice. */
std::vector<std::shared_ptr<nano::transport::channel>> prepare_crawl_targets (bool sufficient_weight) const;

std::deque<std::shared_ptr<nano::transport::channel>> prepare_crawl_targets (bool sufficient_weight) const;
std::optional<hash_root_t> prepare_query_target () const;
bool track_rep_request (hash_root_t hash_root, std::shared_ptr<nano::transport::channel> const & channel);

Expand Down Expand Up @@ -173,9 +172,13 @@ class rep_crawler final

private:
static size_t constexpr max_responses{ 1024 * 4 };

using response_t = std::pair<std::shared_ptr<nano::transport::channel>, std::shared_ptr<nano::vote>>;
boost::circular_buffer<response_t> responses{ max_responses };

// Freshly established connections that should be queried asap
std::deque<std::shared_ptr<nano::transport::channel>> prioritized;

std::chrono::steady_clock::time_point last_query{};

std::atomic<bool> stopped{ false };
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void nano::transport::channel::send (nano::message & message_a, std::function<vo
{
if (callback_a)
{
node.background ([callback_a] () {
node.io_ctx.post ([callback_a] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/fake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer cons
auto size = buffer_a.size ();
if (callback_a)
{
node.background ([callback_a, size] () {
node.io_ctx.post ([callback_a, size] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size);
});
}
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/inproc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co

if (callback_a)
{
node.background ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () {
node.io_ctx.post ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () {
callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size);
});
}
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/tcp_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const
}
else if (callback_a)
{
node.background ([callback_a] () {
node.io_ctx.post ([callback_a] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
Expand Down
4 changes: 2 additions & 2 deletions nano/node/transport/tcp_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const &
{
if (callback_a)
{
node_l->background ([callback = std::move (callback_a)] () {
node_l->io_ctx.post ([callback = std::move (callback_a)] () {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
Expand All @@ -161,7 +161,7 @@ void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const &
{
if (callback_a)
{
node_l->background ([callback = std::move (callback_a)] () {
node_l->io_ctx.post ([callback = std::move (callback_a)] () {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
Expand Down
4 changes: 2 additions & 2 deletions nano/qt/qt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ nano_qt::block_viewer::block_viewer (nano_qt::wallet & wallet_a) :
if (this->wallet.node.ledger.any.block_exists (transaction, block))
{
rebroadcast->setEnabled (false);
this->wallet.node.background ([this, block] () {
this->wallet.node.workers.post ([this, block] () {
rebroadcast_action (block);
});
}
Expand Down Expand Up @@ -1194,7 +1194,7 @@ void nano_qt::wallet::start ()
if (this_l->wallet_m->store.valid_password (transaction))
{
this_l->send_blocks_send->setEnabled (false);
this_l->node.background ([this_w, account_l, actual] () {
this_l->node.workers.post ([this_w, account_l, actual] () {
if (auto this_l = this_w.lock ())
{
this_l->wallet_m->send_async (this_l->account, account_l, actual, [this_w] (std::shared_ptr<nano::block> const & block_a) {
Expand Down
1 change: 1 addition & 0 deletions nano/store/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
nano::store::transaction_impl::transaction_impl (nano::id_dispenser::id_t const store_id_a) :
store_id{ store_id_a }
{
debug_assert (!nano::thread_role::is_network_io (), "database operations are not allowed to run on network IO threads");
}

/*
Expand Down

0 comments on commit 308f394

Please sign in to comment.