Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize cemented callbacks #4642

Merged
6 changes: 3 additions & 3 deletions nano/core_test/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ TEST (active_elections, dropped_cleanup)
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

// An election was recently dropped
ASSERT_EQ (1, node.stats.count (nano::stat::type::active_dropped, nano::stat::detail::manual));
ASSERT_EQ (1, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::manual));

// Block cleared from active
ASSERT_FALSE (node.vote_router.active (hash));
Expand All @@ -684,7 +684,7 @@ TEST (active_elections, dropped_cleanup)
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

// Not dropped
ASSERT_EQ (1, node.stats.count (nano::stat::type::active_dropped, nano::stat::detail::manual));
ASSERT_EQ (1, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::manual));

// Block cleared from active
ASSERT_FALSE (node.vote_router.active (hash));
Expand Down Expand Up @@ -1387,7 +1387,7 @@ TEST (active_elections, limit_vote_hinted_elections)
ASSERT_TIMELY (5s, nano::test::active (node, { open1 }));

// Ensure there was no overflow of elections
ASSERT_EQ (0, node.stats.count (nano::stat::type::active_dropped, nano::stat::detail::priority));
ASSERT_EQ (0, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::priority));
}

/*
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ using namespace std::chrono_literals;
TEST (confirming_set, construction)
{
auto ctx = nano::test::context::ledger_empty ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
}

TEST (confirming_set, add_exists)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
auto send = ctx.blocks ()[0];
confirming_set.add (send->hash ());
ASSERT_TRUE (confirming_set.exists (send->hash ()));
Expand All @@ -34,7 +34,7 @@ TEST (confirming_set, add_exists)
TEST (confirming_set, process_one)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
Expand All @@ -50,7 +50,7 @@ TEST (confirming_set, process_one)
TEST (confirming_set, process_multiple)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
Expand Down
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ enum class detail
// active_elections
active_started,
active_stopped,
active_cemented,

// election
election_confirmed,
Expand Down
37 changes: 33 additions & 4 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum class type
socket,
confirmation_height,
confirmation_observer,
confirming_set,
drop,
aggregator,
requests,
Expand All @@ -60,10 +61,12 @@ enum class type
bootstrap_server_response,
active,
active_elections,
active_started,
active_confirmed,
active_dropped,
active_timeout,
active_elections_started,
active_elections_stopped,
active_elections_confirmed,
active_elections_dropped,
active_elections_timeout,
active_elections_cemented,
backlog,
unchecked,
election_scheduler,
Expand Down Expand Up @@ -114,6 +117,11 @@ enum class detail
rebroadcast,
queue_overflow,
triggered,
notify,
duplicate,
confirmed,
unconfirmed,
cemented,

// processing queue
queue,
Expand Down Expand Up @@ -373,6 +381,10 @@ enum class detail
insert,
insert_failed,

// active_elections
started,
stopped,

// unchecked
put,
satisfied,
Expand Down Expand Up @@ -440,6 +452,23 @@ enum class detail
tier_2,
tier_3,

// confirming_set
notify_cemented,
notify_already_cemented,
already_cemented,

// election_state
passive,
active,
expired_confirmed,
expired_unconfirmed,

// election_status_type
ongoing,
active_confirmed_quorum,
active_confirmation_height,
inactive_confirmation_height,

_last // Must be the last enum
};

Expand Down
4 changes: 2 additions & 2 deletions nano/lib/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ void nano::thread_pool::set_thread_names (nano::thread_role::name thread_name)
thread_names_latch.wait ();
}

std::unique_ptr<nano::container_info_component> nano::collect_container_info (thread_pool & thread_pool, std::string const & name)
std::unique_ptr<nano::container_info_component> nano::thread_pool::collect_container_info (std::string const & name) const
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "count", thread_pool.num_queued_tasks (), sizeof (std::function<void ()>) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "count", num_queued_tasks (), sizeof (std::function<void ()>) }));
return composite;
}
6 changes: 3 additions & 3 deletions nano/lib/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace nano
class thread_pool final
{
public:
explicit thread_pool (unsigned, nano::thread_role::name);
explicit thread_pool (unsigned num_threads, nano::thread_role::name);
~thread_pool ();

/** This will run when there is an available thread for execution */
Expand All @@ -37,6 +37,8 @@ class thread_pool final
/** Returns the number of tasks which are awaiting execution by the thread pool **/
uint64_t num_queued_tasks () const;

