diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 74e3250871..910a518cb9 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -255,7 +255,6 @@ TEST (active_transactions, inactive_votes_cache) node.vote_processor.vote (vote, std::make_shared (node, node)); ASSERT_TIMELY_EQ (5s, node.vote_cache.size (), 1); node.process_active (send); - node.block_processor.flush (); ASSERT_TIMELY (5s, node.ledger.block_confirmed (node.store.tx_begin_read (), send->hash ())); ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached)); } @@ -356,7 +355,6 @@ TEST (active_transactions, inactive_votes_cache_existing_vote) .build_shared (); node.process_active (send); node.block_processor.add (open); - node.block_processor.flush (); ASSERT_TIMELY_EQ (5s, node.active.size (), 1); auto election (node.active.election (send->qualified_root ())); ASSERT_NE (nullptr, election); @@ -725,7 +723,7 @@ TEST (active_transactions, republish_winner) .build_shared (); node1.process_active (send1); - node1.block_processor.flush (); + ASSERT_TIMELY (5s, nano::test::exists (node1, { send1 })); ASSERT_TIMELY_EQ (3s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in), 1); // Several forks @@ -741,8 +739,8 @@ TEST (active_transactions, republish_winner) .work (*system.work.generate (nano::dev::genesis->hash ())) .build_shared (); node1.process_active (fork); + ASSERT_TIMELY (5s, node1.active.active (*fork)); } - node1.block_processor.flush (); ASSERT_TIMELY (3s, !node1.active.empty ()); ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in)); @@ -758,13 +756,12 @@ TEST (active_transactions, republish_winner) .build_shared (); node1.process_active (fork); - node1.block_processor.flush (); + ASSERT_TIMELY (5s, node1.active.active (fork->hash ())); auto election = node1.active.election (fork->qualified_root ()); ASSERT_NE (nullptr, election); auto vote = nano::test::make_final_vote (nano::dev::genesis_key, { fork }); node1.vote_processor.vote (vote, std::make_shared (node1, node1)); node1.vote_processor.flush (); - node1.block_processor.flush (); ASSERT_TIMELY (5s, election->confirmed ()); ASSERT_EQ (fork->hash (), election->status.winner->hash ()); ASSERT_TIMELY (5s, node2.block_confirmed (fork->hash ())); @@ -977,12 +974,10 @@ TEST (active_transactions, fork_replacement_tally) node1.network.publish_filter.clear (); node2.network.flood_block (send_last); ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0); - node1.block_processor.flush (); - system.delay_ms (50ms); // Correct block without votes is ignored - auto blocks1 (election->blocks ()); - ASSERT_EQ (max_blocks, blocks1.size ()); + std::unordered_map> blocks1; + ASSERT_TIMELY_EQ (5s, max_blocks, (blocks1 = election->blocks (), blocks1.size ())); ASSERT_FALSE (blocks1.find (send_last->hash ()) != blocks1.end ()); // Process vote for correct block & replace existing lowest tally block @@ -1055,7 +1050,6 @@ TEST (active_transactions, DISABLED_confirm_new) .work (*system.work.generate (nano::dev::genesis->hash ())) .build_shared (); node1.process_active (send); - node1.block_processor.flush (); ASSERT_TIMELY_EQ (5s, 1, node1.active.size ()); auto & node2 = *system.add_node (); // Add key to node2 diff --git a/nano/core_test/confirmation_height.cpp b/nano/core_test/confirmation_height.cpp index 2271e4985c..a57657b580 100644 --- a/nano/core_test/confirmation_height.cpp +++ b/nano/core_test/confirmation_height.cpp @@ -61,8 +61,7 @@ TEST (confirmation_height, single) ASSERT_EQ (nano::dev::genesis->hash (), confirmation_height_info.frontier); node->process_active (send1); - node->block_processor.flush (); - + ASSERT_TIMELY (5s, nano::test::exists (*node, { send1 })); ASSERT_TIMELY_EQ (10s, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out), 1); { @@ -514,14 +513,16 @@ TEST (confirmation_height, gap_live) node->block_processor.add (send1); node->block_processor.add (send2); node->block_processor.add (send3); + // node->block_processor.add (open1); Witheld for test node->block_processor.add (receive1); - node->block_processor.flush (); + ASSERT_TIMELY (5s, nano::test::exists (*node, { send1, send2, send3 })); + ASSERT_TIMELY (5s, node->unchecked.exists ({ open1->hash (), receive1->hash () })); add_callback_stats (*node); // Receive 2 comes in on the live network, however the chain has not been finished so it gets added to unchecked node->process_active (receive2); - node->block_processor.flush (); + ASSERT_TIMELY (5s, node->unchecked.exists ({ receive1->hash (), receive2->hash () })); // Confirmation heights should not be updated { @@ -538,7 +539,6 @@ TEST (confirmation_height, gap_live) // Now complete the chain where the block comes in on the live network node->process_active (open1); - node->block_processor.flush (); ASSERT_TIMELY_EQ (10s, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out), 6); @@ -1220,7 +1220,6 @@ TEST (confirmation_height, observers) add_callback_stats (*node1); node1->process_active (send1); - node1->block_processor.flush (); ASSERT_TIMELY_EQ (10s, node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out), 1); auto transaction = node1->store.tx_begin_read (); ASSERT_TRUE (node1->ledger.block_confirmed (transaction, send1->hash ())); @@ -1498,17 +1497,13 @@ TEST (confirmation_height, callback_confirmed_history) add_callback_stats (*node); node->process_active (send1); - ASSERT_NE (nano::test::start_election (system, *node, send1->hash ()), nullptr); + std::shared_ptr election; + ASSERT_TIMELY (5s, election = nano::test::start_election (system, *node, send1->hash ())); { - node->process_active (send); - node->block_processor.flush (); - // The write guard prevents the confirmation height processor doing any writes auto write_guard = node->write_database_queue.wait (nano::writer::testing); // Confirm send1 - auto election = node->active.election (send1->qualified_root ()); - ASSERT_NE (nullptr, election); election->force_confirm (); ASSERT_TIMELY_EQ (10s, node->active.size (), 0); ASSERT_EQ (0, node->active.recently_cemented.list ().size ()); diff --git a/nano/core_test/gap_cache.cpp b/nano/core_test/gap_cache.cpp index 7b6924e396..561088db11 100644 --- a/nano/core_test/gap_cache.cpp +++ b/nano/core_test/gap_cache.cpp @@ -157,13 +157,10 @@ TEST (gap_cache, two_dependencies) .build_shared (); ASSERT_EQ (0, node1.gap_cache.size ()); node1.block_processor.add (send2); - node1.block_processor.flush (); - ASSERT_EQ (1, node1.gap_cache.size ()); + ASSERT_TIMELY_EQ (5s, 1, node1.gap_cache.size ()); node1.block_processor.add (open); - node1.block_processor.flush (); - ASSERT_EQ (2, node1.gap_cache.size ()); + ASSERT_TIMELY_EQ (5s, 2, node1.gap_cache.size ()); node1.block_processor.add (send1); - node1.block_processor.flush (); ASSERT_TIMELY_EQ (5s, node1.gap_cache.size (), 0); ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), send1->hash ())); ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), send2->hash ())); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index d2ab36df39..acdc87d43f 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -423,9 +423,9 @@ TEST (receivable_processor, send_with_receive) ASSERT_EQ (amount, node2.balance (nano::dev::genesis_key.pub)); ASSERT_EQ (0, node2.balance (key2.pub)); node1.process_active (block1); - node1.block_processor.flush (); + ASSERT_TIMELY (5s, nano::test::exists (node1, { block1 })); node2.process_active (block1); - node2.block_processor.flush (); + ASSERT_TIMELY (5s, nano::test::exists (node2, { block1 })); ASSERT_EQ (amount - node1.config.receive_minimum.number (), node1.balance (nano::dev::genesis_key.pub)); ASSERT_EQ (0, node1.balance (key2.pub)); ASSERT_EQ (amount - node1.config.receive_minimum.number (), node2.balance (nano::dev::genesis_key.pub)); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 36470a63ca..68084f9236 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -355,8 +355,7 @@ TEST (node, receive_gap) nano::publish message{ nano::dev::network_params.network, block }; auto channel1 = std::make_shared (node1); node1.network.inbound (message, channel1); - node1.block_processor.flush (); - ASSERT_EQ (1, node1.gap_cache.size ()); + ASSERT_TIMELY_EQ (5s, 1, node1.gap_cache.size ()); } TEST (node, merge_peers) @@ -575,14 +574,13 @@ TEST (node, fork_publish) .build_shared (); node1.work_generate_blocking (*send2); node1.process_active (send1); - node1.block_processor.flush (); ASSERT_TIMELY_EQ (5s, 1, node1.active.size ()); auto election (node1.active.election (send1->qualified_root ())); ASSERT_NE (nullptr, election); // Wait until the genesis rep activated & makes vote ASSERT_TIMELY_EQ (1s, election->votes ().size (), 2); node1.process_active (send2); - node1.block_processor.flush (); + ASSERT_TIMELY (5s, node1.active.active (*send2)); auto votes1 (election->votes ()); auto existing1 (votes1.find (nano::dev::genesis_key.pub)); ASSERT_NE (votes1.end (), existing1); @@ -673,16 +671,15 @@ TEST (node, fork_keep) .work (*system.work.generate (nano::dev::genesis->hash ())) .build_shared (); node1.process_active (send1); - node1.block_processor.flush (); node2.process_active (send1); - node2.block_processor.flush (); ASSERT_TIMELY_EQ (5s, 1, node1.active.size ()); ASSERT_TIMELY_EQ (5s, 1, node2.active.size ()); system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv); + // Fill node with forked blocks node1.process_active (send2); - node1.block_processor.flush (); + ASSERT_TIMELY (5s, node1.active.active (*send2)); node2.process_active (send2); - node2.block_processor.flush (); + ASSERT_TIMELY (5s, node2.active.active (*send2)); auto election1 (node2.active.election (nano::qualified_root (nano::dev::genesis->hash (), nano::dev::genesis->hash ()))); ASSERT_NE (nullptr, election1); ASSERT_EQ (1, election1->votes ().size ()); @@ -728,16 +725,15 @@ TEST (node, fork_flip) auto ignored_channel{ std::make_shared (node1, std::weak_ptr ()) }; node1.network.inbound (publish1, ignored_channel); - node1.block_processor.flush (); node2.network.inbound (publish2, ignored_channel); - node2.block_processor.flush (); ASSERT_TIMELY_EQ (5s, 1, node1.active.size ()); ASSERT_TIMELY_EQ (5s, 1, node2.active.size ()); system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv); + // Fill nodes with forked blocks node1.network.inbound (publish2, ignored_channel); - node1.block_processor.flush (); + ASSERT_TIMELY (5s, node1.active.active (*send2)); node2.network.inbound (publish1, ignored_channel); - node2.block_processor.flush (); + ASSERT_TIMELY (5s, node2.active.active (*send1)); auto election1 (node2.active.election (nano::qualified_root (nano::dev::genesis->hash (), nano::dev::genesis->hash ()))); ASSERT_NE (nullptr, election1); ASSERT_EQ (1, election1->votes ().size ()); @@ -2083,7 +2079,6 @@ TEST (node, online_reps_election) .work (*node1.work_generate_blocking (nano::dev::genesis->hash ())) .build_shared (); node1.process_active (send1); - node1.block_processor.flush (); ASSERT_TIMELY_EQ (5s, 1, node1.active.size ()); // Process vote for ongoing election auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector{ send1->hash () }); @@ -2453,7 +2448,6 @@ TEST (node, local_votes_cache_fork) node_config.peering_port = system.get_available_port (); auto & node2 (*system.add_node (node_config, node_flags)); node2.process_active (send1_fork); - node2.block_processor.flush (); ASSERT_TIMELY (5s, node2.ledger.block_or_pruned_exists (send1->hash ())); } @@ -2977,8 +2971,7 @@ TEST (node, block_processor_reject_state) send1->signature.bytes[0] ^= 1; ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ())); node.process_active (send1); - auto flushed = std::async (std::launch::async, [&node] { node.block_processor.flush (); }); - ASSERT_NE (std::future_status::timeout, flushed.wait_for (5s)); + ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor, nano::stat::detail::bad_signature)); ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ())); auto send2 = builder.make_block () .account (nano::dev::genesis_key.pub) @@ -2990,9 +2983,7 @@ TEST (node, block_processor_reject_state) .work (*node.work_generate_blocking (nano::dev::genesis->hash ())) .build_shared (); node.process_active (send2); - auto flushed2 = std::async (std::launch::async, [&node] { node.block_processor.flush (); }); - ASSERT_NE (std::future_status::timeout, flushed2.wait_for (5s)); - ASSERT_TRUE (node.ledger.block_or_pruned_exists (send2->hash ())); + ASSERT_TIMELY (5s, node.ledger.block_or_pruned_exists (send2->hash ())); } TEST (node, block_processor_full) @@ -3337,7 +3328,6 @@ TEST (node, bidirectional_tcp) .work (*node1->work_generate_blocking (send1->hash ())) .build_shared (); node2->process_active (send2); - node2->block_processor.flush (); ASSERT_TIMELY (10s, node1->ledger.block_or_pruned_exists (send2->hash ()) && node2->ledger.block_or_pruned_exists (send2->hash ())); // Test block confirmation from node 2 (add representative to node 2) system.wallet (1)->insert_adhoc (nano::dev::genesis_key.prv); diff --git a/nano/core_test/wallet.cpp b/nano/core_test/wallet.cpp index c27749522f..625dbb795a 100644 --- a/nano/core_test/wallet.cpp +++ b/nano/core_test/wallet.cpp @@ -946,7 +946,7 @@ TEST (wallet, change_seed) wallet->insert_adhoc (nano::dev::genesis_key.prv, false); auto block (wallet->send_action (nano::dev::genesis_key.pub, pub, 100)); ASSERT_NE (nullptr, block); - system.nodes[0]->block_processor.flush (); + ASSERT_TIMELY (5s, nano::test::exists (*system.nodes[0], { block })); { auto transaction (wallet->wallets.tx_begin_write ()); wallet->change_seed (transaction, seed1); @@ -980,7 +980,7 @@ TEST (wallet, deterministic_restore) wallet->insert_adhoc (nano::dev::genesis_key.prv, false); auto block (wallet->send_action (nano::dev::genesis_key.pub, pub, 100)); ASSERT_NE (nullptr, block); - system.nodes[0]->block_processor.flush (); + ASSERT_TIMELY (5s, nano::test::exists (*system.nodes[0], { block })); { auto transaction (wallet->wallets.tx_begin_write ()); wallet->deterministic_restore (transaction); diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index 5a010d684d..fb88af4901 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -210,8 +210,7 @@ TEST (websocket, stopped_election) nano::publish publish1{ nano::dev::network_params.network, send1 }; auto channel1 = std::make_shared (*node1); node1->network.inbound (publish1, channel1); - node1->block_processor.flush (); - ASSERT_TIMELY (1s, node1->active.election (send1->qualified_root ())); + ASSERT_TIMELY (5s, node1->active.election (send1->qualified_root ())); node1->active.erase (*send1); ASSERT_TIMELY_EQ (5s, future.wait_for (0s), std::future_status::ready); diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index e77751762e..89cff93745 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -990,7 +990,6 @@ int main (int argc, char * const * argv) } } - node->block_processor.flush (); auto end (std::chrono::high_resolution_clock::now ()); auto time (std::chrono::duration_cast (end - begin).count ()); node->stop (); @@ -1083,7 +1082,10 @@ int main (int argc, char * const * argv) node->process_active (block); blocks.pop_front (); } - node->block_processor.flush (); + while (node->block_processor.size () > 0) + { + std::this_thread::sleep_for (std::chrono::milliseconds (100)); + } // Processing votes std::cerr << boost::str (boost::format ("Starting processing %1% votes\n") % max_votes); auto begin (std::chrono::high_resolution_clock::now ()); @@ -1191,7 +1193,6 @@ int main (int argc, char * const * argv) { node1->block_processor.add (block); } - node1->block_processor.flush (); auto iteration (0); while (node1->ledger.cache.block_count != count * 2 + 1) { @@ -1241,7 +1242,6 @@ int main (int argc, char * const * argv) node2->block_processor.add (block); blocks.pop_front (); } - node2->block_processor.flush (); while (node2->ledger.cache.block_count != count * 2 + 1) { std::this_thread::sleep_for (std::chrono::milliseconds (500)); @@ -1836,8 +1836,6 @@ int main (int argc, char * const * argv) } } - node.node->block_processor.flush (); - auto end (std::chrono::high_resolution_clock::now ()); auto time (std::chrono::duration_cast (end - begin).count ()); auto us_in_second (1000000); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index e696b5966b..76786fd247 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -37,17 +37,6 @@ void nano::block_processor::stop () nano::join_or_pass (processing_thread); } -void nano::block_processor::flush () -{ - flushing = true; - nano::unique_lock lock{ mutex }; - while (!stopped && (have_blocks () || active)) - { - condition.wait (lock); - } - flushing = false; -} - std::size_t nano::block_processor::size () { nano::unique_lock lock{ mutex }; diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index fc6c7172a0..a53fa80761 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -29,7 +29,6 @@ class block_processor final public: explicit block_processor (nano::node &, nano::write_database_queue &); void stop (); - void flush (); std::size_t size (); bool full (); bool half_full (); diff --git a/nano/node/bootstrap/bootstrap_legacy.cpp b/nano/node/bootstrap/bootstrap_legacy.cpp index 446d094646..9badee97cc 100644 --- a/nano/node/bootstrap/bootstrap_legacy.cpp +++ b/nano/node/bootstrap/bootstrap_legacy.cpp @@ -223,10 +223,12 @@ void nano::bootstrap_attempt_legacy::run () condition.wait (lock, [&stopped = stopped, &pulling = pulling] { return stopped || pulling == 0; }); } - // Flushing may resolve forks which can add more pulls - lock.unlock (); - node->block_processor.flush (); - lock.lock (); + // TODO: This check / wait is a heuristic and should be improved. + auto wait_start = std::chrono::steady_clock::now (); + while (!stopped && node->block_processor.size () != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 })) + { + condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size () == 0; }); + } if (start_account.number () != std::numeric_limits::max ()) { diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 4310968dae..c36a60b4f9 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -1887,7 +1887,10 @@ TEST (node, mass_block_new) } ASSERT_TIMELY_EQ (200s, node.ledger.cache.block_count, next_block_count); next_block_count += num_blocks; - node.block_processor.flush (); + while (node.block_processor.size () > 0) + { + std::this_thread::sleep_for (std::chrono::milliseconds{ 100 }); + } // Clear all active { nano::lock_guard guard{ node.active.mutex };