Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockprocessor context #4424

Merged
merged 13 commits into from
Feb 14, 2024
6 changes: 6 additions & 0 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ TEST (active_transactions, keep_local)
auto const send4 = wallet.send_action (nano::dev::genesis_key.pub, key4.pub, node.config.receive_minimum.number ());
auto const send5 = wallet.send_action (nano::dev::genesis_key.pub, key5.pub, node.config.receive_minimum.number ());
auto const send6 = wallet.send_action (nano::dev::genesis_key.pub, key6.pub, node.config.receive_minimum.number ());
ASSERT_NE (nullptr, send1);
ASSERT_NE (nullptr, send2);
ASSERT_NE (nullptr, send3);
ASSERT_NE (nullptr, send4);
ASSERT_NE (nullptr, send5);
ASSERT_NE (nullptr, send6);

// force-confirm blocks
for (auto const & block : { send1, send2, send3, send4, send5, send6 })
Expand Down
49 changes: 1 addition & 48 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2111,53 +2111,6 @@ TEST (node, block_confirm)
ASSERT_TIMELY_EQ (10s, node1.active.recently_cemented.list ().size (), 1);
}

TEST (node, block_arrival)
{
nano::test::system system (1);
auto & node (*system.nodes[0]);
ASSERT_EQ (0, node.block_arrival.arrival.size ());
nano::block_hash hash1 (1);
node.block_arrival.add (hash1);
ASSERT_EQ (1, node.block_arrival.arrival.size ());
node.block_arrival.add (hash1);
ASSERT_EQ (1, node.block_arrival.arrival.size ());
nano::block_hash hash2 (2);
node.block_arrival.add (hash2);
ASSERT_EQ (2, node.block_arrival.arrival.size ());
}

TEST (node, block_arrival_size)
{
nano::test::system system (1);
auto & node (*system.nodes[0]);
auto time (std::chrono::steady_clock::now () - nano::block_arrival::arrival_time_min - std::chrono::seconds (5));
nano::block_hash hash (0);
for (auto i (0); i < nano::block_arrival::arrival_size_min * 2; ++i)
{
node.block_arrival.arrival.push_back (nano::block_arrival_info{ time, hash });
++hash.qwords[0];
}
ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ());
node.block_arrival.recent (0);
ASSERT_EQ (nano::block_arrival::arrival_size_min, node.block_arrival.arrival.size ());
}

TEST (node, block_arrival_time)
{
nano::test::system system (1);
auto & node (*system.nodes[0]);
auto time (std::chrono::steady_clock::now ());
nano::block_hash hash (0);
for (auto i (0); i < nano::block_arrival::arrival_size_min * 2; ++i)
{
node.block_arrival.arrival.push_back (nano::block_arrival_info{ time, hash });
++hash.qwords[0];
}
ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ());
node.block_arrival.recent (0);
ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ());
}

