Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disallow database operations on network IO threads #4772

Merged
merged 5 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::io_daemon:
thread_role_name_string = "I/O (daemon)";
break;
case nano::thread_role::name::io_ipc:
thread_role_name_string = "I/O (IPC)";
break;
case nano::thread_role::name::work:
thread_role_name_string = "Work pool";
break;
Expand Down Expand Up @@ -206,9 +209,12 @@ std::string nano::thread_role::get_string ()

void nano::thread_role::set (nano::thread_role::name role)
{
auto thread_role_name_string (get_string (role));

nano::thread_role::set_os_name (thread_role_name_string);

auto thread_role_name_string = get_string (role);
nano::thread_role::set_os_name (thread_role_name_string); // Implementation is platform specific
current_thread_role = role;
}

bool nano::thread_role::is_network_io ()
{
return nano::thread_role::get () == nano::thread_role::name::io;
}
6 changes: 6 additions & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum class name
unknown,
io,
io_daemon,
io_ipc,
work,
message_processing,
vote_processing,
Expand Down Expand Up @@ -87,4 +88,9 @@ std::string get_string ();
* Internal only, should not be called directly
*/
void set_os_name (std::string const &);

/*
* Check if the current thread is a network IO thread
*/
bool is_network_io ();
}
2 changes: 1 addition & 1 deletion nano/node/ipc/ipc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ class socket_transport : public nano::ipc::transport
acceptor->set_option (option_keepalive);
accept ();

runner = std::make_unique<nano::thread_runner> (io_ctx, server.logger, static_cast<unsigned> (std::max (1, concurrency_a)));
runner = std::make_unique<nano::thread_runner> (io_ctx, server.logger, static_cast<unsigned> (std::max (1, concurrency_a)), nano::thread_role::name::io_ipc);
}

boost::asio::io_context & context () const
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height))
{
auto node_l (shared_from_this ());
background ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () {
io_ctx.post ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () {
boost::property_tree::ptree event;
event.add ("account", account_a.to_account ());
event.add ("hash", block_a->hash ().to_string ());
Expand Down
6 changes: 0 additions & 6 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ class node final : public std::enable_shared_from_this<node>

std::shared_ptr<nano::node> shared ();

template <typename T>
void background (T action_a)
{
io_ctx.post (action_a);
}

bool copy_with_compaction (std::filesystem::path const &);
void keepalive (std::string const &, uint16_t);
int store_version ();
Expand Down
36 changes: 26 additions & 10 deletions nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ nano::rep_crawler::rep_crawler (nano::rep_crawler_config const & config_a, nano:
network_constants{ node_a.network_params.network },
active{ node_a.active }
{
if (!node.flags.disable_rep_crawler)
{
node.observers.endpoint.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
query (channel);
});
}
node.observers.endpoint.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
if (!node.flags.disable_rep_crawler)
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
prioritized.push_back (channel);
}
condition.notify_all ();
}
});
}

nano::rep_crawler::~rep_crawler ()
Expand Down Expand Up @@ -161,7 +165,7 @@ void nano::rep_crawler::run ()
lock.lock ();

condition.wait_for (lock, query_interval (sufficient_weight), [this, sufficient_weight] {
return stopped || query_predicate (sufficient_weight) || !responses.empty ();
return stopped || query_predicate (sufficient_weight) || !responses.empty () || !prioritized.empty ();
});

if (stopped)
Expand All @@ -180,6 +184,16 @@ void nano::rep_crawler::run ()

cleanup ();

if (!prioritized.empty ())
{
decltype (prioritized) prioritized_l;
prioritized_l.swap (prioritized);

lock.unlock ();
query (prioritized_l);
lock.lock ();
}

if (query_predicate (sufficient_weight))
{
last_query = std::chrono::steady_clock::now ();
Expand Down Expand Up @@ -230,7 +244,7 @@ void nano::rep_crawler::cleanup ()
});
}

std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
std::deque<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
{
debug_assert (!mutex.try_lock ());

Expand Down Expand Up @@ -310,7 +324,7 @@ bool nano::rep_crawler::track_rep_request (hash_root_t hash_root, std::shared_pt
return true;
}

void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::channel>> const & target_channels)
void nano::rep_crawler::query (std::deque<std::shared_ptr<nano::transport::channel>> const & target_channels)
{
auto maybe_hash_root = prepare_query_target ();
if (!maybe_hash_root)
Expand Down Expand Up @@ -356,7 +370,7 @@ void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::chan

void nano::rep_crawler::query (std::shared_ptr<nano::transport::channel> const & target_channel)
{
query (std::vector{ target_channel });
query (std::deque{ target_channel });
}

bool nano::rep_crawler::is_pr (std::shared_ptr<nano::transport::channel> const & channel) const
Expand Down Expand Up @@ -432,6 +446,7 @@ std::vector<nano::representative> nano::rep_crawler::representatives (std::size_
}

std::vector<nano::representative> result;
result.reserve (ordered.size ());
for (auto i = ordered.begin (), n = ordered.end (); i != n && result.size () < count; ++i)
{
auto const & [weight, rep] = *i;
Expand Down Expand Up @@ -483,6 +498,7 @@ nano::container_info nano::rep_crawler::container_info () const
info.put ("reps", reps);
info.put ("queries", queries);
info.put ("responses", responses);
info.put ("prioritized", prioritized);
return info;
}

Expand Down
15 changes: 9 additions & 6 deletions nano/node/repcrawler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class rep_crawler final
bool process (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &);

/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::vector<std::shared_ptr<nano::transport::channel>> const & target_channels);
void query (std::deque<std::shared_ptr<nano::transport::channel>> const & target_channels);

