Skip to content

Commit

Permalink
Merge pull request #4404 from clemahieu/block_processor_flush_remove
Browse files Browse the repository at this point in the history
Remove block_processor::flush function
  • Loading branch information
clemahieu authored Jan 31, 2024
2 parents 97e9b32 + 0ce4ed4 commit 284d328
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 77 deletions.
16 changes: 5 additions & 11 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ TEST (active_transactions, inactive_votes_cache)
node.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY_EQ (5s, node.vote_cache.size (), 1);
node.process_active (send);
node.block_processor.flush ();
ASSERT_TIMELY (5s, node.ledger.block_confirmed (node.store.tx_begin_read (), send->hash ()));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
}
Expand Down Expand Up @@ -356,7 +355,6 @@ TEST (active_transactions, inactive_votes_cache_existing_vote)
.build_shared ();
node.process_active (send);
node.block_processor.add (open);
node.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, node.active.size (), 1);
auto election (node.active.election (send->qualified_root ()));
ASSERT_NE (nullptr, election);
Expand Down Expand Up @@ -725,7 +723,7 @@ TEST (active_transactions, republish_winner)
.build_shared ();

node1.process_active (send1);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (node1, { send1 }));
ASSERT_TIMELY_EQ (3s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in), 1);

// Several forks
Expand All @@ -741,8 +739,8 @@ TEST (active_transactions, republish_winner)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build_shared ();
node1.process_active (fork);
ASSERT_TIMELY (5s, node1.active.active (*fork));
}
node1.block_processor.flush ();
ASSERT_TIMELY (3s, !node1.active.empty ());
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));

Expand All @@ -758,13 +756,12 @@ TEST (active_transactions, republish_winner)
.build_shared ();

node1.process_active (fork);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, node1.active.active (fork->hash ()));
auto election = node1.active.election (fork->qualified_root ());
ASSERT_NE (nullptr, election);
auto vote = nano::test::make_final_vote (nano::dev::genesis_key, { fork });
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.vote_processor.flush ();
node1.block_processor.flush ();
ASSERT_TIMELY (5s, election->confirmed ());
ASSERT_EQ (fork->hash (), election->status.winner->hash ());
ASSERT_TIMELY (5s, node2.block_confirmed (fork->hash ()));
Expand Down Expand Up @@ -977,12 +974,10 @@ TEST (active_transactions, fork_replacement_tally)
node1.network.publish_filter.clear ();
node2.network.flood_block (send_last);
ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0);
node1.block_processor.flush ();
system.delay_ms (50ms);

// Correct block without votes is ignored
auto blocks1 (election->blocks ());
ASSERT_EQ (max_blocks, blocks1.size ());
std::unordered_map<nano::block_hash, std::shared_ptr<nano::block>> blocks1;
ASSERT_TIMELY_EQ (5s, max_blocks, (blocks1 = election->blocks (), blocks1.size ()));
ASSERT_FALSE (blocks1.find (send_last->hash ()) != blocks1.end ());

// Process vote for correct block & replace existing lowest tally block
Expand Down Expand Up @@ -1055,7 +1050,6 @@ TEST (active_transactions, DISABLED_confirm_new)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build_shared ();
node1.process_active (send);
node1.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
auto & node2 = *system.add_node ();
// Add key to node2
Expand Down
19 changes: 7 additions & 12 deletions nano/core_test/confirmation_height.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ TEST (confirmation_height, single)
ASSERT_EQ (nano::dev::genesis->hash (), confirmation_height_info.frontier);

node->process_active (send1);
node->block_processor.flush ();

ASSERT_TIMELY (5s, nano::test::exists (*node, { send1 }));
ASSERT_TIMELY_EQ (10s, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out), 1);

{
Expand Down Expand Up @@ -514,14 +513,16 @@ TEST (confirmation_height, gap_live)
node->block_processor.add (send1);
node->block_processor.add (send2);
node->block_processor.add (send3);
// node->block_processor.add (open1); Witheld for test
node->block_processor.add (receive1);
node->block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (*node, { send1, send2, send3 }));
ASSERT_TIMELY (5s, node->unchecked.exists ({ open1->hash (), receive1->hash () }));

add_callback_stats (*node);

// Receive 2 comes in on the live network, however the chain has not been finished so it gets added to unchecked
node->process_active (receive2);
node->block_processor.flush ();
ASSERT_TIMELY (5s, node->unchecked.exists ({ receive1->hash (), receive2->hash () }));

