Skip to content

Commit

Permalink
Deferred confirming set
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Oct 20, 2024
1 parent 5c31172 commit 7ef8130
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 8 deletions.
3 changes: 3 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ enum class detail
empty,
done,
retry,
requeued,
evicted,

// processing queue
queue,
Expand Down Expand Up @@ -506,6 +508,7 @@ enum class detail
cementing,
cemented_hash,
cementing_failed,
deferred_failed,

// election_state
passive,
Expand Down
99 changes: 93 additions & 6 deletions nano/node/confirming_set.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
#include <nano/secure/ledger_set_confirmed.hpp>
#include <nano/store/component.hpp>
#include <nano/store/write_queue.hpp>

nano::confirming_set::confirming_set (confirming_set_config const & config_a, nano::ledger & ledger_a, nano::stats & stats_a, nano::logger & logger_a) :
nano::confirming_set::confirming_set (confirming_set_config const & config_a, nano::ledger & ledger_a, nano::block_processor & block_processor_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ config_a },
ledger{ ledger_a },
block_processor{ block_processor_a },
stats{ stats_a },
logger{ logger_a },
notification_workers{ 1, nano::thread_role::name::confirmation_height_notifications }
Expand All @@ -20,6 +23,28 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na
cemented_observers.notify (context.block);
}
});

// Requeue blocks that failed to cement immediately due to missing ledger blocks
block_processor.batch_processed.add ([this] (auto const & batch) {
bool should_notify = false;
{
std::lock_guard lock{ mutex };
for (auto const & [result, context] : batch)
{
if (auto it = deferred.get<tag_hash> ().find (context.block->hash ()); it != deferred.get<tag_hash> ().end ())
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::requeued);
set.push_back (*it);
deferred.get<tag_hash> ().erase (it);
should_notify = true;
}
}
}
if (should_notify)
{
condition.notify_all ();
}
});
}

nano::confirming_set::~confirming_set ()
Expand Down Expand Up @@ -78,13 +103,14 @@ void nano::confirming_set::stop ()
bool nano::confirming_set::contains (nano::block_hash const & hash) const
{
std::lock_guard lock{ mutex };
return set.get<tag_hash> ().contains (hash) || current.contains (hash);
return set.get<tag_hash> ().contains (hash) || deferred.get<tag_hash> ().contains (hash) || current.contains (hash);
}

std::size_t nano::confirming_set::size () const
{
// Do not report deferred blocks, as they are not currently being processed (and might never be requeued)
std::lock_guard lock{ mutex };
return set.size ();
return set.size () + current.size ();
}

void nano::confirming_set::run ()
Expand All @@ -94,6 +120,9 @@ void nano::confirming_set::run ()
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::loop);

cleanup (lock);
debug_assert (lock.owns_lock ());

if (!set.empty ())
{
run_batch (lock);
Expand Down Expand Up @@ -121,6 +150,46 @@ auto nano::confirming_set::next_batch (size_t max_count) -> std::deque<entry>
return results;
}

void nano::confirming_set::cleanup (std::unique_lock<std::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());

auto const cutoff = std::chrono::steady_clock::now () - config.deferred_age_cutoff;
std::deque<entry> evicted;

auto should_evict = [&] (entry const & entry) {
return entry.timestamp < cutoff;
};

// Iterate in sequenced (insertion) order
for (auto it = deferred.begin (), end = deferred.end (); it != end;)
{
if (should_evict (*it) || deferred.size () > config.max_deferred)
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::evicted);
debug_assert (ledger.any.block_exists (ledger.tx_begin_read (), it->hash));
evicted.push_back (*it);
it = deferred.erase (it);
}
else
{
break; // Entries are sequenced, so we can stop here and avoid unnecessary iteration
}
}

// Notify about evicted blocks so that other components can perform necessary cleanup
if (!evicted.empty ())
{
lock.unlock ();
for (auto const & entry : evicted)
{
cementing_failed.notify (entry.hash);
}
lock.lock ();
}
}

void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
{
debug_assert (lock.owns_lock ());
Expand All @@ -134,9 +203,9 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)

// Keep track of the blocks we're currently cementing, so that the .contains (...) check is accurate
debug_assert (current.empty ());
for (auto const & [hash, election] : batch)
for (auto const & entry : batch)
{
current.insert (hash);
current.insert (entry.hash);
}

