Skip to content

Commit

Permalink
Merge branch 'develop' into frontier-scan-5
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Oct 28, 2024
2 parents 1e15e8d + 104b74d commit 6e68af5
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 101 deletions.
1 change: 1 addition & 0 deletions nano/core_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ add_executable(
node.cpp
numbers.cpp
object_stream.cpp
observer_set.cpp
optimistic_scheduler.cpp
processing_queue.cpp
processor_service.cpp
Expand Down
129 changes: 129 additions & 0 deletions nano/core_test/observer_set.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#include <nano/lib/observer_set.hpp>
#include <nano/lib/timer.hpp>

#include <gtest/gtest.h>

#include <atomic>
#include <thread>

using namespace std::chrono_literals;

TEST (observer_set, notify_one)
{
nano::observer_set<int> set;
int value{ 0 };
set.add ([&value] (int v) {
value = v;
});
set.notify (1);
ASSERT_EQ (1, value);
}

TEST (observer_set, notify_multiple)
{
nano::observer_set<int> set;
int value{ 0 };
set.add ([&value] (int v) {
value = v;
});
set.add ([&value] (int v) {
value += v;
});
set.notify (1);
ASSERT_EQ (2, value);
}

TEST (observer_set, notify_empty)
{
nano::observer_set<int> set;
set.notify (1);
}

TEST (observer_set, notify_multiple_types)
{
nano::observer_set<int, std::string> set;
int value{ 0 };
std::string str;
set.add ([&value, &str] (int v, std::string s) {
value = v;
str = s;
});
set.notify (1, "test");
ASSERT_EQ (1, value);
ASSERT_EQ ("test", str);
}

TEST (observer_set, empty_params)
{
nano::observer_set<> set;
set.notify ();
}

// Make sure there are no TSAN warnings
TEST (observer_set, parallel_notify)
{
nano::observer_set<int> set;
std::atomic<int> value{ 0 };
set.add ([&value] (int v) {
std::this_thread::sleep_for (100ms);
value = v;
});
nano::timer timer{ nano::timer_state::started };
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i)
{
threads.emplace_back ([&set] {
set.notify (1);
});
}
for (auto & thread : threads)
{
thread.join ();
}
ASSERT_EQ (1, value);
// Notification should be done in parallel
ASSERT_LT (timer.since_start (), 300ms);
}

namespace
{
struct move_only
{
move_only () = default;
move_only (move_only &&) = default;
move_only & operator= (move_only &&) = default;
move_only (move_only const &) = delete;
move_only & operator= (move_only const &) = delete;
};

struct copy_throw
{
copy_throw () = default;
copy_throw (copy_throw &&) = default;
copy_throw & operator= (copy_throw &&) = default;
copy_throw (copy_throw const &)
{
throw std::runtime_error ("copy_throw");
}
copy_throw & operator= (copy_throw const &) = delete;
};
}

// Ensure that parameters are not unnecessarily copied, this should compile
TEST (observer_set, move_only)
{
nano::observer_set<move_only> set;
set.add ([] (move_only const &) {
});
move_only value;
set.notify (value);
}

TEST (observer_set, copy_throw)
{
nano::observer_set<copy_throw> set;
set.add ([] (copy_throw const &) {
});
copy_throw value;
ASSERT_NO_THROW (set.notify (value));
}
16 changes: 10 additions & 6 deletions nano/lib/observer_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,25 @@ template <typename... T>
class observer_set final
{
public:
void add (std::function<void (T...)> const & observer_a)
using observer_type = std::function<void (T const &...)>;

public:
void add (observer_type observer)
{
nano::lock_guard<nano::mutex> lock{ mutex };
observers.push_back (observer_a);
observers.push_back (observer);
}

void notify (T... args) const
void notify (T const &... args) const
{
// Make observers copy to allow parallel notifications
nano::unique_lock<nano::mutex> lock{ mutex };
auto observers_copy = observers;
lock.unlock ();

for (auto & i : observers_copy)
for (auto const & observer : observers_copy)
{
i (args...);
observer (args...);
}
}

Expand All @@ -53,7 +57,7 @@ class observer_set final

private:
mutable nano::mutex mutex{ mutex_identifier (mutexes::observer_set) };
std::vector<std::function<void (T...)>> observers;
std::vector<observer_type> observers;
};

}
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;
case nano::thread_role::name::block_processing_notifications:
thread_role_name_string = "Blck proc notif";
break;
case nano::thread_role::name::request_loop:
thread_role_name_string = "Request loop";
break;
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum class name
vote_processing,
vote_cache_processing,
block_processing,
block_processing_notifications,
request_loop,
wallet_actions,
bootstrap_initiator,
Expand Down
111 changes: 59 additions & 52 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,14 @@

