Skip to content

Commit

Permalink
Offload block broadcasting to background worker
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Nov 11, 2023
1 parent e22434b commit 050cd76
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 17 deletions.
5 changes: 5 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ enum class type : uint8_t
vote_cache,
hinting,
blockprocessor,
block_broadcaster,
bootstrap_server,
active,
active_started,
Expand Down Expand Up @@ -294,6 +295,10 @@ enum class detail : uint8_t
deprioritize,
deprioritize_failed,

// block_broadcaster
broadcast_normal,
broadcast_aggressive,

_last // Must be the last enum
};

Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;
case nano::thread_role::name::block_broadcasting:
thread_role_name_string = "Blck broadcast";
break;
case nano::thread_role::name::request_loop:
thread_role_name_string = "Request loop";
break;
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ enum class name
packet_processing,
vote_processing,
block_processing,
block_broadcasting,
request_loop,
wallet_actions,
bootstrap_initiator,
Expand Down
1 change: 1 addition & 0 deletions nano/lib/threading.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <nano/lib/thread_roles.hpp>
#include <nano/lib/utility.hpp>

#include <boost/thread/thread.hpp>
Expand Down
62 changes: 55 additions & 7 deletions nano/node/block_broadcast.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/block_arrival.hpp>
#include <nano/node/block_broadcast.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/network.hpp>

nano::block_broadcast::block_broadcast (nano::block_processor & block_processor, nano::network & network, nano::block_arrival & block_arrival, bool enabled) :
block_processor{ block_processor },
network{ network },
block_arrival{ block_arrival },
enabled{ enabled }
nano::block_broadcast::block_broadcast (nano::block_processor & block_processor_a, nano::network & network_a, nano::block_arrival & block_arrival_a, nano::stats & stats_a, bool enabled_a) :
block_processor{ block_processor_a },
network{ network_a },
block_arrival{ block_arrival_a },
stats{ stats_a },
enabled{ enabled_a },
queue{ stats_a, nano::stat::type::block_broadcaster, nano::thread_role::name::block_broadcasting, /* single thread */ 1, max_size }
{
if (!enabled)
{
Expand All @@ -25,6 +29,28 @@ nano::block_broadcast::block_broadcast (nano::block_processor & block_processor,
}
local.erase (block->hash ());
});

queue.process_batch = [this] (auto & batch) {
process_batch (batch);
};
}

nano::block_broadcast::~block_broadcast ()
{
}

void nano::block_broadcast::start ()
{
if (!enabled)
{
return;
}
queue.start ();
}

void nano::block_broadcast::stop ()
{
queue.stop ();
}

void nano::block_broadcast::observe (std::shared_ptr<nano::block> const & block)
Expand All @@ -34,14 +60,14 @@ void nano::block_broadcast::observe (std::shared_ptr<nano::block> const & block)
{
// Block created on this node
// Perform more agressive initial flooding
network.flood_block_initial (block);
queue.add (entry{ block, broadcast_strategy::aggressive });
}
else
{
if (block_arrival.recent (block->hash ()))
{
// Block arrived from realtime traffic, do normal gossip.
network.flood_block (block, nano::transport::buffer_drop_policy::limiter);
queue.add (entry{ block, broadcast_strategy::normal });
}
else
{
Expand All @@ -60,6 +86,28 @@ void nano::block_broadcast::track_local (nano::block_hash const & hash)
local.add (hash);
}

void nano::block_broadcast::process_batch (queue_t::batch_t & batch)
{
for (auto & [block, strategy] : batch)
{
switch (strategy)
{
case broadcast_strategy::normal:
{
stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::broadcast_normal, nano::stat::dir::out);
network.flood_block (block, nano::transport::buffer_drop_policy::limiter);
}
break;
case broadcast_strategy::aggressive:
{
stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::broadcast_aggressive, nano::stat::dir::out);
network.flood_block_initial (block);
}
break;
}
}
}

/*
* hash_tracker
*/
Expand Down
30 changes: 21 additions & 9 deletions nano/node/block_broadcast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <nano/lib/blocks.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/processing_queue.hpp>

#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/ordered_index.hpp>
Expand All @@ -28,8 +29,21 @@ namespace nano
*/
class block_broadcast
{
enum class broadcast_strategy
{
normal,
aggressive,
};

using entry = std::pair<std::shared_ptr<nano::block>, broadcast_strategy>;
using queue_t = nano::processing_queue<entry>;

public:
block_broadcast (nano::block_processor &, nano::network &, nano::block_arrival &, bool enabled = false);
block_broadcast (nano::block_processor &, nano::network &, nano::block_arrival &, nano::stats &, bool enabled = false);
~block_broadcast ();

void start ();
void stop ();

// Mark a block as originating locally
void track_local (nano::block_hash const &);
Expand All @@ -38,13 +52,13 @@ class block_broadcast
nano::block_processor & block_processor;
nano::network & network;
nano::block_arrival & block_arrival;
nano::stats & stats;

private:
// Block_processor observer
void observe (std::shared_ptr<nano::block> const & block);

void run ();

void process_batch (queue_t::batch_t & batch);

private:
class hash_tracker
{
Expand Down Expand Up @@ -73,13 +87,11 @@ class block_broadcast

static std::size_t constexpr max_size = 1024 * 128;
};

hash_tracker local;

bool enabled{ false };
bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
queue_t queue;

static std::size_t constexpr max_size = 1024 * 32;
};
}
4 changes: 3 additions & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
epoch_upgrader{ *this, ledger, store, network_params, logger },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq),
block_broadcast{ block_processor, network, block_arrival, !flags.disable_block_processor_republishing },
block_broadcast{ block_processor, network, block_arrival, stats, !flags.disable_block_processor_republishing },
gap_tracker{ gap_cache },
process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket }
{
Expand Down Expand Up @@ -689,6 +689,7 @@ void nano::node::start ()
}
websocket.start ();
telemetry.start ();
block_broadcast.start ();
}

void nano::node::stop ()
Expand All @@ -711,6 +712,7 @@ void nano::node::stop ()
}
unchecked.stop ();
block_processor.stop ();
block_broadcast.stop ();
aggregator.stop ();
vote_processor.stop ();
scheduler.stop ();
Expand Down

0 comments on commit 050cd76

Please sign in to comment.