Skip to content

Commit

Permalink
Make block_broadcast logic cleaner and more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Nov 11, 2023
1 parent 8cae757 commit e22434b
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 35 deletions.
51 changes: 31 additions & 20 deletions nano/node/block_broadcast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@
#include <nano/node/blockprocessor.hpp>
#include <nano/node/network.hpp>

nano::block_broadcast::block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled) :
nano::block_broadcast::block_broadcast (nano::block_processor & block_processor, nano::network & network, nano::block_arrival & block_arrival, bool enabled) :
block_processor{ block_processor },
network{ network },
block_arrival{ block_arrival },
enabled{ enabled }
{
}

void nano::block_broadcast::connect (nano::block_processor & block_processor)
{
if (!enabled)
{
return;
}

block_processor.processed.add ([this] (auto const & result, auto const & block) {
switch (result.code)
{
Expand All @@ -25,17 +23,14 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor)
default:
break;
}
erase (block);
local.erase (block->hash ());
});
}

void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
void nano::block_broadcast::observe (std::shared_ptr<nano::block> const & block)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto existing = local.find (block);
auto local_l = existing != local.end ();
lock.unlock ();
if (local_l)
bool is_local = local.contains (block->hash ());
if (is_local)
{
// Block created on this node
// Perform more agressive initial flooding
Expand All @@ -56,22 +51,38 @@ void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
}
}

void nano::block_broadcast::set_local (std::shared_ptr<nano::block> block)
void nano::block_broadcast::track_local (nano::block_hash const & hash)
{
if (!enabled)
{
return;
}
nano::lock_guard<nano::mutex> lock{ mutex };
local.insert (block);
local.add (hash);
}

void nano::block_broadcast::erase (std::shared_ptr<nano::block> block)
/*
* hash_tracker
*/

void nano::block_broadcast::hash_tracker::add (nano::block_hash const & hash)
{
if (!enabled)
nano::lock_guard<nano::mutex> guard{ mutex };
hashes.emplace_back (hash);
while (hashes.size () > max_size)
{
return;
// Erase oldest hashes
hashes.pop_front ();
}
nano::lock_guard<nano::mutex> lock{ mutex };
local.erase (block);
}

void nano::block_broadcast::hash_tracker::erase (nano::block_hash const & hash)
{
nano::lock_guard<nano::mutex> guard{ mutex };
hashes.get<tag_hash> ().erase (hash);
}

bool nano::block_broadcast::hash_tracker::contains (nano::block_hash const & hash) const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return hashes.get<tag_hash> ().find (hash) != hashes.get<tag_hash> ().end ();
}
75 changes: 63 additions & 12 deletions nano/node/block_broadcast.hpp
Original file line number Diff line number Diff line change
@@ -1,34 +1,85 @@
#pragma once

#include <nano/lib/blocks.hpp>
#include <nano/lib/locks.hpp>

#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index_container.hpp>

#include <memory>
#include <thread>
#include <unordered_set>

namespace mi = boost::multi_index;

namespace nano
{
class block_arrival;
class block_processor;
class network;
// This class tracks blocks that originated from this node.
}

namespace nano
{
/**
* Broadcasts blocks to the network
* Tracks local blocks for more aggressive propagation
*/
class block_broadcast
{
public:
block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled = false);
// Add batch_processed observer to block_processor if enabled
void connect (nano::block_processor & block_processor);
block_broadcast (nano::block_processor &, nano::network &, nano::block_arrival &, bool enabled = false);

// Mark a block as originating locally
void set_local (std::shared_ptr<nano::block> block);
void erase (std::shared_ptr<nano::block> block);
void track_local (nano::block_hash const &);

private: // Dependencies
nano::block_processor & block_processor;
nano::network & network;
nano::block_arrival & block_arrival;

private:
// Block_processor observer
void observe (std::shared_ptr<nano::block> block);
void observe (std::shared_ptr<nano::block> const & block);

nano::network & network;
nano::block_arrival & block_arrival;
std::unordered_set<std::shared_ptr<nano::block>> local; // Blocks originated on this node
nano::mutex mutex;
bool enabled;
void run ();

private:
class hash_tracker
{
public:
void add (nano::block_hash const &);
void erase (nano::block_hash const &);
bool contains (nano::block_hash const &) const;

private:
mutable nano::mutex mutex;

// clang-format off
class tag_sequenced {};
class tag_hash {};

using ordered_hashes = boost::multi_index_container<nano::block_hash,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequenced>>,
mi::hashed_unique<mi::tag<tag_hash>,
mi::identity<nano::block_hash>>
>>;
// clang-format on

// Blocks originated on this node
ordered_hashes hashes;

static std::size_t constexpr max_size = 1024 * 128;
};

hash_tracker local;

bool enabled{ false };
bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
};
}
5 changes: 2 additions & 3 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,10 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
epoch_upgrader{ *this, ledger, store, network_params, logger },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq),
block_broadcast{ network, block_arrival, !flags.disable_block_processor_republishing },
block_broadcast{ block_processor, network, block_arrival, !flags.disable_block_processor_republishing },
gap_tracker{ gap_cache },
process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket }
{
block_broadcast.connect (block_processor);
gap_tracker.connect (block_processor);
process_live_dispatcher.connect (block_processor);

Expand Down Expand Up @@ -599,7 +598,7 @@ std::optional<nano::process_return> nano::node::process_local (std::shared_ptr<n
{
// Add block hash as recently arrived to trigger automatic rebroadcast and election
block_arrival.add (block_a->hash ());
block_broadcast.set_local (block_a);
block_broadcast.track_local (block_a->hash ());
return block_processor.add_blocking (block_a);
}

Expand Down

0 comments on commit e22434b

Please sign in to comment.