diff --git a/nano/core_test/bootstrap.cpp b/nano/core_test/bootstrap.cpp index 34617b1d0a..e66e04545c 100644 --- a/nano/core_test/bootstrap.cpp +++ b/nano/core_test/bootstrap.cpp @@ -15,7 +15,7 @@ using namespace std::chrono_literals; TEST (bulk_pull, no_address) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start = 1; req->end = 2; @@ -27,7 +27,7 @@ TEST (bulk_pull, no_address) TEST (bulk_pull, genesis_to_end) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis_key.pub; req->end.clear (); @@ -40,7 +40,7 @@ TEST (bulk_pull, genesis_to_end) TEST (bulk_pull, no_end) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis_key.pub; req->end = 1; @@ -72,7 +72,7 @@ TEST (bulk_pull, end_not_owned) open->signature = nano::sign_message (key2.prv, key2.pub, open->hash ()); system.nodes[0]->work_generate_blocking (*open); ASSERT_EQ (nano::block_status::progress, system.nodes[0]->process (open)); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start = key2.pub; req->end = nano::dev::genesis->hash (); @@ -83,7 +83,7 @@ TEST (bulk_pull, end_not_owned) TEST (bulk_pull, none) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis_key.pub; req->end = nano::dev::genesis->hash (); @@ -95,7 +95,7 @@ TEST (bulk_pull, none) TEST (bulk_pull, get_next_on_open) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis_key.pub; req->end.clear (); @@ -126,7 +126,7 @@ TEST (bulk_pull, ascending_one_hash) node.work_generate_blocking (*block1); ASSERT_EQ (nano::block_status::progress, node.process (block1)); auto socket = std::make_shared (node, nano::transport::socket::endpoint_type_t::server); - auto connection = std::make_shared (socket, system.nodes[0]); + auto connection = std::make_shared (system.nodes[0], socket); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis->hash (); req->end.clear (); @@ -158,7 +158,7 @@ TEST (bulk_pull, ascending_two_account) node.work_generate_blocking (*block1); ASSERT_EQ (nano::block_status::progress, node.process (block1)); auto socket = std::make_shared (node, nano::transport::socket::endpoint_type_t::server); - auto connection = std::make_shared (socket, system.nodes[0]); + auto connection = std::make_shared (system.nodes[0], socket); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis_key.pub; req->end.clear (); @@ -193,7 +193,7 @@ TEST (bulk_pull, ascending_end) node.work_generate_blocking (*block1); ASSERT_EQ (nano::block_status::progress, node.process (block1)); auto socket = std::make_shared (node, nano::transport::socket::endpoint_type_t::server); - auto connection = std::make_shared (socket, system.nodes[0]); + auto connection = std::make_shared (system.nodes[0], socket); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis_key.pub; req->end = block1->hash (); @@ -208,7 +208,7 @@ TEST (bulk_pull, ascending_end) TEST (bulk_pull, by_block) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis->hash (); req->end.clear (); @@ -224,7 +224,7 @@ TEST (bulk_pull, by_block) TEST (bulk_pull, by_block_single) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis->hash (); req->end = nano::dev::genesis->hash (); @@ -261,7 +261,7 @@ TEST (bulk_pull, count_limit) .build (); ASSERT_EQ (nano::block_status::progress, node0->process (receive1)); - auto connection (std::make_shared (std::make_shared (*node0, nano::transport::socket::endpoint_type_t::server), node0)); + auto connection (std::make_shared (node0, std::make_shared (*node0, nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start = receive1->hash (); req->set_count_present (true); @@ -1692,7 +1692,7 @@ TEST (frontier_req_response, DISABLED_destruction) std::shared_ptr hold; // Destructing tcp acceptor on non-existent io_context { nano::test::system system (1); - auto connection (std::make_shared (nullptr, system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], nullptr)); auto req = std::make_unique (nano::dev::network_params.network); req->start.clear (); req->age = std::numeric_limitsage)>::max (); @@ -1706,7 +1706,7 @@ TEST (frontier_req_response, DISABLED_destruction) TEST (frontier_req, begin) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start.clear (); req->age = std::numeric_limitsage)>::max (); @@ -1719,7 +1719,7 @@ TEST (frontier_req, begin) TEST (frontier_req, end) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start = nano::dev::genesis_key.pub.number () + 1; req->age = std::numeric_limitsage)>::max (); @@ -1760,7 +1760,7 @@ TEST (frontier_req, count) node1->work_generate_blocking (*receive1); ASSERT_EQ (nano::block_status::progress, node1->process (receive1)); - auto connection (std::make_shared (std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server), node1)); + auto connection (std::make_shared (node1, std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start.clear (); req->age = std::numeric_limitsage)>::max (); @@ -1773,7 +1773,7 @@ TEST (frontier_req, count) TEST (frontier_req, time_bound) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start.clear (); req->age = 1; @@ -1786,7 +1786,7 @@ TEST (frontier_req, time_bound) req2->start.clear (); req2->age = 1; req2->count = std::numeric_limitscount)>::max (); - auto connection2 (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection2 (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto request2 (std::make_shared (connection, std::move (req2))); ASSERT_TRUE (request2->current.is_zero ()); } @@ -1794,7 +1794,7 @@ TEST (frontier_req, time_bound) TEST (frontier_req, time_cutoff) { nano::test::system system (1); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start.clear (); req->age = 3; @@ -1808,7 +1808,7 @@ TEST (frontier_req, time_cutoff) req2->start.clear (); req2->age = 3; req2->count = std::numeric_limitscount)>::max (); - auto connection2 (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection2 (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); auto request2 (std::make_shared (connection, std::move (req2))); ASSERT_TRUE (request2->frontier.is_zero ()); } @@ -1880,7 +1880,7 @@ TEST (frontier_req, confirmed_frontier) ASSERT_EQ (nano::block_status::progress, node1->process (receive2)); // Request for all accounts (confirmed only) - auto connection (std::make_shared (std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server), node1)); + auto connection (std::make_shared (node1, std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server))); auto req = std::make_unique (nano::dev::network_params.network); req->start.clear (); req->age = std::numeric_limitsage)>::max (); @@ -1893,7 +1893,7 @@ TEST (frontier_req, confirmed_frontier) ASSERT_EQ (nano::dev::genesis->hash (), request->frontier); // Request starting with account before genesis (confirmed only) - auto connection2 (std::make_shared (std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server), node1)); + auto connection2 (std::make_shared (node1, std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server))); auto req2 = std::make_unique (nano::dev::network_params.network); req2->start = key_before_genesis.pub; req2->age = std::numeric_limitsage)>::max (); @@ -1906,7 +1906,7 @@ TEST (frontier_req, confirmed_frontier) ASSERT_EQ (nano::dev::genesis->hash (), request2->frontier); // Request starting with account after genesis (confirmed only) - auto connection3 (std::make_shared (std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server), node1)); + auto connection3 (std::make_shared (node1, std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server))); auto req3 = std::make_unique (nano::dev::network_params.network); req3->start = key_after_genesis.pub; req3->age = std::numeric_limitsage)>::max (); @@ -1919,7 +1919,7 @@ TEST (frontier_req, confirmed_frontier) ASSERT_TRUE (request3->frontier.is_zero ()); // Request for all accounts (unconfirmed blocks) - auto connection4 (std::make_shared (std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server), node1)); + auto connection4 (std::make_shared (node1, std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server))); auto req4 = std::make_unique (nano::dev::network_params.network); req4->start.clear (); req4->age = std::numeric_limitsage)>::max (); @@ -1930,7 +1930,7 @@ TEST (frontier_req, confirmed_frontier) ASSERT_EQ (receive1->hash (), request4->frontier); // Request starting with account after genesis (unconfirmed blocks) - auto connection5 (std::make_shared (std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server), node1)); + auto connection5 (std::make_shared (node1, std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server))); auto req5 = std::make_unique (nano::dev::network_params.network); req5->start = key_after_genesis.pub; req5->age = std::numeric_limitsage)>::max (); @@ -1943,7 +1943,7 @@ TEST (frontier_req, confirmed_frontier) // Confirm account before genesis (confirmed only) ASSERT_TRUE (nano::test::start_elections (system, *node1, { send1, receive1 }, true)); ASSERT_TIMELY (5s, node1->block_confirmed (send1->hash ()) && node1->block_confirmed (receive1->hash ())); - auto connection6 (std::make_shared (std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server), node1)); + auto connection6 (std::make_shared (node1, std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server))); auto req6 = std::make_unique (nano::dev::network_params.network); req6->start = key_before_genesis.pub; req6->age = std::numeric_limitsage)>::max (); @@ -1958,7 +1958,7 @@ TEST (frontier_req, confirmed_frontier) // Confirm account after genesis (confirmed only) ASSERT_TRUE (nano::test::start_elections (system, *node1, { send2, receive2 }, true)); ASSERT_TIMELY (5s, node1->block_confirmed (send2->hash ()) && node1->block_confirmed (receive2->hash ())); - auto connection7 (std::make_shared (std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server), node1)); + auto connection7 (std::make_shared (node1, std::make_shared (*node1, nano::transport::socket::endpoint_type_t::server))); auto req7 = std::make_unique (nano::dev::network_params.network); req7->start = key_after_genesis.pub; req7->age = std::numeric_limitsage)>::max (); @@ -2128,7 +2128,7 @@ TEST (bulk_pull_account, basics) auto send2 (system.wallet (0)->send_action (nano::dev::genesis_key.pub, key1.pub, 10)); auto send3 (system.wallet (0)->send_action (nano::dev::genesis_key.pub, key1.pub, 2)); ASSERT_TIMELY_EQ (5s, system.nodes[0]->balance (key1.pub), 25); - auto connection (std::make_shared (std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server), system.nodes[0])); + auto connection (std::make_shared (system.nodes[0], std::make_shared (*system.nodes[0], nano::transport::socket::endpoint_type_t::server))); { auto req = std::make_unique (nano::dev::network_params.network); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 6ada05260b..0115e5f472 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -992,38 +992,50 @@ TEST (network, tcp_no_connect_excluded_peers) ASSERT_TIMELY_EQ (5s, node0->network.size (), 1); } -namespace nano -{ TEST (network, tcp_message_manager) { - nano::tcp_message_manager manager (1); - nano::tcp_message_item item; - item.node_id = nano::account (100); - ASSERT_EQ (0, manager.entries.size ()); - manager.put_message (item); - ASSERT_EQ (1, manager.entries.size ()); - ASSERT_EQ (manager.get_message ().node_id, item.node_id); - ASSERT_EQ (0, manager.entries.size ()); + nano::test::system system{ 1 }; + + nano::message_queue manager (1); + + auto channel1 = nano::test::fake_channel (system.node (0)); + auto make_message = [&] () { + auto message = std::make_unique (system.nodes[0]->network_params.network); + return message; + }; + + ASSERT_EQ (0, manager.size ()); + manager.put_message (make_message (), channel1); + ASSERT_EQ (1, manager.size ()); + auto item = manager.get_message (); + ASSERT_EQ (item.message->type (), nano::message_type::keepalive); + ASSERT_EQ (item.channel, channel1); + ASSERT_EQ (0, manager.size ()); // Fill the queue - manager.entries = decltype (manager.entries) (manager.max_entries, item); - ASSERT_EQ (manager.entries.size (), manager.max_entries); + auto channel2 = nano::test::fake_channel (system.node (0)); + for (int n = 0; n < manager.max_entries; ++n) + { + manager.put_message (make_message (), channel2); + } // This task will wait until a message is consumed auto future = std::async (std::launch::async, [&] { - manager.put_message (item); + manager.put_message (make_message (), channel2); }); // This should give sufficient time to execute put_message // and prove that it waits on condition variable std::this_thread::sleep_for (200ms); - ASSERT_EQ (manager.entries.size (), manager.max_entries); - ASSERT_EQ (manager.get_message ().node_id, item.node_id); + ASSERT_EQ (manager.size (), manager.max_entries); + ASSERT_EQ (manager.get_message ().channel, channel2); ASSERT_NE (std::future_status::timeout, future.wait_for (1s)); - ASSERT_EQ (manager.entries.size (), manager.max_entries); + ASSERT_EQ (manager.size (), manager.max_entries); + + nano::message_queue manager2 (2); + auto channel3 = nano::test::fake_channel (system.node (0)); - nano::tcp_message_manager manager2 (2); size_t message_count = 10'000; std::vector consumers; for (auto i = 0; i < 4; ++i) @@ -1031,7 +1043,7 @@ TEST (network, tcp_message_manager) consumers.emplace_back ([&] { for (auto i = 0; i < message_count; ++i) { - ASSERT_EQ (manager.get_message ().node_id, item.node_id); + ASSERT_EQ (manager.get_message ().channel, channel3); } }); } @@ -1041,7 +1053,7 @@ TEST (network, tcp_message_manager) producers.emplace_back ([&] { for (auto i = 0; i < message_count; ++i) { - manager.put_message (item); + manager.put_message (make_message (), channel3); } }); } @@ -1051,7 +1063,6 @@ TEST (network, tcp_message_manager) t.join (); } } -} TEST (network, cleanup_purge) { diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 74f909a900..802c619f1e 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -735,6 +735,12 @@ void nano::message_queue::stop () producer_condition.notify_all (); } +size_t nano::message_queue::size () const +{ + nano::lock_guard lock{ mutex }; + return entries.size (); +} + /* * syn_cookies */ diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 06d5883489..95681ac4c8 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -27,21 +27,23 @@ class message_queue final public: message_queue (unsigned incoming_connections_max_a); - void put_message (std::unique_ptr, std::shared_ptr const &); - entry get_message (); + // Stop container and notify waiting threads void stop (); + size_t size () const; + + void put_message (std::unique_ptr, std::shared_ptr const &); + entry get_message (); + + unsigned const max_entries; private: - nano::mutex mutex; + mutable nano::mutex mutex; nano::condition_variable producer_condition; nano::condition_variable consumer_condition; std::deque entries; - unsigned max_entries; static unsigned const max_entries_per_connection = 16; bool stopped{ false }; - - friend class network_tcp_message_manager_Test; }; /**