diff --git a/nano/core_test/bootstrap_server.cpp b/nano/core_test/bootstrap_server.cpp index 4f37d25e43..659ecd46ba 100644 --- a/nano/core_test/bootstrap_server.cpp +++ b/nano/core_test/bootstrap_server.cpp @@ -92,7 +92,7 @@ TEST (bootstrap_server, serve_account_blocks) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); ASSERT_TIMELY_EQ (5s, responses.size (), 1); @@ -137,7 +137,7 @@ TEST (bootstrap_server, serve_hash) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); ASSERT_TIMELY_EQ (5s, responses.size (), 1); @@ -182,7 +182,7 @@ TEST (bootstrap_server, serve_hash_one) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); ASSERT_TIMELY_EQ (5s, responses.size (), 1); @@ -221,7 +221,7 @@ TEST (bootstrap_server, serve_end_of_chain) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); ASSERT_TIMELY_EQ (5s, responses.size (), 1); @@ -260,7 +260,7 @@ TEST (bootstrap_server, serve_missing) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); ASSERT_TIMELY_EQ (5s, responses.size (), 1); @@ -303,7 +303,7 @@ TEST (bootstrap_server, serve_multiple) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); } } @@ -359,7 +359,7 @@ TEST (bootstrap_server, serve_account_info) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); ASSERT_TIMELY_EQ (5s, responses.size (), 1); @@ -405,7 +405,7 @@ TEST (bootstrap_server, serve_account_info_missing) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); ASSERT_TIMELY_EQ (5s, responses.size (), 1); @@ -450,7 +450,7 @@ TEST (bootstrap_server, serve_frontiers) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); ASSERT_TIMELY_EQ (5s, responses.size (), 1); @@ -503,7 +503,7 @@ TEST (bootstrap_server, serve_frontiers_invalid_count) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); } ASSERT_TIMELY_EQ (5s, node.stats.count (nano::stat::type::bootstrap_server, nano::stat::detail::invalid), 1); @@ -521,7 +521,7 @@ TEST (bootstrap_server, serve_frontiers_invalid_count) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); } ASSERT_TIMELY_EQ (5s, node.stats.count (nano::stat::type::bootstrap_server, nano::stat::detail::invalid), 2); @@ -539,7 +539,7 @@ TEST (bootstrap_server, serve_frontiers_invalid_count) request.payload = request_payload; request.update_header (); - node.network.inbound (request, nano::test::fake_channel (node)); + node.inbound (request, nano::test::fake_channel (node)); } ASSERT_TIMELY_EQ (5s, node.stats.count (nano::stat::type::bootstrap_server, nano::stat::detail::invalid), 3); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index eb4713f946..c06b28bfd2 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -377,7 +377,7 @@ TEST (receivable_processor, confirm_insufficient_pos) nano::confirm_ack con1{ nano::dev::network_params.network, vote }; auto channel1 = std::make_shared (node1, node1); ASSERT_EQ (1, election->votes ().size ()); - node1.network.inbound (con1, channel1); + node1.inbound (con1, channel1); ASSERT_TIMELY_EQ (5s, 2, election->votes ().size ()) ASSERT_FALSE (election->confirmed ()); } @@ -402,7 +402,7 @@ TEST (receivable_processor, confirm_sufficient_pos) nano::confirm_ack con1{ nano::dev::network_params.network, vote }; auto channel1 = std::make_shared (node1, node1); ASSERT_EQ (1, election->votes ().size ()); - node1.network.inbound (con1, channel1); + node1.inbound (con1, channel1); ASSERT_TIMELY_EQ (5s, 2, election->votes ().size ()) ASSERT_TRUE (election->confirmed ()); } @@ -743,10 +743,10 @@ TEST (network, duplicate_revert_publish) auto channel = nano::test::establish_tcp (system, *other_node, node.network.endpoint ()); ASSERT_NE (nullptr, channel); ASSERT_EQ (0, publish.digest); - node.network.inbound (publish, channel); + node.inbound (publish, nano::test::fake_channel (node)); ASSERT_TRUE (node.network.filter.apply (bytes.data (), bytes.size ())); publish.digest = digest; - node.network.inbound (publish, channel); + node.inbound (publish, nano::test::fake_channel (node)); ASSERT_FALSE (node.network.filter.apply (bytes.data (), bytes.size ())); } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 54d1ef29f6..86cb7d2581 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -688,16 +688,15 @@ TEST (node, fork_flip) .work (*system.work.generate (nano::dev::genesis->hash ())) .build (); nano::publish publish2{ nano::dev::network_params.network, send2 }; - auto ignored_channel = nano::test::fake_channel (node1); - node1.network.inbound (publish1, ignored_channel); - node2.network.inbound (publish2, ignored_channel); + node1.inbound (publish1, nano::test::fake_channel (node1)); + node2.inbound (publish2, nano::test::fake_channel (node2)); ASSERT_TIMELY_EQ (5s, 1, node1.active.size ()); ASSERT_TIMELY_EQ (5s, 1, node2.active.size ()); system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv); // Fill nodes with forked blocks - node1.network.inbound (publish2, ignored_channel); + node1.inbound (publish2, nano::test::fake_channel (node1)); ASSERT_TIMELY (5s, node1.active.active (*send2)); - node2.network.inbound (publish1, ignored_channel); + node2.inbound (publish1, nano::test::fake_channel (node2)); ASSERT_TIMELY (5s, node2.active.active (*send1)); auto election1 (node2.active.election (nano::qualified_root (nano::dev::genesis->hash (), nano::dev::genesis->hash ()))); ASSERT_NE (nullptr, election1); @@ -829,7 +828,7 @@ TEST (node, fork_open) .build (); nano::publish publish1{ nano::dev::network_params.network, send1 }; auto channel1 = std::make_shared (node); - node.network.inbound (publish1, channel1); + node.inbound (publish1, channel1); ASSERT_TIMELY (5s, (election = node.active.election (publish1.block->qualified_root ())) != nullptr); election->force_confirm (); ASSERT_TIMELY (5s, node.active.empty () && node.block_confirmed (publish1.block->hash ())); @@ -848,7 +847,7 @@ TEST (node, fork_open) .work (*system.work.generate (key1.pub)) .build (); nano::publish publish2{ nano::dev::network_params.network, open1 }; - node.network.inbound (publish2, channel1); + node.inbound (publish2, channel1); ASSERT_TIMELY_EQ (5s, 1, node.active.size ()); // create 2nd open block, which is a fork of open1 block @@ -860,7 +859,7 @@ TEST (node, fork_open) .work (*system.work.generate (key1.pub)) .build (); nano::publish publish3{ nano::dev::network_params.network, open2 }; - node.network.inbound (publish3, channel1); + node.inbound (publish3, channel1); ASSERT_TIMELY (5s, (election = node.active.election (publish3.block->qualified_root ())) != nullptr); // we expect to find 2 blocks in the election and we expect the first block to be the winner just because it was first @@ -1856,14 +1855,14 @@ TEST (node, DISABLED_local_votes_cache) nano::confirm_req message1{ nano::dev::network_params.network, send1->hash (), send1->root () }; nano::confirm_req message2{ nano::dev::network_params.network, send2->hash (), send2->root () }; auto channel = std::make_shared (node); - node.network.inbound (message1, channel); + node.inbound (message1, channel); ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes), 1); - node.network.inbound (message2, channel); + node.inbound (message2, channel); ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes), 2); for (auto i (0); i < 100; ++i) { - node.network.inbound (message1, channel); - node.network.inbound (message2, channel); + node.inbound (message1, channel); + node.inbound (message2, channel); } // Make sure a new vote was not generated ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes), 2); @@ -1873,7 +1872,7 @@ TEST (node, DISABLED_local_votes_cache) ASSERT_EQ (nano::block_status::progress, node.ledger.process (transaction, send3)); } nano::confirm_req message3{ nano::dev::network_params.network, send3->hash (), send3->root () }; - node.network.inbound (message3, channel); + node.inbound (message3, channel); ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes), 3); ASSERT_TIMELY (3s, !node.history.votes (send1->root (), send1->hash ()).empty ()); ASSERT_TIMELY (3s, !node.history.votes (send2->root (), send2->hash ()).empty ()); @@ -1881,7 +1880,7 @@ TEST (node, DISABLED_local_votes_cache) // All requests should be served from the cache for (auto i (0); i < 100; ++i) { - node.network.inbound (message3, channel); + node.inbound (message3, channel); } ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes), 3); } @@ -1935,26 +1934,26 @@ TEST (node, DISABLED_local_votes_cache_batch) nano::confirm_req message{ nano::dev::network_params.network, batch }; auto channel = std::make_shared (node); // Generates and sends one vote for both hashes which is then cached - node.network.inbound (message, channel); + node.inbound (message, channel); ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out), 1); ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); ASSERT_FALSE (node.history.votes (send2->root (), send2->hash ()).empty ()); ASSERT_FALSE (node.history.votes (receive1->root (), receive1->hash ()).empty ()); // Only one confirm_ack should be sent if all hashes are part of the same vote - node.network.inbound (message, channel); + node.inbound (message, channel); ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out), 2); ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); // Test when votes are different node.history.erase (send2->root ()); node.history.erase (receive1->root ()); - node.network.inbound (nano::confirm_req{ nano::dev::network_params.network, send2->hash (), send2->root () }, channel); + node.inbound (nano::confirm_req{ nano::dev::network_params.network, send2->hash (), send2->root () }, channel); ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out), 3); ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); - node.network.inbound (nano::confirm_req{ nano::dev::network_params.network, receive1->hash (), receive1->root () }, channel); + node.inbound (nano::confirm_req{ nano::dev::network_params.network, receive1->hash (), receive1->root () }, channel); ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out), 4); ASSERT_EQ (4, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); // There are two different votes, so both should be sent in response - node.network.inbound (message, channel); + node.inbound (message, channel); ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out), 6); ASSERT_EQ (6, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } @@ -1975,7 +1974,7 @@ TEST (node, DISABLED_local_votes_cache_generate_new_vote) // Send a confirm req for genesis block to node nano::confirm_req message1{ nano::dev::network_params.network, nano::dev::genesis->hash (), nano::dev::genesis->root () }; auto channel = std::make_shared (node); - node.network.inbound (message1, channel); + node.inbound (message1, channel); // check that the node generated a vote for the genesis block and that it is stored in the local vote cache and it is the only vote ASSERT_TIMELY (5s, !node.history.votes (nano::dev::genesis->root (), nano::dev::genesis->hash ()).empty ()); @@ -1998,7 +1997,7 @@ TEST (node, DISABLED_local_votes_cache_generate_new_vote) // One of the hashes is cached std::vector> roots_hashes{ std::make_pair (nano::dev::genesis->hash (), nano::dev::genesis->root ()), std::make_pair (send1->hash (), send1->root ()) }; nano::confirm_req message2{ nano::dev::network_params.network, roots_hashes }; - node.network.inbound (message2, channel); + node.inbound (message2, channel); ASSERT_TIMELY (3s, !node.history.votes (send1->root (), send1->hash ()).empty ()); auto votes2 (node.history.votes (send1->root (), send1->hash ())); ASSERT_EQ (1, votes2.size ()); @@ -2441,13 +2440,13 @@ TEST (node, fork_election_invalid_block_signature) .build (); auto channel1 = std::make_shared (node1); - node1.network.inbound (nano::publish{ nano::dev::network_params.network, send1 }, channel1); + node1.inbound (nano::publish{ nano::dev::network_params.network, send1 }, channel1); ASSERT_TIMELY (5s, node1.active.active (send1->qualified_root ())); auto election (node1.active.election (send1->qualified_root ())); ASSERT_NE (nullptr, election); ASSERT_EQ (1, election->blocks ().size ()); - node1.network.inbound (nano::publish{ nano::dev::network_params.network, send3 }, channel1); - node1.network.inbound (nano::publish{ nano::dev::network_params.network, send2 }, channel1); + node1.inbound (nano::publish{ nano::dev::network_params.network, send3 }, channel1); + node1.inbound (nano::publish{ nano::dev::network_params.network, send2 }, channel1); ASSERT_TIMELY (3s, election->blocks ().size () > 1); ASSERT_EQ (election->blocks ()[send2->hash ()]->block_signature (), send2->block_signature ()); } diff --git a/nano/core_test/telemetry.cpp b/nano/core_test/telemetry.cpp index 4ebe65c5c9..750d18b99b 100644 --- a/nano/core_test/telemetry.cpp +++ b/nano/core_test/telemetry.cpp @@ -253,7 +253,7 @@ TEST (telemetry, invalid_signature) telemetry.block_count = 9999; // Change data so signature is no longer valid auto message = nano::telemetry_ack{ nano::dev::network_params.network, telemetry }; - node.network.inbound (message, nano::test::fake_channel (node)); + node.inbound (message, nano::test::fake_channel (node)); ASSERT_TIMELY (5s, node.stats.count (nano::stat::type::telemetry, nano::stat::detail::invalid_signature) > 0); ASSERT_ALWAYS (1s, node.stats.count (nano::stat::type::telemetry, nano::stat::detail::process) == 0) @@ -267,7 +267,7 @@ TEST (telemetry, mismatched_node_id) auto telemetry = node.local_telemetry (); auto message = nano::telemetry_ack{ nano::dev::network_params.network, telemetry }; - node.network.inbound (message, nano::test::fake_channel (node, /* node id */ { 123 })); + node.inbound (message, nano::test::fake_channel (node, /* node id */ { 123 })); ASSERT_TIMELY (5s, node.stats.count (nano::stat::type::telemetry, nano::stat::detail::node_id_mismatch) > 0); ASSERT_ALWAYS (1s, node.stats.count (nano::stat::type::telemetry, nano::stat::detail::process) == 0) diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index dacfd7b10e..5305b518e1 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -165,7 +165,7 @@ TEST (websocket, started_election) .build (); nano::publish publish1{ nano::dev::network_params.network, send1 }; auto channel1 = std::make_shared (*node1); - node1->network.inbound (publish1, channel1); + node1->inbound (publish1, channel1); ASSERT_TIMELY (1s, node1->active.election (send1->qualified_root ())); ASSERT_TIMELY_EQ (5s, future.wait_for (0s), std::future_status::ready); @@ -213,7 +213,7 @@ TEST (websocket, stopped_election) .build (); nano::publish publish1{ nano::dev::network_params.network, send1 }; auto channel1 = std::make_shared (*node1); - node1->network.inbound (publish1, channel1); + node1->inbound (publish1, channel1); ASSERT_TIMELY (5s, node1->active.election (send1->qualified_root ())); node1->active.erase (*send1); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index d780f634e8..a764f45309 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -314,14 +314,6 @@ void nano::network::flood_block_many (std::deque> b } } -void nano::network::inbound (const nano::message & message, const std::shared_ptr & channel) -{ - debug_assert (message.header.network == node.network_params.network.current_network); - debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min); - - node.message_processor.process (message, channel); -} - // Send keepalives to all the peers we've been notified of void nano::network::merge_peers (std::array const & peers_a) { diff --git a/nano/node/network.hpp b/nano/node/network.hpp index f8aac622b0..5aa6ec8772 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -127,7 +127,6 @@ class network final void erase (nano::transport::channel const &); /** Disconnects and adds peer to exclusion list */ void exclude (std::shared_ptr const & channel); - void inbound (nano::message const &, std::shared_ptr const &); nano::container_info container_info () const; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 3cda7ce369..ceb034f361 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -518,6 +518,16 @@ void nano::node::keepalive (std::string const & address_a, uint16_t port_a) }); } +void nano::node::inbound (const nano::message & message, const std::shared_ptr & channel) +{ + debug_assert (channel->owner () == shared_from_this ()); // This node should be the channel owner + + debug_assert (message.header.network == network_params.network.current_network); + debug_assert (message.header.version_using >= network_params.network.protocol_version_min); + + message_processor.process (message, channel); +} + void nano::node::process_active (std::shared_ptr const & incoming) { block_processor.add (incoming); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index c411b6d9a5..af5f0e41d0 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -94,6 +94,7 @@ class node final : public std::enable_shared_from_this bool copy_with_compaction (std::filesystem::path const &); void keepalive (std::string const &, uint16_t); int store_version (); + void inbound (nano::message const &, std::shared_ptr const &); void process_confirmed (nano::election_status const &, uint64_t = 0); void process_active (std::shared_ptr const &); std::optional process_local (std::shared_ptr const &); diff --git a/nano/node/transport/channel.cpp b/nano/node/transport/channel.cpp index 1837044490..6c31d95b52 100644 --- a/nano/node/transport/channel.cpp +++ b/nano/node/transport/channel.cpp @@ -63,6 +63,11 @@ nano::endpoint nano::transport::channel::get_peering_endpoint () const } } +std::shared_ptr nano::transport::channel::owner () const +{ + return node.shared (); +} + void nano::transport::channel::operator() (nano::object_stream & obs) const { obs.write ("endpoint", get_endpoint ()); diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index e01cc8c579..184168823b 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -130,6 +130,8 @@ class channel nano::endpoint get_peering_endpoint () const; void set_peering_endpoint (nano::endpoint endpoint); + std::shared_ptr owner () const; + mutable nano::mutex channel_mutex; private: diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp index c1cf7a5b9f..abeca40067 100644 --- a/nano/node/transport/inproc.cpp +++ b/nano/node/transport/inproc.cpp @@ -44,7 +44,7 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co // process message { node.stats.inc (nano::stat::type::message, to_stat_detail (message_a->type ()), nano::stat::dir::in); - destination.network.inbound (*message_a, remote_channel); + destination.inbound (*message_a, remote_channel); } });