// Confirmation heights should not be updated
{
Expand All @@ -538,7 +539,6 @@ TEST (confirmation_height, gap_live)

// Now complete the chain where the block comes in on the live network
node->process_active (open1);
node->block_processor.flush ();

ASSERT_TIMELY_EQ (10s, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out), 6);

Expand Down Expand Up @@ -1220,7 +1220,6 @@ TEST (confirmation_height, observers)
add_callback_stats (*node1);

node1->process_active (send1);
node1->block_processor.flush ();
ASSERT_TIMELY_EQ (10s, node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out), 1);
auto transaction = node1->store.tx_begin_read ();
ASSERT_TRUE (node1->ledger.block_confirmed (transaction, send1->hash ()));
Expand Down Expand Up @@ -1498,17 +1497,13 @@ TEST (confirmation_height, callback_confirmed_history)
add_callback_stats (*node);

node->process_active (send1);
ASSERT_NE (nano::test::start_election (system, *node, send1->hash ()), nullptr);
std::shared_ptr<nano::election> election;
ASSERT_TIMELY (5s, election = nano::test::start_election (system, *node, send1->hash ()));
{
node->process_active (send);
node->block_processor.flush ();

// The write guard prevents the confirmation height processor doing any writes
auto write_guard = node->write_database_queue.wait (nano::writer::testing);

// Confirm send1
auto election = node->active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
election->force_confirm ();
ASSERT_TIMELY_EQ (10s, node->active.size (), 0);
ASSERT_EQ (0, node->active.recently_cemented.list ().size ());
Expand Down
7 changes: 2 additions & 5 deletions nano/core_test/gap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,10 @@ TEST (gap_cache, two_dependencies)
.build_shared ();
ASSERT_EQ (0, node1.gap_cache.size ());
node1.block_processor.add (send2);
node1.block_processor.flush ();
ASSERT_EQ (1, node1.gap_cache.size ());
ASSERT_TIMELY_EQ (5s, 1, node1.gap_cache.size ());
node1.block_processor.add (open);
node1.block_processor.flush ();
ASSERT_EQ (2, node1.gap_cache.size ());
ASSERT_TIMELY_EQ (5s, 2, node1.gap_cache.size ());
node1.block_processor.add (send1);
node1.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, node1.gap_cache.size (), 0);
ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), send1->hash ()));
ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), send2->hash ()));
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,9 @@ TEST (receivable_processor, send_with_receive)
ASSERT_EQ (amount, node2.balance (nano::dev::genesis_key.pub));
ASSERT_EQ (0, node2.balance (key2.pub));
node1.process_active (block1);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (node1, { block1 }));
node2.process_active (block1);
node2.block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (node2, { block1 }));
ASSERT_EQ (amount - node1.config.receive_minimum.number (), node1.balance (nano::dev::genesis_key.pub));
ASSERT_EQ (0, node1.balance (key2.pub));
ASSERT_EQ (amount - node1.config.receive_minimum.number (), node2.balance (nano::dev::genesis_key.pub));
Expand Down
30 changes: 10 additions & 20 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,7 @@ TEST (node, receive_gap)
nano::publish message{ nano::dev::network_params.network, block };
auto channel1 = std::make_shared<nano::transport::fake::channel> (node1);
node1.network.inbound (message, channel1);
node1.block_processor.flush ();
ASSERT_EQ (1, node1.gap_cache.size ());
ASSERT_TIMELY_EQ (5s, 1, node1.gap_cache.size ());
}