std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name) const;

private:
nano::mutex mutex;
std::atomic<bool> stopped{ false };
Expand All @@ -48,6 +50,4 @@ class thread_pool final
std::latch thread_names_latch;
void set_thread_names (nano::thread_role::name thread_name);
};

std::unique_ptr<nano::container_info_component> collect_container_info (thread_pool & thread_pool, std::string const & name);
} // namespace nano
9 changes: 9 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,21 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::confirmation_height_processing:
thread_role_name_string = "Conf height";
break;
case nano::thread_role::name::confirmation_height_notifications:
thread_role_name_string = "Conf notif";
break;
case nano::thread_role::name::worker:
thread_role_name_string = "Worker";
break;
case nano::thread_role::name::bootstrap_worker:
thread_role_name_string = "Bootstrap work";
break;
case nano::thread_role::name::wallet_worker:
thread_role_name_string = "Wallet work";
break;
case nano::thread_role::name::election_worker:
thread_role_name_string = "Election work";
break;
case nano::thread_role::name::request_aggregator:
thread_role_name_string = "Req aggregator";
break;
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ enum class name
rpc_request_processor,
rpc_process_container,
confirmation_height_processing,
confirmation_height_notifications,
worker,
bootstrap_worker,
wallet_worker,
election_worker,
request_aggregator,
state_block_signature_verification,
epoch_upgrader,
Expand Down
95 changes: 68 additions & 27 deletions nano/node/active_elections.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/enum_util.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/active_elections.hpp>
Expand All @@ -18,25 +19,31 @@

using namespace std::chrono;

