From 159c946278c6b492a3979bd5425c8ea03b9f4658 Mon Sep 17 00:00:00 2001 From: gr0vity Date: Sun, 7 Jul 2024 13:00:49 +0200 Subject: [PATCH] Squash merge PR #4654: Add `is_originator` flag to publish messages --- nano/core_test/message.cpp | 90 ++++++++++++++++++++++++++------- nano/lib/stats_enums.hpp | 1 + nano/node/blockprocessor.cpp | 2 + nano/node/blockprocessor.hpp | 15 +++--- nano/node/message_processor.cpp | 5 +- nano/node/messages.cpp | 10 +++- nano/node/messages.hpp | 11 +++- nano/node/network.cpp | 18 +++---- 8 files changed, 112 insertions(+), 40 deletions(-) diff --git a/nano/core_test/message.cpp b/nano/core_test/message.cpp index 1e9728b65f..e093aa6eec 100644 --- a/nano/core_test/message.cpp +++ b/nano/core_test/message.cpp @@ -26,6 +26,31 @@ std::shared_ptr random_block () } } +TEST (message, header_version) +{ + // Simplest message type + nano::keepalive original{ nano::dev::network_params.network }; + + // Serialize the original keepalive message + std::vector bytes; + { + nano::vectorstream stream (bytes); + original.serialize (stream); + } + + // Deserialize the byte stream back to a message header + nano::bufferstream stream (bytes.data (), bytes.size ()); + bool error = false; + nano::message_header header (error, stream); + ASSERT_FALSE (error); + + // Check header versions + ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, header.version_min); + ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_using); + ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_max); + ASSERT_EQ (nano::message_type::keepalive, header.type); +} + TEST (message, keepalive_serialization) { nano::keepalive request1{ nano::dev::network_params.network }; @@ -62,33 +87,60 @@ TEST (message, keepalive_deserialize) ASSERT_EQ (message1.peers, message2.peers); } -TEST (message, publish_serialization) +TEST (message, publish) { + // Create a random block auto block = random_block (); - nano::publish publish{ nano::dev::network_params.network, block }; - ASSERT_EQ (nano::block_type::send, publish.header.block_type ()); + nano::publish original{ nano::dev::network_params.network, block }; + ASSERT_FALSE (original.is_originator ()); + + // Serialize the original publish message std::vector bytes; { nano::vectorstream stream (bytes); - publish.header.serialize (stream); - } - ASSERT_EQ (8, bytes.size ()); - ASSERT_EQ (0x52, bytes[0]); - ASSERT_EQ (0x41, bytes[1]); - ASSERT_EQ (nano::dev::network_params.network.protocol_version, bytes[2]); - ASSERT_EQ (nano::dev::network_params.network.protocol_version, bytes[3]); - ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, bytes[4]); - ASSERT_EQ (static_cast (nano::message_type::publish), bytes[5]); - ASSERT_EQ (0x00, bytes[6]); // extensions - ASSERT_EQ (static_cast (nano::block_type::send), bytes[7]); + original.serialize (stream); + } + + // Deserialize the byte stream back to a publish message nano::bufferstream stream (bytes.data (), bytes.size ()); - auto error (false); + bool error = false; nano::message_header header (error, stream); ASSERT_FALSE (error); - ASSERT_EQ (nano::dev::network_params.network.protocol_version_min, header.version_min); - ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_using); - ASSERT_EQ (nano::dev::network_params.network.protocol_version, header.version_max); - ASSERT_EQ (nano::message_type::publish, header.type); + nano::publish deserialized (error, stream, header); + ASSERT_FALSE (error); + + // Assert that the original and deserialized messages are equal + ASSERT_EQ (original, deserialized); + ASSERT_EQ (*original.block, *deserialized.block); + ASSERT_EQ (original.is_originator (), deserialized.is_originator ()); +} + +TEST (message, publish_originator_flag) +{ + // Create a random block + auto block = random_block (); + nano::publish original{ nano::dev::network_params.network, block, /* originator */ true }; + ASSERT_TRUE (original.is_originator ()); + + // Serialize the original publish message + std::vector bytes; + { + nano::vectorstream stream (bytes); + original.serialize (stream); + } + + // Deserialize the byte stream back to a publish message + nano::bufferstream stream (bytes.data (), bytes.size ()); + bool error = false; + nano::message_header header (error, stream); + ASSERT_FALSE (error); + nano::publish deserialized (error, stream, header); + ASSERT_FALSE (error); + + // Assert that the originator flag is set correctly in both the original and deserialized messages + ASSERT_TRUE (deserialized.is_originator ()); + ASSERT_EQ (original, deserialized); + ASSERT_EQ (*original.block, *deserialized.block); } TEST (message, confirm_header_flags) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index eb14f2ceaf..1f6b9e74c1 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -171,6 +171,7 @@ enum class detail // block source live, + live_originator, bootstrap, bootstrap_legacy, unchecked, diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 4c36cd60d0..7e2cdf027a 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -54,6 +54,7 @@ nano::block_processor::block_processor (nano::node & node_a) : switch (origin.source) { case nano::block_source::live: + case nano::block_source::live_originator: return config.max_peer_queue; default: return config.max_system_queue; @@ -64,6 +65,7 @@ nano::block_processor::block_processor (nano::node & node_a) : switch (origin.source) { case nano::block_source::live: + case nano::block_source::live_originator: return config.priority_live; case nano::block_source::bootstrap: case nano::block_source::bootstrap_legacy: diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 8561c635f0..1fa3b71f74 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -27,6 +27,7 @@ enum class block_source { unknown = 0, live, + live_originator, bootstrap, bootstrap_legacy, unchecked, @@ -67,10 +68,10 @@ class block_processor final class context { public: - context (std::shared_ptr block, block_source source); + context (std::shared_ptr block, nano::block_source source); std::shared_ptr const block; - block_source const source; + nano::block_source const source; std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () }; public: @@ -85,16 +86,16 @@ class block_processor final }; public: - block_processor (nano::node &); + explicit block_processor (nano::node &); ~block_processor (); void start (); void stop (); std::size_t size () const; - std::size_t size (block_source) const; - bool add (std::shared_ptr const &, block_source = block_source::live, std::shared_ptr const & channel = nullptr); - std::optional add_blocking (std::shared_ptr const & block, block_source); + std::size_t size (nano::block_source) const; + bool add (std::shared_ptr const &, nano::block_source = nano::block_source::live, std::shared_ptr const & channel = nullptr); + std::optional add_blocking (std::shared_ptr const & block, nano::block_source); void force (std::shared_ptr const &); bool should_log (); @@ -127,7 +128,7 @@ class block_processor final nano::node & node; private: - nano::fair_queue queue; + nano::fair_queue queue; std::chrono::steady_clock::time_point next_log; diff --git a/nano/node/message_processor.cpp b/nano/node/message_processor.cpp index 89902b43f4..a479b7bf7b 100644 --- a/nano/node/message_processor.cpp +++ b/nano/node/message_processor.cpp @@ -157,7 +157,6 @@ void nano::message_processor::run_batch (nano::unique_lock & lock) namespace { -// TODO: This was moved, so compare with latest develop before merging to avoid merge bugs class process_visitor : public nano::message_visitor { public: @@ -184,7 +183,9 @@ class process_visitor : public nano::message_visitor void publish (nano::publish const & message) override { - bool added = node.block_processor.add (message.block, nano::block_source::live, channel); + // Put blocks that are being initally broadcasted in a separate queue, so that they won't have to compete with rebroadcasted blocks + // Both queues have the same priority and size, so the potential for exploiting this is limited + bool added = node.block_processor.add (message.block, message.is_originator () ? nano::block_source::live_originator : nano::block_source::live, channel); if (!added) { node.network.publish_filter.clear (message.digest); diff --git a/nano/node/messages.cpp b/nano/node/messages.cpp index ac82d67b27..0143a62b2e 100644 --- a/nano/node/messages.cpp +++ b/nano/node/messages.cpp @@ -433,11 +433,12 @@ nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_h } } -nano::publish::publish (nano::network_constants const & constants, std::shared_ptr const & block_a) : +nano::publish::publish (nano::network_constants const & constants, std::shared_ptr const & block_a, bool is_originator_a) : message (constants, nano::message_type::publish), block (block_a) { header.block_type_set (block->type ()); + header.flag_set (originator_flag, is_originator_a); } void nano::publish::serialize (nano::stream & stream_a) const @@ -465,11 +466,17 @@ bool nano::publish::operator== (nano::publish const & other_a) const return *block == *other_a.block; } +bool nano::publish::is_originator () const +{ + return header.flag_test (originator_flag); +} + void nano::publish::operator() (nano::object_stream & obs) const { nano::message::operator() (obs); // Write common data obs.write ("block", block); + obs.write ("originator", is_originator ()); } /* @@ -682,6 +689,7 @@ void nano::confirm_ack::operator() (nano::object_stream & obs) const nano::message::operator() (obs); // Write common data obs.write ("vote", vote); + obs.write ("rebroadcasted", is_rebroadcasted ()); } /* diff --git a/nano/node/messages.hpp b/nano/node/messages.hpp index 5046eae812..7cba524971 100644 --- a/nano/node/messages.hpp +++ b/nano/node/messages.hpp @@ -183,16 +183,23 @@ class keepalive final : public message * * Header extensions: * - [0x0f00] Block type: Identifies the specific type of the block. + * - [0x0004] Originator flag */ class publish final : public message { public: publish (bool &, nano::stream &, nano::message_header const &, nano::uint128_t const & = 0, nano::block_uniquer * = nullptr); - publish (nano::network_constants const & constants, std::shared_ptr const &); - void visit (nano::message_visitor &) const override; + publish (nano::network_constants const & constants, std::shared_ptr const &, bool is_originator = false); + void serialize (nano::stream &) const override; bool deserialize (nano::stream &, nano::block_uniquer * = nullptr); + void visit (nano::message_visitor &) const override; bool operator== (nano::publish const &) const; + + static uint8_t constexpr originator_flag = 2; // 0x0004 + bool is_originator () const; + +public: // Payload std::shared_ptr block; nano::uint128_t digest{ 0 }; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index f741aa7f2f..1f4051017d 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -247,22 +247,22 @@ void nano::network::flood_keepalive_self (float const scale_a) flood_message (message, nano::transport::buffer_drop_policy::limiter, scale_a); } -void nano::network::flood_block (std::shared_ptr const & block_a, nano::transport::buffer_drop_policy const drop_policy_a) +void nano::network::flood_block (std::shared_ptr const & block, nano::transport::buffer_drop_policy const drop_policy) { - nano::publish message (node.network_params.network, block_a); - flood_message (message, drop_policy_a); + nano::publish message{ node.network_params.network, block }; + flood_message (message, drop_policy); } -void nano::network::flood_block_initial (std::shared_ptr const & block_a) +void nano::network::flood_block_initial (std::shared_ptr const & block) { - nano::publish message (node.network_params.network, block_a); - for (auto const & i : node.rep_crawler.principal_representatives ()) + nano::publish message{ node.network_params.network, block, /* is_originator */ true }; + for (auto const & rep : node.rep_crawler.principal_representatives ()) { - i.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + rep.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); } - for (auto & i : list_non_pr (fanout (1.0))) + for (auto & peer : list_non_pr (fanout (1.0))) { - i->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); + peer->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop); } }