TEST (node, merge_peers)
Expand Down Expand Up @@ -575,14 +574,13 @@ TEST (node, fork_publish)
.build_shared ();
node1.work_generate_blocking (*send2);
node1.process_active (send1);
node1.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
auto election (node1.active.election (send1->qualified_root ()));
ASSERT_NE (nullptr, election);
// Wait until the genesis rep activated & makes vote
ASSERT_TIMELY_EQ (1s, election->votes ().size (), 2);
node1.process_active (send2);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, node1.active.active (*send2));
auto votes1 (election->votes ());
auto existing1 (votes1.find (nano::dev::genesis_key.pub));
ASSERT_NE (votes1.end (), existing1);
Expand Down Expand Up @@ -673,16 +671,15 @@ TEST (node, fork_keep)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build_shared ();
node1.process_active (send1);
node1.block_processor.flush ();
node2.process_active (send1);
node2.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
ASSERT_TIMELY_EQ (5s, 1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
// Fill node with forked blocks
node1.process_active (send2);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, node1.active.active (*send2));
node2.process_active (send2);
node2.block_processor.flush ();
ASSERT_TIMELY (5s, node2.active.active (*send2));
auto election1 (node2.active.election (nano::qualified_root (nano::dev::genesis->hash (), nano::dev::genesis->hash ())));
ASSERT_NE (nullptr, election1);
ASSERT_EQ (1, election1->votes ().size ());
Expand Down Expand Up @@ -728,16 +725,15 @@ TEST (node, fork_flip)
auto ignored_channel{ std::make_shared<nano::transport::channel_tcp> (node1, std::weak_ptr<nano::transport::socket> ()) };

node1.network.inbound (publish1, ignored_channel);
node1.block_processor.flush ();
node2.network.inbound (publish2, ignored_channel);
node2.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
ASSERT_TIMELY_EQ (5s, 1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
// Fill nodes with forked blocks
node1.network.inbound (publish2, ignored_channel);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, node1.active.active (*send2));
node2.network.inbound (publish1, ignored_channel);
node2.block_processor.flush ();
ASSERT_TIMELY (5s, node2.active.active (*send1));
auto election1 (node2.active.election (nano::qualified_root (nano::dev::genesis->hash (), nano::dev::genesis->hash ())));
ASSERT_NE (nullptr, election1);
ASSERT_EQ (1, election1->votes ().size ());
Expand Down Expand Up @@ -2083,7 +2079,6 @@ TEST (node, online_reps_election)
.work (*node1.work_generate_blocking (nano::dev::genesis->hash ()))
.build_shared ();
node1.process_active (send1);
node1.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
// Process vote for ongoing election
auto vote = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector<nano::block_hash>{ send1->hash () });
Expand Down Expand Up @@ -2453,7 +2448,6 @@ TEST (node, local_votes_cache_fork)
node_config.peering_port = system.get_available_port ();
auto & node2 (*system.add_node (node_config, node_flags));
node2.process_active (send1_fork);
node2.block_processor.flush ();
ASSERT_TIMELY (5s, node2.ledger.block_or_pruned_exists (send1->hash ()));
}

Expand Down Expand Up @@ -2977,8 +2971,7 @@ TEST (node, block_processor_reject_state)
send1->signature.bytes[0] ^= 1;
ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ()));
node.process_active (send1);
auto flushed = std::async (std::launch::async, [&node] { node.block_processor.flush (); });
ASSERT_NE (std::future_status::timeout, flushed.wait_for (5s));
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor, nano::stat::detail::bad_signature));
ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ()));
auto send2 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
Expand All @@ -2990,9 +2983,7 @@ TEST (node, block_processor_reject_state)
.work (*node.work_generate_blocking (nano::dev::genesis->hash ()))
.build_shared ();
node.process_active (send2);
auto flushed2 = std::async (std::launch::async, [&node] { node.block_processor.flush (); });
ASSERT_NE (std::future_status::timeout, flushed2.wait_for (5s));
ASSERT_TRUE (node.ledger.block_or_pruned_exists (send2->hash ()));
ASSERT_TIMELY (5s, node.ledger.block_or_pruned_exists (send2->hash ()));
}

