From 9ca0c64dd7d5b2b5c75294a751e95179a9bcba63 Mon Sep 17 00:00:00 2001 From: Sergey Kroshnin Date: Wed, 22 May 2019 17:45:16 +0300 Subject: [PATCH] Send live network messages over TCP (#1962) - merge_peer function modified to try TCP connection before UDP - TCP channels ongoing_keepalive function used to wake up long not sending channels to prevent TCP disconnection from server - Live TCP connection starts with node ID handshake - After handshake node sends preferred peering ports for response channels (keepalive self) - Received TCP live message is responded to sender with response channels - Using https://github.com/nanocurrency/nano-node/pull/1938 multi-writer for TCP channel - Multiple nodes system () tests using TCP connections by default - Some tests modified to check 2 live network options: TCP connections & UDP connections - tcp_incoming_connections_max added, max established incoming TCP connections - bootstrap_server modified to accept live network messages --- nano/core_test/active_transactions.cpp | 8 + nano/core_test/network.cpp | 236 ++++++---- nano/core_test/node.cpp | 297 +++++++------ nano/core_test/peer_container.cpp | 18 +- nano/core_test/wallets.cpp | 1 + nano/nano_node/entry.cpp | 2 +- nano/node/bootstrap.cpp | 222 +++++++--- nano/node/bootstrap.hpp | 11 + nano/node/common.cpp | 1 + nano/node/common.hpp | 18 + nano/node/json_handler.cpp | 11 +- nano/node/node.cpp | 215 ++++++++- nano/node/node.hpp | 19 + nano/node/nodeconfig.cpp | 3 + nano/node/nodeconfig.hpp | 2 + nano/node/repcrawler.cpp | 11 +- nano/node/testing.cpp | 32 +- nano/node/testing.hpp | 4 +- nano/node/transport/tcp.cpp | 581 ++++++++++++++++++++++++- nano/node/transport/tcp.hpp | 135 ++++++ nano/node/transport/transport.cpp | 6 +- nano/node/transport/transport.hpp | 3 + nano/node/transport/udp.cpp | 132 +++--- nano/node/transport/udp.hpp | 19 +- nano/qt/qt.cpp | 6 +- nano/rpc_test/rpc.cpp | 2 +- nano/slow_test/node.cpp | 2 +- 27 files changed, 1602 insertions(+), 395 deletions(-) diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index b59f31a588..3e23cd6d50 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -105,6 +105,7 @@ TEST (active_transactions, adjusted_difficulty_priority) } } + system.deadline_set (10s); while (node1.active.confirmed.size () != 4) { ASSERT_NO_ERROR (system.poll ()); @@ -126,6 +127,7 @@ TEST (active_transactions, adjusted_difficulty_priority) node1.process_active (send6); //key1 node1.process_active (send8); //key2 + system.deadline_set (10s); while (node1.active.size () != 6) { ASSERT_NO_ERROR (system.poll ()); @@ -183,12 +185,14 @@ TEST (active_transactions, keep_local) auto open2 (std::make_shared (key4.pub, 0, key4.pub, nano::xrb_ratio, send4->hash (), key4.prv, key4.pub, system.work.generate (key4.pub))); node1.process_active (open2); //none are dropped since none are long_unconfirmed + system.deadline_set (10s); while (node1.active.size () != 4) { ASSERT_NO_ERROR (system.poll ()); } auto done (false); //wait for all to be long_unconfirmed + system.deadline_set (10s); while (!done) { ASSERT_FALSE (node1.active.empty ()); @@ -201,6 +205,7 @@ TEST (active_transactions, keep_local) auto send5 (wallet.send_action (nano::test_genesis_key.pub, key1.pub, node1.config.receive_minimum.number ())); node1.active.start (send5); //drop two lowest non-wallet managed active_transactions before inserting a new into active as all are long_unconfirmed + system.deadline_set (10s); while (node1.active.size () != 3) { ASSERT_NO_ERROR (system.poll ()); @@ -256,6 +261,7 @@ TEST (active_transactions, prioritize_chains) node1.process_active (send4); node1.process_active (send6); + system.deadline_set (10s); while (node1.active.size () != 4) { ASSERT_NO_ERROR (system.poll ()); @@ -273,12 +279,14 @@ TEST (active_transactions, prioritize_chains) } std::this_thread::sleep_for (1s); node1.process_active (open2); + system.deadline_set (10s); while (node1.active.size () != 4) { ASSERT_NO_ERROR (system.poll ()); } //wait for all to be long_unconfirmed done = false; + system.deadline_set (10s); while (!done) { { diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 968d630566..f6c6681bc0 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -92,9 +92,65 @@ TEST (network, send_node_id_handshake) } ASSERT_EQ (1, system.nodes[0]->network.size ()); ASSERT_EQ (1, node1->network.size ()); - auto list1 (system.nodes[0]->network.udp_channels.list (1)); + auto list1 (system.nodes[0]->network.list (1)); ASSERT_EQ (node1->network.endpoint (), list1[0]->get_endpoint ()); - auto list2 (node1->network.udp_channels.list (1)); + auto list2 (node1->network.list (1)); + ASSERT_EQ (system.nodes[0]->network.endpoint (), list2[0]->get_endpoint ()); + node1->stop (); +} + +TEST (network, send_node_id_handshake_tcp) +{ + nano::system system (24000, 1); + ASSERT_EQ (0, system.nodes[0]->network.size ()); + nano::node_init init1; + auto node1 (std::make_shared (init1, system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work)); + node1->start (); + system.nodes.push_back (node1); + auto initial (system.nodes[0]->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in)); + auto initial_node1 (node1->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in)); + auto initial_keepalive (system.nodes[0]->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in)); + std::weak_ptr node_w (system.nodes[0]); + system.nodes[0]->network.tcp_channels.start_tcp (node1->network.endpoint (), [node_w](std::shared_ptr channel_a) { + if (auto node_l = node_w.lock ()) + { + node_l->network.send_keepalive (channel_a); + } + }); + ASSERT_EQ (0, system.nodes[0]->network.size ()); + ASSERT_EQ (0, node1->network.size ()); + system.deadline_set (10s); + while (system.nodes[0]->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) < initial + 2) + { + ASSERT_NO_ERROR (system.poll ()); + } + system.deadline_set (5s); + while (node1->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) < initial_node1 + 2) + { + ASSERT_NO_ERROR (system.poll ()); + } + system.deadline_set (5s); + while (system.nodes[0]->network.response_channels_size () != 1 || node1->network.response_channels_size () != 1) + { + ASSERT_NO_ERROR (system.poll ()); + } + system.deadline_set (5s); + while (system.nodes[0]->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) < initial_keepalive + 2) + { + ASSERT_NO_ERROR (system.poll ()); + } + system.deadline_set (5s); + while (node1->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) < initial_keepalive + 2) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (1, system.nodes[0]->network.size ()); + ASSERT_EQ (1, node1->network.size ()); + auto list1 (system.nodes[0]->network.list (1)); + ASSERT_EQ (nano::transport::transport_type::tcp, list1[0]->get_type ()); + ASSERT_EQ (node1->network.endpoint (), list1[0]->get_endpoint ()); + auto list2 (node1->network.list (1)); + ASSERT_EQ (nano::transport::transport_type::tcp, list2[0]->get_type ()); ASSERT_EQ (system.nodes[0]->network.endpoint (), list2[0]->get_endpoint ()); node1->stop (); } @@ -213,49 +269,57 @@ TEST (network, send_invalid_publish) TEST (network, send_valid_confirm_ack) { - nano::system system (24000, 2); - nano::keypair key2; - system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); - system.wallet (1)->insert_adhoc (key2.prv); - nano::block_hash latest1 (system.nodes[0]->latest (nano::test_genesis_key.pub)); - nano::send_block block2 (latest1, key2.pub, 50, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1)); - nano::block_hash latest2 (system.nodes[1]->latest (nano::test_genesis_key.pub)); - system.nodes[0]->process_active (std::make_shared (block2)); - system.deadline_set (10s); - // Keep polling until latest block changes - while (system.nodes[1]->latest (nano::test_genesis_key.pub) == latest2) - { - ASSERT_NO_ERROR (system.poll ()); + std::vector types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp }; + for (auto & type : types) + { + nano::system system (24000, 2, type); + nano::keypair key2; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + system.wallet (1)->insert_adhoc (key2.prv); + nano::block_hash latest1 (system.nodes[0]->latest (nano::test_genesis_key.pub)); + nano::send_block block2 (latest1, key2.pub, 50, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1)); + nano::block_hash latest2 (system.nodes[1]->latest (nano::test_genesis_key.pub)); + system.nodes[0]->process_active (std::make_shared (block2)); + system.deadline_set (10s); + // Keep polling until latest block changes + while (system.nodes[1]->latest (nano::test_genesis_key.pub) == latest2) + { + ASSERT_NO_ERROR (system.poll ()); + } + // Make sure the balance has decreased after processing the block. + ASSERT_EQ (50, system.nodes[1]->balance (nano::test_genesis_key.pub)); } - // Make sure the balance has decreased after processing the block. - ASSERT_EQ (50, system.nodes[1]->balance (nano::test_genesis_key.pub)); } TEST (network, send_valid_publish) { - nano::system system (24000, 2); - system.nodes[0]->bootstrap_initiator.stop (); - system.nodes[1]->bootstrap_initiator.stop (); - system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); - nano::keypair key2; - system.wallet (1)->insert_adhoc (key2.prv); - nano::block_hash latest1 (system.nodes[0]->latest (nano::test_genesis_key.pub)); - nano::send_block block2 (latest1, key2.pub, 50, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1)); - auto hash2 (block2.hash ()); - nano::block_hash latest2 (system.nodes[1]->latest (nano::test_genesis_key.pub)); - system.nodes[1]->process_active (std::make_shared (block2)); - system.deadline_set (10s); - while (system.nodes[0]->stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) == 0) - { - ASSERT_NO_ERROR (system.poll ()); - } - ASSERT_NE (hash2, latest2); - system.deadline_set (10s); - while (system.nodes[1]->latest (nano::test_genesis_key.pub) == latest2) - { - ASSERT_NO_ERROR (system.poll ()); + std::vector types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp }; + for (auto & type : types) + { + nano::system system (24000, 2, type); + system.nodes[0]->bootstrap_initiator.stop (); + system.nodes[1]->bootstrap_initiator.stop (); + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + nano::keypair key2; + system.wallet (1)->insert_adhoc (key2.prv); + nano::block_hash latest1 (system.nodes[0]->latest (nano::test_genesis_key.pub)); + nano::send_block block2 (latest1, key2.pub, 50, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1)); + auto hash2 (block2.hash ()); + nano::block_hash latest2 (system.nodes[1]->latest (nano::test_genesis_key.pub)); + system.nodes[1]->process_active (std::make_shared (block2)); + system.deadline_set (10s); + while (system.nodes[0]->stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_NE (hash2, latest2); + system.deadline_set (10s); + while (system.nodes[1]->latest (nano::test_genesis_key.pub) == latest2) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (50, system.nodes[1]->balance (nano::test_genesis_key.pub)); } - ASSERT_EQ (50, system.nodes[1]->balance (nano::test_genesis_key.pub)); } TEST (network, send_insufficient_work) @@ -308,34 +372,38 @@ TEST (receivable_processor, confirm_sufficient_pos) TEST (receivable_processor, send_with_receive) { - auto amount (std::numeric_limits::max ()); - nano::system system (24000, 2); - nano::keypair key2; - system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); - nano::block_hash latest1 (system.nodes[0]->latest (nano::test_genesis_key.pub)); - system.wallet (1)->insert_adhoc (key2.prv); - auto block1 (std::make_shared (latest1, key2.pub, amount - system.nodes[0]->config.receive_minimum.number (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1))); - ASSERT_EQ (amount, system.nodes[0]->balance (nano::test_genesis_key.pub)); - ASSERT_EQ (0, system.nodes[0]->balance (key2.pub)); - ASSERT_EQ (amount, system.nodes[1]->balance (nano::test_genesis_key.pub)); - ASSERT_EQ (0, system.nodes[1]->balance (key2.pub)); - system.nodes[0]->process_active (block1); - system.nodes[0]->block_processor.flush (); - system.nodes[1]->process_active (block1); - system.nodes[1]->block_processor.flush (); - ASSERT_EQ (amount - system.nodes[0]->config.receive_minimum.number (), system.nodes[0]->balance (nano::test_genesis_key.pub)); - ASSERT_EQ (0, system.nodes[0]->balance (key2.pub)); - ASSERT_EQ (amount - system.nodes[0]->config.receive_minimum.number (), system.nodes[1]->balance (nano::test_genesis_key.pub)); - ASSERT_EQ (0, system.nodes[1]->balance (key2.pub)); - system.deadline_set (10s); - while (system.nodes[0]->balance (key2.pub) != system.nodes[0]->config.receive_minimum.number () || system.nodes[1]->balance (key2.pub) != system.nodes[0]->config.receive_minimum.number ()) - { - ASSERT_NO_ERROR (system.poll ()); + std::vector types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp }; + for (auto & type : types) + { + nano::system system (24000, 2, type); + auto amount (std::numeric_limits::max ()); + nano::keypair key2; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + nano::block_hash latest1 (system.nodes[0]->latest (nano::test_genesis_key.pub)); + system.wallet (1)->insert_adhoc (key2.prv); + auto block1 (std::make_shared (latest1, key2.pub, amount - system.nodes[0]->config.receive_minimum.number (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1))); + ASSERT_EQ (amount, system.nodes[0]->balance (nano::test_genesis_key.pub)); + ASSERT_EQ (0, system.nodes[0]->balance (key2.pub)); + ASSERT_EQ (amount, system.nodes[1]->balance (nano::test_genesis_key.pub)); + ASSERT_EQ (0, system.nodes[1]->balance (key2.pub)); + system.nodes[0]->process_active (block1); + system.nodes[0]->block_processor.flush (); + system.nodes[1]->process_active (block1); + system.nodes[1]->block_processor.flush (); + ASSERT_EQ (amount - system.nodes[0]->config.receive_minimum.number (), system.nodes[0]->balance (nano::test_genesis_key.pub)); + ASSERT_EQ (0, system.nodes[0]->balance (key2.pub)); + ASSERT_EQ (amount - system.nodes[0]->config.receive_minimum.number (), system.nodes[1]->balance (nano::test_genesis_key.pub)); + ASSERT_EQ (0, system.nodes[1]->balance (key2.pub)); + system.deadline_set (10s); + while (system.nodes[0]->balance (key2.pub) != system.nodes[0]->config.receive_minimum.number () || system.nodes[1]->balance (key2.pub) != system.nodes[0]->config.receive_minimum.number ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (amount - system.nodes[0]->config.receive_minimum.number (), system.nodes[0]->balance (nano::test_genesis_key.pub)); + ASSERT_EQ (system.nodes[0]->config.receive_minimum.number (), system.nodes[0]->balance (key2.pub)); + ASSERT_EQ (amount - system.nodes[0]->config.receive_minimum.number (), system.nodes[1]->balance (nano::test_genesis_key.pub)); + ASSERT_EQ (system.nodes[0]->config.receive_minimum.number (), system.nodes[1]->balance (key2.pub)); } - ASSERT_EQ (amount - system.nodes[0]->config.receive_minimum.number (), system.nodes[0]->balance (nano::test_genesis_key.pub)); - ASSERT_EQ (system.nodes[0]->config.receive_minimum.number (), system.nodes[0]->balance (key2.pub)); - ASSERT_EQ (amount - system.nodes[0]->config.receive_minimum.number (), system.nodes[1]->balance (nano::test_genesis_key.pub)); - ASSERT_EQ (system.nodes[0]->config.receive_minimum.number (), system.nodes[1]->balance (key2.pub)); } TEST (network, receive_weight_change) @@ -1161,7 +1229,10 @@ TEST (network, endpoint_bad_fd) TEST (network, reserved_address) { nano::system system (24000, 1); - ASSERT_FALSE (nano::transport::reserved_address (nano::endpoint (boost::asio::ip::address_v6::from_string ("2001::"), 0))); + // 0 port test + ASSERT_TRUE (nano::transport::reserved_address (nano::endpoint (boost::asio::ip::address_v6::from_string ("2001::"), 0))); + // Valid address test + ASSERT_FALSE (nano::transport::reserved_address (nano::endpoint (boost::asio::ip::address_v6::from_string ("2001::"), 1))); nano::endpoint loopback (boost::asio::ip::address_v6::from_string ("::1"), 1); ASSERT_FALSE (nano::transport::reserved_address (loopback)); nano::endpoint private_network_peer (boost::asio::ip::address_v6::from_string ("::ffff:10.0.0.0"), 1); @@ -1384,14 +1455,16 @@ TEST (bulk_pull_account, basics) } } -TEST (bootstrap, keepalive) +TEST (bootstrap, tcp_node_id_handshake) { nano::system system (24000, 1); auto socket (std::make_shared (system.nodes[0])); - nano::keepalive keepalive; - auto input (keepalive.to_bytes ()); + auto bootstrap_endpoint (system.nodes[0]->bootstrap.endpoint ()); + auto cookie (system.nodes[0]->network.udp_channels.assign_syn_cookie (nano::transport::map_tcp_to_endpoint (bootstrap_endpoint))); + nano::node_id_handshake node_id_handshake (cookie, boost::none); + auto input (node_id_handshake.to_bytes ()); std::atomic write_done (false); - socket->async_connect (system.nodes[0]->bootstrap.endpoint (), [&input, socket, &write_done](boost::system::error_code const & ec) { + socket->async_connect (bootstrap_endpoint, [&input, socket, &write_done](boost::system::error_code const & ec) { ASSERT_FALSE (ec); socket->async_write (input, [&input, &write_done](boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); @@ -1406,7 +1479,9 @@ TEST (bootstrap, keepalive) ASSERT_NO_ERROR (system.poll ()); } - auto output (keepalive.to_bytes ()); + boost::optional> response_zero (std::make_pair (nano::account (0), nano::signature (0))); + nano::node_id_handshake node_id_handshake_response (boost::none, response_zero); + auto output (node_id_handshake_response.to_bytes ()); std::atomic done (false); socket->async_read (output, output->size (), [&output, &done](boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); @@ -2010,7 +2085,7 @@ TEST (confirmation_height, conflict_rollback_cemented) } auto rollback_log_entry = boost::str (boost::format ("Failed to roll back %1%") % send2->hash ().to_string ()); - system.deadline_set (10s); + system.deadline_set (20s); auto done (false); while (!done) { @@ -2056,14 +2131,15 @@ TEST (bootstrap, tcp_listener_timeout_empty) } } -TEST (bootstrap, tcp_listener_timeout_keepalive) +TEST (bootstrap, tcp_listener_timeout_node_id_handshake) { nano::system system (24000, 1); auto node0 (system.nodes[0]); node0->config.tcp_idle_timeout = std::chrono::seconds (1); auto socket (std::make_shared (node0)); - nano::keepalive keepalive; - auto input (keepalive.to_bytes ()); + auto cookie (node0->network.tcp_channels.assign_syn_cookie (node0->bootstrap.endpoint ())); + nano::node_id_handshake node_id_handshake (cookie, boost::none); + auto input (node_id_handshake.to_bytes ()); socket->async_connect (node0->bootstrap.endpoint (), [&input, socket](boost::system::error_code const & ec) { ASSERT_FALSE (ec); socket->async_write (input, [&input](boost::system::error_code const & ec, size_t size_a) { @@ -2072,7 +2148,7 @@ TEST (bootstrap, tcp_listener_timeout_keepalive) }); }); system.deadline_set (std::chrono::seconds (5)); - while (node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive) == 0) + while (node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake) == 0) { ASSERT_NO_ERROR (system.poll ()); } @@ -2107,7 +2183,7 @@ TEST (network, replace_port) channel->set_node_id (node1->node_id.pub); } } - auto peers_list (system.nodes[0]->network.udp_channels.list (std::numeric_limits::max ())); + auto peers_list (system.nodes[0]->network.list (std::numeric_limits::max ())); ASSERT_EQ (peers_list[0]->get_node_id ().get (), node1->node_id.pub); auto channel (std::make_shared (system.nodes[0]->network.udp_channels, node1->network.endpoint ())); system.nodes[0]->network.send_keepalive (channel); @@ -2122,9 +2198,9 @@ TEST (network, replace_port) ASSERT_NO_ERROR (system.poll ()); } ASSERT_EQ (system.nodes[0]->network.udp_channels.size (), 1); - auto list1 (system.nodes[0]->network.udp_channels.list (1)); + auto list1 (system.nodes[0]->network.list (1)); ASSERT_EQ (node1->network.endpoint (), list1[0]->get_endpoint ()); - auto list2 (node1->network.udp_channels.list (1)); + auto list2 (node1->network.list (1)); ASSERT_EQ (system.nodes[0]->network.endpoint (), list2[0]->get_endpoint ()); // Remove correct peer (same node ID) system.nodes[0]->network.udp_channels.clean_node_id (nano::endpoint (node1->network.endpoint ().address (), 23000), node1->node_id.pub); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 7f79976ff1..2540632154 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -703,6 +703,7 @@ TEST (node_config, v16_v17_upgrade) ASSERT_FALSE (tree.get_optional_child ("pow_sleep_interval")); ASSERT_FALSE (tree.get_optional_child ("external_address")); ASSERT_FALSE (tree.get_optional_child ("external_port")); + ASSERT_FALSE (tree.get_optional_child ("tcp_incoming_connections_max")); ASSERT_FALSE (tree.get_optional_child ("diagnostics")); config.deserialize_json (upgraded, tree); @@ -712,6 +713,7 @@ TEST (node_config, v16_v17_upgrade) ASSERT_TRUE (!!tree.get_optional_child ("pow_sleep_interval")); ASSERT_TRUE (!!tree.get_optional_child ("external_address")); ASSERT_TRUE (!!tree.get_optional_child ("external_port")); + ASSERT_TRUE (!!tree.get_optional_child ("tcp_incoming_connections_max")); ASSERT_TRUE (!!tree.get_optional_child ("diagnostics")); ASSERT_TRUE (upgraded); @@ -738,6 +740,7 @@ TEST (node_config, v17_values) tree.put ("pow_sleep_interval", 0); tree.put ("external_address", "::1"); tree.put ("external_port", 0); + tree.put ("tcp_incoming_connections_max", 1); nano::jsonconfig txn_tracking_l; txn_tracking_l.put ("enable", false); txn_tracking_l.put ("min_read_txn_time", 0); @@ -755,6 +758,7 @@ TEST (node_config, v17_values) ASSERT_EQ (config.pow_sleep_interval.count (), 0); ASSERT_EQ (config.external_address, boost::asio::ip::address_v6::from_string ("::1")); ASSERT_EQ (config.external_port, 0); + ASSERT_EQ (config.tcp_incoming_connections_max, 1); ASSERT_FALSE (config.diagnostics_config.txn_tracking.enable); ASSERT_EQ (config.diagnostics_config.txn_tracking.min_read_txn_time.count (), 0); ASSERT_EQ (config.diagnostics_config.txn_tracking.min_write_txn_time.count (), 0); @@ -766,6 +770,7 @@ TEST (node_config, v17_values) tree.put ("pow_sleep_interval", std::numeric_limits::max () - 100); tree.put ("external_address", "::ffff:192.168.1.1"); tree.put ("external_port", std::numeric_limits::max () - 1); + tree.put ("tcp_incoming_connections_max", std::numeric_limits::max ()); nano::jsonconfig txn_tracking_l; txn_tracking_l.put ("enable", true); txn_tracking_l.put ("min_read_txn_time", 1234); @@ -783,8 +788,10 @@ TEST (node_config, v17_values) ASSERT_EQ (config.pow_sleep_interval.count (), std::numeric_limits::max () - 100); ASSERT_EQ (config.external_address, boost::asio::ip::address_v6::from_string ("::ffff:192.168.1.1")); ASSERT_EQ (config.external_port, std::numeric_limits::max () - 1); + ASSERT_EQ (config.tcp_incoming_connections_max, std::numeric_limits::max ()); ASSERT_TRUE (config.diagnostics_config.txn_tracking.enable); ASSERT_EQ (config.diagnostics_config.txn_tracking.min_read_txn_time.count (), 1234); + ASSERT_EQ (config.tcp_incoming_connections_max, std::numeric_limits::max ()); ASSERT_EQ (config.diagnostics_config.txn_tracking.min_write_txn_time.count (), std::numeric_limits::max ()); ASSERT_FALSE (config.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time); } @@ -1456,75 +1463,79 @@ TEST (node, DISABLED_fork_stale) TEST (node, broadcast_elected) { - nano::system system (24000, 3); - auto node0 (system.nodes[0]); - auto node1 (system.nodes[1]); - auto node2 (system.nodes[2]); - nano::keypair rep_big; - nano::keypair rep_small; - nano::keypair rep_other; - //std::cerr << "Big: " << rep_big.pub.to_account () << std::endl; - //std::cerr << "Small: " << rep_small.pub.to_account () << std::endl; - //std::cerr << "Other: " << rep_other.pub.to_account () << std::endl; - { - auto transaction0 (node0->store.tx_begin_write ()); - auto transaction1 (node1->store.tx_begin_write ()); - auto transaction2 (node2->store.tx_begin_write ()); - nano::send_block fund_big (node0->ledger.latest (transaction0, nano::test_genesis_key.pub), rep_big.pub, nano::Gxrb_ratio * 5, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0); - nano::open_block open_big (fund_big.hash (), rep_big.pub, rep_big.pub, rep_big.prv, rep_big.pub, 0); - nano::send_block fund_small (fund_big.hash (), rep_small.pub, nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0); - nano::open_block open_small (fund_small.hash (), rep_small.pub, rep_small.pub, rep_small.prv, rep_small.pub, 0); - nano::send_block fund_other (fund_small.hash (), rep_other.pub, nano::Gxrb_ratio * 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0); - nano::open_block open_other (fund_other.hash (), rep_other.pub, rep_other.pub, rep_other.prv, rep_other.pub, 0); - node0->work_generate_blocking (fund_big); - node0->work_generate_blocking (open_big); - node0->work_generate_blocking (fund_small); - node0->work_generate_blocking (open_small); - node0->work_generate_blocking (fund_other); - node0->work_generate_blocking (open_other); - ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, fund_big).code); - ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, fund_big).code); - ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, fund_big).code); - ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, open_big).code); - ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, open_big).code); - ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, open_big).code); - ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, fund_small).code); - ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, fund_small).code); - ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, fund_small).code); - ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, open_small).code); - ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, open_small).code); - ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, open_small).code); - ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, fund_other).code); - ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, fund_other).code); - ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, fund_other).code); - ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, open_other).code); - ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, open_other).code); - ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, open_other).code); - } - system.wallet (0)->insert_adhoc (rep_big.prv); - system.wallet (1)->insert_adhoc (rep_small.prv); - system.wallet (2)->insert_adhoc (rep_other.prv); - auto fork0 (std::make_shared (node2->latest (nano::test_genesis_key.pub), rep_small.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0)); - node0->work_generate_blocking (*fork0); - node0->process_active (fork0); - node1->process_active (fork0); - auto fork1 (std::make_shared (node2->latest (nano::test_genesis_key.pub), rep_big.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0)); - node0->work_generate_blocking (*fork1); - system.wallet (2)->insert_adhoc (rep_small.prv); - node2->process_active (fork1); - //std::cerr << "fork0: " << fork_hash.to_string () << std::endl; - //std::cerr << "fork1: " << fork1.hash ().to_string () << std::endl; - while (!node0->ledger.block_exists (fork0->hash ()) || !node1->ledger.block_exists (fork0->hash ())) - { - ASSERT_NO_ERROR (system.poll ()); - } - system.deadline_set (50s); - while (!node2->ledger.block_exists (fork0->hash ())) - { - auto ec = system.poll (); - ASSERT_TRUE (node0->ledger.block_exists (fork0->hash ())); - ASSERT_TRUE (node1->ledger.block_exists (fork0->hash ())); - ASSERT_NO_ERROR (ec); + std::vector types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp }; + for (auto & type : types) + { + nano::system system (24000, 3, type); + auto node0 (system.nodes[0]); + auto node1 (system.nodes[1]); + auto node2 (system.nodes[2]); + nano::keypair rep_big; + nano::keypair rep_small; + nano::keypair rep_other; + //std::cerr << "Big: " << rep_big.pub.to_account () << std::endl; + //std::cerr << "Small: " << rep_small.pub.to_account () << std::endl; + //std::cerr << "Other: " << rep_other.pub.to_account () << std::endl; + { + auto transaction0 (node0->store.tx_begin_write ()); + auto transaction1 (node1->store.tx_begin_write ()); + auto transaction2 (node2->store.tx_begin_write ()); + nano::send_block fund_big (node0->ledger.latest (transaction0, nano::test_genesis_key.pub), rep_big.pub, nano::Gxrb_ratio * 5, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0); + nano::open_block open_big (fund_big.hash (), rep_big.pub, rep_big.pub, rep_big.prv, rep_big.pub, 0); + nano::send_block fund_small (fund_big.hash (), rep_small.pub, nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0); + nano::open_block open_small (fund_small.hash (), rep_small.pub, rep_small.pub, rep_small.prv, rep_small.pub, 0); + nano::send_block fund_other (fund_small.hash (), rep_other.pub, nano::Gxrb_ratio * 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0); + nano::open_block open_other (fund_other.hash (), rep_other.pub, rep_other.pub, rep_other.prv, rep_other.pub, 0); + node0->work_generate_blocking (fund_big); + node0->work_generate_blocking (open_big); + node0->work_generate_blocking (fund_small); + node0->work_generate_blocking (open_small); + node0->work_generate_blocking (fund_other); + node0->work_generate_blocking (open_other); + ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, fund_big).code); + ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, fund_big).code); + ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, fund_big).code); + ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, open_big).code); + ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, open_big).code); + ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, open_big).code); + ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, fund_small).code); + ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, fund_small).code); + ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, fund_small).code); + ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, open_small).code); + ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, open_small).code); + ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, open_small).code); + ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, fund_other).code); + ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, fund_other).code); + ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, fund_other).code); + ASSERT_EQ (nano::process_result::progress, node0->ledger.process (transaction0, open_other).code); + ASSERT_EQ (nano::process_result::progress, node1->ledger.process (transaction1, open_other).code); + ASSERT_EQ (nano::process_result::progress, node2->ledger.process (transaction2, open_other).code); + } + system.wallet (0)->insert_adhoc (rep_big.prv); + system.wallet (1)->insert_adhoc (rep_small.prv); + system.wallet (2)->insert_adhoc (rep_other.prv); + auto fork0 (std::make_shared (node2->latest (nano::test_genesis_key.pub), rep_small.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0)); + node0->work_generate_blocking (*fork0); + node0->process_active (fork0); + node1->process_active (fork0); + auto fork1 (std::make_shared (node2->latest (nano::test_genesis_key.pub), rep_big.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0)); + node0->work_generate_blocking (*fork1); + system.wallet (2)->insert_adhoc (rep_small.prv); + node2->process_active (fork1); + //std::cerr << "fork0: " << fork_hash.to_string () << std::endl; + //std::cerr << "fork1: " << fork1.hash ().to_string () << std::endl; + while (!node0->ledger.block_exists (fork0->hash ()) || !node1->ledger.block_exists (fork0->hash ())) + { + ASSERT_NO_ERROR (system.poll ()); + } + system.deadline_set (50s); + while (!node2->ledger.block_exists (fork0->hash ())) + { + auto ec = system.poll (); + ASSERT_TRUE (node0->ledger.block_exists (fork0->hash ())); + ASSERT_TRUE (node1->ledger.block_exists (fork0->hash ())); + ASSERT_NO_ERROR (ec); + } } } @@ -1970,35 +1981,39 @@ TEST (node, online_reps) TEST (node, block_confirm) { - nano::system system (24000, 2); - nano::genesis genesis; - nano::keypair key; - system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); - auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.nodes[0]->work_generate_blocking (genesis.hash ()))); - system.nodes[0]->block_processor.add (send1, nano::seconds_since_epoch ()); - system.nodes[1]->block_processor.add (send1, nano::seconds_since_epoch ()); - system.deadline_set (std::chrono::seconds (5)); - while (!system.nodes[0]->ledger.block_exists (send1->hash ()) || !system.nodes[1]->ledger.block_exists (send1->hash ())) + std::vector types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp }; + for (auto & type : types) { - ASSERT_NO_ERROR (system.poll ()); - } - ASSERT_TRUE (system.nodes[0]->ledger.block_exists (send1->hash ())); - ASSERT_TRUE (system.nodes[1]->ledger.block_exists (send1->hash ())); - auto send2 (std::make_shared (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.nodes[0]->work_generate_blocking (send1->hash ()))); - { - auto transaction (system.nodes[0]->store.tx_begin_write ()); - ASSERT_EQ (nano::process_result::progress, system.nodes[0]->ledger.process (transaction, *send2).code); - } - { - auto transaction (system.nodes[1]->store.tx_begin_write ()); - ASSERT_EQ (nano::process_result::progress, system.nodes[1]->ledger.process (transaction, *send2).code); - } - system.nodes[0]->block_confirm (send2); - ASSERT_TRUE (system.nodes[0]->active.list_confirmed ().empty ()); - system.deadline_set (10s); - while (system.nodes[0]->active.list_confirmed ().empty ()) - { - ASSERT_NO_ERROR (system.poll ()); + nano::system system (24000, 2, type); + nano::genesis genesis; + nano::keypair key; + system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.nodes[0]->work_generate_blocking (genesis.hash ()))); + system.nodes[0]->block_processor.add (send1, nano::seconds_since_epoch ()); + system.nodes[1]->block_processor.add (send1, nano::seconds_since_epoch ()); + system.deadline_set (std::chrono::seconds (5)); + while (!system.nodes[0]->ledger.block_exists (send1->hash ()) || !system.nodes[1]->ledger.block_exists (send1->hash ())) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_TRUE (system.nodes[0]->ledger.block_exists (send1->hash ())); + ASSERT_TRUE (system.nodes[1]->ledger.block_exists (send1->hash ())); + auto send2 (std::make_shared (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.nodes[0]->work_generate_blocking (send1->hash ()))); + { + auto transaction (system.nodes[0]->store.tx_begin_write ()); + ASSERT_EQ (nano::process_result::progress, system.nodes[0]->ledger.process (transaction, *send2).code); + } + { + auto transaction (system.nodes[1]->store.tx_begin_write ()); + ASSERT_EQ (nano::process_result::progress, system.nodes[1]->ledger.process (transaction, *send2).code); + } + system.nodes[0]->block_confirm (send2); + ASSERT_TRUE (system.nodes[0]->active.list_confirmed ().empty ()); + system.deadline_set (10s); + while (system.nodes[0]->active.list_confirmed ().empty ()) + { + ASSERT_NO_ERROR (system.poll ()); + } } } @@ -2173,43 +2188,47 @@ TEST (node, vote_republish) TEST (node, vote_by_hash_republish) { - nano::system system (24000, 2); - nano::keypair key2; - system.wallet (1)->insert_adhoc (key2.prv); - nano::genesis genesis; - auto send1 (std::make_shared (genesis.hash (), key2.pub, std::numeric_limits::max () - system.nodes[0]->config.receive_minimum.number (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (genesis.hash ()))); - auto send2 (std::make_shared (genesis.hash (), key2.pub, std::numeric_limits::max () - system.nodes[0]->config.receive_minimum.number () * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (genesis.hash ()))); - system.nodes[0]->process_active (send1); - system.deadline_set (5s); - while (!system.nodes[1]->block (send1->hash ())) - { - ASSERT_NO_ERROR (system.poll ()); - } - system.nodes[0]->active.publish (send2); - std::vector vote_blocks; - vote_blocks.push_back (send2->hash ()); - auto vote (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, vote_blocks)); - ASSERT_TRUE (system.nodes[0]->active.active (*send1)); - ASSERT_TRUE (system.nodes[1]->active.active (*send1)); - system.nodes[0]->vote_processor.vote (vote, std::make_shared (system.nodes[0]->network.udp_channels, system.nodes[0]->network.endpoint ())); - while (!system.nodes[0]->block (send2->hash ())) - { - ASSERT_NO_ERROR (system.poll ()); - } - while (!system.nodes[1]->block (send2->hash ())) - { - ASSERT_NO_ERROR (system.poll ()); - } - ASSERT_FALSE (system.nodes[0]->block (send1->hash ())); - ASSERT_FALSE (system.nodes[1]->block (send1->hash ())); - system.deadline_set (5s); - while (system.nodes[1]->balance (key2.pub) != system.nodes[0]->config.receive_minimum.number () * 2) + std::vector types{ nano::transport::transport_type::tcp, nano::transport::transport_type::udp }; + for (auto & type : types) { - ASSERT_NO_ERROR (system.poll ()); - } - while (system.nodes[0]->balance (key2.pub) != system.nodes[0]->config.receive_minimum.number () * 2) - { - ASSERT_NO_ERROR (system.poll ()); + nano::system system (24000, 2, type); + nano::keypair key2; + system.wallet (1)->insert_adhoc (key2.prv); + nano::genesis genesis; + auto send1 (std::make_shared (genesis.hash (), key2.pub, std::numeric_limits::max () - system.nodes[0]->config.receive_minimum.number (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (genesis.hash ()))); + auto send2 (std::make_shared (genesis.hash (), key2.pub, std::numeric_limits::max () - system.nodes[0]->config.receive_minimum.number () * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (genesis.hash ()))); + system.nodes[0]->process_active (send1); + system.deadline_set (5s); + while (!system.nodes[1]->block (send1->hash ())) + { + ASSERT_NO_ERROR (system.poll ()); + } + system.nodes[0]->active.publish (send2); + std::vector vote_blocks; + vote_blocks.push_back (send2->hash ()); + auto vote (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, vote_blocks)); + ASSERT_TRUE (system.nodes[0]->active.active (*send1)); + ASSERT_TRUE (system.nodes[1]->active.active (*send1)); + system.nodes[0]->vote_processor.vote (vote, std::make_shared (system.nodes[0]->network.udp_channels, system.nodes[0]->network.endpoint ())); + while (!system.nodes[0]->block (send2->hash ())) + { + ASSERT_NO_ERROR (system.poll ()); + } + while (!system.nodes[1]->block (send2->hash ())) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_FALSE (system.nodes[0]->block (send1->hash ())); + ASSERT_FALSE (system.nodes[1]->block (send1->hash ())); + system.deadline_set (5s); + while (system.nodes[1]->balance (key2.pub) != system.nodes[0]->config.receive_minimum.number () * 2) + { + ASSERT_NO_ERROR (system.poll ()); + } + while (system.nodes[0]->balance (key2.pub) != system.nodes[0]->config.receive_minimum.number () * 2) + { + ASSERT_NO_ERROR (system.poll ()); + } } } @@ -2473,18 +2492,20 @@ TEST (node, peers) node->start (); system.deadline_set (10s); - while (system.nodes.back ()->network.empty ()) + while (system.nodes.back ()->network.empty () || system.nodes.front ()->network.empty ()) { ASSERT_NO_ERROR (system.poll ()); } // Confirm that the peers match with the endpoints we are expecting ASSERT_EQ (1, system.nodes.front ()->network.size ()); - auto list1 (system.nodes[0]->network.udp_channels.list (2)); + auto list1 (system.nodes[0]->network.list (2)); ASSERT_EQ (system.nodes[1]->network.endpoint (), list1[0]->get_endpoint ()); + ASSERT_EQ (nano::transport::transport_type::tcp, list1[0]->get_type ()); ASSERT_EQ (1, node->network.size ()); - auto list2 (system.nodes[1]->network.udp_channels.list (2)); + auto list2 (system.nodes[1]->network.list (2)); ASSERT_EQ (system.nodes[0]->network.endpoint (), list2[0]->get_endpoint ()); + ASSERT_EQ (nano::transport::transport_type::tcp, list2[0]->get_type ()); // Stop the peer node and check that it is removed from the store system.nodes.front ()->stop (); @@ -2528,7 +2549,7 @@ TEST (node, peer_cache_restart) ASSERT_NO_ERROR (system.poll ()); } // Confirm that the peers match with the endpoints we are expecting - auto list (node->network.udp_channels.list (2)); + auto list (node->network.list (2)); ASSERT_EQ (system.nodes[0]->network.endpoint (), list[0]->get_endpoint ()); ASSERT_EQ (1, node->network.size ()); node->stop (); @@ -2552,7 +2573,7 @@ TEST (node, peer_cache_restart) ASSERT_NO_ERROR (system.poll ()); } // Confirm that the peers match with the endpoints we are expecting - auto list (node->network.udp_channels.list (2)); + auto list (node->network.list (2)); ASSERT_EQ (system.nodes[0]->network.endpoint (), list[0]->get_endpoint ()); ASSERT_EQ (1, node->network.size ()); node->stop (); diff --git a/nano/core_test/peer_container.cpp b/nano/core_test/peer_container.cpp index 85544356c0..e802073d84 100644 --- a/nano/core_test/peer_container.cpp +++ b/nano/core_test/peer_container.cpp @@ -71,20 +71,20 @@ TEST (peer_container, split) system.nodes[0]->network.cleanup (now); ASSERT_EQ (1, system.nodes[0]->network.size ()); ASSERT_EQ (1, system.nodes[0]->network.udp_channels.size ()); - auto list (system.nodes[0]->network.udp_channels.list (1)); + auto list (system.nodes[0]->network.list (1)); ASSERT_EQ (endpoint2, list[0]->get_endpoint ()); } -TEST (udp_channels, fill_random_clear) +TEST (channels, fill_random_clear) { nano::system system (24000, 1); std::array target; std::fill (target.begin (), target.end (), nano::endpoint (boost::asio::ip::address_v6::loopback (), 10000)); - system.nodes[0]->network.udp_channels.random_fill (target); + system.nodes[0]->network.random_fill (target); ASSERT_TRUE (std::all_of (target.begin (), target.end (), [](nano::endpoint const & endpoint_a) { return endpoint_a == nano::endpoint (boost::asio::ip::address_v6::any (), 0); })); } -TEST (udp_channels, fill_random_full) +TEST (channels, fill_random_full) { nano::system system (24000, 1); for (auto i (0); i < 100; ++i) @@ -93,11 +93,11 @@ TEST (udp_channels, fill_random_full) } std::array target; std::fill (target.begin (), target.end (), nano::endpoint (boost::asio::ip::address_v6::loopback (), 10000)); - system.nodes[0]->network.udp_channels.random_fill (target); + system.nodes[0]->network.random_fill (target); ASSERT_TRUE (std::none_of (target.begin (), target.end (), [](nano::endpoint const & endpoint_a) { return endpoint_a == nano::endpoint (boost::asio::ip::address_v6::loopback (), 10000); })); } -TEST (udp_channels, fill_random_part) +TEST (channels, fill_random_part) { nano::system system (24000, 1); std::array target; @@ -107,7 +107,7 @@ TEST (udp_channels, fill_random_part) system.nodes[0]->network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), i + 1), 0); } std::fill (target.begin (), target.end (), nano::endpoint (boost::asio::ip::address_v6::loopback (), 10000)); - system.nodes[0]->network.udp_channels.random_fill (target); + system.nodes[0]->network.random_fill (target); ASSERT_TRUE (std::none_of (target.begin (), target.begin () + half, [](nano::endpoint const & endpoint_a) { return endpoint_a == nano::endpoint (boost::asio::ip::address_v6::loopback (), 10000); })); ASSERT_TRUE (std::none_of (target.begin (), target.begin () + half, [](nano::endpoint const & endpoint_a) { return endpoint_a == nano::endpoint (boost::asio::ip::address_v6::loopback (), 0); })); ASSERT_TRUE (std::all_of (target.begin () + half, target.end (), [](nano::endpoint const & endpoint_a) { return endpoint_a == nano::endpoint (boost::asio::ip::address_v6::any (), 0); })); @@ -116,13 +116,13 @@ TEST (udp_channels, fill_random_part) TEST (peer_container, list_fanout) { nano::system system (24000, 1); - auto list1 (system.nodes[0]->network.udp_channels.list_fanout ()); + auto list1 (system.nodes[0]->network.list_fanout ()); ASSERT_TRUE (list1.empty ()); for (auto i (0); i < 1000; ++i) { ASSERT_NE (nullptr, system.nodes[0]->network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), 10000 + i), nano::protocol_version)); } - auto list2 (system.nodes[0]->network.udp_channels.list_fanout ()); + auto list2 (system.nodes[0]->network.list_fanout ()); ASSERT_EQ (32, list2.size ()); } diff --git a/nano/core_test/wallets.cpp b/nano/core_test/wallets.cpp index 2267d6230a..38b0395ec2 100644 --- a/nano/core_test/wallets.cpp +++ b/nano/core_test/wallets.cpp @@ -148,6 +148,7 @@ TEST (wallets, reload) ASSERT_FALSE (error); ASSERT_EQ (1, system.nodes[0]->wallets.items.size ()); { + std::lock_guard lock_wallet (system.nodes[0]->wallets.mutex); nano::inactive_node node (system.nodes[0]->application_path, 24001); auto wallet (node.node->wallets.create (one)); ASSERT_NE (wallet, nullptr); diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index 6765dfb32c..067ad1de0c 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -71,7 +71,7 @@ int main (int argc, char * const * argv) ("disable_lazy_bootstrap", "Disables lazy bootstrap") ("disable_legacy_bootstrap", "Disables legacy bootstrap") ("disable_wallet_bootstrap", "Disables wallet lazy bootstrap") - ("disable_bootstrap_listener", "Disables bootstrap listener (incoming connections)") + ("disable_bootstrap_listener", "Disables bootstrap processing for TCP listener (not including realtime network TCP connections)") ("disable_unchecked_cleanup", "Disables periodic cleanup of old records from unchecked table") ("disable_unchecked_drop", "Disables drop of unchecked table at startup") ("fast_bootstrap", "Increase bootstrap speed for high end nodes with higher limits") diff --git a/nano/node/bootstrap.cpp b/nano/node/bootstrap.cpp index 12f9e6ffc5..5db1f54aa6 100644 --- a/nano/node/bootstrap.cpp +++ b/nano/node/bootstrap.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -1086,9 +1087,8 @@ void nano::bootstrap_attempt::populate_connections () // Not many peers respond, need to try to make more connections than we need. for (auto i = 0u; i < delta; i++) { - auto peer (node->network.udp_channels.tcp_peer ()); - auto endpoint (nano::tcp_endpoint (peer.address (), peer.port ())); - if (peer != nano::endpoint (boost::asio::ip::address_v6::any (), 0) && endpoints.find (endpoint) == endpoints.end ()) + auto endpoint (node->network.bootstrap_peer ()); + if (endpoint != nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0) && endpoints.find (endpoint) == endpoints.end ()) { connect_client (endpoint); std::lock_guard lock (mutex); @@ -1817,12 +1817,12 @@ port (port_a) void nano::bootstrap_listener::start () { - listening_socket = std::make_shared (node.shared (), boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.bootstrap_connections_max); + listening_socket = std::make_shared (node.shared (), boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.tcp_incoming_connections_max); boost::system::error_code ec; listening_socket->start (ec); if (ec) { - node.logger.try_log (boost::str (boost::format ("Error while binding for bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ())); + node.logger.try_log (boost::str (boost::format ("Error while binding for incoming TCP/bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ())); throw std::runtime_error (ec.message ()); } listening_socket->on_connection ([this](std::shared_ptr new_connection, boost::system::error_code const & ec_a) { @@ -1830,7 +1830,7 @@ void nano::bootstrap_listener::start () if (ec_a) { keep_accepting = false; - this->node.logger.try_log (boost::str (boost::format ("Error while accepting bootstrap connections: %1%") % ec_a.message ())); + this->node.logger.try_log (boost::str (boost::format ("Error while accepting incoming TCP/bootstrap connections: %1%") % ec_a.message ())); } else { @@ -1891,12 +1891,35 @@ nano::bootstrap_server::~bootstrap_server () { if (node->config.logging.bulk_pull_logging ()) { - node->logger.try_log ("Exiting bootstrap server"); + node->logger.try_log ("Exiting incoming TCP/bootstrap server"); } + if (bootstrap_connection) + { + --node->bootstrap.bootstrap_count; + } + if (node_id_handshake_finished) + { + --node->bootstrap.realtime_count; + node->network.remove_response_channel (remote_endpoint); + } + stop (); std::lock_guard lock (node->bootstrap.mutex); node->bootstrap.connections.erase (this); } +void nano::bootstrap_server::stop () +{ + if (!stopped) + { + stopped = true; + std::lock_guard lock (mutex); + if (socket != nullptr) + { + socket->close (); + } + } +} + nano::bootstrap_server::bootstrap_server (std::shared_ptr socket_a, std::shared_ptr node_a) : receive_buffer (std::make_shared> ()), socket (socket_a), @@ -1909,6 +1932,12 @@ void nano::bootstrap_server::receive () { auto this_l (shared_from_this ()); socket->async_read (receive_buffer, 8, [this_l](boost::system::error_code const & ec, size_t size_a) { + // Set remote_endpoint + if (this_l->remote_endpoint.port () == 0) + { + this_l->remote_endpoint = this_l->socket->remote_endpoint (); + } + // Receive header this_l->receive_header_action (ec, size_a); }); } @@ -1928,7 +1957,6 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co case nano::message_type::bulk_pull: { node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull, nano::stat::dir::in); - auto this_l (shared_from_this ()); socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) { this_l->receive_bulk_pull_action (ec, size_a, header); @@ -1956,7 +1984,10 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co case nano::message_type::bulk_push: { node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_push, nano::stat::dir::in); - add_request (std::unique_ptr (new nano::bulk_push (header))); + if (is_bootstrap_connection ()) + { + add_request (std::unique_ptr (new nano::bulk_push (header))); + } break; } case nano::message_type::keepalive: @@ -2032,7 +2063,10 @@ void nano::bootstrap_server::receive_bulk_pull_action (boost::system::error_code { node->logger.try_log (boost::str (boost::format ("Received bulk pull for %1% down to %2%, maximum of %3%") % request->start.to_string () % request->end.to_string () % (request->count ? request->count : std::numeric_limits::infinity ()))); } - add_request (std::unique_ptr (request.release ())); + if (is_bootstrap_connection ()) + { + add_request (std::unique_ptr (request.release ())); + } receive (); } } @@ -2052,7 +2086,10 @@ void nano::bootstrap_server::receive_bulk_pull_account_action (boost::system::er { node->logger.try_log (boost::str (boost::format ("Received bulk pull account for %1% with a minimum amount of %2%") % request->account.to_account () % nano::amount (request->minimum_amount).format_balance (nano::Mxrb_ratio, 10, true))); } - add_request (std::unique_ptr (request.release ())); + if (is_bootstrap_connection ()) + { + add_request (std::unique_ptr (request.release ())); + } receive (); } } @@ -2071,7 +2108,10 @@ void nano::bootstrap_server::receive_frontier_req_action (boost::system::error_c { node->logger.try_log (boost::str (boost::format ("Received frontier request for %1% with age %2%") % request->start.to_string () % request->age)); } - add_request (std::unique_ptr (request.release ())); + if (is_bootstrap_connection ()) + { + add_request (std::unique_ptr (request.release ())); + } receive (); } } @@ -2093,7 +2133,10 @@ void nano::bootstrap_server::receive_keepalive_action (boost::system::error_code std::unique_ptr request (new nano::keepalive (error, stream, header_a)); if (!error) { - add_request (std::unique_ptr (request.release ())); + if (node_id_handshake_finished) + { + add_request (std::unique_ptr (request.release ())); + } receive (); } } @@ -2115,7 +2158,10 @@ void nano::bootstrap_server::receive_publish_action (boost::system::error_code c std::unique_ptr request (new nano::publish (error, stream, header_a)); if (!error) { - add_request (std::unique_ptr (request.release ())); + if (node_id_handshake_finished) + { + add_request (std::unique_ptr (request.release ())); + } receive (); } } @@ -2137,7 +2183,10 @@ void nano::bootstrap_server::receive_confirm_req_action (boost::system::error_co std::unique_ptr request (new nano::confirm_req (error, stream, header_a)); if (!error) { - add_request (std::unique_ptr (request.release ())); + if (node_id_handshake_finished) + { + add_request (std::unique_ptr (request.release ())); + } receive (); } } @@ -2156,7 +2205,10 @@ void nano::bootstrap_server::receive_confirm_ack_action (boost::system::error_co std::unique_ptr request (new nano::confirm_ack (error, stream, header_a)); if (!error) { - add_request (std::unique_ptr (request.release ())); + if (node_id_handshake_finished) + { + add_request (std::unique_ptr (request.release ())); + } receive (); } } @@ -2175,7 +2227,10 @@ void nano::bootstrap_server::receive_node_id_handshake_action (boost::system::er std::unique_ptr request (new nano::node_id_handshake (error, stream, header_a)); if (!error) { - add_request (std::unique_ptr (request.release ())); + if (!node_id_handshake_finished) + { + add_request (std::unique_ptr (request.release ())); + } receive (); } } @@ -2187,6 +2242,7 @@ void nano::bootstrap_server::receive_node_id_handshake_action (boost::system::er void nano::bootstrap_server::add_request (std::unique_ptr message_a) { + assert (message_a != nullptr); std::lock_guard lock (mutex); auto start (requests.empty ()); requests.push (std::move (message_a)); @@ -2216,6 +2272,17 @@ void nano::bootstrap_server::finish_request () } } +void nano::bootstrap_server::finish_request_async () +{ + std::weak_ptr this_w (shared_from_this ()); + node->background ([this_w]() { + if (auto this_l = this_w.lock ()) + { + this_l->finish_request (); + } + }); +} + void nano::bootstrap_server::timeout () { if (socket != nullptr) @@ -2224,7 +2291,7 @@ void nano::bootstrap_server::timeout () { if (node->config.logging.bulk_pull_logging ()) { - node->logger.try_log ("Closing bootstrap server by timeout"); + node->logger.try_log ("Closing incoming tcp / bootstrap server by timeout"); } { std::lock_guard lock (node->bootstrap.mutex); @@ -2252,45 +2319,40 @@ class request_response_visitor : public nano::message_visitor virtual ~request_response_visitor () = default; void keepalive (nano::keepalive const & message_a) override { - if (connection->node->config.logging.network_keepalive_logging ()) - { - connection->node->logger.try_log (boost::str (boost::format ("Received keepalive message from %1%") % connection->socket->remote_endpoint ())); - } - connection->node->stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in); - connection->node->network.merge_peers (message_a.peers); - nano::keepalive message; - connection->node->network.udp_channels.random_fill (message.peers); - auto bytes = message.to_bytes (); - if (connection->node->config.logging.network_keepalive_logging ()) + bool first_keepalive (connection->keepalive_first); + if (first_keepalive) { - connection->node->logger.try_log (boost::str (boost::format ("Keepalive req sent to %1%") % connection->socket->remote_endpoint ())); + connection->keepalive_first = false; } - connection->socket->async_write (bytes, [connection = connection](boost::system::error_code const & ec, size_t size_a) { - if (ec) - { - if (connection->node->config.logging.network_keepalive_logging ()) - { - connection->node->logger.try_log (boost::str (boost::format ("Error sending keepalive to %1%: %2%") % connection->socket->remote_endpoint () % ec.message ())); - } - } - else - { - connection->node->stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::out); - connection->finish_request (); - } + connection->finish_request_async (); + auto connection_l (connection->shared_from_this ()); + connection->node->background ([connection_l, message_a, first_keepalive]() { + connection_l->node->network.tcp_channels.process_keepalive (message_a, connection_l->remote_endpoint, first_keepalive); }); } - void publish (nano::publish const &) override + void publish (nano::publish const & message_a) override { - assert (false); + connection->finish_request_async (); + auto connection_l (connection->shared_from_this ()); + connection->node->background ([connection_l, message_a]() { + connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id); + }); } - void confirm_req (nano::confirm_req const &) override + void confirm_req (nano::confirm_req const & message_a) override { - assert (false); + connection->finish_request_async (); + auto connection_l (connection->shared_from_this ()); + connection->node->background ([connection_l, message_a]() { + connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id); + }); } - void confirm_ack (nano::confirm_ack const &) override + void confirm_ack (nano::confirm_ack const & message_a) override { - assert (false); + connection->finish_request_async (); + auto connection_l (connection->shared_from_this ()); + connection->node->background ([connection_l, message_a]() { + connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id); + }); } void bulk_pull (nano::bulk_pull const &) override { @@ -2312,9 +2374,59 @@ class request_response_visitor : public nano::message_visitor auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); response->send_next (); } - void node_id_handshake (nano::node_id_handshake const &) override + void node_id_handshake (nano::node_id_handshake const & message_a) override { - assert (false); + if (connection->node->config.logging.network_node_id_handshake_logging ()) + { + connection->node->logger.try_log (boost::str (boost::format ("Received node_id_handshake message from %1%") % connection->remote_endpoint)); + } + if (message_a.query) + { + boost::optional> response (std::make_pair (connection->node->node_id.pub, nano::sign_message (connection->node->node_id.prv, connection->node->node_id.pub, *message_a.query))); + assert (!nano::validate_message (response->first, *message_a.query, response->second)); + auto cookie (connection->node->network.tcp_channels.assign_syn_cookie (connection->remote_endpoint)); + nano::node_id_handshake response_message (cookie, response); + auto bytes = response_message.to_bytes (); + connection->socket->async_write (bytes, [ bytes, connection = connection ](boost::system::error_code const & ec, size_t size_a) { + if (ec) + { + if (connection->node->config.logging.network_node_id_handshake_logging ()) + { + connection->node->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % connection->remote_endpoint % ec.message ())); + } + // Stop invalid handshake + connection->stop (); + } + else + { + connection->node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out); + connection->finish_request (); + } + }); + } + else if (message_a.response) + { + connection->remote_node_id = message_a.response->first; + if (!connection->node->network.tcp_channels.validate_syn_cookie (connection->remote_endpoint, connection->remote_node_id, message_a.response->second) && connection->remote_node_id != connection->node->node_id.pub) + { + connection->node_id_handshake_finished = true; + ++connection->node->bootstrap.realtime_count; + connection->finish_request_async (); + } + else + { + // Stop invalid handshake + connection->stop (); + } + } + else + { + connection->finish_request_async (); + } + auto connection_l (connection->shared_from_this ()); + connection->node->background ([connection_l, message_a]() { + connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id); + }); } std::shared_ptr connection; }; @@ -2327,6 +2439,16 @@ void nano::bootstrap_server::run_next () requests.front ()->visit (visitor); } +bool nano::bootstrap_server::is_bootstrap_connection () +{ + if (!bootstrap_connection && !node->flags.disable_bootstrap_listener && node->bootstrap.bootstrap_count < node->config.bootstrap_connections_max) + { + ++node->bootstrap.bootstrap_count; + bootstrap_connection = true; + } + return bootstrap_connection; +} + /** * Handle a request for the pull of all blocks associated with an account * The account is supplied as the "start" member, and the final block to diff --git a/nano/node/bootstrap.hpp b/nano/node/bootstrap.hpp index 5352209410..0067971c67 100644 --- a/nano/node/bootstrap.hpp +++ b/nano/node/bootstrap.hpp @@ -279,6 +279,8 @@ class bootstrap_listener final nano::node & node; std::shared_ptr listening_socket; bool on; + std::atomic bootstrap_count{ 0 }; + std::atomic realtime_count{ 0 }; private: uint16_t port; @@ -292,6 +294,7 @@ class bootstrap_server final : public std::enable_shared_from_this, std::shared_ptr); ~bootstrap_server (); + void stop (); void receive (); void receive_header_action (boost::system::error_code const &, size_t); void receive_bulk_pull_action (boost::system::error_code const &, size_t, nano::message_header const &); @@ -304,13 +307,21 @@ class bootstrap_server final : public std::enable_shared_from_this); void finish_request (); + void finish_request_async (); void run_next (); void timeout (); + bool is_bootstrap_connection (); std::shared_ptr> receive_buffer; std::shared_ptr socket; std::shared_ptr node; std::mutex mutex; std::queue> requests; + std::atomic stopped{ false }; + std::atomic bootstrap_connection{ false }; + std::atomic node_id_handshake_finished{ false }; + std::atomic keepalive_first{ true }; + nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 }; + nano::account remote_node_id{ 0 }; }; class bulk_pull; class bulk_pull_server final : public std::enable_shared_from_this diff --git a/nano/node/common.cpp b/nano/node/common.cpp index 776831e8fe..4290b9eaa9 100644 --- a/nano/node/common.cpp +++ b/nano/node/common.cpp @@ -551,6 +551,7 @@ block (block_a) { header.block_type_set (block->type ()); } + nano::confirm_req::confirm_req (std::vector> const & roots_hashes_a) : message (nano::message_type::confirm_req), roots_hashes (roots_hashes_a) diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 41c4759c51..1e1c1dc7bd 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -147,6 +147,24 @@ struct hash<::nano::endpoint> return hash (endpoint_a); } }; +template <> +struct hash<::nano::tcp_endpoint> +{ + size_t operator() (::nano::tcp_endpoint const & endpoint_a) const + { + std::hash<::nano::tcp_endpoint> hash; + return hash (endpoint_a); + } +}; +template <> +struct hash +{ + size_t operator() (boost::asio::ip::address const & ip_a) const + { + std::hash hash; + return hash (ip_a); + } +}; } namespace nano diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 8be6142ce1..873998b29d 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -2471,8 +2471,10 @@ void nano::json_handler::peers () { boost::property_tree::ptree peers_l; const bool peer_details = request.get ("peer_details", false); - auto peers_list (node.network.udp_channels.list (std::numeric_limits::max ())); - std::sort (peers_list.begin (), peers_list.end ()); + auto peers_list (node.network.list (std::numeric_limits::max ())); + std::sort (peers_list.begin (), peers_list.end (), [](const auto & lhs, const auto & rhs) { + return lhs->get_endpoint () < rhs->get_endpoint (); + }); for (auto i (peers_list.begin ()), n (peers_list.end ()); i != n; ++i) { std::stringstream text; @@ -4356,11 +4358,12 @@ void nano::json_handler::work_generate () auto callback = [rpc_l, hash, this](boost::optional const & work_a) { if (work_a) { + uint64_t work (work_a.value ()); boost::property_tree::ptree response_l; - response_l.put ("work", nano::to_string_hex (work_a.value ())); + response_l.put ("work", nano::to_string_hex (work)); std::stringstream ostream; uint64_t result_difficulty; - nano::work_validate (hash, work_a.value (), &result_difficulty); + nano::work_validate (hash, work, &result_difficulty); response_l.put ("difficulty", nano::to_string_hex (result_difficulty)); auto multiplier = nano::difficulty::to_multiplier (result_difficulty, this->node.network_params.network.publish_threshold); response_l.put ("multiplier", nano::to_string (multiplier)); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 3e922d3728..92032bcce4 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -34,6 +34,7 @@ buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receiv resolver (node_a.io_ctx), node (node_a), udp_channels (node_a, port_a), +tcp_channels (node_a), disconnect_observer ([]() {}) { boost::thread::attributes attrs; @@ -86,11 +87,13 @@ void nano::network::start () { ongoing_cleanup (); udp_channels.start (); + tcp_channels.start (); } void nano::network::stop () { udp_channels.stop (); + tcp_channels.stop (); resolver.cancel (); buffer_container.stop (); } @@ -98,23 +101,29 @@ void nano::network::stop () void nano::network::send_keepalive (std::shared_ptr channel_a) { nano::keepalive message; - udp_channels.random_fill (message.peers); + random_fill (message.peers); channel_a->send (message); } void nano::network::send_keepalive_self (std::shared_ptr channel_a) { nano::keepalive message; - udp_channels.random_fill (message.peers); if (node.config.external_address != boost::asio::ip::address_v6{} && node.config.external_port != 0) { message.peers[0] = nano::endpoint (node.config.external_address, node.config.external_port); } else { - message.peers[0] = nano::endpoint (boost::asio::ip::address_v6{}, endpoint ().port ()); - message.peers[1] = node.port_mapping.external_address (); - message.peers[2] = nano::endpoint (boost::asio::ip::address_v6{}, node.port_mapping.external_address ().port ()); // If UPnP reported wrong external IP address + auto external_address (node.port_mapping.external_address ()); + if (external_address.address () != boost::asio::ip::address_v4::any ()) + { + message.peers[0] = nano::endpoint (boost::asio::ip::address_v6{}, endpoint ().port ()); + message.peers[1] = external_address; + } + else + { + message.peers[0] = nano::endpoint (boost::asio::ip::address_v6{}, endpoint ().port ()); + } } channel_a->send (message); } @@ -128,12 +137,21 @@ void nano::node::keepalive (std::string const & address_a, uint16_t port_a) for (auto i (i_a), n (boost::asio::ip::udp::resolver::iterator{}); i != n; ++i) { auto endpoint (nano::transport::map_endpoint_to_v6 (i->endpoint ())); + std::weak_ptr node_w (node_l); auto channel (node_l->network.find_channel (endpoint)); if (!channel) { - channel = std::make_shared (node_l->network.udp_channels, endpoint); + node_l->network.tcp_channels.start_tcp (endpoint, [node_w](std::shared_ptr channel_a) { + if (auto node_l = node_w.lock ()) + { + node_l->network.send_keepalive (channel_a); + } + }); + } + else + { + node_l->network.send_keepalive (channel); } - node_l->network.send_keepalive (channel); } } else @@ -253,7 +271,7 @@ bool nano::network::send_votes_cache (std::shared_ptr void nano::network::flood_message (nano::message const & message_a) { - auto list (node.network.udp_channels.list_fanout ()); + auto list (list_fanout ()); for (auto i (list.begin ()), n (list.end ()); i != n; ++i) { (*i)->send (message_a); @@ -283,7 +301,7 @@ void nano::network::broadcast_confirm_req (std::shared_ptr block_a) if (list->empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ()) { // broadcast request to all peers (with max limit 2 * sqrt (peers count)) - auto peers (node.network.udp_channels.list (std::min (static_cast (100), 2 * node.network.size_sqrt ()))); + auto peers (node.network.list (std::min (static_cast (100), 2 * node.network.size_sqrt ()))); list->clear (); for (auto & peer : peers) { @@ -566,8 +584,13 @@ void nano::network::merge_peer (nano::endpoint const & peer_a) { if (!reachout (peer_a, node.config.allow_local_peers)) { - auto channel (std::make_shared (node.network.udp_channels, peer_a)); - send_keepalive (channel); + std::weak_ptr node_w (node.shared ()); + node.network.tcp_channels.start_tcp (peer_a, [node_w](std::shared_ptr channel_a) { + if (auto node_l = node_w.lock ()) + { + node_l->network.send_keepalive (channel_a); + } + }); } } @@ -596,17 +619,152 @@ bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_loca if (!error) { error |= udp_channels.reachout (endpoint_a); + error |= tcp_channels.reachout (endpoint_a); } return error; } +std::deque> nano::network::list (size_t count_a) +{ + std::deque> result; + tcp_channels.list (result); + udp_channels.list (result); + random_pool::shuffle (result.begin (), result.end ()); + if (result.size () > count_a) + { + result.resize (count_a, nullptr); + } + return result; +} + +// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability +std::deque> nano::network::list_fanout () +{ + auto result (list (size_sqrt ())); + return result; +} + +std::unordered_set> nano::network::random_set (size_t count_a) const +{ + std::unordered_set> result (tcp_channels.random_set (count_a)); + std::unordered_set> udp_random (udp_channels.random_set (count_a)); + for (auto i (udp_random.begin ()), n (udp_random.end ()); i != n && result.size () < count_a * 1.5; ++i) + { + result.insert (*i); + } + while (result.size () > count_a) + { + result.erase (result.begin ()); + } + return result; +} + +void nano::network::random_fill (std::array & target_a) const +{ + auto peers (random_set (target_a.size ())); + assert (peers.size () <= target_a.size ()); + auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0)); + assert (endpoint.address ().is_v6 ()); + std::fill (target_a.begin (), target_a.end (), endpoint); + auto j (target_a.begin ()); + for (auto i (peers.begin ()), n (peers.end ()); i != n; ++i, ++j) + { + assert ((*i)->get_endpoint ().address ().is_v6 ()); + assert (j < target_a.end ()); + *j = (*i)->get_endpoint (); + } +} + +nano::tcp_endpoint nano::network::bootstrap_peer () +{ + auto result (udp_channels.bootstrap_peer ()); + if (result == nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0)) + { + result = tcp_channels.bootstrap_peer (); + } + return result; +} + std::shared_ptr nano::network::find_channel (nano::endpoint const & endpoint_a) { - std::shared_ptr result; - result = udp_channels.channel (endpoint_a); + std::shared_ptr result (tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (endpoint_a))); + if (!result) + { + result = udp_channels.channel (endpoint_a); + } + return result; +} + +std::shared_ptr nano::network::find_node_id (nano::account const & node_id_a) +{ + std::shared_ptr result (tcp_channels.find_node_id (node_id_a)); + if (!result) + { + result = udp_channels.find_node_id (node_id_a); + } return result; } +void nano::network::add_response_channels (nano::tcp_endpoint const & endpoint_a, std::vector insert_channels) +{ + std::lock_guard lock (response_channels_mutex); + response_channels.emplace (endpoint_a, insert_channels); +} + +std::shared_ptr nano::network::search_response_channel (nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a) +{ + // Search by node ID + std::shared_ptr result (find_node_id (node_id_a)); + if (!result) + { + // Search in response channels + std::unique_lock lock (response_channels_mutex); + auto existing (response_channels.find (endpoint_a)); + if (existing != response_channels.end ()) + { + auto channels_list (existing->second); + lock.unlock (); + // TCP + for (auto & i : channels_list) + { + auto search_channel (tcp_channels.find_channel (i)); + if (search_channel != nullptr) + { + result = search_channel; + break; + } + } + // UDP + if (!result) + { + for (auto & i : channels_list) + { + auto udp_endpoint (nano::transport::map_tcp_to_endpoint (i)); + auto search_channel (udp_channels.channel (udp_endpoint)); + if (search_channel != nullptr) + { + result = search_channel; + break; + } + } + } + } + } + return result; +} + +void nano::network::remove_response_channel (nano::tcp_endpoint const & endpoint_a) +{ + std::lock_guard lock (response_channels_mutex); + response_channels.erase (endpoint_a); +} + +size_t nano::network::response_channels_size () +{ + std::lock_guard lock (response_channels_mutex); + return response_channels.size (); +} + bool nano::operation::operator> (nano::operation const & other_a) const { return wakeup > other_a.wakeup; @@ -1209,7 +1367,14 @@ startup_time (std::chrono::steady_clock::now ()) }); } observers.endpoint.add ([this](std::shared_ptr channel_a) { - this->network.send_keepalive (channel_a); + if (channel_a->get_type () == nano::transport::transport_type::udp) + { + this->network.send_keepalive (channel_a); + } + else + { + this->network.send_keepalive_self (channel_a); + } }); observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr vote_a, std::shared_ptr channel_a) { this->gap_cache.vote (vote_a); @@ -1471,6 +1636,7 @@ std::unique_ptr collect_seq_con_info (node & node, const composite->add_component (collect_seq_con_info (node.active, "active")); composite->add_component (collect_seq_con_info (node.bootstrap_initiator, "bootstrap_initiator")); composite->add_component (collect_seq_con_info (node.bootstrap, "bootstrap")); + composite->add_component (node.network.tcp_channels.collect_seq_con_info ("tcp_channels")); composite->add_component (node.network.udp_channels.collect_seq_con_info ("udp_channels")); composite->add_component (collect_seq_con_info (node.observers, "observers")); composite->add_component (collect_seq_con_info (node.wallets, "wallets")); @@ -1625,7 +1791,7 @@ void nano::node::start () ongoing_rep_calculation (); ongoing_peer_store (); ongoing_online_weight_calculation_queue (); - if (!flags.disable_bootstrap_listener) + if (config.tcp_incoming_connections_max > 0) { bootstrap.start (); } @@ -1776,7 +1942,8 @@ void nano::node::ongoing_store_flush () void nano::node::ongoing_peer_store () { - network.udp_channels.store_all (*this); + bool stored (network.tcp_channels.store_all (true)); + network.udp_channels.store_all (!stored); std::weak_ptr node_w (shared_from_this ()); alarm.add (std::chrono::steady_clock::now () + network_params.node.peer_interval, [node_w]() { if (auto node_l = node_w.lock ()) @@ -2228,9 +2395,14 @@ void nano::node::add_initial_peers () nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ()); if (!network.reachout (endpoint, config.allow_local_peers)) { - auto channel (std::make_shared (network.udp_channels, endpoint)); - network.send_keepalive (channel); - rep_crawler.query (channel); + std::weak_ptr node_w (shared_from_this ()); + network.tcp_channels.start_tcp (endpoint, [node_w](std::shared_ptr channel_a) { + if (auto node_l = node_w.lock ()) + { + node_l->network.send_keepalive (channel_a); + node_l->rep_crawler.query (channel_a); + } + }); } } } @@ -2414,7 +2586,8 @@ nano::endpoint nano::network::endpoint () void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff_a) { - node.network.udp_channels.purge (cutoff_a); + tcp_channels.purge (cutoff_a); + udp_channels.purge (cutoff_a); if (node.network.empty ()) { disconnect_observer (); @@ -2435,7 +2608,7 @@ void nano::network::ongoing_cleanup () size_t nano::network::size () const { - return udp_channels.size (); + return tcp_channels.size () + udp_channels.size (); } size_t nano::network::size_sqrt () const diff --git a/nano/node/node.hpp b/nano/node/node.hpp index c002255c74..83d0b933e3 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -262,10 +263,23 @@ class network final void broadcast_confirm_req_batch (std::deque, std::shared_ptr>>>>, unsigned = broadcast_interval_ms); void confirm_hashes (nano::transaction const &, std::shared_ptr, std::vector); bool send_votes_cache (std::shared_ptr, nano::block_hash const &); + std::shared_ptr find_node_id (nano::account const &); std::shared_ptr find_channel (nano::endpoint const &); bool not_a_peer (nano::endpoint const &, bool); // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &, bool = false); + std::deque> list (size_t); + // A list of random peers sized for the configured rebroadcast fanout + std::deque> list_fanout (); + void random_fill (std::array &) const; + std::unordered_set> random_set (size_t) const; + // Get the next peer for attempting a tcp bootstrap connection + nano::tcp_endpoint bootstrap_peer (); + // Response channels + void add_response_channels (nano::tcp_endpoint const &, std::vector); + std::shared_ptr search_response_channel (nano::tcp_endpoint const &, nano::account const &); + void remove_response_channel (nano::tcp_endpoint const &); + size_t response_channels_size (); nano::endpoint endpoint (); void cleanup (std::chrono::steady_clock::time_point const &); void ongoing_cleanup (); @@ -277,12 +291,17 @@ class network final std::vector packet_processing_threads; nano::node & node; nano::transport::udp_channels udp_channels; + nano::transport::tcp_channels tcp_channels; std::function disconnect_observer; // Called when a new channel is observed std::function)> channel_observer; static unsigned const broadcast_interval_ms = 10; static size_t const buffer_size = 512; static size_t const confirm_req_hashes_max = 6; + +private: + std::mutex response_channels_mutex; + std::unordered_map> response_channels; }; class node_init final diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 814b3a4f32..bb8ea6d055 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -119,6 +119,7 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const json.put ("pow_sleep_interval", pow_sleep_interval.count ()); json.put ("external_address", external_address.to_string ()); json.put ("external_port", external_port); + json.put ("tcp_incoming_connections_max", tcp_incoming_connections_max); nano::jsonconfig websocket_l; websocket_config.serialize_json (websocket_l); json.put_child ("websocket", websocket_l); @@ -243,6 +244,7 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso json.put (pow_sleep_interval_key, pow_sleep_interval.count ()); json.put ("external_address", external_address.to_string ()); json.put ("external_port", external_port); + json.put ("tcp_incoming_connections_max", tcp_incoming_connections_max); } case 17: break; @@ -384,6 +386,7 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco json.get (signature_checker_threads_key, signature_checker_threads); json.get ("external_address", external_address); json.get ("external_port", external_port); + json.get ("tcp_incoming_connections_max", tcp_incoming_connections_max); auto pow_sleep_interval_l (pow_sleep_interval.count ()); json.get (pow_sleep_interval_key, pow_sleep_interval_l); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 99878d9850..d02906e47f 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -66,6 +66,8 @@ class node_config /** Default maximum idle time for a socket before it's automatically closed */ std::chrono::seconds tcp_idle_timeout{ std::chrono::minutes (2) }; std::chrono::nanoseconds pow_sleep_interval{ 0 }; + /** Default maximum incoming TCP connections, including realtime network & bootstrap */ + unsigned tcp_incoming_connections_max{ 1024 }; static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60); static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5; static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5); diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index 2fb0028172..cc0191b038 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -78,7 +78,7 @@ std::vector> nano::rep_crawler::get_cr required_peer_count += required_peer_count / 2; // The rest of the endpoints are picked randomly - auto random_peers (node.network.udp_channels.random_set (required_peer_count)); + auto random_peers (node.network.random_set (required_peer_count)); std::vector> result; result.insert (result.end (), random_peers.begin (), random_peers.end ()); return result; @@ -89,6 +89,15 @@ void nano::rep_crawler::query (std::vector block (node.store.block_random (transaction)); auto hash (block->hash ()); + // Don't send same block multiple times in tests + if (node.network_params.network.is_test_network ()) + { + for (auto i (0); exists (hash) && i < 4; ++i) + { + block = node.store.block_random (transaction); + hash = block->hash (); + } + } add (hash); for (auto i (channels_a.begin ()), n (channels_a.end ()); i != n; ++i) { diff --git a/nano/node/testing.cpp b/nano/node/testing.cpp index 55c7d7df30..bc77c204cf 100644 --- a/nano/node/testing.cpp +++ b/nano/node/testing.cpp @@ -21,7 +21,7 @@ std::string nano::error_system_messages::message (int ev) const } /** Returns the node added. */ -std::shared_ptr nano::system::add_node (nano::node_config const & node_config_a, bool delay_frontier_confirmation_height_updating_a) +std::shared_ptr nano::system::add_node (nano::node_config const & node_config_a, bool delay_frontier_confirmation_height_updating_a, nano::transport::transport_type type_a) { nano::node_init init; auto node (std::make_shared (init, io_ctx, nano::unique_path (), alarm, node_config_a, work, node_flags (), delay_frontier_confirmation_height_updating_a)); @@ -40,17 +40,39 @@ std::shared_ptr nano::system::add_node (nano::node_config const & no auto node1 (*i); auto node2 (*j); auto starting1 (node1->network.size ()); + size_t starting_listener1 (node1->bootstrap.realtime_count); decltype (starting1) new1; auto starting2 (node2->network.size ()); + size_t starting_listener2 (node2->bootstrap.realtime_count); decltype (starting2) new2; - auto channel (std::make_shared ((*j)->network.udp_channels, (*i)->network.endpoint ())); - (*j)->network.send_keepalive (channel); + if (type_a == nano::transport::transport_type::tcp) + { + (*j)->network.merge_peer ((*i)->network.endpoint ()); + } + else + { + // UDP connection + auto channel (std::make_shared ((*j)->network.udp_channels, (*i)->network.endpoint ())); + (*j)->network.send_keepalive (channel); + } do { poll (); new1 = node1->network.size (); new2 = node2->network.size (); } while (new1 == starting1 || new2 == starting2); + if (type_a == nano::transport::transport_type::tcp) + { + // Wait for initial connection finish + decltype (starting_listener1) new_listener1; + decltype (starting_listener2) new_listener2; + do + { + poll (); + new_listener1 = node1->bootstrap.realtime_count; + new_listener2 = node2->bootstrap.realtime_count; + } while (new_listener1 == starting_listener1 || new_listener2 == starting_listener2); + } } auto iterations1 (0); while (std::any_of (begin, nodes.end (), [](std::shared_ptr const & node_a) { return node_a->bootstrap_initiator.in_progress (); })) @@ -84,14 +106,14 @@ nano::system::system () logging.init (nano::unique_path ()); } -nano::system::system (uint16_t port_a, uint16_t count_a) : +nano::system::system (uint16_t port_a, uint16_t count_a, nano::transport::transport_type type_a) : system () { nodes.reserve (count_a); for (uint16_t i (0); i < count_a; ++i) { nano::node_config config (port_a + i, logging); - nano::system::add_node (config, false); + nano::system::add_node (config, false, type_a); } } diff --git a/nano/node/testing.hpp b/nano/node/testing.hpp index 91beda6739..aaff94a453 100644 --- a/nano/node/testing.hpp +++ b/nano/node/testing.hpp @@ -17,7 +17,7 @@ class system final { public: system (); - system (uint16_t, uint16_t); + system (uint16_t, uint16_t, nano::transport::transport_type = nano::transport::transport_type::tcp); ~system (); void generate_activity (nano::node &, std::vector &); void generate_mass_activity (uint32_t, nano::node &); @@ -40,7 +40,7 @@ class system final std::error_code poll (const std::chrono::nanoseconds & sleep_time = std::chrono::milliseconds (50)); void stop (); void deadline_set (const std::chrono::duration & delta); - std::shared_ptr add_node (nano::node_config const &, bool = false); + std::shared_ptr add_node (nano::node_config const &, bool = false, nano::transport::transport_type = nano::transport::transport_type::tcp); boost::asio::io_context io_ctx; nano::alarm alarm{ io_ctx }; std::vector> nodes; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index c18c39dc01..d590cff1b5 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -35,17 +35,25 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const & void nano::transport::channel_tcp::send_buffer (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) { - set_last_packet_sent (std::chrono::steady_clock::now ()); - socket->async_write (buffer_a, callback (buffer_a, detail_a, callback_a)); + socket->async_write (buffer_a, tcp_callback (buffer_a, detail_a, socket->remote_endpoint (), callback_a)); } std::function nano::transport::channel_tcp::callback (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) const +{ + return callback_a; +} + +std::function nano::transport::channel_tcp::tcp_callback (std::shared_ptr> buffer_a, nano::stat::detail detail_a, nano::tcp_endpoint const & endpoint_a, std::function const & callback_a) const { // clang-format off - return [ buffer_a, node = std::weak_ptr (node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a) + return [ buffer_a, endpoint_a, node = std::weak_ptr (node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a) { if (auto node_l = node.lock ()) { + if (!ec) + { + node_l->network.tcp_channels.update (endpoint_a); + } if (ec == boost::system::errc::host_unreachable) { node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out); @@ -63,3 +71,570 @@ std::string nano::transport::channel_tcp::to_string () const { return boost::str (boost::format ("%1%") % socket->remote_endpoint ()); } + +nano::transport::tcp_channels::tcp_channels (nano::node & node_a) : +node (node_a) +{ +} + +bool nano::transport::tcp_channels::insert (std::shared_ptr channel_a) +{ + auto endpoint (channel_a->get_tcp_endpoint ()); + assert (endpoint.address ().is_v6 ()); + auto udp_endpoint (nano::transport::map_tcp_to_endpoint (endpoint)); + bool error (true); + if (!node.network.not_a_peer (udp_endpoint, node.config.allow_local_peers)) + { + std::unique_lock lock (mutex); + auto existing (channels.get ().find (endpoint)); + if (existing == channels.get ().end ()) + { + channels.get ().insert ({ channel_a }); + error = false; + lock.unlock (); + node.network.channel_observer (channel_a); + // Remove UDP channel to same IP:port if exists + node.network.udp_channels.erase (udp_endpoint); + } + } + return error; +} + +void nano::transport::tcp_channels::erase (nano::tcp_endpoint const & endpoint_a) +{ + std::lock_guard lock (mutex); + channels.get ().erase (endpoint_a); +} + +size_t nano::transport::tcp_channels::size () const +{ + std::lock_guard lock (mutex); + return channels.size (); +} + +std::shared_ptr nano::transport::tcp_channels::find_channel (nano::tcp_endpoint const & endpoint_a) const +{ + std::lock_guard lock (mutex); + std::shared_ptr result; + auto existing (channels.get ().find (endpoint_a)); + if (existing != channels.get ().end ()) + { + result = existing->channel; + } + return result; +} + +std::unordered_set> nano::transport::tcp_channels::random_set (size_t count_a) const +{ + std::unordered_set> result; + result.reserve (count_a); + std::lock_guard lock (mutex); + // Stop trying to fill result with random samples after this many attempts + auto random_cutoff (count_a * 2); + auto peers_size (channels.size ()); + // Usually count_a will be much smaller than peers.size() + // Otherwise make sure we have a cutoff on attempting to randomly fill + if (!channels.empty ()) + { + for (auto i (0); i < random_cutoff && result.size () < count_a; ++i) + { + auto index (nano::random_pool::generate_word32 (0, static_cast (peers_size - 1))); + result.insert (channels.get ()[index].channel); + } + } + return result; +} + +void nano::transport::tcp_channels::random_fill (std::array & target_a) const +{ + auto peers (random_set (target_a.size ())); + assert (peers.size () <= target_a.size ()); + auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0)); + assert (endpoint.address ().is_v6 ()); + std::fill (target_a.begin (), target_a.end (), endpoint); + auto j (target_a.begin ()); + for (auto i (peers.begin ()), n (peers.end ()); i != n; ++i, ++j) + { + assert ((*i)->get_endpoint ().address ().is_v6 ()); + assert (j < target_a.end ()); + *j = (*i)->get_endpoint (); + } +} + +bool nano::transport::tcp_channels::store_all (bool clear_peers) +{ + // We can't hold the mutex while starting a write transaction, so + // we collect endpoints to be saved and then relase the lock. + std::vector endpoints; + { + std::lock_guard lock (mutex); + endpoints.reserve (channels.size ()); + std::transform (channels.begin (), channels.end (), + std::back_inserter (endpoints), [](const auto & channel) { return nano::transport::map_tcp_to_endpoint (channel.endpoint ()); }); + } + bool result (false); + if (!endpoints.empty ()) + { + // Clear all peers then refresh with the current list of peers + auto transaction (node.store.tx_begin_write ()); + if (clear_peers) + { + node.store.peer_clear (transaction); + } + for (auto endpoint : endpoints) + { + nano::endpoint_key endpoint_key (endpoint.address ().to_v6 ().to_bytes (), endpoint.port ()); + node.store.peer_put (transaction, std::move (endpoint_key)); + } + result = true; + } + return result; +} + +std::shared_ptr nano::transport::tcp_channels::find_node_id (nano::account const & node_id_a) +{ + std::shared_ptr result; + std::lock_guard lock (mutex); + auto existing (channels.get ().find (node_id_a)); + if (existing != channels.get ().end ()) + { + result = existing->channel; + } + return result; +} + +nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer () +{ + nano::tcp_endpoint result (boost::asio::ip::address_v6::any (), 0); + std::lock_guard lock (mutex); + for (auto i (channels.get ().begin ()), n (channels.get ().end ()); i != n;) + { + if (i->channel->get_network_version () >= protocol_version_reasonable_min) + { + result = i->endpoint (); + channels.get ().modify (i, [](channel_tcp_wrapper & wrapper_a) { + wrapper_a.channel->set_last_bootstrap_attempt (std::chrono::steady_clock::now ()); + }); + i = n; + } + else + { + ++i; + } + } + return result; +} + +void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a) +{ + if (!stopped) + { + auto channel (node.network.find_channel (nano::transport::map_tcp_to_endpoint (endpoint_a))); + if (channel) + { + node.process_message (message_a, channel); + } + else + { + channel = node.network.search_response_channel (endpoint_a, node_id_a); + if (channel) + { + node.process_message (message_a, channel); + } + else + { + auto udp_channel (std::make_shared (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a))); + node.process_message (message_a, udp_channel); + } + } + } +} + +void nano::transport::tcp_channels::process_keepalive (nano::keepalive const & message_a, nano::tcp_endpoint const & endpoint_a, bool keepalive_first) +{ + if (!max_ip_connections (endpoint_a)) + { + // Check for special node port data + std::vector insert_response_channels; + auto peer0 (message_a.peers[0]); + auto peer1 (message_a.peers[1]); + if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) + { + nano::endpoint new_endpoint (endpoint_a.address (), peer0.port ()); + node.network.merge_peer (new_endpoint); + if (keepalive_first) + { + insert_response_channels.push_back (nano::transport::map_endpoint_to_tcp (new_endpoint)); + } + } + if (peer1.address () != boost::asio::ip::address_v6{} && peer1.port () != 0 && keepalive_first) + { + insert_response_channels.push_back (nano::transport::map_endpoint_to_tcp (peer1)); + } + // Insert preferred response channels from first TCP keepalive + if (!insert_response_channels.empty ()) + { + node.network.add_response_channels (endpoint_a, insert_response_channels); + } + auto udp_channel (std::make_shared (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a))); + node.process_message (message_a, udp_channel); + } +} + +void nano::transport::tcp_channels::start () +{ + ongoing_keepalive (); + ongoing_syn_cookie_cleanup (); +} + +void nano::transport::tcp_channels::stop () +{ + stopped = true; + // Close all TCP sockets + for (auto i (channels.begin ()), j (channels.end ()); i != j; ++i) + { + if (i->channel->socket != nullptr) + { + i->channel->socket->close (); + } + } +} + +bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a) +{ + std::unique_lock lock (mutex); + bool result (channels.get ().count (endpoint_a.address ()) >= nano::transport::max_peers_per_ip); + return result; +} + +bool nano::transport::tcp_channels::reachout (nano::endpoint const & endpoint_a) +{ + auto tcp_endpoint (nano::transport::map_endpoint_to_tcp (endpoint_a)); + // Don't overload single IP + bool error = max_ip_connections (tcp_endpoint); + if (!error) + { + // Don't keepalive to nodes that already sent us something + error |= find_channel (tcp_endpoint) != nullptr; + std::lock_guard lock (mutex); + auto existing (attempts.find (tcp_endpoint)); + error |= existing != attempts.end (); + attempts.insert ({ tcp_endpoint, std::chrono::steady_clock::now () }); + } + return error; +} + +std::unique_ptr nano::transport::tcp_channels::collect_seq_con_info (std::string const & name) +{ + size_t channels_count = 0; + size_t attemps_count = 0; + size_t syn_cookies_count = 0; + size_t syn_cookies_per_ip_count = 0; + { + std::lock_guard guard (mutex); + channels_count = channels.size (); + attemps_count = attempts.size (); + } + { + std::lock_guard syn_cookie_guard (syn_cookie_mutex); + syn_cookies_count = syn_cookies.size (); + syn_cookies_per_ip_count = syn_cookies_per_ip.size (); + } + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (seq_con_info{ "channels", channels_count, sizeof (decltype (channels)::value_type) })); + composite->add_component (std::make_unique (seq_con_info{ "attempts", attemps_count, sizeof (decltype (attempts)::value_type) })); + composite->add_component (std::make_unique (seq_con_info{ "syn_cookies", syn_cookies_count, sizeof (decltype (syn_cookies)::value_type) })); + composite->add_component (std::make_unique (seq_con_info{ "syn_cookies_per_ip", syn_cookies_per_ip_count, sizeof (decltype (syn_cookies_per_ip)::value_type) })); + + return composite; +} + +void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point const & cutoff_a) +{ + std::lock_guard lock (mutex); + auto disconnect_cutoff (channels.get ().lower_bound (cutoff_a)); + channels.get ().erase (channels.get ().begin (), disconnect_cutoff); + // Remove keepalive attempt tracking for attempts older than cutoff + auto attempts_cutoff (attempts.get<1> ().lower_bound (cutoff_a)); + attempts.get<1> ().erase (attempts.get<1> ().begin (), attempts_cutoff); +} + +boost::optional nano::transport::tcp_channels::assign_syn_cookie (nano::tcp_endpoint const & endpoint_a) +{ + auto ip_addr (endpoint_a.address ()); + assert (ip_addr.is_v6 ()); + std::lock_guard lock (syn_cookie_mutex); + unsigned & ip_cookies = syn_cookies_per_ip[ip_addr]; + boost::optional result; + if (ip_cookies < nano::transport::max_peers_per_ip) + { + if (syn_cookies.find (endpoint_a) == syn_cookies.end ()) + { + nano::uint256_union query; + random_pool::generate_block (query.bytes.data (), query.bytes.size ()); + syn_cookie_info info{ query, std::chrono::steady_clock::now () }; + syn_cookies[endpoint_a] = info; + ++ip_cookies; + result = query; + } + } + return result; +} + +bool nano::transport::tcp_channels::validate_syn_cookie (nano::tcp_endpoint const & endpoint_a, nano::account const & node_id, nano::signature const & sig) +{ + auto ip_addr (endpoint_a.address ()); + assert (ip_addr.is_v6 ()); + std::lock_guard lock (syn_cookie_mutex); + auto result (true); + auto cookie_it (syn_cookies.find (endpoint_a)); + if (cookie_it != syn_cookies.end () && !nano::validate_message (node_id, cookie_it->second.cookie, sig)) + { + result = false; + syn_cookies.erase (cookie_it); + unsigned & ip_cookies = syn_cookies_per_ip[ip_addr]; + if (ip_cookies > 0) + { + --ip_cookies; + } + else + { + assert (false && "More SYN cookies deleted than created for IP"); + } + } + return result; +} + +void nano::transport::tcp_channels::purge_syn_cookies (std::chrono::steady_clock::time_point const & cutoff_a) +{ + std::lock_guard lock (syn_cookie_mutex); + auto it (syn_cookies.begin ()); + while (it != syn_cookies.end ()) + { + auto info (it->second); + if (info.created_at < cutoff_a) + { + unsigned & per_ip = syn_cookies_per_ip[it->first.address ()]; + if (per_ip > 0) + { + --per_ip; + } + else + { + assert (false && "More SYN cookies deleted than created for IP"); + } + it = syn_cookies.erase (it); + } + else + { + ++it; + } + } +} + +void nano::transport::tcp_channels::ongoing_syn_cookie_cleanup () +{ + purge_syn_cookies (std::chrono::steady_clock::now () - nano::transport::syn_cookie_cutoff); + std::weak_ptr node_w (node.shared ()); + node.alarm.add (std::chrono::steady_clock::now () + (nano::transport::syn_cookie_cutoff * 2), [node_w]() { + if (auto node_l = node_w.lock ()) + { + node_l->network.tcp_channels.ongoing_syn_cookie_cleanup (); + } + }); +} + +void nano::transport::tcp_channels::ongoing_keepalive () +{ + nano::keepalive message; + node.network.random_fill (message.peers); + std::unique_lock lock (mutex); + // Wake up channels + std::vector> send_list; + auto keepalive_sent_cutoff (channels.get ().lower_bound (std::chrono::steady_clock::now () - node.network_params.node.period)); + for (auto i (channels.get ().begin ()); i != keepalive_sent_cutoff; ++i) + { + send_list.push_back (i->channel); + } + lock.unlock (); + for (auto & channel : send_list) + { + std::weak_ptr node_w (node.shared ()); + channel->send (message); + } + std::weak_ptr node_w (node.shared ()); + node.alarm.add (std::chrono::steady_clock::now () + node.network_params.node.period, [node_w]() { + if (auto node_l = node_w.lock ()) + { + node_l->network.tcp_channels.ongoing_keepalive (); + } + }); +} + +void nano::transport::tcp_channels::list (std::deque> & deque_a) +{ + std::lock_guard lock (mutex); + for (auto i (channels.begin ()), j (channels.end ()); i != j; ++i) + { + deque_a.push_back (i->channel); + } +} + +void nano::transport::tcp_channels::modify (std::shared_ptr channel_a, std::function)> modify_callback_a) +{ + std::lock_guard lock (mutex); + auto existing (channels.get ().find (channel_a->get_tcp_endpoint ())); + if (existing != channels.get ().end ()) + { + channels.get ().modify (existing, [modify_callback_a](channel_tcp_wrapper & wrapper_a) { + modify_callback_a (wrapper_a.channel); + }); + } +} + +void nano::transport::tcp_channels::update (nano::tcp_endpoint const & endpoint_a) +{ + std::lock_guard lock (mutex); + auto existing (channels.get ().find (endpoint_a)); + if (existing != channels.get ().end ()) + { + channels.get ().modify (existing, [](channel_tcp_wrapper & wrapper_a) { + wrapper_a.channel->set_last_packet_sent (std::chrono::steady_clock::now ()); + }); + } +} + +void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a, std::function)> const & callback_a) +{ + auto socket (std::make_shared (node.shared_from_this (), boost::none, nano::socket::concurrency::multi_writer)); + auto channel (std::make_shared (node, socket)); + std::weak_ptr node_w (node.shared ()); + channel->socket->async_connect (nano::transport::map_endpoint_to_tcp (endpoint_a), + [node_w, channel, endpoint_a, callback_a](boost::system::error_code const & ec) { + if (auto node_l = node_w.lock ()) + { + if (!ec && channel) + { + // TCP node ID handshake + auto cookie (node_l->network.tcp_channels.assign_syn_cookie (nano::transport::map_endpoint_to_tcp (endpoint_a))); + nano::node_id_handshake message (cookie, boost::none); + auto bytes = message.to_bytes (); + if (node_l->config.logging.network_node_id_handshake_logging ()) + { + node_l->logger.try_log (boost::str (boost::format ("Node ID handshake request sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_account () % endpoint_a % (*cookie).to_string ())); + } + std::shared_ptr> receive_buffer (std::make_shared> ()); + receive_buffer->resize (256); + channel->send_buffer (bytes, nano::stat::detail::node_id_handshake, [node_w, channel, endpoint_a, receive_buffer, callback_a](boost::system::error_code const & ec, size_t size_a) { + if (auto node_l = node_w.lock ()) + { + if (!ec && channel) + { + node_l->network.tcp_channels.start_tcp_receive_node_id (channel, endpoint_a, receive_buffer, callback_a); + } + else + { + if (node_l->config.logging.network_node_id_handshake_logging ()) + { + node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ())); + } + node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + } + } + }); + } + else + { + node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + } + } + }); +} + +void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr channel_a, nano::endpoint const & endpoint_a, std::shared_ptr> receive_buffer_a, std::function)> const & callback_a) +{ + std::weak_ptr node_w (node.shared ()); + channel_a->socket->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) { + if (auto node_l = node_w.lock ()) + { + if (!ec && channel_a) + { + node_l->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); + auto error (false); + nano::bufferstream stream (receive_buffer_a->data (), size_a); + nano::message_header header (error, stream); + if (!error && header.type == nano::message_type::node_id_handshake && header.version_using >= nano::protocol_version_min) + { + nano::node_id_handshake message (error, stream, header); + if (!error && message.response && message.query) + { + channel_a->set_network_version (header.version_using); + auto node_id (message.response->first); + if (!node_l->network.tcp_channels.validate_syn_cookie (nano::transport::map_endpoint_to_tcp (endpoint_a), node_id, message.response->second) && node_id != node_l->node_id.pub && !node_l->network.tcp_channels.find_node_id (node_id)) + { + channel_a->set_node_id (node_id); + channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); + boost::optional> response (std::make_pair (node_l->node_id.pub, nano::sign_message (node_l->node_id.prv, node_l->node_id.pub, *message.query))); + nano::node_id_handshake response_message (boost::none, response); + auto bytes = response_message.to_bytes (); + if (node_l->config.logging.network_node_id_handshake_logging ()) + { + node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_account () % endpoint_a % (*message.query).to_string ())); + } + channel_a->send_buffer (bytes, nano::stat::detail::node_id_handshake, [node_w, channel_a, endpoint_a, callback_a](boost::system::error_code const & ec, size_t size_a) { + if (auto node_l = node_w.lock ()) + { + if (!ec && channel_a) + { + // Insert new node ID connection + channel_a->set_last_packet_sent (std::chrono::steady_clock::now ()); + node_l->network.tcp_channels.insert (channel_a); + if (callback_a) + { + callback_a (channel_a); + } + } + else + { + if (node_l->config.logging.network_node_id_handshake_logging ()) + { + node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ())); + } + node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + } + } + }); + } + // If node ID is known, don't establish new connection + } + else + { + node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + } + } + else + { + node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + } + } + else + { + if (node_l->config.logging.network_node_id_handshake_logging ()) + { + node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%: %2%") % endpoint_a % ec.message ())); + } + node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + } + } + }); +} + +void nano::transport::tcp_channels::udp_fallback (nano::endpoint const & endpoint_a, std::function)> const & callback_a) +{ + if (callback_a) + { + auto channel_udp (node.network.udp_channels.create (endpoint_a)); + callback_a (channel_udp); + } +} diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 3a502adc83..24076409ca 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -8,8 +8,11 @@ namespace nano { namespace transport { + class tcp_channels; class channel_tcp : public nano::transport::channel { + friend class nano::transport::tcp_channels; + public: channel_tcp (nano::node &, std::shared_ptr); ~channel_tcp (); @@ -17,6 +20,7 @@ namespace transport bool operator== (nano::transport::channel const &) const override; void send_buffer (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) override; std::function callback (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const override; + std::function tcp_callback (std::shared_ptr>, nano::stat::detail, nano::tcp_endpoint const &, std::function const & = nullptr) const; std::string to_string () const override; bool operator== (nano::transport::channel_tcp const & other_a) const { @@ -55,5 +59,136 @@ namespace transport return nano::transport::transport_type::tcp; } }; + class tcp_channels final + { + friend class nano::transport::channel_tcp; + + public: + tcp_channels (nano::node &); + bool insert (std::shared_ptr); + void erase (nano::tcp_endpoint const &); + size_t size () const; + std::shared_ptr find_channel (nano::tcp_endpoint const &) const; + void random_fill (std::array &) const; + std::unordered_set> random_set (size_t) const; + bool store_all (bool = true); + std::shared_ptr find_node_id (nano::account const &); + // Get the next peer for attempting a tcp connection + nano::tcp_endpoint bootstrap_peer (); + void receive (); + void start (); + void stop (); + void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &); + void process_keepalive (nano::keepalive const &, nano::tcp_endpoint const &, bool); + bool max_ip_connections (nano::tcp_endpoint const &); + // Should we reach out to this endpoint with a keepalive message + bool reachout (nano::endpoint const &); + std::unique_ptr collect_seq_con_info (std::string const &); + void purge (std::chrono::steady_clock::time_point const &); + void purge_syn_cookies (std::chrono::steady_clock::time_point const &); + // Returns boost::none if the IP is rate capped on syn cookie requests, + // or if the endpoint already has a syn cookie query + boost::optional assign_syn_cookie (nano::tcp_endpoint const &); + // Returns false if valid, true if invalid (true on error convention) + // Also removes the syn cookie from the store if valid + bool validate_syn_cookie (nano::tcp_endpoint const &, nano::account const &, nano::signature const &); + void ongoing_keepalive (); + void list (std::deque> &); + void modify (std::shared_ptr, std::function)>); + void update (nano::tcp_endpoint const &); + // Connection start + void start_tcp (nano::endpoint const &, std::function)> const & = nullptr); + void start_tcp_receive_node_id (std::shared_ptr, nano::endpoint const &, std::shared_ptr>, std::function)> const &); + void udp_fallback (nano::endpoint const &, std::function)> const &); + nano::node & node; + + private: + void ongoing_syn_cookie_cleanup (); + class endpoint_tag + { + }; + class ip_address_tag + { + }; + class random_access_tag + { + }; + class last_packet_sent_tag + { + }; + class last_bootstrap_attempt_tag + { + }; + class node_id_tag + { + }; + class channel_tcp_wrapper final + { + public: + std::shared_ptr channel; + nano::tcp_endpoint endpoint () const + { + return channel->get_tcp_endpoint (); + } + std::chrono::steady_clock::time_point last_packet_sent () const + { + return channel->get_last_packet_sent (); + } + std::chrono::steady_clock::time_point last_bootstrap_attempt () const + { + return channel->get_last_bootstrap_attempt (); + } + boost::asio::ip::address ip_address () const + { + return endpoint ().address (); + } + nano::account node_id () const + { + auto node_id_l (channel->get_node_id ()); + if (node_id_l.is_initialized ()) + { + return node_id_l.get (); + } + else + { + assert (false); + return 0; + } + } + }; + class tcp_endpoint_attempt final + { + public: + nano::tcp_endpoint endpoint; + std::chrono::steady_clock::time_point last_attempt; + }; + class syn_cookie_info final + { + public: + nano::uint256_union cookie; + std::chrono::steady_clock::time_point created_at; + }; + mutable std::mutex mutex; + mutable std::mutex syn_cookie_mutex; + boost::multi_index_container< + channel_tcp_wrapper, + boost::multi_index::indexed_by< + boost::multi_index::random_access>, + boost::multi_index::ordered_non_unique, boost::multi_index::const_mem_fun>, + boost::multi_index::hashed_unique, boost::multi_index::const_mem_fun>, + boost::multi_index::hashed_non_unique, boost::multi_index::const_mem_fun>, + boost::multi_index::ordered_non_unique, boost::multi_index::const_mem_fun>, + boost::multi_index::ordered_non_unique, boost::multi_index::const_mem_fun>>> + channels; + boost::multi_index_container< + tcp_endpoint_attempt, + boost::multi_index::indexed_by< + boost::multi_index::hashed_unique>, + boost::multi_index::ordered_non_unique>>> + attempts; + std::unordered_map syn_cookies; + std::unordered_map syn_cookies_per_ip; + std::atomic stopped{ false }; + }; } // namespace transport } // namespace nano diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 09930cbb59..38c6703e40 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -123,7 +123,11 @@ bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool static auto const rfc4193_max (boost::asio::ip::address_v6::from_string ("fd00:ffff:ffff:ffff:ffff:ffff:ffff:ffff")); static auto const ipv6_multicast_min (boost::asio::ip::address_v6::from_string ("ff00::")); static auto const ipv6_multicast_max (boost::asio::ip::address_v6::from_string ("ff00:ffff:ffff:ffff:ffff:ffff:ffff:ffff")); - if (bytes >= rfc1700_min && bytes <= rfc1700_max) + if (endpoint_a.port () == 0) + { + result = true; + } + else if (bytes >= rfc1700_min && bytes <= rfc1700_max) { result = true; } diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 496f92bf36..bae4a29828 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -15,6 +15,9 @@ namespace transport nano::tcp_endpoint map_endpoint_to_tcp (nano::endpoint const &); // Unassigned, reserved, self bool reserved_address (nano::endpoint const &, bool = false); + // Maximum number of peers per IP + static size_t constexpr max_peers_per_ip = 10; + static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5); enum class transport_type : uint8_t { undefined = 0, diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index ecd9a4d49f..69b32c96ae 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -2,8 +2,6 @@ #include #include -std::chrono::seconds constexpr nano::transport::udp_channels::syn_cookie_cutoff; - nano::transport::channel_udp::channel_udp (nano::transport::udp_channels & channels_a, nano::endpoint const & endpoint_a, unsigned network_version_a) : channel (channels_a.node), endpoint (endpoint_a), @@ -129,17 +127,17 @@ std::shared_ptr nano::transport::udp_channels::cha { std::lock_guard lock (mutex); std::shared_ptr result; - auto existing (channels.get ().find (endpoint_a)); - if (existing != channels.get ().end ()) + auto existing (channels.get ().find (endpoint_a)); + if (existing != channels.get ().end ()) { result = existing->channel; } return result; } -std::unordered_set> nano::transport::udp_channels::random_set (size_t count_a) const +std::unordered_set> nano::transport::udp_channels::random_set (size_t count_a) const { - std::unordered_set> result; + std::unordered_set> result; result.reserve (count_a); std::lock_guard lock (mutex); // Stop trying to fill result with random samples after this many attempts @@ -168,13 +166,13 @@ void nano::transport::udp_channels::random_fill (std::array & auto j (target_a.begin ()); for (auto i (peers.begin ()), n (peers.end ()); i != n; ++i, ++j) { - assert ((*i)->endpoint.address ().is_v6 ()); + assert ((*i)->get_endpoint ().address ().is_v6 ()); assert (j < target_a.end ()); - *j = (*i)->endpoint; + *j = (*i)->get_endpoint (); } } -void nano::transport::udp_channels::store_all (nano::node & node_a) +bool nano::transport::udp_channels::store_all (bool clear_peers) { // We can't hold the mutex while starting a write transaction, so // we collect endpoints to be saved and then relase the lock. @@ -185,17 +183,35 @@ void nano::transport::udp_channels::store_all (nano::node & node_a) std::transform (channels.begin (), channels.end (), std::back_inserter (endpoints), [](const auto & channel) { return channel.endpoint (); }); } + bool result (false); if (!endpoints.empty ()) { // Clear all peers then refresh with the current list of peers - auto transaction (node_a.store.tx_begin_write ()); - node_a.store.peer_clear (transaction); + auto transaction (node.store.tx_begin_write ()); + if (clear_peers) + { + node.store.peer_clear (transaction); + } for (auto endpoint : endpoints) { nano::endpoint_key endpoint_key (endpoint.address ().to_v6 ().to_bytes (), endpoint.port ()); - node_a.store.peer_put (transaction, std::move (endpoint_key)); + node.store.peer_put (transaction, std::move (endpoint_key)); } + result = true; + } + return result; +} + +std::shared_ptr nano::transport::udp_channels::find_node_id (nano::account const & node_id_a) +{ + std::shared_ptr result; + std::lock_guard lock (mutex); + auto existing (channels.get ().find (node_id_a)); + if (existing != channels.get ().end ()) + { + result = existing->channel; } + return result; } void nano::transport::udp_channels::clean_node_id (nano::endpoint const & endpoint_a, nano::account const & node_id_a) @@ -213,15 +229,15 @@ void nano::transport::udp_channels::clean_node_id (nano::endpoint const & endpoi } } -nano::endpoint nano::transport::udp_channels::tcp_peer () +nano::tcp_endpoint nano::transport::udp_channels::bootstrap_peer () { - nano::endpoint result (boost::asio::ip::address_v6::any (), 0); + nano::tcp_endpoint result (boost::asio::ip::address_v6::any (), 0); std::lock_guard lock (mutex); for (auto i (channels.get ().begin ()), n (channels.get ().end ()); i != n;) { if (i->channel->get_network_version () >= protocol_version_reasonable_min) { - result = i->endpoint (); + result = nano::transport::map_endpoint_to_tcp (i->endpoint ()); channels.get ().modify (i, [](channel_udp_wrapper & wrapper_a) { wrapper_a.channel->set_last_bootstrap_attempt (std::chrono::steady_clock::now ()); }); @@ -337,26 +353,25 @@ class udp_message_visitor : public nano::message_visitor if (cookie) { // New connection - auto channel (node.network.udp_channels.channel (endpoint)); - if (channel) + auto find_channel (node.network.udp_channels.channel (endpoint)); + if (find_channel) { - node.network.send_node_id_handshake (channel, *cookie, boost::none); - node.network.send_keepalive_self (channel); + node.network.send_node_id_handshake (find_channel, *cookie, boost::none); + node.network.send_keepalive_self (find_channel); } - else + else if (!node.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (endpoint))) { - channel = std::make_shared (node.network.udp_channels, endpoint); - node.network.send_node_id_handshake (channel, *cookie, boost::none); + // Don't start connection if TCP channel to same IP:port exists + find_channel = std::make_shared (node.network.udp_channels, endpoint); + node.network.send_node_id_handshake (find_channel, *cookie, boost::none); } } // Check for special node port data - for (auto & peer : message_a.peers) + auto peer0 (message_a.peers[0]); + if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) { - if (peer.address () == boost::asio::ip::address_v6{} && peer.port () != 0) - { - nano::endpoint new_endpoint (endpoint.address (), peer.port ()); - node.network.merge_peer (new_endpoint); - } + nano::endpoint new_endpoint (endpoint.address (), peer0.port ()); + node.network.merge_peer (new_endpoint); } } message (message_a); @@ -407,7 +422,7 @@ class udp_message_visitor : public nano::message_visitor if (!node.network.udp_channels.validate_syn_cookie (endpoint, message_a.response->first, message_a.response->second)) { validated_response = true; - if (message_a.response->first != node.node_id.pub) + if (message_a.response->first != node.node_id.pub && !node.network.tcp_channels.find_node_id (message_a.response->first)) { node.network.udp_channels.clean_node_id (endpoint, message_a.response->first); auto new_channel (node.network.udp_channels.insert (endpoint, message_a.header.version_using)); @@ -431,24 +446,24 @@ class udp_message_visitor : public nano::message_visitor } if (out_query || out_respond_to) { - auto channel (node.network.find_channel (endpoint)); - if (!channel) + auto find_channel (node.network.udp_channels.channel (endpoint)); + if (!find_channel) { - channel = std::make_shared (node.network.udp_channels, endpoint); + find_channel = std::make_shared (node.network.udp_channels, endpoint); } - node.network.send_node_id_handshake (channel, out_query, out_respond_to); + node.network.send_node_id_handshake (find_channel, out_query, out_respond_to); } message (message_a); } void message (nano::message const & message_a) { - auto channel (node.network.udp_channels.channel (endpoint)); - if (channel) + auto find_channel (node.network.udp_channels.channel (endpoint)); + if (find_channel) { - node.network.udp_channels.modify (channel, [](std::shared_ptr channel_a) { + node.network.udp_channels.modify (find_channel, [](std::shared_ptr channel_a) { channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); }); - node.process_message (message_a, channel); + node.process_message (message_a, find_channel); } } nano::node & node; @@ -463,6 +478,10 @@ void nano::transport::udp_channels::receive_action (nano::message_buffer * data_ { allowed_sender = false; } + else if (data_a->endpoint.address ().to_v6 ().is_unspecified ()) + { + allowed_sender = false; + } else if (nano::transport::reserved_address (data_a->endpoint, node.config.allow_local_peers)) { allowed_sender = false; @@ -555,7 +574,7 @@ std::shared_ptr nano::transport::udp_channels::create bool nano::transport::udp_channels::max_ip_connections (nano::endpoint const & endpoint_a) { std::unique_lock lock (mutex); - bool result (channels.get ().count (endpoint_a.address ()) >= max_peers_per_ip); + bool result (channels.get ().count (endpoint_a.address ()) >= nano::transport::max_peers_per_ip); return result; } @@ -616,7 +635,7 @@ boost::optional nano::transport::udp_channels::assign_syn_c std::lock_guard lock (mutex); unsigned & ip_cookies = syn_cookies_per_ip[ip_addr]; boost::optional result; - if (ip_cookies < nano::transport::udp_channels::max_peers_per_ip) + if (ip_cookies < nano::transport::max_peers_per_ip) { if (syn_cookies.find (endpoint) == syn_cookies.end ()) { @@ -684,9 +703,9 @@ void nano::transport::udp_channels::purge_syn_cookies (std::chrono::steady_clock void nano::transport::udp_channels::ongoing_syn_cookie_cleanup () { - purge_syn_cookies (std::chrono::steady_clock::now () - syn_cookie_cutoff); + purge_syn_cookies (std::chrono::steady_clock::now () - nano::transport::syn_cookie_cutoff); std::weak_ptr node_w (node.shared ()); - node.alarm.add (std::chrono::steady_clock::now () + (syn_cookie_cutoff * 2), [node_w]() { + node.alarm.add (std::chrono::steady_clock::now () + (nano::transport::syn_cookie_cutoff * 2), [node_w]() { if (auto node_l = node_w.lock ()) { node_l->network.udp_channels.ongoing_syn_cookie_cleanup (); @@ -697,15 +716,16 @@ void nano::transport::udp_channels::ongoing_syn_cookie_cleanup () void nano::transport::udp_channels::ongoing_keepalive () { nano::keepalive message; - random_fill (message.peers); + node.network.random_fill (message.peers); std::unique_lock lock (mutex); - auto keepalive_cutoff (channels.get ().lower_bound (std::chrono::steady_clock::now () - network_params.node.period)); + auto keepalive_cutoff (channels.get ().lower_bound (std::chrono::steady_clock::now () - node.network_params.node.period)); for (auto i (channels.get ().begin ()); i != keepalive_cutoff; ++i) { + i->channel->set_last_packet_sent (std::chrono::steady_clock::now ()); i->channel->send (message); } std::weak_ptr node_w (node.shared ()); - node.alarm.add (std::chrono::steady_clock::now () + network_params.node.period, [node_w]() { + node.alarm.add (std::chrono::steady_clock::now () + node.network_params.node.period, [node_w]() { if (auto node_l = node_w.lock ()) { node_l->network.udp_channels.ongoing_keepalive (); @@ -713,29 +733,13 @@ void nano::transport::udp_channels::ongoing_keepalive () }); } -std::deque> nano::transport::udp_channels::list (size_t count_a) +void nano::transport::udp_channels::list (std::deque> & deque_a) { - std::deque> result; - { - std::lock_guard lock (mutex); - for (auto i (channels.begin ()), j (channels.end ()); i != j; ++i) - { - result.push_back (i->channel); - } - } - random_pool::shuffle (result.begin (), result.end ()); - if (result.size () > count_a) + std::lock_guard lock (mutex); + for (auto i (channels.begin ()), j (channels.end ()); i != j; ++i) { - result.resize (count_a, nullptr); + deque_a.push_back (i->channel); } - return result; -} - -// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability -std::deque> nano::transport::udp_channels::list_fanout () -{ - auto result (list (node.network.size_sqrt ())); - return result; } void nano::transport::udp_channels::modify (std::shared_ptr channel_a, std::function)> modify_callback_a) diff --git a/nano/node/transport/udp.hpp b/nano/node/transport/udp.hpp index d4b95bbfe8..31e0d1c30c 100644 --- a/nano/node/transport/udp.hpp +++ b/nano/node/transport/udp.hpp @@ -59,11 +59,12 @@ namespace transport size_t size () const; std::shared_ptr channel (nano::endpoint const &) const; void random_fill (std::array &) const; - std::unordered_set> random_set (size_t) const; - void store_all (nano::node &); + std::unordered_set> random_set (size_t) const; + bool store_all (bool = true); + std::shared_ptr find_node_id (nano::account const &); void clean_node_id (nano::endpoint const &, nano::account const &); - // Get the next peer for attempting a tcp connection - nano::endpoint tcp_peer (); + // Get the next peer for attempting a tcp bootstrap connection + nano::tcp_endpoint bootstrap_peer (); void receive (); void start (); void stop (); @@ -85,13 +86,9 @@ namespace transport // Also removes the syn cookie from the store if valid bool validate_syn_cookie (nano::endpoint const &, nano::account const &, nano::signature const &); void ongoing_keepalive (); - std::deque> list (size_t); - // A list of random peers sized for the configured rebroadcast fanout - std::deque> list_fanout (); + void list (std::deque> &); void modify (std::shared_ptr, std::function)>); - // Maximum number of peers per IP - static size_t constexpr max_peers_per_ip = 10; - static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5); + nano::node & node; private: void close_socket (); @@ -178,11 +175,9 @@ namespace transport attempts; std::unordered_map syn_cookies; std::unordered_map syn_cookies_per_ip; - nano::node & node; boost::asio::strand strand; boost::asio::ip::udp::socket socket; nano::endpoint local_endpoint; - nano::network_params network_params; std::atomic stopped{ false }; }; } // namespace transport diff --git a/nano/qt/qt.cpp b/nano/qt/qt.cpp index 1fb509a3fd..65753bae7d 100644 --- a/nano/qt/qt.cpp +++ b/nano/qt/qt.cpp @@ -1928,8 +1928,10 @@ wallet (wallet_a) void nano_qt::advanced_actions::refresh_peers () { peers_model->removeRows (0, peers_model->rowCount ()); - auto list (wallet.node.network.udp_channels.list (std::numeric_limits::max ())); - std::sort (list.begin (), list.end ()); + auto list (wallet.node.network.list (std::numeric_limits::max ())); + std::sort (list.begin (), list.end (), [](const auto & lhs, const auto & rhs) { + return lhs->get_endpoint () < rhs->get_endpoint (); + }); for (auto i (list.begin ()), n (list.end ()); i != n; ++i) { std::stringstream endpoint; diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index d4e6d6c6b7..44df1a3951 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -1838,7 +1838,7 @@ TEST (rpc, keepalive) } ASSERT_EQ (200, response.status); system.deadline_set (10s); - while (system.nodes[0]->network.udp_channels.channel (node1->network.endpoint ()) == nullptr) + while (system.nodes[0]->network.find_channel (node1->network.endpoint ()) == nullptr) { ASSERT_EQ (0, system.nodes[0]->network.size ()); ASSERT_NO_ERROR (system.poll ()); diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index c711828f2b..69961f9dd0 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -369,7 +369,7 @@ TEST (peer_container, random_set) auto current (std::chrono::steady_clock::now ()); for (auto i (0); i < 10000; ++i) { - auto list (system.nodes[0]->network.udp_channels.random_set (15)); + auto list (system.nodes[0]->network.random_set (15)); } auto end (std::chrono::steady_clock::now ()); (void)end;