nano::active_elections::active_elections (nano::node & node_a, nano::confirming_set & confirming_set, nano::block_processor & block_processor_a) :
nano::active_elections::active_elections (nano::node & node_a, nano::confirming_set & confirming_set_a, nano::block_processor & block_processor_a) :
config{ node_a.config.active_elections },
node{ node_a },
confirming_set{ confirming_set },
confirming_set{ confirming_set_a },
block_processor{ block_processor_a },
recently_confirmed{ config.confirmation_cache },
recently_cemented{ config.confirmation_history_size },
election_time_to_live{ node_a.network_params.network.is_dev_network () ? 0s : 2s }
{
count_by_behavior.fill (0); // Zero initialize array

// Register a callback which will get called after a block is cemented
confirming_set.cemented_observers.add ([this] (std::shared_ptr<nano::block> const & callback_block_a) {
this->block_cemented_callback (callback_block_a);
});
confirming_set.batch_cemented.add ([this] (nano::confirming_set::cemented_notification const & notification) {
{
auto transaction = node.ledger.tx_begin_read ();
for (auto const & [block, confirmation_root] : notification.cemented)
{
transaction.refresh_if_needed ();

// Register a callback which will get called if a block is already cemented
confirming_set.block_already_cemented_observers.add ([this] (nano::block_hash const & hash_a) {
this->block_already_cemented_callback (hash_a);
block_cemented_callback (transaction, block, confirmation_root);
}
}
for (auto const & hash : notification.already_cemented)
{
block_already_cemented_callback (hash);
}
});

// Notify elections about alternative (forked) blocks
Expand Down Expand Up @@ -84,9 +91,10 @@ void nano::active_elections::stop ()
clear ();
}

void nano::active_elections::block_cemented_callback (std::shared_ptr<nano::block> const & block)
void nano::active_elections::block_cemented_callback (nano::secure::transaction const & transaction, std::shared_ptr<nano::block> const & block, nano::block_hash const & confirmation_root)
{
debug_assert (node.block_confirmed (block->hash ()));

if (auto election_l = election (block->qualified_root ()))
{
election_l->try_confirm (block->hash ());
Expand All @@ -100,7 +108,7 @@ void nano::active_elections::block_cemented_callback (std::shared_ptr<nano::bloc
status = election->get_status ();
votes = election->votes_with_weight ();
}
if (confirming_set.exists (block->hash ()))
if (block->hash () == confirmation_root)
{
status.type = nano::election_status_type::active_confirmed_quorum;
}
Expand All @@ -113,8 +121,14 @@ void nano::active_elections::block_cemented_callback (std::shared_ptr<nano::bloc
status.type = nano::election_status_type::inactive_confirmation_height;
}
recently_cemented.put (status);
auto transaction = node.ledger.tx_begin_read ();

node.stats.inc (nano::stat::type::active_elections, nano::stat::detail::cemented);
node.stats.inc (nano::stat::type::active_elections_cemented, to_stat_detail (status.type));

node.logger.trace (nano::log::type::active_elections, nano::log::detail::active_cemented, nano::log::arg{ "election", election });

notify_observers (transaction, status, votes);

bool cemented_bootstrap_count_reached = node.ledger.cemented_count () >= node.ledger.bootstrap_weight_max_blocks;
bool was_active = status.type == nano::election_status_type::active_confirmed_quorum || status.type == nano::election_status_type::active_confirmation_height;

Expand Down Expand Up @@ -296,7 +310,11 @@ void nano::active_elections::cleanup_election (nano::unique_lock<nano::mutex> &

roots.get<tag_root> ().erase (roots.get<tag_root> ().find (election->qualified_root));

node.stats.inc (completion_type (*election), to_stat_detail (election->behavior ()));
node.stats.inc (nano::stat::type::active_elections, nano::stat::detail::stopped);
node.stats.inc (nano::stat::type::active_elections, election->confirmed () ? nano::stat::detail::confirmed : nano::stat::detail::unconfirmed);
node.stats.inc (nano::stat::type::active_elections_stopped, to_stat_detail (election->state ()));
node.stats.inc (to_stat_type (election->state ()), to_stat_detail (election->behavior ()));

node.logger.trace (nano::log::type::active_elections, nano::log::detail::active_stopped, nano::log::arg{ "election", election });

node.logger.debug (nano::log::type::active_elections, "Erased election for blocks: {} (behavior: {}, state: {})",
Expand Down Expand Up @@ -326,19 +344,6 @@ void nano::active_elections::cleanup_election (nano::unique_lock<nano::mutex> &
}
}

nano::stat::type nano::active_elections::completion_type (nano::election const & election) const
{
if (election.confirmed ())
{
return nano::stat::type::active_confirmed;
}
if (election.failed ())
{
return nano::stat::type::active_timeout;
}
return nano::stat::type::active_dropped;
}

std::vector<std::shared_ptr<nano::election>> nano::active_elections::list_active (std::size_t max_a)
{
nano::lock_guard<nano::mutex> guard{ mutex };
Expand Down Expand Up @@ -415,7 +420,9 @@ nano::election_insertion_result nano::active_elections::insert (std::shared_ptr<
debug_assert (count_by_behavior[result.election->behavior ()] >= 0);
count_by_behavior[result.election->behavior ()]++;

node.stats.inc (nano::stat::type::active_started, to_stat_detail (election_behavior_a));
node.stats.inc (nano::stat::type::active_elections, nano::stat::detail::started);
node.stats.inc (nano::stat::type::active_elections_started, to_stat_detail (election_behavior_a));

node.logger.trace (nano::log::type::active_elections, nano::log::detail::active_started,
nano::log::arg{ "behavior", election_behavior_a },
nano::log::arg{ "election", result.election });
Expand Down Expand Up @@ -592,3 +599,37 @@ nano::error nano::active_elections_config::deserialize (nano::tomlconfig & toml)

return toml.get_error ();
}

/*
*
*/

nano::stat::type nano::to_stat_type (nano::election_state state)
{
switch (state)
{
case election_state::passive:
case election_state::active:
return nano::stat::type::active_elections_dropped;
break;
case election_state::confirmed:
case election_state::expired_confirmed:
return nano::stat::type::active_elections_confirmed;
break;
case election_state::expired_unconfirmed:
return nano::stat::type::active_elections_timeout;
break;
}
debug_assert (false);
return {};
}

nano::stat::detail nano::to_stat_detail (nano::election_state state)
{
return nano::enum_util::cast<nano::stat::detail> (state);
}

nano::stat::detail nano::to_stat_detail (nano::election_status_type type)
{
return nano::enum_util::cast<nano::stat::detail> (type);
}
Loading
Loading