#include <utility>

/*
* block_processor::context
*/

nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a, callback_t callback_a) :
block{ std::move (block) },
source{ source_a },
callback{ std::move (callback_a) }
{
debug_assert (source != nano::block_source::unknown);
}

auto nano::block_processor::context::get_future () -> std::future<result_t>
{
return promise.get_future ();
}

void nano::block_processor::context::set_result (result_t const & result)
{
promise.set_value (result);
}

/*
* block_processor
*/

nano::block_processor::block_processor (nano::node & node_a) :
config{ node_a.config.block_processor },
node (node_a),
next_log (std::chrono::steady_clock::now ())
node{ node_a },
workers{ 1, nano::thread_role::name::block_processing_notifications }
{
batch_processed.add ([this] (auto const & items) {
// For every batch item: notify the 'processed' observer.
Expand Down Expand Up @@ -84,12 +62,15 @@ nano::block_processor::~block_processor ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!workers.alive ());
}

void nano::block_processor::start ()
{
debug_assert (!thread.joinable ());

workers.start ();

thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::block_processing);
run ();
Expand All @@ -107,6 +88,7 @@ void nano::block_processor::stop ()
{
thread.join ();
}
workers.stop ();
}

// TODO: Remove and replace all checks with calls to size (block_source)
Expand Down Expand Up @@ -229,13 +211,24 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const

void nano::block_processor::run ()
{
nano::interval log_interval;
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (!queue.empty ())
{
// TODO: Cleaner periodical logging
if (should_log ())
// It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here
while (workers.queued_tasks () >= config.max_queued_notifications)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped; });
if (stopped)
{
return;
}
}

if (log_interval.elapsed (15s))
{
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
Expand All @@ -244,41 +237,32 @@ void nano::block_processor::run ()

auto processed = process_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();

// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
if (context.callback)
// Queue notifications to be dispatched in the background
workers.post ([this, processed = std::move (processed)] () mutable {
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::notify);
// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
context.callback (result);
if (context.callback)
{
context.callback (result);
}
context.set_result (result);
}
context.set_result (result);
}

batch_processed.notify (processed);

lock.lock ();
batch_processed.notify (processed);
});
}
else
{
condition.notify_one ();
condition.wait (lock);
condition.wait (lock, [this] {
return stopped || !queue.empty ();
});
}
}
}

bool nano::block_processor::should_log ()
{
auto result (false);
auto now (std::chrono::steady_clock::now ());
if (next_log < now)
{
next_log = now + std::chrono::seconds (15);
result = true;
}
return result;
}

auto nano::block_processor::next () -> context
{
debug_assert (!mutex.try_lock ());
Expand Down Expand Up @@ -315,7 +299,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());

auto batch = next_batch (256);
auto batch = next_batch (config.batch_size);

lock.unlock ();

Expand Down Expand Up @@ -466,9 +450,32 @@ nano::container_info nano::block_processor::container_info () const
info.put ("blocks", queue.size ());
info.put ("forced", queue.size ({ nano::block_source::forced }));
info.add ("queue", queue.container_info ());
info.add ("workers", workers.container_info ());
return info;
}

/*
* block_processor::context
*/

nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a, callback_t callback_a) :
block{ std::move (block) },
source{ source_a },
callback{ std::move (callback_a) }
{
debug_assert (source != nano::block_source::unknown);
}

auto nano::block_processor::context::get_future () -> std::future<result_t>
{
return promise.get_future ();
}

void nano::block_processor::context::set_result (result_t const & result)
{
promise.set_value (result);
}

/*
* block_processor_config
*/
Expand Down
Loading

0 comments on commit 6e68af5

Please sign in to comment.