Skip to content

Commit

Permalink
Local block broadcaster
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 4, 2024
1 parent 162fee5 commit 9c77aee
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 84 deletions.
7 changes: 7 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ enum class type : uint8_t
election_scheduler,
optimistic_scheduler,
handshake,
block_broadcaster,

bootstrap_ascending,
bootstrap_ascending_accounts,
Expand Down Expand Up @@ -323,6 +324,12 @@ enum class detail : uint8_t
deprioritize,
deprioritize_failed,

// block broadcaster
broadcast_normal,
broadcast_aggressive,
erase_old,
erase_confirmed,

_last // Must be the last enum
};

Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::scheduler_priority:
thread_role_name_string = "Sched Priority";
break;
case nano::thread_role::name::local_block_broadcasting:
thread_role_name_string = "Local broadcast";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum class name
scheduler_manual,
scheduler_optimistic,
scheduler_priority,
local_block_broadcasting,
};

/*
Expand Down
4 changes: 2 additions & 2 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ add_library(
backlog_population.cpp
bandwidth_limiter.hpp
bandwidth_limiter.cpp
block_broadcast.cpp
block_broadcast.hpp
block_broadcaster.cpp
block_broadcaster.hpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/block_deserializer.hpp
Expand Down
50 changes: 0 additions & 50 deletions nano/node/block_broadcast.cpp

This file was deleted.

28 changes: 0 additions & 28 deletions nano/node/block_broadcast.hpp

This file was deleted.

160 changes: 160 additions & 0 deletions nano/node/block_broadcaster.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/block_broadcaster.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>

nano::block_broadcaster::block_broadcaster (nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::stats & stats_a, bool enabled_a) :
node{ node_a },
block_processor{ block_processor_a },
network{ network_a },
stats{ stats_a },
enabled{ enabled_a }
{
if (!enabled)
{
return;
}

block_processor.batch_processed.add ([this] (auto const & batch) {
bool should_notify = false;
for (auto const & [result, block, context] : batch)
{
// Only rebroadcast local blocks that were successfully processed (no forks or gaps)
if (result.code == nano::process_result::progress && context.source == nano::block_source::local)
{
nano::lock_guard<nano::mutex> guard{ mutex };
local_blocks.emplace_back (local_entry{ block, std::chrono::steady_clock::now () });
stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::insert);
should_notify = true;
}
}
if (should_notify)
{
condition.notify_all ();
}
});

block_processor.rolled_back.add ([this] (auto const & block) {
nano::lock_guard<nano::mutex> guard{ mutex };
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::block_broadcaster, nano::stat::detail::rollback, nano::stat::dir::in, erased);
});
}

nano::block_broadcaster::~block_broadcaster ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
}

void nano::block_broadcaster::start ()
{
if (!enabled)
{
return;
}

debug_assert (!thread.joinable ());

thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::local_block_broadcasting);
run ();
} };
}

void nano::block_broadcaster::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();
nano::join_or_pass (thread);
}

void nano::block_broadcaster::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::loop);

condition.wait_for (lock, check_interval);
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds

if (!stopped)
{
cleanup ();
run_broadcasts (lock);
debug_assert (lock.owns_lock ());
}
}
}

void nano::block_broadcaster::run_broadcasts (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());

std::vector<std::shared_ptr<nano::block>> to_broadcast;

auto const now = std::chrono::steady_clock::now ();
for (auto & entry : local_blocks)
{
if (elapsed (entry.last_broadcast, broadcast_interval, now))
{
entry.last_broadcast = now;
to_broadcast.push_back (entry.block);
}
}

lock.unlock ();

for (auto const & block : to_broadcast)
{
stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::broadcast, nano::stat::dir::out);
network.flood_block_initial (block);
}

lock.lock ();
}

void nano::block_broadcaster::cleanup ()
{
debug_assert (!mutex.try_lock ());

// Erase oldest blocks if the queue gets too big
while (local_blocks.size () > max_size)
{
stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::erase_oldest);
local_blocks.pop_front ();
}

// TODO: Mutex is held during IO, but it should be fine since it's not performance critical
auto transaction = node.store.tx_begin_read ();
erase_if (local_blocks, [this, &transaction] (auto const & entry) {
transaction.refresh_if_needed ();

if (entry.last_broadcast == std::chrono::steady_clock::time_point{})
{
// This block has never been broadcasted, keep it so it's broadcasted at least once
return false;
}
if (node.block_confirmed_or_being_confirmed (transaction, entry.block->hash ()))
{
stats.inc (nano::stat::type::block_broadcaster, nano::stat::detail::erase_confirmed);
return true;
}
return false;
});
}

std::unique_ptr<nano::container_info_component> nano::block_broadcaster::collect_container_info (const std::string & name) const
{
nano::lock_guard<nano::mutex> guard{ mutex };

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "local", local_blocks.size (), sizeof (decltype (local_blocks)::value_type) }));
return composite;
}
100 changes: 100 additions & 0 deletions nano/node/block_broadcaster.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#pragma once

#include <nano/lib/blocks.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/processing_queue.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/secure/common.hpp>

#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>

#include <memory>
#include <thread>
#include <unordered_set>

namespace mi = boost::multi_index;

namespace nano
{
class node;
class network;
}

namespace nano
{
/**
* Broadcasts blocks to the network
* Tracks local blocks for more aggressive propagation
*/
class block_broadcaster
{
enum class broadcast_strategy
{
normal,
aggressive,
};

public:
block_broadcaster (nano::node &, nano::block_processor &, nano::network &, nano::stats &, bool enabled = false);
~block_broadcaster ();

void start ();
void stop ();

std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const;

private:
void run ();
void run_broadcasts (nano::unique_lock<nano::mutex> &);
void cleanup ();

private: // Dependencies
nano::node & node;
nano::block_processor & block_processor;
nano::network & network;
nano::stats & stats;

private:
struct local_entry
{
std::shared_ptr<nano::block> const block;
std::chrono::steady_clock::time_point const arrival;
mutable std::chrono::steady_clock::time_point last_broadcast{}; // Not part of any index

nano::block_hash hash () const
{
return block->hash ();
}
};

// clang-format off
class tag_sequenced {};
class tag_hash {};

using ordered_locals = boost::multi_index_container<local_entry,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequenced>>,
mi::hashed_unique<mi::tag<tag_hash>,
mi::const_mem_fun<local_entry, nano::block_hash, &local_entry::hash>>
>>;
// clang-format on

ordered_locals local_blocks;

private:
bool enabled{ false };

std::atomic<bool> stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;

static std::size_t constexpr max_size{ 1024 * 8 };
static std::chrono::seconds constexpr check_interval{ 30 };
static std::chrono::seconds constexpr broadcast_interval{ 60 };
};
}
Loading

0 comments on commit 9c77aee

Please sign in to comment.