diff --git a/CMakeLists.txt b/CMakeLists.txt index 4da28f153a..296491d1fe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -116,8 +116,8 @@ set(NANO_FUZZER_TEST OFF CACHE BOOL "") set(NANO_ASIO_HANDLER_TRACKING - 0 - CACHE STRING "") + OFF + CACHE BOOL "") set(NANO_ROCKSDB_TOOLS OFF CACHE BOOL "") @@ -153,9 +153,8 @@ if(${NANO_TIMED_LOCKS} GREATER 0) endif() endif() -if(${NANO_ASIO_HANDLER_TRACKING} GREATER 0) - add_definitions(-DNANO_ASIO_HANDLER_TRACKING=${NANO_ASIO_HANDLER_TRACKING} - -DBOOST_ASIO_ENABLE_HANDLER_TRACKING) +if(NANO_ASIO_HANDLER_TRACKING) + add_definitions(-DBOOST_ASIO_ENABLE_HANDLER_TRACKING) endif() option(NANO_SIMD_OPTIMIZATIONS diff --git a/nano/core_test/active_elections.cpp b/nano/core_test/active_elections.cpp index 7634b46557..57bdc2c056 100644 --- a/nano/core_test/active_elections.cpp +++ b/nano/core_test/active_elections.cpp @@ -970,7 +970,7 @@ TEST (active_elections, fork_replacement_tally) node_config.peering_port = system.get_available_port (); auto & node2 (*system.add_node (node_config)); node1.network.filter.clear (); - node2.network.flood_block (send_last); + node2.network.flood_block (send_last, nano::transport::traffic_type::test); ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0); // Correct block without votes is ignored @@ -984,7 +984,7 @@ TEST (active_elections, fork_replacement_tally) // ensure vote arrives before the block ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ()); node1.network.filter.clear (); - node2.network.flood_block (send_last); + node2.network.flood_block (send_last, nano::transport::traffic_type::test); ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1); // the send_last block should replace one of the existing block of the election because it has higher vote weight diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index bedc3aaf2a..00e3593ed7 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -133,8 +133,8 @@ TEST (network, last_contacted) { // check that the endpoints are part of the same connection - std::shared_ptr sock0 = channel0->socket.lock (); - std::shared_ptr sock1 = channel1->socket.lock (); + std::shared_ptr sock0 = channel0->socket; + std::shared_ptr sock1 = channel1->socket; ASSERT_EQ (sock0->local_endpoint (), sock1->remote_endpoint ()); ASSERT_EQ (sock1->local_endpoint (), sock0->remote_endpoint ()); } @@ -195,7 +195,7 @@ TEST (network, send_discarded_publish) .build (); { auto transaction = node1.ledger.tx_begin_read (); - node1.network.flood_block (block); + node1.network.flood_block (block, nano::transport::traffic_type::test); ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub)); ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub)); } @@ -221,7 +221,7 @@ TEST (network, send_invalid_publish) .build (); { auto transaction = node1.ledger.tx_begin_read (); - node1.network.flood_block (block); + node1.network.flood_block (block, nano::transport::traffic_type::test); ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub)); ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub)); } @@ -306,7 +306,7 @@ TEST (network, send_insufficient_work) nano::publish publish1{ nano::dev::network_params.network, block1 }; auto tcp_channel (node1.network.tcp_channels.find_node_id (node2.get_node_id ())); ASSERT_NE (nullptr, tcp_channel); - tcp_channel->send (publish1, [] (boost::system::error_code const & ec, size_t size) {}); + tcp_channel->send (publish1, nano::transport::traffic_type::test); ASSERT_EQ (0, node1.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 0); ASSERT_EQ (1, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); @@ -320,7 +320,7 @@ TEST (network, send_insufficient_work) .work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1)) .build (); nano::publish publish2{ nano::dev::network_params.network, block2 }; - tcp_channel->send (publish2, [] (boost::system::error_code const & ec, size_t size) {}); + tcp_channel->send (publish2, nano::transport::traffic_type::test); ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 1); ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); // Legacy block work epoch_1 @@ -333,7 +333,7 @@ TEST (network, send_insufficient_work) .work (*system.work.generate (block2->hash (), node1.network_params.work.epoch_2)) .build (); nano::publish publish3{ nano::dev::network_params.network, block3 }; - tcp_channel->send (publish3, [] (boost::system::error_code const & ec, size_t size) {}); + tcp_channel->send (publish3, nano::transport::traffic_type::test); ASSERT_EQ (0, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in)); ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0); ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in)); @@ -349,7 +349,7 @@ TEST (network, send_insufficient_work) .work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1)) .build (); nano::publish publish4{ nano::dev::network_params.network, block4 }; - tcp_channel->send (publish4, [] (boost::system::error_code const & ec, size_t size) {}); + tcp_channel->send (publish4, nano::transport::traffic_type::test); ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0); ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in)); ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); @@ -562,14 +562,14 @@ TEST (network, peer_max_tcp_attempts) node_config.network.max_peers_per_ip = 3; auto node = system.add_node (node_config, node_flags); - for (auto i (0); i < node_config.network.max_peers_per_ip; ++i) + for (auto i = 0; i < node_config.network.max_peers_per_ip; ++i) { - auto node2 (std::make_shared (system.io_ctx, system.get_available_port (), nano::unique_path (), system.work, node_flags)); - node2->start (); - system.nodes.push_back (node2); - - // Start TCP attempt - node->network.merge_peer (node2->network.endpoint ()); + // Disable reachout from temporary nodes to avoid mixing outbound and inbound connections + nano::node_config temp_config = system.default_config (); + temp_config.network.peer_reachout = {}; + temp_config.network.cached_peer_reachout = {}; + auto temp_node = system.make_disconnected_node (temp_config, node_flags); + ASSERT_TRUE (node->network.merge_peer (temp_node->network.endpoint ())); } ASSERT_TIMELY_EQ (15s, node->network.size (), node_config.network.max_peers_per_ip); @@ -632,9 +632,9 @@ TEST (network, duplicate_detection) ASSERT_NE (nullptr, tcp_channel); ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message)); - tcp_channel->send (publish); + tcp_channel->send (publish, nano::transport::traffic_type::test); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 0); - tcp_channel->send (publish); + tcp_channel->send (publish, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 1); } @@ -681,9 +681,9 @@ TEST (network, duplicate_vote_detection) ASSERT_NE (nullptr, tcp_channel); ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message)); - tcp_channel->send (message); + tcp_channel->send (message, nano::transport::traffic_type::test); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); - tcp_channel->send (message); + tcp_channel->send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1); } @@ -711,12 +711,12 @@ TEST (network, duplicate_revert_vote) ASSERT_NE (nullptr, tcp_channel); // First vote should be processed - tcp_channel->send (message1); + tcp_channel->send (message1, nano::transport::traffic_type::test); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); ASSERT_TIMELY (5s, node1.network.filter.check (bytes1.data (), bytes1.size ())); // Second vote should get dropped from processor queue - tcp_channel->send (message2); + tcp_channel->send (message2, nano::transport::traffic_type::test); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); // And the filter should not have it WAIT (500ms); // Give the node time to process the vote @@ -741,9 +741,9 @@ TEST (network, expire_duplicate_filter) // Send a vote ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message)); - tcp_channel->send (message); + tcp_channel->send (message, nano::transport::traffic_type::test); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); - tcp_channel->send (message); + tcp_channel->send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1); // The filter should expire the vote after some time @@ -752,7 +752,7 @@ TEST (network, expire_duplicate_filter) } // The test must be completed in less than 1 second -TEST (network, bandwidth_limiter_4_messages) +TEST (network, DISABLED_bandwidth_limiter_4_messages) { nano::test::system system; nano::publish message{ nano::dev::network_params.network, nano::dev::genesis }; @@ -767,22 +767,22 @@ TEST (network, bandwidth_limiter_4_messages) // Send droppable messages for (auto i = 0; i < message_limit; i += 2) // number of channels { - channel1.send (message); - channel2.send (message); + channel1.send (message, nano::transport::traffic_type::test); + channel2.send (message, nano::transport::traffic_type::test); } // Only sent messages below limit, so we don't expect any drops ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); // Send droppable message; drop stats should increase by one now - channel1.send (message); + channel1.send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); // Send non-droppable message, i.e. drop stats should not increase - channel2.send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + channel2.send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); } -TEST (network, bandwidth_limiter_2_messages) +TEST (network, DISABLED_bandwidth_limiter_2_messages) { nano::test::system system; nano::publish message{ nano::dev::network_params.network, nano::dev::genesis }; @@ -795,10 +795,10 @@ TEST (network, bandwidth_limiter_2_messages) nano::transport::inproc::channel channel1{ node, node }; nano::transport::inproc::channel channel2{ node, node }; // change the bandwidth settings, 2 packets will be dropped - channel1.send (message); - channel2.send (message); - channel1.send (message); - channel2.send (message); + channel1.send (message, nano::transport::traffic_type::test); + channel2.send (message, nano::transport::traffic_type::test); + channel1.send (message, nano::transport::traffic_type::test); + channel2.send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (1s, 2, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); } @@ -815,10 +815,10 @@ TEST (network, bandwidth_limiter_with_burst) nano::transport::inproc::channel channel1{ node, node }; nano::transport::inproc::channel channel2{ node, node }; // change the bandwidth settings, no packet will be dropped - channel1.send (message); - channel2.send (message); - channel1.send (message); - channel2.send (message); + channel1.send (message, nano::transport::traffic_type::test); + channel2.send (message, nano::transport::traffic_type::test); + channel1.send (message, nano::transport::traffic_type::test); + channel2.send (message, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); } @@ -962,7 +962,7 @@ TEST (network, filter_invalid_network_bytes) // send a keepalive, from node2 to node1, with the wrong network bytes nano::keepalive keepalive{ nano::dev::network_params.network }; const_cast (keepalive.header.network) = nano::networks::invalid; - channel->send (keepalive); + channel->send (keepalive, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::invalid_network)); } @@ -981,7 +981,7 @@ TEST (network, filter_invalid_version_using) // send a keepalive, from node2 to node1, with the wrong version_using nano::keepalive keepalive{ nano::dev::network_params.network }; const_cast (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1; - channel->send (keepalive); + channel->send (keepalive, nano::transport::traffic_type::test); ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::outdated_version)); } @@ -1068,8 +1068,8 @@ TEST (network, purge_dead_channel) auto & node1 = *system.add_node (flags); - node1.observers.socket_connected.add ([&] (nano::transport::tcp_socket & sock) { - system.logger.debug (nano::log::type::test, "Connected: {}", sock); + node1.observers.socket_connected.add ([&] (auto const & socket) { + system.logger.debug (nano::log::type::test, "Connected socket: {}", nano::streamed (socket)); }); auto & node2 = *system.add_node (flags); @@ -1119,8 +1119,8 @@ TEST (network, purge_dead_channel_remote) auto & node1 = *system.add_node (flags); auto & node2 = *system.add_node (flags); - node2.observers.socket_connected.add ([&] (nano::transport::tcp_socket & sock) { - system.logger.debug (nano::log::type::test, "Connected: {}", sock); + node2.observers.socket_connected.add ([&] (auto const & socket) { + system.logger.debug (nano::log::type::test, "Connected socket: {}", nano::streamed (socket)); }); ASSERT_EQ (node1.network.size (), 1); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 2004a9c942..21801313c4 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -473,7 +473,7 @@ TEST (node, confirm_locked) .sign (nano::keypair ().prv, 0) .work (0) .build (); - system.nodes[0]->network.flood_block (block); + system.nodes[0]->network.flood_block (block, nano::transport::traffic_type::test); } TEST (node_config, random_rep) @@ -1005,14 +1005,9 @@ TEST (node, fork_no_vote_quorum) ASSERT_FALSE (system.wallet (1)->store.fetch (transaction, key1, key3)); auto vote = std::make_shared (key1, key3, 0, 0, std::vector{ send2->hash () }); nano::confirm_ack confirm{ nano::dev::network_params.network, vote }; - std::vector buffer; - { - nano::vectorstream stream (buffer); - confirm.serialize (stream); - } auto channel = node2.network.find_node_id (node3.node_id.pub); ASSERT_NE (nullptr, channel); - channel->send_buffer (nano::shared_const_buffer (std::move (buffer))); + channel->send (confirm, nano::transport::traffic_type::test); ASSERT_TIMELY (10s, node3.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in) >= 3); ASSERT_EQ (node1.latest (nano::dev::genesis_key.pub), send1->hash ()); ASSERT_EQ (node2.latest (nano::dev::genesis_key.pub), send1->hash ()); @@ -2662,13 +2657,6 @@ TEST (node, dont_write_lock_node) TEST (node, bidirectional_tcp) { -#ifdef _WIN32 - if (nano::rocksdb_config::using_rocksdb_in_tests ()) - { - // Don't test this in rocksdb mode - GTEST_SKIP (); - } -#endif nano::test::system system; nano::node_flags node_flags; // Disable bootstrap to start elections for new blocks diff --git a/nano/core_test/peer_container.cpp b/nano/core_test/peer_container.cpp index 44f7993b7a..7857f43fef 100644 --- a/nano/core_test/peer_container.cpp +++ b/nano/core_test/peer_container.cpp @@ -55,7 +55,7 @@ TEST (peer_container, reserved_ip_is_not_a_peer) // Test the TCP channel cleanup function works properly. It is used to remove peers that are not // exchanging messages after a while. -TEST (peer_container, tcp_channel_cleanup_works) +TEST (peer_container, DISABLED_tcp_channel_cleanup_works) { nano::test::system system; nano::node_config node_config = system.default_config (); @@ -90,6 +90,7 @@ TEST (peer_container, tcp_channel_cleanup_works) for (auto it = 0; node1.network.tcp_channels.size () > 1 && it < 10; ++it) { + // FIXME: This is racy and doesn't work reliably // we can't control everything the nodes are doing in background, so using the middle time as // the cutoff point. auto const channel1_last_packet_sent = channel1->get_last_packet_sent (); @@ -254,7 +255,7 @@ TEST (peer_container, depeer_on_outdated_version) nano::keepalive keepalive{ nano::dev::network_params.network }; const_cast (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1; ASSERT_TIMELY (5s, channel->alive ()); - channel->send (keepalive); + channel->send (keepalive, nano::transport::traffic_type::test); ASSERT_TIMELY (5s, !channel->alive ()); } diff --git a/nano/core_test/rep_crawler.cpp b/nano/core_test/rep_crawler.cpp index e9d0ba8d41..eae30fe8bf 100644 --- a/nano/core_test/rep_crawler.cpp +++ b/nano/core_test/rep_crawler.cpp @@ -323,7 +323,7 @@ TEST (rep_crawler, ignore_rebroadcasted) auto tick = [&] () { nano::confirm_ack msg{ nano::dev::network_params.network, vote, /* rebroadcasted */ true }; - channel2to1->send (msg, nullptr, nano::transport::buffer_drop_policy::no_socket_drop); + channel2to1->send (msg, nano::transport::traffic_type::test); return false; }; diff --git a/nano/core_test/request_aggregator.cpp b/nano/core_test/request_aggregator.cpp index 33b96df5fc..b2c64ced51 100644 --- a/nano/core_test/request_aggregator.cpp +++ b/nano/core_test/request_aggregator.cpp @@ -38,8 +38,9 @@ TEST (request_aggregator, one) std::vector> request{ { send1->hash (), send1->root () } }; - auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + // auto client = std::make_shared (node); + // std::shared_ptr dummy_channel = std::make_shared (node, client); + auto dummy_channel = nano::test::fake_channel (node); // Not yet in the ledger node.aggregator.request (request, dummy_channel); @@ -179,8 +180,9 @@ TEST (request_aggregator, two) std::vector> request; request.emplace_back (send2->hash (), send2->root ()); request.emplace_back (receive1->hash (), receive1->root ()); - auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + + auto dummy_channel = nano::test::fake_channel (node); + // Process both blocks node.aggregator.request (request, dummy_channel); // One vote should be generated for both blocks @@ -298,8 +300,9 @@ TEST (request_aggregator, split) } ASSERT_TIMELY_EQ (5s, max_vbh + 2, node.ledger.cemented_count ()); ASSERT_EQ (max_vbh + 1, request.size ()); - auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + + auto dummy_channel = nano::test::fake_channel (node); + node.aggregator.request (request, dummy_channel); // In the ledger but no vote generated yet ASSERT_TIMELY_EQ (3s, 2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); @@ -337,8 +340,9 @@ TEST (request_aggregator, channel_max_queue) ASSERT_EQ (nano::block_status::progress, node.ledger.process (node.ledger.tx_begin_write (), send1)); std::vector> request; request.emplace_back (send1->hash (), send1->root ()); - auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + + auto dummy_channel = nano::test::fake_channel (node); + node.aggregator.request (request, dummy_channel); node.aggregator.request (request, dummy_channel); ASSERT_LT (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped)); @@ -366,8 +370,9 @@ TEST (request_aggregator, DISABLED_unique) ASSERT_EQ (nano::block_status::progress, node.ledger.process (node.ledger.tx_begin_write (), send1)); std::vector> request; request.emplace_back (send1->hash (), send1->root ()); - auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + + auto dummy_channel = nano::test::fake_channel (node); + node.aggregator.request (request, dummy_channel); node.aggregator.request (request, dummy_channel); node.aggregator.request (request, dummy_channel); @@ -410,8 +415,8 @@ TEST (request_aggregator, cannot_vote) // Incorrect hash, correct root request.emplace_back (1, send2->root ()); - auto client = std::make_shared (node); - std::shared_ptr dummy_channel = std::make_shared (node, client); + auto dummy_channel = nano::test::fake_channel (node); + node.aggregator.request (request, dummy_channel); ASSERT_TIMELY (3s, node.aggregator.empty ()); ASSERT_EQ (1, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted)); diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 89066f87c0..d0c213a066 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -11,6 +11,7 @@ #include +#include #include #include #include @@ -126,9 +127,10 @@ TEST (socket, drop_policy) nano::inactive_node inactivenode (nano::unique_path (), node_flags); auto node = inactivenode.node; - std::vector> connections; + std::atomic completed_writes{ 0 }; + std::atomic failed_writes{ 0 }; - auto func = [&] (size_t total_message_count, nano::transport::buffer_drop_policy drop_policy) { + auto func = [&] (size_t total_message_count) { boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); boost::asio::ip::tcp::acceptor acceptor (node->io_ctx); acceptor.open (endpoint.protocol ()); @@ -141,38 +143,39 @@ TEST (socket, drop_policy) }); auto client = std::make_shared (*node); - auto channel = std::make_shared (*node, client); - std::atomic completed_writes{ 0 }; + completed_writes = 0; + failed_writes = 0; client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ()), - [&channel, total_message_count, node, &completed_writes, &drop_policy, client] (boost::system::error_code const & ec_a) mutable { + [&] (boost::system::error_code const & ec_a) mutable { for (int i = 0; i < total_message_count; i++) { std::vector buff (1); - channel->send_buffer ( - nano::shared_const_buffer (std::move (buff)), [&completed_writes, client] (boost::system::error_code const & ec, size_t size_a) mutable { - client.reset (); - ++completed_writes; - }, - drop_policy); + client->async_write (nano::shared_const_buffer (std::move (buff)), [&] (boost::system::error_code const & ec, size_t size_a) { + if (!ec) + { + ++completed_writes; + } + else + { + ++failed_writes; + } + }); } }); - ASSERT_TIMELY_EQ (5s, completed_writes, total_message_count); + ASSERT_TIMELY_EQ (5s, completed_writes + failed_writes, total_message_count); ASSERT_EQ (1, client.use_count ()); }; // We're going to write twice the queue size + 1, and the server isn't reading // The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop) - func (nano::transport::tcp_socket::default_max_queue_size * 2 + 1, nano::transport::buffer_drop_policy::no_socket_drop); - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out)); - ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out)); - - func (nano::transport::tcp_socket::default_max_queue_size + 1, nano::transport::buffer_drop_policy::limiter); - // The stats are accumulated from before - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out)); - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out)); + func (nano::transport::tcp_socket::default_queue_size * 2 + 1); + ASSERT_EQ (1, failed_writes); + + func (nano::transport::tcp_socket::default_queue_size + 1); + ASSERT_EQ (0, failed_writes); } // This is abusing the socket class, it's interfering with the normal node lifetimes and as a result deadlocks @@ -303,7 +306,7 @@ TEST (socket_timeout, connect) // create one node and set timeout to 1 second nano::test::system system (1); std::shared_ptr node = system.nodes[0]; - node->config.tcp_io_timeout = std::chrono::seconds (1); + node->config.tcp_io_timeout = 1s; // try to connect to an IP address that most likely does not exist and will not reply // we want the tcp stack to not receive a negative reply, we want it to see silence and to keep trying @@ -315,22 +318,13 @@ TEST (socket_timeout, connect) std::atomic done = false; boost::system::error_code ec; socket->async_connect (endpoint, [&ec, &done] (boost::system::error_code const & ec_a) { - if (ec_a) - { - ec = ec_a; - done = true; - } + ec = ec_a; + done = true; }); - // check that the callback was called and we got an error + // Sometimes the connect will be aborted but there will be no error, just check that the callback was called due to the timeout ASSERT_TIMELY_EQ (6s, done, true); - ASSERT_TRUE (ec); - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error, nano::stat::dir::in)); - - // check that the socket was closed due to tcp_io_timeout timeout - // NOTE: this assert is not guaranteed to be always true, it is only likely that it will be true, we can also get "No route to host" - // if this test is run repeatedly or in parallel then it is guaranteed to fail due to "No route to host" instead of timeout - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); + ASSERT_TRUE (socket->has_timed_out ()); } TEST (socket_timeout, read) @@ -381,6 +375,8 @@ TEST (socket_timeout, read) TEST (socket_timeout, write) { + std::atomic done = false; + // create one node and set timeout to 1 second nano::test::system system (1); std::shared_ptr node = system.nodes[0]; @@ -402,19 +398,17 @@ TEST (socket_timeout, write) // create a client socket and send lots of data to fill the socket queue on the local and remote side // eventually, the all tcp queues should fill up and async_write will not be able to progress // and the timeout should kick in and close the socket, which will cause the async_write to return an error - auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client, 1024 * 64); // socket with a max queue size much larger than OS buffers - std::atomic done = false; - boost::system::error_code ec; - socket->async_connect (acceptor.local_endpoint (), [&socket, &ec, &done] (boost::system::error_code const & ec_a) { + auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client, 1024 * 1024); // socket with a max queue size much larger than OS buffers + + socket->async_connect (acceptor.local_endpoint (), [&socket, &done] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); auto buffer = std::make_shared> (128 * 1024); for (auto i = 0; i < 1024; ++i) { - socket->async_write (nano::shared_const_buffer{ buffer }, [&ec, &done] (boost::system::error_code const & ec_a, size_t size_a) { + socket->async_write (nano::shared_const_buffer{ buffer }, [&done] (boost::system::error_code const & ec_a, size_t size_a) { if (ec_a) { - ec = ec_a; done = true; } }); @@ -422,12 +416,11 @@ TEST (socket_timeout, write) }); // check that the callback was called and we got an error - ASSERT_TIMELY_EQ (10s, done, true); - ASSERT_TRUE (ec); - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in)); + ASSERT_TIMELY (10s, done); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in)); // check that the socket was closed due to tcp_io_timeout timeout - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); } TEST (socket_timeout, read_overlapped) @@ -451,8 +444,8 @@ TEST (socket_timeout, read_overlapped) auto buffer = std::make_shared> (1); nano::async_write (newsock, nano::shared_const_buffer (buffer), [] (boost::system::error_code const & ec_a, size_t size_a) { - debug_assert (!ec_a); - debug_assert (size_a == 1); + EXPECT_TRUE (!ec_a); + EXPECT_TRUE (size_a == 1); }); }); @@ -466,11 +459,12 @@ TEST (socket_timeout, read_overlapped) auto buffer = std::make_shared> (1); socket->async_read (buffer, 1, [] (boost::system::error_code const & ec_a, size_t size_a) { - debug_assert (size_a == 1); + EXPECT_FALSE (ec_a); + EXPECT_TRUE (size_a == 1); }); socket->async_read (buffer, 1, [&ec, &done] (boost::system::error_code const & ec_a, size_t size_a) { - debug_assert (size_a == 0); + EXPECT_EQ (size_a, 0); if (ec_a) { ec = ec_a; @@ -482,14 +476,16 @@ TEST (socket_timeout, read_overlapped) // check that the callback was called and we got an error ASSERT_TIMELY_EQ (10s, done, true); ASSERT_TRUE (ec); - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in)); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in)); // check that the socket was closed due to tcp_io_timeout timeout - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); } TEST (socket_timeout, write_overlapped) { + std::atomic done = false; + // create one node and set timeout to 1 second nano::test::system system (1); std::shared_ptr node = system.nodes[0]; @@ -509,30 +505,29 @@ TEST (socket_timeout, write_overlapped) EXPECT_FALSE (ec_a); boost::asio::async_read (newsock, boost::asio::buffer (buffer->data (), buffer->size ()), [] (boost::system::error_code const & ec_a, size_t size_a) { - debug_assert (size_a == 1); + EXPECT_FALSE (ec_a); + EXPECT_EQ (size_a, 1); }); }); // create a client socket and send lots of data to fill the socket queue on the local and remote side // eventually, the all tcp queues should fill up and async_write will not be able to progress // and the timeout should kick in and close the socket, which will cause the async_write to return an error - auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client, 1024 * 64); // socket with a max queue size much larger than OS buffers - std::atomic done = false; - boost::system::error_code ec; - socket->async_connect (acceptor.local_endpoint (), [&socket, &ec, &done] (boost::system::error_code const & ec_a) { + auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client, 1024 * 1024); // socket with a max queue size much larger than OS buffers + socket->async_connect (acceptor.local_endpoint (), [&socket, &done] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); auto buffer1 = std::make_shared> (1); auto buffer2 = std::make_shared> (128 * 1024); socket->async_write (nano::shared_const_buffer{ buffer1 }, [] (boost::system::error_code const & ec_a, size_t size_a) { - debug_assert (size_a == 1); + EXPECT_FALSE (ec_a); + EXPECT_EQ (size_a, 1); }); for (auto i = 0; i < 1024; ++i) { - socket->async_write (nano::shared_const_buffer{ buffer2 }, [&ec, &done] (boost::system::error_code const & ec_a, size_t size_a) { + socket->async_write (nano::shared_const_buffer{ buffer2 }, [&done] (boost::system::error_code const & ec_a, size_t size_a) { if (ec_a) { - ec = ec_a; done = true; } }); @@ -541,9 +536,8 @@ TEST (socket_timeout, write_overlapped) // check that the callback was called and we got an error ASSERT_TIMELY_EQ (10s, done, true); - ASSERT_TRUE (ec); - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in)); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in)); // check that the socket was closed due to tcp_io_timeout timeout - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); } diff --git a/nano/core_test/system.cpp b/nano/core_test/system.cpp index 0ac3c28a3e..4c30b7b478 100644 --- a/nano/core_test/system.cpp +++ b/nano/core_test/system.cpp @@ -211,7 +211,7 @@ TEST (system, transport_basic) nano::transport::inproc::channel channel{ node0, node1 }; // Send a keepalive message since they are easy to construct nano::keepalive junk{ nano::dev::network_params.network }; - channel.send (junk); + channel.send (junk, nano::transport::traffic_type::test); // Ensure the keepalive has been reecived on the target. ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) > 0); } diff --git a/nano/core_test/tcp_listener.cpp b/nano/core_test/tcp_listener.cpp index 9173806f46..787066f608 100644 --- a/nano/core_test/tcp_listener.cpp +++ b/nano/core_test/tcp_listener.cpp @@ -275,7 +275,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake) auto channel = std::make_shared (*node0, socket); socket->async_connect (node0->tcp_listener.endpoint (), [&node_id_handshake, channel] (boost::system::error_code const & ec) { ASSERT_FALSE (ec); - channel->send (node_id_handshake, [] (boost::system::error_code const & ec, size_t size_a) { + channel->send (node_id_handshake, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); }); }); diff --git a/nano/core_test/telemetry.cpp b/nano/core_test/telemetry.cpp index 859d3ffb39..aa213e776b 100644 --- a/nano/core_test/telemetry.cpp +++ b/nano/core_test/telemetry.cpp @@ -143,7 +143,7 @@ TEST (telemetry, dos_tcp) nano::telemetry_req message{ nano::dev::network_params.network }; auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); - channel->send (message, [] (boost::system::error_code const & ec, size_t size_a) { + channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); }); @@ -152,7 +152,7 @@ TEST (telemetry, dos_tcp) auto orig = std::chrono::steady_clock::now (); for (int i = 0; i < 10; ++i) { - channel->send (message, [] (boost::system::error_code const & ec, size_t size_a) { + channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); }); } @@ -165,7 +165,7 @@ TEST (telemetry, dos_tcp) // Now spam messages waiting for it to be processed while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1) { - channel->send (message); + channel->send (message, nano::transport::traffic_type::test); ASSERT_NO_ERROR (system.poll ()); } } @@ -214,7 +214,7 @@ TEST (telemetry, max_possible_size) auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); - channel->send (message, [] (boost::system::error_code const & ec, size_t size_a) { + channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); }); diff --git a/nano/lib/async.hpp b/nano/lib/async.hpp index 818d6bcafd..1cc8b0bfc5 100644 --- a/nano/lib/async.hpp +++ b/nano/lib/async.hpp @@ -4,6 +4,7 @@ #include +#include #include namespace asio = boost::asio; @@ -21,6 +22,12 @@ inline asio::awaitable sleep_for (auto duration) debug_assert (!ec || ec == asio::error::operation_aborted); } +inline asio::awaitable cancelled () +{ + auto state = co_await asio::this_coro::cancellation_state; + co_return state.cancelled () != asio::cancellation_type::none; +} + /** * A cancellation signal that can be emitted from any thread. * It follows the same semantics as asio::cancellation_signal. @@ -40,7 +47,6 @@ class cancellation { // Can only move if the strands are the same debug_assert (strand == other.strand); - if (this != &other) { signal = std::move (other.signal); @@ -70,6 +76,106 @@ class cancellation bool slotted{ false }; // For debugging purposes }; +class condition +{ +public: + explicit condition (nano::async::strand & strand) : + strand{ strand }, + state{ std::make_shared (strand) } + { + } + + condition (condition &&) = default; + + condition & operator= (condition && other) + { + // Can only move if the strands are the same + debug_assert (strand == other.strand); + if (this != &other) + { + state = std::move (other.state); + } + return *this; + } + + void notify () + { + // Avoid unnecessary dispatch if already scheduled + release_assert (state); + if (state->scheduled.exchange (true) == false) + { + asio::dispatch (strand, [state_s = state] () { + state_s->scheduled = false; + state_s->timer.cancel (); + }); + } + } + + // Spuriously wakes up + asio::awaitable wait () + { + debug_assert (strand.running_in_this_thread ()); + co_await wait_for (std::chrono::seconds{ 1 }); + } + + asio::awaitable wait_for (auto duration) + { + debug_assert (strand.running_in_this_thread ()); + release_assert (state); + state->timer.expires_after (duration); + boost::system::error_code ec; // Swallow error from cancellation + co_await state->timer.async_wait (asio::redirect_error (asio::use_awaitable, ec)); + debug_assert (!ec || ec == asio::error::operation_aborted); + } + + void cancel () + { + release_assert (state); + asio::dispatch (strand, [state_s = state] () { + state_s->scheduled = false; + state_s->timer.cancel (); + }); + } + + bool valid () const + { + return state != nullptr; + } + + nano::async::strand & strand; + +private: + struct shared_state + { + asio::steady_timer timer; + std::atomic scheduled{ false }; + + explicit shared_state (nano::async::strand & strand) : + timer{ strand } {}; + }; + std::shared_ptr state; +}; + +// Concept for awaitables +template +concept async_task = std::same_as>; + +// Concept for callables that return an awaitable +template +concept async_factory = requires (T t) { + { + t () + } -> std::same_as>; +}; + +// Concept for callables that take a condition and return an awaitable +template +concept async_factory_with_condition = requires (T t, condition & c) { + { + t (c) + } -> std::same_as>; +}; + /** * Wrapper with convenience functions and safety checks for asynchronous tasks. * Aims to provide interface similar to std::thread. @@ -86,13 +192,38 @@ class task { } - task (nano::async::strand & strand, auto && func) : + template + task (nano::async::strand & strand, Func && func) : + strand{ strand }, + cancellation{ strand } + { + future = asio::co_spawn ( + strand, + std::forward (func), + asio::bind_cancellation_slot (cancellation.slot (), asio::use_future)); + } + + template + task (nano::async::strand & strand, Func && func) : strand{ strand }, cancellation{ strand } { future = asio::co_spawn ( strand, - std::forward (func), + func (), + asio::bind_cancellation_slot (cancellation.slot (), asio::use_future)); + } + + template + task (nano::async::strand & strand, Func && func) : + strand{ strand }, + cancellation{ strand }, + condition{ std::make_unique (strand) } + { + auto awaitable_func = func (*condition); + future = asio::co_spawn ( + strand, + func (*condition), asio::bind_cancellation_slot (cancellation.slot (), asio::use_future)); } @@ -107,11 +238,11 @@ class task { // Can only move if the strands are the same debug_assert (strand == other.strand); - if (this != &other) { future = std::move (other.future); cancellation = std::move (other.cancellation); + condition = std::move (other.condition); } return *this; } @@ -139,6 +270,18 @@ class task { debug_assert (joinable ()); cancellation.emit (); + if (condition) + { + condition->cancel (); + } + } + + void notify () + { + if (condition) + { + condition->notify (); + } } nano::async::strand & strand; @@ -146,5 +289,6 @@ class task private: std::future future; nano::async::cancellation cancellation; + std::unique_ptr condition; }; } \ No newline at end of file diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index b3f5312577..db68477c15 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -55,6 +55,7 @@ enum class type socket, socket_server, tcp, + tcp_socket, tcp_server, tcp_listener, tcp_channels, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 23c7e819d2..7918c76a51 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -30,6 +30,12 @@ enum class type ipc, tcp, tcp_server, + tcp_channel, + tcp_channel_queued, + tcp_channel_send, + tcp_channel_drop, + tcp_channel_error, + tcp_channel_wait, tcp_channels, tcp_channels_rejected, tcp_channels_purge, @@ -42,7 +48,7 @@ enum class type confirmation_height, confirmation_observer, confirming_set, - drop, + drop, // TODO: Rename to message_drop aggregator, requests, request_aggregator, @@ -71,10 +77,13 @@ enum class type bootstrap_account_sets, bootstrap_frontier_scan, bootstrap_timeout, + bootstrap_wait, bootstrap_server, bootstrap_server_request, bootstrap_server_overfill, bootstrap_server_response, + bootstrap_server_send, + bootstrap_server_ec, active, active_elections, active_elections_started, @@ -93,6 +102,7 @@ enum class type optimistic_scheduler, handshake, rep_crawler, + rep_crawler_send, local_block_broadcaster, rep_tiers, syn_cookies, @@ -152,6 +162,9 @@ enum class detail sync, requeued, evicted, + other, + drop, + queued, // processing queue queue, @@ -300,15 +313,25 @@ enum class detail loop_reachout, loop_reachout_cached, merge_peer, + merge_peer_failed, reachout_live, reachout_cached, + connected, - // traffic + // traffic type generic, + bootstrap_server, + bootstrap_requests, + block_broadcast, + block_broadcast_initial, + block_broadcast_rpc, + confirmation_requests, + vote_rebroadcast, + vote_reply, + rep_crawler, + telemetry, // tcp - tcp_write_drop, - tcp_write_no_socket_drop, tcp_silent_connection_drop, tcp_io_timeout_drop, tcp_connect_error, @@ -335,6 +358,10 @@ enum class detail attempt_timeout, not_a_peer, + // tcp_channel + wait_socket, + wait_bandwidth, + // tcp_channels channel_accepted, channel_rejected, @@ -515,6 +542,12 @@ enum class detail safe, base, + // bootstrap_wait + wait_tags, + wait_blockprocessor, + wait_channel, + wait_priority, + // active started_hinted, started_optimistic, @@ -582,6 +615,12 @@ enum class detail rollback_skipped, loop_scan, + // error codes + no_buffer_space, + timed_out, + host_unreachable, + not_supported, + _last // Must be the last enum }; diff --git a/nano/lib/thread_runner.cpp b/nano/lib/thread_runner.cpp index 576504b0d7..bd9ab6ddf7 100644 --- a/nano/lib/thread_runner.cpp +++ b/nano/lib/thread_runner.cpp @@ -60,16 +60,18 @@ void nano::thread_runner::join () { io_guard.reset (); - for (auto & i : threads) + for (auto & thread : threads) { - if (i.joinable ()) + if (thread.joinable ()) { - i.join (); + logger.debug (nano::log::type::thread_runner, "Joining thread: {}", fmt::streamed (thread.get_id ())); + thread.join (); } } + threads.clear (); - logger.debug (nano::log::type::thread_runner, "Stopped threads ({})", to_string (role)); + logger.debug (nano::log::type::thread_runner, "Stopped all threads ({})", to_string (role)); io_ctx.reset (); // Release shared_ptr to io_context } diff --git a/nano/lib/thread_runner.hpp b/nano/lib/thread_runner.hpp index 7ad2004793..ec5befaf5f 100644 --- a/nano/lib/thread_runner.hpp +++ b/nano/lib/thread_runner.hpp @@ -18,11 +18,11 @@ class thread_runner final thread_runner (std::shared_ptr, nano::logger &, unsigned num_threads = nano::hardware_concurrency (), nano::thread_role::name thread_role = nano::thread_role::name::io); ~thread_runner (); - /** Wait for IO threads to complete */ + // Wait for IO threads to complete void join (); - /** Tells the IO context to stop processing events. - * NOTE: This shouldn't really be used, node should stop gracefully by cancelling any outstanding async operations and calling join() */ + // Tells the IO context to stop processing events. + // TODO: Ideally this shouldn't be needed, node should stop gracefully by cancelling any outstanding async operations and calling join() void abort (); private: diff --git a/nano/lib/timer.cpp b/nano/lib/timer.cpp index 9357f885de..3f817b8183 100644 --- a/nano/lib/timer.cpp +++ b/nano/lib/timer.cpp @@ -99,6 +99,7 @@ void nano::timer::start () template UNIT nano::timer::restart () { + update_ticks (); auto current = ticks; state = nano::timer_state::started; begin = CLOCK::now (); diff --git a/nano/lib/timer.hpp b/nano/lib/timer.hpp index f9b6580d96..49461c973e 100644 --- a/nano/lib/timer.hpp +++ b/nano/lib/timer.hpp @@ -18,10 +18,9 @@ template channel) +bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::shared_ptr const & channel) { if (!verify (message)) { @@ -113,8 +113,7 @@ bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::s } // If channel is full our response will be dropped anyway, so filter that early - // TODO: Add per channel limits (this ideally should be done on the channel message processing side) - if (channel->max (nano::transport::traffic_type::bootstrap)) + if (channel->max (nano::transport::traffic_type::bootstrap_server)) { stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::in); return false; @@ -160,6 +159,7 @@ void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared } void operator() (nano::asc_pull_ack::account_info_payload const & pld) { + stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::account_info, nano::stat::dir::out); } void operator() (nano::asc_pull_ack::frontiers_payload const & pld) { @@ -171,13 +171,9 @@ void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared on_response.notify (response, channel); channel->send ( - response, [this] (auto & ec, auto size) { - if (ec) - { - stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::write_error, nano::stat::dir::out); - } - }, - nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); + response, nano::transport::traffic_type::bootstrap_server, [this] (auto & ec, auto size) { + stats.inc (nano::stat::type::bootstrap_server_ec, to_stat_detail (ec), nano::stat::dir::out); + }); } void nano::bootstrap_server::run () @@ -220,7 +216,7 @@ void nano::bootstrap_server::run_batch (nano::unique_lock & lock) transaction.refresh_if_needed (); - if (!channel->max (nano::transport::traffic_type::bootstrap)) + if (!channel->max (nano::transport::traffic_type::bootstrap_server)) { auto response = process (transaction, request); respond (response, channel); diff --git a/nano/node/bootstrap/bootstrap_server.hpp b/nano/node/bootstrap/bootstrap_server.hpp index 41c33404d1..4a211cafb7 100644 --- a/nano/node/bootstrap/bootstrap_server.hpp +++ b/nano/node/bootstrap/bootstrap_server.hpp @@ -42,7 +42,7 @@ class bootstrap_server final * Process `asc_pull_req` message coming from network. * Reply will be sent back over passed in `channel` */ - bool request (nano::asc_pull_req const & message, std::shared_ptr channel); + bool request (nano::asc_pull_req const & message, std::shared_ptr const & channel); public: // Events nano::observer_set const &> on_response; diff --git a/nano/node/bootstrap/bootstrap_service.cpp b/nano/node/bootstrap/bootstrap_service.cpp index c2d8188249..5a2f2e0789 100644 --- a/nano/node/bootstrap/bootstrap_service.cpp +++ b/nano/node/bootstrap/bootstrap_service.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -201,11 +202,8 @@ bool nano::bootstrap_service::send (std::shared_ptr co request.update_header (); - stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request, nano::stat::dir::out); - stats.inc (nano::stat::type::bootstrap_request, to_stat_detail (tag.type)); - - channel->send ( - request, [this, id = tag.id] (auto const & ec, auto size) { + bool sent = channel->send ( + request, nano::transport::traffic_type::bootstrap_requests, [this, id = tag.id] (auto const & ec, auto size) { nano::lock_guard lock{ mutex }; if (auto it = tags.get ().find (id); it != tags.get ().end ()) { @@ -222,9 +220,19 @@ bool nano::bootstrap_service::send (std::shared_ptr co stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request_failed, nano::stat::dir::out); tags.get ().erase (it); } - } }, nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); + } }); + + if (sent) + { + stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request, nano::stat::dir::out); + stats.inc (nano::stat::type::bootstrap_request, to_stat_detail (tag.type)); + } + else + { + stats.inc (nano::stat::type::bootstrap, nano::stat::detail::request_failed, nano::stat::dir::out); + } - return true; // TODO: Return channel send result + return sent; } std::size_t nano::bootstrap_service::priority_size () const @@ -557,13 +565,19 @@ bool nano::bootstrap_service::request_frontiers (nano::account start, std::share void nano::bootstrap_service::run_one_priority () { + nano::timer timer{ timer_state::started }; + // Wait for the blockprocessor to have capacity wait_blockprocessor (); + stats.add (nano::stat::type::bootstrap_wait, nano::stat::detail::wait_blockprocessor, timer.restart ().count ()); + // Wait for a channel with available capacity auto channel = wait_channel (); + stats.add (nano::stat::type::bootstrap_wait, nano::stat::detail::wait_channel, timer.restart ().count ()); if (!channel) { return; } auto [account, priority, fails] = wait_priority (); + stats.add (nano::stat::type::bootstrap_wait, nano::stat::detail::wait_priority, timer.restart ().count ()); if (account.is_zero ()) { return; diff --git a/nano/node/bootstrap/peer_scoring.cpp b/nano/node/bootstrap/peer_scoring.cpp index a86bdc5745..1e406ab962 100644 --- a/nano/node/bootstrap/peer_scoring.cpp +++ b/nano/node/bootstrap/peer_scoring.cpp @@ -68,7 +68,7 @@ std::shared_ptr nano::bootstrap::peer_scoring::channel { for (auto const & channel : channels) { - if (!channel->max (nano::transport::traffic_type::bootstrap)) + if (!channel->max (traffic_type)) { if (!try_send_message (channel)) { diff --git a/nano/node/bootstrap/peer_scoring.hpp b/nano/node/bootstrap/peer_scoring.hpp index 9b41865393..0bae9255c3 100644 --- a/nano/node/bootstrap/peer_scoring.hpp +++ b/nano/node/bootstrap/peer_scoring.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -19,6 +20,9 @@ namespace bootstrap // Container for tracking and scoring peers with respect to bootstrapping class peer_scoring { + public: + static nano::transport::traffic_type constexpr traffic_type = nano::transport::traffic_type::bootstrap_requests; + public: peer_scoring (bootstrap_config const &, nano::network_constants const &); diff --git a/nano/node/confirmation_solicitor.cpp b/nano/node/confirmation_solicitor.cpp index 5695f5f8b9..a0a71a0327 100644 --- a/nano/node/confirmation_solicitor.cpp +++ b/nano/node/confirmation_solicitor.cpp @@ -44,12 +44,13 @@ bool nano::confirmation_solicitor::broadcast (nano::election const & election_a) bool const different (exists && existing->second.hash != hash); if (!exists || different) { - i->channel->send (winner); + i->channel->send (winner, nano::transport::traffic_type::block_broadcast); count += different ? 0 : 1; } } // Random flood for block propagation - network.flood_message (winner, nano::transport::buffer_drop_policy::limiter, 0.5f); + // TODO: Avoid broadcasting to the same peers that were already broadcasted to + network.flood_message (winner, nano::transport::traffic_type::block_broadcast, 0.5f); error = false; } return error; @@ -71,9 +72,9 @@ bool nano::confirmation_solicitor::add (nano::election const & election_a) bool const different (exists && existing->second.hash != hash); if (!exists || !is_final || different) { - auto & request_queue (requests[rep.channel]); - if (!rep.channel->max ()) + if (!rep.channel->max (nano::transport::traffic_type::confirmation_requests)) { + auto & request_queue (requests[rep.channel]); request_queue.emplace_back (election_a.status.winner->hash (), election_a.status.winner->root ()); count += different ? 0 : 1; error = false; @@ -101,14 +102,14 @@ void nano::confirmation_solicitor::flush () if (roots_hashes_l.size () == nano::network::confirm_req_hashes_max) { nano::confirm_req req{ config.network_params.network, roots_hashes_l }; - channel->send (req); + channel->send (req, nano::transport::traffic_type::confirmation_requests); roots_hashes_l.clear (); } } if (!roots_hashes_l.empty ()) { nano::confirm_req req{ config.network_params.network, roots_hashes_l }; - channel->send (req); + channel->send (req, nano::transport::traffic_type::confirmation_requests); } } prepared = false; diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 125c4675fb..d9b329f39f 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -572,7 +572,7 @@ bool nano::election::publish (std::shared_ptr const & block_a) if (status.winner->hash () == block_a->hash ()) { status.winner = block_a; - node.network.flood_block (block_a, nano::transport::buffer_drop_policy::no_limiter_drop); + node.network.flood_block (block_a, nano::transport::traffic_type::block_broadcast); } } } diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index e477f8414a..702a51f2e8 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -18,6 +18,7 @@ class bootstrap_server; class bootstrap_service; class confirming_set; class election; +class election_status; class local_block_broadcaster; class local_vote_history; class logger; diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 708e0e3f98..6cff65f729 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3632,7 +3632,7 @@ void nano::json_handler::republish () } hash = node.ledger.any.block_successor (transaction, hash).value_or (0); } - node.network.flood_block_many (std::move (republish_bundle), nullptr, 25); + node.network.flood_block_many (std::move (republish_bundle), nano::transport::traffic_type::block_broadcast_rpc, 25ms); response_l.put ("success", ""); // obsolete response_l.add_child ("blocks", blocks); } @@ -4867,7 +4867,7 @@ void nano::json_handler::wallet_republish () blocks.push_back (std::make_pair ("", entry)); } } - node.network.flood_block_many (std::move (republish_bundle), nullptr, 25); + node.network.flood_block_many (std::move (republish_bundle), nano::transport::traffic_type::keepalive, 25ms); response_l.add_child ("blocks", blocks); } response_errors (); diff --git a/nano/node/message_processor.cpp b/nano/node/message_processor.cpp index 2b83df9044..b6828f812e 100644 --- a/nano/node/message_processor.cpp +++ b/nano/node/message_processor.cpp @@ -171,16 +171,13 @@ class process_visitor : public nano::message_visitor void keepalive (nano::keepalive const & message) override { - // Check for special node port data - auto peer0 (message.peers[0]); - if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) + // Check for self reported peering port + auto self_report = message.peers[0]; + if (self_report.address () == boost::asio::ip::address_v6{} && self_report.port () != 0) { - // TODO: Remove this as we do not need to establish a second connection to the same peer - nano::endpoint new_endpoint (channel->get_remote_endpoint ().address (), peer0.port ()); - node.network.merge_peer (new_endpoint); - // Remember this for future forwarding to other peers - channel->set_peering_endpoint (new_endpoint); + nano::endpoint peering_endpoint{ channel->get_remote_endpoint ().address (), self_report.port () }; + channel->set_peering_endpoint (peering_endpoint); } } diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 681fd00874..12ee9718d6 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -18,16 +18,20 @@ std::size_t nano::network::confirm_ack_hashes_max{ 255 }; * network */ -nano::network::network (nano::node & node, uint16_t port) : - config{ node.config.network }, - node{ node }, +nano::network::network (nano::node & node_a, uint16_t port_a) : + config{ node_a.config.network }, + node{ node_a }, id{ nano::network_constants::active_network }, syn_cookies{ node.config.network.max_peers_per_ip, node.logger }, resolver{ node.io_ctx }, filter{ node.config.network.duplicate_filter_size, node.config.network.duplicate_filter_cutoff }, tcp_channels{ node }, - port{ port } + port{ port_a } { + node.observers.channel_connected.add ([this] (std::shared_ptr const & channel) { + node.stats.inc (nano::stat::type::network, nano::stat::detail::connected); + node.logger.debug (nano::log::type::network, "Connected to: {}", channel->to_string ()); + }); } nano::network::~network () @@ -58,6 +62,11 @@ void nano::network::start () run_reachout (); }); } + else + { + node.logger.warn (nano::log::type::network, "Peer reachout is disabled"); + } + if (config.cached_peer_reachout.count () > 0) { reachout_cached_thread = std::thread ([this] () { @@ -65,11 +74,19 @@ void nano::network::start () run_reachout_cached (); }); } + else + { + node.logger.warn (nano::log::type::network, "Cached peer reachout is disabled"); + } if (!node.flags.disable_tcp_realtime) { tcp_channels.start (); } + else + { + node.logger.warn (nano::log::type::network, "Realtime TCP is disabled"); + } } void nano::network::stop () @@ -216,109 +233,113 @@ void nano::network::run_reachout_cached () } } -void nano::network::send_keepalive (std::shared_ptr const & channel_a) +void nano::network::send_keepalive (std::shared_ptr const & channel) const { nano::keepalive message{ node.network_params.network }; random_fill (message.peers); - channel_a->send (message); + channel->send (message, nano::transport::traffic_type::keepalive); } -void nano::network::send_keepalive_self (std::shared_ptr const & channel_a) +void nano::network::send_keepalive_self (std::shared_ptr const & channel) const { nano::keepalive message{ node.network_params.network }; fill_keepalive_self (message.peers); - channel_a->send (message); + channel->send (message, nano::transport::traffic_type::keepalive); } -void nano::network::flood_message (nano::message & message_a, nano::transport::buffer_drop_policy const drop_policy_a, float const scale_a) +void nano::network::flood_message (nano::message const & message, nano::transport::traffic_type type, float scale) const { - for (auto & i : list (fanout (scale_a))) + for (auto const & channel : list (fanout (scale))) { - i->send (message_a, nullptr, drop_policy_a); + channel->send (message, type); } } -void nano::network::flood_keepalive (float const scale_a) +void nano::network::flood_keepalive (float scale) const { nano::keepalive message{ node.network_params.network }; random_fill (message.peers); - flood_message (message, nano::transport::buffer_drop_policy::limiter, scale_a); + flood_message (message, nano::transport::traffic_type::keepalive, scale); } -void nano::network::flood_keepalive_self (float const scale_a) +void nano::network::flood_keepalive_self (float scale) const { nano::keepalive message{ node.network_params.network }; fill_keepalive_self (message.peers); - flood_message (message, nano::transport::buffer_drop_policy::limiter, scale_a); + flood_message (message, nano::transport::traffic_type::keepalive, scale); } -void nano::network::flood_block (std::shared_ptr const & block, nano::transport::buffer_drop_policy const drop_policy) +void nano::network::flood_block (std::shared_ptr const & block, nano::transport::traffic_type type) const { nano::publish message{ node.network_params.network, block }; - flood_message (message, drop_policy); + flood_message (message, type); } -void nano::network::flood_block_initial (std::shared_ptr const & block) +void nano::network::flood_block_initial (std::shared_ptr const & block) const { nano::publish message{ node.network_params.network, block, /* is_originator */ true }; for (auto const & rep : node.rep_crawler.principal_representatives ()) { - rep.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + rep.channel->send (message, nano::transport::traffic_type::block_broadcast_initial); } for (auto & peer : list_non_pr (fanout (1.0))) { - peer->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + peer->send (message, nano::transport::traffic_type::block_broadcast_initial); } } -void nano::network::flood_vote (std::shared_ptr const & vote, float scale, bool rebroadcasted) +void nano::network::flood_vote (std::shared_ptr const & vote, float scale, bool rebroadcasted) const { nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted }; - for (auto & i : list (fanout (scale))) + for (auto & channel : list (fanout (scale))) { - i->send (message, nullptr); + channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote); } } -void nano::network::flood_vote_non_pr (std::shared_ptr const & vote, float scale, bool rebroadcasted) +void nano::network::flood_vote_non_pr (std::shared_ptr const & vote, float scale, bool rebroadcasted) const { nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted }; - for (auto & i : list_non_pr (fanout (scale))) + for (auto & channel : list_non_pr (fanout (scale))) { - i->send (message, nullptr); + channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote); } } -void nano::network::flood_vote_pr (std::shared_ptr const & vote, bool rebroadcasted) +void nano::network::flood_vote_pr (std::shared_ptr const & vote, bool rebroadcasted) const { nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted }; - for (auto const & i : node.rep_crawler.principal_representatives ()) + for (auto const & channel : node.rep_crawler.principal_representatives ()) { - i.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + channel.channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote); } } -void nano::network::flood_block_many (std::deque> blocks_a, std::function callback_a, unsigned delay_a) +void nano::network::flood_block_many (std::deque> blocks, nano::transport::traffic_type type, std::chrono::milliseconds delay, std::function callback) const { - if (!blocks_a.empty ()) + if (blocks.empty ()) { - auto block_l (blocks_a.front ()); - blocks_a.pop_front (); - flood_block (block_l); - if (!blocks_a.empty ()) - { - std::weak_ptr node_w (node.shared ()); - node.workers.post_delayed (std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a] () { - if (auto node_l = node_w.lock ()) - { - node_l->network.flood_block_many (std::move (blocks), callback_a, delay_a); - } - }); - } - else if (callback_a) - { - callback_a (); - } + return; + } + + auto block = blocks.front (); + blocks.pop_front (); + + flood_block (block, type); + + if (!blocks.empty ()) + { + std::weak_ptr node_w (node.shared ()); + node.workers.post_delayed (delay, [node_w, type, blocks = std::move (blocks), delay, callback] () mutable { + if (auto node_l = node_w.lock ()) + { + node_l->network.flood_block_many (std::move (blocks), type, delay, callback); + } + }); + } + else if (callback) + { + callback (); } } @@ -331,17 +352,24 @@ void nano::network::merge_peers (std::array const & peers_a) } } -void nano::network::merge_peer (nano::endpoint const & peer_a) +bool nano::network::merge_peer (nano::endpoint const & peer) { - if (track_reachout (peer_a)) + if (track_reachout (peer)) { node.stats.inc (nano::stat::type::network, nano::stat::detail::merge_peer); - - tcp_channels.start_tcp (peer_a); + node.logger.debug (nano::log::type::network, "Initiating peer merge: {}", fmt::streamed (peer)); + bool started = tcp_channels.start_tcp (peer); + if (!started) + { + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::merge_peer_failed); + node.logger.debug (nano::log::type::network, "Peer merge failed: {}", fmt::streamed (peer)); + } + return started; } + return false; // Not initiated } -bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_local_peers) +bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_local_peers) const { bool result (false); if (endpoint_a.address ().to_v6 ().is_unspecified ()) @@ -369,32 +397,32 @@ bool nano::network::track_reachout (nano::endpoint const & endpoint_a) return tcp_channels.track_reachout (endpoint_a); } -std::deque> nano::network::list (std::size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a) +std::deque> nano::network::list (std::size_t max_count, uint8_t minimum_version) const { - std::deque> result; - tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a); - nano::random_pool_shuffle (result.begin (), result.end ()); - if (count_a > 0 && result.size () > count_a) + auto result = tcp_channels.list (minimum_version); + nano::random_pool_shuffle (result.begin (), result.end ()); // Randomize returned peer order + if (max_count > 0 && result.size () > max_count) { - result.resize (count_a, nullptr); + result.resize (max_count, nullptr); } return result; } -std::deque> nano::network::list_non_pr (std::size_t count_a) +std::deque> nano::network::list_non_pr (std::size_t max_count, uint8_t minimum_version) const { - std::deque> result; - tcp_channels.list (result); + auto result = tcp_channels.list (minimum_version); auto partition_point = std::partition (result.begin (), result.end (), [this] (std::shared_ptr const & channel) { return !node.rep_crawler.is_pr (channel); }); result.resize (std::distance (result.begin (), partition_point)); - nano::random_pool_shuffle (result.begin (), result.end ()); - if (result.size () > count_a) + + nano::random_pool_shuffle (result.begin (), result.end ()); // Randomize returned peer order + + if (result.size () > max_count) { - result.resize (count_a, nullptr); + result.resize (max_count, nullptr); } return result; } @@ -405,14 +433,14 @@ std::size_t nano::network::fanout (float scale) const return static_cast (std::ceil (scale * size_sqrt ())); } -std::unordered_set> nano::network::random_set (std::size_t count_a, uint8_t min_version_a, bool include_temporary_channels_a) const +std::unordered_set> nano::network::random_set (std::size_t max_count, uint8_t minimum_version) const { - return tcp_channels.random_set (count_a, min_version_a, include_temporary_channels_a); + return tcp_channels.random_set (max_count, minimum_version); } void nano::network::random_fill (std::array & target_a) const { - auto peers (random_set (target_a.size (), 0, false)); // Don't include channels with ephemeral remote ports + auto peers (random_set (target_a.size (), 0)); debug_assert (peers.size () <= target_a.size ()); auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0)); debug_assert (endpoint.address ().is_v6 ()); diff --git a/nano/node/network.hpp b/nano/node/network.hpp index d1130271d1..621e7fdd93 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -90,37 +90,48 @@ class network final void start (); void stop (); - void flood_message (nano::message &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter, float const = 1.0f); - void flood_keepalive (float const scale_a = 1.0f); - void flood_keepalive_self (float const scale_a = 0.5f); - void flood_vote (std::shared_ptr const &, float scale, bool rebroadcasted = false); - void flood_vote_pr (std::shared_ptr const &, bool rebroadcasted = false); - void flood_vote_non_pr (std::shared_ptr const &, float scale, bool rebroadcasted = false); + nano::endpoint endpoint () const; + + void flood_message (nano::message const &, nano::transport::traffic_type, float scale = 1.0f) const; + void flood_keepalive (float scale = 1.0f) const; + void flood_keepalive_self (float scale = 0.5f) const; + void flood_vote (std::shared_ptr const &, float scale, bool rebroadcasted = false) const; + void flood_vote_pr (std::shared_ptr const &, bool rebroadcasted = false) const; + void flood_vote_non_pr (std::shared_ptr const &, float scale, bool rebroadcasted = false) const; // Flood block to all PRs and a random selection of non-PRs - void flood_block_initial (std::shared_ptr const &); + void flood_block_initial (std::shared_ptr const &) const; // Flood block to a random selection of peers - void flood_block (std::shared_ptr const &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter); - void flood_block_many (std::deque>, std::function = nullptr, unsigned = broadcast_interval_ms); - void merge_peers (std::array const &); - void merge_peer (nano::endpoint const &); - void send_keepalive (std::shared_ptr const &); - void send_keepalive_self (std::shared_ptr const &); + void flood_block (std::shared_ptr const &, nano::transport::traffic_type) const; + void flood_block_many (std::deque>, nano::transport::traffic_type, std::chrono::milliseconds delay = 10ms, std::function callback = nullptr) const; + + void send_keepalive (std::shared_ptr const &) const; + void send_keepalive_self (std::shared_ptr const &) const; + + void merge_peers (std::array const & ips); + bool merge_peer (nano::endpoint const & ip); + std::shared_ptr find_node_id (nano::account const &); std::shared_ptr find_channel (nano::endpoint const &); - bool not_a_peer (nano::endpoint const &, bool allow_local_peers); + + // Check if the endpoint address looks OK + bool not_a_peer (nano::endpoint const &, bool allow_local_peers) const; // Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt bool track_reachout (nano::endpoint const &); - std::deque> list (std::size_t max_count = 0, uint8_t = 0, bool = true); - std::deque> list_non_pr (std::size_t); + + std::deque> list (std::size_t max_count = 0, uint8_t minimum_version = 0) const; + std::deque> list_non_pr (std::size_t max_count, uint8_t minimum_version = 0) const; + // Desired fanout for a given scale std::size_t fanout (float scale = 1.0f) const; + void random_fill (std::array &) const; void fill_keepalive_self (std::array &) const; + // Note: The minimum protocol version is used after the random selection, so number of peers can be less than expected. - std::unordered_set> random_set (std::size_t count, uint8_t min_version = 0, bool include_temporary_channels = false) const; + std::unordered_set> random_set (std::size_t max_count, uint8_t minimum_version = 0) const; + // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (); - nano::endpoint endpoint () const; void cleanup (std::chrono::steady_clock::time_point const & cutoff); std::size_t size () const; float size_sqrt () const; @@ -158,8 +169,6 @@ class network final public: // Callbacks std::function disconnect_observer{ [] () {} }; - // Called when a new channel is observed - std::function)> channel_observer{ [] (auto) {} }; private: std::atomic stopped{ false }; @@ -171,7 +180,6 @@ class network final std::thread reachout_cached_thread; public: - static unsigned const broadcast_interval_ms = 10; static std::size_t const buffer_size = 512; static std::size_t confirm_req_hashes_max; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 5e87808671..a540fd547d 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -229,10 +229,6 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy wallets.observer = [this] (bool active) { observers.wallet.notify (active); }; - network.channel_observer = [this] (std::shared_ptr const & channel_a) { - debug_assert (channel_a != nullptr); - observers.endpoint.notify (channel_a); - }; network.disconnect_observer = [this] () { observers.disconnect.notify (); }; @@ -297,8 +293,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy }); } - observers.endpoint.add ([this] (std::shared_ptr const & channel_a) { - this->network.send_keepalive_self (channel_a); + observers.channel_connected.add ([this] (std::shared_ptr const & channel) { + network.send_keepalive_self (channel); }); observers.vote.add ([this] (std::shared_ptr vote, std::shared_ptr const & channel, nano::vote_source source, nano::vote_code code) { @@ -709,7 +705,7 @@ void nano::node::stop () epoch_upgrader.stop (); local_block_broadcaster.stop (); message_processor.stop (); - network.stop (); // Stop network last to avoid killing in-use sockets + network.stop (); monitor.stop (); bootstrap_workers.stop (); @@ -720,6 +716,7 @@ void nano::node::stop () // work pool is not stopped on purpose due to testing setup // Stop the IO runner last + runner.abort (); runner.join (); debug_assert (io_ctx_shared.use_count () == 1); // Node should be the last user of the io_context } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 56c6727802..2225728a49 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -138,6 +138,7 @@ class node final : public std::enable_shared_from_this boost::latch node_initialized_latch; nano::network_params & network_params; nano::stats stats; + nano::node_observers observers; std::unique_ptr workers_impl; nano::thread_pool & workers; std::unique_ptr bootstrap_workers_impl; @@ -165,7 +166,6 @@ class node final : public std::enable_shared_from_this std::unique_ptr tcp_listener_impl; nano::transport::tcp_listener & tcp_listener; std::filesystem::path application_path; - nano::node_observers observers; std::unique_ptr port_mapping_impl; nano::port_mapping & port_mapping; std::unique_ptr block_processor_impl; diff --git a/nano/node/node_observers.cpp b/nano/node/node_observers.cpp index a0bbd03cd7..ed437dd7d5 100644 --- a/nano/node/node_observers.cpp +++ b/nano/node/node_observers.cpp @@ -9,10 +9,10 @@ nano::container_info nano::node_observers::container_info () const info.put ("active_started", active_started.size ()); info.put ("active_stopped", active_stopped.size ()); info.put ("account_balance", account_balance.size ()); - info.put ("endpoint", endpoint.size ()); info.put ("disconnect", disconnect.size ()); info.put ("work_cancel", work_cancel.size ()); info.put ("telemetry", telemetry.size ()); info.put ("socket_connected", socket_connected.size ()); + info.put ("channel_connected", channel_connected.size ()); return info; } diff --git a/nano/node/node_observers.hpp b/nano/node/node_observers.hpp index 1c6d36ed40..af9df9a074 100644 --- a/nano/node/node_observers.hpp +++ b/nano/node/node_observers.hpp @@ -2,21 +2,10 @@ #include #include +#include #include #include -namespace nano -{ -enum class vote_source; -class election_status; -class telemetry; -enum class vote_code; -} -namespace nano::transport -{ -class channel; -} - namespace nano { class node_observers final @@ -29,11 +18,11 @@ class node_observers final nano::observer_set active_started; nano::observer_set active_stopped; nano::observer_set account_balance; - nano::observer_set> endpoint; nano::observer_set<> disconnect; nano::observer_set work_cancel; nano::observer_set const &> telemetry; - nano::observer_set socket_connected; + nano::observer_set> socket_connected; + nano::observer_set> channel_connected; nano::container_info container_info () const; }; diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index a0a67b0e6a..786bd4c5e7 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -14,7 +14,7 @@ nano::rep_crawler::rep_crawler (nano::rep_crawler_config const & config_a, nano: network_constants{ node_a.network_params.network }, active{ node_a.active } { - node.observers.endpoint.add ([this] (std::shared_ptr const & channel) { + node.observers.channel_connected.add ([this] (std::shared_ptr const & channel) { if (!node.flags.disable_rep_crawler) { { @@ -260,7 +260,7 @@ std::deque> nano::rep_crawler::prepare // Crawl more aggressively if we lack sufficient total peer weight. auto const required_peer_count = sufficient_weight ? conservative_count : aggressive_count; - auto random_peers = node.network.random_set (required_peer_count, 0, /* include channels with ephemeral remote ports */ true); + auto random_peers = node.network.random_set (required_peer_count); auto should_query = [&, this] (std::shared_ptr const & channel) { if (auto rep = reps.get ().find (channel); rep != reps.get ().end ()) @@ -339,8 +339,6 @@ void nano::rep_crawler::query (std::dequesend ( - req, - [this] (auto & ec, auto size) { - if (ec) - { - stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::write_error, nano::stat::dir::out); - } - }, - nano::transport::buffer_drop_policy::no_socket_drop); + channel->send (req, nano::transport::traffic_type::rep_crawler, [this] (auto & ec, auto size) { + stats.inc (nano::stat::type::rep_crawler_send, to_stat_detail (ec), nano::stat::dir::out); + }); } else { diff --git a/nano/node/request_aggregator.cpp b/nano/node/request_aggregator.cpp index 3311978d22..8a0c9224f4 100644 --- a/nano/node/request_aggregator.cpp +++ b/nano/node/request_aggregator.cpp @@ -26,13 +26,6 @@ nano::request_aggregator::request_aggregator (request_aggregator_config const & generator (generator_a), final_generator (final_generator_a) { - generator.set_reply_action ([this] (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) { - this->reply_action (vote_a, channel_a); - }); - final_generator.set_reply_action ([this] (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) { - this->reply_action (vote_a, channel_a); - }); - queue.max_size_query = [this] (auto const & origin) { return config.max_queue; }; @@ -159,7 +152,7 @@ void nano::request_aggregator::run_batch (nano::unique_lock & lock) transaction.refresh_if_needed (); - if (!channel->max ()) + if (!channel->max (nano::transport::traffic_type::vote_reply)) { process (transaction, request, channel); } @@ -192,12 +185,6 @@ void nano::request_aggregator::process (nano::secure::transaction const & transa } } -void nano::request_aggregator::reply_action (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) const -{ - nano::confirm_ack confirm{ network_constants, vote_a }; - channel_a->send (confirm); -} - void nano::request_aggregator::erase_duplicates (std::vector> & requests_a) const { std::sort (requests_a.begin (), requests_a.end (), [] (auto const & pair1, auto const & pair2) { diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index 7a349ff6b4..cadc3a7d80 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -214,7 +214,7 @@ void nano::telemetry::request (std::shared_ptr const & stats.inc (nano::stat::type::telemetry, nano::stat::detail::request); nano::telemetry_req message{ network_params.network }; - channel->send (message); + channel->send (message, nano::transport::traffic_type::telemetry); } void nano::telemetry::run_broadcasts () @@ -233,7 +233,7 @@ void nano::telemetry::broadcast (std::shared_ptr const stats.inc (nano::stat::type::telemetry, nano::stat::detail::broadcast); nano::telemetry_ack message{ network_params.network, telemetry }; - channel->send (message); + channel->send (message, nano::transport::traffic_type::telemetry); } void nano::telemetry::cleanup () diff --git a/nano/node/transport/channel.cpp b/nano/node/transport/channel.cpp index 7a0a9ce40c..ea470f2ec1 100644 --- a/nano/node/transport/channel.cpp +++ b/nano/node/transport/channel.cpp @@ -14,33 +14,12 @@ nano::transport::channel::channel (nano::node & node_a) : set_network_version (node_a.network_params.network.protocol_version); } -void nano::transport::channel::send (nano::message & message_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) +bool nano::transport::channel::send (nano::message const & message, nano::transport::traffic_type traffic_type, callback_t callback) { - auto buffer = message_a.to_shared_const_buffer (); - - bool is_droppable_by_limiter = (drop_policy_a == nano::transport::buffer_drop_policy::limiter); - bool should_pass = node.outbound_limiter.should_pass (buffer.size (), traffic_type); - bool pass = !is_droppable_by_limiter || should_pass; - - node.stats.inc (pass ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message_a.type ()), nano::stat::dir::out, /* aggregate all */ true); - node.logger.trace (nano::log::type::channel_sent, to_log_detail (message_a.type ()), - nano::log::arg{ "message", message_a }, - nano::log::arg{ "channel", *this }, - nano::log::arg{ "dropped", !pass }); - - if (pass) - { - send_buffer (buffer, callback_a, drop_policy_a, traffic_type); - } - else - { - if (callback_a) - { - node.io_ctx.post ([callback_a] () { - callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); - }); - } - } + auto buffer = message.to_shared_const_buffer (); + bool sent = send_buffer (buffer, traffic_type, std::move (callback)); + node.stats.inc (sent ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message.type ()), nano::stat::dir::out, /* aggregate all */ true); + return sent; } void nano::transport::channel::set_peering_endpoint (nano::endpoint endpoint) diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index 8f860f58c3..5911f1249c 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -22,21 +22,15 @@ enum class transport_type : uint8_t class channel { +public: + using callback_t = std::function; + public: explicit channel (nano::node &); virtual ~channel () = default; - void send (nano::message & message_a, - std::function const & callback_a = nullptr, - nano::transport::buffer_drop_policy policy_a = nano::transport::buffer_drop_policy::limiter, - nano::transport::traffic_type = nano::transport::traffic_type::generic); - - // TODO: investigate clang-tidy warning about default parameters on virtual/override functions - virtual void send_buffer (nano::shared_const_buffer const &, - std::function const & = nullptr, - nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, - nano::transport::traffic_type = nano::transport::traffic_type::generic) - = 0; + /// @returns true if the message was sent (or queued to be sent), false if it was immediately dropped + bool send (nano::message const &, nano::transport::traffic_type, callback_t = nullptr); virtual void close () = 0; @@ -46,7 +40,7 @@ class channel virtual std::string to_string () const = 0; virtual nano::transport::transport_type get_type () const = 0; - virtual bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic) + virtual bool max (nano::transport::traffic_type) { return false; } @@ -125,6 +119,9 @@ class channel std::shared_ptr owner () const; +protected: + virtual bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, callback_t) = 0; + protected: nano::node & node; mutable nano::mutex mutex; diff --git a/nano/node/transport/fake.cpp b/nano/node/transport/fake.cpp index e99bc69f2f..81d3fec7aa 100644 --- a/nano/node/transport/fake.cpp +++ b/nano/node/transport/fake.cpp @@ -14,16 +14,16 @@ nano::transport::fake::channel::channel (nano::node & node) : /** * The send function behaves like a null device, it throws the data away and returns success. */ -void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) +bool nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback) { - // auto bytes = buffer_a.to_bytes (); - auto size = buffer_a.size (); - if (callback_a) + auto size = buffer.size (); + if (callback) { - node.io_ctx.post ([callback_a, size] () { - callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size); + node.io_ctx.post ([callback, size] () { + callback (boost::system::errc::make_error_code (boost::system::errc::success), size); }); } + return true; } std::string nano::transport::fake::channel::to_string () const diff --git a/nano/node/transport/fake.hpp b/nano/node/transport/fake.hpp index d9ce585cbb..f503b2bd58 100644 --- a/nano/node/transport/fake.hpp +++ b/nano/node/transport/fake.hpp @@ -19,12 +19,6 @@ namespace transport std::string to_string () const override; - void send_buffer ( - nano::shared_const_buffer const &, - std::function const & = nullptr, - nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, - nano::transport::traffic_type = nano::transport::traffic_type::generic) override; - void set_endpoint (nano::endpoint const & endpoint_a) { endpoint = endpoint_a; @@ -55,6 +49,9 @@ namespace transport return !closed; } + protected: + bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; + private: nano::endpoint endpoint; diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp index 1c9f4c462b..78a4be9904 100644 --- a/nano/node/transport/inproc.cpp +++ b/nano/node/transport/inproc.cpp @@ -18,10 +18,10 @@ nano::transport::inproc::channel::channel (nano::node & node, nano::node & desti * Send the buffer to the peer and call the callback function when done. The call never fails. * Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background. */ -void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) +bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback) { std::size_t offset{ 0 }; - auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a) { + auto const buffer_read_fn = [&offset, buffer_v = buffer.to_bytes ()] (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a) { debug_assert (buffer_v.size () >= (offset + size_a)); data_a->resize (size_a); auto const copy_start = buffer_v.begin () + offset; @@ -48,12 +48,14 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co } }); - if (callback_a) + if (callback) { - node.io_ctx.post ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () { + node.io_ctx.post ([callback_l = std::move (callback), buffer_size = buffer.size ()] () { callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size); }); } + + return true; } std::string nano::transport::inproc::channel::to_string () const diff --git a/nano/node/transport/inproc.hpp b/nano/node/transport/inproc.hpp index d93fbed2d5..dfe932e777 100644 --- a/nano/node/transport/inproc.hpp +++ b/nano/node/transport/inproc.hpp @@ -17,9 +17,6 @@ namespace transport public: explicit channel (nano::node & node, nano::node & destination); - // TODO: investigate clang-tidy warning about default parameters on virtual/override functions - void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; - std::string to_string () const override; nano::endpoint get_remote_endpoint () const override @@ -42,6 +39,9 @@ namespace transport // Can't be closed } + protected: + bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; + private: nano::node & destination; nano::endpoint const endpoint; diff --git a/nano/node/transport/tcp_channel.cpp b/nano/node/transport/tcp_channel.cpp index e596ec9951..3cf1297018 100644 --- a/nano/node/transport/tcp_channel.cpp +++ b/nano/node/transport/tcp_channel.cpp @@ -1,89 +1,195 @@ +#include #include #include #include #include #include +#include /* * tcp_channel */ -nano::transport::tcp_channel::tcp_channel (nano::node & node_a, std::weak_ptr socket_a) : +nano::transport::tcp_channel::tcp_channel (nano::node & node_a, std::shared_ptr socket_a) : channel (node_a), - socket (std::move (socket_a)) + socket{ socket_a }, + strand{ node_a.io_ctx.get_executor () }, + sending_task{ strand } { + stacktrace = nano::generate_stacktrace (); + remote_endpoint = socket_a->remote_endpoint (); + local_endpoint = socket_a->local_endpoint (); + start (); } nano::transport::tcp_channel::~tcp_channel () { - if (auto socket_l = socket.lock ()) + close (); + release_assert (!sending_task.joinable ()); +} + +void nano::transport::tcp_channel::close () +{ + stop (); + socket->close (); + closed = true; +} + +void nano::transport::tcp_channel::start () +{ + sending_task = nano::async::task (strand, [this] (nano::async::condition & condition) { + return start_sending (condition); // This is not a coroutine, but a corotuine factory + }); +} + +asio::awaitable nano::transport::tcp_channel::start_sending (nano::async::condition & condition) +{ + debug_assert (strand.running_in_this_thread ()); + try + { + co_await run_sending (condition); + } + catch (boost::system::system_error const & ex) { - socket_l->close (); + // Operation aborted is expected when cancelling the acceptor + debug_assert (ex.code () == asio::error::operation_aborted); } + debug_assert (strand.running_in_this_thread ()); } -void nano::transport::tcp_channel::update_endpoints () +void nano::transport::tcp_channel::stop () { - nano::lock_guard lock{ mutex }; + if (sending_task.joinable ()) + { + // Node context must be running to gracefully stop async tasks + debug_assert (!node.io_ctx.stopped ()); + // Ensure that we are not trying to await the task while running on the same thread / io_context + debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ()); + sending_task.cancel (); + sending_task.join (); + } +} - debug_assert (remote_endpoint == nano::endpoint{}); // Not initialized endpoint value - debug_assert (local_endpoint == nano::endpoint{}); // Not initialized endpoint value +bool nano::transport::tcp_channel::max (nano::transport::traffic_type traffic_type) +{ + nano::lock_guard guard{ mutex }; + return queue.max (traffic_type); +} - if (auto socket_l = socket.lock ()) +bool nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type type, nano::transport::channel::callback_t callback) +{ + nano::unique_lock lock{ mutex }; + if (!queue.full (type)) + { + queue.push (type, { buffer, callback }); + lock.unlock (); + node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::queued, nano::stat::dir::out); + node.stats.inc (nano::stat::type::tcp_channel_queued, to_stat_detail (type), nano::stat::dir::out); + sending_task.notify (); + return true; + } + else { - remote_endpoint = socket_l->remote_endpoint (); - local_endpoint = socket_l->local_endpoint (); + node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::drop, nano::stat::dir::out); + node.stats.inc (nano::stat::type::tcp_channel_drop, to_stat_detail (type), nano::stat::dir::out); } + return false; } -void nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy policy_a, nano::transport::traffic_type traffic_type) +asio::awaitable nano::transport::tcp_channel::run_sending (nano::async::condition & condition) { - if (auto socket_l = socket.lock ()) + while (!co_await nano::async::cancelled ()) { - if (!socket_l->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full (traffic_type))) + debug_assert (strand.running_in_this_thread ()); + + auto next_batch = [this] () { + const size_t max_batch = 8; // TODO: Make this configurable + nano::lock_guard lock{ mutex }; + return queue.next_batch (max_batch); + }; + + if (auto batch = next_batch (); !batch.empty ()) { - socket_l->async_write ( - buffer_a, [this_s = shared_from_this (), endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) { - if (auto node_l = node.lock ()) - { - if (!ec) - { - this_s->set_last_packet_sent (std::chrono::steady_clock::now ()); - } - if (ec == boost::system::errc::host_unreachable) - { - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out); - } - if (callback_a) - { - callback_a (ec, size_a); - } - } - }, - traffic_type); + for (auto const & [type, item] : batch) + { + co_await send_one (type, item); + } } else { - if (policy_a == nano::transport::buffer_drop_policy::no_socket_drop) - { - node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); - } - else - { - node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); - } - if (callback_a) - { - callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); - } + co_await condition.wait (); } } - else if (callback_a) +} + +asio::awaitable nano::transport::tcp_channel::send_one (traffic_type type, tcp_channel_queue::entry_t const & item) +{ + debug_assert (strand.running_in_this_thread ()); + + auto const & [buffer, callback] = item; + auto const size = buffer.size (); + + // Wait for socket + while (socket->full ()) + { + node.stats.inc (nano::stat::type::tcp_channel_wait, nano::stat::detail::wait_socket, nano::stat::dir::out); + co_await nano::async::sleep_for (100ms); // TODO: Exponential backoff + } + + // Wait for bandwidth + // This is somewhat inefficient + // The performance impact *should* be mitigated by the fact that we allocate it in larger chunks, so this happens relatively infrequently + const size_t bandwidth_chunk = 128 * 1024; // TODO: Make this configurable + while (allocated_bandwidth < size) { - node.io_ctx.post ([callback_a] () { - callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); - }); + // TODO: Consider implementing a subsribe/notification mechanism for bandwidth allocation + if (node.outbound_limiter.should_pass (bandwidth_chunk, type)) // Allocate bandwidth in larger chunks + { + allocated_bandwidth += bandwidth_chunk; + } + else + { + node.stats.inc (nano::stat::type::tcp_channel_wait, nano::stat::detail::wait_bandwidth, nano::stat::dir::out); + co_await nano::async::sleep_for (100ms); // TODO: Exponential backoff + } } + allocated_bandwidth -= size; + + node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::send, nano::stat::dir::out); + node.stats.inc (nano::stat::type::tcp_channel_send, to_stat_detail (type), nano::stat::dir::out); + + socket->async_write (buffer, [this_w = weak_from_this (), callback, type] (boost::system::error_code const & ec, std::size_t size) { + if (auto this_l = this_w.lock ()) + { + this_l->node.stats.inc (nano::stat::type::tcp_channel_error, nano::to_stat_detail (ec), nano::stat::dir::out); + if (!ec) + { + this_l->node.stats.add (nano::stat::type::traffic_tcp_type, to_stat_detail (type), nano::stat::dir::out, size); + this_l->set_last_packet_sent (std::chrono::steady_clock::now ()); + } + } + if (callback) + { + callback (ec, size); + } + }); +} + +bool nano::transport::tcp_channel::alive () const +{ + return socket->alive (); +} + +nano::endpoint nano::transport::tcp_channel::get_remote_endpoint () const +{ + nano::lock_guard lock{ mutex }; + return remote_endpoint; +} + +nano::endpoint nano::transport::tcp_channel::get_local_endpoint () const +{ + nano::lock_guard lock{ mutex }; + return local_endpoint; } std::string nano::transport::tcp_channel::to_string () const @@ -94,6 +200,132 @@ std::string nano::transport::tcp_channel::to_string () const void nano::transport::tcp_channel::operator() (nano::object_stream & obs) const { nano::transport::channel::operator() (obs); // Write common data - obs.write ("socket", socket); } + +/* + * tcp_channel_queue + */ + +nano::transport::tcp_channel_queue::tcp_channel_queue () +{ + for (auto type : all_traffic_types ()) + { + queues.at (type) = { type, {} }; + } +} + +bool nano::transport::tcp_channel_queue::empty () const +{ + return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) { + return queue.second.empty (); + }); +} + +size_t nano::transport::tcp_channel_queue::size () const +{ + return std::accumulate (queues.begin (), queues.end (), size_t{ 0 }, [] (size_t acc, auto const & queue) { + return acc + queue.second.size (); + }); +} + +size_t nano::transport::tcp_channel_queue::size (traffic_type type) const +{ + return queues.at (type).second.size (); +} + +bool nano::transport::tcp_channel_queue::max (traffic_type type) const +{ + return size (type) >= max_size; +} + +bool nano::transport::tcp_channel_queue::full (traffic_type type) const +{ + return size (type) >= full_size; +} + +void nano::transport::tcp_channel_queue::push (traffic_type type, entry_t entry) +{ + debug_assert (!full (type)); // Should be checked before calling this function + queues.at (type).second.push_back (entry); +} + +auto nano::transport::tcp_channel_queue::next () -> value_t +{ + debug_assert (!empty ()); // Should be checked before calling next + + auto should_seek = [&, this] () { + if (current == queues.end ()) + { + return true; + } + auto & queue = current->second; + if (queue.empty ()) + { + return true; + } + // Allow up to `priority` requests to be processed before moving to the next queue + if (counter >= priority (current->first)) + { + return true; + } + return false; + }; + + if (should_seek ()) + { + seek_next (); + } + + release_assert (current != queues.end ()); + + auto & source = current->first; + auto & queue = current->second; + + ++counter; + + release_assert (!queue.empty ()); + auto entry = queue.front (); + queue.pop_front (); + return { source, entry }; +} + +auto nano::transport::tcp_channel_queue::next_batch (size_t max_count) -> batch_t +{ + // TODO: Naive implementation, could be optimized + std::deque result; + while (!empty () && result.size () < max_count) + { + result.emplace_back (next ()); + } + return result; +} + +size_t nano::transport::tcp_channel_queue::priority (traffic_type type) const +{ + switch (type) + { + case traffic_type::block_broadcast: + case traffic_type::vote_rebroadcast: + return 1; + default: + return 4; + } +} + +void nano::transport::tcp_channel_queue::seek_next () +{ + counter = 0; + do + { + if (current != queues.end ()) + { + ++current; + } + if (current == queues.end ()) + { + current = queues.begin (); + } + release_assert (current != queues.end ()); + } while (current->second.empty ()); +} diff --git a/nano/node/transport/tcp_channel.hpp b/nano/node/transport/tcp_channel.hpp index 3936e3e9ac..f76634b672 100644 --- a/nano/node/transport/tcp_channel.hpp +++ b/nano/node/transport/tcp_channel.hpp @@ -1,81 +1,99 @@ #pragma once +#include +#include #include +#include #include namespace nano::transport { -class tcp_server; -class tcp_channels; -class tcp_channel; +class tcp_channel_queue final +{ +public: + explicit tcp_channel_queue (); + + using callback_t = std::function; + using entry_t = std::pair; + using value_t = std::pair; + using batch_t = std::deque; + + bool empty () const; + size_t size () const; + size_t size (traffic_type) const; + void push (traffic_type, entry_t); + value_t next (); + batch_t next_batch (size_t max_count); + + bool max (traffic_type) const; + bool full (traffic_type) const; + + constexpr static size_t max_size = 8; + constexpr static size_t full_size = 4 * max_size; + +private: + void seek_next (); + size_t priority (traffic_type) const; -class tcp_channel : public nano::transport::channel, public std::enable_shared_from_this + using queue_t = std::pair>; + nano::enum_array queues{}; + nano::enum_array::iterator current{ queues.end () }; + size_t counter{ 0 }; +}; + +class tcp_channel final : public nano::transport::channel, public std::enable_shared_from_this { friend class nano::transport::tcp_channels; public: - tcp_channel (nano::node &, std::weak_ptr); + tcp_channel (nano::node &, std::shared_ptr); ~tcp_channel () override; - void update_endpoints (); + void close () override; - // TODO: investigate clang-tidy warning about default parameters on virtual/override functions// - void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; + bool max (nano::transport::traffic_type traffic_type) override; + bool alive () const override; - std::string to_string () const override; - - nano::endpoint get_remote_endpoint () const override - { - nano::lock_guard lock{ mutex }; - return remote_endpoint; - } - - nano::endpoint get_local_endpoint () const override - { - nano::lock_guard lock{ mutex }; - return local_endpoint; - } + nano::endpoint get_remote_endpoint () const override; + nano::endpoint get_local_endpoint () const override; nano::transport::transport_type get_type () const override { return nano::transport::transport_type::tcp; } - bool max (nano::transport::traffic_type traffic_type) override - { - bool result = true; - if (auto socket_l = socket.lock ()) - { - result = socket_l->max (traffic_type); - } - return result; - } + std::string to_string () const override; - bool alive () const override - { - if (auto socket_l = socket.lock ()) - { - return socket_l->alive (); - } - return false; - } +protected: + bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; - void close () override - { - if (auto socket_l = socket.lock ()) - { - socket_l->close (); - } - } +private: + void start (); + void stop (); + + asio::awaitable start_sending (nano::async::condition &); + asio::awaitable run_sending (nano::async::condition &); + asio::awaitable send_one (traffic_type, tcp_channel_queue::entry_t const &); public: - std::weak_ptr socket; + std::shared_ptr socket; private: nano::endpoint remote_endpoint; nano::endpoint local_endpoint; + nano::async::strand strand; + nano::async::task sending_task; + + mutable nano::mutex mutex; + tcp_channel_queue queue; + std::atomic allocated_bandwidth{ 0 }; + + // Debugging + std::atomic closed{ false }; + std::string stacktrace; + public: // Logging void operator() (nano::object_stream &) const override; }; -} \ No newline at end of file +} diff --git a/nano/node/transport/tcp_channels.cpp b/nano/node/transport/tcp_channels.cpp index 4b983cb7bb..62cc25b65b 100644 --- a/nano/node/transport/tcp_channels.cpp +++ b/nano/node/transport/tcp_channels.cpp @@ -34,17 +34,11 @@ void nano::transport::tcp_channels::close () { nano::lock_guard lock{ mutex }; - for (auto const & channel : channels) + for (auto const & entry : channels) { - if (channel.socket) - { - channel.socket->close (); - } - // Remove response server - if (channel.response_server) - { - channel.response_server->stop (); - } + entry.socket->close (); + entry.server->stop (); + entry.channel->close (); } channels.clear (); @@ -62,7 +56,7 @@ bool nano::transport::tcp_channels::check (const nano::tcp_endpoint & endpoint, if (node.network.not_a_peer (nano::transport::map_tcp_to_endpoint (endpoint), node.config.allow_local_peers)) { node.stats.inc (nano::stat::type::tcp_channels_rejected, nano::stat::detail::not_a_peer); - node.logger.debug (nano::log::type::tcp_channels, "Rejected invalid endpoint channel from: {}", fmt::streamed (endpoint)); + node.logger.debug (nano::log::type::tcp_channels, "Rejected invalid endpoint channel: {}", fmt::streamed (endpoint)); return false; // Reject } @@ -82,7 +76,7 @@ bool nano::transport::tcp_channels::check (const nano::tcp_endpoint & endpoint, if (has_duplicate) { node.stats.inc (nano::stat::type::tcp_channels_rejected, nano::stat::detail::channel_duplicate); - node.logger.debug (nano::log::type::tcp_channels, "Duplicate channel rejected from: {} ({})", fmt::streamed (endpoint), node_id.to_node_id ()); + node.logger.debug (nano::log::type::tcp_channels, "Rejected duplicate channel: {} ({})", fmt::streamed (endpoint), node_id.to_node_id ()); return false; // Reject } @@ -106,19 +100,19 @@ std::shared_ptr nano::transport::tcp_channels::cre if (!check (endpoint, node_id)) { node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::channel_rejected); - node.logger.debug (nano::log::type::tcp_channels, "Rejected new channel from: {} ({})", fmt::streamed (endpoint), node_id.to_node_id ()); + node.logger.debug (nano::log::type::tcp_channels, "Rejected channel: {} ({})", fmt::streamed (endpoint), node_id.to_node_id ()); // Rejection reason should be logged earlier return nullptr; } node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::channel_accepted); - node.logger.debug (nano::log::type::tcp_channels, "Accepted new channel from: {} ({})", + node.logger.debug (nano::log::type::tcp_channels, "Accepted channel: {} ({}) ({})", fmt::streamed (socket->remote_endpoint ()), + to_string (socket->endpoint_type ()), node_id.to_node_id ()); auto channel = std::make_shared (node, socket); - channel->update_endpoints (); channel->set_node_id (node_id); attempts.get ().erase (endpoint); @@ -128,7 +122,7 @@ std::shared_ptr nano::transport::tcp_channels::cre lock.unlock (); - node.network.channel_observer (channel); + node.observers.channel_connected.notify (channel); return channel; } @@ -157,7 +151,7 @@ std::shared_ptr nano::transport::tcp_channels::fin return result; } -std::unordered_set> nano::transport::tcp_channels::random_set (std::size_t count_a, uint8_t min_version, bool include_temporary_channels_a) const +std::unordered_set> nano::transport::tcp_channels::random_set (std::size_t count_a, uint8_t min_version) const { std::unordered_set> result; result.reserve (count_a); @@ -350,6 +344,7 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point if (!entry.channel->alive ()) { node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ()); + entry.channel->close (); return true; // Erase } return false; @@ -383,7 +378,7 @@ void nano::transport::tcp_channels::keepalive () for (auto & channel : to_wakeup) { - channel->send (message); + channel->send (message, nano::transport::traffic_type::keepalive); } } @@ -395,7 +390,7 @@ std::optional nano::transport::tcp_channels::sample_keepalive ( while (counter++ < channels.size ()) { auto index = rng.random (channels.size ()); - if (auto server = channels.get ()[index].response_server) + if (auto server = channels.get ()[index].server) { if (auto keepalive = server->pop_last_keepalive ()) { @@ -407,19 +402,24 @@ std::optional nano::transport::tcp_channels::sample_keepalive ( return std::nullopt; } -void nano::transport::tcp_channels::list (std::deque> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a) +std::deque> nano::transport::tcp_channels::list (uint8_t minimum_version) const { nano::lock_guard lock{ mutex }; - // clang-format off - nano::transform_if (channels.get ().begin (), channels.get ().end (), std::back_inserter (deque_a), - [include_temporary_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a; }, - [](auto const & channel) { return channel.channel; }); - // clang-format on + + std::deque> result; + for (auto const & entry : channels) + { + if (entry.channel->get_network_version () >= minimum_version) + { + result.push_back (entry.channel); + } + } + return result; } -void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint) +bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint) { - node.tcp_listener.connect (endpoint.address (), endpoint.port ()); + return node.tcp_listener.connect (endpoint.address (), endpoint.port ()); } nano::container_info nano::transport::tcp_channels::container_info () const diff --git a/nano/node/transport/tcp_channels.hpp b/nano/node/transport/tcp_channels.hpp index 8e1aeeccc0..939d2040fa 100644 --- a/nano/node/transport/tcp_channels.hpp +++ b/nano/node/transport/tcp_channels.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -40,22 +41,22 @@ class tcp_channels final std::size_t size () const; std::shared_ptr find_channel (nano::tcp_endpoint const &) const; void random_fill (std::array &) const; - std::unordered_set> random_set (std::size_t, uint8_t = 0, bool = false) const; std::shared_ptr find_node_id (nano::account const &); // Get the next peer for attempting a tcp connection nano::tcp_endpoint bootstrap_peer (); - bool max_ip_connections (nano::tcp_endpoint const & endpoint_a); - bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); - bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); + bool max_ip_connections (nano::tcp_endpoint const & endpoint); + bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint); + bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint); // Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt bool track_reachout (nano::endpoint const &); void purge (std::chrono::steady_clock::time_point cutoff_deadline); - void list (std::deque> &, uint8_t = 0, bool = true); + std::deque> list (uint8_t minimum_version = 0) const; + std::unordered_set> random_set (std::size_t max_count, uint8_t minimum_version = 0) const; void keepalive (); std::optional sample_keepalive (); // Connection start - void start_tcp (nano::endpoint const &); + bool start_tcp (nano::endpoint const &); nano::container_info container_info () const; @@ -70,14 +71,19 @@ class tcp_channels final class channel_entry final { public: - std::shared_ptr channel; - std::shared_ptr socket; - std::shared_ptr response_server; + std::shared_ptr channel; + std::shared_ptr socket; + std::shared_ptr server; public: - channel_entry (std::shared_ptr channel_a, std::shared_ptr socket_a, std::shared_ptr server_a) : - channel (std::move (channel_a)), socket (std::move (socket_a)), response_server (std::move (server_a)) + channel_entry (std::shared_ptr channel_a, std::shared_ptr socket_a, std::shared_ptr server_a) : + channel (std::move (channel_a)), + socket (std::move (socket_a)), + server (std::move (server_a)) { + release_assert (socket); + release_assert (server); + release_assert (channel); } nano::tcp_endpoint endpoint () const { diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index ad3fc63329..372d9bbbc6 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -27,7 +27,7 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, tcp_config const & task{ strand } { connection_accepted.add ([this] (auto const & socket, auto const & server) { - node.observers.socket_connected.notify (*socket); + node.observers.socket_connected.notify (socket); }); } @@ -66,40 +66,43 @@ void nano::transport::tcp_listener::start () throw; } - task = nano::async::task (strand, [this] () -> asio::awaitable { - try - { - logger.debug (nano::log::type::tcp_listener, "Starting acceptor"); + task = nano::async::task (strand, start_impl ()); - try - { - co_await run (); - } - catch (boost::system::system_error const & ex) - { - // Operation aborted is expected when cancelling the acceptor - debug_assert (ex.code () == asio::error::operation_aborted); - } - debug_assert (strand.running_in_this_thread ()); + cleanup_thread = std::thread ([this] { + nano::thread_role::set (nano::thread_role::name::tcp_listener); + run_cleanup (); + }); +} - logger.debug (nano::log::type::tcp_listener, "Stopped acceptor"); - } - catch (std::exception const & ex) +asio::awaitable nano::transport::tcp_listener::start_impl () +{ + try + { + logger.debug (nano::log::type::tcp_listener, "Starting acceptor"); + + try { - logger.critical (nano::log::type::tcp_listener, "Error: {}", ex.what ()); - release_assert (false); // Unexpected error + co_await run (); } - catch (...) + catch (boost::system::system_error const & ex) { - logger.critical (nano::log::type::tcp_listener, "Unknown error"); - release_assert (false); // Unexpected error + // Operation aborted is expected when cancelling the acceptor + debug_assert (ex.code () == asio::error::operation_aborted); } - }); + debug_assert (strand.running_in_this_thread ()); - cleanup_thread = std::thread ([this] { - nano::thread_role::set (nano::thread_role::name::tcp_listener); - run_cleanup (); - }); + logger.debug (nano::log::type::tcp_listener, "Stopped acceptor"); + } + catch (std::exception const & ex) + { + logger.critical (nano::log::type::tcp_listener, "Error: {}", ex.what ()); + release_assert (false); // Unexpected error + } + catch (...) + { + logger.critical (nano::log::type::tcp_listener, "Unknown error"); + release_assert (false); // Unexpected error + } } void nano::transport::tcp_listener::stop () @@ -408,7 +411,7 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket auto socket = std::make_shared (node, std::move (raw_socket), remote_endpoint, local_endpoint, to_socket_endpoint (type)); auto server = std::make_shared (socket, node.shared (), true); - connections.emplace_back (connection{ remote_endpoint, socket, server }); + connections.emplace_back (connection{ type, remote_endpoint, socket, server }); lock.unlock (); @@ -435,7 +438,9 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, if (node.network.excluded_peers.check (ip)) // true => error { stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded, to_stat_dir (type)); - logger.debug (nano::log::type::tcp_listener, "Rejected connection from excluded peer: {}", ip.to_string ()); + logger.debug (nano::log::type::tcp_listener, "Rejected connection from excluded peer: {} ({})", + ip.to_string (), + to_string (type)); return accept_result::rejected; } @@ -445,21 +450,25 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, if (auto count = count_per_ip (ip); count >= node.config.network.max_peers_per_ip) { stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_ip, to_stat_dir (type)); - logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection: {}", - count, ip.to_string ()); + logger.debug (nano::log::type::tcp_listener, "Max connections: {} per IP: {} reached, unable to open a new connection ({})", + count, + ip.to_string (), + to_string (type)); return accept_result::rejected; } } - // If the address is IPv4 we don't check for a network limit, since its address space isn't big as IPv6/64. + // If the address is IPv4 we don't check for a subnetwork limit, since its address space isn't big as IPv6/64. if (!node.flags.disable_max_peers_per_subnetwork && !nano::transport::is_ipv4_or_v4_mapped_address (ip)) { if (auto count = count_per_subnetwork (ip); count >= node.config.network.max_peers_per_subnetwork) { stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_subnetwork, to_stat_dir (type)); - logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached ({}), unable to open new connection: {}", - count, ip.to_string ()); + logger.debug (nano::log::type::tcp_listener, "Max connections: {} per subnetwork of IP: {} reached, unable to open a new connection ({})", + count, + ip.to_string (), + to_string (type)); return accept_result::rejected; } @@ -472,7 +481,7 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, if (auto count = count_per_type (connection_type::inbound); count >= config.max_inbound_connections) { stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_attempts, to_stat_dir (type)); - logger.debug (nano::log::type::tcp_listener, "Max inbound connections reached ({}), unable to accept new connection: {}", + logger.debug (nano::log::type::tcp_listener, "Max inbound connections reached: {}, unable to accept new connection: {}", count, ip.to_string ()); return accept_result::rejected; @@ -483,7 +492,7 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, if (auto count = count_per_type (connection_type::outbound); count >= config.max_outbound_connections) { stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_attempts, to_stat_dir (type)); - logger.debug (nano::log::type::tcp_listener, "Max outbound connections reached ({}), unable to initiate new connection: {}", + logger.debug (nano::log::type::tcp_listener, "Max outbound connections reached: {}, unable to initiate new connection: {}", count, ip.to_string ()); return accept_result::rejected; @@ -540,21 +549,15 @@ size_t nano::transport::tcp_listener::bootstrap_count () const size_t nano::transport::tcp_listener::count_per_type (connection_type type) const { debug_assert (!mutex.try_lock ()); - - return std::count_if (connections.begin (), connections.end (), [type] (auto const & connection) { - if (auto socket = connection.socket.lock ()) - { - return socket->endpoint_type () == to_socket_endpoint (type); - } - return false; + return std::count_if (connections.begin (), connections.end (), [&] (auto const & connection) { + return connection.type == type; }); } size_t nano::transport::tcp_listener::count_per_ip (asio::ip::address const & ip) const { debug_assert (!mutex.try_lock ()); - - return std::count_if (connections.begin (), connections.end (), [&ip] (auto const & connection) { + return std::count_if (connections.begin (), connections.end (), [&] (auto const & connection) { return nano::transport::is_same_ip (connection.address (), ip); }); } @@ -562,8 +565,7 @@ size_t nano::transport::tcp_listener::count_per_ip (asio::ip::address const & ip size_t nano::transport::tcp_listener::count_per_subnetwork (asio::ip::address const & ip) const { debug_assert (!mutex.try_lock ()); - - return std::count_if (connections.begin (), connections.end (), [this, &ip] (auto const & connection) { + return std::count_if (connections.begin (), connections.end (), [&] (auto const & connection) { return nano::transport::is_same_subnetwork (connection.address (), ip); }); } @@ -571,8 +573,7 @@ size_t nano::transport::tcp_listener::count_per_subnetwork (asio::ip::address co size_t nano::transport::tcp_listener::count_attempts (asio::ip::address const & ip) const { debug_assert (!mutex.try_lock ()); - - return std::count_if (attempts.begin (), attempts.end (), [&ip] (auto const & attempt) { + return std::count_if (attempts.begin (), attempts.end (), [&] (auto const & attempt) { return nano::transport::is_same_ip (attempt.address (), ip); }); } diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp index 94fc9f12ae..b5634b31e1 100644 --- a/nano/node/transport/tcp_listener.hpp +++ b/nano/node/transport/tcp_listener.hpp @@ -80,13 +80,13 @@ class tcp_listener final size_t realtime_count () const; size_t bootstrap_count () const; - std::vector> sockets () const; - std::vector> servers () const; + std::vector> sockets () const; + std::vector> servers () const; nano::container_info container_info () const; public: // Events - using connection_accepted_event_t = nano::observer_set, std::shared_ptr>; + using connection_accepted_event_t = nano::observer_set, std::shared_ptr>; connection_accepted_event_t connection_accepted; private: // Dependencies @@ -96,6 +96,7 @@ class tcp_listener final nano::logger & logger; private: + asio::awaitable start_impl (); asio::awaitable run (); asio::awaitable wait_available_slots () const; @@ -133,9 +134,10 @@ class tcp_listener final private: struct connection { + connection_type type; asio::ip::tcp::endpoint endpoint; - std::weak_ptr socket; - std::weak_ptr server; + std::weak_ptr socket; + std::weak_ptr server; asio::ip::address address () const { diff --git a/nano/node/transport/tcp_socket.cpp b/nano/node/transport/tcp_socket.cpp index af8a9ed4e2..07bc9a4ec8 100644 --- a/nano/node/transport/tcp_socket.cpp +++ b/nano/node/transport/tcp_socket.cpp @@ -5,8 +5,6 @@ #include #include -#include - #include #include #include @@ -18,13 +16,14 @@ * socket */ -nano::transport::tcp_socket::tcp_socket (nano::node & node_a, nano::transport::socket_endpoint endpoint_type_a, std::size_t max_queue_size_a) : - tcp_socket{ node_a, boost::asio::ip::tcp::socket{ node_a.io_ctx }, {}, {}, endpoint_type_a, max_queue_size_a } +nano::transport::tcp_socket::tcp_socket (nano::node & node_a, nano::transport::socket_endpoint endpoint_type_a, size_t queue_size_a) : + tcp_socket{ node_a, boost::asio::ip::tcp::socket{ node_a.io_ctx }, {}, {}, endpoint_type_a, queue_size_a } { } -nano::transport::tcp_socket::tcp_socket (nano::node & node_a, boost::asio::ip::tcp::socket raw_socket_a, boost::asio::ip::tcp::endpoint remote_endpoint_a, boost::asio::ip::tcp::endpoint local_endpoint_a, nano::transport::socket_endpoint endpoint_type_a, std::size_t max_queue_size_a) : - send_queue{ max_queue_size_a }, +nano::transport::tcp_socket::tcp_socket (nano::node & node_a, boost::asio::ip::tcp::socket raw_socket_a, boost::asio::ip::tcp::endpoint remote_endpoint_a, boost::asio::ip::tcp::endpoint local_endpoint_a, nano::transport::socket_endpoint endpoint_type_a, size_t queue_size_a) : + queue_size{ queue_size_a }, + send_queue{ queue_size }, node_w{ node_a.shared () }, strand{ node_a.io_ctx.get_executor () }, raw_socket{ std::move (raw_socket_a) }, @@ -35,8 +34,7 @@ nano::transport::tcp_socket::tcp_socket (nano::node & node_a, boost::asio::ip::t last_completion_time_or_init{ nano::seconds_since_epoch () }, last_receive_time_or_init{ nano::seconds_since_epoch () }, default_timeout{ node_a.config.tcp_io_timeout }, - silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time }, - max_queue_size{ max_queue_size_a } + silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time } { } @@ -61,8 +59,7 @@ void nano::transport::tcp_socket::async_connect (nano::tcp_endpoint const & endp boost::asio::post (strand, [this_l = shared_from_this (), endpoint_a, callback = std::move (callback_a)] () { this_l->raw_socket.async_connect (endpoint_a, - boost::asio::bind_executor (this_l->strand, - [this_l, callback = std::move (callback), endpoint_a] (boost::system::error_code const & ec) { + boost::asio::bind_executor (this_l->strand, [this_l, callback = std::move (callback), endpoint_a] (boost::system::error_code const & ec) { debug_assert (this_l->strand.running_in_this_thread ()); auto node_l = this_l->node_w.lock (); @@ -72,6 +69,7 @@ void nano::transport::tcp_socket::async_connect (nano::tcp_endpoint const & endp } this_l->remote = endpoint_a; + if (ec) { node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error, nano::stat::dir::in); @@ -85,7 +83,10 @@ void nano::transport::tcp_socket::async_connect (nano::tcp_endpoint const & endp boost::system::error_code ec; this_l->local = this_l->raw_socket.local_endpoint (ec); } - node_l->observers.socket_connected.notify (*this_l); + + node_l->logger.debug (nano::log::type::tcp_socket, "Successfully connected to: {}, local: {}", + fmt::streamed (this_l->remote), + fmt::streamed (this_l->local)); } callback (ec); })); @@ -137,7 +138,7 @@ void nano::transport::tcp_socket::async_read (std::shared_ptr callback_a, nano::transport::traffic_type traffic_type) +void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a) { auto node_l = node_w.lock (); if (!node_l) @@ -156,7 +157,7 @@ void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const & return; } - bool queued = send_queue.insert (buffer_a, callback_a, traffic_type); + bool queued = send_queue.insert (buffer_a, callback_a, traffic_type::generic); if (!queued) { if (callback_a) @@ -215,7 +216,6 @@ void nano::transport::tcp_socket::write_queued_messages () else { node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::out, size, /* aggregate all */ true); - node_l->stats.add (nano::stat::type::traffic_tcp_type, to_stat_detail (type), nano::stat::dir::out, size); this_l->set_last_completion (); } @@ -231,14 +231,14 @@ void nano::transport::tcp_socket::write_queued_messages () })); } -bool nano::transport::tcp_socket::max (nano::transport::traffic_type traffic_type) const +bool nano::transport::tcp_socket::max () const { - return send_queue.size (traffic_type) >= max_queue_size; + return send_queue.size (traffic_type::generic) >= queue_size; } -bool nano::transport::tcp_socket::full (nano::transport::traffic_type traffic_type) const +bool nano::transport::tcp_socket::full () const { - return send_queue.size (traffic_type) >= 2 * max_queue_size; + return send_queue.size (traffic_type::generic) >= 2 * queue_size; } /** Call set_timeout with default_timeout as parameter */ @@ -317,8 +317,8 @@ void nano::transport::tcp_socket::ongoing_checkup () if (condition_to_disconnect) { - node_l->logger.debug (nano::log::type::tcp_server, "Closing socket due to timeout ({})", nano::util::to_str (this_l->remote)); - + // TODO: Stats + node_l->logger.debug (nano::log::type::tcp_socket, "Socket timeout, closing: {}", fmt::streamed (this_l->remote)); this_l->timed_out = true; this_l->close (); } @@ -394,7 +394,14 @@ void nano::transport::tcp_socket::close_internal () if (ec) { node_l->stats.inc (nano::stat::type::socket, nano::stat::detail::error_socket_close); - node_l->logger.error (nano::log::type::socket, "Failed to close socket gracefully: {} ({})", ec.message (), nano::util::to_str (remote)); + node_l->logger.error (nano::log::type::tcp_socket, "Failed to close socket gracefully: {} ({})", + fmt::streamed (remote), + ec.message ()); + } + else + { + // TODO: Stats + node_l->logger.debug (nano::log::type::tcp_socket, "Closed socket: {}", fmt::streamed (remote)); } } @@ -458,10 +465,6 @@ auto nano::transport::socket_queue::pop () -> std::optional { return item; } - if (auto item = try_pop (nano::transport::traffic_type::bootstrap)) - { - return item; - } return std::nullopt; } @@ -490,45 +493,6 @@ bool nano::transport::socket_queue::empty () const }); } -/* - * socket_functions - */ - -boost::asio::ip::network_v6 nano::transport::socket_functions::get_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, std::size_t network_prefix) -{ - return boost::asio::ip::make_network_v6 (ip_address, static_cast (network_prefix)); -} - -boost::asio::ip::address nano::transport::socket_functions::first_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, std::size_t network_prefix) -{ - auto range = get_ipv6_subnet_address (ip_address, network_prefix).hosts (); - debug_assert (!range.empty ()); - return *(range.begin ()); -} - -boost::asio::ip::address nano::transport::socket_functions::last_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, std::size_t network_prefix) -{ - auto range = get_ipv6_subnet_address (ip_address, network_prefix).hosts (); - debug_assert (!range.empty ()); - return *(--range.end ()); -} - -std::size_t nano::transport::socket_functions::count_subnetwork_connections ( -nano::transport::address_socket_mmap const & per_address_connections, -boost::asio::ip::address_v6 const & remote_address, -std::size_t network_prefix) -{ - auto range = get_ipv6_subnet_address (remote_address, network_prefix).hosts (); - if (range.empty ()) - { - return 0; - } - auto const first_ip = first_ipv6_subnet_address (remote_address, network_prefix); - auto const last_ip = last_ipv6_subnet_address (remote_address, network_prefix); - auto const counted_connections = std::distance (per_address_connections.lower_bound (first_ip), per_address_connections.upper_bound (last_ip)); - return counted_connections; -} - /* * */ diff --git a/nano/node/transport/tcp_socket.hpp b/nano/node/transport/tcp_socket.hpp index c09448533b..6e1ba40da4 100644 --- a/nano/node/transport/tcp_socket.hpp +++ b/nano/node/transport/tcp_socket.hpp @@ -67,10 +67,10 @@ class tcp_socket final : public std::enable_shared_from_this friend class tcp_listener; public: - static std::size_t constexpr default_max_queue_size = 128; + static size_t constexpr default_queue_size = 16; public: - explicit tcp_socket (nano::node &, nano::transport::socket_endpoint = socket_endpoint::client, std::size_t max_queue_size = default_max_queue_size); + explicit tcp_socket (nano::node &, nano::transport::socket_endpoint = socket_endpoint::client, size_t queue_size = default_queue_size); // TODO: Accepting remote/local endpoints as a parameter is unnecessary, but is needed for now to keep compatibility with the legacy code tcp_socket ( @@ -79,7 +79,7 @@ class tcp_socket final : public std::enable_shared_from_this boost::asio::ip::tcp::endpoint remote_endpoint, boost::asio::ip::tcp::endpoint local_endpoint, nano::transport::socket_endpoint = socket_endpoint::server, - std::size_t max_queue_size = default_max_queue_size); + size_t queue_size = default_queue_size); ~tcp_socket (); @@ -97,8 +97,7 @@ class tcp_socket final : public std::enable_shared_from_this void async_write ( nano::shared_const_buffer const &, - std::function callback = {}, - traffic_type = traffic_type::generic); + std::function callback = nullptr); boost::asio::ip::tcp::endpoint remote_endpoint () const; boost::asio::ip::tcp::endpoint local_endpoint () const; @@ -110,8 +109,8 @@ class tcp_socket final : public std::enable_shared_from_this std::chrono::seconds get_default_timeout_value () const; void set_timeout (std::chrono::seconds); - bool max (nano::transport::traffic_type = traffic_type::generic) const; - bool full (nano::transport::traffic_type = traffic_type::generic) const; + bool max () const; + bool full () const; nano::transport::socket_type type () const { @@ -143,6 +142,7 @@ class tcp_socket final : public std::enable_shared_from_this } private: + size_t const queue_size; socket_queue send_queue; protected: @@ -200,20 +200,7 @@ class tcp_socket final : public std::enable_shared_from_this socket_endpoint const endpoint_type_m; std::atomic type_m{ socket_type::undefined }; -public: - std::size_t const max_queue_size; - public: // Logging virtual void operator() (nano::object_stream &) const; }; - -using address_socket_mmap = std::multimap>; - -namespace socket_functions -{ - boost::asio::ip::network_v6 get_ipv6_subnet_address (boost::asio::ip::address_v6 const &, std::size_t); - boost::asio::ip::address first_ipv6_subnet_address (boost::asio::ip::address_v6 const &, std::size_t); - boost::asio::ip::address last_ipv6_subnet_address (boost::asio::ip::address_v6 const &, std::size_t); - std::size_t count_subnetwork_connections (nano::transport::address_socket_mmap const &, boost::asio::ip::address_v6 const &, std::size_t); -} } diff --git a/nano/node/transport/traffic_type.cpp b/nano/node/transport/traffic_type.cpp index bfb12a6573..9d170c7133 100644 --- a/nano/node/transport/traffic_type.cpp +++ b/nano/node/transport/traffic_type.cpp @@ -1,6 +1,19 @@ #include +#include #include +#include + +std::string_view nano::transport::to_string (nano::transport::traffic_type type) +{ + return nano::enum_util::name (type); +} + +std::vector nano::transport::all_traffic_types () +{ + return nano::enum_util::values (); +} + nano::stat::detail nano::transport::to_stat_detail (nano::transport::traffic_type type) { return nano::enum_util::cast (type); diff --git a/nano/node/transport/traffic_type.hpp b/nano/node/transport/traffic_type.hpp index 1f0914cb06..f0f451b3a7 100644 --- a/nano/node/transport/traffic_type.hpp +++ b/nano/node/transport/traffic_type.hpp @@ -2,16 +2,30 @@ #include +#include +#include + namespace nano::transport { -/** - * Used for message prioritization and bandwidth limits - */ enum class traffic_type { generic, - bootstrap, // Ascending bootstrap (asc_pull_ack, asc_pull_req) traffic + bootstrap_server, + bootstrap_requests, + block_broadcast, + block_broadcast_initial, + block_broadcast_rpc, + confirmation_requests, + keepalive, + vote, + vote_rebroadcast, + vote_reply, + rep_crawler, + telemetry, + test, }; +std::string_view to_string (traffic_type); +std::vector all_traffic_types (); nano::stat::detail to_stat_detail (traffic_type); } \ No newline at end of file diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 21f5fea047..2b7d83630b 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -165,3 +165,61 @@ bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool } return result; } + +nano::stat::detail nano::to_stat_detail (boost::system::error_code const & ec) +{ + switch (ec.value ()) + { + case boost::system::errc::success: + return nano::stat::detail::success; + case boost::system::errc::no_buffer_space: + return nano::stat::detail::no_buffer_space; + case boost::system::errc::timed_out: + return nano::stat::detail::timed_out; + case boost::system::errc::host_unreachable: + return nano::stat::detail::host_unreachable; + case boost::system::errc::not_supported: + return nano::stat::detail::not_supported; + default: + return nano::stat::detail::other; + } +} + +/* + * socket_functions + */ + +boost::asio::ip::network_v6 nano::transport::socket_functions::get_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, std::size_t network_prefix) +{ + return boost::asio::ip::make_network_v6 (ip_address, static_cast (network_prefix)); +} + +boost::asio::ip::address nano::transport::socket_functions::first_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, std::size_t network_prefix) +{ + auto range = get_ipv6_subnet_address (ip_address, network_prefix).hosts (); + debug_assert (!range.empty ()); + return *(range.begin ()); +} + +boost::asio::ip::address nano::transport::socket_functions::last_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, std::size_t network_prefix) +{ + auto range = get_ipv6_subnet_address (ip_address, network_prefix).hosts (); + debug_assert (!range.empty ()); + return *(--range.end ()); +} + +std::size_t nano::transport::socket_functions::count_subnetwork_connections ( +nano::transport::address_socket_mmap const & per_address_connections, +boost::asio::ip::address_v6 const & remote_address, +std::size_t network_prefix) +{ + auto range = get_ipv6_subnet_address (remote_address, network_prefix).hosts (); + if (range.empty ()) + { + return 0; + } + auto const first_ip = first_ipv6_subnet_address (remote_address, network_prefix); + auto const last_ip = last_ipv6_subnet_address (remote_address, network_prefix); + auto const counted_connections = std::distance (per_address_connections.lower_bound (first_ip), per_address_connections.upper_bound (last_ip)); + return counted_connections; +} \ No newline at end of file diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 5703a8c73d..0d87402ba2 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -24,4 +24,19 @@ bool is_same_subnetwork (boost::asio::ip::address const &, boost::asio::ip::addr // Unassigned, reserved, self bool reserved_address (nano::endpoint const &, bool allow_local_peers = false); + +using address_socket_mmap = std::multimap>; + +namespace socket_functions +{ + boost::asio::ip::network_v6 get_ipv6_subnet_address (boost::asio::ip::address_v6 const &, std::size_t); + boost::asio::ip::address first_ipv6_subnet_address (boost::asio::ip::address_v6 const &, std::size_t); + boost::asio::ip::address last_ipv6_subnet_address (boost::asio::ip::address_v6 const &, std::size_t); + std::size_t count_subnetwork_connections (nano::transport::address_socket_mmap const &, boost::asio::ip::address_v6 const &, std::size_t); } +} + +namespace nano +{ +nano::stat::detail to_stat_detail (boost::system::error_code const &); +} \ No newline at end of file diff --git a/nano/node/vote_generator.cpp b/nano/node/vote_generator.cpp index 2c0e4292df..c1651a69c1 100644 --- a/nano/node/vote_generator.cpp +++ b/nano/node/vote_generator.cpp @@ -173,12 +173,6 @@ std::size_t nano::vote_generator::generate (std::vector const &, std::shared_ptr const &)> action_a) -{ - release_assert (!reply_action); - reply_action = action_a; -} - void nano::vote_generator::broadcast (nano::unique_lock & lock_a) { debug_assert (lock_a.owns_lock ()); @@ -218,6 +212,10 @@ void nano::vote_generator::broadcast (nano::unique_lock & lock_a) void nano::vote_generator::reply (nano::unique_lock & lock_a, request_t && request_a) { + if (request_a.second->max (nano::transport::traffic_type::vote_reply)) + { + return; + } lock_a.unlock (); auto i (request_a.first.cbegin ()); auto n (request_a.first.cend ()); @@ -246,9 +244,11 @@ void nano::vote_generator::reply (nano::unique_lock & lock_a, reque if (!hashes.empty ()) { stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes.size ()); - vote (hashes, roots, [this, &channel = request_a.second] (std::shared_ptr const & vote_a) { - this->reply_action (vote_a, channel); - this->stats.inc (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in); + + vote (hashes, roots, [this, channel = request_a.second] (std::shared_ptr const & vote_a) { + nano::confirm_ack confirm{ config.network_params.network, vote_a }; + channel->send (confirm, nano::transport::traffic_type::vote_reply); + stats.inc (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in); }); } } diff --git a/nano/node/vote_generator.hpp b/nano/node/vote_generator.hpp index 76fc0c583a..dbec5fb25f 100644 --- a/nano/node/vote_generator.hpp +++ b/nano/node/vote_generator.hpp @@ -40,7 +40,6 @@ class vote_generator final void add (nano::root const &, nano::block_hash const &); /** Queue blocks for vote generation, returning the number of successful candidates.*/ std::size_t generate (std::vector> const & blocks_a, std::shared_ptr const & channel_a); - void set_reply_action (std::function const &, std::shared_ptr const &)>); void start (); void stop (); @@ -59,9 +58,6 @@ class vote_generator final bool should_vote (transaction_variant_t const &, nano::root const &, nano::block_hash const &) const; bool broadcast_predicate () const; -private: - std::function const &, std::shared_ptr &)> reply_action; // must be set only during initialization by using set_reply_action - private: // Dependencies nano::node_config const & config; nano::node & node; diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index c2c2101c70..9d2fa9d74d 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -970,7 +970,7 @@ std::shared_ptr nano::wallet::send_action (nano::account const & so if (block != nullptr) { cached_block = true; - wallets.node.network.flood_block (block, nano::transport::buffer_drop_policy::no_limiter_drop); + wallets.node.network.flood_block (block, nano::transport::traffic_type::block_broadcast_initial); } } else if (status != MDB_NOTFOUND) diff --git a/nano/qt/qt.cpp b/nano/qt/qt.cpp index b2a17b3d92..84a215c451 100644 --- a/nano/qt/qt.cpp +++ b/nano/qt/qt.cpp @@ -739,7 +739,7 @@ void nano_qt::block_viewer::rebroadcast_action (nano::block_hash const & hash_a) auto block (wallet.node.ledger.any.block_get (transaction, hash_a)); if (block != nullptr) { - wallet.node.network.flood_block (block); + wallet.node.network.flood_block (block, nano::transport::traffic_type::block_broadcast_initial); auto successor = wallet.node.ledger.any.block_successor (transaction, hash_a); if (successor) { @@ -1356,7 +1356,7 @@ void nano_qt::wallet::start () })); } }); - node.observers.endpoint.add ([this_w] (std::shared_ptr const &) { + node.observers.channel_connected.add ([this_w] (std::shared_ptr const &) { if (auto this_l = this_w.lock ()) { this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w] () { diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 75bf0ea2ff..a96e12b47e 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -300,7 +300,7 @@ TEST (node, fork_storm) auto open_result (node_i->process (open)); ASSERT_EQ (nano::block_status::progress, open_result); auto transaction (node_i->store.tx_begin_read ()); - node_i->network.flood_block (open); + node_i->network.flood_block (open, nano::transport::traffic_type::test); } } auto again (true);