TEST (node, confirm_quorum)
{
nano::test::system system (1);
Expand Down Expand Up @@ -2958,7 +2911,7 @@ TEST (node, block_processor_reject_state)
send1->signature.bytes[0] ^= 1;
ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ()));
node.process_active (send1);
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor, nano::stat::detail::bad_signature));
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor_result, nano::stat::detail::bad_signature));
ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ()));
auto send2 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
Expand Down
2 changes: 0 additions & 2 deletions nano/lib/locks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,6 @@ char const * nano::mutex_identifier (mutexes mutex)
{
case mutexes::active:
return "active";
case mutexes::block_arrival:
return "block_arrival";
case mutexes::block_processor:
return "block_processor";
case mutexes::block_uniquer:
Expand Down
1 change: 0 additions & 1 deletion nano/lib/locks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ bool any_filters_registered ();
enum class mutexes
{
active,
block_arrival,
block_processor,
block_uniquer,
blockstore_cache,
Expand Down
16 changes: 16 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ enum class type : uint8_t
vote_cache,
hinting,
blockprocessor,
blockprocessor_source,
blockprocessor_result,
bootstrap_server,
active,
active_started,
Expand Down Expand Up @@ -71,6 +73,7 @@ enum class detail : uint8_t
top,
none,
success,
unknown,

// processing queue
queue,
Expand Down Expand Up @@ -110,6 +113,19 @@ enum class detail : uint8_t
representative_mismatch,
block_position,

// blockprocessor
process_blocking,
process_blocking_timeout,
force,

// block source
live,
bootstrap,
bootstrap_legacy,
unchecked,
local,
forced,

// message specific
not_a_type,
invalid,
Expand Down
4 changes: 0 additions & 4 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ add_library(
backlog_population.cpp
bandwidth_limiter.hpp
bandwidth_limiter.cpp
block_arrival.hpp
block_arrival.cpp
block_broadcast.cpp
block_broadcast.hpp
blocking_observer.cpp
blocking_observer.hpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/block_deserializer.hpp
Expand Down
4 changes: 2 additions & 2 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi
});

// Notify elections about alternative (forked) blocks
block_processor.processed.add ([this] (auto const & result, auto const & block) {
block_processor.block_processed.add ([this] (auto const & result, auto const & context) {
switch (result.code)
{
case nano::process_result::fork:
publish (block);
publish (context.block);
break;
default:
break;
Expand Down
35 changes: 0 additions & 35 deletions nano/node/block_arrival.cpp

This file was deleted.

49 changes: 0 additions & 49 deletions nano/node/block_arrival.hpp

This file was deleted.

40 changes: 7 additions & 33 deletions nano/node/block_broadcast.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#include <nano/node/block_arrival.hpp>
#include <nano/node/block_broadcast.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/network.hpp>

nano::block_broadcast::block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled) :
nano::block_broadcast::block_broadcast (nano::network & network, bool enabled) :
network{ network },
block_arrival{ block_arrival },
enabled{ enabled }
{
}
Expand All @@ -16,34 +14,30 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor)
{
return;
}
block_processor.processed.add ([this] (auto const & result, auto const & block) {
block_processor.block_processed.add ([this] (auto const & result, auto const & context) {
switch (result.code)
{
case nano::process_result::progress:
observe (block);
observe (context);
break;
default:
break;
}
erase (block);
});
}

void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
void nano::block_broadcast::observe (nano::block_processor::context const & context)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto existing = local.find (block);
auto local_l = existing != local.end ();
lock.unlock ();
if (local_l)
auto const & block = context.block;
if (context.source == nano::block_source::local)
{
// Block created on this node
// Perform more agressive initial flooding
network.flood_block_initial (block);
}
else
{
if (block_arrival.recent (block->hash ()))
if (context.source != nano::block_source::bootstrap && context.source != nano::block_source::bootstrap_legacy)
{
// Block arrived from realtime traffic, do normal gossip.
network.flood_block (block, nano::transport::buffer_drop_policy::limiter);
Expand All @@ -55,23 +49,3 @@ void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
}
}
}

void nano::block_broadcast::set_local (std::shared_ptr<nano::block> block)
{
if (!enabled)
{
return;
}
nano::lock_guard<nano::mutex> lock{ mutex };
local.insert (block);
}

void nano::block_broadcast::erase (std::shared_ptr<nano::block> block)
{
if (!enabled)
{
return;
}
nano::lock_guard<nano::mutex> lock{ mutex };
local.erase (block);
}
14 changes: 4 additions & 10 deletions nano/node/block_broadcast.hpp
Original file line number Diff line number Diff line change
@@ -1,34 +1,28 @@
#pragma once

#include <nano/lib/blocks.hpp>
#include <nano/node/blockprocessor.hpp>

#include <memory>
#include <unordered_set>

namespace nano
{
class block_arrival;
class block_processor;
class network;

// This class tracks blocks that originated from this node.
class block_broadcast
{
public:
block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled = false);
block_broadcast (nano::network & network, bool enabled = false);
// Add batch_processed observer to block_processor if enabled
void connect (nano::block_processor & block_processor);
// Mark a block as originating locally
void set_local (std::shared_ptr<nano::block> block);
void erase (std::shared_ptr<nano::block> block);

private:
// Block_processor observer
void observe (std::shared_ptr<nano::block> block);
void observe (nano::block_processor::context const &);

nano::network & network;
nano::block_arrival & block_arrival;
std::unordered_set<std::shared_ptr<nano::block>> local; // Blocks originated on this node
nano::mutex mutex;
bool enabled;
};
}
Loading
Loading