lock.unlock ();
Expand Down Expand Up @@ -175,10 +244,14 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
}
};

std::deque<entry> failed;
{
auto transaction = ledger.tx_begin_write (nano::store::writer::confirmation_height);
for (auto const & [hash, election] : batch)
for (auto const & entry : batch)
{
auto const & hash = entry.hash;
auto const & election = entry.election;

size_t cemented_count = 0;
bool success = false;
do
Expand Down Expand Up @@ -233,15 +306,28 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cementing_failed);
logger.debug (nano::log::type::confirming_set, "Failed to cement block: {}", hash.to_string ());
failed.push_back (entry);
}
}

lock.lock ();

// Requeue failed blocks for processing later
// Add them to the deferred set while still holding the exclusive database write transaction to avoid block processor races
for (auto const & entry : failed)
{
deferred.push_back (entry);
}

lock.unlock ();
}

notify ();
release_assert (cemented.empty ());

already_cemented.notify (already);

// Clear current set only after the transaction is committed
lock.lock ();
current.clear ();
lock.unlock ();
Expand All @@ -253,6 +339,7 @@ nano::container_info nano::confirming_set::container_info () const

nano::container_info info;
info.put ("set", set);
info.put ("deferred", deferred);
info.add ("notification_workers", notification_workers.container_info ());
return info;
}
15 changes: 14 additions & 1 deletion nano/node/confirming_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ class confirming_set_config final
/** Maximum number of dependent blocks to be stored in memory during processing */
size_t max_blocks{ 128 * 1024 };
size_t max_queued_notifications{ 8 };

/** Maximum number of failed blocks to wait for requeuing */
size_t max_deferred{ 16 * 1024 };
/** Max age of deferred blocks before they are dropped */
std::chrono::seconds deferred_age_cutoff{ 15min };
};

/**
Expand All @@ -45,7 +50,7 @@ class confirming_set final
friend class confirmation_height_pruned_source_Test;

public:
confirming_set (confirming_set_config const &, nano::ledger &, nano::stats &, nano::logger &);
confirming_set (confirming_set_config const &, nano::ledger &, nano::block_processor &, nano::stats &, nano::logger &);
~confirming_set ();

void start ();
Expand All @@ -69,12 +74,14 @@ class confirming_set final

nano::observer_set<std::deque<context> const &> batch_cemented;
nano::observer_set<std::deque<nano::block_hash> const &> already_cemented;
nano::observer_set<nano::block_hash> cementing_failed;

nano::observer_set<std::shared_ptr<nano::block>> cemented_observers;

private: // Dependencies
confirming_set_config const & config;
nano::ledger & ledger;
nano::block_processor & block_processor;
nano::stats & stats;
nano::logger & logger;

Expand All @@ -83,11 +90,13 @@ class confirming_set final
{
nano::block_hash hash;
std::shared_ptr<nano::election> election;
std::chrono::steady_clock::time_point timestamp{ std::chrono::steady_clock::now () };
};

void run ();
void run_batch (std::unique_lock<std::mutex> &);
std::deque<entry> next_batch (size_t max_count);
void cleanup (std::unique_lock<std::mutex> &);

private:
// clang-format off
Expand All @@ -102,7 +111,11 @@ class confirming_set final
>>;
// clang-format on

// Blocks that are ready to be cemented
ordered_entries set;
// Blocks that could not be cemented immediately (e.g. waiting for rollbacks to complete)
ordered_entries deferred;
// Blocks that are being cemented in the current batch
std::unordered_set<nano::block_hash> current;

nano::thread_pool notification_workers;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
port_mapping_impl{ std::make_unique<nano::port_mapping> (*this) },
port_mapping{ *port_mapping_impl },
block_processor (*this),
confirming_set_impl{ std::make_unique<nano::confirming_set> (config.confirming_set, ledger, stats, logger) },
confirming_set_impl{ std::make_unique<nano::confirming_set> (config.confirming_set, ledger, block_processor, stats, logger) },
confirming_set{ *confirming_set_impl },
active_impl{ std::make_unique<nano::active_elections> (*this, confirming_set, block_processor) },
active{ *active_impl },
Expand Down

0 comments on commit 7ef8130

Please sign in to comment.