TEST (node, block_processor_full)
Expand Down Expand Up @@ -3337,7 +3328,6 @@ TEST (node, bidirectional_tcp)
.work (*node1->work_generate_blocking (send1->hash ()))
.build_shared ();
node2->process_active (send2);
node2->block_processor.flush ();
ASSERT_TIMELY (10s, node1->ledger.block_or_pruned_exists (send2->hash ()) && node2->ledger.block_or_pruned_exists (send2->hash ()));
// Test block confirmation from node 2 (add representative to node 2)
system.wallet (1)->insert_adhoc (nano::dev::genesis_key.prv);
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ TEST (wallet, change_seed)
wallet->insert_adhoc (nano::dev::genesis_key.prv, false);
auto block (wallet->send_action (nano::dev::genesis_key.pub, pub, 100));
ASSERT_NE (nullptr, block);
system.nodes[0]->block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (*system.nodes[0], { block }));
{
auto transaction (wallet->wallets.tx_begin_write ());
wallet->change_seed (transaction, seed1);
Expand Down Expand Up @@ -980,7 +980,7 @@ TEST (wallet, deterministic_restore)
wallet->insert_adhoc (nano::dev::genesis_key.prv, false);
auto block (wallet->send_action (nano::dev::genesis_key.pub, pub, 100));
ASSERT_NE (nullptr, block);
system.nodes[0]->block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (*system.nodes[0], { block }));
{
auto transaction (wallet->wallets.tx_begin_write ());
wallet->deterministic_restore (transaction);
Expand Down
3 changes: 1 addition & 2 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ TEST (websocket, stopped_election)
nano::publish publish1{ nano::dev::network_params.network, send1 };
auto channel1 = std::make_shared<nano::transport::fake::channel> (*node1);
node1->network.inbound (publish1, channel1);
node1->block_processor.flush ();
ASSERT_TIMELY (1s, node1->active.election (send1->qualified_root ()));
ASSERT_TIMELY (5s, node1->active.election (send1->qualified_root ()));
node1->active.erase (*send1);

ASSERT_TIMELY_EQ (5s, future.wait_for (0s), std::future_status::ready);
Expand Down
10 changes: 4 additions & 6 deletions nano/nano_node/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,6 @@ int main (int argc, char * const * argv)
}
}

node->block_processor.flush ();
auto end (std::chrono::high_resolution_clock::now ());
auto time (std::chrono::duration_cast<std::chrono::microseconds> (end - begin).count ());
node->stop ();
Expand Down Expand Up @@ -1083,7 +1082,10 @@ int main (int argc, char * const * argv)
node->process_active (block);
blocks.pop_front ();
}
node->block_processor.flush ();
while (node->block_processor.size () > 0)
{
std::this_thread::sleep_for (std::chrono::milliseconds (100));
}
// Processing votes
std::cerr << boost::str (boost::format ("Starting processing %1% votes\n") % max_votes);
auto begin (std::chrono::high_resolution_clock::now ());
Expand Down Expand Up @@ -1191,7 +1193,6 @@ int main (int argc, char * const * argv)
{
node1->block_processor.add (block);
}
node1->block_processor.flush ();
auto iteration (0);
while (node1->ledger.cache.block_count != count * 2 + 1)
{
Expand Down Expand Up @@ -1241,7 +1242,6 @@ int main (int argc, char * const * argv)
node2->block_processor.add (block);
blocks.pop_front ();
}
node2->block_processor.flush ();
while (node2->ledger.cache.block_count != count * 2 + 1)
{
std::this_thread::sleep_for (std::chrono::milliseconds (500));
Expand Down Expand Up @@ -1836,8 +1836,6 @@ int main (int argc, char * const * argv)
}
}

node.node->block_processor.flush ();

auto end (std::chrono::high_resolution_clock::now ());
auto time (std::chrono::duration_cast<std::chrono::microseconds> (end - begin).count ());
auto us_in_second (1000000);
Expand Down
11 changes: 0 additions & 11 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ void nano::block_processor::stop ()
nano::join_or_pass (processing_thread);
}

void nano::block_processor::flush ()
{
flushing = true;
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && (have_blocks () || active))
{
condition.wait (lock);
}
flushing = false;
}

std::size_t nano::block_processor::size ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
Expand Down
1 change: 0 additions & 1 deletion nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class block_processor final
public:
explicit block_processor (nano::node &, nano::write_database_queue &);
void stop ();
void flush ();
std::size_t size ();
bool full ();
bool half_full ();
Expand Down
10 changes: 6 additions & 4 deletions nano/node/bootstrap/bootstrap_legacy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,12 @@ void nano::bootstrap_attempt_legacy::run ()
condition.wait (lock, [&stopped = stopped, &pulling = pulling] { return stopped || pulling == 0; });
}

// Flushing may resolve forks which can add more pulls
lock.unlock ();
node->block_processor.flush ();
lock.lock ();
// TODO: This check / wait is a heuristic and should be improved.
auto wait_start = std::chrono::steady_clock::now ();
while (!stopped && node->block_processor.size () != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
{
condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size () == 0; });
}

if (start_account.number () != std::numeric_limits<nano::uint256_t>::max ())
{
Expand Down
Loading

0 comments on commit 284d328

Please sign in to comment.