Skip to content

Commit

Permalink
Block processor batch rolled back event
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Nov 25, 2024
1 parent b00064b commit 3d14f1e
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 36 deletions.
9 changes: 6 additions & 3 deletions nano/node/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
});

// Stop all rolled back active transactions except initial
block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) {
if (block->qualified_root () != rollback_root)
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
for (auto const & block : blocks)
{
erase (block->qualified_root ());
if (block->qualified_root () != rollback_root)
{
erase (block->qualified_root ());
}
}
});
}
Expand Down
15 changes: 2 additions & 13 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@ nano::block_processor::block_processor (nano::node_config const & node_config, n
logger{ logger_a },
workers{ 1, nano::thread_role::name::block_processing_notifications }
{
batch_processed.add ([this] (auto const & items) {
// For every batch item: notify the 'processed' observer.
for (auto const & [result, context] : items)
{
block_processed.notify (result, context);
}
});

queue.max_size_query = [this] (auto const & origin) {
switch (origin.source)
{
Expand Down Expand Up @@ -193,7 +185,7 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const
// Replace our block with the winner and roll back any dependent blocks
logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ());

std::vector<std::shared_ptr<nano::block>> rollback_list;
std::deque<std::shared_ptr<nano::block>> rollback_list;
if (ledger.rollback (transaction, successor->hash (), rollback_list))
{
stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed);
Expand All @@ -206,10 +198,7 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const
}

// Notify observers of the rolled back blocks
for (auto const & block : rollback_list)
{
rolled_back.notify (block, fork_block.qualified_root ());
}
rolled_back.notify (rollback_list, fork_block.qualified_root ());
}
}

Expand Down
17 changes: 8 additions & 9 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,14 @@ class block_processor final
std::atomic<bool> flushing{ false };

public: // Events
using processed_t = std::tuple<nano::block_status, context>;
using processed_batch_t = std::deque<processed_t>;

// The batch observer feeds the processed observer
nano::observer_set<nano::block_status const &, context const &> block_processed;
nano::observer_set<processed_batch_t const &> batch_processed;

// Rolled back blocks <rolled back block, root of rollback>
nano::observer_set<std::shared_ptr<nano::block> const &, nano::qualified_root const &> rolled_back;
// All processed blocks including forks, rejected etc
using processed_batch_t = std::deque<std::pair<nano::block_status, context>>;
using processed_batch_event_t = nano::observer_set<processed_batch_t>;
processed_batch_event_t batch_processed;

// Rolled back blocks <rolled back blocks, root of rollback>
using rolled_back_event_t = nano::observer_set<std::deque<std::shared_ptr<nano::block>>, nano::qualified_root>;
rolled_back_event_t rolled_back;

private: // Dependencies
block_processor_config const & config;
Expand Down
9 changes: 6 additions & 3 deletions nano/node/local_block_broadcaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_
}
});

block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) {
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
nano::lock_guard<nano::mutex> guard{ mutex };
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased);
for (auto const & block : blocks)
{
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased);
}
});

confirming_set.cemented_observers.add ([this] (auto const & block) {
Expand Down
7 changes: 5 additions & 2 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,11 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
});

// Do some cleanup of rolled back blocks
block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) {
history.erase (block->root ());
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
for (auto const & block : blocks)
{
history.erase (block->root ());
}
});

if (!init_error ())
Expand Down
8 changes: 4 additions & 4 deletions nano/secure/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace
class rollback_visitor : public nano::block_visitor
{
public:
rollback_visitor (nano::secure::write_transaction const & transaction_a, nano::ledger & ledger_a, std::vector<std::shared_ptr<nano::block>> & list_a) :
rollback_visitor (nano::secure::write_transaction const & transaction_a, nano::ledger & ledger_a, std::deque<std::shared_ptr<nano::block>> & list_a) :
transaction (transaction_a),
ledger (ledger_a),
list (list_a)
Expand Down Expand Up @@ -179,7 +179,7 @@ class rollback_visitor : public nano::block_visitor
}
nano::secure::write_transaction const & transaction;
nano::ledger & ledger;
std::vector<std::shared_ptr<nano::block>> & list;
std::deque<std::shared_ptr<nano::block>> & list;
bool error{ false };
};

Expand Down Expand Up @@ -992,7 +992,7 @@ nano::uint128_t nano::ledger::weight_exact (secure::transaction const & txn_a, n
}

// Rollback blocks until `block_a' doesn't exist or it tries to penetrate the confirmation height
bool nano::ledger::rollback (secure::write_transaction const & transaction_a, nano::block_hash const & block_a, std::vector<std::shared_ptr<nano::block>> & list_a)
bool nano::ledger::rollback (secure::write_transaction const & transaction_a, nano::block_hash const & block_a, std::deque<std::shared_ptr<nano::block>> & list_a)
{
debug_assert (any.block_exists (transaction_a, block_a));
auto account_l = any.block_account (transaction_a, block_a).value ();
Expand Down Expand Up @@ -1026,7 +1026,7 @@ bool nano::ledger::rollback (secure::write_transaction const & transaction_a, na

bool nano::ledger::rollback (secure::write_transaction const & transaction_a, nano::block_hash const & block_a)
{
std::vector<std::shared_ptr<nano::block>> rollback_list;
std::deque<std::shared_ptr<nano::block>> rollback_list;
return rollback (transaction_a, block_a, rollback_list);
}

Expand Down
2 changes: 1 addition & 1 deletion nano/secure/ledger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ledger final
std::optional<nano::pending_info> pending_info (secure::transaction const &, nano::pending_key const & key) const;
std::deque<std::shared_ptr<nano::block>> confirm (secure::write_transaction &, nano::block_hash const & hash, size_t max_blocks = 1024 * 128);
nano::block_status process (secure::write_transaction const &, std::shared_ptr<nano::block> block);
bool rollback (secure::write_transaction const &, nano::block_hash const &, std::vector<std::shared_ptr<nano::block>> &);
bool rollback (secure::write_transaction const &, nano::block_hash const &, std::deque<std::shared_ptr<nano::block>> & rollback_list);
bool rollback (secure::write_transaction const &, nano::block_hash const &);
void update_account (secure::write_transaction const &, nano::account const &, nano::account_info const &, nano::account_info const &);
uint64_t pruning_action (secure::write_transaction &, nano::block_hash const &, uint64_t const);
Expand Down
2 changes: 1 addition & 1 deletion nano/test_common/system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ void nano::test::system::generate_rollback (nano::node & node_a, std::vector<nan
{
accounts_a[index] = accounts_a[accounts_a.size () - 1];
accounts_a.pop_back ();
std::vector<std::shared_ptr<nano::block>> rollback_list;
std::deque<std::shared_ptr<nano::block>> rollback_list;
auto error = node_a.ledger.rollback (transaction, hash, rollback_list);
(void)error;
debug_assert (!error);
Expand Down

0 comments on commit 3d14f1e

Please sign in to comment.