/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::shared_ptr<nano::transport::channel> const & target_channel);
Expand All @@ -75,12 +75,10 @@ class rep_crawler final
/** Get total available weight from representatives */
nano::uint128_t total_weight () const;

/** Request a list of the top \p count known representatives in descending order of weight, with at least \p weight_a voting weight, and optionally with a minimum version \p minimum_protocol_version
*/
/** Request a list of the top \p count known representatives in descending order of weight, with at least \p weight_a voting weight, and optionally with a minimum version \p minimum_protocol_version */
std::vector<representative> representatives (std::size_t count = std::numeric_limits<std::size_t>::max (), nano::uint128_t minimum_weight = 0, std::optional<decltype (nano::network_constants::protocol_version)> const & minimum_protocol_version = {}) const;

/** Request a list of the top \p count known principal representatives in descending order of weight, optionally with a minimum version \p minimum_protocol_version
*/
/** Request a list of the top \p count known principal representatives in descending order of weight, optionally with a minimum version \p minimum_protocol_version */
std::vector<representative> principal_representatives (std::size_t count = std::numeric_limits<std::size_t>::max (), std::optional<decltype (nano::network_constants::protocol_version)> const & minimum_protocol_version = {}) const;

/** Total number of representatives */
Expand All @@ -106,7 +104,8 @@ class rep_crawler final
using hash_root_t = std::pair<nano::block_hash, nano::root>;

/** Returns a list of endpoints to crawl. The total weight is passed in to avoid computing it twice. */
std::vector<std::shared_ptr<nano::transport::channel>> prepare_crawl_targets (bool sufficient_weight) const;

std::deque<std::shared_ptr<nano::transport::channel>> prepare_crawl_targets (bool sufficient_weight) const;
std::optional<hash_root_t> prepare_query_target () const;
bool track_rep_request (hash_root_t hash_root, std::shared_ptr<nano::transport::channel> const & channel);

Expand Down Expand Up @@ -173,9 +172,13 @@ class rep_crawler final

private:
static size_t constexpr max_responses{ 1024 * 4 };

using response_t = std::pair<std::shared_ptr<nano::transport::channel>, std::shared_ptr<nano::vote>>;
boost::circular_buffer<response_t> responses{ max_responses };

// Freshly established connections that should be queried asap
std::deque<std::shared_ptr<nano::transport::channel>> prioritized;

std::chrono::steady_clock::time_point last_query{};

std::atomic<bool> stopped{ false };
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void nano::transport::channel::send (nano::message & message_a, std::function<vo
{
if (callback_a)
{
node.background ([callback_a] () {
node.io_ctx.post ([callback_a] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/fake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer cons
auto size = buffer_a.size ();
if (callback_a)
{
node.background ([callback_a, size] () {
node.io_ctx.post ([callback_a, size] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size);
});
}
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/inproc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co

if (callback_a)
{
node.background ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () {
node.io_ctx.post ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () {
callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size);
});
}
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/tcp_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const
}
else if (callback_a)
{
node.background ([callback_a] () {
node.io_ctx.post ([callback_a] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
Expand Down
4 changes: 2 additions & 2 deletions nano/node/transport/tcp_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const &
{
if (callback_a)
{
node_l->background ([callback = std::move (callback_a)] () {
node_l->io_ctx.post ([callback = std::move (callback_a)] () {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
Expand All @@ -161,7 +161,7 @@ void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const &
{
if (callback_a)
{
node_l->background ([callback = std::move (callback_a)] () {
node_l->io_ctx.post ([callback = std::move (callback_a)] () {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
Expand Down
4 changes: 2 additions & 2 deletions nano/qt/qt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ nano_qt::block_viewer::block_viewer (nano_qt::wallet & wallet_a) :
if (this->wallet.node.ledger.any.block_exists (transaction, block))
{
rebroadcast->setEnabled (false);
this->wallet.node.background ([this, block] () {
this->wallet.node.workers.post ([this, block] () {
rebroadcast_action (block);
});
}
Expand Down Expand Up @@ -1194,7 +1194,7 @@ void nano_qt::wallet::start ()
if (this_l->wallet_m->store.valid_password (transaction))
{
this_l->send_blocks_send->setEnabled (false);
this_l->node.background ([this_w, account_l, actual] () {
this_l->node.workers.post ([this_w, account_l, actual] () {
if (auto this_l = this_w.lock ())
{
this_l->wallet_m->send_async (this_l->account, account_l, actual, [this_w] (std::shared_ptr<nano::block> const & block_a) {
Expand Down
1 change: 1 addition & 0 deletions nano/store/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
nano::store::transaction_impl::transaction_impl (nano::id_dispenser::id_t const store_id_a) :
store_id{ store_id_a }
{
debug_assert (!nano::thread_role::is_network_io (), "database operations are not allowed to run on network IO threads");
}

/*
Expand Down
Loading